comictagger/comictaggerlib/issueidentifier.py

705 lines
27 KiB
Python

"""A class to automatically identify a comic archive"""
#
# Copyright 2012-2014 Anthony Beville
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import io
import logging
import sys
from typing import Any, Callable
from typing_extensions import NotRequired, TypedDict
from comicapi import utils
from comicapi.comicarchive import ComicArchive
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException
from comictaggerlib.imagehasher import ImageHasher
from comictaggerlib.resulttypes import IssueResult
from comictaggerlib.settings import ComicTaggerSettings
from comictalker.comictalker import ComicTalker
from comictalker.talkerbase import TalkerError
from comictalker.utils import parse_date_str
logger = logging.getLogger(__name__)
try:
from PIL import Image
pil_available = True
except ImportError:
pil_available = False
class SearchKeys(TypedDict):
series: str | None
issue_number: str | None
month: int | None
year: int | None
issue_count: int | None
class Score(TypedDict):
score: NotRequired[int]
url: str
hash: int
class IssueIdentifierNetworkError(Exception):
...
class IssueIdentifierCancelled(Exception):
...
class IssueIdentifier:
result_no_matches = 0
result_found_match_but_bad_cover_score = 1
result_found_match_but_not_first_page = 2
result_multiple_matches_with_bad_image_scores = 3
result_one_good_match = 4
result_multiple_good_matches = 5
def __init__(self, comic_archive: ComicArchive, settings: ComicTaggerSettings, talker_api: ComicTalker) -> None:
self.settings = settings
self.talker_api = talker_api
self.comic_archive: ComicArchive = comic_archive
self.image_hasher = 1
self.only_use_additional_meta_data = False
# a decent hamming score, good enough to call it a match
self.min_score_thresh: int = 16
# for alternate covers, be more stringent, since we're a bit more
# scattershot in comparisons
self.min_alternate_score_thresh = 12
# the min distance a hamming score must be to separate itself from
# closest neighbor
self.min_score_distance = 4
# a very strong hamming score, almost certainly the same image
self.strong_score_thresh = 8
# used to eliminate series names that are too long based on our search
# string
self.series_match_thresh = settings.id_series_match_identify_thresh
# used to eliminate unlikely publishers
self.publisher_filter = [s.strip().casefold() for s in settings.id_publisher_filter.split(",")]
self.additional_metadata = GenericMetadata()
self.output_function: Callable[[str], None] = IssueIdentifier.default_write_output
self.callback: Callable[[int, int], None] | None = None
self.cover_url_callback: Callable[[bytes], None] | None = None
self.search_result = self.result_no_matches
self.cover_page_index = 0
self.cancel = False
self.match_list: list[IssueResult] = []
def set_score_min_threshold(self, thresh: int) -> None:
self.min_score_thresh = thresh
def set_score_min_distance(self, distance: int) -> None:
self.min_score_distance = distance
def set_additional_metadata(self, md: GenericMetadata) -> None:
self.additional_metadata = md
def set_name_series_match_threshold(self, delta: int) -> None:
self.series_match_thresh = delta
def set_publisher_filter(self, flt: list[str]) -> None:
self.publisher_filter = flt
def set_hasher_algorithm(self, algo: int) -> None:
self.image_hasher = algo
def set_output_function(self, func: Callable[[str], None]) -> None:
self.output_function = func
def calculate_hash(self, image_data: bytes) -> int:
if self.image_hasher == 3:
return -1 # ImageHasher(data=image_data).dct_average_hash()
if self.image_hasher == 2:
return -1 # ImageHasher(data=image_data).average_hash2()
return ImageHasher(data=image_data).average_hash()
def get_aspect_ratio(self, image_data: bytes) -> float:
try:
im = Image.open(io.BytesIO(image_data))
w, h = im.size
return float(h) / float(w)
except Exception:
return 1.5
def crop_cover(self, image_data: bytes) -> bytes:
im = Image.open(io.BytesIO(image_data))
w, h = im.size
try:
cropped_im = im.crop((int(w / 2), 0, w, h))
except Exception:
logger.exception("cropCover() error")
return bytes()
output = io.BytesIO()
cropped_im.save(output, format="PNG")
cropped_image_data = output.getvalue()
output.close()
return cropped_image_data
def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
self.callback = cb_func
def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
self.cover_url_callback = cb_func
def get_search_keys(self) -> SearchKeys:
ca = self.comic_archive
if ca is None:
return None
search_keys: SearchKeys
if self.only_use_additional_meta_data:
search_keys = SearchKeys(
series=self.additional_metadata.series,
issue_number=self.additional_metadata.issue,
year=self.additional_metadata.year,
month=self.additional_metadata.month,
issue_count=self.additional_metadata.issue_count,
)
return search_keys
# see if the archive has any useful meta data for searching with
try:
if ca.has_cix():
internal_metadata = ca.read_cix()
else:
internal_metadata = ca.read_cbi()
except Exception as e:
internal_metadata = GenericMetadata()
logger.error("Failed to load metadata for %s: %s", ca.path, e)
# try to get some metadata from filename
md_from_filename = ca.metadata_from_filename(
self.settings.complicated_parser,
self.settings.remove_c2c,
self.settings.remove_fcbd,
self.settings.remove_publisher,
)
working_md = md_from_filename.copy()
working_md.overlay(internal_metadata)
working_md.overlay(self.additional_metadata)
# preference order:
# 1. Additional metadata
# 1. Internal metadata
# 1. Filename metadata
search_keys = SearchKeys(
series=working_md.series,
issue_number=working_md.issue,
year=working_md.year,
month=working_md.month,
issue_count=working_md.issue_count,
)
return search_keys
@staticmethod
def default_write_output(text: str) -> None:
sys.stdout.write(text)
sys.stdout.flush()
def log_msg(self, msg: Any, newline: bool = True) -> None:
msg = str(msg)
if newline:
msg += "\n"
self.output_function(msg)
def get_issue_cover_match_score(
self,
issue_id: int,
primary_img_url: str,
primary_thumb_url: str,
page_url: str,
local_cover_hash_list: list[int],
use_remote_alternates: bool = False,
use_log: bool = True,
) -> Score:
# local_cover_hash_list is a list of pre-calculated hashes.
# use_remote_alternates - indicates to use alternate covers from CV
try:
url_image_data = ImageFetcher().fetch(primary_thumb_url, blocking=True)
except ImageFetcherException as e:
self.log_msg("Network issue while fetching cover image from Comic Vine. Aborting...")
raise IssueIdentifierNetworkError from e
if self.cancel:
raise IssueIdentifierCancelled
# alert the GUI, if needed
if self.cover_url_callback is not None:
self.cover_url_callback(url_image_data)
remote_cover_list = [Score(url=primary_img_url, hash=self.calculate_hash(url_image_data))]
if self.cancel:
raise IssueIdentifierCancelled
if use_remote_alternates:
alt_img_url_list = self.talker_api.fetch_alternate_cover_urls(issue_id, page_url)
for alt_url in alt_img_url_list:
try:
alt_url_image_data = ImageFetcher().fetch(alt_url, blocking=True)
except ImageFetcherException as e:
self.log_msg("Network issue while fetching alt. cover image from Comic Vine. Aborting...")
raise IssueIdentifierNetworkError from e
if self.cancel:
raise IssueIdentifierCancelled
# alert the GUI, if needed
if self.cover_url_callback is not None:
self.cover_url_callback(alt_url_image_data)
remote_cover_list.append(Score(url=alt_url, hash=self.calculate_hash(alt_url_image_data)))
if self.cancel:
raise IssueIdentifierCancelled
if use_log and use_remote_alternates:
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]", False)
if use_log:
self.log_msg("[ ", False)
score_list = []
done = False
for local_cover_hash in local_cover_hash_list:
for remote_cover_item in remote_cover_list:
score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["hash"])
score_list.append(Score(score=score, url=remote_cover_item["url"], hash=remote_cover_item["hash"]))
if use_log:
self.log_msg(score, False)
if score <= self.strong_score_thresh:
# such a good score, we can quit now, since for sure we
# have a winner
done = True
break
if done:
break
if use_log:
self.log_msg(" ]", False)
best_score_item = min(score_list, key=lambda x: x["score"])
return best_score_item
def search(self) -> list[IssueResult]:
ca = self.comic_archive
self.match_list = []
self.cancel = False
self.search_result = self.result_no_matches
if not pil_available:
self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
return self.match_list
if not ca.seems_to_be_a_comic_archive():
self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
return self.match_list
cover_image_data = ca.get_page(self.cover_page_index)
cover_hash = self.calculate_hash(cover_image_data)
# check the aspect ratio
# if it's wider than it is high, it's probably a two page spread
# if so, crop it and calculate a second hash
narrow_cover_hash = None
aspect_ratio = self.get_aspect_ratio(cover_image_data)
if aspect_ratio < 1.0:
right_side_image_data = self.crop_cover(cover_image_data)
if right_side_image_data is not None:
narrow_cover_hash = self.calculate_hash(right_side_image_data)
keys = self.get_search_keys()
# normalize the issue number
keys["issue_number"] = IssueString(keys["issue_number"]).as_string()
# we need, at minimum, a series and issue number
if keys["series"] is None or keys["issue_number"] is None:
self.log_msg("Not enough info for a search!")
return []
self.log_msg("Going to search for:")
self.log_msg("\tSeries: " + keys["series"])
self.log_msg("\tIssue: " + keys["issue_number"])
if keys["issue_count"] is not None:
self.log_msg("\tCount: " + str(keys["issue_count"]))
if keys["year"] is not None:
self.log_msg("\tYear: " + str(keys["year"]))
if keys["month"] is not None:
self.log_msg("\tMonth: " + str(keys["month"]))
self.talker_api.set_log_func(self.output_function)
self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
try:
ct_search_results = self.talker_api.search_for_series(keys["series"])
except TalkerError as e:
self.log_msg(f"Error searching for series.\n{e}")
return []
if self.cancel:
return []
if ct_search_results is None:
return []
series_second_round_list = []
for item in ct_search_results:
length_approved = False
publisher_approved = True
date_approved = True
# remove any series that starts after the issue year
if (
keys["year"] is not None
and str(keys["year"]).isdigit()
and item["start_year"] is not None
and str(item["start_year"]).isdigit()
):
if int(keys["year"]) < int(item["start_year"]):
date_approved = False
aliases = []
if item["aliases"]:
aliases = item["aliases"].split("\n")
for name in [item["name"], *aliases]:
if utils.titles_match(keys["series"], name, self.series_match_thresh):
length_approved = True
break
# remove any series from publishers on the filter
if item["publisher"] is not None:
publisher = item["publisher"]
if publisher is not None and publisher.casefold() in self.publisher_filter:
publisher_approved = False
if length_approved and publisher_approved and date_approved:
series_second_round_list.append(item)
self.log_msg("Searching in " + str(len(series_second_round_list)) + " series")
if self.callback is not None:
self.callback(0, len(series_second_round_list))
# now sort the list by name length
series_second_round_list.sort(key=lambda x: len(x["name"]), reverse=False)
# If the sources lacks issue level data, don't look for it.
if not self.talker_api.static_options.has_issues:
for series in series_second_round_list:
hash_list = [cover_hash]
if narrow_cover_hash is not None:
hash_list.append(narrow_cover_hash)
try:
image_url = series["image"]
thumb_url = series["image"]
page_url = ""
score_item = self.get_issue_cover_match_score(
series["id"],
image_url,
thumb_url,
page_url, # Only required for alt covers
hash_list,
use_remote_alternates=False,
)
except Exception:
self.match_list = []
return self.match_list
match: IssueResult = {
"series": f"{series['name']} ({series['start_year']})",
"distance": score_item["score"],
"issue_number": "",
"cv_issue_count": series["count_of_issues"],
"url_image_hash": score_item["hash"],
"issue_title": series["name"],
"issue_id": 0,
"volume_id": series["id"],
"month": 0,
"year": int(series["start_year"]),
"publisher": series["publisher"],
"image_url": image_url,
"thumb_url": thumb_url,
"page_url": page_url,
"description": series["description"],
}
self.match_list.append(match)
self.log_msg(f" --> {match['distance']}", newline=False)
self.log_msg("")
else:
# build a list of volume IDs
volume_id_list = []
for series in series_second_round_list:
volume_id_list.append(series["id"])
issue_list = None
try:
if len(volume_id_list) > 0:
issue_list = self.talker_api.fetch_issues_by_volume_issue_num_and_year(
volume_id_list, keys["issue_number"], keys["year"]
)
except TalkerError as e:
self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}")
return []
if issue_list is None:
return []
shortlist = []
# now re-associate the issues and volumes
for issue in issue_list:
for series in series_second_round_list:
if series["id"] == issue["volume"]["id"]:
shortlist.append((series, issue))
break
if keys["year"] is None:
self.log_msg(f"Found {len(shortlist)} series that have an issue #{keys['issue_number']}")
else:
self.log_msg(
f"Found {len(shortlist)} series that have an issue #{keys['issue_number']} from {keys['year']}"
)
# now we have a shortlist of volumes with the desired issue number
# Do first round of cover matching
counter = len(shortlist)
for series, issue in shortlist:
if self.callback is not None:
self.callback(counter, len(shortlist) * 3)
counter += 1
self.log_msg(
f"Examining covers for ID: {series['id']} {series['name']} ({series['start_year']}) ...",
newline=False,
)
# parse out the cover date
_, month, year = parse_date_str(issue["cover_date"])
# Now check the cover match against the primary image
hash_list = [cover_hash]
if narrow_cover_hash is not None:
hash_list.append(narrow_cover_hash)
try:
image_url = issue["image"]
thumb_url = issue["image_thumb"]
page_url = issue["site_detail_url"]
score_item = self.get_issue_cover_match_score(
issue["id"],
image_url,
thumb_url,
page_url,
hash_list,
use_remote_alternates=False,
)
except Exception:
self.match_list = []
return self.match_list
match: IssueResult = {
"series": f"{series['name']} ({series['start_year']})",
"distance": score_item["score"],
"issue_number": keys["issue_number"],
"cv_issue_count": series["count_of_issues"],
"url_image_hash": score_item["hash"],
"issue_title": issue["name"],
"issue_id": issue["id"],
"volume_id": series["id"],
"month": month,
"year": year,
"publisher": None,
"image_url": image_url,
"thumb_url": thumb_url,
"page_url": page_url,
"description": issue["description"],
}
if series["publisher"] is not None:
match["publisher"] = series["publisher"]
self.match_list.append(match)
self.log_msg(f" --> {match['distance']}", newline=False)
self.log_msg("")
if len(self.match_list) == 0:
self.log_msg(":-(no matches!")
self.search_result = self.result_no_matches
return self.match_list
# sort list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
lst = []
for i in self.match_list:
lst.append(i["distance"])
self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s):", newline=False)
self.log_msg(str(lst))
def print_match(item: IssueResult) -> None:
self.log_msg(
"-----> {} #{} {} ({}/{}) -- score: {}".format(
item["series"],
item["issue_number"],
item["issue_title"],
item["month"],
item["year"],
item["distance"],
)
)
best_score: int = self.match_list[0]["distance"]
if best_score >= self.min_score_thresh:
# we have 1 or more low-confidence matches (all bad cover scores)
# look at a few more pages in the archive, and also alternate covers online
self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")
hash_list = [cover_hash]
if narrow_cover_hash is not None:
hash_list.append(narrow_cover_hash)
for page_index in range(1, min(3, ca.get_number_of_pages())):
image_data = ca.get_page(page_index)
page_hash = self.calculate_hash(image_data)
hash_list.append(page_hash)
second_match_list = []
counter = 2 * len(self.match_list)
for m in self.match_list:
if self.callback is not None:
self.callback(counter, len(self.match_list) * 3)
counter += 1
self.log_msg(f"Examining alternate covers for ID: {m['volume_id']} {m['series']} ...", newline=False)
try:
score_item = self.get_issue_cover_match_score(
m["issue_id"],
m["image_url"],
m["thumb_url"],
m["page_url"],
hash_list,
use_remote_alternates=True,
)
except Exception:
self.match_list = []
return self.match_list
self.log_msg(f"--->{score_item['score']}")
self.log_msg("")
if score_item["score"] < self.min_alternate_score_thresh:
second_match_list.append(m)
m["distance"] = score_item["score"]
if len(second_match_list) == 0:
if len(self.match_list) == 1:
self.log_msg("No matching pages in the issue.")
self.log_msg("--------------------------------------------------------------------------")
print_match(self.match_list[0])
self.log_msg("--------------------------------------------------------------------------")
self.search_result = self.result_found_match_but_bad_cover_score
else:
self.log_msg("--------------------------------------------------------------------------")
self.log_msg("Multiple bad cover matches! Need to use other info...")
self.log_msg("--------------------------------------------------------------------------")
self.search_result = self.result_multiple_matches_with_bad_image_scores
return self.match_list
# We did good, found something!
self.log_msg("Success in secondary/alternate cover matching!")
self.match_list = second_match_list
# sort new list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
best_score = self.match_list[0]["distance"]
self.log_msg("[Second round cover matching: best score = {best_score}]")
# now drop down into the rest of the processing
if self.callback is not None:
self.callback(99, 100)
# now pare down list, remove any item more than specified distant from the top scores
for match_item in reversed(self.match_list):
if match_item["distance"] > best_score + self.min_score_distance:
self.match_list.remove(match_item)
# One more test for the case choosing limited series first issue vs a trade with the same cover:
# if we have a given issue count > 1 and the volume from CV has count==1, remove it from match list
if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
new_list = []
for match in self.match_list:
if match["cv_issue_count"] != 1:
new_list.append(match)
else:
self.log_msg(
f"Removing volume {match['series']} [{match['volume_id']}] from consideration (only 1 issue)"
)
if len(new_list) > 0:
self.match_list = new_list
if len(self.match_list) == 1:
self.log_msg("--------------------------------------------------------------------------")
print_match(self.match_list[0])
self.log_msg("--------------------------------------------------------------------------")
self.search_result = self.result_one_good_match
elif len(self.match_list) == 0:
self.log_msg("--------------------------------------------------------------------------")
self.log_msg("No matches found :(")
self.log_msg("--------------------------------------------------------------------------")
self.search_result = self.result_no_matches
else:
# we've got multiple good matches:
self.log_msg("More than one likely candidate.")
self.search_result = self.result_multiple_good_matches
self.log_msg("--------------------------------------------------------------------------")
for match_item in self.match_list:
print_match(match_item)
self.log_msg("--------------------------------------------------------------------------")
return self.match_list