Remove proxying from ComicTalker. Add some checks for talkers.
This commit is contained in:
parent
c5ad75370f
commit
cab69a32be
@ -43,7 +43,7 @@ def actual_issue_data_fetch(
|
||||
) -> GenericMetadata:
|
||||
# now get the particular issue data
|
||||
try:
|
||||
ct_md = talker_api.fetch_comic_data(match["volume_id"], match["issue_number"])
|
||||
ct_md = talker_api.talker.fetch_comic_data(match["volume_id"], match["issue_number"])
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
|
||||
return GenericMetadata()
|
||||
@ -382,7 +382,7 @@ def process_file_cli(
|
||||
if opts.issue_id is not None:
|
||||
# we were given the actual issue ID to search with
|
||||
try:
|
||||
ct_md = talker_api.fetch_comic_data(0, "", opts.issue_id)
|
||||
ct_md = talker_api.talker.fetch_comic_data(0, "", opts.issue_id)
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
|
||||
match_results.fetch_data_failures.append(str(ca.path.absolute()))
|
||||
|
@ -205,7 +205,7 @@ class CoverImageWidget(QtWidgets.QWidget):
|
||||
self.update_content()
|
||||
|
||||
# defer the alt cover search
|
||||
if self.talker_api.static_options.has_alt_covers:
|
||||
if self.talker_api.talker.source_details.static_options.has_alt_covers:
|
||||
QtCore.QTimer.singleShot(1, self.start_alt_cover_search)
|
||||
|
||||
def start_alt_cover_search(self) -> None:
|
||||
@ -216,7 +216,7 @@ class CoverImageWidget(QtWidgets.QWidget):
|
||||
|
||||
# page URL should already be cached, so no need to defer
|
||||
ComicTalker.alt_url_list_fetch_complete = self.sig.emit_list
|
||||
self.talker_api.async_fetch_alternate_cover_urls(utils.xlate(self.issue_id))
|
||||
self.talker_api.talker.async_fetch_alternate_cover_urls(utils.xlate(self.issue_id))
|
||||
|
||||
def alt_cover_url_list_fetch_complete(self, url_list: list[str]) -> None:
|
||||
if url_list:
|
||||
|
@ -271,7 +271,7 @@ class IssueIdentifier:
|
||||
raise IssueIdentifierCancelled
|
||||
|
||||
if use_remote_alternates:
|
||||
alt_img_url_list = self.talker_api.fetch_alternate_cover_urls(issue_id)
|
||||
alt_img_url_list = self.talker_api.talker.fetch_alternate_cover_urls(issue_id)
|
||||
for alt_url in alt_img_url_list:
|
||||
try:
|
||||
alt_url_image_data = ImageFetcher().fetch(alt_url, blocking=True)
|
||||
@ -370,7 +370,7 @@ class IssueIdentifier:
|
||||
|
||||
self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
|
||||
try:
|
||||
ct_search_results = self.talker_api.search_for_series(keys["series"])
|
||||
ct_search_results = self.talker_api.talker.search_for_series(keys["series"])
|
||||
except TalkerError as e:
|
||||
self.log_msg(f"Error searching for series.\n{e}")
|
||||
return []
|
||||
@ -423,7 +423,7 @@ class IssueIdentifier:
|
||||
series_second_round_list.sort(key=lambda x: len(x["name"]), reverse=False)
|
||||
|
||||
# If the sources lacks issue level data, don't look for it.
|
||||
if not self.talker_api.static_options.has_issues:
|
||||
if not self.talker_api.talker.source_details.static_options.has_issues:
|
||||
for series in series_second_round_list:
|
||||
hash_list = [cover_hash]
|
||||
if narrow_cover_hash is not None:
|
||||
@ -478,7 +478,7 @@ class IssueIdentifier:
|
||||
issue_list = None
|
||||
try:
|
||||
if len(volume_id_list) > 0:
|
||||
issue_list = self.talker_api.fetch_issues_by_volume_issue_num_and_year(
|
||||
issue_list = self.talker_api.talker.fetch_issues_by_volume_issue_num_and_year(
|
||||
volume_id_list, keys["issue_number"], keys["year"]
|
||||
)
|
||||
except TalkerError as e:
|
||||
|
@ -104,7 +104,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
|
||||
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
|
||||
|
||||
try:
|
||||
self.issue_list = self.talker_api.fetch_issues_by_volume(self.series_id)
|
||||
self.issue_list = self.talker_api.talker.fetch_issues_by_volume(self.series_id)
|
||||
except TalkerError as e:
|
||||
QtWidgets.QApplication.restoreOverrideCursor()
|
||||
QtWidgets.QMessageBox.critical(
|
||||
|
@ -1026,7 +1026,7 @@ Have fun!
|
||||
issue_number = str(self.leIssueNum.text()).strip()
|
||||
|
||||
# Only need this check is the source has issue level data.
|
||||
if autoselect and issue_number == "" and self.talker_api.static_options.has_issues:
|
||||
if autoselect and issue_number == "" and self.talker_api.talker.source_details.static_options.has_issues:
|
||||
QtWidgets.QMessageBox.information(
|
||||
self, "Automatic Identify Search", "Can't auto-identify without an issue number (yet!)"
|
||||
)
|
||||
@ -1070,7 +1070,7 @@ Have fun!
|
||||
self.form_to_metadata()
|
||||
|
||||
try:
|
||||
new_metadata = self.talker_api.fetch_comic_data(selector.volume_id, selector.issue_number)
|
||||
new_metadata = self.talker_api.talker.fetch_comic_data(selector.volume_id, selector.issue_number)
|
||||
except TalkerError as e:
|
||||
QtWidgets.QApplication.restoreOverrideCursor()
|
||||
QtWidgets.QMessageBox.critical(
|
||||
@ -1670,7 +1670,7 @@ Have fun!
|
||||
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
|
||||
|
||||
try:
|
||||
ct_md = self.talker_api.fetch_comic_data(match["volume_id"], match["issue_number"])
|
||||
ct_md = self.talker_api.talker.fetch_comic_data(match["volume_id"], match["issue_number"])
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Save aborted.\n{e}")
|
||||
|
||||
|
@ -64,7 +64,7 @@ class SearchThread(QtCore.QThread):
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.ct_error = False
|
||||
self.ct_search_results = self.talker_api.search_for_series(
|
||||
self.ct_search_results = self.talker_api.talker.search_for_series(
|
||||
self.series_name, self.prog_callback, self.refresh, self.literal
|
||||
)
|
||||
except TalkerError as e:
|
||||
@ -177,7 +177,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
|
||||
|
||||
self.btnRequery.setEnabled(enabled)
|
||||
|
||||
if self.talker_api.static_options.has_issues:
|
||||
if self.talker_api.talker.source_details.static_options.has_issues:
|
||||
self.btnIssues.setEnabled(enabled)
|
||||
self.btnAutoSelect.setEnabled(enabled)
|
||||
else:
|
||||
@ -198,7 +198,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
|
||||
|
||||
def auto_select(self) -> None:
|
||||
|
||||
if self.talker_api.static_options.has_issues:
|
||||
if self.talker_api.talker.source_details.static_options.has_issues:
|
||||
if self.comic_archive is None:
|
||||
QtWidgets.QMessageBox.information(self, "Auto-Select", "You need to load a comic first!")
|
||||
return
|
||||
@ -494,7 +494,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
|
||||
self.auto_select()
|
||||
|
||||
def cell_double_clicked(self, r: int, c: int) -> None:
|
||||
if self.talker_api.static_options.has_issues:
|
||||
if self.talker_api.talker.source_details.static_options.has_issues:
|
||||
self.show_issues()
|
||||
else:
|
||||
# Pass back to have taggerwindow get full series data
|
||||
|
@ -15,13 +15,11 @@
|
||||
# limitations under the License.
|
||||
from __future__ import annotations
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
from importlib import import_module
|
||||
from typing import Callable
|
||||
|
||||
from comicapi.genericmetadata import GenericMetadata
|
||||
from comictalker.resulttypes import ComicIssue, ComicVolume
|
||||
from comictalker.talkerbase import SourceStaticOptions, TalkerError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -45,7 +43,6 @@ class ComicTalker:
|
||||
self.sources = self.get_talkers()
|
||||
# Set the active talker
|
||||
self.talker = self.get_active_talker()
|
||||
self.static_options = self.get_static_options()
|
||||
|
||||
def get_active_talker(self):
|
||||
# This should always work because it will have errored at get_talkers if there are none
|
||||
@ -54,115 +51,89 @@ class ComicTalker:
|
||||
|
||||
@staticmethod
|
||||
def get_talkers():
|
||||
def check_talker(module: str):
|
||||
testmodule = import_module("comictalker.talkers." + module)
|
||||
for name, obj in inspect.getmembers(testmodule):
|
||||
if inspect.isclass(obj):
|
||||
if name != "ComicTalker" and name.endswith("Talker"):
|
||||
# TODO Check if enabled?
|
||||
talker = obj()
|
||||
required_fields_details = ["name", "id"]
|
||||
required_fields_static = ["has_issues", "has_alt_covers", "has_censored_covers"]
|
||||
required_fields_settings = ["enabled", "url_root"]
|
||||
errors_found = False
|
||||
|
||||
if talker.source_details is None:
|
||||
logger.warning(module + " is missing required source_details.")
|
||||
return False
|
||||
if not talker.source_details.static_options:
|
||||
logger.warning(module + " is missing required static_options.")
|
||||
return False
|
||||
if not talker.source_details.settings_options:
|
||||
logger.warning(module + " is missing required settings_options.")
|
||||
return False
|
||||
|
||||
for field in required_fields_details:
|
||||
if not hasattr(talker.source_details, field):
|
||||
logger.warning(module + " is missing required source_details: " + field)
|
||||
errors_found = True
|
||||
# No need to check these as they have defaults, should defaults be None to catch?
|
||||
for field in required_fields_static:
|
||||
if not hasattr(talker.source_details.static_options, field):
|
||||
logger.warning(module + " is missing required static_options: " + field)
|
||||
errors_found = True
|
||||
for field in required_fields_settings:
|
||||
if field not in talker.source_details.settings_options:
|
||||
logger.warning(module + " is missing required settings_options: " + field)
|
||||
errors_found = True
|
||||
|
||||
if errors_found:
|
||||
return False
|
||||
|
||||
for key, val in talker.source_details.static_options.__dict__.items():
|
||||
# Check for required options has the correct type
|
||||
if key == "has_issues":
|
||||
if type(val) is not bool:
|
||||
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
|
||||
errors_found = True
|
||||
if key == "has_alt_covers":
|
||||
if type(val) is not bool:
|
||||
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
|
||||
errors_found = True
|
||||
if key == "has_censored_covers":
|
||||
if type(val) is not bool:
|
||||
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
|
||||
errors_found = True
|
||||
|
||||
for key, val in talker.source_details.settings_options.items():
|
||||
if key == "enabled":
|
||||
if type(val["value"]) is not bool:
|
||||
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
|
||||
errors_found = True
|
||||
if key == "url_root":
|
||||
# Check starts with http[s]:// too?
|
||||
if not val["value"]:
|
||||
logger.warning(module + " has missing value: " + key + ":" + str(val))
|
||||
errors_found = True
|
||||
|
||||
if errors_found:
|
||||
logger.warning(module + " is missing required settings. Check logs.")
|
||||
return False
|
||||
return True
|
||||
|
||||
# Hardcode import for now. Placed here to prevent circular import
|
||||
import comictalker.talkers.comicvine
|
||||
|
||||
return {"comicvine": comictalker.talkers.comicvine.ComicVineTalker()}
|
||||
if check_talker("comicvine"):
|
||||
return {"comicvine": comictalker.talkers.comicvine.ComicVineTalker()}
|
||||
|
||||
# For issueidentifier
|
||||
def set_log_func(self, log_func: Callable[[str], None]) -> None:
|
||||
self.talker.log_func = log_func
|
||||
|
||||
def get_static_options(self) -> SourceStaticOptions:
|
||||
return self.talker.source_details.static_options
|
||||
|
||||
def check_api_key(self, key: str, url: str, source_id: str):
|
||||
for source in self.sources.values():
|
||||
if source.source_details.id == source_id:
|
||||
return source.check_api_key(key, url)
|
||||
# Return false as back up or error?
|
||||
return False
|
||||
|
||||
# Master function to search for series/volumes
|
||||
def search_for_series(
|
||||
self,
|
||||
series_name: str,
|
||||
callback: Callable[[int, int], None] | None = None,
|
||||
refresh_cache: bool = False,
|
||||
literal: bool = False,
|
||||
) -> list[ComicVolume]:
|
||||
try:
|
||||
series_result = self.talker.search_for_series(series_name, callback, refresh_cache, literal)
|
||||
return series_result
|
||||
except NotImplementedError:
|
||||
logger.warning(f"{self.talker.source_details.name} has not implemented: 'search_for_series'")
|
||||
raise TalkerError(
|
||||
self.talker.source_details.name,
|
||||
4,
|
||||
"The source has not implemented: 'search_for_series'",
|
||||
)
|
||||
|
||||
# Get issue or volume information. issue_id is used by CLI
|
||||
def fetch_comic_data(self, series_id: int = 0, issue_number: str = "", issue_id: int = 0) -> GenericMetadata:
|
||||
"""This function is expected to handle a few possibilities:
|
||||
1. Only series_id. Retrieve the SERIES/VOLUME information only.
|
||||
2. series_id and issue_number. Retrieve the ISSUE information.
|
||||
3. Only issue_id. Used solely by the CLI to retrieve the ISSUE information."""
|
||||
try:
|
||||
comic_data = self.talker.fetch_comic_data(series_id, issue_number, issue_id)
|
||||
return comic_data
|
||||
except NotImplementedError:
|
||||
logger.warning(f"{self.talker.source_details.name} has not implemented: 'fetch_comic_data'")
|
||||
raise TalkerError(
|
||||
self.talker.source_details.name,
|
||||
4,
|
||||
"The source has not implemented: 'fetch_comic_data'",
|
||||
)
|
||||
|
||||
# Master function to get issues in a series/volume
|
||||
def fetch_issues_by_volume(self, series_id: int) -> list[ComicIssue]:
|
||||
try:
|
||||
issues_result = self.talker.fetch_issues_by_volume(series_id)
|
||||
return issues_result
|
||||
except NotImplementedError:
|
||||
logger.warning(f"{self.talker.source_details.name} has not implemented: 'fetch_issues_by_volume'")
|
||||
raise TalkerError(
|
||||
self.talker.source_details.name,
|
||||
4,
|
||||
"The source has not implemented: 'fetch_issues_by_volume'",
|
||||
)
|
||||
|
||||
# For issueidentifer
|
||||
def fetch_alternate_cover_urls(self, issue_id: int) -> list[str]:
|
||||
try:
|
||||
alt_covers = self.talker.fetch_alternate_cover_urls(issue_id)
|
||||
return alt_covers
|
||||
except NotImplementedError:
|
||||
logger.warning(f"{self.talker.source_details.name} has not implemented: 'fetch_alternate_cover_urls'")
|
||||
raise TalkerError(
|
||||
self.talker.source_details.name,
|
||||
4,
|
||||
"The source has not implemented: 'fetch_alternate_cover_urls'",
|
||||
)
|
||||
|
||||
# For issueidentifier
|
||||
def fetch_issues_by_volume_issue_num_and_year(
|
||||
self, volume_id_list: list[int], issue_number: str, year: str | int | None
|
||||
) -> list[ComicIssue]:
|
||||
try:
|
||||
issue_results = self.talker.fetch_issues_by_volume_issue_num_and_year(volume_id_list, issue_number, year)
|
||||
return issue_results
|
||||
except NotImplementedError:
|
||||
logger.warning(
|
||||
f"{self.talker.source_details.name} has not implemented: 'fetch_issues_by_volume_issue_num_and_year'"
|
||||
)
|
||||
raise TalkerError(
|
||||
self.talker.source_details.name,
|
||||
4,
|
||||
"The source has not implemented: 'fetch_issues_by_volume_issue_num_and_year'",
|
||||
)
|
||||
|
||||
def async_fetch_alternate_cover_urls(
|
||||
self,
|
||||
issue_id: int,
|
||||
) -> None:
|
||||
try:
|
||||
# TODO: Figure out async
|
||||
url_list = self.fetch_alternate_cover_urls(issue_id)
|
||||
ComicTalker.alt_url_list_fetch_complete(url_list)
|
||||
logger.info("Should be downloading alt image list: %s", url_list)
|
||||
return
|
||||
|
||||
self.talker.async_fetch_alternate_cover_urls(issue_id)
|
||||
except NotImplementedError:
|
||||
logger.warning(f"{self.talker.source_details.name} has not implemented: 'async_fetch_alternate_cover_urls'")
|
||||
|
@ -193,6 +193,10 @@ class TalkerBase:
|
||||
|
||||
# Get issue or volume information
|
||||
def fetch_comic_data(self, series_id: int, issue_number: str = "") -> GenericMetadata:
|
||||
"""This function is expected to handle a few possibilities:
|
||||
1. Only series_id. Retrieve the SERIES/VOLUME information only.
|
||||
2. series_id and issue_number. Retrieve the ISSUE information.
|
||||
3. Only issue_id. Used solely by the CLI to retrieve the ISSUE information."""
|
||||
raise NotImplementedError
|
||||
|
||||
def fetch_alternate_cover_urls(self, issue_id: int) -> list[str]:
|
||||
|
Loading…
Reference in New Issue
Block a user