diff --git a/comictalker/comiccacher.py b/comictalker/comiccacher.py
index 38ba26e..edf5222 100644
--- a/comictalker/comiccacher.py
+++ b/comictalker/comiccacher.py
@@ -16,19 +16,28 @@
from __future__ import annotations
import datetime
-import json
import logging
import os
import pathlib
import sqlite3
-from typing import Any, cast
+from typing import Any
-from comicapi import utils
-from comicapi.genericmetadata import ComicSeries, Credit, GenericMetadata, TagOrigin
+from typing_extensions import NamedTuple
logger = logging.getLogger(__name__)
+class Series(NamedTuple):
+ id: str
+ data: bytes
+
+
+class Issue(NamedTuple):
+ id: str
+ series_id: str
+ data: bytes
+
+
class ComicCacher:
def __init__(self, cache_folder: pathlib.Path, version: str) -> None:
self.cache_folder = cache_folder
@@ -74,70 +83,43 @@ class ComicCacher:
# create tables
with con:
cur = con.cursor()
- # source,name,id,start_year,publisher,image,description,count_of_issues
cur.execute(
- "CREATE TABLE SeriesSearchCache("
- + "timestamp DATE DEFAULT (datetime('now','localtime')),"
- + "id TEXT NOT NULL,"
- + "source TEXT NOT NULL,"
- + "search_term TEXT,"
- + "PRIMARY KEY (id, source, search_term))"
+ """CREATE TABLE SeriesSearchCache(
+ timestamp DATE DEFAULT (datetime('now','localtime')),
+ id TEXT NOT NULL,
+ source TEXT NOT NULL,
+ search_term TEXT,
+ PRIMARY KEY (id, source, search_term))"""
)
- cur.execute("CREATE TABLE Source(" + "id TEXT NOT NULL," + "name TEXT NOT NULL," + "PRIMARY KEY (id))")
+ cur.execute("CREATE TABLE Source(id TEXT NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id))")
cur.execute(
- "CREATE TABLE Series("
- + "timestamp DATE DEFAULT (datetime('now','localtime')), "
- + "id TEXT NOT NULL,"
- + "source TEXT NOT NULL,"
- + "name TEXT,"
- + "publisher TEXT,"
- + "count_of_issues INT,"
- + "count_of_volumes INT,"
- + "start_year INT,"
- + "image_url TEXT,"
- + "aliases TEXT," # Newline separated
- + "description TEXT,"
- + "genres TEXT," # Newline separated. For filtering etc.
- + "format TEXT,"
- + "PRIMARY KEY (id, source))"
+ """CREATE TABLE Series(
+ timestamp DATE DEFAULT (datetime('now','localtime')),
+ id TEXT NOT NULL,
+ source TEXT NOT NULL,
+ data BLOB,
+ complete BOOL,
+ PRIMARY KEY (id, source))"""
)
cur.execute(
- "CREATE TABLE Issues("
- + "timestamp DATE DEFAULT (datetime('now','localtime')), "
- + "id TEXT NOT NULL,"
- + "source TEXT NOT NULL,"
- + "series_id TEXT,"
- + "name TEXT,"
- + "issue_number TEXT,"
- + "image_url TEXT,"
- + "thumb_url TEXT,"
- + "cover_date TEXT,"
- + "site_detail_url TEXT,"
- + "description TEXT,"
- + "aliases TEXT," # Newline separated
- + "alt_image_urls TEXT," # Newline separated URLs
- + "characters TEXT," # Newline separated
- + "locations TEXT," # Newline separated
- + "credits TEXT," # JSON: "{"name": "Bob Shakespeare", "role": "Writer"}"
- + "teams TEXT," # Newline separated
- + "story_arcs TEXT," # Newline separated
- + "genres TEXT," # Newline separated
- + "tags TEXT," # Newline separated
- + "critical_rating FLOAT,"
- + "manga TEXT," # Yes/YesAndRightToLeft/No
- + "maturity_rating TEXT,"
- + "language TEXT,"
- + "country TEXT,"
- + "volume TEXT,"
- + "complete BOOL," # Is the data complete? Includes characters, locations, credits.
- + "PRIMARY KEY (id, source))"
+ """CREATE TABLE Issues(
+ timestamp DATE DEFAULT (datetime('now','localtime')),
+ id TEXT NOT NULL,
+ source TEXT NOT NULL,
+ series_id TEXT,
+ data BLOB,
+ complete BOOL,
+ PRIMARY KEY (id, source))"""
)
- def add_search_results(self, source: TagOrigin, search_term: str, series_list: list[ComicSeries]) -> None:
- self.add_source(source)
+ def expire_stale_records(self, cur: sqlite3.Cursor, table: str) -> None:
+ # purge stale series info
+ a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
+ cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
+ def add_search_results(self, source: str, search_term: str, series_list: list[Series], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
@@ -146,153 +128,80 @@ class ComicCacher:
# remove all previous entries with this search term
cur.execute(
"DELETE FROM SeriesSearchCache WHERE search_term = ? AND source = ?",
- [search_term.casefold(), source.id],
+ [search_term.casefold(), source],
)
# now add in new results
- for record in series_list:
+ for series in series_list:
cur.execute(
"INSERT INTO SeriesSearchCache (source, search_term, id) VALUES(?, ?, ?)",
- (source.id, search_term.casefold(), record.id),
+ (source, search_term.casefold(), series.id),
)
-
data = {
- "id": record.id,
- "source": source.id,
- "name": record.name,
- "publisher": record.publisher,
- "count_of_issues": record.count_of_issues,
- "count_of_volumes": record.count_of_volumes,
- "start_year": record.start_year,
- "image_url": record.image_url,
- "description": record.description,
- "genres": "\n".join(record.genres),
- "format": record.format,
- "timestamp": datetime.datetime.now(),
- "aliases": "\n".join(record.aliases),
+ "id": series.id,
+ "source": source,
+ "data": series.data,
+ "complete": complete,
}
self.upsert(cur, "series", data)
- def add_series_info(self, source: TagOrigin, series: ComicSeries) -> None:
- self.add_source(source)
-
+ def add_series_info(self, source: str, series: Series, complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
- timestamp = datetime.datetime.now()
-
data = {
"id": series.id,
- "source": source.id,
- "name": series.name,
- "publisher": series.publisher,
- "count_of_issues": series.count_of_issues,
- "count_of_volumes": series.count_of_volumes,
- "start_year": series.start_year,
- "image_url": series.image_url,
- "description": series.description,
- "genres": "\n".join(series.genres),
- "format": series.format,
- "timestamp": timestamp,
- "aliases": "\n".join(series.aliases),
+ "source": source,
+ "data": series.data,
+ "complete": complete,
}
self.upsert(cur, "series", data)
- def add_series_issues_info(self, source: TagOrigin, issues: list[GenericMetadata], complete: bool) -> None:
- self.add_source(source)
-
+ def add_issues_info(self, source: str, issues: list[Issue], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
- timestamp = datetime.datetime.now()
-
- # add in issues
-
for issue in issues:
data = {
- "id": issue.issue_id,
+ "id": issue.id,
"series_id": issue.series_id,
- "source": source.id,
- "name": issue.title,
- "issue_number": issue.issue,
- "volume": issue.volume,
- "site_detail_url": issue.web_link,
- "cover_date": f"{issue.year}-{issue.month}-{issue.day}",
- "image_url": issue.cover_image,
- "description": issue.description,
- "timestamp": timestamp,
- "aliases": "\n".join(issue.title_aliases),
- "alt_image_urls": "\n".join(issue.alternate_images),
- "characters": "\n".join(issue.characters),
- "locations": "\n".join(issue.locations),
- "teams": "\n".join(issue.teams),
- "story_arcs": "\n".join(issue.story_arcs),
- "genres": "\n".join(issue.genres),
- "tags": "\n".join(issue.tags),
- "critical_rating": issue.critical_rating,
- "manga": issue.manga,
- "maturity_rating": issue.maturity_rating,
- "language": issue.language,
- "country": issue.country,
- "credits": json.dumps(issue.credits),
+ "data": issue.data,
+ "source": source,
"complete": complete,
}
self.upsert(cur, "issues", data)
- def add_source(self, source: TagOrigin) -> None:
- with sqlite3.connect(self.db_file) as con:
- con.row_factory = sqlite3.Row
- cur = con.cursor()
- con.text_factory = str
-
- self.upsert(
- cur,
- "source",
- {
- "id": source.id,
- "name": source.name,
- },
- )
-
- def get_search_results(self, source: TagOrigin, search_term: str) -> list[ComicSeries]:
+ def get_search_results(self, source: str, search_term: str, expire_stale: bool = True) -> list[tuple[Series, bool]]:
results = []
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
+ if expire_stale:
+ self.expire_stale_records(cur, "SeriesSearchCache")
+ self.expire_stale_records(cur, "Series")
+
cur.execute(
- "SELECT * FROM SeriesSearchCache INNER JOIN Series on"
- + " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source=Series.source"
- + " WHERE search_term=? AND SeriesSearchCache.source=?",
- [search_term.casefold(), source.id],
+ """SELECT * FROM SeriesSearchCache INNER JOIN Series on
+ SeriesSearchCache.id=Series.id AND SeriesSearchCache.source=Series.source
+ WHERE search_term=? AND SeriesSearchCache.source=?""",
+ [search_term.casefold(), source],
)
rows = cur.fetchall()
- # now process the results
- for record in rows:
- result = ComicSeries(
- id=record["id"],
- name=record["name"],
- publisher=record["publisher"],
- count_of_issues=record["count_of_issues"],
- count_of_volumes=record["count_of_volumes"],
- start_year=record["start_year"],
- image_url=record["image_url"],
- aliases=utils.split(record["aliases"], "\n"),
- description=record["description"],
- genres=utils.split(record["genres"], "\n"),
- format=record["format"],
- )
- results.append(result)
+ for record in rows:
+ result = Series(id=record["id"], data=record["data"])
+
+ results.append((result, record["complete"]))
return results
- def get_series_info(self, series_id: str, source: TagOrigin, expire_stale: bool = True) -> ComicSeries | None:
- result: ComicSeries | None = None
+ def get_series_info(self, series_id: str, source: str, expire_stale: bool = True) -> tuple[Series, bool] | None:
+ result: Series | None = None
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
@@ -300,170 +209,64 @@ class ComicCacher:
con.text_factory = str
if expire_stale:
- # purge stale series info
- a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
- cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
+ self.expire_stale_records(cur, "Series")
# fetch
- cur.execute("SELECT * FROM Series WHERE id=? AND source=?", [series_id, source.id])
+ cur.execute("SELECT * FROM Series WHERE id=? AND source=?", [series_id, source])
row = cur.fetchone()
if row is None:
- return result
+ return None
- # since ID is primary key, there is only one row
- result = ComicSeries(
- id=row["id"],
- name=row["name"],
- publisher=row["publisher"],
- count_of_issues=row["count_of_issues"],
- count_of_volumes=row["count_of_volumes"],
- start_year=row["start_year"],
- image_url=row["image_url"],
- aliases=utils.split(row["aliases"], "\n"),
- description=row["description"],
- genres=utils.split(row["genres"], "\n"),
- format=row["format"],
- )
+ result = Series(id=row["id"], data=row["data"])
- return result
-
- def get_series_issues_info(self, series_id: str, source: TagOrigin) -> list[tuple[GenericMetadata, bool]]:
- # get_series_info should only fail if someone is doing something weird
- series = self.get_series_info(series_id, source, False) or ComicSeries(
- id=series_id,
- name="",
- description="",
- genres=[],
- image_url="",
- publisher="",
- start_year=None,
- aliases=[],
- count_of_issues=None,
- count_of_volumes=None,
- format=None,
- )
+ return (result, row["complete"])
+ def get_series_issues_info(
+ self, series_id: str, source: str, expire_stale: bool = True
+ ) -> list[tuple[Issue, bool]]:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
- # purge stale issue info - probably issue data won't change
- # much....
- a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
- cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
+ if expire_stale:
+ self.expire_stale_records(cur, "Issues")
# fetch
- results: list[tuple[GenericMetadata, bool]] = []
+ results: list[tuple[Issue, bool]] = []
- cur.execute("SELECT * FROM Issues WHERE series_id=? AND source=?", [series_id, source.id])
+ cur.execute("SELECT * FROM Issues WHERE series_id=? AND source=?", [series_id, source])
rows = cur.fetchall()
# now process the results
for row in rows:
- record = self.map_row_metadata(row, series, source)
+ record = (Issue(id=row["id"], series_id=row["series_id"], data=row["data"]), row["complete"])
results.append(record)
return results
- def get_issue_info(self, issue_id: int, source: TagOrigin) -> tuple[GenericMetadata, bool] | None:
+ def get_issue_info(self, issue_id: int, source: str, expire_stale: bool = True) -> tuple[Issue, bool] | None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
- # purge stale issue info - probably issue data won't change
- # much....
- a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
- cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
+ if expire_stale:
+ self.expire_stale_records(cur, "Issues")
- cur.execute("SELECT * FROM Issues WHERE id=? AND source=?", [issue_id, source.id])
+ cur.execute("SELECT * FROM Issues WHERE id=? AND source=?", [issue_id, source])
row = cur.fetchone()
record = None
if row:
- # get_series_info should only fail if someone is doing something weird
- series = self.get_series_info(row["id"], source, False) or ComicSeries(
- id=row["id"],
- name="",
- description="",
- genres=[],
- image_url="",
- publisher="",
- start_year=None,
- aliases=[],
- count_of_issues=None,
- count_of_volumes=None,
- format=None,
- )
-
- record = self.map_row_metadata(row, series, source)
+ record = (Issue(id=row["id"], series_id=row["series_id"], data=row["data"]), row["complete"])
return record
- def get_source(self, source_id: str) -> TagOrigin:
- con = sqlite3.connect(self.db_file)
- with sqlite3.connect(self.db_file) as con:
- con.row_factory = sqlite3.Row
- cur = con.cursor()
- con.text_factory = str
-
- cur.execute("SELECT * FROM Source WHERE id=?", [source_id])
- row = cur.fetchone()
-
- return TagOrigin(row["id"], row["name"])
-
- def map_row_metadata(
- self, row: sqlite3.Row, series: ComicSeries, source: TagOrigin
- ) -> tuple[GenericMetadata, bool]:
- day, month, year = utils.parse_date_str(row["cover_date"])
- credits = []
- try:
- for credit in json.loads(row["credits"]):
- credits.append(cast(Credit, credit))
- except Exception:
- logger.exception("credits failed")
- return (
- GenericMetadata(
- tag_origin=source,
- alternate_images=utils.split(row["alt_image_urls"], "\n"),
- characters=utils.split(row["characters"], "\n"),
- country=row["country"],
- cover_image=row["image_url"],
- credits=credits,
- critical_rating=row["critical_rating"],
- day=day,
- description=row["description"],
- genres=utils.split(row["genres"], "\n"),
- issue=row["issue_number"],
- issue_count=series.count_of_issues,
- issue_id=row["id"],
- language=row["language"],
- locations=utils.split(row["locations"], "\n"),
- manga=row["manga"],
- maturity_rating=row["maturity_rating"],
- month=month,
- publisher=series.publisher,
- series=series.name,
- series_aliases=series.aliases,
- series_id=series.id,
- story_arcs=utils.split(row["story_arcs"], "\n"),
- tags=set(utils.split(row["tags"], "\n")),
- teams=utils.split(row["teams"], "\n"),
- title=row["name"],
- title_aliases=utils.split(row["aliases"], "\n"),
- volume=row["volume"],
- volume_count=series.count_of_volumes,
- web_link=row["site_detail_url"],
- year=year,
- ),
- row["complete"],
- )
-
def upsert(self, cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None:
"""This does an insert if the given PK doesn't exist, and an
update it if does
diff --git a/comictalker/comictalker.py b/comictalker/comictalker.py
index 6e62a3c..14a13c3 100644
--- a/comictalker/comictalker.py
+++ b/comictalker/comictalker.py
@@ -19,7 +19,7 @@ from typing import Any, Callable
import settngs
-from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
+from comicapi.genericmetadata import ComicSeries, GenericMetadata
from comictalker.talker_utils import fix_url
logger = logging.getLogger(__name__)
@@ -107,7 +107,6 @@ class ComicTalker:
name: str = "Example"
id: str = "example"
- origin: TagOrigin = TagOrigin(id, name)
website: str = "https://example.com"
logo_url: str = f"{website}/logo.png"
attribution: str = f"Metadata provided by {name}"
diff --git a/comictalker/talkers/comicvine.py b/comictalker/talkers/comicvine.py
index 3456a1e..68eca55 100644
--- a/comictalker/talkers/comicvine.py
+++ b/comictalker/talkers/comicvine.py
@@ -33,7 +33,7 @@ from comicapi import utils
from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
from comicapi.issuestring import IssueString
from comictalker import talker_utils
-from comictalker.comiccacher import ComicCacher
+from comictalker.comiccacher import ComicCacher, Issue, Series
from comictalker.comictalker import ComicTalker, TalkerDataError, TalkerNetworkError
logger = logging.getLogger(__name__)
@@ -159,7 +159,6 @@ default_limiter = Limiter(RequestRate(1, 5))
class ComicVineTalker(ComicTalker):
name: str = "Comic Vine"
id: str = "comicvine"
- origin: TagOrigin = TagOrigin(id, name)
website: str = "https://comicvine.gamespot.com"
logo_url: str = f"{website}/a/bundles/comicvinesite/images/logo.png"
attribution: str = f"Metadata provided by {name}"
@@ -244,10 +243,10 @@ class ComicVineTalker(ComicTalker):
# For literal searches always retrieve from online
cvc = ComicCacher(self.cache_folder, self.version)
if not refresh_cache and not literal:
- cached_search_results = cvc.get_search_results(self.origin, series_name)
+ cached_search_results = cvc.get_search_results(self.id, series_name)
if len(cached_search_results) > 0:
- return cached_search_results
+ return self._format_search_results([json.loads(x[0].data) for x in cached_search_results])
params = { # CV uses volume to mean series
"api_key": self.api_key,
@@ -317,7 +316,12 @@ class ComicVineTalker(ComicTalker):
# Cache these search results, even if it's literal we cache the results
# The most it will cause is extra processing time
- cvc.add_search_results(self.origin, series_name, formatted_search_results)
+ cvc.add_search_results(
+ self.id,
+ series_name,
+ [Series(id=str(x["id"]), data=json.dumps(x).encode("utf-8")) for x in search_results],
+ False,
+ )
return formatted_search_results
@@ -333,7 +337,7 @@ class ComicVineTalker(ComicTalker):
return comic_data
def fetch_series(self, series_id: str) -> ComicSeries:
- return self._fetch_series_data(int(series_id))
+ return self._fetch_series_data(int(series_id))[0]
def fetch_issues_in_series(self, series_id: str) -> list[GenericMetadata]:
return [x[0] for x in self._fetch_issues_in_series(series_id)]
@@ -378,7 +382,7 @@ class ComicVineTalker(ComicTalker):
current_result_count += cv_response["number_of_page_results"]
formatted_filtered_issues_result = [
- self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
+ self._map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"])[0])
for x in filtered_issues_result
]
@@ -443,49 +447,52 @@ class ComicVineTalker(ComicTalker):
def _format_search_results(self, search_results: list[CVSeries]) -> list[ComicSeries]:
formatted_results = []
for record in search_results:
- # Flatten publisher to name only
- if record.get("publisher") is None:
- pub_name = ""
- else:
- pub_name = record["publisher"].get("name", "")
-
- if record.get("image") is None:
- image_url = ""
- else:
- image_url = record["image"].get("super_url", "")
-
- start_year = utils.xlate_int(record.get("start_year", ""))
-
- aliases = record.get("aliases") or ""
-
- formatted_results.append(
- ComicSeries(
- aliases=utils.split(aliases, "\n"),
- count_of_issues=record.get("count_of_issues", 0),
- count_of_volumes=None,
- description=record.get("description", ""),
- id=str(record["id"]),
- image_url=image_url,
- name=record["name"],
- publisher=pub_name,
- start_year=start_year,
- genres=[],
- format=None,
- )
- )
+ formatted_results.append(self._format_series(record))
return formatted_results
+ def _format_series(self, record) -> ComicSeries:
+ # Flatten publisher to name only
+ if record.get("publisher") is None:
+ pub_name = ""
+ else:
+ pub_name = record["publisher"].get("name", "")
+
+ if record.get("image") is None:
+ image_url = ""
+ else:
+ image_url = record["image"].get("super_url", "")
+
+ start_year = utils.xlate_int(record.get("start_year", ""))
+
+ aliases = record.get("aliases") or ""
+
+ return ComicSeries(
+ aliases=utils.split(aliases, "\n"),
+ count_of_issues=record.get("count_of_issues", 0),
+ count_of_volumes=None,
+ description=record.get("description", ""),
+ id=str(record["id"]),
+ image_url=image_url,
+ name=record["name"],
+ publisher=pub_name,
+ start_year=start_year,
+ genres=[],
+ format=None,
+ )
+
def _fetch_issues_in_series(self, series_id: str) -> list[tuple[GenericMetadata, bool]]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
- cached_series_issues_result = cvc.get_series_issues_info(series_id, self.origin)
+ cached_series_issues_result = cvc.get_series_issues_info(series_id, self.id)
- series = self._fetch_series_data(int(series_id))
+ series = self._fetch_series_data(int(series_id))[0]
if len(cached_series_issues_result) == series.count_of_issues:
- # Remove internal "complete" bool
- return cached_series_issues_result
+ return [
+ (self._map_comic_issue_to_metadata(json.loads(x[0].data), series), x[1])
+ for x in cached_series_issues_result
+ ]
params = { # CV uses volume to mean series
"api_key": self.api_key,
@@ -514,20 +521,27 @@ class ComicVineTalker(ComicTalker):
current_result_count += cv_response["number_of_page_results"]
# Format to expected output
formatted_series_issues_result = [
- self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
+ self._map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"])[0])
for x in series_issues_result
]
- cvc.add_series_issues_info(self.origin, formatted_series_issues_result, False)
+ cvc.add_issues_info(
+ self.id,
+ [
+ Issue(id=str(x["id"]), series_id=series_id, data=json.dumps(x).encode("utf-8"))
+ for x in series_issues_result
+ ],
+ False,
+ )
return [(x, False) for x in formatted_series_issues_result]
- def _fetch_series_data(self, series_id: int) -> ComicSeries:
+ def _fetch_series_data(self, series_id: int) -> tuple[ComicSeries, bool]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
- cached_series_result = cvc.get_series_info(str(series_id), self.origin)
+ cached_series = cvc.get_series_info(str(series_id), self.id)
- if cached_series_result is not None:
- return cached_series_result
+ if cached_series is not None:
+ return (self._format_series(json.loads(cached_series[0].data)), cached_series[1])
series_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}") # CV uses volume to mean series
@@ -538,12 +552,13 @@ class ComicVineTalker(ComicTalker):
cv_response: CVResult[CVSeries] = self._get_cv_content(series_url, params)
series_results = cv_response["results"]
- formatted_series_results = self._format_search_results([series_results])
if series_results:
- cvc.add_series_info(self.origin, formatted_series_results[0])
+ cvc.add_series_info(
+ self.id, Series(id=str(series_results["id"]), data=json.dumps(series_results).encode("utf-8")), True
+ )
- return formatted_series_results[0]
+ return self._format_series(series_results), True
def _fetch_issue_data(self, series_id: int, issue_number: str) -> GenericMetadata:
issues_list_results = self._fetch_issues_in_series(str(series_id))
@@ -568,10 +583,12 @@ class ComicVineTalker(ComicTalker):
def _fetch_issue_data_by_issue_id(self, issue_id: str) -> GenericMetadata:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
- cached_issues_result = cvc.get_issue_info(int(issue_id), self.origin)
+ cached_issue = cvc.get_issue_info(int(issue_id), self.id)
- if cached_issues_result and cached_issues_result[1]:
- return cached_issues_result[0]
+ if cached_issue and cached_issue[1]:
+ return self._map_comic_issue_to_metadata(
+ json.loads(cached_issue[0].data), self._fetch_series_data(int(cached_issue[0].series_id))[0]
+ )
issue_url = urljoin(self.api_url, f"issue/{CVTypeID.Issue}-{issue_id}")
params = {"api_key": self.api_key, "format": "json"}
@@ -579,19 +596,26 @@ class ComicVineTalker(ComicTalker):
issue_results = cv_response["results"]
- # Format to expected output
- cv_issues = self.map_comic_issue_to_metadata(
- issue_results, self._fetch_series_data(int(issue_results["volume"]["id"]))
+ cvc.add_issues_info(
+ self.id,
+ [
+ Issue(
+ id=str(issue_results["id"]),
+ series_id=str(issue_results["volume"]["id"]),
+ data=json.dumps(issue_results).encode("utf-8"),
+ )
+ ],
+ True,
)
- cvc.add_series_issues_info(self.origin, [cv_issues], True)
-
# Now, map the GenericMetadata data to generic metadata
- return cv_issues
+ return self._map_comic_issue_to_metadata(
+ issue_results, self._fetch_series_data(int(issue_results["volume"]["id"]))[0]
+ )
- def map_comic_issue_to_metadata(self, issue: CVIssue, series: ComicSeries) -> GenericMetadata:
+ def _map_comic_issue_to_metadata(self, issue: CVIssue, series: ComicSeries) -> GenericMetadata:
md = GenericMetadata(
- tag_origin=self.origin,
+ tag_origin=TagOrigin(self.id, self.name),
issue_id=utils.xlate(issue.get("id")),
series_id=series.id,
title_aliases=utils.split(issue.get("aliases"), "\n"),
@@ -638,7 +662,6 @@ class ComicVineTalker(ComicTalker):
if self.use_series_start_as_volume:
md.volume = series.start_year
- series = self._fetch_series_data(issue["volume"]["id"])
if issue.get("cover_date"):
md.day, md.month, md.year = utils.parse_date_str(issue.get("cover_date"))
elif series.start_year:
diff --git a/testing/comicdata.py b/testing/comicdata.py
index f09142f..560c67f 100644
--- a/testing/comicdata.py
+++ b/testing/comicdata.py
@@ -4,7 +4,7 @@ import comicapi.genericmetadata
from comicapi import utils
search_results = [
- comicapi.genericmetadata.ComicSeries(
+ dict(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",
@@ -17,7 +17,7 @@ search_results = [
genres=[],
format=None,
),
- comicapi.genericmetadata.ComicSeries(
+ dict(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",
diff --git a/tests/comiccacher_test.py b/tests/comiccacher_test.py
index 2d821e1..d9c3114 100644
--- a/tests/comiccacher_test.py
+++ b/tests/comiccacher_test.py
@@ -1,9 +1,10 @@
from __future__ import annotations
+import json
+
import pytest
import comictalker.comiccacher
-from comicapi.genericmetadata import TagOrigin
from testing.comicdata import search_results
@@ -14,13 +15,23 @@ def test_create_cache(config, mock_version):
def test_search_results(comic_cache):
- comic_cache.add_search_results(TagOrigin("test", "test"), "test search", search_results)
- assert search_results == comic_cache.get_search_results(TagOrigin("test", "test"), "test search")
+ comic_cache.add_search_results(
+ "test",
+ "test search",
+ [comictalker.comiccacher.Series(id=x["id"], data=json.dumps(x)) for x in search_results],
+ True,
+ )
+ cached_results = [json.loads(x[0].data) for x in comic_cache.get_search_results("test", "test search")]
+ assert search_results == cached_results
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
- comic_cache.add_series_info(series=series_info, source=TagOrigin("test", "test"))
+ comic_cache.add_series_info(
+ series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info)),
+ source="test",
+ complete=True,
+ )
vi = series_info.copy()
- cache_result = comic_cache.get_series_info(series_id=series_info.id, source=TagOrigin("test", "test"))
+ cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)
assert vi == cache_result
diff --git a/tests/comicvinetalker_test.py b/tests/comicvinetalker_test.py
index 429ca7e..467e48e 100644
--- a/tests/comicvinetalker_test.py
+++ b/tests/comicvinetalker_test.py
@@ -1,5 +1,7 @@
from __future__ import annotations
+import json
+
import pytest
import comicapi.genericmetadata
@@ -7,27 +9,34 @@ import testing.comicvine
def test_search_for_series(comicvine_api, comic_cache):
- results = comicvine_api.search_for_series("cory doctorows futuristic tales of the here and now")
- cache_issues = comic_cache.get_search_results(
- comicvine_api.origin, "cory doctorows futuristic tales of the here and now"
- )
- assert results == cache_issues
+ results = comicvine_api.search_for_series("cory doctorows futuristic tales of the here and now")[0]
+ cache_series = comic_cache.get_search_results(
+ comicvine_api.id, "cory doctorows futuristic tales of the here and now"
+ )[0][0]
+ series_results = comicvine_api._format_series(json.loads(cache_series.data))
+ assert results == series_results
-def test_fetch_series_data(comicvine_api, comic_cache):
- result = comicvine_api._fetch_series_data(23437)
- # del result["description"]
- # del result["image_url"]
- cache_result = comic_cache.get_series_info(23437, comicvine_api.origin)
- # del cache_result["description"]
- # del cache_result["image_url"]
- assert result == cache_result
+def test_fetch_series(comicvine_api, comic_cache):
+ result = comicvine_api.fetch_series(23437)
+ cache_series = comic_cache.get_series_info(23437, comicvine_api.id)[0]
+ series_result = comicvine_api._format_series(json.loads(cache_series.data))
+ assert result == series_result
def test_fetch_issues_in_series(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_in_series(23437)
- cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.origin)
- assert results[0] == cache_issues[0][0]
+ cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.id)
+ issues_results = [
+ comicvine_api._map_comic_issue_to_metadata(
+ json.loads(x[0].data),
+ comicvine_api._format_series(
+ json.loads(comic_cache.get_series_info(x[0].series_id, comicvine_api.id)[0].data)
+ ),
+ )
+ for x in cache_issues
+ ]
+ assert results == issues_results
def test_fetch_issue_data_by_issue_id(comicvine_api):