Add experimental quick-tag
parent 2cb6caea8d
commit fab30f3f29
@@ -10,7 +10,7 @@ import comictaggerlib.main
 def generate() -> str:
     app = comictaggerlib.main.App()
     app.load_plugins(app.initial_arg_parser.parse_known_args()[0])
-    app.register_settings()
+    app.register_settings(True)
     imports, types = settngs.generate_dict(app.manager.definitions)
     imports2, types2 = settngs.generate_ns(app.manager.definitions)
     i = imports.splitlines()
@@ -88,7 +88,7 @@ if sys.version_info < (3, 11):
             cls._lower_members = {x.casefold(): x for x in cls}  # type: ignore[attr-defined]
             return cls._lower_members.get(value.casefold(), None)  # type: ignore[attr-defined]

-        def __str__(self):
+        def __str__(self) -> str:
             return self.value

 else:
@@ -36,6 +36,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
 from comictaggerlib.graphics import graphics_path
 from comictaggerlib.issueidentifier import IssueIdentifier
 from comictaggerlib.md import prepare_metadata
+from comictaggerlib.quick_tag import QuickTag
 from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status
 from comictalker.comictalker import ComicTalker, TalkerError

@@ -397,6 +398,153 @@ class CLI:
             res.status = status
         return res

+    def try_quick_tag(self, ca: ComicArchive, md: GenericMetadata) -> GenericMetadata | None:
+        if not self.config.Runtime_Options__enable_quick_tag:
+            self.output("skipping quick tag")
+            return None
+        self.output("starting quick tag")
+        try:
+            qt = QuickTag(
+                self.config.Quick_Tag__url,
+                str(utils.parse_url(self.current_talker().website).host),
+                self.current_talker(),
+                self.config,
+                self.output,
+            )
+            ct_md = qt.id_comic(
+                ca,
+                md,
+                self.config.Quick_Tag__simple,
+                set(self.config.Quick_Tag__hash),
+                self.config.Quick_Tag__skip_non_exact,
+                self.config.Runtime_Options__interactive,
+                self.config.Quick_Tag__aggressive_filtering,
+                self.config.Quick_Tag__max,
+            )
+            if ct_md is None:
+                ct_md = GenericMetadata()
+            return ct_md
+        except Exception:
+            logger.exception("Quick Tagging failed")
+            return None
+
+    def normal_tag(
+        self, ca: ComicArchive, tags_read: list[str], md: GenericMetadata, match_results: OnlineMatchResults
+    ) -> tuple[GenericMetadata, list[IssueResult], Result | None, OnlineMatchResults]:
+        # ct_md, results, matches, match_results
+        if md is None or md.is_empty:
+            logger.error("No metadata given to search online with!")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                match_status=MatchStatus.no_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.no_matches.append(res)
+            return GenericMetadata(), [], res, match_results
+
+        ii = IssueIdentifier(ca, self.config, self.current_talker())
+
+        ii.set_output_function(functools.partial(self.output, already_logged=True))
+        if not self.config.Auto_Tag__use_year_when_identifying:
+            md.year = None
+        if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
+            md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
+        result, matches = ii.identify(ca, md)
+
+        found_match = False
+        choices = False
+        low_confidence = False
+
+        if result == IssueIdentifier.result_no_matches:
+            pass
+        elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
+            low_confidence = True
+            found_match = True
+        elif result == IssueIdentifier.result_found_match_but_not_first_page:
+            found_match = True
+        elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
+            low_confidence = True
+            choices = True
+        elif result == IssueIdentifier.result_one_good_match:
+            found_match = True
+        elif result == IssueIdentifier.result_multiple_good_matches:
+            choices = True
+
+        if choices:
+            if low_confidence:
+                logger.error("Online search: Multiple low confidence matches. Save aborted")
+                res = Result(
+                    Action.save,
+                    status=Status.match_failure,
+                    original_path=ca.path,
+                    online_results=matches,
+                    match_status=MatchStatus.low_confidence_match,
+                    tags_written=self.config.Runtime_Options__tags_write,
+                    tags_read=tags_read,
+                )
+                match_results.low_confidence_matches.append(res)
+                return GenericMetadata(), matches, res, match_results
+
+            logger.error("Online search: Multiple good matches. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.multiple_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.multiple_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+        if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
+            logger.error("Online search: Low confidence match. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.low_confidence_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.low_confidence_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+        if not found_match:
+            logger.error("Online search: No match found. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.no_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.no_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+
+        # we got here, so we have a single match
+
+        # now get the particular issue data
+        ct_md = self.fetch_metadata(matches[0].issue_id)
+        if ct_md.is_empty:
+            res = Result(
+                Action.save,
+                status=Status.fetch_data_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.good_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.fetch_data_failures.append(res)
+            return GenericMetadata(), matches, res, match_results
+        return ct_md, matches, None, match_results
+
     def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> tuple[Result, OnlineMatchResults]:
         if self.config.Runtime_Options__skip_existing_tags:
             for tag_id in self.config.Runtime_Options__tags_write:
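Note: a minimal sketch of the contract these two helpers establish (hedged; `cli` stands for a CLI instance as defined above, and the three-way return of try_quick_tag — None when quick tag is disabled or errored, an empty GenericMetadata when the hash server finds nothing, populated metadata on a hit — is read off the code in this hunk):

    # Sketch only: mirrors the flow save() adopts in the next hunk.
    def resolve_metadata(cli, ca, tags_read, md, match_results):
        qt_md = cli.try_quick_tag(ca, md)  # None -> quick tag disabled or raised
        if qt_md is not None and not qt_md.is_empty:
            return qt_md, match_results  # hash server identified the issue
        if qt_md is not None:
            cli.output("Failed to find match via quick tag")
        # fall back to the classic cover-scoring identification path
        ct_md, matches, res, match_results = cli.normal_tag(ca, tags_read, md, match_results)
        return ct_md, match_results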
@@ -455,117 +603,34 @@ class CLI:
             return res, match_results

         else:
-            if md is None or md.is_empty:
-                logger.error("No metadata given to search online with!")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    match_status=MatchStatus.no_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.no_matches.append(res)
-                return res, match_results
-
-            ii = IssueIdentifier(ca, self.config, self.current_talker())
-
-            ii.set_output_function(functools.partial(self.output, already_logged=True))
-            if not self.config.Auto_Tag__use_year_when_identifying:
-                md.year = None
-            if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
-                md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
-            result, matches = ii.identify(ca, md)
-
-            found_match = False
-            choices = False
-            low_confidence = False
-
-            if result == IssueIdentifier.result_no_matches:
-                pass
-            elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
-                low_confidence = True
-                found_match = True
-            elif result == IssueIdentifier.result_found_match_but_not_first_page:
-                found_match = True
-            elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
-                low_confidence = True
-                choices = True
-            elif result == IssueIdentifier.result_one_good_match:
-                found_match = True
-            elif result == IssueIdentifier.result_multiple_good_matches:
-                choices = True
-
-            if choices:
-                if low_confidence:
-                    logger.error("Online search: Multiple low confidence matches. Save aborted")
-                    res = Result(
-                        Action.save,
-                        status=Status.match_failure,
-                        original_path=ca.path,
-                        online_results=matches,
-                        match_status=MatchStatus.low_confidence_match,
-                        tags_written=self.config.Runtime_Options__tags_write,
-                        tags_read=tags_read,
-                    )
-                    match_results.low_confidence_matches.append(res)
+            qt_md = self.try_quick_tag(ca, md)
+            if qt_md is None or qt_md.is_empty:
+                if qt_md is not None:
+                    self.output("Failed to find match via quick tag")
+                ct_md, matches, res, match_results = self.normal_tag(ca, tags_read, md, match_results)  # type: ignore[assignment]
+                if res is not None:
+                    return res, match_results
-
-                logger.error("Online search: Multiple good matches. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.multiple_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.multiple_matches.append(res)
-                return res, match_results
-            if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
-                logger.error("Online search: Low confidence match. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.low_confidence_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.low_confidence_matches.append(res)
-                return res, match_results
-            if not found_match:
-                logger.error("Online search: No match found. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.no_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.no_matches.append(res)
-                return res, match_results
-
-            # we got here, so we have a single match
-
-            # now get the particular issue data
-            ct_md = self.fetch_metadata(matches[0].issue_id)
-            if ct_md.is_empty:
-                res = Result(
-                    Action.save,
-                    status=Status.fetch_data_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.good_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.fetch_data_failures.append(res)
-                return res, match_results
+            else:
+                self.output("Successfully matched via quick tag")
+                ct_md = qt_md
+                matches = [
+                    IssueResult(
+                        series=ct_md.series or "",
+                        distance=-1,
+                        issue_number=ct_md.issue or "",
+                        issue_count=ct_md.issue_count,
+                        url_image_hash=-1,
+                        issue_title=ct_md.title or "",
+                        issue_id=ct_md.issue_id or "",
+                        series_id=ct_md.issue_id or "",
+                        month=ct_md.month,
+                        year=ct_md.year,
+                        publisher=None,
+                        image_url=ct_md._cover_image or "",
+                        alt_image_urls=[],
+                        description=ct_md.description or "",
+                    )
+                ]

         res = Result(
             Action.save,
@@ -104,6 +104,8 @@ def save_file(
         filename: A pathlib.Path object to save the json dictionary to
     """
     file_options = settngs.clean_config(config, file=True)
+    file_options["Quick Tag"]["url"] = str(file_options["Quick Tag"]["url"])
+
     try:
         if not filename.exists():
             filename.parent.mkdir(exist_ok=True, parents=True)
@@ -27,7 +27,7 @@ import settngs

 from comicapi import utils
 from comicapi.comicarchive import tags
-from comictaggerlib import ctversion
+from comictaggerlib import ctversion, quick_tag
 from comictaggerlib.ctsettings.settngs_namespace import SettngsNS as ct_ns
 from comictaggerlib.ctsettings.types import ComicTaggerPaths, tag
 from comictaggerlib.resulttypes import Action
@@ -51,6 +51,12 @@ def initial_commandline_parser() -> argparse.ArgumentParser:
         default=0,
         help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
     )
+    parser.add_argument(
+        "--enable-quick-tag",
+        action=argparse.BooleanOptionalAction,
+        default=False,
+        help='Enable the experimental "quick tagger"',
+    )
     return parser

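Note: argparse.BooleanOptionalAction (Python 3.9+) derives a paired negative flag from a single declaration, and parse_known_args lets this initial parser pull out --enable-quick-tag before the full settings tree exists. A standalone sketch:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--enable-quick-tag", action=argparse.BooleanOptionalAction, default=False)
    # --no-enable-quick-tag is generated automatically; unknown args pass through
    opts, rest = parser.parse_known_args(["--enable-quick-tag", "comic.cbz"])
    assert opts.enable_quick_tag is True and rest == ["comic.cbz"]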
@@ -70,6 +76,13 @@ def register_runtime(parser: settngs.Manager) -> None:
         help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
         file=False,
     )
+    parser.add_setting(
+        "--enable-quick-tag",
+        action=argparse.BooleanOptionalAction,
+        default=False,
+        help='Enable the experimental "quick tagger"',
+        file=False,
+    )
     parser.add_setting("-q", "--quiet", action="store_true", help="Don't say much (for print mode).", file=False)
     parser.add_setting(
         "-j",
@@ -240,9 +253,11 @@ def register_commands(parser: settngs.Manager) -> None:
     )


-def register_commandline_settings(parser: settngs.Manager) -> None:
+def register_commandline_settings(parser: settngs.Manager, enable_quick_tag: bool) -> None:
     parser.add_group("Commands", register_commands, True)
     parser.add_persistent_group("Runtime Options", register_runtime)
+    if enable_quick_tag:
+        parser.add_group("Quick Tag", quick_tag.settings)


 def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs.Manager) -> settngs.Config[ct_ns]:
@@ -3,6 +3,7 @@ from __future__ import annotations
 import typing

 import settngs
+import urllib3.util.url

 import comicapi.genericmetadata
 import comicapi.merge
@@ -19,6 +20,7 @@ class SettngsNS(settngs.TypedNS):

     Runtime_Options__config: comictaggerlib.ctsettings.types.ComicTaggerPaths
     Runtime_Options__verbose: int
+    Runtime_Options__enable_quick_tag: bool
     Runtime_Options__quiet: bool
     Runtime_Options__json: bool
     Runtime_Options__raw: bool
@@ -37,6 +39,13 @@ class SettngsNS(settngs.TypedNS):
     Runtime_Options__skip_existing_tags: bool
     Runtime_Options__files: list[str]

+    Quick_Tag__url: urllib3.util.url.Url
+    Quick_Tag__max: int
+    Quick_Tag__simple: bool
+    Quick_Tag__aggressive_filtering: bool
+    Quick_Tag__hash: list[comictaggerlib.quick_tag.HashType]
+    Quick_Tag__skip_non_exact: bool
+
     internal__install_id: str
     internal__write_tags: list[str]
     internal__read_tags: list[str]
@@ -132,6 +141,7 @@ class Commands(typing.TypedDict):
 class Runtime_Options(typing.TypedDict):
     config: comictaggerlib.ctsettings.types.ComicTaggerPaths
     verbose: int
+    enable_quick_tag: bool
     quiet: bool
     json: bool
     raw: bool
@@ -151,6 +161,15 @@ class Runtime_Options(typing.TypedDict):
     files: list[str]


+class Quick_Tag(typing.TypedDict):
+    url: urllib3.util.url.Url
+    max: int
+    simple: bool
+    aggressive_filtering: bool
+    hash: list[comictaggerlib.quick_tag.HashType]
+    skip_non_exact: bool
+
+
 class internal(typing.TypedDict):
     install_id: str
     write_tags: list[str]
@@ -263,6 +282,7 @@ SettngsDict = typing.TypedDict(
     {
         "Commands": Commands,
         "Runtime Options": Runtime_Options,
+        "Quick Tag": Quick_Tag,
         "internal": internal,
         "Issue Identifier": Issue_Identifier,
         "Filename Parsing": Filename_Parsing,
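Note: settngs exposes each value twice, per the types above: as a flat namespace attribute (Group__name) and as a nested dict keyed by the group title. A hedged sketch of the correspondence (ns and cfg are hypothetical parsed results of the two shapes):

    def quick_tag_url(ns, cfg):
        # SettngsNS attribute and SettngsDict entry refer to the same value
        assert ns.Quick_Tag__max == cfg["Quick Tag"]["max"]
        return ns.Quick_Tag__url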
@@ -73,24 +73,23 @@ class ImageHasher:

         return result

-    def average_hash2(self) -> None:
-        """
-        # Got this one from somewhere on the net. Not a clue how the 'convolve2d' works!
+    def difference_hash(self) -> int:
+        try:
+            image = self.image.resize((self.width + 1, self.height), Image.Resampling.LANCZOS).convert("L")
+        except Exception:
+            logger.exception("difference_hash error")
+            return 0

-        from numpy import array
-        from scipy.signal import convolve2d
+        pixels = list(image.getdata())
+        diff = ""
+        for y in range(self.height):
+            for x in range(self.width):
+                idx = x + (self.width + 1) * y
+                diff += str(int(pixels[idx] < pixels[idx + 1]))

-        im = self.image.resize((self.width, self.height), Image.ANTIALIAS).convert('L')
+        result = int(diff, 2)

-        in_data = array((im.getdata())).reshape(self.width, self.height)
-        filt = array([[0,1,0],[1,-4,1],[0,1,0]])
-        filt_data = convolve2d(in_data,filt,mode='same',boundary='symm').flatten()
-
-        result = reduce(lambda x, (y, z): x | (z << y),
-                        enumerate(map(lambda i: 0 if i < 0 else 1, filt_data)),
-                        0)
-        return result
-        """
+        return result

     def p_hash(self) -> int:
         """
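Note: a standalone sketch of the difference-hash construction used above: resize to width+1 columns so each row yields width adjacent-pixel comparisons, then read the bits as one integer (the row stride is width+1, hence the (width + 1) * y term):

    def dhash_bits(pixels, width, height):
        # pixels: row-major grayscale values of a (width+1) x height image
        bits = ""
        for y in range(height):
            for x in range(width):
                idx = x + (width + 1) * y
                bits += str(int(pixels[idx] < pixels[idx + 1]))
        return int(bits, 2)

    # 2x2-bit hash of a 3x2 image: rows [10, 20, 30] and [30, 20, 10]
    assert dhash_bits([10, 20, 30, 30, 20, 10], 2, 2) == 0b1100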
@@ -117,7 +117,7 @@ class App:
         conf = self.initialize()
         self.initialize_dirs(conf.config)
         self.load_plugins(conf)
-        self.register_settings()
+        self.register_settings(conf.enable_quick_tag)
         self.config = self.parse_settings(conf.config)

         self.main()
@@ -215,13 +215,13 @@ class App:
         setup_logging(conf.verbose, conf.config.user_log_dir)
         return conf

-    def register_settings(self) -> None:
+    def register_settings(self, enable_quick_tag: bool) -> None:
         self.manager = settngs.Manager(
             description="A utility for reading and writing metadata to comic archives.\n\n\n"
             + "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'",
             epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
         )
-        ctsettings.register_commandline_settings(self.manager)
+        ctsettings.register_commandline_settings(self.manager, enable_quick_tag)
         ctsettings.register_file_settings(self.manager)
         ctsettings.register_plugin_settings(self.manager, getattr(self, "talkers", {}))
comictaggerlib/quick_tag.py (new file, 391 lines)
@@ -0,0 +1,391 @@
from __future__ import annotations

import argparse
import itertools
import logging
from enum import auto
from io import BytesIO
from typing import Callable, TypedDict, cast
from urllib.parse import urljoin

import requests
import settngs
from PIL import Image

from comicapi import comicarchive, utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib.ctsettings.settngs_namespace import SettngsNS
from comictaggerlib.imagehasher import ImageHasher
from comictalker import ComicTalker

logger = logging.getLogger(__name__)

__version__ = "0.1"


class HashType(utils.StrEnum):
    AHASH = auto()
    DHASH = auto()
    PHASH = auto()


class SimpleResult(TypedDict):
    Distance: int
    # Mapping of domains (e.g. comicvine.gamespot.com) to IDs
    IDList: dict[str, list[str]]


class Hash(TypedDict):
    Hash: int
    Kind: str


class Result(TypedDict):
    # Mapping of domains (e.g. comicvine.gamespot.com) to IDs
    IDList: dict[str, list[str]]
    Distance: int
    Hash: Hash


def ihash(types: str) -> list[HashType]:
    result: list[HashType] = []
    types = types.casefold()
    choices = ", ".join(HashType)
    for typ in utils.split(types, ","):
        if typ not in list(HashType):
            raise argparse.ArgumentTypeError(f"invalid choice: {typ} (choose from {choices.upper()})")
        result.append(HashType[typ.upper()])

    if not result:
        raise argparse.ArgumentTypeError(f"invalid choice: {types} (choose from {choices.upper()})")
    return result
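Note: ihash accepts a case-insensitive, comma-separated list and rejects unknown names with ArgumentTypeError, so argparse reports them as usage errors. Usage sketch (assuming comictaggerlib is importable):

    from comictaggerlib.quick_tag import HashType, ihash

    assert ihash("AHash, phash") == [HashType.AHASH, HashType.PHASH]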
def settings(manager: settngs.Manager) -> None:
    manager.add_setting(
        "--url",
        "-u",
        default="https://comic-hasher.narnian.us",
        type=utils.parse_url,
        help="Website to use for searching cover hashes",
    )
    manager.add_setting(
        "--max",
        default=8,
        type=int,
        help="Maximum score to allow. Lower score means more accurate",
    )
    manager.add_setting(
        "--simple",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Whether to retrieve simple results or full results",
    )
    manager.add_setting(
        "--aggressive-filtering",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Will filter out worse matches if better matches are found",
    )
    manager.add_setting(
        "--hash",
        default="ahash, dhash, phash",
        type=ihash,
        help="Pick what hashes you want to use to search (default: %(default)s)",
    )
    manager.add_setting(
        "--skip-non-exact",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Skip non-exact matches if we have exact matches",
    )

class QuickTag:
    def __init__(
        self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[[str], None]
    ):
        self.output = output
        self.url = url
        self.talker = talker
        self.domain = domain
        self.config = config

    def id_comic(
        self,
        ca: comicarchive.ComicArchive,
        tags: GenericMetadata,
        simple: bool,
        hashes: set[HashType],
        skip_non_exact: bool,
        interactive: bool,
        aggressive_filtering: bool,
        max_hamming_distance: int,
    ) -> GenericMetadata | None:
        if not ca.seems_to_be_a_comic_archive():
            raise Exception(f"{ca.path} is not an archive")

        cover_index = tags.get_cover_page_index_list()[0]
        cover_image = Image.open(BytesIO(ca.get_page(cover_index)))

        self.output(f"Tagging: {ca.path}")

        self.output("hashing cover")
        phash = dhash = ahash = ""
        hasher = ImageHasher(image=cover_image)
        if HashType.AHASH in hashes:
            ahash = hex(hasher.average_hash())[2:]
        if HashType.DHASH in hashes:
            dhash = hex(hasher.difference_hash())[2:]
        if HashType.PHASH in hashes:
            phash = hex(hasher.p_hash())[2:]

        logger.info(f"Searching with {ahash=}, {dhash=}, {phash=}")

        self.output("Searching hashes")
        results = self.SearchHashes(simple, max_hamming_distance, ahash, dhash, phash, skip_non_exact)
        logger.debug(f"{results=}")

        if simple:
            filtered_simple_results = self.filter_simple_results(
                cast(list[SimpleResult], results), interactive, aggressive_filtering
            )
            metadata_simple_results = self.get_simple_results(filtered_simple_results)
            chosen_result = self.display_simple_results(metadata_simple_results, tags, interactive)
        else:
            filtered_results = self.filter_results(cast(list[Result], results), interactive, aggressive_filtering)
            metadata_results = self.get_results(filtered_results)
            chosen_result = self.display_results(metadata_results, tags, interactive)

        return self.talker.fetch_comic_data(issue_id=chosen_result.issue_id)

    def SearchHashes(
        self, simple: bool, max_hamming_distance: int, ahash: str, dhash: str, phash: str, skip_non_exact: bool
    ) -> list[SimpleResult] | list[Result]:

        resp = requests.get(
            urljoin(self.url.url, "/match_cover_hash"),
            params={
                "simple": str(simple),
                "max": str(max_hamming_distance),
                "ahash": ahash,
                "dhash": dhash,
                "phash": phash,
                "skipNonExact": str(skip_non_exact),
            },
        )
        if resp.status_code != 200:
            try:
                text = resp.json()["msg"]
            except Exception:
                text = resp.text
            if text == "No hashes found":
                return []
            logger.error("message from server: %s", text)
            raise Exception(f"Failed to retrieve results from the server: {text}")
        return resp.json()["results"]
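Note: the response shapes SearchHashes returns, sketched as literals matching the SimpleResult and Result TypedDicts above (domain and IDs are invented for illustration):

    simple_result = {
        "Distance": 0,
        "IDList": {"comicvine.gamespot.com": ["12345"]},
    }
    full_result = {
        "IDList": {"comicvine.gamespot.com": ["12345"]},
        "Distance": 3,
        "Hash": {"Hash": 0x8F0E0C0A00000000, "Kind": "dhash"},
    }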
    def get_mds(self, results: list[SimpleResult] | list[Result]) -> list[GenericMetadata]:
        md_results: list[GenericMetadata] = []
        results.sort(key=lambda r: r["Distance"])
        all_ids = set()
        for res in results:
            all_ids.update(res["IDList"].get(self.domain, []))

        self.output(f"Retrieving basic {self.talker.name} data")
        # Try to do a bulk fetch of basic issue data
        if hasattr(self.talker, "fetch_comics"):
            md_results = self.talker.fetch_comics(issue_ids=list(all_ids))
        else:
            for md_id in all_ids:
                md_results.append(self.talker.fetch_comic_data(issue_id=md_id))
        return md_results

    def get_simple_results(self, results: list[SimpleResult]) -> list[tuple[int, GenericMetadata]]:
        md_results = []
        mds = self.get_mds(results)

        # Re-associate the md to the distance
        for res in results:
            for md in mds:
                if md.issue_id in res["IDList"].get(self.domain, []):
                    md_results.append((res["Distance"], md))
        return md_results

    def get_results(self, results: list[Result]) -> list[tuple[int, Hash, GenericMetadata]]:
        md_results = []
        mds = self.get_mds(results)

        # Re-associate the md to the distance
        for res in results:
            for md in mds:
                if md.issue_id in res["IDList"].get(self.domain, []):
                    md_results.append((res["Distance"], res["Hash"], md))
        return md_results

    def filter_simple_results(
        self, results: list[SimpleResult], interactive: bool, aggressive_filtering: bool
    ) -> list[SimpleResult]:
        # If there is a single exact match return it
        exact = [r for r in results if r["Distance"] == 0]
        if len(exact) == 1:
            logger.info("Exact result found. Ignoring any others")
            return exact

        # If there are more than 4 results and any are better than 6 return the first group of results
        if len(results) > 4:
            dist: list[tuple[int, list[SimpleResult]]] = []
            filtered_results: list[SimpleResult] = []
            for distance, group in itertools.groupby(results, key=lambda r: r["Distance"]):
                dist.append((distance, list(group)))
            if aggressive_filtering and dist[0][0] < 6:
                logger.info(f"Aggressive filtering is enabled. Dropping matches above {dist[0]}")
                for _, res in dist[:1]:
                    filtered_results.extend(res)
                logger.debug(f"{filtered_results=}")
                return filtered_results
        return results
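Note: a standalone sketch of the aggressive-filtering rule above: results already sorted by Distance are grouped, and when the best group's distance beats 6 only that group survives (distances invented for illustration):

    import itertools

    results = [{"Distance": d} for d in (2, 2, 5, 7, 9)]  # len > 4, so filtering applies
    dist = [(d, list(g)) for d, g in itertools.groupby(results, key=lambda r: r["Distance"])]
    if dist[0][0] < 6:  # best group wins
        results = dist[0][1]
    assert [r["Distance"] for r in results] == [2, 2]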
    def filter_results(self, results: list[Result], interactive: bool, aggressive_filtering: bool) -> list[Result]:
        ahash_results = sorted([r for r in results if r["Hash"]["Kind"] == "ahash"], key=lambda r: r["Distance"])
        dhash_results = sorted([r for r in results if r["Hash"]["Kind"] == "dhash"], key=lambda r: r["Distance"])
        phash_results = sorted([r for r in results if r["Hash"]["Kind"] == "phash"], key=lambda r: r["Distance"])
        hash_results = [phash_results, dhash_results, ahash_results]

        # If any of the hash types have a single exact match return it. Prefer phash for no particular reason
        for hashed_result in hash_results:
            exact = [r for r in hashed_result if r["Distance"] == 0]
            if len(exact) == 1:
                logger.info(f"Exact {exact[0]['Hash']['Kind']} result found. Ignoring any others")
                return exact

        results_filtered = False
        # If any of the hash types have more than 4 results and they have results better than 6 return the first group of results for each hash type
        for i, hashed_results in enumerate(hash_results):
            filtered_results: list[Result] = []
            if len(hashed_results) > 4:
                dist: list[tuple[int, list[Result]]] = []
                for distance, group in itertools.groupby(hashed_results, key=lambda r: r["Distance"]):
                    dist.append((distance, list(group)))
                if aggressive_filtering and dist[0][0] < 6:
                    logger.info(
                        f"Aggressive filtering is enabled. Dropping {dist[0][1][0]['Hash']['Kind']} matches above {dist[0][0]}"
                    )
                    for _, res in dist[:1]:
                        filtered_results.extend(res)

            if filtered_results:
                hash_results[i] = filtered_results
                results_filtered = True
        if results_filtered:
            logger.debug(f"filtered_results={list(itertools.chain(*hash_results))}")
        return list(itertools.chain(*hash_results))

    def display_simple_results(
        self, md_results: list[tuple[int, GenericMetadata]], tags: GenericMetadata, interactive: bool
    ) -> GenericMetadata:
        if len(md_results) < 1:
            return GenericMetadata()
        if len(md_results) == 1 and md_results[0][0] <= 4:
            self.output("Found a single match <=4. Assuming it's correct")
            return md_results[0][1]
        series_match: list[GenericMetadata] = []
        for score, md in md_results:
            if (
                score < 10
                and tags.series
                and md.series
                and utils.titles_match(tags.series, md.series)
                and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
            ):
                series_match.append(md)
        if len(series_match) == 1:
            self.output(f"Found match with series name {series_match[0].series!r}")
            return series_match[0]

        if not interactive:
            return GenericMetadata()

        md_results.sort(key=lambda r: (r[0], len(r[1].publisher or "")))
        for counter, r in enumerate(md_results, 1):
            self.output(
                " {:2}. score: {} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
                    counter,
                    r[0],
                    r[1].publisher,
                    r[1].month or 0,
                    r[1].year or 0,
                    r[1].series,
                    r[1].issue,
                    r[1].title,
                ),
            )
        while True:
            i = input(
                f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
            ).casefold()
            if i.isdigit() and int(i) in range(1, len(md_results) + 1):
                break
            if i == "q":
                logger.warning("User quit without saving metadata")
                return GenericMetadata()

        return md_results[int(i) - 1][1]

    def display_results(
        self,
        md_results: list[tuple[int, Hash, GenericMetadata]],
        tags: GenericMetadata,
        interactive: bool,
    ) -> GenericMetadata:
        if len(md_results) < 1:
            return GenericMetadata()
        if len(md_results) == 1 and md_results[0][0] <= 4:
            self.output("Found a single match <=4. Assuming it's correct")
            return md_results[0][2]
        series_match: dict[str, tuple[int, Hash, GenericMetadata]] = {}
        for score, cover_hash, md in md_results:
            if (
                score < 10
                and tags.series
                and md.series
                and utils.titles_match(tags.series, md.series)
                and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
            ):
                assert md.issue_id
                series_match[md.issue_id] = (score, cover_hash, md)

        if len(series_match) == 1:
            score, cover_hash, md = list(series_match.values())[0]
            self.output(f"Found {cover_hash['Kind']} {score=} match with series name {md.series!r}")
            return md
        if not interactive:
            return GenericMetadata()
        md_results.sort(key=lambda r: (r[0], len(r[2].publisher or ""), r[1]["Kind"]))
        for counter, r in enumerate(md_results, 1):
            self.output(
                " {:2}. score: {} {}: {:064b} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
                    counter,
                    r[0],
                    r[1]["Kind"],
                    r[1]["Hash"],
                    r[2].publisher or "",
                    r[2].month or 0,
                    r[2].year or 0,
                    r[2].series or "",
                    r[2].issue or "",
                    r[2].title or "",
                ),
            )
        while True:
            i = input(
                f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
            ).casefold()
            if i.isdigit() and int(i) in range(1, len(md_results) + 1):
                break
            if i == "q":
                self.output("User quit without saving metadata")
                return GenericMetadata()

        return md_results[int(i) - 1][2]
@@ -410,6 +410,132 @@ class ComicVineTalker(ComicTalker):

         return formatted_filtered_issues_result

+    def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]:
+        # before we search online, look in our cache, since we might already have this info
+        cvc = ComicCacher(self.cache_folder, self.version)
+        cached_results: list[GenericMetadata] = []
+        needed_issues: list[int] = []
+        for issue_id in issue_ids:
+            cached_issue = cvc.get_issue_info(issue_id, self.id)
+
+            if cached_issue and cached_issue[1]:
+                cached_results.append(
+                    self._map_comic_issue_to_metadata(
+                        json.loads(cached_issue[0].data),
+                        self._fetch_series([int(cached_issue[0].series_id)])[0][0],
+                    ),
+                )
+            else:
+                needed_issues.append(int(issue_id))  # CV uses integers for its IDs
+
+        if not needed_issues:
+            return cached_results
+        issue_filter = ""
+        for iid in needed_issues:
+            issue_filter += str(iid) + "|"
+        flt = "id:" + issue_filter.rstrip("|")
+
+        issue_url = urljoin(self.api_url, "issues/")
+        params: dict[str, Any] = {
+            "api_key": self.api_key,
+            "format": "json",
+            "filter": flt,
+        }
+        cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params)
+
+        issue_results = cv_response["results"]
+        page = 1
+        offset = 0
+        current_result_count = cv_response["number_of_page_results"]
+        total_result_count = cv_response["number_of_total_results"]
+
+        # see if we need to keep asking for more pages...
+        while current_result_count < total_result_count:
+            page += 1
+            offset += cv_response["number_of_page_results"]
+
+            params["offset"] = offset
+            cv_response = self._get_cv_content(issue_url, params)
+
+            issue_results.extend(cv_response["results"])
+            current_result_count += cv_response["number_of_page_results"]
+
+        series_info = {s[0].id: s[0] for s in self._fetch_series([int(i["volume"]["id"]) for i in issue_results])}
+
+        for issue in issue_results:
+            cvc.add_issues_info(
+                self.id,
+                [
+                    Issue(
+                        id=str(issue["id"]),
+                        series_id=str(issue["volume"]["id"]),
+                        data=json.dumps(issue).encode("utf-8"),
+                    ),
+                ],
+                True,
+            )
+            cached_results.append(
+                self._map_comic_issue_to_metadata(issue, series_info[str(issue["volume"]["id"])]),
+            )
+
+        return cached_results
+
+    def _fetch_series(self, series_ids: list[int]) -> list[tuple[ComicSeries, bool]]:
+        # before we search online, look in our cache, since we might already have this info
+        cvc = ComicCacher(self.cache_folder, self.version)
+        cached_results: list[tuple[ComicSeries, bool]] = []
+        needed_series: list[int] = []
+        for series_id in series_ids:
+            cached_series = cvc.get_series_info(str(series_id), self.id)
+            if cached_series is not None:
+                cached_results.append((self._format_series(json.loads(cached_series[0].data)), cached_series[1]))
+            else:
+                needed_series.append(series_id)
+
+        if needed_series == []:
+            return cached_results
+
+        series_filter = ""
+        for vid in needed_series:
+            series_filter += str(vid) + "|"
+        flt = "id:" + series_filter.rstrip("|")  # CV uses volume to mean series
+
+        series_url = urljoin(self.api_url, "volumes/")  # CV uses volume to mean series
+        params: dict[str, Any] = {
+            "api_key": self.api_key,
+            "format": "json",
+            "filter": flt,
+        }
+        cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params)
+
+        series_results = cv_response["results"]
+        page = 1
+        offset = 0
+        current_result_count = cv_response["number_of_page_results"]
+        total_result_count = cv_response["number_of_total_results"]
+
+        # see if we need to keep asking for more pages...
+        while current_result_count < total_result_count:
+            page += 1
+            offset += cv_response["number_of_page_results"]
+
+            params["offset"] = offset
+            cv_response = self._get_cv_content(series_url, params)
+
+            series_results.extend(cv_response["results"])
+            current_result_count += cv_response["number_of_page_results"]
+
+        if series_results:
+            for series in series_results:
+                cvc.add_series_info(
+                    self.id,
+                    Series(id=str(series["id"]), data=json.dumps(series).encode("utf-8")),
+                    True,
+                )
+                cached_results.append((self._format_series(series), True))
+
+        return cached_results
+
     def _get_cv_content(self, url: str, params: dict[str, Any]) -> CVResult[T]:
         """
         Get the content from the CV server.
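Note: the bulk lookup relies on Comic Vine's pipe-separated id filter; a minimal sketch of the string fetch_comics builds (IDs invented for illustration):

    needed_issues = [140529, 140530, 140531]
    flt = "id:" + "|".join(str(i) for i in needed_issues)
    assert flt == "id:140529|140530|140531"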
@@ -197,7 +197,7 @@ def config(tmp_path):
     from comictaggerlib.main import App

     app = App()
-    app.register_settings()
+    app.register_settings(False)

     defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "")
     defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
@@ -214,7 +214,7 @@ def plugin_config(tmp_path):
     ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"))
     app = App()
     app.load_plugins(ns)
-    app.register_settings()
+    app.register_settings(False)

     defaults = app.parse_settings(ns.config, "")
     defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)