Merge branch 'volume-to-series' into develop

This commit is contained in:
Timmy Welch 2022-12-22 10:45:58 -08:00
commit a7f6349aa4
No known key found for this signature in database
18 changed files with 216 additions and 243 deletions

View File

@ -34,8 +34,6 @@ logger = logging.getLogger(__name__)
class AutoTagMatchWindow(QtWidgets.QDialog):
volume_id = 0
def __init__(
self,
parent: QtWidgets.QWidget,

View File

@ -420,16 +420,16 @@ class IssueIdentifier:
# now sort the list by name length
series_second_round_list.sort(key=lambda x: len(x["name"]), reverse=False)
# build a list of volume IDs
volume_id_list = []
# build a list of series IDs
series_id_list = []
for series in series_second_round_list:
volume_id_list.append(series["id"])
series_id_list.append(series["id"])
issue_list = None
try:
if len(volume_id_list) > 0:
issue_list = self.talker_api.fetch_issues_by_volume_issue_num_and_year(
volume_id_list, keys["issue_number"], keys["year"]
if len(series_id_list) > 0:
issue_list = self.talker_api.fetch_issues_by_series_issue_num_and_year(
series_id_list, keys["issue_number"], keys["year"]
)
except TalkerError as e:
self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}")
@ -439,10 +439,10 @@ class IssueIdentifier:
return []
shortlist = []
# now re-associate the issues and volumes
# now re-associate the issues and series
for issue in issue_list:
for series in series_second_round_list:
if series["id"] == issue["volume"]["id"]:
if series["id"] == issue["series"]["id"]:
shortlist.append((series, issue))
break
@ -453,7 +453,7 @@ class IssueIdentifier:
f"Found {len(shortlist)} series that have an issue #{keys['issue_number']} from {keys['year']}"
)
# now we have a shortlist of volumes with the desired issue number
# now we have a shortlist of series with the desired issue number
# Do first round of cover matching
counter = len(shortlist)
for series, issue in shortlist:
@ -495,7 +495,7 @@ class IssueIdentifier:
"url_image_hash": score_item["hash"],
"issue_title": issue["name"],
"issue_id": issue["id"],
"volume_id": series["id"],
"series_id": series["id"],
"month": month,
"year": year,
"publisher": None,
@ -561,7 +561,7 @@ class IssueIdentifier:
if self.callback is not None:
self.callback(counter, len(self.match_list) * 3)
counter += 1
self.log_msg(f"Examining alternate covers for ID: {m['volume_id']} {m['series']} ...", newline=False)
self.log_msg(f"Examining alternate covers for ID: {m['series_id']} {m['series']} ...", newline=False)
try:
score_item = self.get_issue_cover_match_score(
m["issue_id"],
@ -615,7 +615,7 @@ class IssueIdentifier:
self.match_list.remove(match_item)
# One more test for the case choosing limited series first issue vs a trade with the same cover:
# if we have a given issue count > 1 and the volume from CV has count==1, remove it from match list
# if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
new_list = []
for match in self.match_list:
@ -623,7 +623,7 @@ class IssueIdentifier:
new_list.append(match)
else:
self.log_msg(
f"Removing volume {match['series']} [{match['volume_id']}] from consideration (only 1 issue)"
f"Removing series {match['series']} [{match['series_id']}] from consideration (only 1 issue)"
)
if len(new_list) > 0:

View File

@ -39,8 +39,6 @@ class IssueNumberTableWidgetItem(QtWidgets.QTableWidgetItem):
class IssueSelectionWindow(QtWidgets.QDialog):
volume_id = 0
def __init__(
self,
parent: QtWidgets.QWidget,
@ -106,7 +104,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
self.issue_list = self.talker_api.fetch_issues_by_volume(self.series_id)
self.issue_list = self.talker_api.fetch_issues_by_series(self.series_id)
except TalkerError as e:
QtWidgets.QApplication.restoreOverrideCursor()
QtWidgets.QMessageBox.critical(self, f"{e.source} {e.code_name} Error", f"{e}")

View File

@ -32,8 +32,6 @@ logger = logging.getLogger(__name__)
class MatchSelectionWindow(QtWidgets.QDialog):
volume_id = 0
def __init__(
self,
parent: QtWidgets.QWidget,

View File

@ -13,7 +13,7 @@ class IssueResult(TypedDict):
url_image_hash: int
issue_title: str
issue_id: int # int?
volume_id: int # int?
series_id: int # int?
month: int | None
year: int | None
publisher: str | None

View File

@ -33,7 +33,7 @@ from comictaggerlib.matchselectionwindow import MatchSelectionWindow
from comictaggerlib.progresswindow import IDProgressWindow
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.resulttypes import ComicVolume
from comictalker.resulttypes import ComicSeries
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -57,7 +57,7 @@ class SearchThread(QtCore.QThread):
self.refresh: bool = refresh
self.error_e: TalkerError
self.ct_error = False
self.ct_search_results: list[ComicVolume] = []
self.ct_search_results: list[ComicSeries] = []
self.literal = literal
self.series_match_thresh = series_match_thresh
@ -101,7 +101,7 @@ class IdentifyThread(QtCore.QThread):
self.identifyComplete.emit()
class VolumeSelectionWindow(QtWidgets.QDialog):
class SeriesSelectionWindow(QtWidgets.QDialog):
def __init__(
self,
parent: QtWidgets.QWidget,
@ -118,7 +118,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
) -> None:
super().__init__(parent)
uic.loadUi(ui_path / "volumeselectionwindow.ui", self)
uic.loadUi(ui_path / "serieselectionwindow.ui", self)
self.imageWidget = CoverImageWidget(
self.imageContainer, CoverImageWidget.URLMode, options.runtime_config.user_cache_dir, talker_api
@ -144,11 +144,11 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
self.issue_id: int | None = None
self.year = year
self.issue_count = issue_count
self.volume_id = 0
self.series_id = 0
self.comic_archive = comic_archive
self.immediate_autoselect = autoselect
self.cover_index_list = cover_index_list
self.ct_search_results: list[ComicVolume] = []
self.ct_search_results: list[ComicSeries] = []
self.literal = literal
self.ii: IssueIdentifier | None = None
self.iddialog: IDProgressWindow | None = None
@ -294,16 +294,16 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
if found_match is not None:
self.iddialog.accept()
self.volume_id = utils.xlate(found_match["volume_id"])
self.series_id = utils.xlate(found_match["series_id"])
self.issue_number = found_match["issue_number"]
self.select_by_id()
self.show_issues()
def show_issues(self) -> None:
selector = IssueSelectionWindow(self, self.options, self.talker_api, self.volume_id, self.issue_number)
selector = IssueSelectionWindow(self, self.options, self.talker_api, self.series_id, self.issue_number)
title = ""
for record in self.ct_search_results:
if record["id"] == self.volume_id:
if record["id"] == self.series_id:
title = record["name"]
title += " (" + str(record["start_year"]) + ")"
title += " - "
@ -313,15 +313,15 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
selector.setModal(True)
selector.exec()
if selector.result():
# we should now have a volume ID
# we should now have a series ID
self.issue_number = selector.issue_number
self.issue_id = selector.issue_id
self.accept()
def select_by_id(self) -> None:
for r in range(0, self.twList.rowCount()):
volume_id = self.twList.item(r, 0).data(QtCore.Qt.ItemDataRole.UserRole)
if volume_id == self.volume_id:
series_id = self.twList.item(r, 0).data(QtCore.Qt.ItemDataRole.UserRole)
if series_id == self.series_id:
self.twList.selectRow(r)
break
@ -419,9 +419,9 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
sanitized = utils.sanitize_title(self.series_name, False).casefold()
sanitized_no_articles = utils.sanitize_title(self.series_name, True).casefold()
deques: list[deque[ComicVolume]] = [deque(), deque(), deque()]
deques: list[deque[ComicSeries]] = [deque(), deque(), deque()]
def categorize(result: ComicVolume) -> int:
def categorize(result: ComicSeries) -> int:
# We don't remove anything on this one so that we only get exact matches
if utils.sanitize_title(result["name"], True).casefold() == sanitized_no_articles:
return 0
@ -517,11 +517,11 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
if prev is not None and prev.row() == curr.row():
return
self.volume_id = self.twList.item(curr.row(), 0).data(QtCore.Qt.ItemDataRole.UserRole)
self.series_id = self.twList.item(curr.row(), 0).data(QtCore.Qt.ItemDataRole.UserRole)
# list selection was changed, update the info on the volume
# list selection was changed, update the info on the series
for record in self.ct_search_results:
if record["id"] == self.volume_id:
if record["id"] == self.series_id:
if record["description"] is None:
self.teDetails.setText("")
else:

View File

@ -58,11 +58,11 @@ from comictaggerlib.pagebrowser import PageBrowserWindow
from comictaggerlib.pagelisteditor import PageListEditor
from comictaggerlib.renamewindow import RenameWindow
from comictaggerlib.resulttypes import IssueResult, MultipleMatch, OnlineMatchResults
from comictaggerlib.serieselectionwindow import SeriesSelectionWindow
from comictaggerlib.settingswindow import SettingsWindow
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import center_window_on_parent, reduce_widget_font_size
from comictaggerlib.versionchecker import VersionChecker
from comictaggerlib.volumeselectionwindow import VolumeSelectionWindow
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -1045,7 +1045,7 @@ Have fun!
issue_count = utils.xlate(self.leIssueCount.text(), True)
cover_index_list = self.metadata.get_cover_page_index_list()
selector = VolumeSelectionWindow(
selector = SeriesSelectionWindow(
self,
series_name,
issue_number,
@ -1065,7 +1065,7 @@ Have fun!
selector.exec()
if selector.result():
# we should now have a volume ID
# we should now have a series ID
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
# copy the form onto metadata object
@ -1073,7 +1073,7 @@ Have fun!
try:
new_metadata = self.talker_api.fetch_comic_data(
issue_id=selector.issue_id or 0, series_id=selector.volume_id, issue_number=selector.issue_number
issue_id=selector.issue_id or 0, series_id=selector.series_id, issue_number=selector.issue_number
)
except TalkerError as e:
QtWidgets.QApplication.restoreOverrideCursor()

View File

@ -23,7 +23,7 @@ import pathlib
import sqlite3 as lite
from typing import Any
from comictalker.resulttypes import ComicIssue, ComicVolume
from comictalker.resulttypes import ComicIssue, ComicSeries
logger = logging.getLogger(__name__)
@ -75,7 +75,7 @@ class ComicCacher:
cur = con.cursor()
# source_name,name,id,start_year,publisher,image,description,count_of_issues
cur.execute(
"CREATE TABLE VolumeSearchCache("
"CREATE TABLE SeriesSearchCache("
+ "search_term TEXT,"
+ "id INT NOT NULL,"
+ "timestamp DATE DEFAULT (datetime('now','localtime')),"
@ -83,7 +83,7 @@ class ComicCacher:
)
cur.execute(
"CREATE TABLE Volumes("
"CREATE TABLE Series("
+ "id INT NOT NULL,"
+ "name TEXT,"
+ "publisher TEXT,"
@ -100,7 +100,7 @@ class ComicCacher:
cur.execute(
"CREATE TABLE Issues("
+ "id INT NOT NULL,"
+ "volume_id INT,"
+ "series_id INT,"
+ "name TEXT,"
+ "issue_number TEXT,"
+ "image_url TEXT,"
@ -121,7 +121,7 @@ class ComicCacher:
+ "PRIMARY KEY (id, source_name))"
)
def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicVolume]) -> None:
def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicSeries]) -> None:
con = lite.connect(self.db_file)
with con:
@ -130,14 +130,14 @@ class ComicCacher:
# remove all previous entries with this search term
cur.execute(
"DELETE FROM VolumeSearchCache WHERE search_term = ? AND source_name = ?",
"DELETE FROM SeriesSearchCache WHERE search_term = ? AND source_name = ?",
[search_term.casefold(), source_name],
)
# now add in new results
for record in ct_search_results:
cur.execute(
"INSERT INTO VolumeSearchCache " + "(source_name, search_term, id) " + "VALUES(?, ?, ?)",
"INSERT INTO SeriesSearchCache " + "(source_name, search_term, id) " + "VALUES(?, ?, ?)",
(
source_name,
search_term.casefold(),
@ -157,9 +157,9 @@ class ComicCacher:
"timestamp": datetime.datetime.now(),
"aliases": "\n".join(record.get("aliases", [])),
}
self.upsert(cur, "volumes", data)
self.upsert(cur, "series", data)
def get_search_results(self, source_name: str, search_term: str) -> list[ComicVolume]:
def get_search_results(self, source_name: str, search_term: str) -> list[ComicSeries]:
results = []
con = lite.connect(self.db_file)
with con:
@ -167,16 +167,16 @@ class ComicCacher:
cur = con.cursor()
cur.execute(
"SELECT * FROM VolumeSearchCache INNER JOIN Volumes on"
" VolumeSearchCache.id=Volumes.id AND VolumeSearchCache.source_name=Volumes.source_name"
" WHERE search_term=? AND VolumeSearchCache.source_name=?",
"SELECT * FROM SeriesSearchCache INNER JOIN Series on"
" SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name"
" WHERE search_term=? AND SeriesSearchCache.source_name=?",
[search_term.casefold(), source_name],
)
rows = cur.fetchall()
# now process the results
for record in rows:
result = ComicVolume(
result = ComicSeries(
id=record[4],
name=record[5],
publisher=record[6],
@ -191,7 +191,7 @@ class ComicCacher:
return results
def add_volume_info(self, source_name: str, volume_record: ComicVolume) -> None:
def add_series_info(self, source_name: str, series_record: ComicSeries) -> None:
con = lite.connect(self.db_file)
with con:
@ -201,20 +201,20 @@ class ComicCacher:
timestamp = datetime.datetime.now()
data = {
"id": volume_record["id"],
"id": series_record["id"],
"source_name": source_name,
"name": volume_record["name"],
"publisher": volume_record.get("publisher", ""),
"count_of_issues": volume_record.get("count_of_issues"),
"start_year": volume_record.get("start_year"),
"image_url": volume_record.get("image_url", ""),
"description": volume_record.get("description", ""),
"name": series_record["name"],
"publisher": series_record.get("publisher", ""),
"count_of_issues": series_record.get("count_of_issues"),
"start_year": series_record.get("start_year"),
"image_url": series_record.get("image_url", ""),
"description": series_record.get("description", ""),
"timestamp": timestamp,
"aliases": "\n".join(volume_record.get("aliases", [])),
"aliases": "\n".join(series_record.get("aliases", [])),
}
self.upsert(cur, "volumes", data)
self.upsert(cur, "series", data)
def add_volume_issues_info(self, source_name: str, volume_issues: list[ComicIssue]) -> None:
def add_series_issues_info(self, source_name: str, series_issues: list[ComicIssue]) -> None:
con = lite.connect(self.db_file)
with con:
@ -224,10 +224,10 @@ class ComicCacher:
# add in issues
for issue in volume_issues:
for issue in series_issues:
data = {
"id": issue["id"],
"volume_id": issue["volume"]["id"],
"series_id": issue["series"]["id"],
"source_name": source_name,
"name": issue["name"],
"issue_number": issue["issue_number"],
@ -248,8 +248,8 @@ class ComicCacher:
}
self.upsert(cur, "issues", data)
def get_volume_info(self, volume_id: int, source_name: str, purge: bool = True) -> ComicVolume | None:
result: ComicVolume | None = None
def get_series_info(self, series_id: int, source_name: str, purge: bool = True) -> ComicSeries | None:
result: ComicSeries | None = None
con = lite.connect(self.db_file)
with con:
@ -257,14 +257,14 @@ class ComicCacher:
con.text_factory = str
if purge:
# purge stale volume info
# purge stale series info
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Volumes WHERE timestamp < ?", [str(a_week_ago)])
cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
# fetch
cur.execute(
"SELECT * FROM Volumes" " WHERE id=? AND source_name=?",
[volume_id, source_name],
"SELECT * FROM Series" " WHERE id=? AND source_name=?",
[series_id, source_name],
)
row = cur.fetchone()
@ -273,7 +273,7 @@ class ComicCacher:
return result
# since ID is primary key, there is only one row
result = ComicVolume(
result = ComicSeries(
id=row[0],
name=row[1],
publisher=row[2],
@ -286,9 +286,9 @@ class ComicCacher:
return result
def get_volume_issues_info(self, volume_id: int, source_name: str) -> list[ComicIssue]:
# get_volume_info should only fail if someone is doing something weird
volume = self.get_volume_info(volume_id, source_name, False) or ComicVolume(id=volume_id, name="")
def get_series_issues_info(self, series_id: int, source_name: str) -> list[ComicIssue]:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(series_id, source_name, False) or ComicSeries(id=series_id, name="")
con = lite.connect(self.db_file)
with con:
cur = con.cursor()
@ -305,9 +305,9 @@ class ComicCacher:
cur.execute(
(
"SELECT source_name,id,name,issue_number,site_detail_url,cover_date,image_url,thumb_url,description,aliases,alt_image_urls,characters,locations,credits,teams,story_arcs,complete"
" FROM Issues WHERE volume_id=? AND source_name=?"
" FROM Issues WHERE series_id=? AND source_name=?"
),
[volume_id, source_name],
[series_id, source_name],
)
rows = cur.fetchall()
@ -321,7 +321,7 @@ class ComicCacher:
cover_date=row[5],
image_url=row[6],
description=row[8],
volume=volume,
series=series,
aliases=row[9].strip().splitlines(),
alt_image_urls=row[10].strip().splitlines(),
characters=row[11].strip().splitlines(),
@ -349,7 +349,7 @@ class ComicCacher:
cur.execute(
(
"SELECT source_name,id,name,issue_number,site_detail_url,cover_date,image_url,thumb_url,description,aliases,volume_id,alt_image_urls,characters,locations,credits,teams,story_arcs,complete"
"SELECT source_name,id,name,issue_number,site_detail_url,cover_date,image_url,thumb_url,description,aliases,series_id,alt_image_urls,characters,locations,credits,teams,story_arcs,complete"
" FROM Issues WHERE id=? AND source_name=?"
),
[issue_id, source_name],
@ -359,8 +359,8 @@ class ComicCacher:
record = None
if row:
# get_volume_info should only fail if someone is doing something weird
volume = self.get_volume_info(row[10], source_name, False) or ComicVolume(id=row[10], name="")
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(row[10], source_name, False) or ComicSeries(id=row[10], name="")
# now process the results
@ -373,7 +373,7 @@ class ComicCacher:
image_url=row[6],
image_thumb_url=row[7],
description=row[8],
volume=volume,
series=series,
aliases=row[9].strip().splitlines(),
alt_image_urls=row[11].strip().splitlines(),
characters=row[12].strip().splitlines(),

View File

@ -8,7 +8,7 @@ class Credits(TypedDict):
role: str
class ComicVolume(TypedDict, total=False):
class ComicSeries(TypedDict, total=False):
aliases: list[str]
count_of_issues: int
description: str
@ -16,7 +16,7 @@ class ComicVolume(TypedDict, total=False):
image_url: str
name: Required[str]
publisher: str
start_year: int
start_year: int | None
class ComicIssue(TypedDict, total=False):
@ -29,7 +29,7 @@ class ComicIssue(TypedDict, total=False):
issue_number: Required[str]
name: Required[str]
site_detail_url: str
volume: ComicVolume
series: ComicSeries
alt_image_urls: list[str]
characters: list[str]
locations: list[str]

View File

@ -36,8 +36,8 @@ def map_comic_issue_to_metadata(
metadata.is_empty = False
# Is this best way to go about checking?
if issue_results["volume"].get("name"):
metadata.series = utils.xlate(issue_results["volume"]["name"])
if issue_results["series"].get("name"):
metadata.series = utils.xlate(issue_results["series"]["name"])
if issue_results.get("issue_number"):
metadata.issue = IssueString(issue_results["issue_number"]).as_string()
if issue_results.get("name"):
@ -45,17 +45,17 @@ def map_comic_issue_to_metadata(
if issue_results.get("image_url"):
metadata.cover_image = issue_results["image_url"]
if issue_results["volume"].get("publisher"):
metadata.publisher = utils.xlate(issue_results["volume"]["publisher"])
if issue_results["series"].get("publisher"):
metadata.publisher = utils.xlate(issue_results["series"]["publisher"])
if issue_results.get("cover_date"):
metadata.day, metadata.month, metadata.year = utils.parse_date_str(issue_results["cover_date"])
elif issue_results["volume"].get("start_year"):
metadata.year = utils.xlate(issue_results["volume"]["start_year"], True)
elif issue_results["series"].get("start_year"):
metadata.year = utils.xlate(issue_results["series"]["start_year"], True)
metadata.comments = cleanup_html(issue_results["description"], remove_html_tables)
if use_year_volume:
metadata.volume = issue_results["volume"]["start_year"]
metadata.volume = issue_results["series"]["start_year"]
metadata.tag_origin = source
metadata.issue_id = issue_results["id"]

View File

@ -21,7 +21,7 @@ from typing import Callable
from urllib.parse import urlsplit
from comicapi.genericmetadata import GenericMetadata
from comictalker.resulttypes import ComicIssue, ComicVolume
from comictalker.resulttypes import ComicIssue, ComicSeries
logger = logging.getLogger(__name__)
@ -175,15 +175,15 @@ class ComicTalker:
callback: Callable[[int, int], None] | None = None,
refresh_cache: bool = False,
literal: bool = False,
) -> list[ComicVolume]:
"""Searches for the series/volumes with the given series_name
) -> list[ComicSeries]:
"""Searches for the series with the given series_name
callback is used for...
refresh_cache signals if the data in the cache should be used
literal indicates that no articles (a, the, etc.) should be removed when searching"""
raise NotImplementedError
def fetch_issues_by_volume(self, series_id: int) -> list[ComicIssue]:
"""Expected to return a list of issues with a given series/volume ID"""
def fetch_issues_by_series(self, series_id: int) -> list[ComicIssue]:
"""Expected to return a list of issues with a given series ID"""
raise NotImplementedError
def fetch_comic_data(self, issue_id: int = 0, series_id: int = 0, issue_number: str = "") -> GenericMetadata:
@ -193,8 +193,8 @@ class ComicTalker:
3. Only issue_id: Retrieve the ISSUE information"""
raise NotImplementedError
def fetch_issues_by_volume_issue_num_and_year(
self, volume_id_list: list[int], issue_number: str, year: str | int | None
def fetch_issues_by_series_issue_num_and_year(
self, series_id_list: list[int], issue_number: str, year: str | int | None
) -> list[ComicIssue]:
"""Searches for a list of issues within the given year. Used solely by issueidentifer"""
raise NotImplementedError

View File

@ -30,14 +30,14 @@ from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictalker.comiccacher import ComicCacher
from comictalker.resulttypes import ComicIssue, ComicVolume, Credits
from comictalker.resulttypes import ComicIssue, ComicSeries, Credits
from comictalker.talkerbase import ComicTalker, SourceDetails, SourceStaticOptions, TalkerDataError, TalkerNetworkError
logger = logging.getLogger(__name__)
class CVTypeID:
Volume = "4050"
Volume = "4050" # CV uses volume to mean series
Issue = "4000"
@ -67,13 +67,6 @@ class CVPublisher(TypedDict, total=False):
name: Required[str]
class CVVolume(TypedDict):
api_detail_url: str
id: int
name: str
site_detail_url: str
class CVCredits(TypedDict):
api_detail_url: str
id: int
@ -89,7 +82,9 @@ class CVPersonCredits(TypedDict):
role: str
class CVVolumeResults(TypedDict):
class CVSeries(TypedDict):
api_detail_url: str
site_detail_url: str
aliases: str
count_of_issues: int
description: str
@ -99,6 +94,9 @@ class CVVolumeResults(TypedDict):
publisher: CVPublisher
start_year: str
resource_type: str
characters: list[CVCredits]
locations: list[CVCredits]
people: list[CVPersonCredits]
class CVResult(TypedDict):
@ -108,25 +106,11 @@ class CVResult(TypedDict):
number_of_page_results: int
number_of_total_results: int
status_code: int
results: (CVIssueDetailResults | CVVolumeResults | list[CVVolumeResults] | list[CVIssueDetailResults])
results: (CVIssue | CVSeries | list[CVSeries] | list[CVIssue])
version: str
class CVVolumeFullResult(TypedDict):
characters: list[CVCredits]
locations: list[CVCredits]
people: list[CVPersonCredits]
site_detail_url: str
count_of_issues: int
description: str
id: int
name: str
publisher: CVPublisher
start_year: str
resource_type: str
class CVIssueDetailResults(TypedDict, total=False):
class CVIssue(TypedDict, total=False):
aliases: str
api_detail_url: str
associated_images: list[CVAltImages]
@ -157,7 +141,7 @@ class CVIssueDetailResults(TypedDict, total=False):
story_arc_credits: list[CVCredits]
team_credits: list[CVCredits]
team_disbanded_in: None
volume: CVVolume
volume: CVSeries # CV uses volume to mean series
CV_RATE_LIMIT_STATUS = 107
@ -284,7 +268,7 @@ class ComicVineTalker(ComicTalker):
raise TalkerNetworkError(self.source_name_friendly, 5)
def format_search_results(self, search_results: list[CVVolumeResults]) -> list[ComicVolume]:
def format_search_results(self, search_results: list[CVSeries]) -> list[ComicSeries]:
formatted_results = []
for record in search_results:
# Flatten publisher to name only
@ -298,14 +282,13 @@ class ComicVineTalker(ComicTalker):
else:
image_url = record["image"].get("super_url", "")
if record.get("start_year") is None:
start_year = 0
else:
start_year = utils.xlate(record["start_year"], True)
start_year = utils.xlate(record.get("start_year", ""), True)
aliases = record.get("aliases") or ""
formatted_results.append(
ComicVolume(
aliases=record["aliases"].split("\n") if record["aliases"] else [], # CV returns a null because...?
ComicSeries(
aliases=aliases.splitlines(),
count_of_issues=record.get("count_of_issues", 0),
description=record.get("description", ""),
id=record["id"],
@ -318,9 +301,7 @@ class ComicVineTalker(ComicTalker):
return formatted_results
def format_issue_results(
self, issue_results: list[CVIssueDetailResults], complete: bool = False
) -> list[ComicIssue]:
def format_issue_results(self, issue_results: list[CVIssue], complete: bool = False) -> list[ComicIssue]:
formatted_results = []
for record in issue_results:
# Extract image super and thumb to name only
@ -371,7 +352,7 @@ class ComicVineTalker(ComicTalker):
issue_number=record["issue_number"],
name=record["name"],
site_detail_url=record.get("site_detail_url", ""),
volume=cast(ComicVolume, record["volume"]),
series=self.format_search_results([record["volume"]])[0], # CV uses volume to mean series
alt_image_urls=alt_images_list,
characters=character_list,
locations=location_list,
@ -390,7 +371,7 @@ class ComicVineTalker(ComicTalker):
callback: Callable[[int, int], None] | None = None,
refresh_cache: bool = False,
literal: bool = False,
) -> list[ComicVolume]:
) -> list[ComicSeries]:
# Sanitize the series name for comicvine searching, comicvine search ignore symbols
search_series_name = utils.sanitize_title(series_name, literal)
logger.info(f"{self.source_name_friendly} searching: {search_series_name}")
@ -404,7 +385,7 @@ class ComicVineTalker(ComicTalker):
if len(cached_search_results) > 0:
return cached_search_results
params = {
params = { # CV uses volume to mean series
"api_key": self.api_key,
"format": "json",
"resources": "volume",
@ -416,7 +397,7 @@ class ComicVineTalker(ComicTalker):
cv_response = self.get_cv_content(urljoin(self.api_url, "search"), params)
search_results: list[CVVolumeResults] = []
search_results: list[CVSeries] = []
# see http://api.comicvine.com/documentation/#handling_responses
@ -436,7 +417,7 @@ class ComicVineTalker(ComicTalker):
logger.debug(
f"Found {cv_response['number_of_page_results']} of {cv_response['number_of_total_results']} results"
)
search_results.extend(cast(list[CVVolumeResults], cv_response["results"]))
search_results.extend(cast(list[CVSeries], cv_response["results"]))
page = 1
if callback is not None:
@ -448,8 +429,8 @@ class ComicVineTalker(ComicTalker):
if not literal:
# Stop searching once any entry falls below the threshold
stop_searching = any(
not utils.titles_match(search_series_name, volume["name"], self.series_match_thresh)
for volume in cast(list[CVVolumeResults], cv_response["results"])
not utils.titles_match(search_series_name, series["name"], self.series_match_thresh)
for series in cast(list[CVSeries], cv_response["results"])
)
if stop_searching:
@ -462,7 +443,7 @@ class ComicVineTalker(ComicTalker):
params["page"] = page
cv_response = self.get_cv_content(urljoin(self.api_url, "search"), params)
search_results.extend(cast(list[CVVolumeResults], cv_response["results"]))
search_results.extend(cast(list[CVSeries], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
if callback is not None:
@ -477,7 +458,7 @@ class ComicVineTalker(ComicTalker):
return formatted_search_results
# Get issue or volume information
# Get issue or series information
def fetch_comic_data(self, issue_id: int = 0, series_id: int = 0, issue_number: str = "") -> GenericMetadata:
comic_data = GenericMetadata()
if issue_number and series_id:
@ -487,42 +468,41 @@ class ComicVineTalker(ComicTalker):
return comic_data
def fetch_partial_volume_data(self, series_id: int) -> ComicVolume:
def fetch_series_data(self, series_id: int) -> ComicSeries:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_volume_result = cvc.get_volume_info(series_id, self.source_name)
cached_series_result = cvc.get_series_info(series_id, self.source_name)
if cached_volume_result is not None:
return cached_volume_result
if cached_series_result is not None:
return cached_series_result
volume_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}")
series_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}") # CV uses volume to mean series
params = {
"api_key": self.api_key,
"format": "json",
"field_list": "name,id,start_year,publisher,count_of_issues,aliases",
}
cv_response = self.get_cv_content(volume_url, params)
cv_response = self.get_cv_content(series_url, params)
volume_results = cast(CVVolumeResults, cv_response["results"])
formatted_volume_results = self.format_search_results([volume_results])
series_results = cast(CVSeries, cv_response["results"])
formatted_series_results = self.format_search_results([series_results])
if volume_results:
cvc.add_volume_info(self.source_name, formatted_volume_results[0])
if series_results:
cvc.add_series_info(self.source_name, formatted_series_results[0])
return formatted_volume_results[0]
return formatted_series_results[0]
def fetch_issues_by_volume(self, series_id: int) -> list[ComicIssue]:
def fetch_issues_by_series(self, series_id: int) -> list[ComicIssue]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_volume_issues_result = cvc.get_volume_issues_info(series_id, self.source_name)
cached_series_issues_result = cvc.get_series_issues_info(series_id, self.source_name)
volume_data = self.fetch_partial_volume_data(series_id)
series_data = self.fetch_series_data(series_id)
if len(cached_volume_issues_result) == volume_data["count_of_issues"]:
return cached_volume_issues_result
if len(cached_series_issues_result) == series_data["count_of_issues"]:
return cached_series_issues_result
params = {
params = { # CV uses volume to mean series
"api_key": self.api_key,
"filter": f"volume:{series_id}",
"format": "json",
@ -534,7 +514,7 @@ class ComicVineTalker(ComicTalker):
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
volume_issues_result = cast(list[CVIssueDetailResults], cv_response["results"])
series_issues_result = cast(list[CVIssue], cv_response["results"])
page = 1
offset = 0
@ -546,29 +526,29 @@ class ComicVineTalker(ComicTalker):
params["offset"] = offset
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
volume_issues_result.extend(cast(list[CVIssueDetailResults], cv_response["results"]))
series_issues_result.extend(cast(list[CVIssue], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
# Format to expected output !! issues/ volume does NOT return publisher!!
formatted_volume_issues_result = self.format_issue_results(volume_issues_result)
# Format to expected output
formatted_series_issues_result = self.format_issue_results(series_issues_result)
cvc.add_volume_issues_info(self.source_name, formatted_volume_issues_result)
cvc.add_series_issues_info(self.source_name, formatted_series_issues_result)
return formatted_volume_issues_result
return formatted_series_issues_result
def fetch_issues_by_volume_issue_num_and_year(
self, volume_id_list: list[int], issue_number: str, year: str | int | None
def fetch_issues_by_series_issue_num_and_year(
self, series_id_list: list[int], issue_number: str, year: str | int | None
) -> list[ComicIssue]:
volume_filter = ""
for vid in volume_id_list:
volume_filter += str(vid) + "|"
flt = f"volume:{volume_filter},issue_number:{issue_number}"
series_filter = ""
for vid in series_id_list:
series_filter += str(vid) + "|"
flt = f"volume:{series_filter},issue_number:{issue_number}" # CV uses volume to mean series
int_year = utils.xlate(year, True)
if int_year is not None:
flt += f",cover_date:{int_year}-1-1|{int_year + 1}-1-1"
params: dict[str, str | int] = {
params: dict[str, str | int] = { # CV uses volume to mean series
"api_key": self.api_key,
"format": "json",
"field_list": "id,volume,issue_number,name,image,cover_date,site_detail_url,description,aliases,associated_images",
@ -580,7 +560,7 @@ class ComicVineTalker(ComicTalker):
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
filtered_issues_result = cast(list[CVIssueDetailResults], cv_response["results"])
filtered_issues_result = cast(list[CVIssue], cv_response["results"])
page = 1
offset = 0
@ -592,7 +572,7 @@ class ComicVineTalker(ComicTalker):
params["offset"] = offset
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
filtered_issues_result.extend(cast(list[CVIssueDetailResults], cv_response["results"]))
filtered_issues_result.extend(cast(list[CVIssue], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
formatted_filtered_issues_result = self.format_issue_results(filtered_issues_result)
@ -600,7 +580,7 @@ class ComicVineTalker(ComicTalker):
return formatted_filtered_issues_result
def fetch_issue_data(self, series_id: int, issue_number: str) -> GenericMetadata:
issues_list_results = self.fetch_issues_by_volume(series_id)
issues_list_results = self.fetch_issues_by_series(series_id)
# Loop through issue list to find the required issue info
f_record = None
@ -641,20 +621,19 @@ class ComicVineTalker(ComicTalker):
params = {"api_key": self.api_key, "format": "json"}
cv_response = self.get_cv_content(issue_url, params)
issue_results = cast(CVIssueDetailResults, cv_response["results"])
issue_results = cast(CVIssue, cv_response["results"])
# Format to expected output
formatted_issues_result = self.format_issue_results([issue_results], True)
cv_issues = self.format_issue_results([issue_results], True)
# Due to issue/ not returning volume publisher, get it.
volume_info = self.fetch_partial_volume_data(formatted_issues_result[0]["volume"]["id"])
formatted_issues_result[0]["volume"]["publisher"] = volume_info["publisher"]
# Due to issue not returning publisher, fetch the series.
cv_issues[0]["series"] = self.fetch_series_data(cv_issues[0]["series"]["id"])
cvc.add_volume_issues_info(self.source_name, formatted_issues_result)
cvc.add_series_issues_info(self.source_name, cv_issues)
# Now, map the ComicIssue data to generic metadata
return talker_utils.map_comic_issue_to_metadata(
formatted_issues_result[0],
cv_issues[0],
self.source_name_friendly,
self.remove_html_tables,
self.use_series_start_as_volume,

View File

@ -5,7 +5,7 @@ import comictalker.resulttypes
from comicapi import utils
search_results = [
comictalker.resulttypes.ComicVolume(
comictalker.resulttypes.ComicSeries(
count_of_issues=1,
description="this is a description",
id=1,
@ -15,7 +15,7 @@ search_results = [
start_year=0,
aliases=[],
),
comictalker.resulttypes.ComicVolume(
comictalker.resulttypes.ComicSeries(
count_of_issues=1,
description="this is a description",
id=2,

View File

@ -4,6 +4,7 @@ from typing import Any
import comicapi.genericmetadata
from comicapi import utils
from comictalker.resulttypes import ComicIssue, ComicSeries
from comictalker.talker_utils import cleanup_html
@ -22,7 +23,7 @@ cv_issue_result: dict[str, Any] = {
"number_of_total_results": 1,
"status_code": 1,
"results": {
"aliases": [],
"aliases": None,
"api_detail_url": "https://comicvine.gamespot.com/api/issue/4000-140529/",
"associated_images": [],
"character_credits": [],
@ -117,7 +118,7 @@ cv_volume_result: dict[str, Any] = {
"number_of_total_results": 1,
"status_code": 1,
"results": {
"aliases": [],
"aliases": None,
"api_detail_url": "https://comicvine.gamespot.com/api/volume/4050-23437/",
"count_of_issues": 6,
"date_added": "2008-10-16 05:25:47",
@ -157,23 +158,34 @@ cv_not_found = {
"status_code": 101,
"results": [],
}
comic_issue_result: dict[str, Any] = {
"aliases": cv_issue_result["results"]["aliases"],
"cover_date": cv_issue_result["results"]["cover_date"],
"description": cv_issue_result["results"]["description"],
"id": cv_issue_result["results"]["id"],
"image_url": cv_issue_result["results"]["image"]["super_url"],
"image_thumb_url": cv_issue_result["results"]["image"]["thumb_url"],
"issue_number": cv_issue_result["results"]["issue_number"],
"name": cv_issue_result["results"]["name"],
"site_detail_url": cv_issue_result["results"]["site_detail_url"],
"volume": {
"api_detail_url": cv_issue_result["results"]["volume"]["api_detail_url"],
"id": cv_issue_result["results"]["volume"]["id"],
"name": cv_issue_result["results"]["volume"]["name"],
"site_detail_url": cv_issue_result["results"]["volume"]["site_detail_url"],
},
}
comic_issue_result = ComicIssue(
aliases=cv_issue_result["results"]["aliases"] or [],
cover_date=cv_issue_result["results"]["cover_date"],
description=cv_issue_result["results"]["description"],
id=cv_issue_result["results"]["id"],
image_url=cv_issue_result["results"]["image"]["super_url"],
image_thumb_url=cv_issue_result["results"]["image"]["thumb_url"],
issue_number=cv_issue_result["results"]["issue_number"],
name=cv_issue_result["results"]["name"],
site_detail_url=cv_issue_result["results"]["site_detail_url"],
series=ComicSeries(
id=cv_issue_result["results"]["volume"]["id"],
name=cv_issue_result["results"]["volume"]["name"],
aliases=[],
count_of_issues=0,
description="",
image_url="",
publisher="",
start_year=None,
),
characters=[],
alt_image_urls=[],
complete=False,
credits=[],
locations=[],
story_arcs=[],
teams=[],
)
date = utils.parse_date_str(cv_issue_result["results"]["cover_date"])
cv_md = comicapi.genericmetadata.GenericMetadata(

View File

@ -21,13 +21,13 @@ def test_search_results(comic_cache):
assert search_results == comic_cache.get_search_results("test", "test search")
@pytest.mark.parametrize("volume_info", search_results)
def test_volume_info(comic_cache, volume_info):
comic_cache.add_volume_info(volume_record=volume_info, source_name="test")
vi = volume_info.copy()
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
comic_cache.add_series_info(series_record=series_info, source_name="test")
vi = series_info.copy()
del vi["description"]
del vi["image_url"]
cache_result = comic_cache.get_volume_info(volume_id=volume_info["id"], source_name="test")
cache_result = comic_cache.get_series_info(series_id=series_info["id"], source_name="test")
del cache_result["description"]
del cache_result["image_url"]
assert vi == cache_result

View File

@ -14,28 +14,28 @@ def test_search_for_series(comicvine_api, comic_cache):
assert results == cache_issues
def test_fetch_volume_data(comicvine_api, comic_cache):
result = comicvine_api.fetch_partial_volume_data(23437)
def test_fetch_series_data(comicvine_api, comic_cache):
result = comicvine_api.fetch_series_data(23437)
del result["description"]
del result["image_url"]
cache_result = comic_cache.get_volume_info(23437, comicvine_api.source_name)
cache_result = comic_cache.get_series_info(23437, comicvine_api.source_name)
del cache_result["description"]
del cache_result["image_url"]
assert result == cache_result
def test_fetch_issues_by_volume(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_by_volume(23437)
cache_issues = comic_cache.get_volume_issues_info(23437, comicvine_api.source_name)
def test_fetch_issues_by_series(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_by_series(23437)
cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.source_name)
for r in results:
del r["volume"]
del r["series"]
del r["image_thumb_url"]
del r["characters"]
del r["locations"]
del r["story_arcs"]
del r["teams"]
for c in cache_issues:
del c["volume"]
del c["series"]
del c["characters"]
del c["locations"]
del c["story_arcs"]
@ -49,24 +49,12 @@ def test_fetch_issue_data_by_issue_id(comicvine_api):
assert result == testing.comicvine.cv_md
def test_fetch_issues_by_volume_issue_num_and_year(comicvine_api):
results = comicvine_api.fetch_issues_by_volume_issue_num_and_year([23437], "1", None)
def test_fetch_issues_by_series_issue_num_and_year(comicvine_api):
results = comicvine_api.fetch_issues_by_series_issue_num_and_year([23437], "1", None)
cv_expected = testing.comicvine.comic_issue_result.copy()
testing.comicvine.filter_field_list(
cv_expected,
{"params": {"field_list": "id,volume,issue_number,name,image,cover_date,site_detail_url,description,aliases"}},
)
for r, e in zip(results, [cv_expected]):
del r["image_thumb_url"]
del r["image_url"]
del r["alt_image_urls"]
del r["characters"]
del r["credits"]
del r["locations"]
del r["story_arcs"]
del r["teams"]
del r["complete"]
del r["volume"]["publisher"]
assert r["series"] == e["series"]
assert r == e
@ -77,8 +65,8 @@ cv_issue = [
]
@pytest.mark.parametrize("volume_id, issue_number, expected", cv_issue)
def test_fetch_issue_data(comicvine_api, volume_id, issue_number, expected):
results = comicvine_api.fetch_issue_data(volume_id, issue_number)
@pytest.mark.parametrize("series_id, issue_number, expected", cv_issue)
def test_fetch_issue_data(comicvine_api, series_id, issue_number, expected):
results = comicvine_api.fetch_issue_data(series_id, issue_number)
results.notes = None
assert results == expected

View File

@ -64,7 +64,7 @@ def test_search(cbz, options, comicvine_api):
"cv_issue_count": testing.comicvine.cv_volume_result["results"]["count_of_issues"],
"issue_title": testing.comicvine.cv_issue_result["results"]["name"],
"issue_id": testing.comicvine.cv_issue_result["results"]["id"],
"volume_id": testing.comicvine.cv_volume_result["results"]["id"],
"series_id": testing.comicvine.cv_volume_result["results"]["id"],
"month": testing.comicvine.date[1],
"year": testing.comicvine.date[2],
"publisher": testing.comicvine.cv_volume_result["results"]["publisher"]["name"],