From 4e757310241b6a8bff86bbf5efec6e13a7a35c47 Mon Sep 17 00:00:00 2001
From: Timmy Welch
Date: Fri, 23 Feb 2024 20:47:04 -0800
Subject: [PATCH 1/5] Re-write IssueIdentifier.search as IssueIdentifier.identify

---
 comictaggerlib/cli.py                   |  22 +-
 comictaggerlib/imagehasher.py           |  10 +-
 comictaggerlib/issueidentifier.py       | 531 ++++++++++++++++++++++--
 comictaggerlib/resulttypes.py           |   2 +-
 comictaggerlib/seriesselectionwindow.py |  46 +-
 comictaggerlib/taggerwindow.py          |  10 +-
 6 files changed, 551 insertions(+), 70 deletions(-)

diff --git a/comictaggerlib/cli.py b/comictaggerlib/cli.py
index 3633a91..f718159 100644
--- a/comictaggerlib/cli.py
+++ b/comictaggerlib/cli.py
@@ -459,31 +459,29 @@ class CLI:
             self.output(text)
 
         # use our overlaid MD struct to search
-        ii.set_additional_metadata(md)
-        ii.only_use_additional_meta_data = True
+        # ii.set_additional_metadata(md)
+        # ii.only_use_additional_meta_data = True
         ii.set_output_function(functools.partial(self.output, already_logged=True))
-        ii.cover_page_index = md.get_cover_page_index_list()[0]
-        matches = ii.search()
-
-        result = ii.search_result
+        # ii.cover_page_index = md.get_cover_page_index_list()[0]
+        result, matches = ii.identify(ca, md)
 
         found_match = False
         choices = False
         low_confidence = False
 
-        if result == ii.result_no_matches:
+        if result == IssueIdentifier.result_no_matches:
             pass
-        elif result == ii.result_found_match_but_bad_cover_score:
+        elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
             low_confidence = True
             found_match = True
-        elif result == ii.result_found_match_but_not_first_page:
+        elif result == IssueIdentifier.result_found_match_but_not_first_page:
             found_match = True
-        elif result == ii.result_multiple_matches_with_bad_image_scores:
+        elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
             low_confidence = True
             choices = True
-        elif result == ii.result_one_good_match:
+        elif result == IssueIdentifier.result_one_good_match:
             found_match = True
-        elif result == ii.result_multiple_good_matches:
+        elif result == IssueIdentifier.result_multiple_good_matches:
             choices = True
 
         if choices:
diff --git a/comictaggerlib/imagehasher.py b/comictaggerlib/imagehasher.py
index a70f380..562a724 100644
--- a/comictaggerlib/imagehasher.py
+++ b/comictaggerlib/imagehasher.py
@@ -34,13 +34,19 @@ logger = logging.getLogger(__name__)
 
 
 class ImageHasher:
-    def __init__(self, path: str | None = None, data: bytes = b"", width: int = 8, height: int = 8) -> None:
+    def __init__(
+        self, path: str | None = None, image: Image.Image | None = None, data: bytes = b"", width: int = 8, height: int = 8
+    ) -> None:
         self.width = width
         self.height = height
 
-        if path is None and not data:
+        if path is None and not data and not image:
             raise OSError
 
+        if image is not None:
+            self.image = image
+            return
+
         try:
             if path is not None:
                 self.image = Image.open(path)
diff --git a/comictaggerlib/issueidentifier.py b/comictaggerlib/issueidentifier.py
index 181ef4e..3beb81f 100644
--- a/comictaggerlib/issueidentifier.py
+++ b/comictaggerlib/issueidentifier.py
@@ -25,7 +25,7 @@ from typing_extensions import NotRequired, TypedDict
 
 from comicapi import utils
 from comicapi.comicarchive import ComicArchive, metadata_styles
-from comicapi.genericmetadata import GenericMetadata
+from comicapi.genericmetadata import ComicSeries, GenericMetadata
 from comicapi.issuestring import IssueString
 from comictaggerlib.ctsettings import ct_ns
 from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException
@@ -44,17 +44,23 @@ except ImportError:
 
 
 class SearchKeys(TypedDict):
-    series: str | None
-    issue_number: str | None
+    series: str
+    issue_number: str
+    alternate_number: str | None
    month: int | None
    year: int | None
    issue_count: int | None
+    alternate_count: int | None
+    publisher: str | None
+    imprint: str | None
 
 
 class Score(TypedDict):
     score: NotRequired[int]
     url: str
-    hash: int
+    remote_hash: int
+    local_hash_name: str
+    local_hash: int
 
 
 class IssueIdentifierNetworkError(Exception): ...
@@ -71,10 +77,17 @@ class IssueIdentifier:
     result_one_good_match = 4
     result_multiple_good_matches = 5
 
-    def __init__(self, comic_archive: ComicArchive, config: ct_ns, talker: ComicTalker) -> None:
+    def __init__(
+        self,
+        comic_archive: ComicArchive,
+        config: ct_ns,
+        talker: ComicTalker,
+        metadata: GenericMetadata = GenericMetadata(),
+    ) -> None:
         self.config = config
         self.talker = talker
         self.comic_archive: ComicArchive = comic_archive
+        self.md = metadata
         self.image_hasher = 1
 
         self.only_use_additional_meta_data = False
@@ -147,14 +160,12 @@ class IssueIdentifier:
         except Exception:
             return 1.5
 
-    def crop_cover(self, image_data: bytes) -> bytes:
+    def crop_double_page(self, image_data: bytes) -> bytes:
         im = Image.open(io.BytesIO(image_data))
-        w, h = im.size
 
-        try:
-            cropped_im = im.crop((int(w / 2), 0, w, h))
-        except Exception:
-            logger.exception("cropCover() error")
+        cropped_im = self._crop_double_page(im)
+
+        if cropped_im is None:
             return b""
 
         output = io.BytesIO()
@@ -164,10 +175,36 @@ class IssueIdentifier:
 
         return cropped_image_data
 
+    def _crop_double_page(self, im: Image.Image) -> Image.Image | None:
+        w, h = im.size
+
+        try:
+            cropped_im = im.crop((int(w / 2), 0, w, h))
+        except Exception:
+            logger.exception("cropCover() error")
+            return None
+
+        return cropped_im
+
     # Adapted from https://stackoverflow.com/a/10616717/20629671
     def crop_border(self, image_data: bytes, ratio: int) -> bytes | None:
         im = Image.open(io.BytesIO(image_data))
 
+        cropped_image = self._crop_border(im, ratio)
+
+        # If there is a difference return the image otherwise return None
+        if cropped_image is not None:
+            output = io.BytesIO()
+            cropped_image.save(output, format="PNG")
+            cropped_image_data = output.getvalue()
+            output.close()
+            return cropped_image_data
+        return None
+
+    # Adapted from https://stackoverflow.com/a/10616717/20629671
+    def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None:
+        assert Image
+        assert ImageChops
+
         # RGBA doesn't work????
         tmp = im.convert("RGB")
@@ -199,11 +236,7 @@ class IssueIdentifier:
 
         # If there is a difference return the image otherwise return None
         if width_percent > ratio or height_percent > ratio:
-            output = io.BytesIO()
-            im.crop(bbox).save(output, format="PNG")
-            cropped_image_data = output.getvalue()
-            output.close()
-            return cropped_image_data
+            return im.crop(bbox)
         return None
 
     def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
@@ -303,7 +336,7 @@ class IssueIdentifier:
 
         # If there is no URL return 100
         if not primary_img_url:
-            return Score(score=100, url="", hash=0)
+            return Score(score=100, url="", remote_hash=0)
 
         try:
             url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
@@ -320,7 +353,7 @@ class IssueIdentifier:
         if self.cover_url_callback is not None:
             self.cover_url_callback(url_image_data)
 
-        remote_cover_list = [Score(url=primary_img_url, hash=self.calculate_hash(url_image_data))]
+        remote_cover_list = [Score(url=primary_img_url, remote_hash=self.calculate_hash(url_image_data))]
 
         if self.cancel:
             raise IssueIdentifierCancelled
@@ -342,7 +375,7 @@ class IssueIdentifier:
             if self.cover_url_callback is not None:
                 self.cover_url_callback(alt_url_image_data)
 
-            remote_cover_list.append(Score(url=alt_url, hash=self.calculate_hash(alt_url_image_data)))
+            remote_cover_list.append(Score(url=alt_url, remote_hash=self.calculate_hash(alt_url_image_data)))
 
             if self.cancel:
                 raise IssueIdentifierCancelled
@@ -353,8 +386,10 @@ class IssueIdentifier:
         done = False
         for local_cover_hash in local_cover_hash_list:
             for remote_cover_item in remote_cover_list:
-                score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["hash"])
-                score_list.append(Score(score=score, url=remote_cover_item["url"], hash=remote_cover_item["hash"]))
+                score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["remote_hash"])
+                score_list.append(
+                    Score(score=score, url=remote_cover_item["url"], remote_hash=remote_cover_item["remote_hash"])
+                )
 
                 self.log_msg(f" - {score:03}")
 
@@ -369,6 +404,450 @@ class IssueIdentifier:
 
         return best_score_item
 
+    def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]:
+        remote_hashes: list[tuple[str, int]] = []
+        for url in urls:
+            try:
+                alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
+                    url, blocking=True
+                )
+            except ImageFetcherException as e:
+                self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
+                raise IssueIdentifierNetworkError from e
+
+            self._user_canceled(self.cover_url_callback, alt_url_image_data)
+
+            remote_hashes.append((url, self.calculate_hash(alt_url_image_data)))
+
+            if self.cancel:
+                raise IssueIdentifierCancelled
+        return remote_hashes
+
+    def _get_issue_cover_match_score(
+        self,
+        primary_img_url: str,
+        alt_urls: list[str],
+        local_hashes: list[tuple[str, int]],
+        use_alt_urls: bool = False,
+    ) -> Score:
+        # local_hashes is a list of pre-calculated hashes.
+        # use_alt_urls - indicates to use alternate covers from CV
+
+        # If there is no URL return 100
+        if not primary_img_url:
+            return Score(score=100, url="", remote_hash=0)
+
+        self._user_canceled()
+
+        urls = [primary_img_url]
+        if use_alt_urls:
+            urls.extend(alt_urls)
+            self.log_msg(f"[{len(alt_urls)} alt. covers]")
+
+        remote_hashes = self._get_remote_hashes(urls)
+
+        score_list = []
+        done = False
+        for local_hash in local_hashes:
+            for remote_hash in remote_hashes:
+                score = ImageHasher.hamming_distance(local_hash[1], remote_hash[1])
+                score_list.append(
+                    Score(
+                        score=score,
+                        url=remote_hash[0],
+                        remote_hash=remote_hash[1],
+                        local_hash_name=local_hash[0],
+                        local_hash=local_hash[1],
+                    )
+                )
+
+                self.log_msg(f" - {score:03}")
+
+                if score <= self.strong_score_thresh:
+                    # such a good score, we can quit now, since for sure we have a winner
+                    done = True
+                    break
+            if done:
+                break
+
+        best_score_item = min(score_list, key=lambda x: x["score"])
+
+        return best_score_item
+
+    def _check_requirements(self, ca: ComicArchive) -> bool:
+
+        if not pil_available:
+            self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
+            return False
+
+        if not ca.seems_to_be_a_comic_archive():
+            self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
+            return False
+        return True
+
+    def _process_cover(self, name: str, image_data: bytes) -> list[tuple[str, Image.Image]]:
+        assert Image
+        cover_image = Image.open(io.BytesIO(image_data))
+        images = [(name, cover_image)]
+
+        # check the aspect ratio
+        # if it's wider than it is high, it's probably a two page spread (back_cover, front_cover)
+        # if so, crop it and calculate a second hash
+        aspect_ratio = float(cover_image.height) / float(cover_image.width)
+        if aspect_ratio < 1.0:
+            im = self._crop_double_page(cover_image)
+            if im is not None:
+                images.append(("double page", im))
+
+        # Check and remove black borders. Helps in identifying comics with an excessive black border like https://comicvine.gamespot.com/marvel-graphic-novel-1-the-death-of-captain-marvel/4000-21782/
+        cropped = self._crop_border(cover_image, self.config.Issue_Identifier__border_crop_percent)
+        if cropped is not None:
+            images.append(("black border cropped", cropped))
+
+        return images
+
+    def _get_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
+        covers: list[tuple[str, Image.Image]] = []
+        for cover_index in md.get_cover_page_index_list():
+            image_data = ca.get_page(cover_index)
+            covers.extend(self._process_cover(f"{cover_index}", image_data))
+        return covers
+
+    def _get_extra_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
+        assert md
+        covers: list[tuple[str, Image.Image]] = []
+        for cover_index in range(1, min(3, ca.get_number_of_pages())):
+            image_data = ca.get_page(cover_index)
+            covers.extend(self._process_cover(f"{cover_index}", image_data))
+        return covers
+
+    def _get_search_keys(self, md: GenericMetadata) -> Any:
+        search_keys = SearchKeys(
+            series=md.series,
+            issue_number=IssueString(md.issue).as_string(),
+            alternate_number=IssueString(md.alternate_number).as_string(),
+            month=md.month,
+            year=md.year,
+            issue_count=md.issue_count,
+            alternate_count=md.alternate_count,
+            publisher=md.publisher,
+            imprint=md.imprint,
+        )
+        return search_keys
+
+    def _get_search_terms(
+        self, ca: ComicArchive, md: GenericMetadata
+    ) -> tuple[SearchKeys, list[tuple[str, Image.Image]], list[tuple[str, Image.Image]]]:
+        return self._get_search_keys(md), self._get_images(ca, md), self._get_extra_images(ca, md)
+
+    def _user_canceled(self, callback: Callable[..., Any] | None = None, *args: Any) -> Any:
+        if self.cancel:
+            raise IssueIdentifierCancelled
+        if callback is not None:
+            return callback(*args)
+
+    def _print_terms(self, keys: SearchKeys, images: list[tuple[str, Image.Image]]) -> None:
+        assert keys["series"]
+        assert keys["issue_number"]
+
+        self.log_msg(f"Using {self.talker.name} to search for:")
+        self.log_msg("\tSeries: " + keys["series"])
+        self.log_msg("\tIssue: " + keys["issue_number"])
+        # if keys["alternate_number"] is not None:
+        #     self.log_msg("\tAlternate Issue: " + str(keys["alternate_number"]))
+        if keys["month"] is not None:
+            self.log_msg("\tMonth: " + str(keys["month"]))
+        if keys["year"] is not None:
+            self.log_msg("\tYear: " + str(keys["year"]))
+        if keys["issue_count"] is not None:
+            self.log_msg("\tCount: " + str(keys["issue_count"]))
+        # if keys["alternate_count"] is not None:
+        #     self.log_msg("\tAlternate Count: " + str(keys["alternate_count"]))
+        # if keys["publisher"] is not None:
+        #     self.log_msg("\tPublisher: " + str(keys["publisher"]))
+        # if keys["imprint"] is not None:
+        #     self.log_msg("\tImprint: " + str(keys["imprint"]))
+        for name, _ in images:
+            self.log_msg("Cover: " + name)
+
+        self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
+
+    def _filter_series(self, terms: SearchKeys, search_results: list[ComicSeries]) -> list[ComicSeries]:
+        assert terms["series"]
+
+        filtered_results = []
+        for item in search_results:
+            length_approved = False
+            publisher_approved = True
+            date_approved = True
+
+            # remove any series that starts after the issue year
+            if terms["year"] is not None and item.start_year is not None:
+                if terms["year"] < item.start_year:
+                    date_approved = False
+
+            for name in [item.name, *item.aliases]:
+                if utils.titles_match(terms["series"], name, self.series_match_thresh):
+                    length_approved = True
+                    break
+            # remove any series from publishers on the filter
+            if item.publisher is not None:
+                if item.publisher is not None and item.publisher.casefold() in self.publisher_filter:
+                    publisher_approved = False
+
+            if length_approved and publisher_approved and date_approved:
+                filtered_results.append(item)
+            else:
+                logger.debug(
+                    "Filtered out series: '%s' length approved: '%s', publisher approved: '%s', date approved: '%s'",
+                    item.name,
+                    length_approved,
+                    publisher_approved,
+                    date_approved,
+                )
+        return filtered_results
+
+    def _calculate_hashes(self, images: list[tuple[str, Image.Image]]) -> list[tuple[str, int]]:
+        hashes = []
+        for name, image in images:
+            hashes.append((name, ImageHasher(image=image).average_hash()))
+        return hashes
+
+    def _match_covers(
+        self,
+        terms: SearchKeys,
+        images: list[tuple[str, Image.Image]],
+        issues: list[tuple[ComicSeries, GenericMetadata]],
+        use_alternates: bool,
+    ) -> list[IssueResult]:
+        assert terms["issue_number"]
+        match_results: list[IssueResult] = []
+        hashes = self._calculate_hashes(images)
+        counter = 0
+        alternate = ""
+        if use_alternates:
+            alternate = " Alternate"
+        for series, issue in issues:
+            self._user_canceled(self.progress_callback, counter, len(issues))
+            counter += 1
+
+            self.log_msg(
+                f"Examining{alternate} covers for Series ID: {series.id} {series.name} ({series.start_year}):",
+            )
+
+            try:
+                image_url = issue._cover_image or ""
+                alt_urls = issue._alternate_images
+
+                score_item = self._get_issue_cover_match_score(image_url, alt_urls, hashes, use_alt_urls=use_alternates)
+            except Exception:
+                logger.exception(f"Scoring series{alternate} covers failed")
+                return []
+
+            match = IssueResult(
+                series=f"{series.name} ({series.start_year})",
+                distance=score_item["score"],
+                issue_number=terms["issue_number"],
+                issue_count=series.count_of_issues,
+                url_image_hash=score_item["remote_hash"],
+                issue_title=issue.title or "",
+                issue_id=issue.issue_id or "",
+                series_id=series.id,
+                month=issue.month,
+                year=issue.year,
+                publisher=None,
+                image_url=image_url,
+                alt_image_urls=alt_urls,
+                description=issue.description or "",
+            )
+            if series.publisher is not None:
+                match.publisher = series.publisher
+
+            match_results.append(match)
+
+            self.log_msg(f"best score {match.distance:03}")
+
+            self.log_msg("")
+        return match_results
+
+    def _print_match(self, item: IssueResult) -> None:
+        self.log_msg(
+            "-----> {} #{} {} ({}/{}) -- score: {}".format(
+                item.series,
+                item.issue_number,
+                item.issue_title,
+                item.month,
+                item.year,
+                item.distance,
+            )
+        )
+
+    def _search_for_issues(self, terms: SearchKeys) -> list[tuple[ComicSeries, GenericMetadata]]:
+        try:
+            search_results = self.talker.search_for_series(
+                terms["series"],
+                callback=lambda x, y: self._user_canceled(self.progress_callback, x, y),
+                series_match_thresh=self.config.Issue_Identifier__series_match_search_thresh,
+            )
+        except TalkerError as e:
+            self.log_msg(f"Error searching for series.\n{e}")
+            return []
+        # except IssueIdentifierCancelled:
+        #     return []
+
+        if not search_results:
+            return []
+
+        filtered_series = self._filter_series(terms, search_results)
+        if not filtered_series:
+            return []
+
+        self.log_msg(f"Searching in {len(filtered_series)} series")
+
+        self._user_canceled(self.progress_callback, 0, len(filtered_series))
+
+        series_by_id = {series.id: series for series in filtered_series}
+
+        try:
+            talker_result = self.talker.fetch_issues_by_series_issue_num_and_year(
+                list(series_by_id.keys()), terms["issue_number"], terms["year"]
+            )
+        except TalkerError as e:
+            self.log_msg(f"Issue while searching for series details. Aborting...\n{e}")
+            return []
+        # except IssueIdentifierCancelled:
+        #     return []
+
+        if not talker_result:
+            return []
+
+        self._user_canceled(self.progress_callback, 0, 0)
+
+        issues: list[tuple[ComicSeries, GenericMetadata]] = []
+
+        # now re-associate the issues and series
+        for issue in talker_result:
+            if issue.series_id in series_by_id:
+                issues.append((series_by_id[issue.series_id], issue))
+            else:
+                logger.warning("Talker '%s' is returning arbitrary series when searching by id", self.talker.id)
+        return issues
+
+    def _cover_matching(
+        self,
+        terms: SearchKeys,
+        images: list[tuple[str, Image.Image]],
+        extra_images: list[tuple[str, Image.Image]],
+        issues: list[tuple[ComicSeries, GenericMetadata]],
+    ) -> list[IssueResult]:
+        cover_matching_1 = self._match_covers(terms, images, issues, use_alternates=False)
+
+        if len(cover_matching_1) == 0:
+            self.log_msg(":-( no matches!")
+            self.search_result = self.result_no_matches
+            return cover_matching_1
+
+        # sort list by image match scores
+        cover_matching_1.sort(key=attrgetter("distance"))
+
+        lst = []
+        for i in cover_matching_1:
+            lst.append(i.distance)
+
+        self.log_msg(f"Compared to covers in {len(cover_matching_1)} issue(s): {lst}")
+
+        cover_matching_2 = []
+        final_cover_matching = cover_matching_1
+        if cover_matching_1[0].distance >= self.min_score_thresh:
+            # we have 1 or more low-confidence matches (all bad cover scores)
+            # look at a few more pages in the archive, and also alternate covers online
+            self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")
+
+            temp = self._match_covers(terms, images + extra_images, issues, use_alternates=True)
+            for score in temp:
+                if score.distance < self.min_alternate_score_thresh:
+                    cover_matching_2.append(score)
+
+            if len(cover_matching_2) == 0:
+                if len(cover_matching_1) == 1:
+                    self.log_msg("No matching pages in the issue.")
+                    self.log_msg("--------------------------------------------------------------------------")
+                    self._print_match(cover_matching_1[0])
+                    self.log_msg("--------------------------------------------------------------------------")
+                    self.search_result = self.result_found_match_but_bad_cover_score
+                else:
+                    self.log_msg("--------------------------------------------------------------------------")
+                    self.log_msg("Multiple bad cover matches! Need to use other info...")
+                    self.log_msg("--------------------------------------------------------------------------")
+                    self.search_result = self.result_multiple_matches_with_bad_image_scores
+                return cover_matching_1
+
+            # We did good, found something!
+            self.log_msg("Success in secondary/alternate cover matching!")
+
+            final_cover_matching = cover_matching_2
+            # sort new list by image match scores
+            final_cover_matching.sort(key=attrgetter("distance"))
+            self.log_msg(f"[Second round cover matching: best score = {final_cover_matching[0].distance}]")
+            # now drop down into the rest of the processing
+
+        best_score = final_cover_matching[0].distance
+        # now pare down list, remove any item more than specified distant from the top scores
+        for match_item in reversed(final_cover_matching):
+            if match_item.distance > (best_score + self.min_score_distance):
+                final_cover_matching.remove(match_item)
+        return final_cover_matching
+
+    def identify(self, ca: ComicArchive, md: GenericMetadata) -> tuple[int, list[IssueResult]]:
+        if not self._check_requirements(ca):
+            return self.result_no_matches, []
+
+        terms, images, extra_images = self._get_search_terms(ca, md)
+
+        # we need, at minimum, a series and issue number
+        if not (terms["series"] and terms["issue_number"]):
+            self.log_msg("Not enough info for a search!")
+            return self.result_no_matches, []
+
+        self._print_terms(terms, images)
+
+        issues = self._search_for_issues(terms)
+
+        self.log_msg(f"Found {len(issues)} series that have an issue #{terms['issue_number']}")
+
+        final_cover_matching = self._cover_matching(terms, images, extra_images, issues)
+
+        # One more test for the case choosing limited series first issue vs a trade with the same cover:
+        # if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
+        if len(final_cover_matching) > 1 and terms["issue_count"] is not None and terms["issue_count"] != 1:
+            for match in final_cover_matching.copy():
+                if match.issue_count == 1:
+                    self.log_msg(
+                        f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
+                    )
+                    final_cover_matching.remove(match)
+
+        if len(final_cover_matching) == 1:
+            self.log_msg("--------------------------------------------------------------------------")
+            self._print_match(final_cover_matching[0])
+            self.log_msg("--------------------------------------------------------------------------")
+            search_result = self.result_one_good_match
+
+        elif len(final_cover_matching) == 0:
+            self.log_msg("--------------------------------------------------------------------------")
+            self.log_msg("No matches found :(")
+            self.log_msg("--------------------------------------------------------------------------")
+            search_result = self.result_no_matches
+        else:
+            # we've got multiple good matches:
+            self.log_msg("More than one likely candidate.")
+            search_result = self.result_multiple_good_matches
+            self.log_msg("--------------------------------------------------------------------------")
+            for match_item in final_cover_matching:
+                self._print_match(match_item)
+            self.log_msg("--------------------------------------------------------------------------")
+
+        return search_result, final_cover_matching
+
     def search(self) -> list[IssueResult]:
         ca = self.comic_archive
         self.match_list = []
@@ -392,8 +871,8 @@ class IssueIdentifier:
         narrow_cover_hash = None
         aspect_ratio = self.get_aspect_ratio(cover_image_data)
         if aspect_ratio < 1.0:
-            right_side_image_data = self.crop_cover(cover_image_data)
-            if right_side_image_data is not None:
+            right_side_image_data = self.crop_double_page(cover_image_data)
+            if right_side_image_data:
                 narrow_cover_hash = self.calculate_hash(right_side_image_data)
 
         keys = self.get_search_keys()
@@ -528,8 +1007,8 @@ class IssueIdentifier:
                 series=f"{series.name} ({series.start_year})",
                 distance=score_item["score"],
                 issue_number=keys["issue_number"],
-                cv_issue_count=series.count_of_issues,
-                url_image_hash=score_item["hash"],
+                issue_count=series.count_of_issues,
+                url_image_hash=score_item["remote_hash"],
                 issue_title=issue.title or "",
                 issue_id=issue.issue_id or "",
                 series_id=series.id,
@@ -651,7 +1130,7 @@ class IssueIdentifier:
         if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
             new_list = []
             for match in self.match_list:
-                if match.cv_issue_count != 1:
+                if match.issue_count != 1:
                     new_list.append(match)
                 else:
                     self.log_msg(
diff --git a/comictaggerlib/resulttypes.py b/comictaggerlib/resulttypes.py
index a0237a1..cdc0d8b 100644
--- a/comictaggerlib/resulttypes.py
+++ b/comictaggerlib/resulttypes.py
@@ -53,7 +53,7 @@ class IssueResult:
     series: str
     distance: int
     issue_number: str
-    cv_issue_count: int | None
+    issue_count: int | None
     url_image_hash: int
     issue_title: str
     issue_id: str
diff --git a/comictaggerlib/seriesselectionwindow.py b/comictaggerlib/seriesselectionwindow.py
index bf22407..f79ea66 100644
--- a/comictaggerlib/seriesselectionwindow.py
+++ b/comictaggerlib/seriesselectionwindow.py
@@ -33,6 +33,7 @@ from comictaggerlib.issueidentifier import IssueIdentifier
 from comictaggerlib.issueselectionwindow import IssueSelectionWindow
 from comictaggerlib.matchselectionwindow import MatchSelectionWindow
 from comictaggerlib.progresswindow import IDProgressWindow
+from comictaggerlib.resulttypes import IssueResult
 from comictaggerlib.ui import qtutils, ui_path
 from comictaggerlib.ui.qtutils import new_web_view, reduce_widget_font_size
 from comictalker.comictalker import ComicTalker, TalkerError
@@ -76,15 +77,17 @@ class SearchThread(QtCore.QThread):
 
 
 class IdentifyThread(QtCore.QThread):
-    identifyComplete = pyqtSignal()
+    identifyComplete = pyqtSignal((int, list))
     identifyLogMsg = pyqtSignal(str)
     identifyProgress = pyqtSignal(int, int)
 
-    def __init__(self, identifier: IssueIdentifier) -> None:
+    def __init__(self, identifier: IssueIdentifier, ca: ComicArchive, md: GenericMetadata) -> None:
         QtCore.QThread.__init__(self)
         self.identifier = identifier
         self.identifier.set_output_function(self.log_output)
         self.identifier.set_progress_callback(self.progress_callback)
+        self.ca = ca
+        self.md = md
 
     def log_output(self, text: str) -> None:
         self.identifyLogMsg.emit(str(text))
@@ -93,8 +96,7 @@ class IdentifyThread(QtCore.QThread):
         self.identifyProgress.emit(cur, total)
 
     def run(self) -> None:
-        self.identifier.search()
-        self.identifyComplete.emit()
+        self.identifyComplete.emit(*self.identifier.identify(self.ca, self.md))
 
 
 class SeriesSelectionWindow(QtWidgets.QDialog):
@@ -245,12 +247,12 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
         md.year = self.year
         md.issue_count = self.issue_count
 
-        self.ii.set_additional_metadata(md)
-        self.ii.only_use_additional_meta_data = True
+        # self.ii.set_additional_metadata(md)
+        # self.ii.only_use_additional_meta_data = True
 
-        self.ii.cover_page_index = int(self.cover_index_list[0])
+        # self.ii.cover_page_index = int(self.cover_index_list[0])
 
-        self.id_thread = IdentifyThread(self.ii)
+        self.id_thread = IdentifyThread(self.ii, self.comic_archive, md)
         self.id_thread.identifyComplete.connect(self.identify_complete)
         self.id_thread.identifyLogMsg.connect(self.log_id_output)
         self.id_thread.identifyProgress.connect(self.identify_progress)
@@ -276,35 +278,33 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
         if self.ii is not None:
             self.ii.cancel = True
 
-    def identify_complete(self) -> None:
-        if self.ii is not None and self.iddialog is not None and self.comic_archive is not None:
-            matches = self.ii.match_list
-            result = self.ii.search_result
+    def identify_complete(self, result: int, issues: list[IssueResult]) -> None:
+        if self.iddialog is not None and self.comic_archive is not None:
 
             found_match = None
             choices = False
-            if result == self.ii.result_no_matches:
-                QtWidgets.QMessageBox.information(self, "Auto-Select Result", " No matches found :-(")
-            elif result == self.ii.result_found_match_but_bad_cover_score:
+            if result == IssueIdentifier.result_no_matches:
+                QtWidgets.QMessageBox.information(self, "Auto-Select Result", " No issues found :-(")
+            elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
                 QtWidgets.QMessageBox.information(
                     self,
                     "Auto-Select Result",
                     " Found a match, but cover doesn't seem the same. Verify before committing!",
                 )
-                found_match = matches[0]
-            elif result == self.ii.result_found_match_but_not_first_page:
+                found_match = issues[0]
+            elif result == IssueIdentifier.result_found_match_but_not_first_page:
                 QtWidgets.QMessageBox.information(
                     self, "Auto-Select Result", " Found a match, but not with the first page of the archive."
                 )
-                found_match = matches[0]
-            elif result == self.ii.result_multiple_matches_with_bad_image_scores:
+                found_match = issues[0]
+            elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
                 QtWidgets.QMessageBox.information(
                     self, "Auto-Select Result", " Found some possibilities, but no confidence. Proceed manually."
                 )
                 choices = True
-            elif result == self.ii.result_one_good_match:
-                found_match = matches[0]
-            elif result == self.ii.result_multiple_good_matches:
+            elif result == IssueIdentifier.result_one_good_match:
+                found_match = issues[0]
+            elif result == IssueIdentifier.result_multiple_good_matches:
                 QtWidgets.QMessageBox.information(
                     self, "Auto-Select Result", " Found multiple likely matches. Please select."
                 )
@@ -312,7 +312,7 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
 
             if choices:
                 selector = MatchSelectionWindow(
-                    self, matches, self.comic_archive, talker=self.talker, config=self.config
+                    self, issues, self.comic_archive, talker=self.talker, config=self.config
                 )
                 selector.setModal(True)
                 selector.exec()
diff --git a/comictaggerlib/taggerwindow.py b/comictaggerlib/taggerwindow.py
index 9e31465..8f31231 100644
--- a/comictaggerlib/taggerwindow.py
+++ b/comictaggerlib/taggerwindow.py
@@ -1752,17 +1752,15 @@ class TaggerWindow(QtWidgets.QMainWindow):
                 md.issue = "1"
             else:
                 md.issue = utils.xlate(md.volume)
-        ii.set_additional_metadata(md)
-        ii.only_use_additional_meta_data = True
+        # ii.set_additional_metadata(md)
+        # ii.only_use_additional_meta_data = True
         ii.set_output_function(self.auto_tag_log)
-        ii.cover_page_index = md.get_cover_page_index_list()[0]
+        # ii.cover_page_index = md.get_cover_page_index_list()[0]
         if self.atprogdialog is not None:
            ii.set_cover_url_callback(self.atprogdialog.set_test_image)
         ii.set_name_series_match_threshold(dlg.name_length_match_tolerance)
 
-        matches: list[IssueResult] = ii.search()
-
-        result = ii.search_result
+        result, matches = ii.identify(ca, md)
 
         found_match = False
         choices = False
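[Editor's sketch, not part of the applied patch: a minimal driver for the new entry point this commit introduces. The `config` (ct_ns) and `talker` (ComicTalker) objects are assumed to already exist, as ComicTagger's CLI startup builds them; the archive path and seed metadata are illustrative.]

    from comicapi.comicarchive import ComicArchive
    from comicapi.genericmetadata import GenericMetadata
    from comictaggerlib.issueidentifier import IssueIdentifier

    ca = ComicArchive("example.cbz")  # illustrative path
    md = GenericMetadata(series="Example Series", issue="1")  # seed metadata

    # identify() now takes the archive and metadata explicitly and returns the
    # result code alongside the matches, instead of stashing both on the object.
    ii = IssueIdentifier(ca, config, talker)
    result, matches = ii.identify(ca, md)

    if result == IssueIdentifier.result_one_good_match:
        print(matches[0].series, matches[0].issue_number)

Returning the code directly is also what lets callers compare against the class-level constants (IssueIdentifier.result_*) rather than instance attributes, as the cli.py hunk above now does.
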
From f382c2f8147825a5b30f9cc4287de62476f0ac35 Mon Sep 17 00:00:00 2001
From: Timmy Welch
Date: Fri, 23 Feb 2024 20:47:22 -0800
Subject: [PATCH 2/5] Update Tests

---
 tests/issueidentifier_test.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/issueidentifier_test.py b/tests/issueidentifier_test.py
index 6fb2c1d..1435dff 100644
--- a/tests/issueidentifier_test.py
+++ b/tests/issueidentifier_test.py
@@ -14,7 +14,7 @@ from comictaggerlib.resulttypes import IssueResult
 def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
     config, definitions = config
     ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
-    cropped = ii.crop_cover(cbz_double_cover.archiver.read_file("double_cover.jpg"))
+    cropped = ii.crop_double_page(cbz_double_cover.archiver.read_file("double_cover.jpg"))
 
     original_cover = cbz_double_cover.get_page(0)
     original_hash = ii.calculate_hash(original_cover)
@@ -41,7 +41,7 @@ def test_get_issue_cover_match_score(cbz, config, comicvine_api):
         [ii.calculate_hash(cbz.get_page(0))],
     )
     expected = {
-        "hash": 212201432349720,
+        "remote_hash": 212201432349720,
         "score": 0,
         "url": "https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
     }
@@ -51,13 +51,13 @@ def test_get_issue_cover_match_score(cbz, config, comicvine_api):
 def test_search(cbz, config, comicvine_api):
     config, definitions = config
     ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
-    results = ii.search()
+    result, issues = ii.identify(cbz, cbz.read_metadata("cr"))
     cv_expected = IssueResult(
         series=f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
         distance=0,
         issue_number=testing.comicvine.cv_issue_result["results"]["issue_number"],
         alt_image_urls=[],
-        cv_issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
+        issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
         issue_title=testing.comicvine.cv_issue_result["results"]["name"],
         issue_id=str(testing.comicvine.cv_issue_result["results"]["id"]),
         series_id=str(testing.comicvine.cv_volume_result["results"]["id"]),
@@ -68,7 +68,7 @@
         description=testing.comicvine.cv_issue_result["results"]["description"],
         url_image_hash=212201432349720,
     )
-    for r, e in zip(results, [cv_expected]):
+    for r, e in zip(issues, [cv_expected]):
         assert r == e
From 938f760a3758b03b4c3a81f8d6e9842b0aca54da Mon Sep 17 00:00:00 2001
From: Timmy Welch
Date: Fri, 23 Feb 2024 20:49:54 -0800
Subject: [PATCH 3/5] Remove IssueIdentifier.search

---
 comictaggerlib/issueidentifier.py | 485 +-----------------------------
 1 file changed, 1 insertion(+), 484 deletions(-)

diff --git a/comictaggerlib/issueidentifier.py b/comictaggerlib/issueidentifier.py
index 3beb81f..4f6df1f 100644
--- a/comictaggerlib/issueidentifier.py
+++ b/comictaggerlib/issueidentifier.py
@@ -24,7 +24,7 @@ from typing import Any, Callable
 from typing_extensions import NotRequired, TypedDict
 
 from comicapi import utils
-from comicapi.comicarchive import ComicArchive, metadata_styles
+from comicapi.comicarchive import ComicArchive
 from comicapi.genericmetadata import ComicSeries, GenericMetadata
 from comicapi.issuestring import IssueString
 from comictaggerlib.ctsettings import ct_ns
@@ -152,29 +152,6 @@ class IssueIdentifier:
 
         return ImageHasher(data=image_data).average_hash()
 
-    def get_aspect_ratio(self, image_data: bytes) -> float:
-        try:
-            im = Image.open(io.BytesIO(image_data))
-            w, h = im.size
-            return float(h) / float(w)
-        except Exception:
-            return 1.5
-
-    def crop_double_page(self, image_data: bytes) -> bytes:
-        im = Image.open(io.BytesIO(image_data))
-
-        cropped_im = self._crop_double_page(im)
-
-        if cropped_im is None:
-            return b""
-
-        output = io.BytesIO()
-        cropped_im.convert("RGB").save(output, format="PNG")
-        cropped_image_data = output.getvalue()
-        output.close()
-
-        return cropped_image_data
-
     def _crop_double_page(self, im: Image.Image) -> Image.Image | None:
         w, h = im.size
 
@@ -186,21 +163,6 @@ class IssueIdentifier:
 
         return cropped_im
 
-    # Adapted from https://stackoverflow.com/a/10616717/20629671
-    def crop_border(self, image_data: bytes, ratio: int) -> bytes | None:
-        im = Image.open(io.BytesIO(image_data))
-
-        cropped_image = self._crop_border(im, ratio)
-
-        # If there is a difference return the image otherwise return None
-        if cropped_image is not None:
-            output = io.BytesIO()
-            cropped_image.save(output, format="PNG")
-            cropped_image_data = output.getvalue()
-            output.close()
-            return cropped_image_data
-        return None
-
     # Adapted from https://stackoverflow.com/a/10616717/20629671
     def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None:
         assert Image
@@ -245,57 +207,6 @@ class IssueIdentifier:
     def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
         self.cover_url_callback = cb_func
 
-    def get_search_keys(self) -> SearchKeys:
-        ca = self.comic_archive
-
-        search_keys: SearchKeys
-        if self.only_use_additional_meta_data:
-            search_keys = SearchKeys(
-                series=self.additional_metadata.series,
-                issue_number=self.additional_metadata.issue,
-                year=self.additional_metadata.year,
-                month=self.additional_metadata.month,
-                issue_count=self.additional_metadata.issue_count,
-            )
-            return search_keys
-
-        # see if the archive has any useful meta data for searching with
-        try:
-            for style in metadata_styles:
-                internal_metadata = ca.read_metadata(style)
-                if not internal_metadata.is_empty:
-                    break
-        except Exception as e:
-            internal_metadata = GenericMetadata()
-            logger.error("Failed to load metadata for %s: %s", ca.path, e)
-
-        # try to get some metadata from filename
-        md_from_filename = ca.metadata_from_filename(
-            self.config.Filename_Parsing__complicated_parser,
-            self.config.Filename_Parsing__remove_c2c,
-            self.config.Filename_Parsing__remove_fcbd,
-            self.config.Filename_Parsing__remove_publisher,
-        )
-
-        working_md = md_from_filename.copy()
-
-        working_md.overlay(internal_metadata)
-        working_md.overlay(self.additional_metadata)
-
-        # preference order:
-        # 1. Additional metadata
-        # 1. Internal metadata
-        # 1. Filename metadata
-        search_keys = SearchKeys(
-            series=working_md.series,
-            issue_number=working_md.issue,
-            year=working_md.year,
-            month=working_md.month,
-            issue_count=working_md.issue_count,
-        )
-
-        return search_keys
-
     def log_msg(self, msg: Any) -> None:
         msg = str(msg)
         for handler in logging.getLogger().handlers:
@@ -324,86 +235,6 @@ class IssueIdentifier:
         # default output is stdout
         self.output_function(*args, **kwargs)
 
-    def get_issue_cover_match_score(
-        self,
-        primary_img_url: str,
-        alt_urls: list[str],
-        local_cover_hash_list: list[int],
-        use_remote_alternates: bool = False,
-    ) -> Score:
-        # local_cover_hash_list is a list of pre-calculated hashes.
-        # use_remote_alternates - indicates to use alternate covers from CV
-
-        # If there is no URL return 100
-        if not primary_img_url:
-            return Score(score=100, url="", remote_hash=0)
-
-        try:
-            url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
-                primary_img_url, blocking=True
-            )
-        except ImageFetcherException as e:
-            self.log_msg(f"Network issue while fetching cover image from {self.talker.name}. Aborting...")
-            raise IssueIdentifierNetworkError from e
-
-        if self.cancel:
-            raise IssueIdentifierCancelled
-
-        # alert the GUI, if needed
-        if self.cover_url_callback is not None:
-            self.cover_url_callback(url_image_data)
-
-        remote_cover_list = [Score(url=primary_img_url, remote_hash=self.calculate_hash(url_image_data))]
-
-        if self.cancel:
-            raise IssueIdentifierCancelled
-
-        if use_remote_alternates:
-            for alt_url in alt_urls:
-                try:
-                    alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
-                        alt_url, blocking=True
-                    )
-                except ImageFetcherException as e:
-                    self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
-                    raise IssueIdentifierNetworkError from e
-
-                if self.cancel:
-                    raise IssueIdentifierCancelled
-
-                # alert the GUI, if needed
-                if self.cover_url_callback is not None:
-                    self.cover_url_callback(alt_url_image_data)
-
-                remote_cover_list.append(Score(url=alt_url, remote_hash=self.calculate_hash(alt_url_image_data)))
-
-                if self.cancel:
-                    raise IssueIdentifierCancelled
-
-            self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]")
-
-        score_list = []
-        done = False
-        for local_cover_hash in local_cover_hash_list:
-            for remote_cover_item in remote_cover_list:
-                score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["remote_hash"])
-                score_list.append(
-                    Score(score=score, url=remote_cover_item["url"], remote_hash=remote_cover_item["remote_hash"])
-                )
-
-                self.log_msg(f" - {score:03}")
-
-                if score <= self.strong_score_thresh:
-                    # such a good score, we can quit now, since for sure we have a winner
-                    done = True
-                    break
-            if done:
-                break
-
-        best_score_item = min(score_list, key=lambda x: x["score"])
-
-        return best_score_item
-
     def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]:
         remote_hashes: list[tuple[str, int]] = []
         for url in urls:
@@ -847,317 +678,3 @@ class IssueIdentifier:
             self.log_msg("--------------------------------------------------------------------------")
 
         return search_result, final_cover_matching
-
-    def search(self) -> list[IssueResult]:
-        ca = self.comic_archive
-        self.match_list = []
-        self.cancel = False
-        self.search_result = self.result_no_matches
-
-        if not pil_available:
-            self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
-            return self.match_list
-
-        if not ca.seems_to_be_a_comic_archive():
-            self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
-            return self.match_list
-
-        cover_image_data = ca.get_page(self.cover_page_index)
-        cover_hash = self.calculate_hash(cover_image_data)
-
-        # check the aspect ratio
-        # if it's wider than it is high, it's probably a two page spread
-        # if so, crop it and calculate a second hash
-        narrow_cover_hash = None
-        aspect_ratio = self.get_aspect_ratio(cover_image_data)
-        if aspect_ratio < 1.0:
-            right_side_image_data = self.crop_double_page(cover_image_data)
-            if right_side_image_data:
-                narrow_cover_hash = self.calculate_hash(right_side_image_data)
-
-        keys = self.get_search_keys()
-        # normalize the issue number, None will return as ""
-        keys["issue_number"] = IssueString(keys["issue_number"]).as_string()
-
-        # we need, at minimum, a series and issue number
-        if not (keys["series"] and keys["issue_number"]):
-            self.log_msg("Not enough info for a search!")
-            return []
-
-        self.log_msg(f"Using {self.talker.name} to search for:")
-        self.log_msg("\tSeries: " + keys["series"])
-        self.log_msg("\tIssue: " + keys["issue_number"])
-        if keys["issue_count"] is not None:
-            self.log_msg("\tCount: " + str(keys["issue_count"]))
-        if keys["year"] is not None:
-            self.log_msg("\tYear: " + str(keys["year"]))
-        if keys["month"] is not None:
-            self.log_msg("\tMonth: " + str(keys["month"]))
-
-        self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
-        try:
-            ct_search_results = self.talker.search_for_series(keys["series"])
-        except TalkerError as e:
-            self.log_msg(f"Error searching for series.\n{e}")
-            return []
-
-        if self.cancel:
-            return []
-
-        if ct_search_results is None:
-            return []
-
-        series_second_round_list = []
-
-        for item in ct_search_results:
-            length_approved = False
-            publisher_approved = True
-            date_approved = True
-
-            # remove any series that starts after the issue year
-            if keys["year"] is not None and item.start_year is not None:
-                if keys["year"] < item.start_year:
-                    date_approved = False
-
-            for name in [item.name, *item.aliases]:
-                if utils.titles_match(keys["series"], name, self.series_match_thresh):
-                    length_approved = True
-                    break
-            # remove any series from publishers on the filter
-            if item.publisher is not None:
-                publisher = item.publisher
-                if publisher is not None and publisher.casefold() in self.publisher_filter:
-                    publisher_approved = False
-
-            if length_approved and publisher_approved and date_approved:
-                series_second_round_list.append(item)
-
-        self.log_msg("Searching in " + str(len(series_second_round_list)) + " series")
-
-        if self.progress_callback is not None:
-            self.progress_callback(0, len(series_second_round_list))
-
-        # now sort the list by name length
-        series_second_round_list.sort(key=lambda x: len(x.name), reverse=False)
-
-        series_by_id = {series.id: series for series in series_second_round_list}
-
-        issue_list = None
-        try:
-            if len(series_by_id) > 0:
-                issue_list = self.talker.fetch_issues_by_series_issue_num_and_year(
-                    list(series_by_id.keys()), keys["issue_number"], keys["year"]
-                )
-        except TalkerError as e:
-            self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}")
-            return []
-
-        if issue_list is None:
-            return []
-
-        shortlist = []
-        # now re-associate the issues and series
-        # is this really needed?
-        for issue in issue_list:
-            if issue.series_id in series_by_id:
-                shortlist.append((series_by_id[issue.series_id], issue))
-
-        if keys["year"] is None:
-            self.log_msg(f"Found {len(shortlist)} series that have an issue #{keys['issue_number']}")
-        else:
-            self.log_msg(
-                f"Found {len(shortlist)} series that have an issue #{keys['issue_number']} from {keys['year']}"
-            )
-
-        # now we have a shortlist of series with the desired issue number
-        # Do first round of cover matching
-        counter = len(shortlist)
-        for series, issue in shortlist:
-            if self.progress_callback is not None:
-                self.progress_callback(counter, len(shortlist) * 3)
-            counter += 1
-
-            self.log_msg(
-                f"Examining covers for ID: {series.id} {series.name} ({series.start_year}):",
-            )
-
-            # Now check the cover match against the primary image
-            hash_list = [cover_hash]
-            if narrow_cover_hash is not None:
-                hash_list.append(narrow_cover_hash)
-
-            cropped_border = self.crop_border(cover_image_data, self.config.Issue_Identifier__border_crop_percent)
-            if cropped_border is not None:
-                hash_list.append(self.calculate_hash(cropped_border))
-                logger.info("Adding cropped cover to the hashlist")
-
-            try:
-                image_url = issue._cover_image or ""
-                alt_urls = issue._alternate_images
-
-                score_item = self.get_issue_cover_match_score(
-                    image_url, alt_urls, hash_list, use_remote_alternates=False
-                )
-            except Exception:
-                logger.exception("Scoring series failed")
-                self.match_list = []
-                return self.match_list
-
-            match = IssueResult(
-                series=f"{series.name} ({series.start_year})",
-                distance=score_item["score"],
-                issue_number=keys["issue_number"],
-                issue_count=series.count_of_issues,
-                url_image_hash=score_item["remote_hash"],
-                issue_title=issue.title or "",
-                issue_id=issue.issue_id or "",
-                series_id=series.id,
-                month=issue.month,
-                year=issue.year,
-                publisher=None,
-                image_url=image_url,
-                alt_image_urls=alt_urls,
-                description=issue.description or "",
-            )
-            if series.publisher is not None:
-                match.publisher = series.publisher
-
-            self.match_list.append(match)
-
-            self.log_msg(f"best score {match.distance:03}")
-
-            self.log_msg("")
-
-        if len(self.match_list) == 0:
-            self.log_msg(":-( no matches!")
-            self.search_result = self.result_no_matches
-            return self.match_list
-
-        # sort list by image match scores
-        self.match_list.sort(key=attrgetter("distance"))
-
-        lst = []
-        for i in self.match_list:
-            lst.append(i.distance)
-
-        self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s): {lst}")
-
-        def print_match(item: IssueResult) -> None:
-            self.log_msg(
-                "-----> {} #{} {} ({}/{}) -- score: {}".format(
-                    item.series,
-                    item.issue_number,
-                    item.issue_title,
-                    item.month,
-                    item.year,
-                    item.distance,
-                )
-            )
-
-        best_score: int = self.match_list[0].distance
-
-        if best_score >= self.min_score_thresh:
-            # we have 1 or more low-confidence matches (all bad cover scores)
-            # look at a few more pages in the archive, and also alternate covers online
-            self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")
-            hash_list = [cover_hash]
-            if narrow_cover_hash is not None:
-                hash_list.append(narrow_cover_hash)
-            for page_index in range(1, min(3, ca.get_number_of_pages())):
-                image_data = ca.get_page(page_index)
-                page_hash = self.calculate_hash(image_data)
-                hash_list.append(page_hash)
-
-            second_match_list = []
-            counter = 2 * len(self.match_list)
-            for m in self.match_list:
-                if self.progress_callback is not None:
-                    self.progress_callback(counter, len(self.match_list) * 3)
-                counter += 1
-                self.log_msg(f"Examining alternate covers for ID: {m.series_id} {m.series}:")
-                try:
-                    score_item = self.get_issue_cover_match_score(
-                        m.image_url,
-                        m.alt_image_urls,
-                        hash_list,
-                        use_remote_alternates=True,
-                    )
-                except Exception:
-                    logger.exception("failed examining alt covers")
-                    self.match_list = []
-                    return self.match_list
-                self.log_msg(f"--->{score_item['score']}")
-                self.log_msg("")
-
-                if score_item["score"] < self.min_alternate_score_thresh:
-                    second_match_list.append(m)
-                    m.distance = score_item["score"]
-
-            if len(second_match_list) == 0:
-                if len(self.match_list) == 1:
-                    self.log_msg("No matching pages in the issue.")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    print_match(self.match_list[0])
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_found_match_but_bad_cover_score
-                else:
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.log_msg("Multiple bad cover matches! Need to use other info...")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_multiple_matches_with_bad_image_scores
-                return self.match_list
-
-            # We did good, found something!
-            self.log_msg("Success in secondary/alternate cover matching!")
-
-            self.match_list = second_match_list
-            # sort new list by image match scores
-            self.match_list.sort(key=attrgetter("distance"))
-            best_score = self.match_list[0].distance
-            self.log_msg("[Second round cover matching: best score = {best_score}]")
-            # now drop down into the rest of the processing
-
-        if self.progress_callback is not None:
-            self.progress_callback(99, 100)
-
-        # now pare down list, remove any item more than specified distant from the top scores
-        for match_item in reversed(self.match_list):
-            if match_item.distance > best_score + self.min_score_distance:
-                self.match_list.remove(match_item)
-
-        # One more test for the case choosing limited series first issue vs a trade with the same cover:
-        # if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
-        if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
-            new_list = []
-            for match in self.match_list:
-                if match.issue_count != 1:
-                    new_list.append(match)
-                else:
-                    self.log_msg(
-                        f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
-                    )
-
-            if len(new_list) > 0:
-                self.match_list = new_list
-
-        if len(self.match_list) == 1:
-            self.log_msg("--------------------------------------------------------------------------")
-            print_match(self.match_list[0])
-            self.log_msg("--------------------------------------------------------------------------")
-            self.search_result = self.result_one_good_match
-
-        elif len(self.match_list) == 0:
-            self.log_msg("--------------------------------------------------------------------------")
-            self.log_msg("No matches found :(")
-            self.log_msg("--------------------------------------------------------------------------")
-            self.search_result = self.result_no_matches
-        else:
-            # we've got multiple good matches:
-            self.log_msg("More than one likely candidate.")
-            self.search_result = self.result_multiple_good_matches
-            self.log_msg("--------------------------------------------------------------------------")
-            for match_item in self.match_list:
-                print_match(match_item)
-            self.log_msg("--------------------------------------------------------------------------")
-
-        return self.match_list
From bad8b85874bceff7ce29de8803a398bf761a8b0d Mon Sep 17 00:00:00 2001
From: Timmy Welch
Date: Sat, 24 Feb 2024 18:30:41 -0800
Subject: [PATCH 4/5] Fix tests

---
 testing/comicdata.py          | 26 +++++------------------
 tests/issueidentifier_test.py | 38 ++++++++++++++++++-----------------
 2 files changed, 25 insertions(+), 39 deletions(-)

diff --git a/testing/comicdata.py b/testing/comicdata.py
index 88bcf3f..338969a 100644
--- a/testing/comicdata.py
+++ b/testing/comicdata.py
@@ -78,33 +78,17 @@ metadata = [
 
 metadata_keys = [
     (
-        comicapi.genericmetadata.GenericMetadata(),
+        comicapi.genericmetadata.md_test,
         {
             "issue_count": 6,
             "issue_number": "1",
             "month": 10,
             "series": "Cory Doctorow's Futuristic Tales of the Here and Now",
             "year": 2007,
-        },
-    ),
-    (
-        comicapi.genericmetadata.GenericMetadata(series="test"),
-        {
-            "issue_count": 6,
-            "issue_number": "1",
-            "month": 10,
-            "series": "test",
-            "year": 2007,
-        },
-    ),
-    (
-        comicapi.genericmetadata.GenericMetadata(series="test", issue="3"),
-        {
-            "issue_count": 6,
-            "issue_number": "3",
-            "month": 10,
-            "series": "test",
-            "year": 2007,
+            "alternate_count": 7,
+            "alternate_number": "2",
+            "imprint": "craphound.com",
+            "publisher": "IDW Publishing",
         },
     ),
 ]
diff --git a/tests/issueidentifier_test.py b/tests/issueidentifier_test.py
index 1435dff..b32451d 100644
--- a/tests/issueidentifier_test.py
+++ b/tests/issueidentifier_test.py
@@ -5,6 +5,7 @@ import io
 import pytest
 from PIL import Image
 
+import comictaggerlib.imagehasher
 import comictaggerlib.issueidentifier
 import testing.comicdata
 import testing.comicvine
@@ -13,12 +14,16 @@ from comictaggerlib.resulttypes import IssueResult
 
 def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
     config, definitions = config
-    ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
-    cropped = ii.crop_double_page(cbz_double_cover.archiver.read_file("double_cover.jpg"))
-    original_cover = cbz_double_cover.get_page(0)
-    original_hash = ii.calculate_hash(original_cover)
-    cropped_hash = ii.calculate_hash(cropped)
+    ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
+
+    im = Image.open(io.BytesIO(cbz_double_cover.archiver.read_file("double_cover.jpg")))
+
+    cropped = ii._crop_double_page(im)
+    original = cbz_double_cover.get_page(0)
+
+    original_hash = comictaggerlib.imagehasher.ImageHasher(data=original).average_hash()
+    cropped_hash = comictaggerlib.imagehasher.ImageHasher(image=cropped).average_hash()
 
     assert original_hash == cropped_hash
 
@@ -27,23 +32,24 @@ def test_get_search_keys(cbz, config, additional_md, expected, comicvine_api):
     config, definitions = config
     ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
-    ii.set_additional_metadata(additional_md)
 
-    assert expected == ii.get_search_keys()
+    assert expected == ii._get_search_keys(additional_md)
 
 
 def test_get_issue_cover_match_score(cbz, config, comicvine_api):
     config, definitions = config
     ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
-    score = ii.get_issue_cover_match_score(
+    score = ii._get_issue_cover_match_score(
         "https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
         ["https://comicvine.gamespot.com/cory-doctorows-futuristic-tales-of-the-here-and-no/4000-140529/"],
-        [ii.calculate_hash(cbz.get_page(0))],
+        [("Cover 1", ii.calculate_hash(cbz.get_page(0)))],
     )
     expected = {
         "remote_hash": 212201432349720,
         "score": 0,
         "url": "https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
+        "local_hash": 212201432349720,
+        "local_hash_name": "Cover 1",
     }
 
     assert expected == score
@@ -80,14 +86,10 @@ def test_crop_border(cbz, config, comicvine_api):
     bg = Image.new("RGBA", (100, 100), (0, 0, 0, 255))
     fg = Image.new("RGBA", (50, 50), (255, 255, 255, 255))
     bg.paste(fg, (bg.width // 2 - (fg.width // 2), bg.height // 2 - (fg.height // 2)))
-    output = io.BytesIO()
-    bg.save(output, format="PNG")
-    image_data = output.getvalue()
-    output.close()
 
-    cropped = ii.crop_border(image_data, 49)
+    cropped = ii._crop_border(bg, 49)
 
-    im = Image.open(io.BytesIO(cropped))
-    assert im.width == fg.width
-    assert im.height == fg.height
-    assert list(im.getdata()) == list(fg.getdata())
+    assert cropped
+    assert cropped.width == fg.width
+    assert cropped.height == fg.height
+    assert list(cropped.getdata()) == list(fg.getdata())
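[Editor's sketch, not part of the applied patch: the updated test_crop above leans on the `image=` parameter that patch 1 added to ImageHasher. A minimal standalone sanity check that the two construction paths agree on the hash; everything here is standard PIL plus the patched ImageHasher.]

    import io

    from PIL import Image

    from comictaggerlib.imagehasher import ImageHasher

    # Build a small gradient image entirely in memory; a gradient guarantees a
    # non-trivial hash (a solid color would hash to all zero bits either way).
    im = Image.new("L", (64, 64))
    im.putdata([(x + y) % 256 for y in range(64) for x in range(64)])

    # Hash the PIL object directly via the new image= parameter...
    hash_from_image = ImageHasher(image=im).average_hash()

    # ...and the same pixels round-tripped through lossless PNG bytes, the
    # pre-existing data= path.
    buf = io.BytesIO()
    im.save(buf, format="PNG")
    hash_from_bytes = ImageHasher(data=buf.getvalue()).average_hash()

    assert hash_from_image == hash_from_bytes
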
From 22d92e1dede0a5cee43d288e02c36743e7f5d49c Mon Sep 17 00:00:00 2001
From: Timmy Welch
Date: Mon, 26 Feb 2024 15:38:13 -0800
Subject: [PATCH 5/5] Move result determination out of _cover_matching

---
 comictaggerlib/issueidentifier.py | 80 +++++++++++++++----------------
 1 file changed, 40 insertions(+), 40 deletions(-)

diff --git a/comictaggerlib/issueidentifier.py b/comictaggerlib/issueidentifier.py
index 4f6df1f..d5e9ef1 100644
--- a/comictaggerlib/issueidentifier.py
+++ b/comictaggerlib/issueidentifier.py
@@ -574,7 +574,6 @@ class IssueIdentifier:
 
         if len(cover_matching_1) == 0:
             self.log_msg(":-( no matches!")
-            self.search_result = self.result_no_matches
             return cover_matching_1
 
         # sort list by image match scores
@@ -598,28 +597,15 @@ class IssueIdentifier:
                 if score.distance < self.min_alternate_score_thresh:
                     cover_matching_2.append(score)
 
-            if len(cover_matching_2) == 0:
-                if len(cover_matching_1) == 1:
-                    self.log_msg("No matching pages in the issue.")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self._print_match(cover_matching_1[0])
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_found_match_but_bad_cover_score
-                else:
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.log_msg("Multiple bad cover matches! Need to use other info...")
-                    self.log_msg("--------------------------------------------------------------------------")
-                    self.search_result = self.result_multiple_matches_with_bad_image_scores
-                return cover_matching_1
+            if len(cover_matching_2) > 0:
+                # We did good, found something!
+                self.log_msg("Success in secondary/alternate cover matching!")
 
-            # We did good, found something!
-            self.log_msg("Success in secondary/alternate cover matching!")
-
-            final_cover_matching = cover_matching_2
-            # sort new list by image match scores
-            final_cover_matching.sort(key=attrgetter("distance"))
-            self.log_msg(f"[Second round cover matching: best score = {final_cover_matching[0].distance}]")
-            # now drop down into the rest of the processing
+                final_cover_matching = cover_matching_2
+                # sort new list by image match scores
+                final_cover_matching.sort(key=attrgetter("distance"))
+                self.log_msg(f"[Second round cover matching: best score = {final_cover_matching[0].distance}]")
+                # now drop down into the rest of the processing
 
         best_score = final_cover_matching[0].distance
         # now pare down list, remove any item more than specified distant from the top scores
@@ -657,24 +643,38 @@ class IssueIdentifier:
             )
             final_cover_matching.remove(match)
 
-        if len(final_cover_matching) == 1:
-            self.log_msg("--------------------------------------------------------------------------")
-            self._print_match(final_cover_matching[0])
-            self.log_msg("--------------------------------------------------------------------------")
-            search_result = self.result_one_good_match
-
-        elif len(final_cover_matching) == 0:
-            self.log_msg("--------------------------------------------------------------------------")
-            self.log_msg("No matches found :(")
-            self.log_msg("--------------------------------------------------------------------------")
-            search_result = self.result_no_matches
+        best_score = final_cover_matching[0].distance
+        if best_score >= self.min_score_thresh:
+            if len(final_cover_matching) == 1:
+                self.log_msg("No matching pages in the issue.")
+                self.log_msg("--------------------------------------------------------------------------")
+                self._print_match(final_cover_matching[0])
+                self.log_msg("--------------------------------------------------------------------------")
+                search_result = self.result_found_match_but_bad_cover_score
+            else:
+                self.log_msg("--------------------------------------------------------------------------")
+                self.log_msg("Multiple bad cover matches! Need to use other info...")
+                self.log_msg("--------------------------------------------------------------------------")
+                search_result = self.result_multiple_matches_with_bad_image_scores
         else:
-            # we've got multiple good matches:
-            self.log_msg("More than one likely candidate.")
-            search_result = self.result_multiple_good_matches
-            self.log_msg("--------------------------------------------------------------------------")
-            for match_item in final_cover_matching:
-                self._print_match(match_item)
-            self.log_msg("--------------------------------------------------------------------------")
+            if len(final_cover_matching) == 1:
+                self.log_msg("--------------------------------------------------------------------------")
+                self._print_match(final_cover_matching[0])
+                self.log_msg("--------------------------------------------------------------------------")
+                search_result = self.result_one_good_match
+
+            elif len(final_cover_matching) == 0:
+                self.log_msg("--------------------------------------------------------------------------")
+                self.log_msg("No matches found :(")
+                self.log_msg("--------------------------------------------------------------------------")
+                search_result = self.result_no_matches
+            else:
+                # we've got multiple good matches:
+                self.log_msg("More than one likely candidate.")
+                search_result = self.result_multiple_good_matches
+                self.log_msg("--------------------------------------------------------------------------")
+                for match_item in final_cover_matching:
+                    self._print_match(match_item)
+                self.log_msg("--------------------------------------------------------------------------")
 
         return search_result, final_cover_matching