"""A class to automatically identify a comic archive""" # # Copyright 2012-2014 ComicTagger Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import dataclasses import functools import io import logging import pathlib from enum import Enum, auto from operator import attrgetter from typing import Any, Callable from typing_extensions import NotRequired, TypedDict from comicapi import utils from comicapi.comicarchive import ComicArchive from comicapi.genericmetadata import ComicSeries, GenericMetadata from comicapi.issuestring import IssueString from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException from comictaggerlib.imagehasher import ImageHasher from comictaggerlib.resulttypes import IssueResult from comictalker.comictalker import ComicTalker, RLCallBack, TalkerError logger = logging.getLogger(__name__) try: from PIL import Image, ImageChops pil_available = True except ImportError: pil_available = False class SearchKeys(TypedDict): series: str issue_number: str alternate_number: str | None month: int | None year: int | None issue_count: int | None alternate_count: int | None publisher: str | None imprint: str | None class Score(TypedDict): score: NotRequired[int] url: str remote_hash: int local_hash_name: str local_hash: int class IssueIdentifierNetworkError(Exception): ... class IssueIdentifierCancelled(Exception): ... class Result(Enum): single_good_match = auto() no_matches = auto() single_bad_cover_score = auto() multiple_bad_cover_scores = auto() multiple_good_matches = auto() @dataclasses.dataclass class IssueIdentifierOptions: series_match_search_thresh: int series_match_identify_thresh: int use_publisher_filter: bool publisher_filter: list[str] quiet: bool cache_dir: pathlib.Path border_crop_percent: int talker: ComicTalker class IssueIdentifier: def __init__( self, config: IssueIdentifierOptions, on_rate_limit: RLCallBack | None, output: Callable[[str], Any] = print, on_progress: Callable[[int, int, bytes], Any] | None = None, ) -> None: self.config = config self.talker = config.talker self.image_hasher = 1 self.only_use_additional_meta_data = False # a decent hamming score, good enough to call it a match self.min_score_thresh: int = 16 # for alternate covers, be more stringent, since we're a bit more # scattershot in comparisons self.min_alternate_score_thresh = 12 # the min distance a hamming score must be to separate itself from # closest neighbor self.min_score_distance = 4 # a very strong hamming score, almost certainly the same image self.strong_score_thresh = 8 # used to eliminate series names that are too long based on our search # string self.series_match_thresh = config.series_match_identify_thresh # used to eliminate unlikely publishers self.use_publisher_filter = config.use_publisher_filter self.publisher_filter = [s.strip().casefold() for s in config.publisher_filter] self.additional_metadata = GenericMetadata() self.output_function = output self.progress_callback: Callable[[int, int, bytes], Any] = lambda *x: ... if on_progress: self.progress_callback = on_progress self.on_rate_limit = on_rate_limit self.search_result = Result.no_matches self.cancel = False self.current_progress = (0, 0) self.match_list: list[IssueResult] = [] def calculate_hash(self, image_data: bytes) -> int: if self.image_hasher == 3: return ImageHasher(data=image_data).p_hash() if self.image_hasher == 2: return -1 # ImageHasher(data=image_data).average_hash2() return ImageHasher(data=image_data).average_hash() def log_msg(self, msg: Any) -> None: msg = str(msg) for handler in logging.getLogger().handlers: handler.flush() self.output(msg) def output(self, *args: Any, file: Any = None, **kwargs: Any) -> None: # We intercept and discard the file argument otherwise everything is passed to self.output_function # Ensure args[0] is defined and is a string for logger.info if not args: log_args: tuple[Any, ...] = ("",) elif isinstance(args[0], str): log_args = (args[0].strip("\n"), *args[1:]) else: log_args = args log_msg = " ".join([str(x) for x in log_args]) # Always send to logger so that we have a record for troubleshooting logger.info(log_msg, **kwargs) # If we are quiet we don't need to call the output function if self.config.quiet: return # default output is stdout self.output_function(*args, **kwargs) def identify(self, ca: ComicArchive, md: GenericMetadata) -> tuple[Result, list[IssueResult]]: if not self._check_requirements(ca): return Result.no_matches, [] terms, images, extra_images = self._get_search_terms(ca, md) # we need, at minimum, a series and issue number if not (terms["series"] and terms["issue_number"]): self.log_msg("Not enough info for a search!") return Result.no_matches, [] self._print_terms(terms, images) issues = self._search_for_issues(terms) self.log_msg(f"Found {len(issues)} series that have an issue #{terms['issue_number']}") final_cover_matching = self._cover_matching(terms, images, extra_images, issues) # One more test for the case choosing limited series first issue vs a trade with the same cover: # if we have a given issue count > 1 and the series from CV has count==1, remove it from match list if len(final_cover_matching) > 1 and terms["issue_count"] is not None and terms["issue_count"] != 1: for match in final_cover_matching.copy(): if match.issue_count == 1: self.log_msg( f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)" ) final_cover_matching.remove(match) if final_cover_matching: best_score = final_cover_matching[0].distance else: best_score = 0 if best_score >= self.min_score_thresh: if len(final_cover_matching) == 1: self.log_msg("No matching pages in the issue.") self.log_msg("--------------------------------------------------------------------------") self._print_match(final_cover_matching[0]) self.log_msg("--------------------------------------------------------------------------") search_result = Result.single_bad_cover_score else: self.log_msg("--------------------------------------------------------------------------") self.log_msg("Multiple bad cover matches! Need to use other info...") self.log_msg("--------------------------------------------------------------------------") search_result = Result.multiple_bad_cover_scores else: if len(final_cover_matching) == 1: self.log_msg("--------------------------------------------------------------------------") self._print_match(final_cover_matching[0]) self.log_msg("--------------------------------------------------------------------------") search_result = Result.single_good_match elif len(self.match_list) == 0: self.log_msg("--------------------------------------------------------------------------") self.log_msg("No matches found :(") self.log_msg("--------------------------------------------------------------------------") search_result = Result.no_matches else: # we've got multiple good matches: self.log_msg("More than one likely candidate.") search_result = Result.multiple_good_matches self.log_msg("--------------------------------------------------------------------------") for match_item in final_cover_matching: self._print_match(match_item) self.log_msg("--------------------------------------------------------------------------") return search_result, final_cover_matching def _crop_double_page(self, im: Image.Image) -> Image.Image | None: w, h = im.size try: cropped_im = im.crop((int(w / 2), 0, w, h)) except Exception: logger.exception("cropCover() error") return None return cropped_im # Adapted from https://stackoverflow.com/a/10616717/20629671 def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None: assert Image assert ImageChops # RGBA doesn't work???? tmp = im.convert("RGB") bg = Image.new("RGB", tmp.size, "black") diff = ImageChops.difference(tmp, bg) diff = ImageChops.add(diff, diff, 2.0, -100) bbox = diff.getbbox() width_percent = 0 height_percent = 0 # If bbox is None that should mean it's solid black if bbox: width = bbox[2] - bbox[0] height = bbox[3] - bbox[1] # Convert to percent width_percent = int(100 - ((width / im.width) * 100)) height_percent = int(100 - ((height / im.height) * 100)) logger.debug( "Width: %s Height: %s, ratio: %s %s ratio met: %s", im.width, im.height, width_percent, height_percent, width_percent > ratio or height_percent > ratio, ) # If there is a difference return the image otherwise return None if width_percent > ratio or height_percent > ratio: return im.crop(bbox) return None def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]: remote_hashes: list[tuple[str, int]] = [] for url in urls: try: alt_url_image_data = ImageFetcher(self.config.cache_dir).fetch(url, blocking=True) except ImageFetcherException as e: self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...") raise IssueIdentifierNetworkError from e self._user_canceled( functools.partial( self.progress_callback, self.current_progress[0], self.current_progress[1], alt_url_image_data ) ) remote_hashes.append((url, self.calculate_hash(alt_url_image_data))) if self.cancel: raise IssueIdentifierCancelled return remote_hashes def _get_issue_cover_match_score( self, primary_img_url: str, alt_urls: list[str], local_hashes: list[tuple[str, int]], use_alt_urls: bool = False, ) -> Score: # local_hashes is a list of pre-calculated hashes. # use_alt_urls - indicates to use alternate covers from CV # If there is no URL return 100 if not primary_img_url: return Score(score=100, url="", remote_hash=0) # self._user_canceled() urls = [primary_img_url] if use_alt_urls: urls.extend(alt_urls) self.log_msg(f"[{len(alt_urls)} alt. covers]") remote_hashes = self._get_remote_hashes(urls) score_list = [] done = False for local_hash in local_hashes: for remote_hash in remote_hashes: score = ImageHasher.hamming_distance(local_hash[1], remote_hash[1]) score_list.append( Score( score=score, url=remote_hash[0], remote_hash=remote_hash[1], local_hash_name=local_hash[0], local_hash=local_hash[1], ) ) self.log_msg(f" - {score:03}") if score <= self.strong_score_thresh: # such a good score, we can quit now, since for sure we have a winner done = True break if done: break best_score_item = min(score_list, key=lambda x: x["score"]) return best_score_item def _check_requirements(self, ca: ComicArchive) -> bool: if not pil_available: self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.") return False if not ca.seems_to_be_a_comic_archive(): self.log_msg(f"Sorry, but {ca.path} is not a comic archive!") return False return True def _process_cover(self, name: str, image_data: bytes) -> list[tuple[str, Image.Image]]: assert Image cover_image = Image.open(io.BytesIO(image_data)) images = [(name, cover_image)] # check the aspect ratio # if it's wider than it is high, it's probably a two page spread (back_cover, front_cover) # if so, crop it and calculate a second hash aspect_ratio = float(cover_image.height) / float(cover_image.width) if aspect_ratio < 1.0: im = self._crop_double_page(cover_image) if im is not None: images.append(("double page", im)) # Check and remove black borders. Helps in identifying comics with an excessive black border like https://comicvine.gamespot.com/marvel-graphic-novel-1-the-death-of-captain-marvel/4000-21782/ cropped = self._crop_border(cover_image, self.config.border_crop_percent) if cropped is not None: images.append(("black border cropped", cropped)) return images def _get_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]: covers: list[tuple[str, Image.Image]] = [] for cover_index in md.get_cover_page_index_list(): image_data = ca.get_page(cover_index) covers.extend(self._process_cover(f"{cover_index}", image_data)) return covers def _get_extra_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]: assert md covers: list[tuple[str, Image.Image]] = [] for cover_index in range(1, min(3, ca.get_number_of_pages())): image_data = ca.get_page(md.get_archive_page_index(cover_index)) covers.extend(self._process_cover(f"{cover_index}", image_data)) return covers def _get_search_keys(self, md: GenericMetadata) -> Any: search_keys = SearchKeys( series=md.series, issue_number=IssueString(md.issue).as_string(), alternate_number=IssueString(md.alternate_number).as_string(), month=md.month, year=md.year, issue_count=md.issue_count, alternate_count=md.alternate_count, publisher=md.publisher, imprint=md.imprint, ) return search_keys def _get_search_terms( self, ca: ComicArchive, md: GenericMetadata ) -> tuple[SearchKeys, list[tuple[str, Image.Image]], list[tuple[str, Image.Image]]]: return self._get_search_keys(md), self._get_images(ca, md), self._get_extra_images(ca, md) def _user_canceled(self, callback: Callable[[], Any] | None = None) -> Any: if self.cancel: raise IssueIdentifierCancelled if callback is not None: return callback() def _print_terms(self, keys: SearchKeys, images: list[tuple[str, Image.Image]]) -> None: assert keys["series"] assert keys["issue_number"] self.log_msg(f"Using {self.talker.name} to search for:") self.log_msg("\tSeries: " + keys["series"]) self.log_msg("\tIssue: " + keys["issue_number"]) # if keys["alternate_number"] is not None: # self.log_msg("\tAlternate Issue: " + str(keys["alternate_number"])) if keys["month"] is not None: self.log_msg("\tMonth: " + str(keys["month"])) if keys["year"] is not None: self.log_msg("\tYear: " + str(keys["year"])) if keys["issue_count"] is not None: self.log_msg("\tCount: " + str(keys["issue_count"])) # if keys["alternate_count"] is not None: # self.log_msg("\tAlternate Count: " + str(keys["alternate_count"])) # if keys["publisher"] is not None: # self.log_msg("\tPublisher: " + str(keys["publisher"])) # if keys["imprint"] is not None: # self.log_msg("\tImprint: " + str(keys["imprint"])) for name, _ in images: self.log_msg("Cover: " + name) self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...") def _filter_series(self, terms: SearchKeys, search_results: list[ComicSeries]) -> list[ComicSeries]: assert terms["series"] filtered_results = [] for item in search_results: length_approved = False publisher_approved = True date_approved = True # remove any series that starts after the issue year if terms["year"] is not None and item.start_year is not None: if item.start_year > terms["year"] + 1: date_approved = False for name in [item.name, *item.aliases]: if utils.titles_match(terms["series"], name, self.series_match_thresh): length_approved = True break # remove any series from publishers on the filter if self.use_publisher_filter and item.publisher: if item.publisher.casefold() in self.publisher_filter: publisher_approved = False if length_approved and publisher_approved and date_approved: filtered_results.append(item) else: logger.debug( "Filtered out series: '%s' length approved: '%s', publisher approved: '%s', date approved: '%s'", item.name, length_approved, publisher_approved, date_approved, ) return filtered_results def _calculate_hashes(self, images: list[tuple[str, Image.Image]]) -> list[tuple[str, int]]: hashes = [] for name, image in images: hashes.append((name, ImageHasher(image=image).average_hash())) return hashes def _match_covers( self, terms: SearchKeys, images: list[tuple[str, Image.Image]], issues: list[tuple[ComicSeries, GenericMetadata]], use_alternates: bool, ) -> list[IssueResult]: assert terms["issue_number"] match_results: list[IssueResult] = [] hashes = self._calculate_hashes(images) counter = 0 alternate = "" if use_alternates: alternate = " Alternate" for series, issue in issues: self.current_progress = counter, len(issues) self._user_canceled(functools.partial(self.progress_callback, counter, len(issues), b"")) counter += 1 self.log_msg( f"Examining{alternate} covers for Series ID: {series.id} {series.name} ({series.start_year}):", ) try: image_url = issue._cover_image or "" alt_urls = issue._alternate_images score_item = self._get_issue_cover_match_score(image_url, alt_urls, hashes, use_alt_urls=use_alternates) except Exception: logger.exception(f"Scoring series{alternate} covers failed") return [] match = IssueResult( series=f"{series.name} ({series.start_year})", distance=score_item["score"], issue_number=terms["issue_number"], issue_count=series.count_of_issues, url_image_hash=score_item["remote_hash"], issue_title=issue.title or "", issue_id=issue.issue_id or "", series_id=series.id, month=issue.month, year=issue.year, publisher=None, image_url=image_url, alt_image_urls=alt_urls, description=issue.description or "", ) if series.publisher is not None: match.publisher = series.publisher match_results.append(match) self.log_msg(f"best score {match.distance:03}") self.log_msg("") return match_results def _print_match(self, item: IssueResult) -> None: self.log_msg( "-----> {} #{} {} ({}/{}) -- score: {}".format( item.series, item.issue_number, item.issue_title, item.month, item.year, item.distance, ) ) def _search_for_issues(self, terms: SearchKeys) -> list[tuple[ComicSeries, GenericMetadata]]: try: search_results = self.talker.search_for_series( terms["series"], callback=lambda x, y: self._user_canceled(functools.partial(self.progress_callback, x, y, b"")), series_match_thresh=self.config.series_match_search_thresh, on_rate_limit=self.on_rate_limit, ) except TalkerError as e: self.log_msg(f"Error searching for series.\n{e}") return [] # except IssueIdentifierCancelled: # return [] if not search_results: return [] filtered_series = self._filter_series(terms, search_results) if not filtered_series: return [] self.log_msg(f"Searching in {len(filtered_series)} series") self._user_canceled(functools.partial(self.progress_callback, 0, len(filtered_series), b"")) series_by_id = {series.id: series for series in filtered_series} try: talker_result = self.talker.fetch_issues_by_series_issue_num_and_year( list(series_by_id.keys()), terms["issue_number"], terms["year"], on_rate_limit=self.on_rate_limit, ) except TalkerError as e: self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}") return [] # except IssueIdentifierCancelled: # return [] if not talker_result: return [] self._user_canceled(functools.partial(self.progress_callback, 0, 0, b"")) issues: list[tuple[ComicSeries, GenericMetadata]] = [] # now re-associate the issues and series for issue in talker_result: if issue.series_id in series_by_id: issues.append((series_by_id[issue.series_id], issue)) else: logger.warning("Talker '%s' is returning arbitrary series when searching by id", self.talker.id) return issues def _cover_matching( self, terms: SearchKeys, images: list[tuple[str, Image.Image]], extra_images: list[tuple[str, Image.Image]], issues: list[tuple[ComicSeries, GenericMetadata]], ) -> list[IssueResult]: cover_matching_1 = self._match_covers(terms, images, issues, use_alternates=False) if len(cover_matching_1) == 0: self.log_msg(":-( no matches!") return cover_matching_1 # sort list by image match scores cover_matching_1.sort(key=attrgetter("distance")) lst = [] for i in cover_matching_1: lst.append(i.distance) self.log_msg(f"Compared to covers in {len(cover_matching_1)} issue(s): {lst}") cover_matching_2 = [] final_cover_matching = cover_matching_1 if cover_matching_1[0].distance >= self.min_score_thresh: # we have 1 or more low-confidence matches (all bad cover scores) # look at a few more pages in the archive, and also alternate covers online self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...") temp = self._match_covers(terms, images + extra_images, issues, use_alternates=True) for score in temp: if score.distance < self.min_alternate_score_thresh: cover_matching_2.append(score) if len(cover_matching_2) > 0: # We did good, found something! self.log_msg("Success in secondary/alternate cover matching!") final_cover_matching = cover_matching_2 # sort new list by image match scores final_cover_matching.sort(key=attrgetter("distance")) self.log_msg("[Second round cover matching: best score = {best_score}]") # now drop down into the rest of the processing best_score = final_cover_matching[0].distance # now pare down list, remove any item more than specified distant from the top scores for match_item in reversed(final_cover_matching): if match_item.distance > (best_score + self.min_score_distance): final_cover_matching.remove(match_item) return final_cover_matching