667 lines
26 KiB
Python
667 lines
26 KiB
Python
"""A class to automatically identify a comic archive"""
|
|
|
|
#
|
|
# Copyright 2012-2014 ComicTagger Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import logging
|
|
from operator import attrgetter
|
|
from typing import Any, Callable
|
|
|
|
from typing_extensions import NotRequired, TypedDict
|
|
|
|
from comicapi import utils
|
|
from comicapi.comicarchive import ComicArchive
|
|
from comicapi.genericmetadata import ComicSeries, GenericMetadata
|
|
from comicapi.issuestring import IssueString
|
|
from comictaggerlib.ctsettings import ct_ns
|
|
from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException
|
|
from comictaggerlib.imagehasher import ImageHasher
|
|
from comictaggerlib.resulttypes import IssueResult
|
|
from comictalker.comictalker import ComicTalker, TalkerError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from PIL import Image, ImageChops
|
|
|
|
pil_available = True
|
|
except ImportError:
|
|
pil_available = False
|
|
|
|
|
|
class SearchKeys(TypedDict):
|
|
series: str
|
|
issue_number: str
|
|
alternate_number: str | None
|
|
month: int | None
|
|
year: int | None
|
|
issue_count: int | None
|
|
alternate_count: int | None
|
|
publisher: str | None
|
|
imprint: str | None
|
|
|
|
|
|
class Score(TypedDict):
|
|
score: NotRequired[int]
|
|
url: str
|
|
remote_hash: int
|
|
local_hash_name: str
|
|
local_hash: int
|
|
|
|
|
|
class IssueIdentifierNetworkError(Exception): ...
|
|
|
|
|
|
class IssueIdentifierCancelled(Exception): ...
|
|
|
|
|
|
class IssueIdentifier:
|
|
result_no_matches = 0
|
|
result_found_match_but_bad_cover_score = 1
|
|
result_found_match_but_not_first_page = 2
|
|
result_multiple_matches_with_bad_image_scores = 3
|
|
result_one_good_match = 4
|
|
result_multiple_good_matches = 5
|
|
|
|
def __init__(
|
|
self,
|
|
comic_archive: ComicArchive,
|
|
config: ct_ns,
|
|
talker: ComicTalker,
|
|
metadata: GenericMetadata = GenericMetadata(),
|
|
) -> None:
|
|
self.config = config
|
|
self.talker = talker
|
|
self.comic_archive: ComicArchive = comic_archive
|
|
self.md = metadata
|
|
self.image_hasher = 1
|
|
|
|
self.only_use_additional_meta_data = False
|
|
|
|
# a decent hamming score, good enough to call it a match
|
|
self.min_score_thresh: int = 16
|
|
|
|
# for alternate covers, be more stringent, since we're a bit more
|
|
# scattershot in comparisons
|
|
self.min_alternate_score_thresh = 12
|
|
|
|
# the min distance a hamming score must be to separate itself from
|
|
# closest neighbor
|
|
self.min_score_distance = 4
|
|
|
|
# a very strong hamming score, almost certainly the same image
|
|
self.strong_score_thresh = 8
|
|
|
|
# used to eliminate series names that are too long based on our search
|
|
# string
|
|
self.series_match_thresh = config.Issue_Identifier__series_match_identify_thresh
|
|
|
|
# used to eliminate unlikely publishers
|
|
self.use_publisher_filter = config.Auto_Tag__use_publisher_filter
|
|
self.publisher_filter = [s.strip().casefold() for s in config.Auto_Tag__publisher_filter]
|
|
|
|
self.additional_metadata = GenericMetadata()
|
|
self.output_function: Callable[[str], None] = print
|
|
self.progress_callback: Callable[[int, int], None] | None = None
|
|
self.cover_url_callback: Callable[[bytes], None] | None = None
|
|
self.search_result = self.result_no_matches
|
|
self.cover_page_index = 0
|
|
self.cancel = False
|
|
|
|
self.match_list: list[IssueResult] = []
|
|
|
|
def set_output_function(self, func: Callable[[str], None]) -> None:
|
|
self.output_function = func
|
|
|
|
def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
|
|
self.progress_callback = cb_func
|
|
|
|
def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
|
|
self.cover_url_callback = cb_func
|
|
|
|
def calculate_hash(self, image_data: bytes) -> int:
|
|
if self.image_hasher == 3:
|
|
return ImageHasher(data=image_data).p_hash()
|
|
if self.image_hasher == 2:
|
|
return -1 # ImageHasher(data=image_data).average_hash2()
|
|
|
|
return ImageHasher(data=image_data).average_hash()
|
|
|
|
def log_msg(self, msg: Any) -> None:
|
|
msg = str(msg)
|
|
for handler in logging.getLogger().handlers:
|
|
handler.flush()
|
|
self.output(msg)
|
|
|
|
def output(self, *args: Any, file: Any = None, **kwargs: Any) -> None:
|
|
# We intercept and discard the file argument otherwise everything is passed to self.output_function
|
|
|
|
# Ensure args[0] is defined and is a string for logger.info
|
|
if not args:
|
|
log_args: tuple[Any, ...] = ("",)
|
|
elif isinstance(args[0], str):
|
|
log_args = (args[0].strip("\n"), *args[1:])
|
|
else:
|
|
log_args = args
|
|
log_msg = " ".join([str(x) for x in log_args])
|
|
|
|
# Always send to logger so that we have a record for troubleshooting
|
|
logger.info(log_msg, **kwargs)
|
|
|
|
# If we are verbose or quiet we don't need to call the output function
|
|
if self.config.Runtime_Options__verbose > 0 or self.config.Runtime_Options__quiet:
|
|
return
|
|
|
|
# default output is stdout
|
|
self.output_function(*args, **kwargs)
|
|
|
|
def identify(self, ca: ComicArchive, md: GenericMetadata) -> tuple[int, list[IssueResult]]:
|
|
if not self._check_requirements(ca):
|
|
return self.result_no_matches, []
|
|
|
|
terms, images, extra_images = self._get_search_terms(ca, md)
|
|
|
|
# we need, at minimum, a series and issue number
|
|
if not (terms["series"] and terms["issue_number"]):
|
|
self.log_msg("Not enough info for a search!")
|
|
return self.result_no_matches, []
|
|
|
|
self._print_terms(terms, images)
|
|
|
|
issues = self._search_for_issues(terms)
|
|
|
|
self.log_msg(f"Found {len(issues)} series that have an issue #{terms['issue_number']}")
|
|
|
|
final_cover_matching = self._cover_matching(terms, images, extra_images, issues)
|
|
|
|
# One more test for the case choosing limited series first issue vs a trade with the same cover:
|
|
# if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
|
|
if len(final_cover_matching) > 1 and terms["issue_count"] is not None and terms["issue_count"] != 1:
|
|
for match in final_cover_matching.copy():
|
|
if match.issue_count == 1:
|
|
self.log_msg(
|
|
f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
|
|
)
|
|
final_cover_matching.remove(match)
|
|
|
|
if final_cover_matching:
|
|
best_score = final_cover_matching[0].distance
|
|
else:
|
|
best_score = 0
|
|
if best_score >= self.min_score_thresh:
|
|
if len(final_cover_matching) == 1:
|
|
self.log_msg("No matching pages in the issue.")
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
self._print_match(final_cover_matching[0])
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
search_result = self.result_found_match_but_bad_cover_score
|
|
else:
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
self.log_msg("Multiple bad cover matches! Need to use other info...")
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
search_result = self.result_multiple_matches_with_bad_image_scores
|
|
else:
|
|
if len(final_cover_matching) == 1:
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
self._print_match(final_cover_matching[0])
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
search_result = self.result_one_good_match
|
|
|
|
elif len(self.match_list) == 0:
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
self.log_msg("No matches found :(")
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
search_result = self.result_no_matches
|
|
else:
|
|
# we've got multiple good matches:
|
|
self.log_msg("More than one likely candidate.")
|
|
search_result = self.result_multiple_good_matches
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
for match_item in final_cover_matching:
|
|
self._print_match(match_item)
|
|
self.log_msg("--------------------------------------------------------------------------")
|
|
|
|
return search_result, final_cover_matching
|
|
|
|
def _crop_double_page(self, im: Image.Image) -> Image.Image | None:
|
|
w, h = im.size
|
|
|
|
try:
|
|
cropped_im = im.crop((int(w / 2), 0, w, h))
|
|
except Exception:
|
|
logger.exception("cropCover() error")
|
|
return None
|
|
|
|
return cropped_im
|
|
|
|
# Adapted from https://stackoverflow.com/a/10616717/20629671
|
|
def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None:
|
|
assert Image
|
|
assert ImageChops
|
|
# RGBA doesn't work????
|
|
tmp = im.convert("RGB")
|
|
|
|
bg = Image.new("RGB", tmp.size, "black")
|
|
diff = ImageChops.difference(tmp, bg)
|
|
diff = ImageChops.add(diff, diff, 2.0, -100)
|
|
|
|
bbox = diff.getbbox()
|
|
|
|
width_percent = 0
|
|
height_percent = 0
|
|
|
|
# If bbox is None that should mean it's solid black
|
|
if bbox:
|
|
width = bbox[2] - bbox[0]
|
|
height = bbox[3] - bbox[1]
|
|
|
|
# Convert to percent
|
|
width_percent = int(100 - ((width / im.width) * 100))
|
|
height_percent = int(100 - ((height / im.height) * 100))
|
|
logger.debug(
|
|
"Width: %s Height: %s, ratio: %s %s ratio met: %s",
|
|
im.width,
|
|
im.height,
|
|
width_percent,
|
|
height_percent,
|
|
width_percent > ratio or height_percent > ratio,
|
|
)
|
|
|
|
# If there is a difference return the image otherwise return None
|
|
if width_percent > ratio or height_percent > ratio:
|
|
return im.crop(bbox)
|
|
return None
|
|
|
|
def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]:
|
|
remote_hashes: list[tuple[str, int]] = []
|
|
for url in urls:
|
|
try:
|
|
alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
|
|
url, blocking=True
|
|
)
|
|
except ImageFetcherException as e:
|
|
self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
|
|
raise IssueIdentifierNetworkError from e
|
|
|
|
self._user_canceled(self.cover_url_callback, alt_url_image_data)
|
|
|
|
remote_hashes.append((url, self.calculate_hash(alt_url_image_data)))
|
|
|
|
if self.cancel:
|
|
raise IssueIdentifierCancelled
|
|
return remote_hashes
|
|
|
|
def _get_issue_cover_match_score(
|
|
self,
|
|
primary_img_url: str,
|
|
alt_urls: list[str],
|
|
local_hashes: list[tuple[str, int]],
|
|
use_alt_urls: bool = False,
|
|
) -> Score:
|
|
# local_hashes is a list of pre-calculated hashes.
|
|
# use_alt_urls - indicates to use alternate covers from CV
|
|
|
|
# If there is no URL return 100
|
|
if not primary_img_url:
|
|
return Score(score=100, url="", remote_hash=0)
|
|
|
|
self._user_canceled()
|
|
|
|
urls = [primary_img_url]
|
|
if use_alt_urls:
|
|
urls.extend(alt_urls)
|
|
self.log_msg(f"[{len(alt_urls)} alt. covers]")
|
|
|
|
remote_hashes = self._get_remote_hashes(urls)
|
|
|
|
score_list = []
|
|
done = False
|
|
for local_hash in local_hashes:
|
|
for remote_hash in remote_hashes:
|
|
score = ImageHasher.hamming_distance(local_hash[1], remote_hash[1])
|
|
score_list.append(
|
|
Score(
|
|
score=score,
|
|
url=remote_hash[0],
|
|
remote_hash=remote_hash[1],
|
|
local_hash_name=local_hash[0],
|
|
local_hash=local_hash[1],
|
|
)
|
|
)
|
|
|
|
self.log_msg(f" - {score:03}")
|
|
|
|
if score <= self.strong_score_thresh:
|
|
# such a good score, we can quit now, since for sure we have a winner
|
|
done = True
|
|
break
|
|
if done:
|
|
break
|
|
|
|
best_score_item = min(score_list, key=lambda x: x["score"])
|
|
|
|
return best_score_item
|
|
|
|
def _check_requirements(self, ca: ComicArchive) -> bool:
|
|
|
|
if not pil_available:
|
|
self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
|
|
return False
|
|
|
|
if not ca.seems_to_be_a_comic_archive():
|
|
self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
|
|
return False
|
|
return True
|
|
|
|
def _process_cover(self, name: str, image_data: bytes) -> list[tuple[str, Image.Image]]:
|
|
assert Image
|
|
cover_image = Image.open(io.BytesIO(image_data))
|
|
images = [(name, cover_image)]
|
|
|
|
# check the aspect ratio
|
|
# if it's wider than it is high, it's probably a two page spread (back_cover, front_cover)
|
|
# if so, crop it and calculate a second hash
|
|
aspect_ratio = float(cover_image.height) / float(cover_image.width)
|
|
if aspect_ratio < 1.0:
|
|
im = self._crop_double_page(cover_image)
|
|
if im is not None:
|
|
images.append(("double page", im))
|
|
|
|
# Check and remove black borders. Helps in identifying comics with an excessive black border like https://comicvine.gamespot.com/marvel-graphic-novel-1-the-death-of-captain-marvel/4000-21782/
|
|
cropped = self._crop_border(cover_image, self.config.Issue_Identifier__border_crop_percent)
|
|
if cropped is not None:
|
|
images.append(("black border cropped", cropped))
|
|
|
|
return images
|
|
|
|
def _get_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
|
|
covers: list[tuple[str, Image.Image]] = []
|
|
for cover_index in md.get_cover_page_index_list():
|
|
image_data = ca.get_page(cover_index)
|
|
covers.extend(self._process_cover(f"{cover_index}", image_data))
|
|
return covers
|
|
|
|
def _get_extra_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
|
|
assert md
|
|
covers: list[tuple[str, Image.Image]] = []
|
|
for cover_index in range(1, min(3, ca.get_number_of_pages())):
|
|
image_data = ca.get_page(cover_index)
|
|
covers.extend(self._process_cover(f"{cover_index}", image_data))
|
|
return covers
|
|
|
|
def _get_search_keys(self, md: GenericMetadata) -> Any:
|
|
search_keys = SearchKeys(
|
|
series=md.series,
|
|
issue_number=IssueString(md.issue).as_string(),
|
|
alternate_number=IssueString(md.alternate_number).as_string(),
|
|
month=md.month,
|
|
year=md.year,
|
|
issue_count=md.issue_count,
|
|
alternate_count=md.alternate_count,
|
|
publisher=md.publisher,
|
|
imprint=md.imprint,
|
|
)
|
|
return search_keys
|
|
|
|
def _get_search_terms(
|
|
self, ca: ComicArchive, md: GenericMetadata
|
|
) -> tuple[SearchKeys, list[tuple[str, Image.Image]], list[tuple[str, Image.Image]]]:
|
|
return self._get_search_keys(md), self._get_images(ca, md), self._get_extra_images(ca, md)
|
|
|
|
def _user_canceled(self, callback: Callable[..., Any] | None = None, *args: Any) -> Any:
|
|
if self.cancel:
|
|
raise IssueIdentifierCancelled
|
|
if callback is not None:
|
|
return callback(*args)
|
|
|
|
def _print_terms(self, keys: SearchKeys, images: list[tuple[str, Image.Image]]) -> None:
|
|
assert keys["series"]
|
|
assert keys["issue_number"]
|
|
self.log_msg(f"Using {self.talker.name} to search for:")
|
|
self.log_msg("\tSeries: " + keys["series"])
|
|
self.log_msg("\tIssue: " + keys["issue_number"])
|
|
# if keys["alternate_number"] is not None:
|
|
# self.log_msg("\tAlternate Issue: " + str(keys["alternate_number"]))
|
|
if keys["month"] is not None:
|
|
self.log_msg("\tMonth: " + str(keys["month"]))
|
|
if keys["year"] is not None:
|
|
self.log_msg("\tYear: " + str(keys["year"]))
|
|
if keys["issue_count"] is not None:
|
|
self.log_msg("\tCount: " + str(keys["issue_count"]))
|
|
# if keys["alternate_count"] is not None:
|
|
# self.log_msg("\tAlternate Count: " + str(keys["alternate_count"]))
|
|
# if keys["publisher"] is not None:
|
|
# self.log_msg("\tPublisher: " + str(keys["publisher"]))
|
|
# if keys["imprint"] is not None:
|
|
# self.log_msg("\tImprint: " + str(keys["imprint"]))
|
|
for name, _ in images:
|
|
self.log_msg("Cover: " + name)
|
|
|
|
self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
|
|
|
|
def _filter_series(self, terms: SearchKeys, search_results: list[ComicSeries]) -> list[ComicSeries]:
|
|
assert terms["series"]
|
|
|
|
filtered_results = []
|
|
for item in search_results:
|
|
length_approved = False
|
|
publisher_approved = True
|
|
date_approved = True
|
|
|
|
# remove any series that starts after the issue year
|
|
if terms["year"] is not None and item.start_year is not None:
|
|
if item.start_year > terms["year"] + 1:
|
|
date_approved = False
|
|
|
|
for name in [item.name, *item.aliases]:
|
|
if utils.titles_match(terms["series"], name, self.series_match_thresh):
|
|
length_approved = True
|
|
break
|
|
# remove any series from publishers on the filter
|
|
if self.use_publisher_filter and item.publisher:
|
|
if item.publisher.casefold() in self.publisher_filter:
|
|
publisher_approved = False
|
|
|
|
if length_approved and publisher_approved and date_approved:
|
|
filtered_results.append(item)
|
|
else:
|
|
logger.debug(
|
|
"Filtered out series: '%s' length approved: '%s', publisher approved: '%s', date approved: '%s'",
|
|
item.name,
|
|
length_approved,
|
|
publisher_approved,
|
|
date_approved,
|
|
)
|
|
return filtered_results
|
|
|
|
def _calculate_hashes(self, images: list[tuple[str, Image.Image]]) -> list[tuple[str, int]]:
|
|
hashes = []
|
|
for name, image in images:
|
|
hashes.append((name, ImageHasher(image=image).average_hash()))
|
|
return hashes
|
|
|
|
def _match_covers(
|
|
self,
|
|
terms: SearchKeys,
|
|
images: list[tuple[str, Image.Image]],
|
|
issues: list[tuple[ComicSeries, GenericMetadata]],
|
|
use_alternates: bool,
|
|
) -> list[IssueResult]:
|
|
assert terms["issue_number"]
|
|
match_results: list[IssueResult] = []
|
|
hashes = self._calculate_hashes(images)
|
|
counter = 0
|
|
alternate = ""
|
|
if use_alternates:
|
|
alternate = " Alternate"
|
|
for series, issue in issues:
|
|
self._user_canceled(self.progress_callback, counter, len(issues))
|
|
counter += 1
|
|
|
|
self.log_msg(
|
|
f"Examining{alternate} covers for Series ID: {series.id} {series.name} ({series.start_year}):",
|
|
)
|
|
|
|
try:
|
|
image_url = issue._cover_image or ""
|
|
alt_urls = issue._alternate_images
|
|
|
|
score_item = self._get_issue_cover_match_score(image_url, alt_urls, hashes, use_alt_urls=use_alternates)
|
|
except Exception:
|
|
logger.exception(f"Scoring series{alternate} covers failed")
|
|
return []
|
|
|
|
match = IssueResult(
|
|
series=f"{series.name} ({series.start_year})",
|
|
distance=score_item["score"],
|
|
issue_number=terms["issue_number"],
|
|
issue_count=series.count_of_issues,
|
|
url_image_hash=score_item["remote_hash"],
|
|
issue_title=issue.title or "",
|
|
issue_id=issue.issue_id or "",
|
|
series_id=series.id,
|
|
month=issue.month,
|
|
year=issue.year,
|
|
publisher=None,
|
|
image_url=image_url,
|
|
alt_image_urls=alt_urls,
|
|
description=issue.description or "",
|
|
)
|
|
if series.publisher is not None:
|
|
match.publisher = series.publisher
|
|
|
|
match_results.append(match)
|
|
|
|
self.log_msg(f"best score {match.distance:03}")
|
|
|
|
self.log_msg("")
|
|
return match_results
|
|
|
|
def _print_match(self, item: IssueResult) -> None:
|
|
self.log_msg(
|
|
"-----> {} #{} {} ({}/{}) -- score: {}".format(
|
|
item.series,
|
|
item.issue_number,
|
|
item.issue_title,
|
|
item.month,
|
|
item.year,
|
|
item.distance,
|
|
)
|
|
)
|
|
|
|
def _search_for_issues(self, terms: SearchKeys) -> list[tuple[ComicSeries, GenericMetadata]]:
|
|
try:
|
|
search_results = self.talker.search_for_series(
|
|
terms["series"],
|
|
callback=lambda x, y: self._user_canceled(self.progress_callback, x, y),
|
|
series_match_thresh=self.config.Issue_Identifier__series_match_search_thresh,
|
|
)
|
|
except TalkerError as e:
|
|
self.log_msg(f"Error searching for series.\n{e}")
|
|
return []
|
|
# except IssueIdentifierCancelled:
|
|
# return []
|
|
|
|
if not search_results:
|
|
return []
|
|
|
|
filtered_series = self._filter_series(terms, search_results)
|
|
if not filtered_series:
|
|
return []
|
|
|
|
self.log_msg(f"Searching in {len(filtered_series)} series")
|
|
|
|
self._user_canceled(self.progress_callback, 0, len(filtered_series))
|
|
|
|
series_by_id = {series.id: series for series in filtered_series}
|
|
|
|
try:
|
|
talker_result = self.talker.fetch_issues_by_series_issue_num_and_year(
|
|
list(series_by_id.keys()), terms["issue_number"], terms["year"]
|
|
)
|
|
except TalkerError as e:
|
|
self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}")
|
|
return []
|
|
# except IssueIdentifierCancelled:
|
|
# return []
|
|
|
|
if not talker_result:
|
|
return []
|
|
|
|
self._user_canceled(self.progress_callback, 0, 0)
|
|
|
|
issues: list[tuple[ComicSeries, GenericMetadata]] = []
|
|
|
|
# now re-associate the issues and series
|
|
for issue in talker_result:
|
|
if issue.series_id in series_by_id:
|
|
issues.append((series_by_id[issue.series_id], issue))
|
|
else:
|
|
logger.warning("Talker '%s' is returning arbitrary series when searching by id", self.talker.id)
|
|
return issues
|
|
|
|
def _cover_matching(
|
|
self,
|
|
terms: SearchKeys,
|
|
images: list[tuple[str, Image.Image]],
|
|
extra_images: list[tuple[str, Image.Image]],
|
|
issues: list[tuple[ComicSeries, GenericMetadata]],
|
|
) -> list[IssueResult]:
|
|
cover_matching_1 = self._match_covers(terms, images, issues, use_alternates=False)
|
|
|
|
if len(cover_matching_1) == 0:
|
|
self.log_msg(":-( no matches!")
|
|
return cover_matching_1
|
|
|
|
# sort list by image match scores
|
|
cover_matching_1.sort(key=attrgetter("distance"))
|
|
|
|
lst = []
|
|
for i in cover_matching_1:
|
|
lst.append(i.distance)
|
|
|
|
self.log_msg(f"Compared to covers in {len(cover_matching_1)} issue(s): {lst}")
|
|
|
|
cover_matching_2 = []
|
|
final_cover_matching = cover_matching_1
|
|
if cover_matching_1[0].distance >= self.min_score_thresh:
|
|
# we have 1 or more low-confidence matches (all bad cover scores)
|
|
# look at a few more pages in the archive, and also alternate covers online
|
|
self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")
|
|
|
|
temp = self._match_covers(terms, images + extra_images, issues, use_alternates=True)
|
|
for score in temp:
|
|
if score.distance < self.min_alternate_score_thresh:
|
|
cover_matching_2.append(score)
|
|
|
|
if len(cover_matching_2) > 0:
|
|
# We did good, found something!
|
|
self.log_msg("Success in secondary/alternate cover matching!")
|
|
|
|
final_cover_matching = cover_matching_2
|
|
# sort new list by image match scores
|
|
final_cover_matching.sort(key=attrgetter("distance"))
|
|
self.log_msg("[Second round cover matching: best score = {best_score}]")
|
|
# now drop down into the rest of the processing
|
|
|
|
best_score = final_cover_matching[0].distance
|
|
# now pare down list, remove any item more than specified distant from the top scores
|
|
for match_item in reversed(final_cover_matching):
|
|
if match_item.distance > (best_score + self.min_score_distance):
|
|
final_cover_matching.remove(match_item)
|
|
return final_cover_matching
|