Simplify ComicCacher to store a single binary data field and ID(s)

If the ComicCacher is to be a generic cache for talkers, it must assume
very little. Current assumptions:
- There are issues that can be queried individually by an "Issue ID"; each has a relation to a single series
- There are series that can be queried individually by a "Series ID"; each has a relation to zero or more issues
- There are searches that can be queried by their search term; each has a relation to zero or more series

Each series and issue has a boolean `complete` attribute; it is up to the talker to decide what it means.
Data is returned as a tuple ((series, complete) or (issue, complete)) or a list of such tuples.
An issue consists of an ID, a series ID, and a binary data attribute whose meaning is up to the talker to determine.
A series consists of an ID and a binary data attribute whose meaning is up to the talker to determine.
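A hedged sketch of the resulting round-trip (the type and method names match the diff below; the cache folder, source ID, and payload are invented for illustration):

    import pathlib

    from comictalker.comiccacher import ComicCacher, Series

    cache = ComicCacher(pathlib.Path("/tmp/comiccache"), version="1")
    # the cache treats data as an opaque blob
    cache.add_series_info("example", Series(id="1", data=b'{"name": "X"}'), complete=True)
    cached = cache.get_series_info("1", "example")  # tuple[Series, bool] | None
    if cached is not None:
        series, complete = cached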

The data attribute is binary to allow for compression and for efficient storage of binary data (e.g. pickle), but it is suggested to store it as JSON or a similar text format encoded with UTF-8. If the talker is using a website API, it is suggested to store the raw response from the server.
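For example, a minimal sketch of the suggested encoding (the `api_result` dict is a made-up stand-in for a server response; the encode/decode pattern mirrors the ComicVine changes below):

    import json

    from comictalker.comiccacher import Series

    api_result = {"id": 23437, "name": "Example Series"}  # pretend API payload
    # store: UTF-8 encoded JSON goes into the opaque blob
    series = Series(id=str(api_result["id"]), data=json.dumps(api_result).encode("utf-8"))
    # load: only the talker knows how to interpret the blob
    decoded = json.loads(series.data)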

All cached records automatically expire 7 days after insertion.
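Expiry is enforced lazily at read time; a self-contained sketch of the rule (an in-memory table stands in for the real cache db):

    import datetime
    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE Issues(timestamp DATE DEFAULT (datetime('now','localtime')), id TEXT)")
    # rows older than a week are deleted before each lookup
    a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
    con.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
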
Timmy Welch 2023-08-05 03:02:12 -07:00
parent ea84031b87
commit f72ebdb149
6 changed files with 216 additions and 371 deletions


@@ -16,19 +16,28 @@
from __future__ import annotations
import datetime
import json
import logging
import os
import pathlib
import sqlite3
from typing import Any, cast
from typing import Any
from comicapi import utils
from comicapi.genericmetadata import ComicSeries, Credit, GenericMetadata, TagOrigin
from typing_extensions import NamedTuple
logger = logging.getLogger(__name__)
class Series(NamedTuple):
id: str
data: bytes
class Issue(NamedTuple):
id: str
series_id: str
data: bytes
class ComicCacher:
def __init__(self, cache_folder: pathlib.Path, version: str) -> None:
self.cache_folder = cache_folder
@@ -74,70 +83,43 @@ class ComicCacher:
# create tables
with con:
cur = con.cursor()
# source,name,id,start_year,publisher,image,description,count_of_issues
cur.execute(
"CREATE TABLE SeriesSearchCache("
+ "timestamp DATE DEFAULT (datetime('now','localtime')),"
+ "id TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "search_term TEXT,"
+ "PRIMARY KEY (id, source, search_term))"
"""CREATE TABLE SeriesSearchCache(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
search_term TEXT,
PRIMARY KEY (id, source, search_term))"""
)
cur.execute("CREATE TABLE Source(" + "id TEXT NOT NULL," + "name TEXT NOT NULL," + "PRIMARY KEY (id))")
cur.execute("CREATE TABLE Source(id TEXT NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id))")
cur.execute(
"CREATE TABLE Series("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "name TEXT,"
+ "publisher TEXT,"
+ "count_of_issues INT,"
+ "count_of_volumes INT,"
+ "start_year INT,"
+ "image_url TEXT,"
+ "aliases TEXT," # Newline separated
+ "description TEXT,"
+ "genres TEXT," # Newline separated. For filtering etc.
+ "format TEXT,"
+ "PRIMARY KEY (id, source))"
"""CREATE TABLE Series(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
data BLOB,
complete BOOL,
PRIMARY KEY (id, source))"""
)
cur.execute(
"CREATE TABLE Issues("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "series_id TEXT,"
+ "name TEXT,"
+ "issue_number TEXT,"
+ "image_url TEXT,"
+ "thumb_url TEXT,"
+ "cover_date TEXT,"
+ "site_detail_url TEXT,"
+ "description TEXT,"
+ "aliases TEXT," # Newline separated
+ "alt_image_urls TEXT," # Newline separated URLs
+ "characters TEXT," # Newline separated
+ "locations TEXT," # Newline separated
+ "credits TEXT," # JSON: "{"name": "Bob Shakespeare", "role": "Writer"}"
+ "teams TEXT," # Newline separated
+ "story_arcs TEXT," # Newline separated
+ "genres TEXT," # Newline separated
+ "tags TEXT," # Newline separated
+ "critical_rating FLOAT,"
+ "manga TEXT," # Yes/YesAndRightToLeft/No
+ "maturity_rating TEXT,"
+ "language TEXT,"
+ "country TEXT,"
+ "volume TEXT,"
+ "complete BOOL," # Is the data complete? Includes characters, locations, credits.
+ "PRIMARY KEY (id, source))"
"""CREATE TABLE Issues(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
series_id TEXT,
data BLOB,
complete BOOL,
PRIMARY KEY (id, source))"""
)
def add_search_results(self, source: TagOrigin, search_term: str, series_list: list[ComicSeries]) -> None:
self.add_source(source)
def expire_stale_records(self, cur: sqlite3.Cursor, table: str) -> None:
# purge stale records from the given table
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute(f"DELETE FROM {table} WHERE timestamp < ?", [str(a_week_ago)])
def add_search_results(self, source: str, search_term: str, series_list: list[Series], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
@@ -146,153 +128,80 @@ class ComicCacher:
# remove all previous entries with this search term
cur.execute(
"DELETE FROM SeriesSearchCache WHERE search_term = ? AND source = ?",
[search_term.casefold(), source.id],
[search_term.casefold(), source],
)
# now add in new results
for record in series_list:
for series in series_list:
cur.execute(
"INSERT INTO SeriesSearchCache (source, search_term, id) VALUES(?, ?, ?)",
(source.id, search_term.casefold(), record.id),
(source, search_term.casefold(), series.id),
)
data = {
"id": record.id,
"source": source.id,
"name": record.name,
"publisher": record.publisher,
"count_of_issues": record.count_of_issues,
"count_of_volumes": record.count_of_volumes,
"start_year": record.start_year,
"image_url": record.image_url,
"description": record.description,
"genres": "\n".join(record.genres),
"format": record.format,
"timestamp": datetime.datetime.now(),
"aliases": "\n".join(record.aliases),
"id": series.id,
"source": source,
"data": series.data,
"complete": complete,
}
self.upsert(cur, "series", data)
def add_series_info(self, source: TagOrigin, series: ComicSeries) -> None:
self.add_source(source)
def add_series_info(self, source: str, series: Series, complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
timestamp = datetime.datetime.now()
data = {
"id": series.id,
"source": source.id,
"name": series.name,
"publisher": series.publisher,
"count_of_issues": series.count_of_issues,
"count_of_volumes": series.count_of_volumes,
"start_year": series.start_year,
"image_url": series.image_url,
"description": series.description,
"genres": "\n".join(series.genres),
"format": series.format,
"timestamp": timestamp,
"aliases": "\n".join(series.aliases),
"source": source,
"data": series.data,
"complete": complete,
}
self.upsert(cur, "series", data)
def add_series_issues_info(self, source: TagOrigin, issues: list[GenericMetadata], complete: bool) -> None:
self.add_source(source)
def add_issues_info(self, source: str, issues: list[Issue], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
timestamp = datetime.datetime.now()
# add in issues
for issue in issues:
data = {
"id": issue.issue_id,
"id": issue.id,
"series_id": issue.series_id,
"source": source.id,
"name": issue.title,
"issue_number": issue.issue,
"volume": issue.volume,
"site_detail_url": issue.web_link,
"cover_date": f"{issue.year}-{issue.month}-{issue.day}",
"image_url": issue.cover_image,
"description": issue.description,
"timestamp": timestamp,
"aliases": "\n".join(issue.title_aliases),
"alt_image_urls": "\n".join(issue.alternate_images),
"characters": "\n".join(issue.characters),
"locations": "\n".join(issue.locations),
"teams": "\n".join(issue.teams),
"story_arcs": "\n".join(issue.story_arcs),
"genres": "\n".join(issue.genres),
"tags": "\n".join(issue.tags),
"critical_rating": issue.critical_rating,
"manga": issue.manga,
"maturity_rating": issue.maturity_rating,
"language": issue.language,
"country": issue.country,
"credits": json.dumps(issue.credits),
"data": issue.data,
"source": source,
"complete": complete,
}
self.upsert(cur, "issues", data)
def add_source(self, source: TagOrigin) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
self.upsert(
cur,
"source",
{
"id": source.id,
"name": source.name,
},
)
def get_search_results(self, source: TagOrigin, search_term: str) -> list[ComicSeries]:
def get_search_results(self, source: str, search_term: str, expire_stale: bool = True) -> list[tuple[Series, bool]]:
results = []
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
if expire_stale:
self.expire_stale_records(cur, "SeriesSearchCache")
self.expire_stale_records(cur, "Series")
cur.execute(
"SELECT * FROM SeriesSearchCache INNER JOIN Series on"
+ " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source=Series.source"
+ " WHERE search_term=? AND SeriesSearchCache.source=?",
[search_term.casefold(), source.id],
"""SELECT * FROM SeriesSearchCache INNER JOIN Series on
SeriesSearchCache.id=Series.id AND SeriesSearchCache.source=Series.source
WHERE search_term=? AND SeriesSearchCache.source=?""",
[search_term.casefold(), source],
)
rows = cur.fetchall()
# now process the results
for record in rows:
result = ComicSeries(
id=record["id"],
name=record["name"],
publisher=record["publisher"],
count_of_issues=record["count_of_issues"],
count_of_volumes=record["count_of_volumes"],
start_year=record["start_year"],
image_url=record["image_url"],
aliases=utils.split(record["aliases"], "\n"),
description=record["description"],
genres=utils.split(record["genres"], "\n"),
format=record["format"],
)
results.append(result)
for record in rows:
result = Series(id=record["id"], data=record["data"])
results.append((result, record["complete"]))
return results
def get_series_info(self, series_id: str, source: TagOrigin, expire_stale: bool = True) -> ComicSeries | None:
result: ComicSeries | None = None
def get_series_info(self, series_id: str, source: str, expire_stale: bool = True) -> tuple[Series, bool] | None:
result: Series | None = None
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
@@ -300,170 +209,64 @@ class ComicCacher:
con.text_factory = str
if expire_stale:
# purge stale series info
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
self.expire_stale_records(cur, "Series")
# fetch
cur.execute("SELECT * FROM Series WHERE id=? AND source=?", [series_id, source.id])
cur.execute("SELECT * FROM Series WHERE id=? AND source=?", [series_id, source])
row = cur.fetchone()
if row is None:
return result
return None
# since ID is primary key, there is only one row
result = ComicSeries(
id=row["id"],
name=row["name"],
publisher=row["publisher"],
count_of_issues=row["count_of_issues"],
count_of_volumes=row["count_of_volumes"],
start_year=row["start_year"],
image_url=row["image_url"],
aliases=utils.split(row["aliases"], "\n"),
description=row["description"],
genres=utils.split(row["genres"], "\n"),
format=row["format"],
)
result = Series(id=row["id"], data=row["data"])
return result
def get_series_issues_info(self, series_id: str, source: TagOrigin) -> list[tuple[GenericMetadata, bool]]:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(series_id, source, False) or ComicSeries(
id=series_id,
name="",
description="",
genres=[],
image_url="",
publisher="",
start_year=None,
aliases=[],
count_of_issues=None,
count_of_volumes=None,
format=None,
)
return (result, row["complete"])
def get_series_issues_info(
self, series_id: str, source: str, expire_stale: bool = True
) -> list[tuple[Issue, bool]]:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
# purge stale issue info - probably issue data won't change
# much....
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
if expire_stale:
self.expire_stale_records(cur, "Issues")
# fetch
results: list[tuple[GenericMetadata, bool]] = []
results: list[tuple[Issue, bool]] = []
cur.execute("SELECT * FROM Issues WHERE series_id=? AND source=?", [series_id, source.id])
cur.execute("SELECT * FROM Issues WHERE series_id=? AND source=?", [series_id, source])
rows = cur.fetchall()
# now process the results
for row in rows:
record = self.map_row_metadata(row, series, source)
record = (Issue(id=row["id"], series_id=row["series_id"], data=row["data"]), row["complete"])
results.append(record)
return results
def get_issue_info(self, issue_id: int, source: TagOrigin) -> tuple[GenericMetadata, bool] | None:
def get_issue_info(self, issue_id: int, source: str, expire_stale: bool = True) -> tuple[Issue, bool] | None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
# purge stale issue info - probably issue data won't change
# much....
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
if expire_stale:
self.expire_stale_records(cur, "Issues")
cur.execute("SELECT * FROM Issues WHERE id=? AND source=?", [issue_id, source.id])
cur.execute("SELECT * FROM Issues WHERE id=? AND source=?", [issue_id, source])
row = cur.fetchone()
record = None
if row:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(row["id"], source, False) or ComicSeries(
id=row["id"],
name="",
description="",
genres=[],
image_url="",
publisher="",
start_year=None,
aliases=[],
count_of_issues=None,
count_of_volumes=None,
format=None,
)
record = self.map_row_metadata(row, series, source)
record = (Issue(id=row["id"], series_id=row["series_id"], data=row["data"]), row["complete"])
return record
def get_source(self, source_id: str) -> TagOrigin:
con = sqlite3.connect(self.db_file)
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
cur.execute("SELECT * FROM Source WHERE id=?", [source_id])
row = cur.fetchone()
return TagOrigin(row["id"], row["name"])
def map_row_metadata(
self, row: sqlite3.Row, series: ComicSeries, source: TagOrigin
) -> tuple[GenericMetadata, bool]:
day, month, year = utils.parse_date_str(row["cover_date"])
credits = []
try:
for credit in json.loads(row["credits"]):
credits.append(cast(Credit, credit))
except Exception:
logger.exception("credits failed")
return (
GenericMetadata(
tag_origin=source,
alternate_images=utils.split(row["alt_image_urls"], "\n"),
characters=utils.split(row["characters"], "\n"),
country=row["country"],
cover_image=row["image_url"],
credits=credits,
critical_rating=row["critical_rating"],
day=day,
description=row["description"],
genres=utils.split(row["genres"], "\n"),
issue=row["issue_number"],
issue_count=series.count_of_issues,
issue_id=row["id"],
language=row["language"],
locations=utils.split(row["locations"], "\n"),
manga=row["manga"],
maturity_rating=row["maturity_rating"],
month=month,
publisher=series.publisher,
series=series.name,
series_aliases=series.aliases,
series_id=series.id,
story_arcs=utils.split(row["story_arcs"], "\n"),
tags=set(utils.split(row["tags"], "\n")),
teams=utils.split(row["teams"], "\n"),
title=row["name"],
title_aliases=utils.split(row["aliases"], "\n"),
volume=row["volume"],
volume_count=series.count_of_volumes,
web_link=row["site_detail_url"],
year=year,
),
row["complete"],
)
def upsert(self, cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None:
"""This does an insert if the given PK doesn't exist, and an
update it if does
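# Illustrative sketch only (the SQL built by upsert is not shown in this diff;
# its exact shape is an assumption). For upsert(cur, "series",
# {"id": "1", "source": "cv", "data": b"...", "complete": True}) it would
# resolve to something like:
#   INSERT INTO series(id, source, data, complete) VALUES(?, ?, ?, ?)
#     ON CONFLICT(id, source) DO UPDATE SET data=excluded.data, complete=excluded.complete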


@@ -19,7 +19,7 @@ from typing import Any, Callable
import settngs
from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
from comicapi.genericmetadata import ComicSeries, GenericMetadata
from comictalker.talker_utils import fix_url
logger = logging.getLogger(__name__)
@@ -107,7 +107,6 @@ class ComicTalker:
name: str = "Example"
id: str = "example"
origin: TagOrigin = TagOrigin(id, name)
website: str = "https://example.com"
logo_url: str = f"{website}/logo.png"
attribution: str = f"Metadata provided by <a href='{website}'>{name}</a>"


@@ -33,7 +33,7 @@ from comicapi import utils
from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
from comicapi.issuestring import IssueString
from comictalker import talker_utils
from comictalker.comiccacher import ComicCacher
from comictalker.comiccacher import ComicCacher, Issue, Series
from comictalker.comictalker import ComicTalker, TalkerDataError, TalkerNetworkError
logger = logging.getLogger(__name__)
@@ -159,7 +159,6 @@ default_limiter = Limiter(RequestRate(1, 5))
class ComicVineTalker(ComicTalker):
name: str = "Comic Vine"
id: str = "comicvine"
origin: TagOrigin = TagOrigin(id, name)
website: str = "https://comicvine.gamespot.com"
logo_url: str = f"{website}/a/bundles/comicvinesite/images/logo.png"
attribution: str = f"Metadata provided by <a href='{website}'>{name}</a>"
@@ -244,10 +243,10 @@ class ComicVineTalker(ComicTalker):
# For literal searches always retrieve from online
cvc = ComicCacher(self.cache_folder, self.version)
if not refresh_cache and not literal:
cached_search_results = cvc.get_search_results(self.origin, series_name)
cached_search_results = cvc.get_search_results(self.id, series_name)
if len(cached_search_results) > 0:
return cached_search_results
return self._format_search_results([json.loads(x[0].data) for x in cached_search_results])
params = { # CV uses volume to mean series
"api_key": self.api_key,
@@ -317,7 +316,12 @@ class ComicVineTalker(ComicTalker):
# Cache these search results; even for a literal search we cache the results.
# The most it will cost is extra processing time
cvc.add_search_results(self.origin, series_name, formatted_search_results)
cvc.add_search_results(
self.id,
series_name,
[Series(id=str(x["id"]), data=json.dumps(x).encode("utf-8")) for x in search_results],
False,
)
return formatted_search_results
@@ -333,7 +337,7 @@ class ComicVineTalker(ComicTalker):
return comic_data
def fetch_series(self, series_id: str) -> ComicSeries:
return self._fetch_series_data(int(series_id))
return self._fetch_series_data(int(series_id))[0]
def fetch_issues_in_series(self, series_id: str) -> list[GenericMetadata]:
return [x[0] for x in self._fetch_issues_in_series(series_id)]
@@ -378,7 +382,7 @@ class ComicVineTalker(ComicTalker):
current_result_count += cv_response["number_of_page_results"]
formatted_filtered_issues_result = [
self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
self._map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"])[0])
for x in filtered_issues_result
]
@@ -443,49 +447,52 @@ class ComicVineTalker(ComicTalker):
def _format_search_results(self, search_results: list[CVSeries]) -> list[ComicSeries]:
formatted_results = []
for record in search_results:
# Flatten publisher to name only
if record.get("publisher") is None:
pub_name = ""
else:
pub_name = record["publisher"].get("name", "")
if record.get("image") is None:
image_url = ""
else:
image_url = record["image"].get("super_url", "")
start_year = utils.xlate_int(record.get("start_year", ""))
aliases = record.get("aliases") or ""
formatted_results.append(
ComicSeries(
aliases=utils.split(aliases, "\n"),
count_of_issues=record.get("count_of_issues", 0),
count_of_volumes=None,
description=record.get("description", ""),
id=str(record["id"]),
image_url=image_url,
name=record["name"],
publisher=pub_name,
start_year=start_year,
genres=[],
format=None,
)
)
formatted_results.append(self._format_series(record))
return formatted_results
def _format_series(self, record) -> ComicSeries:
# Flatten publisher to name only
if record.get("publisher") is None:
pub_name = ""
else:
pub_name = record["publisher"].get("name", "")
if record.get("image") is None:
image_url = ""
else:
image_url = record["image"].get("super_url", "")
start_year = utils.xlate_int(record.get("start_year", ""))
aliases = record.get("aliases") or ""
return ComicSeries(
aliases=utils.split(aliases, "\n"),
count_of_issues=record.get("count_of_issues", 0),
count_of_volumes=None,
description=record.get("description", ""),
id=str(record["id"]),
image_url=image_url,
name=record["name"],
publisher=pub_name,
start_year=start_year,
genres=[],
format=None,
)
def _fetch_issues_in_series(self, series_id: str) -> list[tuple[GenericMetadata, bool]]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_series_issues_result = cvc.get_series_issues_info(series_id, self.origin)
cached_series_issues_result = cvc.get_series_issues_info(series_id, self.id)
series = self._fetch_series_data(int(series_id))
series = self._fetch_series_data(int(series_id))[0]
if len(cached_series_issues_result) == series.count_of_issues:
# Remove internal "complete" bool
return cached_series_issues_result
return [
(self._map_comic_issue_to_metadata(json.loads(x[0].data), series), x[1])
for x in cached_series_issues_result
]
params = { # CV uses volume to mean series
"api_key": self.api_key,
@@ -514,20 +521,27 @@ class ComicVineTalker(ComicTalker):
current_result_count += cv_response["number_of_page_results"]
# Format to expected output
formatted_series_issues_result = [
self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
self._map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"])[0])
for x in series_issues_result
]
cvc.add_series_issues_info(self.origin, formatted_series_issues_result, False)
cvc.add_issues_info(
self.id,
[
Issue(id=str(x["id"]), series_id=series_id, data=json.dumps(x).encode("utf-8"))
for x in series_issues_result
],
False,
)
return [(x, False) for x in formatted_series_issues_result]
def _fetch_series_data(self, series_id: int) -> ComicSeries:
def _fetch_series_data(self, series_id: int) -> tuple[ComicSeries, bool]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_series_result = cvc.get_series_info(str(series_id), self.origin)
cached_series = cvc.get_series_info(str(series_id), self.id)
if cached_series_result is not None:
return cached_series_result
if cached_series is not None:
return (self._format_series(json.loads(cached_series[0].data)), cached_series[1])
series_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}") # CV uses volume to mean series
@@ -538,12 +552,13 @@ class ComicVineTalker(ComicTalker):
cv_response: CVResult[CVSeries] = self._get_cv_content(series_url, params)
series_results = cv_response["results"]
formatted_series_results = self._format_search_results([series_results])
if series_results:
cvc.add_series_info(self.origin, formatted_series_results[0])
cvc.add_series_info(
self.id, Series(id=str(series_results["id"]), data=json.dumps(series_results).encode("utf-8")), True
)
return formatted_series_results[0]
return self._format_series(series_results), True
def _fetch_issue_data(self, series_id: int, issue_number: str) -> GenericMetadata:
issues_list_results = self._fetch_issues_in_series(str(series_id))
@@ -568,10 +583,12 @@ class ComicVineTalker(ComicTalker):
def _fetch_issue_data_by_issue_id(self, issue_id: str) -> GenericMetadata:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_issues_result = cvc.get_issue_info(int(issue_id), self.origin)
cached_issue = cvc.get_issue_info(int(issue_id), self.id)
if cached_issues_result and cached_issues_result[1]:
return cached_issues_result[0]
if cached_issue and cached_issue[1]:
return self._map_comic_issue_to_metadata(
json.loads(cached_issue[0].data), self._fetch_series_data(int(cached_issue[0].series_id))[0]
)
issue_url = urljoin(self.api_url, f"issue/{CVTypeID.Issue}-{issue_id}")
params = {"api_key": self.api_key, "format": "json"}
@@ -579,19 +596,26 @@ class ComicVineTalker(ComicTalker):
issue_results = cv_response["results"]
# Format to expected output
cv_issues = self.map_comic_issue_to_metadata(
issue_results, self._fetch_series_data(int(issue_results["volume"]["id"]))
cvc.add_issues_info(
self.id,
[
Issue(
id=str(issue_results["id"]),
series_id=str(issue_results["volume"]["id"]),
data=json.dumps(issue_results).encode("utf-8"),
)
],
True,
)
cvc.add_series_issues_info(self.origin, [cv_issues], True)
# Now, map the GenericMetadata data to generic metadata
return cv_issues
return self._map_comic_issue_to_metadata(
issue_results, self._fetch_series_data(int(issue_results["volume"]["id"]))[0]
)
def map_comic_issue_to_metadata(self, issue: CVIssue, series: ComicSeries) -> GenericMetadata:
def _map_comic_issue_to_metadata(self, issue: CVIssue, series: ComicSeries) -> GenericMetadata:
md = GenericMetadata(
tag_origin=self.origin,
tag_origin=TagOrigin(self.id, self.name),
issue_id=utils.xlate(issue.get("id")),
series_id=series.id,
title_aliases=utils.split(issue.get("aliases"), "\n"),
@@ -638,7 +662,6 @@ class ComicVineTalker(ComicTalker):
if self.use_series_start_as_volume:
md.volume = series.start_year
series = self._fetch_series_data(issue["volume"]["id"])
if issue.get("cover_date"):
md.day, md.month, md.year = utils.parse_date_str(issue.get("cover_date"))
elif series.start_year:


@@ -4,7 +4,7 @@ import comicapi.genericmetadata
from comicapi import utils
search_results = [
comicapi.genericmetadata.ComicSeries(
dict(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",
@@ -17,7 +17,7 @@ search_results = [
genres=[],
format=None,
),
comicapi.genericmetadata.ComicSeries(
dict(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",


@@ -1,9 +1,10 @@
from __future__ import annotations
import json
import pytest
import comictalker.comiccacher
from comicapi.genericmetadata import TagOrigin
from testing.comicdata import search_results
@@ -14,13 +15,23 @@ def test_create_cache(config, mock_version):
def test_search_results(comic_cache):
comic_cache.add_search_results(TagOrigin("test", "test"), "test search", search_results)
assert search_results == comic_cache.get_search_results(TagOrigin("test", "test"), "test search")
comic_cache.add_search_results(
"test",
"test search",
[comictalker.comiccacher.Series(id=x["id"], data=json.dumps(x)) for x in search_results],
True,
)
cached_results = [json.loads(x[0].data) for x in comic_cache.get_search_results("test", "test search")]
assert search_results == cached_results
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
comic_cache.add_series_info(series=series_info, source=TagOrigin("test", "test"))
comic_cache.add_series_info(
series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info)),
source="test",
complete=True,
)
vi = series_info.copy()
cache_result = comic_cache.get_series_info(series_id=series_info.id, source=TagOrigin("test", "test"))
cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)
assert vi == cache_result


@@ -1,5 +1,7 @@
from __future__ import annotations
import json
import pytest
import comicapi.genericmetadata
@@ -7,27 +9,34 @@ import testing.comicvine
def test_search_for_series(comicvine_api, comic_cache):
results = comicvine_api.search_for_series("cory doctorows futuristic tales of the here and now")
cache_issues = comic_cache.get_search_results(
comicvine_api.origin, "cory doctorows futuristic tales of the here and now"
)
assert results == cache_issues
results = comicvine_api.search_for_series("cory doctorows futuristic tales of the here and now")[0]
cache_series = comic_cache.get_search_results(
comicvine_api.id, "cory doctorows futuristic tales of the here and now"
)[0][0]
series_results = comicvine_api._format_series(json.loads(cache_series.data))
assert results == series_results
def test_fetch_series_data(comicvine_api, comic_cache):
result = comicvine_api._fetch_series_data(23437)
# del result["description"]
# del result["image_url"]
cache_result = comic_cache.get_series_info(23437, comicvine_api.origin)
# del cache_result["description"]
# del cache_result["image_url"]
assert result == cache_result
def test_fetch_series(comicvine_api, comic_cache):
result = comicvine_api.fetch_series(23437)
cache_series = comic_cache.get_series_info(23437, comicvine_api.id)[0]
series_result = comicvine_api._format_series(json.loads(cache_series.data))
assert result == series_result
def test_fetch_issues_in_series(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_in_series(23437)
cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.origin)
assert results[0] == cache_issues[0][0]
cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.id)
issues_results = [
comicvine_api._map_comic_issue_to_metadata(
json.loads(x[0].data),
comicvine_api._format_series(
json.loads(comic_cache.get_series_info(x[0].series_id, comicvine_api.id)[0].data)
),
)
for x in cache_issues
]
assert results == issues_results
def test_fetch_issue_data_by_issue_id(comicvine_api):