Move most options passed in to ComicVineTalker to ComicTalker
Give ComicCacher and ComicTalker a version argument to remove all
  references to comictaggerlib
Update default arguments to reflect what is required to use these classes
This commit is contained in:
Timmy Welch 2022-11-25 15:47:05 -08:00
parent 129e19ac9d
commit fc4eb4f002
No known key found for this signature in database
8 changed files with 88 additions and 67 deletions

View File

@ -189,13 +189,15 @@ See https://github.com/comictagger/comictagger/releases/1.5.5 for more informati
talker_exception = None
try:
talker_api = ct_api.get_comic_talker("comicvine")(
settings.cv_url,
settings.cv_api_key,
settings.id_series_match_search_thresh,
settings.remove_html_tables,
settings.use_series_start_as_volume,
settings.wait_and_retry_on_rate_limit,
talker_api = ct_api.get_comic_talker("comicvine")( # type: ignore[call-arg]
version=version,
cache_folder=ComicTaggerSettings.get_settings_folder(),
series_match_thresh=settings.id_series_match_search_thresh,
remove_html_tables=settings.remove_html_tables,
use_series_start_as_volume=settings.use_series_start_as_volume,
wait_on_ratelimit=settings.wait_and_retry_on_rate_limit,
api_url=settings.cv_url,
api_key=settings.cv_api_key,
)
except TalkerError as e:
logger.exception("Unable to load talker")

View File

@ -25,6 +25,7 @@ from PyQt5 import QtCore, QtGui, QtWidgets, uic
from comicapi import utils
from comicapi.genericmetadata import md_test
from comictaggerlib.ctversion import version
from comictaggerlib.filerenamer import FileRenamer
from comictaggerlib.imagefetcher import ImageFetcher
from comictaggerlib.settings import ComicTaggerSettings
@ -333,7 +334,7 @@ class SettingsWindow(QtWidgets.QDialog):
self.talker_api.api_key = self.settings.cv_api_key
if self.leURL.text().strip():
self.settings.cv_url = self.leURL.text().strip()
self.talker_api.api_base_url = self.settings.cv_url
self.talker_api.api_url = self.settings.cv_url
self.settings.assume_lone_credit_is_primary = self.cbxAssumeLoneCreditIsPrimary.isChecked()
self.settings.copy_characters_to_tags = self.cbxCopyCharactersToTags.isChecked()
self.settings.copy_teams_to_tags = self.cbxCopyTeamsToTags.isChecked()
@ -361,7 +362,7 @@ class SettingsWindow(QtWidgets.QDialog):
def clear_cache(self) -> None:
ImageFetcher().clear_cache()
ComicCacher().clear_cache()
ComicCacher(ComicTaggerSettings.get_settings_folder(), version).clear_cache()
QtWidgets.QMessageBox.information(self, self.name, "Cache has been cleared.")
def test_api_key(self) -> None:

View File

@ -19,21 +19,21 @@ import datetime
import json
import logging
import os
import pathlib
import sqlite3 as lite
from typing import Any
from comictaggerlib import ctversion
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.resulttypes import ComicIssue, ComicVolume
logger = logging.getLogger(__name__)
class ComicCacher:
def __init__(self) -> None:
self.settings_folder = ComicTaggerSettings.get_settings_folder()
self.db_file = os.path.join(self.settings_folder, "comic_cache.db")
self.version_file = os.path.join(self.settings_folder, "cache_version.txt")
def __init__(self, cache_folder: pathlib.Path, version: str) -> None:
self.cache_folder = cache_folder
self.db_file = cache_folder / "comic_cache.db"
self.version_file = cache_folder / "cache_version.txt"
self.version = version
# verify that cache is from same version as this one
data = ""
@ -43,7 +43,7 @@ class ComicCacher:
f.close()
except Exception:
pass
if data != ctversion.version:
if data != version:
self.clear_cache()
if not os.path.exists(self.db_file):
@ -63,7 +63,7 @@ class ComicCacher:
# create the version file
with open(self.version_file, "w", encoding="utf-8") as f:
f.write(ctversion.version)
f.write(self.version)
# this will wipe out any existing version
open(self.db_file, "wb").close()

View File

@ -33,6 +33,6 @@ def get_comic_talker(source_name: str) -> type[ComicTalker]:
return talker
def get_talkers():
def get_talkers() -> dict[str, type[ComicTalker]]:
"""Returns all comic talker modules NOT objects"""
return {"comicvine": comictalker.talkers.comicvine.ComicVineTalker}

View File

@ -16,7 +16,9 @@
from __future__ import annotations
import logging
import pathlib
from typing import Callable
from urllib.parse import urlsplit
from comicapi.genericmetadata import GenericMetadata
from comictalker.resulttypes import ComicIssue, ComicVolume
@ -75,7 +77,7 @@ class TalkerError(Exception):
4: "Other",
}
def __init__(self, source: str = "", code: int = 4, desc: str = "", sub_code: int = 0) -> None:
def __init__(self, source: str, desc: str, code: int = 4, sub_code: int = 0) -> None:
super().__init__()
if desc == "":
desc = "Unknown"
@ -85,7 +87,7 @@ class TalkerError(Exception):
self.sub_code = sub_code
self.source = source
def __str__(self):
def __str__(self) -> str:
return f"{self.source} encountered a {self.code_name} error. {self.desc}"
@ -113,7 +115,7 @@ class TalkerNetworkError(TalkerError):
if desc == "":
desc = self.net_codes[sub_code]
super().__init__(source, 2, desc, sub_code)
super().__init__(source, desc, 2, sub_code)
class TalkerDataError(TalkerError):
@ -137,19 +139,36 @@ class TalkerDataError(TalkerError):
if desc == "":
desc = self.data_codes[sub_code]
super().__init__(source, 3, desc, sub_code)
super().__init__(source, desc, 3, sub_code)
# Class talkers instance
class ComicTalker:
"""The base class for all comic source talkers"""
def __init__(self) -> None:
default_api_url: str = ""
default_api_key: str = ""
def __init__(self, version: str, cache_folder: pathlib.Path, api_url: str = "", api_key: str = "") -> None:
# Identity name for the information source etc.
self.source_details: SourceDetails = (
SourceDetails()
) # Can use this to test if custom talker has been configured
self.static_options: SourceStaticOptions = SourceStaticOptions()
self.api_key = api_key
self.cache_folder = cache_folder
self.version = version
self.api_key = api_key or self.default_api_key
self.api_url = api_url or self.default_api_url
tmp_url = urlsplit(self.api_url)
# joinurl only works properly if there is a trailing slash
if tmp_url.path and tmp_url.path[-1] != "/":
tmp_url = tmp_url._replace(path=tmp_url.path + "/")
self.api_url = tmp_url.geturl()
def check_api_key(self, key: str, url: str) -> bool:
"""If the talker has or requires an API key, this function should test its validity"""

View File

@ -17,6 +17,7 @@ from __future__ import annotations
import json
import logging
import pathlib
import time
from typing import Any, Callable, cast
from urllib.parse import urljoin, urlsplit
@ -28,7 +29,6 @@ import comictalker.talker_utils as talker_utils
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib import ctversion
from comictalker.comiccacher import ComicCacher
from comictalker.resulttypes import ComicIssue, ComicVolume, Credits
from comictalker.talkerbase import ComicTalker, SourceDetails, SourceStaticOptions, TalkerDataError, TalkerNetworkError
@ -164,10 +164,21 @@ CV_RATE_LIMIT_STATUS = 107
class ComicVineTalker(ComicTalker):
default_api_key = "27431e6787042105bd3e47e169a624521f89f3a4"
default_api_url = "https://comicvine.gamespot.com/api"
def __init__(
self, api_url, api_key, series_match_thresh, remove_html_tables, use_series_start_as_volume, wait_on_ratelimit
self,
version: str,
cache_folder: pathlib.Path,
api_url: str = "",
api_key: str = "",
series_match_thresh: int = 90,
remove_html_tables: bool = False,
use_series_start_as_volume: bool = False,
wait_on_ratelimit: bool = False,
):
super().__init__()
super().__init__(version, cache_folder, api_url, api_key)
self.source_details = SourceDetails(
name="Comic Vine",
ident="comicvine",
@ -182,34 +193,21 @@ class ComicVineTalker(ComicTalker):
)
# Identity name for the information source
self.source_name = self.source_details.id
self.source_name_friendly = self.source_details.name
self.source_name: str = self.source_details.id
self.source_name_friendly: str = self.source_details.name
self.wait_for_rate_limit = wait_on_ratelimit
self.wait_for_rate_limit: bool = wait_on_ratelimit
# NOTE: This was hardcoded before which is why it isn't passed in
self.wait_for_rate_limit_time = 20
self.wait_for_rate_limit_time: int = 20
self.issue_id: int | None = None
self.remove_html_tables: bool = remove_html_tables
self.use_series_start_as_volume: bool = use_series_start_as_volume
self.api_key = api_key if api_key else "27431e6787042105bd3e47e169a624521f89f3a4"
self.api_base_url = api_url if api_url else "https://comicvine.gamespot.com/api"
self.remove_html_tables = remove_html_tables
self.use_series_start_as_volume = use_series_start_as_volume
tmp_url = urlsplit(self.api_base_url)
# joinurl only works properly if there is a trailing slash
if tmp_url.path and tmp_url.path[-1] != "/":
tmp_url = tmp_url._replace(path=tmp_url.path + "/")
self.api_base_url = tmp_url.geturl()
self.series_match_thresh = series_match_thresh
self.series_match_thresh: int = series_match_thresh
def check_api_key(self, key: str, url: str) -> bool:
if not url:
url = self.api_base_url
url = self.api_url
try:
tmp_url = urlsplit(url)
if tmp_url.path and tmp_url.path[-1] != "/":
@ -219,7 +217,7 @@ class ComicVineTalker(ComicTalker):
cv_response: CVResult = requests.get(
test_url,
headers={"user-agent": "comictagger/" + ctversion.version},
headers={"user-agent": "comictagger/" + self.version},
params={
"api_key": key,
"format": "json",
@ -271,7 +269,7 @@ class ComicVineTalker(ComicTalker):
# any other error, just bail
for tries in range(3):
try:
resp = requests.get(url, params=params, headers={"user-agent": "comictagger/" + ctversion.version})
resp = requests.get(url, params=params, headers={"user-agent": "comictagger/" + self.version})
if resp.status_code == 200:
return resp.json()
if resp.status_code == 500:
@ -406,7 +404,7 @@ class ComicVineTalker(ComicTalker):
# Before we search online, look in our cache, since we might have done this same search recently
# For literal searches always retrieve from online
cvc = ComicCacher()
cvc = ComicCacher(self.cache_folder, self.version)
if not refresh_cache and not literal:
cached_search_results = cvc.get_search_results(self.source_name, series_name)
@ -423,7 +421,7 @@ class ComicVineTalker(ComicTalker):
"limit": 100,
}
cv_response = self.get_cv_content(urljoin(self.api_base_url, "search"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "search"), params)
search_results: list[CVVolumeResults] = []
@ -469,7 +467,7 @@ class ComicVineTalker(ComicTalker):
page += 1
params["page"] = page
cv_response = self.get_cv_content(urljoin(self.api_base_url, "search"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "search"), params)
search_results.extend(cast(list[CVVolumeResults], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
@ -498,13 +496,13 @@ class ComicVineTalker(ComicTalker):
def fetch_partial_volume_data(self, series_id: int) -> ComicVolume:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher()
cvc = ComicCacher(self.cache_folder, self.version)
cached_volume_result = cvc.get_volume_info(series_id, self.source_name)
if cached_volume_result is not None:
return cached_volume_result
volume_url = urljoin(self.api_base_url, f"volume/{CVTypeID.Volume}-{series_id}")
volume_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}")
params = {
"api_key": self.api_key,
@ -523,7 +521,7 @@ class ComicVineTalker(ComicTalker):
def fetch_issues_by_volume(self, series_id: int) -> list[ComicIssue]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher()
cvc = ComicCacher(self.cache_folder, self.version)
cached_volume_issues_result = cvc.get_volume_issues_info(series_id, self.source_name)
volume_data = self.fetch_partial_volume_data(series_id)
@ -538,7 +536,7 @@ class ComicVineTalker(ComicTalker):
"field_list": "id,volume,issue_number,name,image,cover_date,site_detail_url,description,aliases,associated_images",
"offset": 0,
}
cv_response = self.get_cv_content(urljoin(self.api_base_url, "issues/"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
@ -553,7 +551,7 @@ class ComicVineTalker(ComicTalker):
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self.get_cv_content(urljoin(self.api_base_url, "issues/"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
volume_issues_result.extend(cast(list[CVIssueDetailResults], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
@ -584,7 +582,7 @@ class ComicVineTalker(ComicTalker):
"filter": flt,
}
cv_response = self.get_cv_content(urljoin(self.api_base_url, "issues/"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
@ -599,7 +597,7 @@ class ComicVineTalker(ComicTalker):
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self.get_cv_content(urljoin(self.api_base_url, "issues/"), params)
cv_response = self.get_cv_content(urljoin(self.api_url, "issues/"), params)
filtered_issues_result.extend(cast(list[CVIssueDetailResults], cv_response["results"]))
current_result_count += cv_response["number_of_page_results"]
@ -634,12 +632,11 @@ class ComicVineTalker(ComicTalker):
if f_record is not None:
return self.fetch_issue_data_by_issue_id(f_record["id"])
else:
return GenericMetadata()
return GenericMetadata()
def fetch_issue_data_by_issue_id(self, issue_id: int) -> GenericMetadata:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher()
cvc = ComicCacher(self.cache_folder, self.version)
cached_issues_result = cvc.get_issue_info(issue_id, self.source_name)
if cached_issues_result and cached_issues_result["complete"]:
@ -650,7 +647,7 @@ class ComicVineTalker(ComicTalker):
self.use_series_start_as_volume,
)
issue_url = urljoin(self.api_base_url, f"issue/{CVTypeID.Issue}-{issue_id}")
issue_url = urljoin(self.api_url, f"issue/{CVTypeID.Issue}-{issue_id}")
params = {"api_key": self.api_key, "format": "json"}
cv_response = self.get_cv_content(issue_url, params)

View File

@ -6,8 +6,8 @@ import comictalker.comiccacher
from testing.comicdata import search_results
def test_create_cache(settings):
comictalker.comiccacher.ComicCacher()
def test_create_cache(settings, mock_version):
comictalker.comiccacher.ComicCacher(settings.get_settings_folder(), mock_version[0])
assert (settings.get_settings_folder() / "settings").exists()

View File

@ -115,6 +115,8 @@ def comicvine_api(
monkeypatch.setattr(requests, "get", m_get)
cv = comictalker.talkers.comicvine.ComicVineTalker(
version=mock_version[0],
cache_folder=settings.get_settings_folder(),
api_url="",
api_key="",
series_match_thresh=90,
@ -165,5 +167,5 @@ def settings(tmp_path):
@pytest.fixture
def comic_cache(settings) -> Generator[comictalker.comiccacher.ComicCacher, Any, None]:
yield comictalker.comiccacher.ComicCacher()
def comic_cache(settings, mock_version) -> Generator[comictalker.comiccacher.ComicCacher, Any, None]:
yield comictalker.comiccacher.ComicCacher(settings.get_settings_folder(), mock_version[0])