Don't proxy talker (really this time). Remove talker custom logging. Move static_options and settings_options to root of class object. Temp hack to keep talker menu genration working until settings revamp.

This commit is contained in:
Mizaki 2022-10-27 23:36:57 +01:00
parent 4514ae80d0
commit 561dc28044
21 changed files with 217 additions and 314 deletions

View File

@ -27,7 +27,7 @@ from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.resulttypes import IssueResult, MultipleMatch
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -22,7 +22,7 @@ from PyQt5 import QtCore, QtWidgets, uic
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -32,8 +32,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
from comictaggerlib.issueidentifier import IssueIdentifier
from comictaggerlib.resulttypes import IssueResult, MultipleMatch, OnlineMatchResults
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import TalkerError
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -43,7 +42,7 @@ def actual_issue_data_fetch(
) -> GenericMetadata:
# now get the particular issue data
try:
ct_md = talker_api.talker.fetch_comic_data(match["volume_id"], match["issue_number"])
ct_md = talker_api.fetch_comic_data(match["volume_id"], match["issue_number"])
except TalkerError as e:
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
return GenericMetadata()
@ -382,7 +381,7 @@ def process_file_cli(
if opts.issue_id is not None:
# we were given the actual issue ID to search with
try:
ct_md = talker_api.talker.fetch_comic_data(0, "", opts.issue_id)
ct_md = talker_api.fetch_comic_data(issue_id=opts.issue_id)
except TalkerError as e:
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
match_results.fetch_data_failures.append(str(ca.path.absolute()))

View File

@ -24,6 +24,7 @@ from typing import Callable
from PyQt5 import QtCore, QtGui, QtWidgets, uic
import comictalker.comictalkerapi as ct_api
from comicapi import utils
from comicapi.comicarchive import ComicArchive
from comictaggerlib.imagefetcher import ImageFetcher
@ -31,7 +32,7 @@ from comictaggerlib.imagepopup import ImagePopup
from comictaggerlib.pageloader import PageLoader
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import get_qimage_from_data, reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)
@ -183,8 +184,8 @@ class CoverImageWidget(QtWidgets.QWidget):
self.update_content()
self.issue_id = issue_id
ComicTalker.url_fetch_complete = self.sig.emit_url
ComicTalker.url_fetch_complete(image_url, None)
ct_api.url_fetch_complete = self.sig.emit_url
ct_api.url_fetch_complete(image_url, None)
def set_image_data(self, image_data: bytes) -> None:
if self.mode == CoverImageWidget.DataMode:
@ -205,7 +206,7 @@ class CoverImageWidget(QtWidgets.QWidget):
self.update_content()
# TODO No need to search for alt covers as they are now in ComicIssue data
if self.talker_api.talker.source_details.static_options.has_alt_covers:
if self.talker_api.static_options.has_alt_covers:
QtCore.QTimer.singleShot(1, self.start_alt_cover_search)
def start_alt_cover_search(self) -> None:
@ -215,8 +216,8 @@ class CoverImageWidget(QtWidgets.QWidget):
self.label.setText("Searching for alt. covers...")
# page URL should already be cached, so no need to defer
ComicTalker.alt_url_list_fetch_complete = self.sig.emit_list
self.talker_api.talker.fetch_alternate_cover_urls(utils.xlate(self.issue_id))
ct_api.alt_url_list_fetch_complete = self.sig.emit_list
self.talker_api.fetch_alternate_cover_urls(utils.xlate(self.issue_id))
def alt_cover_url_list_fetch_complete(self, url_list: list[str]) -> None:
if url_list:

View File

@ -30,8 +30,7 @@ from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException
from comictaggerlib.imagehasher import ImageHasher
from comictaggerlib.resulttypes import IssueResult
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import TalkerError
from comictalker.talkerbase import ComicTalker, TalkerError
from comictalker.utils import parse_date_str
logger = logging.getLogger(__name__)
@ -271,7 +270,7 @@ class IssueIdentifier:
raise IssueIdentifierCancelled
if use_remote_alternates:
alt_img_url_list = self.talker_api.talker.fetch_alternate_cover_urls(issue_id)
alt_img_url_list = self.talker_api.fetch_alternate_cover_urls(issue_id)
for alt_url in alt_img_url_list:
try:
alt_url_image_data = ImageFetcher().fetch(alt_url, blocking=True)
@ -366,11 +365,9 @@ class IssueIdentifier:
if keys["month"] is not None:
self.log_msg("\tMonth: " + str(keys["month"]))
self.talker_api.set_log_func(self.output_function)
self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
try:
ct_search_results = self.talker_api.talker.search_for_series(keys["series"])
ct_search_results = self.talker_api.search_for_series(keys["series"])
except TalkerError as e:
self.log_msg(f"Error searching for series.\n{e}")
return []
@ -423,7 +420,7 @@ class IssueIdentifier:
series_second_round_list.sort(key=lambda x: len(x["name"]), reverse=False)
# If the sources lacks issue level data, don't look for it.
if not self.talker_api.talker.source_details.static_options.has_issues:
if not self.talker_api.static_options.has_issues:
for series in series_second_round_list:
hash_list = [cover_hash]
if narrow_cover_hash is not None:
@ -478,7 +475,7 @@ class IssueIdentifier:
issue_list = None
try:
if len(volume_id_list) > 0:
issue_list = self.talker_api.talker.fetch_issues_by_volume_issue_num_and_year(
issue_list = self.talker_api.fetch_issues_by_volume_issue_num_and_year(
volume_id_list, keys["issue_number"], keys["year"]
)
except TalkerError as e:

View File

@ -23,9 +23,8 @@ from comicapi.issuestring import IssueString
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.resulttypes import ComicIssue
from comictalker.talkerbase import TalkerError
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -104,7 +103,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
self.issue_list = self.talker_api.talker.fetch_issues_by_volume(self.series_id)
self.issue_list = self.talker_api.fetch_issues_by_volume(self.series_id)
except TalkerError as e:
QtWidgets.QApplication.restoreOverrideCursor()
QtWidgets.QMessageBox.critical(

View File

@ -25,12 +25,12 @@ import sys
import traceback
import types
import comictalker.comictalkerapi as ct_api
from comicapi import utils
from comictaggerlib import cli
from comictaggerlib.ctversion import version
from comictaggerlib.options import parse_cmd_line
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
if sys.version_info < (3, 10):
import importlib_metadata
@ -149,7 +149,7 @@ def ctmain() -> None:
for pkg in sorted(importlib_metadata.distributions(), key=lambda x: x.name):
logger.debug("%s\t%s", pkg.metadata["Name"], pkg.metadata["Version"])
talker_api = ComicTalker(settings.comic_info_source)
talker_api = ct_api.get_comic_talker(settings.comic_info_source)()
utils.load_publishers()
update_publishers()

View File

@ -25,7 +25,7 @@ from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.resulttypes import IssueResult
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -24,7 +24,7 @@ from comicapi.comicarchive import ComicArchive
from comicapi.genericmetadata import GenericMetadata
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -23,7 +23,7 @@ from comicapi.comicarchive import ComicArchive, MetaDataStyle
from comicapi.genericmetadata import ImageMetadata, PageType
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -26,7 +26,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.settingswindow import SettingsWindow
from comictaggerlib.ui.qtutils import center_window_on_parent
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)

View File

@ -23,13 +23,14 @@ import platform
from PyQt5 import QtCore, QtGui, QtWidgets, uic
import comictalker.comictalkerapi as ct_api
from comicapi import utils
from comicapi.genericmetadata import md_test
from comictaggerlib.filerenamer import FileRenamer
from comictaggerlib.imagefetcher import ImageFetcher
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comiccacher import ComicCacher
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import ComicTalker
logger = logging.getLogger(__name__)
@ -137,6 +138,8 @@ class SettingsWindow(QtWidgets.QDialog):
)
self.settings = settings
# TODO Quick hack to allow menus to work
self.available_talkers = ct_api.get_talkers()
self.talker_api = talker_api
self.name = "Settings"
@ -199,7 +202,9 @@ class SettingsWindow(QtWidgets.QDialog):
def generate_source_option_tabs(self) -> None:
# Add source sub tabs to Comic Sources tab
for source in self.talker_api.sources.values():
for source_cls in self.available_talkers.values():
# TODO Remove hack
source = source_cls()
# Add source to general tab dropdown list
self.cobxInfoSource.addItem(source.source_details.name, source.source_details.id)
# Use a dict to make a var name from var
@ -208,7 +213,7 @@ class SettingsWindow(QtWidgets.QDialog):
source_info[tab_name] = {"tab": QtWidgets.QWidget(), "widgets": {}}
layout_grid = QtWidgets.QGridLayout()
row = 0
for option in source.source_details.settings_options.values():
for option in source.settings_options.values():
if not option["hidden"]:
current_widget = None
if option["type"] is bool:
@ -398,7 +403,8 @@ class SettingsWindow(QtWidgets.QDialog):
self.settings.id_publisher_filter = str(self.tePublisherFilter.toPlainText())
self.settings.comic_info_source = str(self.cobxInfoSource.itemData(self.cobxInfoSource.currentIndex()))
# Also change current talker_api object
self.talker_api.source = self.settings.comic_info_source
# TODO
# self.talker_api.source = self.settings.comic_info_source
self.settings.complicated_parser = self.cbxComplicatedParser.isChecked()
self.settings.remove_c2c = self.cbxRemoveC2C.isChecked()
@ -430,12 +436,14 @@ class SettingsWindow(QtWidgets.QDialog):
self.settings.rename_strict = self.cbxRenameStrict.isChecked()
# Read settings from sources tabs and generate self.settings.config data
for source in self.talker_api.sources.values():
for source_cls in self.available_talkers.values():
# TODO Remove hack
source = source_cls()
source_info = self.sources[source.source_details.id]
if not self.settings.config.has_section(source.source_details.id):
self.settings.config.add_section(source.source_details.id)
# Iterate over sources options and get the tab setting
for option in source.source_details.settings_options.values():
for option in source.settings_options.values():
# Only save visible here
if option["name"] in source_info["widgets"]:
# Set the tab setting for the talker class var
@ -457,9 +465,9 @@ class SettingsWindow(QtWidgets.QDialog):
if option["name"] == "enabled":
# Set to disabled if is not the selected talker
if source.source_details.id != self.settings.comic_info_source:
source.source_details.settings_options["enabled"]["value"] = False
source.settings_options["enabled"]["value"] = False
else:
source.source_details.settings_options["enabled"]["value"] = True
source.settings_options["enabled"]["value"] = True
else:
# Ensure correct type
if option["type"] is bool:
@ -486,12 +494,17 @@ class SettingsWindow(QtWidgets.QDialog):
QtWidgets.QMessageBox.information(self, self.name, "Cache has been cleared.")
def test_api_key(self, source_id) -> None:
key = self.sources[source_id]["widgets"]["api_key"].text().strip()
url = self.sources[source_id]["widgets"]["url_root"].text().strip()
if self.talker_api.check_api_key(key, url, source_id):
QtWidgets.QMessageBox.information(self, "API Key Test", "Key is valid!")
# TODO Only allow testing of active talker?
if source_id == self.settings.comic_info_source:
key = self.sources[source_id]["widgets"]["api_key"].text().strip()
url = self.sources[source_id]["widgets"]["url_root"].text().strip()
if self.talker_api.check_api_key(key, url):
QtWidgets.QMessageBox.information(self, "API Key Test", "Key is valid!")
else:
QtWidgets.QMessageBox.warning(self, "API Key Test", "Key is NOT valid!")
else:
QtWidgets.QMessageBox.warning(self, "API Key Test", "Key is NOT valid.")
QtWidgets.QMessageBox.warning(self, "API Key Test", "Can only test active comic source.")
def reset_settings(self) -> None:
self.settings.reset()

View File

@ -61,8 +61,7 @@ from comictaggerlib.settingswindow import SettingsWindow
from comictaggerlib.ui.qtutils import center_window_on_parent, reduce_widget_font_size
from comictaggerlib.versionchecker import VersionChecker
from comictaggerlib.volumeselectionwindow import VolumeSelectionWindow
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import TalkerError
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -1026,7 +1025,7 @@ Have fun!
issue_number = str(self.leIssueNum.text()).strip()
# Only need this check is the source has issue level data.
if autoselect and issue_number == "" and self.talker_api.talker.source_details.static_options.has_issues:
if autoselect and issue_number == "" and self.talker_api.static_options.has_issues:
QtWidgets.QMessageBox.information(
self, "Automatic Identify Search", "Can't auto-identify without an issue number (yet!)"
)
@ -1070,7 +1069,7 @@ Have fun!
self.form_to_metadata()
try:
new_metadata = self.talker_api.talker.fetch_comic_data(selector.volume_id, selector.issue_number)
new_metadata = self.talker_api.fetch_comic_data(selector.volume_id, selector.issue_number)
except TalkerError as e:
QtWidgets.QApplication.restoreOverrideCursor()
QtWidgets.QMessageBox.critical(
@ -1670,7 +1669,7 @@ Have fun!
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
ct_md = self.talker_api.talker.fetch_comic_data(match["volume_id"], match["issue_number"])
ct_md = self.talker_api.fetch_comic_data(match["volume_id"], match["issue_number"])
except TalkerError as e:
logger.exception(f"Save aborted.\n{e}")

View File

@ -32,9 +32,8 @@ from comictaggerlib.matchselectionwindow import MatchSelectionWindow
from comictaggerlib.progresswindow import IDProgressWindow
from comictaggerlib.settings import ComicTaggerSettings
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
from comictalker.resulttypes import ComicVolume
from comictalker.talkerbase import TalkerError
from comictalker.talkerbase import ComicTalker, TalkerError
logger = logging.getLogger(__name__)
@ -64,7 +63,7 @@ class SearchThread(QtCore.QThread):
def run(self) -> None:
try:
self.ct_error = False
self.ct_search_results = self.talker_api.talker.search_for_series(
self.ct_search_results = self.talker_api.search_for_series(
self.series_name, self.prog_callback, self.refresh, self.literal
)
except TalkerError as e:
@ -177,14 +176,14 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
self.btnRequery.setEnabled(enabled)
if self.talker_api.talker.source_details.static_options.has_issues:
if self.talker_api.static_options.has_issues:
self.btnIssues.setEnabled(enabled)
self.btnAutoSelect.setEnabled(enabled)
else:
self.btnIssues.setEnabled(False)
self.btnIssues.setToolTip("Unsupported by " + self.talker_api.talker.source_details.name)
self.btnIssues.setToolTip("Unsupported by " + self.talker_api.source_details.name)
self.btnAutoSelect.setEnabled(False)
self.btnAutoSelect.setToolTip("Unsupported by " + self.talker_api.talker.source_details.name)
self.btnAutoSelect.setToolTip("Unsupported by " + self.talker_api.source_details.name)
self.buttonBox.button(QtWidgets.QDialogButtonBox.StandardButton.Ok).setEnabled(enabled)
@ -198,7 +197,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
def auto_select(self) -> None:
if self.talker_api.talker.source_details.static_options.has_issues:
if self.talker_api.static_options.has_issues:
if self.comic_archive is None:
QtWidgets.QMessageBox.information(self, "Auto-Select", "You need to load a comic first!")
return
@ -494,7 +493,7 @@ class VolumeSelectionWindow(QtWidgets.QDialog):
self.auto_select()
def cell_double_clicked(self, r: int, c: int) -> None:
if self.talker_api.talker.source_details.static_options.has_issues:
if self.talker_api.static_options.has_issues:
self.show_issues()
else:
# Pass back to have taggerwindow get full series data

View File

@ -308,7 +308,7 @@ class ComicCacher:
return results
def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue:
def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue | None:
con = lite.connect(self.db_file)
with con:
cur = con.cursor()

View File

@ -1,139 +0,0 @@
"""Handles collecting data from source talkers.
"""
# Copyright 2012-2014 Anthony Beville
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import inspect
import logging
from importlib import import_module
from typing import Callable
logger = logging.getLogger(__name__)
# To signal image loaded etc.
def list_fetch_complete(url_list: list[str]) -> None:
...
def url_fetch_complete(image_url: str, thumb_url: str | None) -> None:
...
class ComicTalker:
alt_url_list_fetch_complete = list_fetch_complete
url_fetch_complete = url_fetch_complete
def __init__(self, source_name) -> None:
# ID of the source to use e.g. comicvine
self.source = source_name
# Retrieve the available sources modules
self.sources = self.get_talkers()
# Set the active talker
self.talker = self.get_active_talker()
def get_active_talker(self):
# This should always work because it will have errored at get_talkers if there are none
if not self.sources[self.source] is None:
return self.sources[self.source]
@staticmethod
def get_talkers():
def check_talker(module: str):
testmodule = import_module("comictalker.talkers." + module)
for name, obj in inspect.getmembers(testmodule):
if inspect.isclass(obj):
if name != "ComicTalker" and name.endswith("Talker"):
# TODO Check if enabled?
talker = obj()
required_fields_details = ["name", "id"]
required_fields_static = ["has_issues", "has_alt_covers", "has_censored_covers"]
required_fields_settings = ["enabled", "url_root"]
errors_found = False
if talker.source_details is None:
logger.warning(module + " is missing required source_details.")
return False
if not talker.source_details.static_options:
logger.warning(module + " is missing required static_options.")
return False
if not talker.source_details.settings_options:
logger.warning(module + " is missing required settings_options.")
return False
for field in required_fields_details:
if not hasattr(talker.source_details, field):
logger.warning(module + " is missing required source_details: " + field)
errors_found = True
# No need to check these as they have defaults, should defaults be None to catch?
for field in required_fields_static:
if not hasattr(talker.source_details.static_options, field):
logger.warning(module + " is missing required static_options: " + field)
errors_found = True
for field in required_fields_settings:
if field not in talker.source_details.settings_options:
logger.warning(module + " is missing required settings_options: " + field)
errors_found = True
if errors_found:
return False
for key, val in talker.source_details.static_options.__dict__.items():
# Check for required options has the correct type
if key == "has_issues":
if type(val) is not bool:
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
errors_found = True
if key == "has_alt_covers":
if type(val) is not bool:
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
errors_found = True
if key == "has_censored_covers":
if type(val) is not bool:
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
errors_found = True
for key, val in talker.source_details.settings_options.items():
if key == "enabled":
if type(val["value"]) is not bool:
logger.warning(module + " has incorrect key type: " + key + ":" + str(val))
errors_found = True
if key == "url_root":
# Check starts with http[s]:// too?
if not val["value"]:
logger.warning(module + " has missing value: " + key + ":" + str(val))
errors_found = True
if errors_found:
logger.warning(module + " is missing required settings. Check logs.")
return False
return True
# Hardcode import for now. Placed here to prevent circular import
import comictalker.talkers.comicvine
if check_talker("comicvine"):
return {"comicvine": comictalker.talkers.comicvine.ComicVineTalker()}
# For issueidentifier
def set_log_func(self, log_func: Callable[[str], None]) -> None:
self.talker.log_func = log_func
def check_api_key(self, key: str, url: str, source_id: str):
for source in self.sources.values():
if source.source_details.id == source_id:
return source.check_api_key(key, url)
# Return false as back up or error?
return False

View File

@ -0,0 +1,50 @@
"""Handles collecting data from source talkers."""
# Copyright 2012-2014 Anthony Beville
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import comictalker.talkers.comicvine
from comictalker.talkerbase import ComicTalker, TalkerError # renamed TalkerBase to ComicTalker
logger = logging.getLogger(__name__)
# To signal image loaded etc. TODO Won't be needed hopefully with new async loading
def list_fetch_complete(url_list: list[str]) -> None:
...
def url_fetch_complete(image_url: str, thumb_url: str | None) -> None:
...
alt_url_list_fetch_complete = list_fetch_complete
url_fetch_complete = url_fetch_complete
def get_comic_talker(source_name: str) -> type[ComicTalker]:
# Retrieve the available sources modules
sources = get_talkers()
if source_name not in sources:
raise TalkerError(source=source_name, code=4, desc="The talker does not exist")
talker = sources[source_name]
return talker
def get_talkers():
return {"comicvine": comictalker.talkers.comicvine.ComicVineTalker}

View File

@ -31,15 +31,11 @@ logger = logging.getLogger(__name__)
class SourceDetails:
def __init__(
self,
name: str,
ident: str,
static_options: SourceStaticOptions,
settings_options: dict[str, SourceSettingsOptions],
name: str = "",
ident: str = "",
):
self.name = name
self.id = ident
self.static_options = static_options
self.settings_options = settings_options
class SourceStaticOptions:
@ -155,24 +151,15 @@ class TalkerDataError(TalkerError):
# Class talkers instance
class TalkerBase:
class ComicTalker:
"""This is the class for mysource."""
def __init__(self) -> None:
# Identity name for the information source etc.
self.source_details: SourceDetails | None = None # Can use this to test if custom talker has been configured
self.log_func: Callable[[str], None] | None = None
def set_log_func(self, log_func: Callable[[str], None]) -> None:
self.log_func = log_func
# issueidentifier will set_log_func to print any write_log to console otherwise logger.info is not printed
def write_log(self, text: str) -> None:
if self.log_func is None:
logger.info(text)
else:
self.log_func(text)
self.source_details: SourceDetails = (
SourceDetails()
) # Can use this to test if custom talker has been configured
self.static_options: SourceStaticOptions = SourceStaticOptions()
def check_api_key(self, key: str, url: str):
raise NotImplementedError
@ -192,13 +179,14 @@ class TalkerBase:
raise NotImplementedError
# Get issue or volume information
def fetch_comic_data(self, series_id: int, issue_number: str = "") -> GenericMetadata:
def fetch_comic_data(self, series_id: int = 0, issue_number: str = "", issue_id: int = 0) -> GenericMetadata:
"""This function is expected to handle a few possibilities:
1. Only series_id. Retrieve the SERIES/VOLUME information only.
2. series_id and issue_number. Retrieve the ISSUE information.
3. Only issue_id. Retrieve the ISSUE information."""
raise NotImplementedError
# TODO Should be able to remove with alt cover rework
def fetch_alternate_cover_urls(self, issue_id: int) -> list[str]:
raise NotImplementedError

View File

@ -27,19 +27,19 @@ import requests
from bs4 import BeautifulSoup
from typing_extensions import Required, TypedDict
import comictalker.comictalkerapi as ct_api
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib import ctversion
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comiccacher import ComicCacher
from comictalker.comictalker import ComicTalker
from comictalker.resulttypes import ComicIssue, ComicVolume
from comictalker.talkerbase import (
ComicTalker,
SourceDetails,
SourceSettingsOptions,
SourceStaticOptions,
TalkerBase,
TalkerDataError,
TalkerNetworkError,
)
@ -193,98 +193,94 @@ class CVIssueDetailResults(TypedDict):
CV_RATE_LIMIT_STATUS = 107
class ComicVineTalker(TalkerBase):
class ComicVineTalker(ComicTalker):
def __init__(self, series_match_thresh: int = 90) -> None:
super().__init__()
self.source_details = source_details = SourceDetails(
self.source_details = SourceDetails(
name="Comic Vine",
ident="comicvine",
static_options=SourceStaticOptions(
logo_url="http://static.comicvine.com/bundles/comicvinesite/images/logo.png",
has_issues=True,
has_alt_covers=True,
requires_apikey=True,
has_nsfw=False,
has_censored_covers=False,
),
settings_options={
"enabled": SourceSettingsOptions(
name="enabled", text="Enabled", help_text="", hidden=True, type=bool, value=True
),
"order": SourceSettingsOptions(
name="order", text="Order", help_text="", hidden=True, type=int, value=1
),
"remove_html_tables": SourceSettingsOptions(
name="remove_html_tables",
text="Remove HTML tables",
help_text="Remove tables in description",
hidden=False,
type=bool,
value=False,
),
"use_series_start_as_volume": SourceSettingsOptions(
name="use_series_start_as_volume",
text="Use series start year as volume number",
help_text="Use the series start year as the volume number",
hidden=False,
type=bool,
value=False,
),
"wait_on_ratelimit": SourceSettingsOptions(
name="wait_on_ratelimit",
text="Retry on API limit",
help_text="If the Comic Vine API limit is reached, wait and retry",
hidden=False,
type=bool,
value=False,
),
"ratelimit_waittime": SourceSettingsOptions(
name="ratelimit_waittime",
text="API maximum wait time (minutes)",
help_text="Maximum time to wait before abandoning retries",
hidden=False,
type=int,
value=20,
),
"url_root": SourceSettingsOptions(
name="url_root",
text="Comic Vine API address",
help_text="Example: https://api.comicsource.net",
hidden=False,
type=str,
value="https://comicvine.gamespot.com/api",
),
"api_key": SourceSettingsOptions(
name="api_key",
text="API key",
help_text="Comic Vine API key",
hidden=False,
type=str,
value="27431e6787042105bd3e47e169a624521f89f3a4",
),
},
)
self.static_options = SourceStaticOptions(
logo_url="http://static.comicvine.com/bundles/comicvinesite/images/logo.png",
has_issues=True,
has_alt_covers=True,
requires_apikey=True,
has_nsfw=False,
has_censored_covers=False,
)
self.settings_options = {
"enabled": SourceSettingsOptions(
name="enabled", text="Enabled", help_text="", hidden=True, type=bool, value=True
),
"order": SourceSettingsOptions(name="order", text="Order", help_text="", hidden=True, type=int, value=1),
"remove_html_tables": SourceSettingsOptions(
name="remove_html_tables",
text="Remove HTML tables",
help_text="Remove tables in description",
hidden=False,
type=bool,
value=False,
),
"use_series_start_as_volume": SourceSettingsOptions(
name="use_series_start_as_volume",
text="Use series start year as volume number",
help_text="Use the series start year as the volume number",
hidden=False,
type=bool,
value=False,
),
"wait_on_ratelimit": SourceSettingsOptions(
name="wait_on_ratelimit",
text="Retry on API limit",
help_text="If the Comic Vine API limit is reached, wait and retry",
hidden=False,
type=bool,
value=False,
),
"ratelimit_waittime": SourceSettingsOptions(
name="ratelimit_waittime",
text="API maximum wait time (minutes)",
help_text="Maximum time to wait before abandoning retries",
hidden=False,
type=int,
value=20,
),
"url_root": SourceSettingsOptions(
name="url_root",
text="Comic Vine API address",
help_text="Example: https://api.comicsource.net",
hidden=False,
type=str,
value="https://comicvine.gamespot.com/api",
),
"api_key": SourceSettingsOptions(
name="api_key",
text="API key",
help_text="Comic Vine API key",
hidden=False,
type=str,
value="27431e6787042105bd3e47e169a624521f89f3a4",
),
}
# Identity name for the information source
self.source_name = self.source_details.id
self.source_name_friendly = self.source_details.name
# Overwrite any source_details.options that have saved settings
source_settings = ComicTaggerSettings.get_source_settings(
self.source_name, self.source_details.settings_options
)
source_settings = ComicTaggerSettings.get_source_settings(self.source_name, self.settings_options)
if not source_settings:
# No saved settings, do something?
...
self.wait_for_rate_limit = source_details.settings_options["wait_on_ratelimit"]["value"]
self.wait_for_rate_limit_time = source_details.settings_options["ratelimit_waittime"]["value"]
self.wait_for_rate_limit = self.settings_options["wait_on_ratelimit"]["value"]
self.wait_for_rate_limit_time = self.settings_options["ratelimit_waittime"]["value"]
self.issue_id: int | None = None
self.api_key = source_details.settings_options["api_key"]["value"]
self.api_base_url = source_details.settings_options["url_root"]["value"]
self.api_key = self.settings_options["api_key"]["value"]
self.api_base_url = self.settings_options["url_root"]["value"]
tmp_url = urlsplit(self.api_base_url)
@ -329,7 +325,7 @@ class ComicVineTalker(TalkerBase):
while True:
cv_response: CVResult = self.get_url_content(url, params)
if self.wait_for_rate_limit and cv_response["status_code"] == CV_RATE_LIMIT_STATUS:
self.write_log(f"Rate limit encountered. Waiting for {limit_wait_time} minutes\n")
logger.info(f"Rate limit encountered. Waiting for {limit_wait_time} minutes\n")
time.sleep(limit_wait_time * 60)
total_time_waited += limit_wait_time
limit_wait_time = wait_times[counter]
@ -339,7 +335,8 @@ class ComicVineTalker(TalkerBase):
if total_time_waited < self.wait_for_rate_limit_time:
continue
if cv_response["status_code"] != 1:
self.write_log(
# TODO Rather than logging and erroring, have error write to log?
logger.debug(
f"Comic Vine query failed with error #{cv_response['status_code']}: [{cv_response['error']}]. \n"
)
raise TalkerNetworkError(
@ -360,20 +357,20 @@ class ComicVineTalker(TalkerBase):
if resp.status_code == 200:
return resp.json()
if resp.status_code == 500:
self.write_log(f"Try #{tries + 1}: ")
logger.debug(f"Try #{tries + 1}: ")
time.sleep(1)
self.write_log(str(resp.status_code) + "\n")
logger.debug(str(resp.status_code) + "\n")
else:
break
except requests.exceptions.Timeout:
self.write_log("Connection to " + self.source_name_friendly + " timed out.\n")
logger.debug("Connection to " + self.source_name_friendly + " timed out.\n")
raise TalkerNetworkError(self.source_name_friendly, 4)
except requests.exceptions.RequestException as e:
self.write_log(f"{e}\n")
logger.debug(f"{e}\n")
raise TalkerNetworkError(self.source_name_friendly, 0, str(e)) from e
except json.JSONDecodeError as e:
self.write_log(f"{e}\n")
logger.debug(f"{e}\n")
raise TalkerDataError(self.source_name_friendly, 2, "ComicVine did not provide json")
raise TalkerNetworkError(self.source_name_friendly, 5)
@ -504,7 +501,7 @@ class ComicVineTalker(TalkerBase):
total_result_count = min(total_result_count, max_results)
if callback is None:
self.write_log(
logger.debug(
f"Found {cv_response['number_of_page_results']} of {cv_response['number_of_total_results']} results\n"
)
search_results.extend(cast(list[CVVolumeResults], cv_response["results"]))
@ -527,7 +524,7 @@ class ComicVineTalker(TalkerBase):
break
if callback is None:
self.write_log(f"getting another page of results {current_result_count} of {total_result_count}...\n")
logger.debug(f"getting another page of results {current_result_count} of {total_result_count}...\n")
page += 1
params["page"] = page
@ -567,10 +564,10 @@ class ComicVineTalker(TalkerBase):
return self.map_cv_volume_data_to_metadata(volume_results)
# Get issue or volume information
def fetch_comic_data(self, series_id: int, issue_number: str = "", issue_id: int = 0) -> GenericMetadata:
def fetch_comic_data(self, series_id: int = 0, issue_number: str = "", issue_id: int = 0) -> GenericMetadata:
comic_data = GenericMetadata()
# TODO remove has_issues check? Enables testing. Possibly add source option to only get volume info?
if self.source_details.static_options.has_issues and issue_number and series_id:
if self.static_options.has_issues and issue_number and series_id:
comic_data = self.fetch_issue_data(series_id, issue_number)
elif issue_id:
comic_data = self.fetch_issue_data_by_issue_id(issue_id)
@ -740,7 +737,6 @@ class ComicVineTalker(TalkerBase):
# To support volume only searching
def map_cv_volume_data_to_metadata(self, volume_results: CVVolumeFullResult) -> GenericMetadata:
settings = self.source_details.settings_options
# Now, map the Comic Vine data to generic metadata
metadata = GenericMetadata()
metadata.is_empty = False
@ -751,8 +747,10 @@ class ComicVineTalker(TalkerBase):
metadata.publisher = utils.xlate(volume_results["publisher"]["name"])
metadata.year = int(volume_results["start_year"])
metadata.comments = self.cleanup_html(volume_results["description"], settings["remove_html_tables"]["value"])
if settings["use_series_start_as_volume"]["value"]:
metadata.comments = self.cleanup_html(
volume_results["description"], self.settings_options["remove_html_tables"]["value"]
)
if self.settings_options["use_series_start_as_volume"]["value"]:
metadata.volume = int(volume_results["start_year"])
# TODO How to handle multiple sources? Leave this to sourcesapi to write?
@ -792,7 +790,6 @@ class ComicVineTalker(TalkerBase):
self, volume_results: ComicVolume, issue_results: CVIssueDetailResults
) -> GenericMetadata:
settings = self.source_details.settings_options
# Now, map the Comic Vine data to generic metadata
metadata = GenericMetadata()
metadata.is_empty = False
@ -806,8 +803,10 @@ class ComicVineTalker(TalkerBase):
metadata.publisher = utils.xlate(volume_results["publisher"])
metadata.day, metadata.month, metadata.year = utils.parse_date_str(issue_results["cover_date"])
metadata.comments = self.cleanup_html(issue_results["description"], settings["remove_html_tables"]["value"])
if settings["use_series_start_as_volume"]["value"]:
metadata.comments = self.cleanup_html(
issue_results["description"], self.settings_options["remove_html_tables"]["value"]
)
if self.settings_options["use_series_start_as_volume"]["value"]:
metadata.volume = volume_results["start_year"]
metadata.notes = (
@ -964,16 +963,16 @@ class ComicVineTalker(TalkerBase):
}
cv_response = self.get_cv_content(urljoin(self.api_base_url, "issues/"), params)
issue_result = cast(CVIssuesResults, cv_response["results"])
issue_result = cast(list[CVIssuesResults], cv_response["results"])
# Format to expected output
formatted_volume_issues_result = self.format_issue_results(issue_result)
cvc.add_volume_issues_info(self.source_name, formatted_volume_issues_result)
url_list = [x for x in issue["alt_images_url"].split(",") if x]
url_list = [x for x in formatted_volume_issues_result[0]["alt_images_url"].split(",") if x]
ComicTalker.alt_url_list_fetch_complete(url_list)
ct_api.alt_url_list_fetch_complete(url_list)
# CLI expects a list returned
return url_list

View File

@ -39,7 +39,7 @@ def test_fetch_issues_by_volume(comicvine_api, comic_cache):
def test_fetch_issue_data_by_issue_id(comicvine_api, settings, mock_now, mock_version):
ct = comictalker.talkers.comicvine.ComicVineTalker()
result = ct.fetch_comic_data(0, "", 140529)
result = ct.fetch_comic_data(issue_id=140529)
assert result == testing.comicvine.cv_md

View File

@ -16,7 +16,6 @@ import comicapi.comicarchive
import comicapi.genericmetadata
import comictaggerlib.settings
import comictalker.comiccacher
import comictalker.comictalker
import comictalker.talkers.comicvine
from comicapi import utils
from testing import comicvine, filenames
@ -115,7 +114,6 @@ def comicvine_api(monkeypatch, cbz, comic_cache) -> comictalker.talkers.comicvin
monkeypatch.setattr(requests, "get", m_get)
cv = comictalker.talkers.comicvine.ComicVineTalker()
cv.static_options = cv.source_details.static_options
return cv