Merge branch 'ii-rework' into develop
This commit is contained in:
commit
ea43eccd78
@ -459,31 +459,29 @@ class CLI:
|
||||
self.output(text)
|
||||
|
||||
# use our overlaid MD struct to search
|
||||
ii.set_additional_metadata(md)
|
||||
ii.only_use_additional_meta_data = True
|
||||
# ii.set_additional_metadata(md)
|
||||
# ii.only_use_additional_meta_data = True
|
||||
ii.set_output_function(functools.partial(self.output, already_logged=True))
|
||||
ii.cover_page_index = md.get_cover_page_index_list()[0]
|
||||
matches = ii.search()
|
||||
|
||||
result = ii.search_result
|
||||
# ii.cover_page_index = md.get_cover_page_index_list()[0]
|
||||
result, matches = ii.identify(ca, md)
|
||||
|
||||
found_match = False
|
||||
choices = False
|
||||
low_confidence = False
|
||||
|
||||
if result == ii.result_no_matches:
|
||||
if result == IssueIdentifier.result_no_matches:
|
||||
pass
|
||||
elif result == ii.result_found_match_but_bad_cover_score:
|
||||
elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
|
||||
low_confidence = True
|
||||
found_match = True
|
||||
elif result == ii.result_found_match_but_not_first_page:
|
||||
elif result == IssueIdentifier.result_found_match_but_not_first_page:
|
||||
found_match = True
|
||||
elif result == ii.result_multiple_matches_with_bad_image_scores:
|
||||
elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
|
||||
low_confidence = True
|
||||
choices = True
|
||||
elif result == ii.result_one_good_match:
|
||||
elif result == IssueIdentifier.result_one_good_match:
|
||||
found_match = True
|
||||
elif result == ii.result_multiple_good_matches:
|
||||
elif result == IssueIdentifier.result_multiple_good_matches:
|
||||
choices = True
|
||||
|
||||
if choices:
|
||||
|
@ -34,13 +34,19 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageHasher:
|
||||
def __init__(self, path: str | None = None, data: bytes = b"", width: int = 8, height: int = 8) -> None:
|
||||
def __init__(
|
||||
self, path: str | None = None, image: Image | None = None, data: bytes = b"", width: int = 8, height: int = 8
|
||||
) -> None:
|
||||
self.width = width
|
||||
self.height = height
|
||||
|
||||
if path is None and not data:
|
||||
if path is None and not data and not image:
|
||||
raise OSError
|
||||
|
||||
if image is not None:
|
||||
self.image = image
|
||||
return
|
||||
|
||||
try:
|
||||
if path is not None:
|
||||
self.image = Image.open(path)
|
||||
|
@ -24,8 +24,8 @@ from typing import Any, Callable
|
||||
from typing_extensions import NotRequired, TypedDict
|
||||
|
||||
from comicapi import utils
|
||||
from comicapi.comicarchive import ComicArchive, metadata_styles
|
||||
from comicapi.genericmetadata import GenericMetadata
|
||||
from comicapi.comicarchive import ComicArchive
|
||||
from comicapi.genericmetadata import ComicSeries, GenericMetadata
|
||||
from comicapi.issuestring import IssueString
|
||||
from comictaggerlib.ctsettings import ct_ns
|
||||
from comictaggerlib.imagefetcher import ImageFetcher, ImageFetcherException
|
||||
@ -44,17 +44,23 @@ except ImportError:
|
||||
|
||||
|
||||
class SearchKeys(TypedDict):
|
||||
series: str | None
|
||||
issue_number: str | None
|
||||
series: str
|
||||
issue_number: str
|
||||
alternate_number: str | None
|
||||
month: int | None
|
||||
year: int | None
|
||||
issue_count: int | None
|
||||
alternate_count: int | None
|
||||
publisher: str | None
|
||||
imprint: str | None
|
||||
|
||||
|
||||
class Score(TypedDict):
|
||||
score: NotRequired[int]
|
||||
url: str
|
||||
hash: int
|
||||
remote_hash: int
|
||||
local_hash_name: str
|
||||
local_hash: int
|
||||
|
||||
|
||||
class IssueIdentifierNetworkError(Exception): ...
|
||||
@ -71,10 +77,17 @@ class IssueIdentifier:
|
||||
result_one_good_match = 4
|
||||
result_multiple_good_matches = 5
|
||||
|
||||
def __init__(self, comic_archive: ComicArchive, config: ct_ns, talker: ComicTalker) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
comic_archive: ComicArchive,
|
||||
config: ct_ns,
|
||||
talker: ComicTalker,
|
||||
metadata: GenericMetadata = GenericMetadata(),
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.talker = talker
|
||||
self.comic_archive: ComicArchive = comic_archive
|
||||
self.md = metadata
|
||||
self.image_hasher = 1
|
||||
|
||||
self.only_use_additional_meta_data = False
|
||||
@ -139,35 +152,21 @@ class IssueIdentifier:
|
||||
|
||||
return ImageHasher(data=image_data).average_hash()
|
||||
|
||||
def get_aspect_ratio(self, image_data: bytes) -> float:
|
||||
try:
|
||||
im = Image.open(io.BytesIO(image_data))
|
||||
w, h = im.size
|
||||
return float(h) / float(w)
|
||||
except Exception:
|
||||
return 1.5
|
||||
|
||||
def crop_cover(self, image_data: bytes) -> bytes:
|
||||
im = Image.open(io.BytesIO(image_data))
|
||||
def _crop_double_page(self, im: Image.Image) -> Image.Image | None:
    """Return the right-hand half of ``im``, or ``None`` if cropping fails.

    The crop box runs from the horizontal midpoint to the right edge and
    covers the full height of the image.
    """
    w, h = im.size

    try:
        cropped_im = im.crop((int(w / 2), 0, w, h))
    except Exception:
        # Best effort: a failed crop is logged and reported as "no image".
        logger.exception("cropCover() error")
        return None

    return cropped_im
|
||||
|
||||
# Adapted from https://stackoverflow.com/a/10616717/20629671
|
||||
def crop_border(self, image_data: bytes, ratio: int) -> bytes | None:
|
||||
im = Image.open(io.BytesIO(image_data))
|
||||
|
||||
def _crop_border(self, im: Image.Image, ratio: int) -> Image.Image | None:
|
||||
assert Image
|
||||
assert ImageChops
|
||||
# RGBA doesn't work????
|
||||
tmp = im.convert("RGB")
|
||||
|
||||
@ -199,11 +198,7 @@ class IssueIdentifier:
|
||||
|
||||
# If there is a difference return the image otherwise return None
|
||||
if width_percent > ratio or height_percent > ratio:
|
||||
output = io.BytesIO()
|
||||
im.crop(bbox).save(output, format="PNG")
|
||||
cropped_image_data = output.getvalue()
|
||||
output.close()
|
||||
return cropped_image_data
|
||||
return im.crop(bbox)
|
||||
return None
|
||||
|
||||
def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
|
||||
@ -212,57 +207,6 @@ class IssueIdentifier:
|
||||
def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
|
||||
self.cover_url_callback = cb_func
|
||||
|
||||
def get_search_keys(self) -> SearchKeys:
|
||||
ca = self.comic_archive
|
||||
|
||||
search_keys: SearchKeys
|
||||
if self.only_use_additional_meta_data:
|
||||
search_keys = SearchKeys(
|
||||
series=self.additional_metadata.series,
|
||||
issue_number=self.additional_metadata.issue,
|
||||
year=self.additional_metadata.year,
|
||||
month=self.additional_metadata.month,
|
||||
issue_count=self.additional_metadata.issue_count,
|
||||
)
|
||||
return search_keys
|
||||
|
||||
# see if the archive has any useful meta data for searching with
|
||||
try:
|
||||
for style in metadata_styles:
|
||||
internal_metadata = ca.read_metadata(style)
|
||||
if not internal_metadata.is_empty:
|
||||
break
|
||||
except Exception as e:
|
||||
internal_metadata = GenericMetadata()
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
# try to get some metadata from filename
|
||||
md_from_filename = ca.metadata_from_filename(
|
||||
self.config.Filename_Parsing__complicated_parser,
|
||||
self.config.Filename_Parsing__remove_c2c,
|
||||
self.config.Filename_Parsing__remove_fcbd,
|
||||
self.config.Filename_Parsing__remove_publisher,
|
||||
)
|
||||
|
||||
working_md = md_from_filename.copy()
|
||||
|
||||
working_md.overlay(internal_metadata)
|
||||
working_md.overlay(self.additional_metadata)
|
||||
|
||||
# preference order:
|
||||
# 1. Additional metadata
|
||||
# 1. Internal metadata
|
||||
# 1. Filename metadata
|
||||
search_keys = SearchKeys(
|
||||
series=working_md.series,
|
||||
issue_number=working_md.issue,
|
||||
year=working_md.year,
|
||||
month=working_md.month,
|
||||
issue_count=working_md.issue_count,
|
||||
)
|
||||
|
||||
return search_keys
|
||||
|
||||
def log_msg(self, msg: Any) -> None:
|
||||
msg = str(msg)
|
||||
for handler in logging.getLogger().handlers:
|
||||
@ -291,70 +235,62 @@ class IssueIdentifier:
|
||||
# default output is stdout
|
||||
self.output_function(*args, **kwargs)
|
||||
|
||||
def get_issue_cover_match_score(
|
||||
def _get_remote_hashes(self, urls: list[str]) -> list[tuple[str, int]]:
    """Fetch every URL in ``urls`` and return ``(url, hash)`` pairs in order.

    Each fetched image is passed to ``self.cover_url_callback`` (when one is
    set) before it is hashed with ``self.calculate_hash``.

    Raises:
        IssueIdentifierNetworkError: the image fetcher failed for a URL.
        IssueIdentifierCancelled: ``self.cancel`` was set while processing.
    """
    remote_hashes: list[tuple[str, int]] = []
    for url in urls:
        try:
            alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
                url, blocking=True
            )
        except ImageFetcherException as e:
            self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
            raise IssueIdentifierNetworkError from e

        # Checks for cancellation, then alerts the listener (if any) with the image data.
        self._user_canceled(self.cover_url_callback, alt_url_image_data)

        remote_hashes.append((url, self.calculate_hash(alt_url_image_data)))

        # The callback or the hashing may have taken a while; check again.
        if self.cancel:
            raise IssueIdentifierCancelled
    return remote_hashes
|
||||
|
||||
def _get_issue_cover_match_score(
|
||||
self,
|
||||
primary_img_url: str,
|
||||
alt_urls: list[str],
|
||||
local_cover_hash_list: list[int],
|
||||
use_remote_alternates: bool = False,
|
||||
local_hashes: list[tuple[str, int]],
|
||||
use_alt_urls: bool = False,
|
||||
) -> Score:
|
||||
# local_cover_hash_list is a list of pre-calculated hashes.
|
||||
# use_remote_alternates - indicates to use alternate covers from CV
|
||||
# local_hashes is a list of pre-calculated hashes.
|
||||
# use_alt_urls - indicates to use alternate covers from CV
|
||||
|
||||
# If there is no URL return 100
|
||||
if not primary_img_url:
|
||||
return Score(score=100, url="", hash=0)
|
||||
return Score(score=100, url="", remote_hash=0)
|
||||
|
||||
try:
|
||||
url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
|
||||
primary_img_url, blocking=True
|
||||
)
|
||||
except ImageFetcherException as e:
|
||||
self.log_msg(f"Network issue while fetching cover image from {self.talker.name}. Aborting...")
|
||||
raise IssueIdentifierNetworkError from e
|
||||
self._user_canceled()
|
||||
|
||||
if self.cancel:
|
||||
raise IssueIdentifierCancelled
|
||||
urls = [primary_img_url]
|
||||
if use_alt_urls:
|
||||
urls.extend(alt_urls)
|
||||
self.log_msg(f"[{len(alt_urls)} alt. covers]")
|
||||
|
||||
# alert the GUI, if needed
|
||||
if self.cover_url_callback is not None:
|
||||
self.cover_url_callback(url_image_data)
|
||||
|
||||
remote_cover_list = [Score(url=primary_img_url, hash=self.calculate_hash(url_image_data))]
|
||||
|
||||
if self.cancel:
|
||||
raise IssueIdentifierCancelled
|
||||
|
||||
if use_remote_alternates:
|
||||
for alt_url in alt_urls:
|
||||
try:
|
||||
alt_url_image_data = ImageFetcher(self.config.Runtime_Options__config.user_cache_dir).fetch(
|
||||
alt_url, blocking=True
|
||||
)
|
||||
except ImageFetcherException as e:
|
||||
self.log_msg(f"Network issue while fetching alt. cover image from {self.talker.name}. Aborting...")
|
||||
raise IssueIdentifierNetworkError from e
|
||||
|
||||
if self.cancel:
|
||||
raise IssueIdentifierCancelled
|
||||
|
||||
# alert the GUI, if needed
|
||||
if self.cover_url_callback is not None:
|
||||
self.cover_url_callback(alt_url_image_data)
|
||||
|
||||
remote_cover_list.append(Score(url=alt_url, hash=self.calculate_hash(alt_url_image_data)))
|
||||
|
||||
if self.cancel:
|
||||
raise IssueIdentifierCancelled
|
||||
|
||||
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]")
|
||||
remote_hashes = self._get_remote_hashes(urls)
|
||||
|
||||
score_list = []
|
||||
done = False
|
||||
for local_cover_hash in local_cover_hash_list:
|
||||
for remote_cover_item in remote_cover_list:
|
||||
score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["hash"])
|
||||
score_list.append(Score(score=score, url=remote_cover_item["url"], hash=remote_cover_item["hash"]))
|
||||
for local_hash in local_hashes:
|
||||
for remote_hash in remote_hashes:
|
||||
score = ImageHasher.hamming_distance(local_hash[1], remote_hash[1])
|
||||
score_list.append(
|
||||
Score(
|
||||
score=score,
|
||||
url=remote_hash[0],
|
||||
remote_hash=remote_hash[1],
|
||||
local_hash_name=local_hash[0],
|
||||
local_hash=local_hash[1],
|
||||
)
|
||||
)
|
||||
|
||||
self.log_msg(f" - {score:03}")
|
||||
|
||||
@ -369,167 +305,181 @@ class IssueIdentifier:
|
||||
|
||||
return best_score_item
|
||||
|
||||
def search(self) -> list[IssueResult]:
|
||||
ca = self.comic_archive
|
||||
self.match_list = []
|
||||
self.cancel = False
|
||||
self.search_result = self.result_no_matches
|
||||
def _check_requirements(self, ca: ComicArchive) -> bool:
    """Return ``True`` when identification can be attempted on ``ca``.

    Logs a message and returns ``False`` when PIL is unavailable or when
    ``ca`` does not look like a comic archive.
    """
    if not pil_available:
        self.log_msg("Python Imaging Library (PIL) is not available and is needed for issue identification.")
        return False

    if not ca.seems_to_be_a_comic_archive():
        self.log_msg(f"Sorry, but {ca.path} is not a comic archive!")
        return False

    return True
|
||||
|
||||
cover_image_data = ca.get_page(self.cover_page_index)
|
||||
cover_hash = self.calculate_hash(cover_image_data)
|
||||
def _process_cover(self, name: str, image_data: bytes) -> list[tuple[str, Image.Image]]:
    """Open ``image_data`` and return it together with its derived variants.

    The result always starts with ``(name, image)``. A ``"double page"``
    entry is added when the image is wider than it is tall and cropping
    succeeds; a ``"black border cropped"`` entry is added when
    ``_crop_border`` returns an image.
    """
    assert Image
    cover_image = Image.open(io.BytesIO(image_data))
    images = [(name, cover_image)]

    # check the aspect ratio
    # if it's wider than it is high, it's probably a two page spread (back_cover, front_cover)
    # if so, crop it and keep the cropped half as an additional candidate
    aspect_ratio = float(cover_image.height) / float(cover_image.width)
    if aspect_ratio < 1.0:
        im = self._crop_double_page(cover_image)
        if im is not None:
            images.append(("double page", im))

    # Check and remove black borders. Helps in identifying comics with an excessive black border like
    # https://comicvine.gamespot.com/marvel-graphic-novel-1-the-death-of-captain-marvel/4000-21782/
    cropped = self._crop_border(cover_image, self.config.Issue_Identifier__border_crop_percent)
    if cropped is not None:
        images.append(("black border cropped", cropped))

    return images
|
||||
|
||||
def _get_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
    """Return the processed images for every cover page listed by ``md``.

    Each page index from ``md.get_cover_page_index_list()`` is read from
    ``ca`` and expanded by ``_process_cover``, using the index as its name.
    """
    covers: list[tuple[str, Image.Image]] = []
    for cover_index in md.get_cover_page_index_list():
        image_data = ca.get_page(cover_index)
        covers.extend(self._process_cover(f"{cover_index}", image_data))
    return covers
|
||||
|
||||
def _get_extra_images(self, ca: ComicArchive, md: GenericMetadata) -> list[tuple[str, Image.Image]]:
    """Return processed images for the pages at index 1 and 2 of ``ca``.

    Fewer pages are returned when the archive is shorter; the page at
    index 0 is never included.
    """
    assert md  # md is not otherwise used by this method
    covers: list[tuple[str, Image.Image]] = []
    for cover_index in range(1, min(3, ca.get_number_of_pages())):
        image_data = ca.get_page(cover_index)
        covers.extend(self._process_cover(f"{cover_index}", image_data))
    return covers
|
||||
|
||||
def _get_search_keys(self, md: GenericMetadata) -> Any:
    """Build the ``SearchKeys`` used for searching from ``md``.

    ``issue`` and ``alternate_number`` are passed through ``IssueString``
    and stored in its string form; every other field is copied as-is.
    """
    search_keys = SearchKeys(
        series=md.series,
        issue_number=IssueString(md.issue).as_string(),
        alternate_number=IssueString(md.alternate_number).as_string(),
        month=md.month,
        year=md.year,
        issue_count=md.issue_count,
        alternate_count=md.alternate_count,
        publisher=md.publisher,
        imprint=md.imprint,
    )
    return search_keys
|
||||
|
||||
def _get_search_terms(
    self, ca: ComicArchive, md: GenericMetadata
) -> tuple[SearchKeys, list[tuple[str, Image.Image]], list[tuple[str, Image.Image]]]:
    """Return ``(search keys, cover images, extra page images)`` for ``ca`` and ``md``."""
    return self._get_search_keys(md), self._get_images(ca, md), self._get_extra_images(ca, md)
|
||||
|
||||
def _user_canceled(self, callback: Callable[..., Any] | None = None, *args: Any) -> Any:
    """Raise if the run was cancelled; otherwise invoke ``callback`` if given.

    Returns whatever ``callback(*args)`` returns, or ``None`` when no
    callback was supplied.

    Raises:
        IssueIdentifierCancelled: ``self.cancel`` is set.
    """
    if self.cancel:
        raise IssueIdentifierCancelled
    if callback is not None:
        return callback(*args)
    return None
|
||||
|
||||
def _print_terms(self, keys: SearchKeys, images: list[tuple[str, Image.Image]]) -> None:
    """Log the search terms and the names of the cover images in use.

    Series and issue number are always logged; month, year and issue count
    are logged once each, and only when they are not ``None``.
    """
    assert keys["series"]
    assert keys["issue_number"]
    self.log_msg(f"Using {self.talker.name} to search for:")
    self.log_msg("\tSeries: " + keys["series"])
    self.log_msg("\tIssue: " + keys["issue_number"])
    # alternate_number, alternate_count, publisher and imprint are part of
    # the search keys but are not logged.
    if keys["month"] is not None:
        self.log_msg("\tMonth: " + str(keys["month"]))
    if keys["year"] is not None:
        self.log_msg("\tYear: " + str(keys["year"]))
    if keys["issue_count"] is not None:
        self.log_msg("\tCount: " + str(keys["issue_count"]))
    for name, _ in images:
        self.log_msg("Cover: " + name)
|
||||
|
||||
self.log_msg(f"Searching for {keys['series']} #{keys['issue_number']} ...")
|
||||
try:
|
||||
ct_search_results = self.talker.search_for_series(keys["series"])
|
||||
except TalkerError as e:
|
||||
self.log_msg(f"Error searching for series.\n{e}")
|
||||
return []
|
||||
|
||||
if self.cancel:
|
||||
return []
|
||||
def _filter_series(self, terms: SearchKeys, search_results: list[ComicSeries]) -> list[ComicSeries]:
    """Return the series from ``search_results`` that pass every filter.

    A series is kept when all of the following hold:
      * its name or one of its aliases matches ``terms["series"]`` according
        to ``utils.titles_match`` with ``self.series_match_thresh``;
      * its publisher (case-folded) is not in ``self.publisher_filter``;
      * it does not start after ``terms["year"]`` (only checked when both
        years are known).

    Rejected series are reported at debug level.
    """
    assert terms["series"]

    filtered_results = []
    for item in search_results:
        # remove any series that starts after the issue year
        date_approved = not (
            terms["year"] is not None and item.start_year is not None and terms["year"] < item.start_year
        )

        length_approved = any(
            utils.titles_match(terms["series"], name, self.series_match_thresh)
            for name in [item.name, *item.aliases]
        )

        # remove any series from publishers on the filter
        publisher_approved = not (
            item.publisher is not None and item.publisher.casefold() in self.publisher_filter
        )

        if length_approved and publisher_approved and date_approved:
            filtered_results.append(item)
        else:
            logger.debug(
                "Filtered out series: '%s' length approved: '%s', publisher approved: '%s', date approved: '%s'",
                item.name,
                length_approved,
                publisher_approved,
                date_approved,
            )
    return filtered_results
|
||||
|
||||
if issue_list is None:
|
||||
return []
|
||||
def _calculate_hashes(self, images: list[tuple[str, Image.Image]]) -> list[tuple[str, int]]:
    """Return ``(name, average_hash)`` for every ``(name, image)`` pair, in order."""
    return [(name, ImageHasher(image=image).average_hash()) for name, image in images]
|
||||
|
||||
shortlist = []
|
||||
# now re-associate the issues and series
|
||||
# is this really needed?
|
||||
for issue in issue_list:
|
||||
if issue.series_id in series_by_id:
|
||||
shortlist.append((series_by_id[issue.series_id], issue))
|
||||
|
||||
if keys["year"] is None:
|
||||
self.log_msg(f"Found {len(shortlist)} series that have an issue #{keys['issue_number']}")
|
||||
else:
|
||||
self.log_msg(
|
||||
f"Found {len(shortlist)} series that have an issue #{keys['issue_number']} from {keys['year']}"
|
||||
)
|
||||
|
||||
# now we have a shortlist of series with the desired issue number
|
||||
# Do first round of cover matching
|
||||
counter = len(shortlist)
|
||||
for series, issue in shortlist:
|
||||
if self.progress_callback is not None:
|
||||
self.progress_callback(counter, len(shortlist) * 3)
|
||||
counter += 1
|
||||
def _match_covers(
|
||||
self,
|
||||
terms: SearchKeys,
|
||||
images: list[tuple[str, Image.Image]],
|
||||
issues: list[tuple[ComicSeries, GenericMetadata]],
|
||||
use_alternates: bool,
|
||||
) -> list[IssueResult]:
|
||||
assert terms["issue_number"]
|
||||
match_results: list[IssueResult] = []
|
||||
hashes = self._calculate_hashes(images)
|
||||
counter = 0
|
||||
alternate = ""
|
||||
if use_alternates:
|
||||
alternate = " Alternate"
|
||||
for series, issue in issues:
|
||||
self._user_canceled(self.progress_callback, counter, len(issues))
|
||||
counter += 1
|
||||
|
||||
self.log_msg(
|
||||
f"Examining covers for ID: {series.id} {series.name} ({series.start_year}):",
|
||||
f"Examining{alternate} covers for Series ID: {series.id} {series.name} ({series.start_year}):",
|
||||
)
|
||||
|
||||
# Now check the cover match against the primary image
|
||||
hash_list = [cover_hash]
|
||||
if narrow_cover_hash is not None:
|
||||
hash_list.append(narrow_cover_hash)
|
||||
|
||||
cropped_border = self.crop_border(cover_image_data, self.config.Issue_Identifier__border_crop_percent)
|
||||
if cropped_border is not None:
|
||||
hash_list.append(self.calculate_hash(cropped_border))
|
||||
logger.info("Adding cropped cover to the hashlist")
|
||||
|
||||
try:
|
||||
image_url = issue._cover_image or ""
|
||||
alt_urls = issue._alternate_images
|
||||
|
||||
score_item = self.get_issue_cover_match_score(
|
||||
image_url, alt_urls, hash_list, use_remote_alternates=False
|
||||
)
|
||||
score_item = self._get_issue_cover_match_score(image_url, alt_urls, hashes, use_alt_urls=use_alternates)
|
||||
except Exception:
|
||||
logger.exception("Scoring series failed")
|
||||
self.match_list = []
|
||||
return self.match_list
|
||||
logger.exception(f"Scoring series{alternate} covers failed")
|
||||
return []
|
||||
|
||||
match = IssueResult(
|
||||
series=f"{series.name} ({series.start_year})",
|
||||
distance=score_item["score"],
|
||||
issue_number=keys["issue_number"],
|
||||
cv_issue_count=series.count_of_issues,
|
||||
url_image_hash=score_item["hash"],
|
||||
issue_number=terms["issue_number"],
|
||||
issue_count=series.count_of_issues,
|
||||
url_image_hash=score_item["remote_hash"],
|
||||
issue_title=issue.title or "",
|
||||
issue_id=issue.issue_id or "",
|
||||
series_id=series.id,
|
||||
@ -543,142 +493,188 @@ class IssueIdentifier:
|
||||
if series.publisher is not None:
|
||||
match.publisher = series.publisher
|
||||
|
||||
self.match_list.append(match)
|
||||
match_results.append(match)
|
||||
|
||||
self.log_msg(f"best score {match.distance:03}")
|
||||
|
||||
self.log_msg("")
|
||||
return match_results
|
||||
|
||||
if len(self.match_list) == 0:
|
||||
def _print_match(self, item: IssueResult) -> None:
    """Log a one-line summary of ``item``: series, issue, title, date and score."""
    self.log_msg(
        f"-----> {item.series} #{item.issue_number} {item.issue_title} "
        f"({item.month}/{item.year}) -- score: {item.distance}"
    )
|
||||
|
||||
def _search_for_issues(self, terms: SearchKeys) -> list[tuple[ComicSeries, GenericMetadata]]:
    """Look up candidate issues for ``terms`` and pair each with its series.

    Searches the talker for ``terms["series"]``, narrows the result with
    ``_filter_series``, then asks the talker for the issues matching
    ``terms["issue_number"]`` and ``terms["year"]`` in the remaining series.

    Returns an empty list when a talker call fails with ``TalkerError`` or
    when any stage yields nothing. ``IssueIdentifierCancelled`` raised by
    ``_user_canceled`` is not caught here and propagates to the caller.
    """
    try:
        search_results = self.talker.search_for_series(
            terms["series"],
            callback=lambda x, y: self._user_canceled(self.progress_callback, x, y),
            series_match_thresh=self.config.Issue_Identifier__series_match_search_thresh,
        )
    except TalkerError as e:
        self.log_msg(f"Error searching for series.\n{e}")
        return []

    if not search_results:
        return []

    filtered_series = self._filter_series(terms, search_results)
    if not filtered_series:
        return []

    self.log_msg(f"Searching in {len(filtered_series)} series")

    self._user_canceled(self.progress_callback, 0, len(filtered_series))

    series_by_id = {series.id: series for series in filtered_series}

    try:
        talker_result = self.talker.fetch_issues_by_series_issue_num_and_year(
            list(series_by_id.keys()), terms["issue_number"], terms["year"]
        )
    except TalkerError as e:
        self.log_msg(f"Issue with while searching for series details. Aborting...\n{e}")
        return []

    if not talker_result:
        return []

    self._user_canceled(self.progress_callback, 0, 0)

    issues: list[tuple[ComicSeries, GenericMetadata]] = []

    # now re-associate the issues and series
    for issue in talker_result:
        if issue.series_id in series_by_id:
            issues.append((series_by_id[issue.series_id], issue))
        else:
            # The talker returned an issue for a series that was not requested.
            logger.warning("Talker '%s' is returning arbitrary series when searching by id", self.talker.id)
    return issues
|
||||
|
||||
def _cover_matching(
    self,
    terms: SearchKeys,
    images: list[tuple[str, Image.Image]],
    extra_images: list[tuple[str, Image.Image]],
    issues: list[tuple[ComicSeries, GenericMetadata]],
) -> list[IssueResult]:
    """Score ``issues`` against the local images and return the best matches.

    A first round compares ``images`` against the primary covers. When even
    the best first-round distance is at or above ``self.min_score_thresh``,
    a second round adds ``extra_images`` and the alternate covers, keeping
    only results below ``self.min_alternate_score_thresh``; if that leaves
    nothing, the first-round results are used.

    The returned list is sorted by distance and holds only results within
    ``self.min_score_distance`` of the best one. It is empty when the first
    round produced no results.
    """
    cover_matching_1 = self._match_covers(terms, images, issues, use_alternates=False)

    if len(cover_matching_1) == 0:
        self.log_msg(":-( no matches!")
        return cover_matching_1

    # sort list by image match scores
    cover_matching_1.sort(key=attrgetter("distance"))

    lst = [i.distance for i in cover_matching_1]

    self.log_msg(f"Compared to covers in {len(cover_matching_1)} issue(s): {lst}")

    final_cover_matching = cover_matching_1
    if cover_matching_1[0].distance >= self.min_score_thresh:
        # we have 1 or more low-confidence matches (all bad cover scores)
        # look at a few more pages in the archive, and also alternate covers online
        self.log_msg("Very weak scores for the cover. Analyzing alternate pages and covers...")

        temp = self._match_covers(terms, images + extra_images, issues, use_alternates=True)
        cover_matching_2 = [score for score in temp if score.distance < self.min_alternate_score_thresh]

        if len(cover_matching_2) > 0:
            # We did good, found something!
            self.log_msg("Success in secondary/alternate cover matching!")

            final_cover_matching = cover_matching_2
            # sort new list by image match scores
            final_cover_matching.sort(key=attrgetter("distance"))
            # The best score must be read after sorting, and the message needs
            # to be an f-string for the value to be interpolated.
            second_round_best = final_cover_matching[0].distance
            self.log_msg(f"[Second round cover matching: best score = {second_round_best}]")
            # now drop down into the rest of the processing

    best_score = final_cover_matching[0].distance
    # now pare down list, remove any item more than specified distant from the top scores
    cutoff = best_score + self.min_score_distance
    # Filter in place so the list object is the same one that was sorted above.
    final_cover_matching[:] = [match_item for match_item in final_cover_matching if match_item.distance <= cutoff]
    return final_cover_matching
|
||||
|
||||
def identify(self, ca: ComicArchive, md: GenericMetadata) -> tuple[int, list[IssueResult]]:
|
||||
if not self._check_requirements(ca):
|
||||
return self.result_no_matches, []
|
||||
|
||||
terms, images, extra_images = self._get_search_terms(ca, md)
|
||||
|
||||
# we need, at minimum, a series and issue number
|
||||
if not (terms["series"] and terms["issue_number"]):
|
||||
self.log_msg("Not enough info for a search!")
|
||||
return self.result_no_matches, []
|
||||
|
||||
self._print_terms(terms, images)
|
||||
|
||||
issues = self._search_for_issues(terms)
|
||||
|
||||
self.log_msg(f"Found {len(issues)} series that have an issue #{terms['issue_number']}")
|
||||
|
||||
final_cover_matching = self._cover_matching(terms, images, extra_images, issues)
|
||||
|
||||
# One more test for the case choosing limited series first issue vs a trade with the same cover:
|
||||
# if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
|
||||
if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
|
||||
new_list = []
|
||||
for match in self.match_list:
|
||||
if match.cv_issue_count != 1:
|
||||
new_list.append(match)
|
||||
else:
|
||||
if len(final_cover_matching) > 1 and terms["issue_count"] is not None and terms["issue_count"] != 1:
|
||||
for match in final_cover_matching.copy():
|
||||
if match.issue_count == 1:
|
||||
self.log_msg(
|
||||
f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
|
||||
)
|
||||
final_cover_matching.remove(match)
|
||||
|
||||
if len(new_list) > 0:
|
||||
self.match_list = new_list
|
||||
|
||||
if len(self.match_list) == 1:
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
print_match(self.match_list[0])
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self.search_result = self.result_one_good_match
|
||||
|
||||
elif len(self.match_list) == 0:
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self.log_msg("No matches found :(")
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self.search_result = self.result_no_matches
|
||||
best_score = final_cover_matching[0].distance
|
||||
if best_score >= self.min_score_thresh:
|
||||
if len(final_cover_matching) == 1:
|
||||
self.log_msg("No matching pages in the issue.")
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self._print_match(final_cover_matching[0])
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
search_result = self.result_found_match_but_bad_cover_score
|
||||
else:
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self.log_msg("Multiple bad cover matches! Need to use other info...")
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
search_result = self.result_multiple_matches_with_bad_image_scores
|
||||
else:
|
||||
# we've got multiple good matches:
|
||||
self.log_msg("More than one likely candidate.")
|
||||
self.search_result = self.result_multiple_good_matches
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
for match_item in self.match_list:
|
||||
print_match(match_item)
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
if len(final_cover_matching) == 1:
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self._print_match(final_cover_matching[0])
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
search_result = self.result_one_good_match
|
||||
|
||||
return self.match_list
|
||||
elif len(self.match_list) == 0:
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
self.log_msg("No matches found :(")
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
search_result = self.result_no_matches
|
||||
else:
|
||||
# we've got multiple good matches:
|
||||
self.log_msg("More than one likely candidate.")
|
||||
search_result = self.result_multiple_good_matches
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
for match_item in final_cover_matching:
|
||||
self._print_match(match_item)
|
||||
self.log_msg("--------------------------------------------------------------------------")
|
||||
|
||||
return search_result, final_cover_matching
|
||||
|
@ -53,7 +53,7 @@ class IssueResult:
|
||||
series: str
|
||||
distance: int
|
||||
issue_number: str
|
||||
cv_issue_count: int | None
|
||||
issue_count: int | None
|
||||
url_image_hash: int
|
||||
issue_title: str
|
||||
issue_id: str
|
||||
|
@ -33,6 +33,7 @@ from comictaggerlib.issueidentifier import IssueIdentifier
|
||||
from comictaggerlib.issueselectionwindow import IssueSelectionWindow
|
||||
from comictaggerlib.matchselectionwindow import MatchSelectionWindow
|
||||
from comictaggerlib.progresswindow import IDProgressWindow
|
||||
from comictaggerlib.resulttypes import IssueResult
|
||||
from comictaggerlib.ui import qtutils, ui_path
|
||||
from comictaggerlib.ui.qtutils import new_web_view, reduce_widget_font_size
|
||||
from comictalker.comictalker import ComicTalker, TalkerError
|
||||
@ -76,15 +77,17 @@ class SearchThread(QtCore.QThread):
|
||||
|
||||
|
||||
class IdentifyThread(QtCore.QThread):
|
||||
identifyComplete = pyqtSignal()
|
||||
identifyComplete = pyqtSignal((int, list))
|
||||
identifyLogMsg = pyqtSignal(str)
|
||||
identifyProgress = pyqtSignal(int, int)
|
||||
|
||||
def __init__(self, identifier: IssueIdentifier) -> None:
|
||||
def __init__(self, identifier: IssueIdentifier, ca: ComicArchive, md: GenericMetadata) -> None:
|
||||
QtCore.QThread.__init__(self)
|
||||
self.identifier = identifier
|
||||
self.identifier.set_output_function(self.log_output)
|
||||
self.identifier.set_progress_callback(self.progress_callback)
|
||||
self.ca = ca
|
||||
self.md = md
|
||||
|
||||
def log_output(self, text: str) -> None:
|
||||
self.identifyLogMsg.emit(str(text))
|
||||
@ -93,8 +96,7 @@ class IdentifyThread(QtCore.QThread):
|
||||
self.identifyProgress.emit(cur, total)
|
||||
|
||||
def run(self) -> None:
|
||||
self.identifier.search()
|
||||
self.identifyComplete.emit()
|
||||
self.identifyComplete.emit(*self.identifier.identify(self.ca, self.md))
|
||||
|
||||
|
||||
class SeriesSelectionWindow(QtWidgets.QDialog):
|
||||
@ -245,12 +247,12 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
|
||||
md.year = self.year
|
||||
md.issue_count = self.issue_count
|
||||
|
||||
self.ii.set_additional_metadata(md)
|
||||
self.ii.only_use_additional_meta_data = True
|
||||
# self.ii.set_additional_metadata(md)
|
||||
# self.ii.only_use_additional_meta_data = True
|
||||
|
||||
self.ii.cover_page_index = int(self.cover_index_list[0])
|
||||
# self.ii.cover_page_index = int(self.cover_index_list[0])
|
||||
|
||||
self.id_thread = IdentifyThread(self.ii)
|
||||
self.id_thread = IdentifyThread(self.ii, self.comic_archive, md)
|
||||
self.id_thread.identifyComplete.connect(self.identify_complete)
|
||||
self.id_thread.identifyLogMsg.connect(self.log_id_output)
|
||||
self.id_thread.identifyProgress.connect(self.identify_progress)
|
||||
@ -276,35 +278,33 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
|
||||
if self.ii is not None:
|
||||
self.ii.cancel = True
|
||||
|
||||
def identify_complete(self) -> None:
|
||||
if self.ii is not None and self.iddialog is not None and self.comic_archive is not None:
|
||||
matches = self.ii.match_list
|
||||
result = self.ii.search_result
|
||||
def identify_complete(self, result: int, issues: list[IssueResult]) -> None:
|
||||
if self.iddialog is not None and self.comic_archive is not None:
|
||||
|
||||
found_match = None
|
||||
choices = False
|
||||
if result == self.ii.result_no_matches:
|
||||
QtWidgets.QMessageBox.information(self, "Auto-Select Result", " No matches found :-(")
|
||||
elif result == self.ii.result_found_match_but_bad_cover_score:
|
||||
if result == IssueIdentifier.result_no_matches:
|
||||
QtWidgets.QMessageBox.information(self, "Auto-Select Result", " No issues found :-(")
|
||||
elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
|
||||
QtWidgets.QMessageBox.information(
|
||||
self,
|
||||
"Auto-Select Result",
|
||||
" Found a match, but cover doesn't seem the same. Verify before committing!",
|
||||
)
|
||||
found_match = matches[0]
|
||||
elif result == self.ii.result_found_match_but_not_first_page:
|
||||
found_match = issues[0]
|
||||
elif result == IssueIdentifier.result_found_match_but_not_first_page:
|
||||
QtWidgets.QMessageBox.information(
|
||||
self, "Auto-Select Result", " Found a match, but not with the first page of the archive."
|
||||
)
|
||||
found_match = matches[0]
|
||||
elif result == self.ii.result_multiple_matches_with_bad_image_scores:
|
||||
found_match = issues[0]
|
||||
elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
|
||||
QtWidgets.QMessageBox.information(
|
||||
self, "Auto-Select Result", " Found some possibilities, but no confidence. Proceed manually."
|
||||
)
|
||||
choices = True
|
||||
elif result == self.ii.result_one_good_match:
|
||||
found_match = matches[0]
|
||||
elif result == self.ii.result_multiple_good_matches:
|
||||
elif result == IssueIdentifier.result_one_good_match:
|
||||
found_match = issues[0]
|
||||
elif result == IssueIdentifier.result_multiple_good_matches:
|
||||
QtWidgets.QMessageBox.information(
|
||||
self, "Auto-Select Result", " Found multiple likely matches. Please select."
|
||||
)
|
||||
@ -312,7 +312,7 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
|
||||
|
||||
if choices:
|
||||
selector = MatchSelectionWindow(
|
||||
self, matches, self.comic_archive, talker=self.talker, config=self.config
|
||||
self, issues, self.comic_archive, talker=self.talker, config=self.config
|
||||
)
|
||||
selector.setModal(True)
|
||||
selector.exec()
|
||||
|
@ -1759,17 +1759,15 @@ class TaggerWindow(QtWidgets.QMainWindow):
|
||||
md.issue = "1"
|
||||
else:
|
||||
md.issue = utils.xlate(md.volume)
|
||||
ii.set_additional_metadata(md)
|
||||
ii.only_use_additional_meta_data = True
|
||||
# ii.set_additional_metadata(md)
|
||||
# ii.only_use_additional_meta_data = True
|
||||
ii.set_output_function(self.auto_tag_log)
|
||||
ii.cover_page_index = md.get_cover_page_index_list()[0]
|
||||
# ii.cover_page_index = md.get_cover_page_index_list()[0]
|
||||
if self.atprogdialog is not None:
|
||||
ii.set_cover_url_callback(self.atprogdialog.set_test_image)
|
||||
ii.set_name_series_match_threshold(dlg.name_length_match_tolerance)
|
||||
|
||||
matches: list[IssueResult] = ii.search()
|
||||
|
||||
result = ii.search_result
|
||||
result, matches = ii.identify(ca, md)
|
||||
|
||||
found_match = False
|
||||
choices = False
|
||||
|
@ -78,33 +78,17 @@ metadata = [
|
||||
|
||||
metadata_keys = [
|
||||
(
|
||||
comicapi.genericmetadata.GenericMetadata(),
|
||||
comicapi.genericmetadata.md_test,
|
||||
{
|
||||
"issue_count": 6,
|
||||
"issue_number": "1",
|
||||
"month": 10,
|
||||
"series": "Cory Doctorow's Futuristic Tales of the Here and Now",
|
||||
"year": 2007,
|
||||
},
|
||||
),
|
||||
(
|
||||
comicapi.genericmetadata.GenericMetadata(series="test"),
|
||||
{
|
||||
"issue_count": 6,
|
||||
"issue_number": "1",
|
||||
"month": 10,
|
||||
"series": "test",
|
||||
"year": 2007,
|
||||
},
|
||||
),
|
||||
(
|
||||
comicapi.genericmetadata.GenericMetadata(series="test", issue="3"),
|
||||
{
|
||||
"issue_count": 6,
|
||||
"issue_number": "3",
|
||||
"month": 10,
|
||||
"series": "test",
|
||||
"year": 2007,
|
||||
"alternate_count": 7,
|
||||
"alternate_number": "2",
|
||||
"imprint": "craphound.com",
|
||||
"publisher": "IDW Publishing",
|
||||
},
|
||||
),
|
||||
]
|
||||
|
@ -5,6 +5,7 @@ import io
|
||||
import pytest
|
||||
from PIL import Image
|
||||
|
||||
import comictaggerlib.imagehasher
|
||||
import comictaggerlib.issueidentifier
|
||||
import testing.comicdata
|
||||
import testing.comicvine
|
||||
@ -13,12 +14,16 @@ from comictaggerlib.resulttypes import IssueResult
|
||||
|
||||
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
|
||||
config, definitions = config
|
||||
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
|
||||
cropped = ii.crop_cover(cbz_double_cover.archiver.read_file("double_cover.jpg"))
|
||||
original_cover = cbz_double_cover.get_page(0)
|
||||
|
||||
original_hash = ii.calculate_hash(original_cover)
|
||||
cropped_hash = ii.calculate_hash(cropped)
|
||||
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
|
||||
|
||||
im = Image.open(io.BytesIO(cbz_double_cover.archiver.read_file("double_cover.jpg")))
|
||||
|
||||
cropped = ii._crop_double_page(im)
|
||||
original = cbz_double_cover.get_page(0)
|
||||
|
||||
original_hash = comictaggerlib.imagehasher.ImageHasher(data=original).average_hash()
|
||||
cropped_hash = comictaggerlib.imagehasher.ImageHasher(image=cropped).average_hash()
|
||||
|
||||
assert original_hash == cropped_hash
|
||||
|
||||
@ -27,23 +32,24 @@ def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
|
||||
def test_get_search_keys(cbz, config, additional_md, expected, comicvine_api):
|
||||
config, definitions = config
|
||||
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
|
||||
ii.set_additional_metadata(additional_md)
|
||||
|
||||
assert expected == ii.get_search_keys()
|
||||
assert expected == ii._get_search_keys(additional_md)
|
||||
|
||||
|
||||
def test_get_issue_cover_match_score(cbz, config, comicvine_api):
|
||||
config, definitions = config
|
||||
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
|
||||
score = ii.get_issue_cover_match_score(
|
||||
score = ii._get_issue_cover_match_score(
|
||||
"https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
|
||||
["https://comicvine.gamespot.com/cory-doctorows-futuristic-tales-of-the-here-and-no/4000-140529/"],
|
||||
[ii.calculate_hash(cbz.get_page(0))],
|
||||
[("Cover 1", ii.calculate_hash(cbz.get_page(0)))],
|
||||
)
|
||||
expected = {
|
||||
"hash": 212201432349720,
|
||||
"remote_hash": 212201432349720,
|
||||
"score": 0,
|
||||
"url": "https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
|
||||
"local_hash": 212201432349720,
|
||||
"local_hash_name": "Cover 1",
|
||||
}
|
||||
assert expected == score
|
||||
|
||||
@ -51,13 +57,13 @@ def test_get_issue_cover_match_score(cbz, config, comicvine_api):
|
||||
def test_search(cbz, config, comicvine_api):
|
||||
config, definitions = config
|
||||
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
|
||||
results = ii.search()
|
||||
result, issues = ii.identify(cbz, cbz.read_metadata("cr"))
|
||||
cv_expected = IssueResult(
|
||||
series=f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
|
||||
distance=0,
|
||||
issue_number=testing.comicvine.cv_issue_result["results"]["issue_number"],
|
||||
alt_image_urls=[],
|
||||
cv_issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
|
||||
issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
|
||||
issue_title=testing.comicvine.cv_issue_result["results"]["name"],
|
||||
issue_id=str(testing.comicvine.cv_issue_result["results"]["id"]),
|
||||
series_id=str(testing.comicvine.cv_volume_result["results"]["id"]),
|
||||
@ -68,7 +74,7 @@ def test_search(cbz, config, comicvine_api):
|
||||
description=testing.comicvine.cv_issue_result["results"]["description"],
|
||||
url_image_hash=212201432349720,
|
||||
)
|
||||
for r, e in zip(results, [cv_expected]):
|
||||
for r, e in zip(issues, [cv_expected]):
|
||||
assert r == e
|
||||
|
||||
|
||||
@ -80,14 +86,10 @@ def test_crop_border(cbz, config, comicvine_api):
|
||||
bg = Image.new("RGBA", (100, 100), (0, 0, 0, 255))
|
||||
fg = Image.new("RGBA", (50, 50), (255, 255, 255, 255))
|
||||
bg.paste(fg, (bg.width // 2 - (fg.width // 2), bg.height // 2 - (fg.height // 2)))
|
||||
output = io.BytesIO()
|
||||
bg.save(output, format="PNG")
|
||||
image_data = output.getvalue()
|
||||
output.close()
|
||||
|
||||
cropped = ii.crop_border(image_data, 49)
|
||||
cropped = ii._crop_border(bg, 49)
|
||||
|
||||
im = Image.open(io.BytesIO(cropped))
|
||||
assert im.width == fg.width
|
||||
assert im.height == fg.height
|
||||
assert list(im.getdata()) == list(fg.getdata())
|
||||
assert cropped
|
||||
assert cropped.width == fg.width
|
||||
assert cropped.height == fg.height
|
||||
assert list(cropped.getdata()) == list(fg.getdata())
|
||||
|
Loading…
Reference in New Issue
Block a user