diff --git a/comictaggerlib/issueidentifier.py b/comictaggerlib/issueidentifier.py
index 3beb81f..4f6df1f 100644
--- a/comictaggerlib/issueidentifier.py
+++ b/comictaggerlib/issueidentifier.py
@@ -24,7 +24,7 @@ from typing import Any, Callable
 from typing_extensions import NotRequired, TypedDict
 
 from comicapi import utils
-from comicapi.comicarchive import ComicArchive, metadata_styles
+from comicapi.comicarchive import ComicArchive
 from comicapi.genericmetadata import ComicSeries, GenericMetadata
 from comicapi.issuestring import IssueString
 from comictaggerlib.ctsettings import ct_ns
@@ -152,29 +152,6 @@ class IssueIdentifier:
 
         return ImageHasher(data=image_data).average_hash()
 
-    def get_aspect_ratio(self, image_data: bytes) -> float:
-        try:
-            im = Image.open(io.BytesIO(image_data))
-            w, h = im.size
-            return float(h) / float(w)
-        except Exception:
-            return 1.5
-
-    def crop_double_page(self, image_data: bytes) -> bytes:
-        im = Image.open(io.BytesIO(image_data))
-
-        cropped_im = self._crop_double_page(im)
-
-        if cropped_im is None:
-            return b""
-
-        output = io.BytesIO()
-        cropped_im.convert("RGB").save(output, format="PNG")
-        cropped_image_data = output.getvalue()
-        output.close()
-
-        return cropped_image_data
-
     def _crop_double_page(self, im: Image.Image) -> Image.Image | None:
         w, h = im.size
 
@@ -186,21 +163,6 @@ class IssueIdentifier:
 
         return cropped_im
 
-    # Adapted from https://stackoverflow.com/a/10616717/20629671
-    def crop_border(self, image_data: bytes, ratio: int) -> bytes | None:
-        im = Image.open(io.BytesIO(image_data))
-
-        cropped_image = self._crop_border(im, ratio)
-
-        # If there is a difference return the image otherwise return None
-        if cropped_image is not None:
-            output = io.BytesIO()
-            cropped_image.save(output, format="PNG")
-            cropped_image_data = output.getvalue()
-            output.close()
-            return cropped_image_data
-        return None
-
     # Adapted from https://stackoverflow.com/a/10616717/20629671
     def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None:
         assert Image
@@ -245,57 +207,6 @@ class IssueIdentifier:
     def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
         self.cover_url_callback = cb_func
 
-    def get_search_keys(self) -> SearchKeys:
-        ca = self.comic_archive
-
-        search_keys: SearchKeys
-        if self.only_use_additional_meta_data:
-            search_keys = SearchKeys(
-                series=self.additional_metadata.series,
-                issue_number=self.additional_metadata.issue,
-                year=self.additional_metadata.year,
-                month=self.additional_metadata.month,
-                issue_count=self.additional_metadata.issue_count,
-            )
-            return search_keys
-
-        # see if the archive has any useful metadata for searching with
-        try:
-            for style in metadata_styles:
-                internal_metadata = ca.read_metadata(style)
-                if not internal_metadata.is_empty:
-                    break
-        except Exception as e:
-            internal_metadata = GenericMetadata()
-            logger.error("Failed to load metadata for %s: %s", ca.path, e)
-
-        # try to get some metadata from filename
-        md_from_filename = ca.metadata_from_filename(
-            self.config.Filename_Parsing__complicated_parser,
-            self.config.Filename_Parsing__remove_c2c,
-            self.config.Filename_Parsing__remove_fcbd,
-            self.config.Filename_Parsing__remove_publisher,
-        )
-
-        working_md = md_from_filename.copy()
-
-        working_md.overlay(internal_metadata)
-        working_md.overlay(self.additional_metadata)
-
-        # preference order:
-        #   1. Additional metadata
-        #   2. Internal metadata
-        #   3. Filename metadata
-        search_keys = SearchKeys(
-            series=working_md.series,
-            issue_number=working_md.issue,
-            year=working_md.year,
-            month=working_md.month,
-            issue_count=working_md.issue_count,
-        )
-
-        return search_keys
-
     def log_msg(self, msg: Any) -> None:
         msg = str(msg)
         for handler in logging.getLogger().handlers:
@@ -324,86 +235,6 @@ class IssueIdentifier:
         # default output is stdout
         self.output_function(*args, **kwargs)
 
-    def get_issue_cover_match_score(
-        self,
-        primary_img_url: str,
-        alt_urls: list[str],
-        local_cover_hash_list: list[int],
-        use_remote_alternates: bool = False,
-    ) -> Score:
-        # local_cover_hash_list is a list of pre-calculated hashes.
-        # use_remote_alternates - indicates to use alternate covers from CV
-
-        # If there is no URL return 100
-        if not primary_img_url:
-            return Score(score=100, url="", remote_hash=0)
-
-        try:
-            url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
-                primary_img_url, blocking=True
-            )
-        except ImageFetcherException as e:
-            self.log_msg(f"Network issue while fetching cover image from {self.talker.name}. Aborting...")
-            raise IssueIdentifierNetworkError from e
-
-        if self.cancel:
-            raise IssueIdentifierCancelled
-
-        # alert the GUI, if needed
-        if self.cover_url_callback is not None:
-            self.cover_url_callback(url_image_data)
-
-        remote_cover_list = [Score(url=primary_img_url, remote_hash=self.calculate_hash(url_image_data))]
-
-        if self.cancel:
-            raise IssueIdentifierCancelled
-
-        if use_remote_alternates:
-            for alt_url in alt_urls:
-                try:
-                    alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
-                        alt_url, blocking=True
-                    )
-                except ImageFetcherException as e:
-                    self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
-                    raise IssueIdentifierNetworkError from e
-
-                if self.cancel:
-                    raise IssueIdentifierCancelled
-
-                # alert the GUI, if needed
-                if self.cover_url_callback is not None:
-                    self.cover_url_callback(alt_url_image_data)
-
-                remote_cover_list.append(Score(url=alt_url, remote_hash=self.calculate_hash(alt_url_image_data)))
-
-            if self.cancel:
-                raise IssueIdentifierCancelled
-
-            self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]")
-
-        score_list = []
-        done = False
-        for local_cover_hash in local_cover_hash_list:
-            for remote_cover_item in remote_cover_list:
-                score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["remote_hash"])
-                score_list.append(
-                    Score(score=score, url=remote_cover_item["url"], remote_hash=remote_cover_item["remote_hash"])
-                )
-
-                self.log_msg(f" - {score:03}")
-
-                if score <= self.strong_score_thresh:
-                    # such a good score, we can quit now, since for sure we have a winner
-                    done = True
-                    break
-            if done:
-                break
-
-        best_score_item = min(score_list, key=lambda x: x["score"])
-
-        return best_score_item
-
     def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]:
         remote_hashes: list[tuple[str, int]] = []
         for url in urls:
@@ -847,317 +678,3 @@ class IssueIdentifier:
         self.log_msg("--------------------------------------------------------------------------")
 
         return search_result, final_cover_matching
-
-    def search(self) -> list[IssueResult]:
-        ca = self.comic_archive
-        self.match_list = []
-        self.cancel = False
-        self.search_result = self.result_no_matches
-
-        if not pil_available:
-            self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
-            return self.match_list
-
-        if not ca.seems_to_be_a_comic_archive():
-            self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
-            return self.match_list
-
-        cover_image_data = ca.get_page(self.cover_page_index)
-        cover_hash = self.calculate_hash(cover_image_data)
-
-        # check the aspect ratio
-        # if it's wider than it is high, it's probably a two page spread
-        # if so, crop it and calculate a second hash
-        narrow_cover_hash = None
-        aspect_ratio = self.get_aspect_ratio(cover_image_data)
-        if aspect_ratio < 1.0:
-            right_side_image_data = self.crop_double_page(cover_image_data)
-            if right_side_image_data:
-                narrow_cover_hash = self.calculate_hash(right_side_image_data)
-
-        keys = self.get_search_keys()
-        # normalize the issue number, None will return as ""
-        keys["issue_number"] = IssueString(keys["issue_number"]).as_string()
-
-        # we need, at minimum, a series and issue number
-        if not (keys["series"] and keys["issue_number"]):
-            self.log_msg("Not enough info for a search!")
-            return []
-
-        self.log_msg(f"Using {self.talker.name} to search for:")
-        self.log_msg("\tSeries: " + keys["series"])
-        self.log_msg("\tIssue: " + keys["issue_number"])
-        if keys["issue_count"] is not None:
-            self.log_msg("\tCount: " + str(keys["issue_count"]))
-        if keys["year"] is not None:
-            self.log_msg("\tYear: " + str(keys["year"]))
-        if keys["month"] is not None:
-            self.log_msg("\tMonth: " + str(keys["month"]))
-
-        self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
-        try:
-            ct_search_results = self.talker.search_for_series(keys["series"])
-        except TalkerError as e:
-            self.log_msg(f"Error searching for series.\n{e}")
-            return []
-
-        if self.cancel:
-            return []
-
-        if ct_search_results is None:
-            return []
-
-        series_second_round_list = []
-
-        for item in ct_search_results:
-            length_approved = False
-            publisher_approved = True
-            date_approved = True
-
-            # remove any series that starts after the issue year
-            if keys["year"] is not None and item.start_year is not None:
-                if keys["year"] < item.start_year:
-                    date_approved = False
-
-            for name in [item.name, *item.aliases]:
-                if utils.titles_match(keys["series"], name, self.series_match_thresh):
-                    length_approved = True
-                    break
-            # remove any series from publishers on the filter
-            if item.publisher is not None:
-                publisher = item.publisher
-                if publisher is not None and publisher.casefold() in self.publisher_filter:
-                    publisher_approved = False
-
-            if length_approved and publisher_approved and date_approved:
-                series_second_round_list.append(item)
-
-        self.log_msg("Searching in " + str(len(series_second_round_list)) + " series")
-
-        if self.progress_callback is not None:
-            self.progress_callback(0, len(series_second_round_list))
-
-        # now sort the list by name length
-        series_second_round_list.sort(key=lambda x: len(x.name), reverse=False)
-
-        series_by_id = {series.id: series for series in series_second_round_list}
-
-        issue_list = None
-        try:
-            if len(series_by_id) > 0:
-                issue_list = self.talker.fetch_issues_by_series_issue_num_and_year(
-                    list(series_by_id.keys()), keys["issue_number"], keys["year"]
-                )
-        except TalkerError as e:
-            self.log_msg(f"Issue while searching for series details. Aborting...\n{e}")
-            return []
-
-        if issue_list is None:
-            return []
-
-        shortlist = []
-        # now re-associate the issues and series
-        # is this really needed?
-        for issue in issue_list:
-            if issue.series_id in series_by_id:
-                shortlist.append((series_by_id[issue.series_id], issue))
-
-        if keys["year"] is None:
-            self.log_msg(f"Found {len(shortlist)} series that have an issue #{keys['issue_number']}")
-        else:
-            self.log_msg(
-                f"Found {len(shortlist)} series that have an issue #{keys['issue_number']} from {keys['year']}"
-            )
-
-        # now we have a shortlist of series with the desired issue number
-        # Do first round of cover matching
-        counter = len(shortlist)
-        for series, issue in shortlist:
-            if self.progress_callback is not None:
-                self.progress_callback(counter, len(shortlist) * 3)
-            counter += 1
-
-            self.log_msg(
-                f"Examining covers for ID: {series.id} {series.name} ({series.start_year}):",
-            )
-
-            # Now check the cover match against the primary image
-            hash_list = [cover_hash]
-            if narrow_cover_hash is not None:
-                hash_list.append(narrow_cover_hash)
-
-            cropped_border = self.crop_border(cover_image_data, self.config.Issue_Identifier__border_crop_percent)
-            if cropped_border is not None:
-                hash_list.append(self.calculate_hash(cropped_border))
-                logger.info("Adding cropped cover to the hashlist")
-
-            try:
-                image_url = issue._cover_image or ""
-                alt_urls = issue._alternate_images
-
-                score_item = self.get_issue_cover_match_score(
-                    image_url, alt_urls, hash_list, use_remote_alternates=False
-                )
-            except Exception:
-                logger.exception("Scoring series failed")
-                self.match_list = []
-                return self.match_list
-
-            match = IssueResult(
-                series=f"{series.name} ({series.start_year})",
-                distance=score_item["score"],
-                issue_number=keys["issue_number"],
-                issue_count=series.count_of_issues,
-                url_image_hash=score_item["remote_hash"],
-                issue_title=issue.title or "",
-                issue_id=issue.issue_id or "",
-                series_id=series.id,
-                month=issue.month,
-                year=issue.year,
-                publisher=None,
-                image_url=image_url,
-                alt_image_urls=alt_urls,
-                description=issue.description or "",
-            )
-            if series.publisher is not None:
-                match.publisher = series.publisher
-
-            self.match_list.append(match)
-
-            self.log_msg(f"best score {match.distance:03}")
-
-            self.log_msg("")
-
-        if len(self.match_list) == 0:
-            self.log_msg(":-( no matches!")
-            self.search_result = self.result_no_matches
-            return self.match_list
-
-        # sort list by image match scores
-        self.match_list.sort(key=attrgetter("distance"))
-
-        lst = []
-        for i in self.match_list:
-            lst.append(i.distance)
-
-        self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s): {lst}")
-
-        def print_match(item: IssueResult) -> None:
-            self.log_msg(
-                "-----> {} #{} {} ({}/{}) -- score: {}".format(
-                    item.series,
-                    item.issue_number,
-                    item.issue_title,
-                    item.month,
-                    item.year,
-                    item.distance,
-                )
-            )
-
-        best_score: int = self.match_list[0].distance
-
-        if best_score >= self.min_score_thresh:
-            # we have 1 or more low-confidence matches (all bad cover scores)
-            # look at a few more pages in the archive, and also alternate covers online
-            self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")
-            hash_list = [cover_hash]
-            if narrow_cover_hash is not None:
-                hash_list.append(narrow_cover_hash)
-            for page_index in range(1, min(3, ca.get_number_of_pages())):
-                image_data = ca.get_page(page_index)
-                page_hash = self.calculate_hash(image_data)
-                hash_list.append(page_hash)
-
-            second_match_list = []
-            counter = 2 * len(self.match_list)
-            for m in self.match_list:
-                if self.progress_callback is not None:
-                    self.progress_callback(counter, len(self.match_list) * 3)
-                counter += 1
-                self.log_msg(f"Examining alternate covers for ID: {m.series_id} {m.series}:")
-                try:
-                    score_item = self.get_issue_cover_match_score(
-                        m.image_url,
-                        m.alt_image_urls,
-                        hash_list,
-                        use_remote_alternates=True,
-                    )
-                except Exception:
-                    logger.exception("failed examining alt covers")
-                    self.match_list = []
-                    return self.match_list
-                self.log_msg(f"--->{score_item['score']}")
-                self.log_msg("")
-
-                if score_item["score"] < self.min_alternate_score_thresh:
-                    second_match_list.append(m)
-                    m.distance = score_item["score"]
-
-            if len(second_match_list) == 0:
-                if len(self.match_list) == 1:
-                    self.log_msg("No matching pages in the issue.")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    print_match(self.match_list[0])
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_found_match_but_bad_cover_score
-                else:
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.log_msg("Multiple bad cover matches! Need to use other info...")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_multiple_matches_with_bad_image_scores
-                return self.match_list
-
-            # We did good, found something!
-            self.log_msg("Success in secondary/alternate cover matching!")
-
-            self.match_list = second_match_list
-            # sort new list by image match scores
-            self.match_list.sort(key=attrgetter("distance"))
-            best_score = self.match_list[0].distance
-            self.log_msg(f"[Second round cover matching: best score = {best_score}]")
-            # now drop down into the rest of the processing
-
-        if self.progress_callback is not None:
-            self.progress_callback(99, 100)
-
-        # now pare down the list, removing any item more than the specified distance from the top score
-        for match_item in reversed(self.match_list):
-            if match_item.distance > best_score + self.min_score_distance:
-                self.match_list.remove(match_item)
-
-        # One more test for the case of choosing a limited series' first issue vs. a trade with the same cover:
-        # if we have a given issue count > 1 and the series from CV has count == 1, remove it from the match list
-        if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
-            new_list = []
-            for match in self.match_list:
-                if match.issue_count != 1:
-                    new_list.append(match)
-                else:
-                    self.log_msg(
-                        f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
-                    )
-
-            if len(new_list) > 0:
-                self.match_list = new_list
-
-        if len(self.match_list) == 1:
-            self.log_msg("--------------------------------------------------------------------------")
-            print_match(self.match_list[0])
-            self.log_msg("--------------------------------------------------------------------------")
-            self.search_result = self.result_one_good_match
-
-        elif len(self.match_list) == 0:
-            self.log_msg("--------------------------------------------------------------------------")
-            self.log_msg("No matches found :(")
-            self.log_msg("--------------------------------------------------------------------------")
-            self.search_result = self.result_no_matches
-        else:
-            # we've got multiple good matches:
-            self.log_msg("More than one likely candidate.")
-            self.search_result = self.result_multiple_good_matches
-            self.log_msg("--------------------------------------------------------------------------")
-            for match_item in self.match_list:
-                print_match(match_item)
-            self.log_msg("--------------------------------------------------------------------------")
-
-        return self.match_list
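Note for reviewers: all the removed code above scores covers the same way the retained _get_remote_hashes() path does: each image is reduced to a 64-bit average hash, and candidates are compared by hamming distance, where 0 means identical and lower is better (strong_score_thresh short-circuits the loop on a near-perfect hit). A minimal sketch of that idea, for orientation only; it assumes PIL and an 8x8 hash and only mirrors, rather than reproduces, ComicTagger's ImageHasher implementation:

    import io

    from PIL import Image

    def average_hash(image_data: bytes, size: int = 8) -> int:
        # Shrink to a size x size grayscale thumbnail, then set one bit per
        # pixel that is brighter than the mean.
        im = Image.open(io.BytesIO(image_data)).convert("L").resize((size, size))
        pixels = list(im.getdata())
        mean = sum(pixels) / len(pixels)
        bits = 0
        for i, pixel in enumerate(pixels):
            if pixel > mean:
                bits |= 1 << i
        return bits

    def hamming_distance(h1: int, h2: int) -> int:
        # Number of differing bits: 0 for identical covers, roughly 32 for
        # unrelated ones, which is why the small integer thresholds used by
        # IssueIdentifier can separate real matches from noise.
        return bin(h1 ^ h2).count("1")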