diff --git a/build-tools/generate_settngs.py b/build-tools/generate_settngs.py index 8006f32..bd52cf0 100644 --- a/build-tools/generate_settngs.py +++ b/build-tools/generate_settngs.py @@ -10,7 +10,7 @@ import comictaggerlib.main def generate() -> str: app = comictaggerlib.main.App() app.load_plugins(app.initial_arg_parser.parse_known_args()[0]) - app.register_settings() + app.register_settings(True) imports, types = settngs.generate_dict(app.manager.definitions) imports2, types2 = settngs.generate_ns(app.manager.definitions) i = imports.splitlines() diff --git a/comicapi/utils.py b/comicapi/utils.py index a6b6831..da9b0b5 100644 --- a/comicapi/utils.py +++ b/comicapi/utils.py @@ -88,7 +88,7 @@ if sys.version_info < (3, 11): cls._lower_members = {x.casefold(): x for x in cls} # type: ignore[attr-defined] return cls._lower_members.get(value.casefold(), None) # type: ignore[attr-defined] - def __str__(self): + def __str__(self) -> str: return self.value else: diff --git a/comictaggerlib/cli.py b/comictaggerlib/cli.py index b5bbc57..5b7781e 100644 --- a/comictaggerlib/cli.py +++ b/comictaggerlib/cli.py @@ -36,6 +36,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir from comictaggerlib.graphics import graphics_path from comictaggerlib.issueidentifier import IssueIdentifier from comictaggerlib.md import prepare_metadata +from comictaggerlib.quick_tag import QuickTag from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status from comictalker.comictalker import ComicTalker, TalkerError @@ -397,6 +398,153 @@ class CLI: res.status = status return res + def try_quick_tag(self, ca: ComicArchive, md: GenericMetadata) -> GenericMetadata | None: + if not self.config.Runtime_Options__enable_quick_tag: + self.output("skipping quick tag") + return None + self.output("starting quick tag") + try: + qt = QuickTag( + self.config.Quick_Tag__url, + str(utils.parse_url(self.current_talker().website).host), + 
self.current_talker(), + self.config, + self.output, + ) + ct_md = qt.id_comic( + ca, + md, + self.config.Quick_Tag__simple, + set(self.config.Quick_Tag__hash), + self.config.Quick_Tag__skip_non_exact, + self.config.Runtime_Options__interactive, + self.config.Quick_Tag__aggressive_filtering, + self.config.Quick_Tag__max, + ) + if ct_md is None: + ct_md = GenericMetadata() + return ct_md + except Exception: + logger.exception("Quick Tagging failed") + return None + + def normal_tag( + self, ca: ComicArchive, tags_read: list[str], md: GenericMetadata, match_results: OnlineMatchResults + ) -> tuple[GenericMetadata, list[IssueResult], Result | None, OnlineMatchResults]: + # ct_md, results, matches, match_results + if md is None or md.is_empty: + logger.error("No metadata given to search online with!") + res = Result( + Action.save, + status=Status.match_failure, + original_path=ca.path, + match_status=MatchStatus.no_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.no_matches.append(res) + return GenericMetadata(), [], res, match_results + + ii = IssueIdentifier(ca, self.config, self.current_talker()) + + ii.set_output_function(functools.partial(self.output, already_logged=True)) + if not self.config.Auto_Tag__use_year_when_identifying: + md.year = None + if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None: + md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series) + result, matches = ii.identify(ca, md) + + found_match = False + choices = False + low_confidence = False + + if result == IssueIdentifier.result_no_matches: + pass + elif result == IssueIdentifier.result_found_match_but_bad_cover_score: + low_confidence = True + found_match = True + elif result == IssueIdentifier.result_found_match_but_not_first_page: + found_match = True + elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores: + low_confidence = True + choices = True + elif result == 
IssueIdentifier.result_one_good_match: + found_match = True + elif result == IssueIdentifier.result_multiple_good_matches: + choices = True + + if choices: + if low_confidence: + logger.error("Online search: Multiple low confidence matches. Save aborted") + res = Result( + Action.save, + status=Status.match_failure, + original_path=ca.path, + online_results=matches, + match_status=MatchStatus.low_confidence_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.low_confidence_matches.append(res) + return GenericMetadata(), matches, res, match_results + + logger.error("Online search: Multiple good matches. Save aborted") + res = Result( + Action.save, + status=Status.match_failure, + original_path=ca.path, + online_results=matches, + match_status=MatchStatus.multiple_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.multiple_matches.append(res) + return GenericMetadata(), matches, res, match_results + if low_confidence and self.config.Runtime_Options__abort_on_low_confidence: + logger.error("Online search: Low confidence match. Save aborted") + res = Result( + Action.save, + status=Status.match_failure, + original_path=ca.path, + online_results=matches, + match_status=MatchStatus.low_confidence_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.low_confidence_matches.append(res) + return GenericMetadata(), matches, res, match_results + if not found_match: + logger.error("Online search: No match found. 
Save aborted") + res = Result( + Action.save, + status=Status.match_failure, + original_path=ca.path, + online_results=matches, + match_status=MatchStatus.no_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.no_matches.append(res) + return GenericMetadata(), matches, res, match_results + + # we got here, so we have a single match + + # now get the particular issue data + ct_md = self.fetch_metadata(matches[0].issue_id) + if ct_md.is_empty: + res = Result( + Action.save, + status=Status.fetch_data_failure, + original_path=ca.path, + online_results=matches, + match_status=MatchStatus.good_match, + tags_written=self.config.Runtime_Options__tags_write, + tags_read=tags_read, + ) + match_results.fetch_data_failures.append(res) + return GenericMetadata(), matches, res, match_results + return ct_md, matches, None, match_results + def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> tuple[Result, OnlineMatchResults]: if self.config.Runtime_Options__skip_existing_tags: for tag_id in self.config.Runtime_Options__tags_write: @@ -455,117 +603,34 @@ class CLI: return res, match_results else: - if md is None or md.is_empty: - logger.error("No metadata given to search online with!") - res = Result( - Action.save, - status=Status.match_failure, - original_path=ca.path, - match_status=MatchStatus.no_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.no_matches.append(res) - return res, match_results - - ii = IssueIdentifier(ca, self.config, self.current_talker()) - - ii.set_output_function(functools.partial(self.output, already_logged=True)) - if not self.config.Auto_Tag__use_year_when_identifying: - md.year = None - if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None: - md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series) - result, matches = ii.identify(ca, md) - - found_match = False - choices = False - low_confidence 
= False - - if result == IssueIdentifier.result_no_matches: - pass - elif result == IssueIdentifier.result_found_match_but_bad_cover_score: - low_confidence = True - found_match = True - elif result == IssueIdentifier.result_found_match_but_not_first_page: - found_match = True - elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores: - low_confidence = True - choices = True - elif result == IssueIdentifier.result_one_good_match: - found_match = True - elif result == IssueIdentifier.result_multiple_good_matches: - choices = True - - if choices: - if low_confidence: - logger.error("Online search: Multiple low confidence matches. Save aborted") - res = Result( - Action.save, - status=Status.match_failure, - original_path=ca.path, - online_results=matches, - match_status=MatchStatus.low_confidence_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.low_confidence_matches.append(res) + qt_md = self.try_quick_tag(ca, md) + if qt_md is None or qt_md.is_empty: + if qt_md is not None: + self.output("Failed to find match via quick tag") + ct_md, matches, res, match_results = self.normal_tag(ca, tags_read, md, match_results) # type: ignore[assignment] + if res is not None: return res, match_results - - logger.error("Online search: Multiple good matches. Save aborted") - res = Result( - Action.save, - status=Status.match_failure, - original_path=ca.path, - online_results=matches, - match_status=MatchStatus.multiple_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.multiple_matches.append(res) - return res, match_results - if low_confidence and self.config.Runtime_Options__abort_on_low_confidence: - logger.error("Online search: Low confidence match. 
Save aborted") - res = Result( - Action.save, - status=Status.match_failure, - original_path=ca.path, - online_results=matches, - match_status=MatchStatus.low_confidence_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.low_confidence_matches.append(res) - return res, match_results - if not found_match: - logger.error("Online search: No match found. Save aborted") - res = Result( - Action.save, - status=Status.match_failure, - original_path=ca.path, - online_results=matches, - match_status=MatchStatus.no_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.no_matches.append(res) - return res, match_results - - # we got here, so we have a single match - - # now get the particular issue data - ct_md = self.fetch_metadata(matches[0].issue_id) - if ct_md.is_empty: - res = Result( - Action.save, - status=Status.fetch_data_failure, - original_path=ca.path, - online_results=matches, - match_status=MatchStatus.good_match, - tags_written=self.config.Runtime_Options__tags_write, - tags_read=tags_read, - ) - match_results.fetch_data_failures.append(res) - return res, match_results + else: + self.output("Successfully matched via quick tag") + ct_md = qt_md + matches = [ + IssueResult( + series=ct_md.series or "", + distance=-1, + issue_number=ct_md.issue or "", + issue_count=ct_md.issue_count, + url_image_hash=-1, + issue_title=ct_md.title or "", + issue_id=ct_md.issue_id or "", + series_id=ct_md.series_id or "", + month=ct_md.month, + year=ct_md.year, + publisher=None, + image_url=ct_md._cover_image or "", + alt_image_urls=[], + description=ct_md.description or "", + ) + ] res = Result( Action.save, diff --git a/comictaggerlib/ctsettings/__init__.py b/comictaggerlib/ctsettings/__init__.py index 5e68f71..e31d4ff 100644 --- a/comictaggerlib/ctsettings/__init__.py +++ b/comictaggerlib/ctsettings/__init__.py @@ -104,6 +104,8 @@ def save_file( filename: A pathlib.Path object 
to save the json dictionary to """ file_options = settngs.clean_config(config, file=True) + file_options["Quick Tag"]["url"] = str(file_options["Quick Tag"]["url"]) + try: if not filename.exists(): filename.parent.mkdir(exist_ok=True, parents=True) diff --git a/comictaggerlib/ctsettings/commandline.py b/comictaggerlib/ctsettings/commandline.py index a702cb8..27b4f8b 100644 --- a/comictaggerlib/ctsettings/commandline.py +++ b/comictaggerlib/ctsettings/commandline.py @@ -27,7 +27,7 @@ import settngs from comicapi import utils from comicapi.comicarchive import tags -from comictaggerlib import ctversion +from comictaggerlib import ctversion, quick_tag from comictaggerlib.ctsettings.settngs_namespace import SettngsNS as ct_ns from comictaggerlib.ctsettings.types import ComicTaggerPaths, tag from comictaggerlib.resulttypes import Action @@ -51,6 +51,12 @@ def initial_commandline_parser() -> argparse.ArgumentParser: default=0, help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.", ) + parser.add_argument( + "--enable-quick-tag", + action=argparse.BooleanOptionalAction, + default=False, + help='Enable the experimental "quick tagger"', + ) return parser @@ -70,6 +76,13 @@ def register_runtime(parser: settngs.Manager) -> None: help="Be noisy when doing what it does. 
Use a second time to enable debug logs.\nShort option cannot be combined with other options.", file=False, ) + parser.add_setting( + "--enable-quick-tag", + action=argparse.BooleanOptionalAction, + default=False, + help='Enable the experimental "quick tagger"', + file=False, + ) parser.add_setting("-q", "--quiet", action="store_true", help="Don't say much (for print mode).", file=False) parser.add_setting( "-j", @@ -240,9 +253,11 @@ def register_commands(parser: settngs.Manager) -> None: ) -def register_commandline_settings(parser: settngs.Manager) -> None: +def register_commandline_settings(parser: settngs.Manager, enable_quick_tag: bool) -> None: parser.add_group("Commands", register_commands, True) parser.add_persistent_group("Runtime Options", register_runtime) + if enable_quick_tag: + parser.add_group("Quick Tag", quick_tag.settings) def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs.Manager) -> settngs.Config[ct_ns]: diff --git a/comictaggerlib/ctsettings/settngs_namespace.py b/comictaggerlib/ctsettings/settngs_namespace.py index 10a60cc..704d59b 100644 --- a/comictaggerlib/ctsettings/settngs_namespace.py +++ b/comictaggerlib/ctsettings/settngs_namespace.py @@ -3,6 +3,7 @@ from __future__ import annotations import typing import settngs +import urllib3.util.url import comicapi.genericmetadata import comicapi.merge @@ -19,6 +20,7 @@ class SettngsNS(settngs.TypedNS): Runtime_Options__config: comictaggerlib.ctsettings.types.ComicTaggerPaths Runtime_Options__verbose: int + Runtime_Options__enable_quick_tag: bool Runtime_Options__quiet: bool Runtime_Options__json: bool Runtime_Options__raw: bool @@ -37,6 +39,13 @@ class SettngsNS(settngs.TypedNS): Runtime_Options__skip_existing_tags: bool Runtime_Options__files: list[str] + Quick_Tag__url: urllib3.util.url.Url + Quick_Tag__max: int + Quick_Tag__simple: bool + Quick_Tag__aggressive_filtering: bool + Quick_Tag__hash: list[comictaggerlib.quick_tag.HashType] + Quick_Tag__skip_non_exact: 
bool + internal__install_id: str internal__write_tags: list[str] internal__read_tags: list[str] @@ -132,6 +141,7 @@ class Commands(typing.TypedDict): class Runtime_Options(typing.TypedDict): config: comictaggerlib.ctsettings.types.ComicTaggerPaths verbose: int + enable_quick_tag: bool quiet: bool json: bool raw: bool @@ -151,6 +161,15 @@ class Runtime_Options(typing.TypedDict): files: list[str] +class Quick_Tag(typing.TypedDict): + url: urllib3.util.url.Url + max: int + simple: bool + aggressive_filtering: bool + hash: list[comictaggerlib.quick_tag.HashType] + skip_non_exact: bool + + class internal(typing.TypedDict): install_id: str write_tags: list[str] @@ -263,6 +282,7 @@ SettngsDict = typing.TypedDict( { "Commands": Commands, "Runtime Options": Runtime_Options, + "Quick Tag": Quick_Tag, "internal": internal, "Issue Identifier": Issue_Identifier, "Filename Parsing": Filename_Parsing, diff --git a/comictaggerlib/imagehasher.py b/comictaggerlib/imagehasher.py index 562a724..a629673 100644 --- a/comictaggerlib/imagehasher.py +++ b/comictaggerlib/imagehasher.py @@ -73,24 +73,23 @@ class ImageHasher: return result - def average_hash2(self) -> None: - """ - # Got this one from somewhere on the net. Not a clue how the 'convolve2d' works! 
+ def difference_hash(self) -> int: + try: + image = self.image.resize((self.width + 1, self.height), Image.Resampling.LANCZOS).convert("L") + except Exception: + logger.exception("difference_hash error") + return 0 - from numpy import array - from scipy.signal import convolve2d + pixels = list(image.getdata()) + diff = "" + for y in range(self.height): + for x in range(self.width): + idx = x + (self.width + 1) * y + diff += str(int(pixels[idx] < pixels[idx + 1])) - im = self.image.resize((self.width, self.height), Image.ANTIALIAS).convert('L') + result = int(diff, 2) - in_data = array((im.getdata())).reshape(self.width, self.height) - filt = array([[0,1,0],[1,-4,1],[0,1,0]]) - filt_data = convolve2d(in_data,filt,mode='same',boundary='symm').flatten() - - result = reduce(lambda x, (y, z): x | (z << y), - enumerate(map(lambda i: 0 if i < 0 else 1, filt_data)), - 0) return result - """ def p_hash(self) -> int: """ diff --git a/comictaggerlib/main.py b/comictaggerlib/main.py index cc61c76..e0dec98 100644 --- a/comictaggerlib/main.py +++ b/comictaggerlib/main.py @@ -117,7 +117,7 @@ class App: conf = self.initialize() self.initialize_dirs(conf.config) self.load_plugins(conf) - self.register_settings() + self.register_settings(conf.enable_quick_tag) self.config = self.parse_settings(conf.config) self.main() @@ -215,13 +215,13 @@ class App: setup_logging(conf.verbose, conf.config.user_log_dir) return conf - def register_settings(self) -> None: + def register_settings(self, enable_quick_tag: bool) -> None: self.manager = settngs.Manager( description="A utility for reading and writing metadata to comic archives.\n\n\n" + "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'", epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki", ) - ctsettings.register_commandline_settings(self.manager) + ctsettings.register_commandline_settings(self.manager, enable_quick_tag) 
ctsettings.register_file_settings(self.manager) ctsettings.register_plugin_settings(self.manager, getattr(self, "talkers", {})) diff --git a/comictaggerlib/quick_tag.py b/comictaggerlib/quick_tag.py new file mode 100644 index 0000000..a308bad --- /dev/null +++ b/comictaggerlib/quick_tag.py @@ -0,0 +1,391 @@ +from __future__ import annotations + +import argparse +import itertools +import logging +from enum import auto +from io import BytesIO +from typing import Callable, TypedDict, cast +from urllib.parse import urljoin + +import requests +import settngs +from PIL import Image + +from comicapi import comicarchive, utils +from comicapi.genericmetadata import GenericMetadata +from comicapi.issuestring import IssueString +from comictaggerlib.ctsettings.settngs_namespace import SettngsNS +from comictaggerlib.imagehasher import ImageHasher +from comictalker import ComicTalker + +logger = logging.getLogger(__name__) + +__version__ = "0.1" + + +class HashType(utils.StrEnum): + AHASH = auto() + DHASH = auto() + PHASH = auto() + + +class SimpleResult(TypedDict): + Distance: int + # Mapping of domains (eg comicvine.gamespot.com) to IDs + IDList: dict[str, list[str]] + + +class Hash(TypedDict): + Hash: int + Kind: str + + +class Result(TypedDict): + # Mapping of domains (eg comicvine.gamespot.com) to IDs + IDList: dict[str, list[str]] + Distance: int + Hash: Hash + + +def ihash(types: str) -> list[HashType]: + result: list[HashType] = [] + types = types.casefold() + choices = ", ".join(HashType) + for typ in utils.split(types, ","): + if typ not in list(HashType): + raise argparse.ArgumentTypeError(f"invalid choice: {typ} (choose from {choices.upper()})") + result.append(HashType[typ.upper()]) + + if not result: + raise argparse.ArgumentTypeError(f"invalid choice: {types} (choose from {choices.upper()})") + return result + + +def settings(manager: settngs.Manager) -> None: + manager.add_setting( + "--url", + "-u", + default="https://comic-hasher.narnian.us", + 
type=utils.parse_url, + help="Website to use for searching cover hashes", + ) + manager.add_setting( + "--max", + default=8, + type=int, + help="Maximum score to allow. Lower score means more accurate", + ) + manager.add_setting( + "--simple", + default=False, + action=argparse.BooleanOptionalAction, + help="Whether to retrieve simple results or full results", + ) + manager.add_setting( + "--aggressive-filtering", + default=False, + action=argparse.BooleanOptionalAction, + help="Will filter out worse matches if better matches are found", + ) + manager.add_setting( + "--hash", + default="ahash, dhash, phash", + type=ihash, + help="Pick what hashes you want to use to search (default: %(default)s)", + ) + manager.add_setting( + "--skip-non-exact", + default=True, + action=argparse.BooleanOptionalAction, + help="Skip non-exact matches if we have exact matches", + ) + + +class QuickTag: + def __init__( + self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[[str], None] + ): + self.output = output + self.url = url + self.talker = talker + self.domain = domain + self.config = config + + def id_comic( + self, + ca: comicarchive.ComicArchive, + tags: GenericMetadata, + simple: bool, + hashes: set[HashType], + skip_non_exact: bool, + interactive: bool, + aggressive_filtering: bool, + max_hamming_distance: int, + ) -> GenericMetadata | None: + if not ca.seems_to_be_a_comic_archive(): + raise Exception(f"{ca.path} is not an archive") + + cover_index = tags.get_cover_page_index_list()[0] + cover_image = Image.open(BytesIO(ca.get_page(cover_index))) + + self.output(f"Tagging: {ca.path}") + + self.output("hashing cover") + phash = dhash = ahash = "" + hasher = ImageHasher(image=cover_image) + if HashType.AHASH in hashes: + ahash = hex(hasher.average_hash())[2:] + if HashType.DHASH in hashes: + dhash = hex(hasher.difference_hash())[2:] + if HashType.PHASH in hashes: + phash = hex(hasher.p_hash())[2:] + + logger.info(f"Searching with {ahash=}, 
{dhash=}, {phash=}") + + self.output("Searching hashes") + results = self.SearchHashes(simple, max_hamming_distance, ahash, dhash, phash, skip_non_exact) + logger.debug(f"{results=}") + + if simple: + filtered_simple_results = self.filter_simple_results( + cast(list[SimpleResult], results), interactive, aggressive_filtering + ) + metadata_simple_results = self.get_simple_results(filtered_simple_results) + chosen_result = self.display_simple_results(metadata_simple_results, tags, interactive) + else: + filtered_results = self.filter_results(cast(list[Result], results), interactive, aggressive_filtering) + metadata_results = self.get_results(filtered_results) + chosen_result = self.display_results(metadata_results, tags, interactive) + + return self.talker.fetch_comic_data(issue_id=chosen_result.issue_id) + + def SearchHashes( + self, simple: bool, max_hamming_distance: int, ahash: str, dhash: str, phash: str, skip_non_exact: bool + ) -> list[SimpleResult] | list[Result]: + + resp = requests.get( + urljoin(self.url.url, "/match_cover_hash"), + params={ + "simple": str(simple), + "max": str(max_hamming_distance), + "ahash": ahash, + "dhash": dhash, + "phash": phash, + "skipNonExact": str(skip_non_exact), + }, + ) + if resp.status_code != 200: + try: + text = resp.json()["msg"] + except Exception: + text = resp.text + if text == "No hashes found": + return [] + logger.error("message from server: %s", text) + raise Exception(f"Failed to retrieve results from the server: {text}") + return resp.json()["results"] + + def get_mds(self, results: list[SimpleResult] | list[Result]) -> list[GenericMetadata]: + md_results: list[GenericMetadata] = [] + results.sort(key=lambda r: r["Distance"]) + all_ids = set() + for res in results: + all_ids.update(res["IDList"].get(self.domain, [])) + + self.output(f"Retrieving basic {self.talker.name} data") + # Try to do a bulk fetch of basic issue data + if hasattr(self.talker, "fetch_comics"): + md_results = 
self.talker.fetch_comics(issue_ids=list(all_ids)) + else: + for md_id in all_ids: + md_results.append(self.talker.fetch_comic_data(issue_id=md_id)) + return md_results + + def get_simple_results(self, results: list[SimpleResult]) -> list[tuple[int, GenericMetadata]]: + md_results = [] + mds = self.get_mds(results) + + # Re-associate the md to the distance + for res in results: + for md in mds: + if md.issue_id in res["IDList"].get(self.domain, []): + md_results.append((res["Distance"], md)) + return md_results + + def get_results(self, results: list[Result]) -> list[tuple[int, Hash, GenericMetadata]]: + md_results = [] + mds = self.get_mds(results) + + # Re-associate the md to the distance + for res in results: + for md in mds: + if md.issue_id in res["IDList"].get(self.domain, []): + md_results.append((res["Distance"], res["Hash"], md)) + return md_results + + def filter_simple_results( + self, results: list[SimpleResult], interactive: bool, aggressive_filtering: bool + ) -> list[SimpleResult]: + # If there is a single exact match return it + exact = [r for r in results if r["Distance"] == 0] + if len(exact) == 1: + logger.info("Exact result found. Ignoring any others") + return exact + + # If there are more than 4 results and any are better than 6 return the first group of results + if len(results) > 4: + dist: list[tuple[int, list[SimpleResult]]] = [] + filtered_results: list[SimpleResult] = [] + for distance, group in itertools.groupby(results, key=lambda r: r["Distance"]): + dist.append((distance, list(group))) + if aggressive_filtering and dist[0][0] < 6: + logger.info(f"Aggressive filtering is enabled. 
Dropping matches above {dist[0]}") + for _, res in dist[:1]: + filtered_results.extend(res) + logger.debug(f"{filtered_results=}") + return filtered_results + return results + + def filter_results(self, results: list[Result], interactive: bool, aggressive_filtering: bool) -> list[Result]: + ahash_results = sorted([r for r in results if r["Hash"]["Kind"] == "ahash"], key=lambda r: r["Distance"]) + dhash_results = sorted([r for r in results if r["Hash"]["Kind"] == "dhash"], key=lambda r: r["Distance"]) + phash_results = sorted([r for r in results if r["Hash"]["Kind"] == "phash"], key=lambda r: r["Distance"]) + hash_results = [phash_results, dhash_results, ahash_results] + + # If any of the hash types have a single exact match return it. Prefer phash for no particular reason + for hashed_result in hash_results: + exact = [r for r in hashed_result if r["Distance"] == 0] + if len(exact) == 1: + logger.info(f"Exact {exact[0]['Hash']['Kind']} result found. Ignoring any others") + return exact + + results_filtered = False + # If any of the hash types have more than 4 results and they have results better than 6 return the first group of results for each hash type + for i, hashed_results in enumerate(hash_results): + filtered_results: list[Result] = [] + if len(hashed_results) > 4: + dist: list[tuple[int, list[Result]]] = [] + for distance, group in itertools.groupby(hashed_results, key=lambda r: r["Distance"]): + dist.append((distance, list(group))) + if aggressive_filtering and dist[0][0] < 6: + logger.info( + f"Aggressive filtering is enabled. 
Dropping {dist[0][1][0]['Hash']['Kind']} matches above {dist[0][0]}" + ) + for _, res in dist[:1]: + filtered_results.extend(res) + + if filtered_results: + hash_results[i] = filtered_results + results_filtered = True + if results_filtered: + logger.debug(f"filtered_results={list(itertools.chain(*hash_results))}") + return list(itertools.chain(*hash_results)) + + def display_simple_results( + self, md_results: list[tuple[int, GenericMetadata]], tags: GenericMetadata, interactive: bool + ) -> GenericMetadata: + if len(md_results) < 1: + return GenericMetadata() + if len(md_results) == 1 and md_results[0][0] <= 4: + self.output("Found a single match <=4. Assuming it's correct") + return md_results[0][1] + series_match: list[GenericMetadata] = [] + for score, md in md_results: + if ( + score < 10 + and tags.series + and md.series + and utils.titles_match(tags.series, md.series) + and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string() + ): + series_match.append(md) + if len(series_match) == 1: + self.output(f"Found match with series name {series_match[0].series!r}") + return series_match[0] + + if not interactive: + return GenericMetadata() + + md_results.sort(key=lambda r: (r[0], len(r[1].publisher or ""))) + for counter, r in enumerate(md_results, 1): + self.output( + " {:2}. 
score: {} [{:15}] ({:02}/{:04}) - {} #{} - {}".format( + counter, + r[0], + r[1].publisher, + r[1].month or 0, + r[1].year or 0, + r[1].series, + r[1].issue, + r[1].title, + ), + ) + while True: + i = input( + f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ', + ).casefold() + if i.isdigit() and int(i) in range(1, len(md_results) + 1): + break + if i == "q": + logger.warning("User quit without saving metadata") + return GenericMetadata() + + return md_results[int(i) - 1][1] + + def display_results( + self, + md_results: list[tuple[int, Hash, GenericMetadata]], + tags: GenericMetadata, + interactive: bool, + ) -> GenericMetadata: + if len(md_results) < 1: + return GenericMetadata() + if len(md_results) == 1 and md_results[0][0] <= 4: + self.output("Found a single match <=4. Assuming it's correct") + return md_results[0][2] + series_match: dict[str, tuple[int, Hash, GenericMetadata]] = {} + for score, cover_hash, md in md_results: + if ( + score < 10 + and tags.series + and md.series + and utils.titles_match(tags.series, md.series) + and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string() + ): + assert md.issue_id + series_match[md.issue_id] = (score, cover_hash, md) + + if len(series_match) == 1: + score, cover_hash, md = list(series_match.values())[0] + self.output(f"Found {cover_hash['Kind']} {score=} match with series name {md.series!r}") + return md + if not interactive: + return GenericMetadata() + md_results.sort(key=lambda r: (r[0], len(r[2].publisher or ""), r[1]["Kind"])) + for counter, r in enumerate(md_results, 1): + self.output( + " {:2}. 
score: {} {}: {:064b} [{:15}] ({:02}/{:04}) - {} #{} - {}".format( + counter, + r[0], + r[1]["Kind"], + r[1]["Hash"], + r[2].publisher or "", + r[2].month or 0, + r[2].year or 0, + r[2].series or "", + r[2].issue or "", + r[2].title or "", + ), + ) + while True: + i = input( + f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ', + ).casefold() + if i.isdigit() and int(i) in range(1, len(md_results) + 1): + break + if i == "q": + self.output("User quit without saving metadata") + return GenericMetadata() + + return md_results[int(i) - 1][2] diff --git a/comictalker/talkers/comicvine.py b/comictalker/talkers/comicvine.py index 3b2703d..752fc36 100644 --- a/comictalker/talkers/comicvine.py +++ b/comictalker/talkers/comicvine.py @@ -410,6 +410,132 @@ class ComicVineTalker(ComicTalker): return formatted_filtered_issues_result + def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]: + # before we search online, look in our cache, since we might already have this info + cvc = ComicCacher(self.cache_folder, self.version) + cached_results: list[GenericMetadata] = [] + needed_issues: list[int] = [] + for issue_id in issue_ids: + cached_issue = cvc.get_issue_info(issue_id, self.id) + + if cached_issue and cached_issue[1]: + cached_results.append( + self._map_comic_issue_to_metadata( + json.loads(cached_issue[0].data), + self._fetch_series([int(cached_issue[0].series_id)])[0][0], + ), + ) + else: + needed_issues.append(int(issue_id)) # CV uses integers for its IDs + + if not needed_issues: + return cached_results + issue_filter = "" + for iid in needed_issues: + issue_filter += str(iid) + "|" + flt = "id:" + issue_filter.rstrip("|") + + issue_url = urljoin(self.api_url, "issues/") + params: dict[str, Any] = { + "api_key": self.api_key, + "format": "json", + "filter": flt, + } + cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params) + + issue_results = cv_response["results"] + page = 1 + offset = 0 + 
current_result_count = cv_response["number_of_page_results"] + total_result_count = cv_response["number_of_total_results"] + + # see if we need to keep asking for more pages... + while current_result_count < total_result_count: + page += 1 + offset += cv_response["number_of_page_results"] + + params["offset"] = offset + cv_response = self._get_cv_content(issue_url, params) + + issue_results.extend(cv_response["results"]) + current_result_count += cv_response["number_of_page_results"] + + series_info = {s[0].id: s[0] for s in self._fetch_series([int(i["volume"]["id"]) for i in issue_results])} + + for issue in issue_results: + cvc.add_issues_info( + self.id, + [ + Issue( + id=str(issue["id"]), + series_id=str(issue["volume"]["id"]), + data=json.dumps(issue).encode("utf-8"), + ), + ], + True, + ) + cached_results.append( + self._map_comic_issue_to_metadata(issue, series_info[str(issue["volume"]["id"])]), + ) + + return cached_results + + def _fetch_series(self, series_ids: list[int]) -> list[tuple[ComicSeries, bool]]: + # before we search online, look in our cache, since we might already have this info + cvc = ComicCacher(self.cache_folder, self.version) + cached_results: list[tuple[ComicSeries, bool]] = [] + needed_series: list[int] = [] + for series_id in series_ids: + cached_series = cvc.get_series_info(str(series_id), self.id) + if cached_series is not None: + cached_results.append((self._format_series(json.loads(cached_series[0].data)), cached_series[1])) + else: + needed_series.append(series_id) + + if needed_series == []: + return cached_results + + series_filter = "" + for vid in needed_series: + series_filter += str(vid) + "|" + flt = "id:" + series_filter.rstrip("|") # CV uses volume to mean series + + series_url = urljoin(self.api_url, "volumes/") # CV uses volume to mean series + params: dict[str, Any] = { + "api_key": self.api_key, + "format": "json", + "filter": flt, + } + cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params) 
+ + series_results = cv_response["results"] + page = 1 + offset = 0 + current_result_count = cv_response["number_of_page_results"] + total_result_count = cv_response["number_of_total_results"] + + # see if we need to keep asking for more pages... + while current_result_count < total_result_count: + page += 1 + offset += cv_response["number_of_page_results"] + + params["offset"] = offset + cv_response = self._get_cv_content(series_url, params) + + series_results.extend(cv_response["results"]) + current_result_count += cv_response["number_of_page_results"] + + if series_results: + for series in series_results: + cvc.add_series_info( + self.id, + Series(id=str(series["id"]), data=json.dumps(series).encode("utf-8")), + True, + ) + cached_results.append((self._format_series(series), True)) + + return cached_results + def _get_cv_content(self, url: str, params: dict[str, Any]) -> CVResult[T]: """ Get the content from the CV server. diff --git a/tests/conftest.py b/tests/conftest.py index ad94110..5fc5950 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -197,7 +197,7 @@ def config(tmp_path): from comictaggerlib.main import App app = App() - app.register_settings() + app.register_settings(False) defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "") defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True) @@ -214,7 +214,7 @@ def plugin_config(tmp_path): ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config")) app = App() app.load_plugins(ns) - app.register_settings() + app.register_settings(False) defaults = app.parse_settings(ns.config, "") defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)