Finish implementing quick_tag with simple results

This commit is contained in:
Timmy Welch 2024-08-10 19:46:53 -07:00
parent 2cbbaa0d65
commit a9630ac31e
2 changed files with 204 additions and 120 deletions

View File

@ -1,11 +1,19 @@
from typing import Collection, Sequence from __future__ import annotations
import argparse
import pathlib
import sys
from typing import Collection
from typing import Sequence
import imagehash
import numpy
from PIL import Image from PIL import Image
import argparse,pathlib,numpy,imagehash,sys
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("--file", type=pathlib.Path) ap.add_argument('--file', type=pathlib.Path)
ap.add_argument("--debug", action='store_true') ap.add_argument('--debug', action='store_true')
opts = ap.parse_args() opts = ap.parse_args()
opts.file = pathlib.Path(opts.file) opts.file = pathlib.Path(opts.file)
@ -24,7 +32,7 @@ def print_image(image: Image.Image) -> None:
if isinstance(i, Collection): if isinstance(i, Collection):
print('{ ', end='', file=sys.stderr) print('{ ', end='', file=sys.stderr)
for idx, x in enumerate(i): for idx, x in enumerate(i):
if idx == len(i)-1: if idx == len(i) - 1:
print(f'{int(x):03d} ', end='', file=sys.stderr) print(f'{int(x):03d} ', end='', file=sys.stderr)
else: else:
print(f'{int(x):03d}, ', end='', file=sys.stderr) print(f'{int(x):03d}, ', end='', file=sys.stderr)
@ -33,28 +41,29 @@ def print_image(image: Image.Image) -> None:
print(f'{int(i):03d}, ', end='', file=sys.stderr) print(f'{int(i):03d}, ', end='', file=sys.stderr)
print(']', file=sys.stderr) print(']', file=sys.stderr)
def bin_str(hash):
    """Return the elements of ``hash.hash`` joined into one string.

    The array is flattened and multiplied by 1 before each element is
    stringified.
    NOTE(review): presumably ``hash`` is an ``imagehash.ImageHash`` whose
    ``.hash`` is a boolean bit matrix, making the result a string of
    ``0``/``1`` characters - confirm against the callers.
    """
    bits = 1 * hash.hash.flatten()
    return ''.join(map(str, bits))
if opts.debug: if opts.debug:
image.save("py.rgb.png") image.save('py.rgb.png')
print("rgb", file=sys.stderr) print('rgb', file=sys.stderr)
print_image(image) print_image(image)
print(file=sys.stderr) print(file=sys.stderr)
if opts.debug: if opts.debug:
gray.save("py.gray.png") gray.save('py.gray.png')
print("gray", file=sys.stderr) print('gray', file=sys.stderr)
print_image(gray) print_image(gray)
print(file=sys.stderr) print(file=sys.stderr)
if opts.debug: if opts.debug:
resized.save("py.resized.png") resized.save('py.resized.png')
print("resized", file=sys.stderr) print('resized', file=sys.stderr)
print_image(resized) print_image(resized)
print(file=sys.stderr) print(file=sys.stderr)
print('ahash: ', bin_str(imagehash.average_hash(image))) print('ahash: ', str(imagehash.average_hash(image)))
print('dhash: ', bin_str(imagehash.dhash(image))) print('dhash: ', str(imagehash.dhash(image)))
print('phash: ', bin_str(imagehash.phash(image))) print('phash: ', str(imagehash.phash(image)))

View File

@ -1,91 +1,161 @@
from __future__ import annotations
import argparse import argparse
import itertools
import logging import logging
import pathlib
from datetime import datetime
from io import BytesIO
from typing import TypedDict from typing import TypedDict
from urllib.parse import urljoin from urllib.parse import urljoin
from PIL import Image
import appdirs import appdirs
from comicapi.genericmetadata import GenericMetadata import comictaggerlib.cli
import pathlib, imagehash, requests import imagehash
import settngs, comictaggerlib.cli import requests
from io import BytesIO import settngs
from comicapi import comicarchive, merge from comicapi import comicarchive
from comicapi import merge
from datetime import datetime
from comicapi import utils from comicapi import utils
from comicapi.genericmetadata import GenericMetadata from comicapi.genericmetadata import GenericMetadata
from comictaggerlib import ctversion from comicapi.issuestring import IssueString
from comictaggerlib.cbltransformer import CBLTransformer
from comictaggerlib.ctsettings.settngs_namespace import SettngsNS
from comictalker.talker_utils import cleanup_html from comictalker.talker_utils import cleanup_html
from PIL import Image
logger = logging.getLogger("quick_tag") logger = logging.getLogger('quick_tag')
__version__ = '0.1' __version__ = '0.1'
class SimpleResult(TypedDict):
    """One entry of the "simple" result list returned by ``SearchHashes``."""

    # Score of the match; callers sort ascending and treat 0 as an exact match.
    Distance: int
    # Maps a domain (eg comicvine.gamespot.com) to the IDs known on that domain.
    IDList: dict[str, list[str]]
def settings(manager: settngs.Manager):
    """Register every quick_tag command-line option on ``manager``.

    The options are declared as a table of ``(flags, keyword arguments)`` and
    handed to ``manager.add_setting`` in declaration order.
    """
    on_off = argparse.BooleanOptionalAction
    option_table = (
        (
            ('--url', '-u'),
            {
                'default': 'https://comic-hasher.narnian.us',
                'type': utils.parse_url,
                'help': 'Website to use for searching cover hashes',
            },
        ),
        (
            ('--max', '-m'),
            {
                'default': 8,
                'type': int,
                'help': 'Maximum score to allow. Lower score means more accurate',
            },
        ),
        (
            ('--simple', '-s'),
            {
                'default': True,
                'action': on_off,
                'help': 'Whether to retrieve simple results or full results',
            },
        ),
        (
            ('--force-interactive', '-f'),
            {
                'default': True,
                'action': on_off,
                'help': 'When not set will automatically tag comics that have a single match with a score of 4 or lower',
            },
        ),
        (
            ('--aggressive-filtering', '-a'),
            {
                'default': False,
                'action': on_off,
                'help': 'Will filter out worse matches if better matches are found',
            },
        ),
        (('--cv-api-key', '-c'), {}),
        # Positional argument: the archive to tag.
        (('comic_archive',), {'type': pathlib.Path}),
    )
    for flags, keywords in option_table:
        manager.add_setting(*flags, **keywords)
def SearchHashes(url: str, simple: bool, max: int, ahash: str, dhash: str, phash: str, timeout: float = 30.0) -> list[SimpleResult]:
    """Query the hash server's ``/match_cover_hash`` endpoint for matching covers.

    Args:
        url: Base URL of the hash server.
        simple: Sent as the ``simple`` query parameter.
        max: Sent as the ``max`` query parameter (maximum score to allow).
        ahash, dhash, phash: String forms of the cover's three image hashes.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body of the response.

    Raises:
        SystemExit: With code 3 when the request fails or the server answers
            with anything other than HTTP 200.
    """
    query = {
        'simple': simple,
        'max': max,
        'ahash': ahash,
        'dhash': dhash,
        'phash': phash,
    }
    try:
        resp = requests.get(urljoin(url, '/match_cover_hash'), params=query, timeout=timeout)
    except requests.RequestException:
        # Connection failures and timeouts exit the same way as a bad response.
        logger.exception('unable to reach server: %s', url)
        raise SystemExit(3)
    if resp.status_code != 200:
        logger.error('bad response from server: %s', resp.text)
        raise SystemExit(3)
    return resp.json()
def get_simple_results(results: list[SimpleResult], cv_api_key: str | None = None) -> list[tuple[int, GenericMetadata]]:
    """Fetch ComicVine metadata for every ComicVine issue ID in ``results``.

    Args:
        results: Simple results from the hash server. Sorted in place by
            ascending ``Distance``.
        cv_api_key: ComicVine API key handed to the talker's settings.

    Returns:
        ``(distance, metadata)`` pairs ordered by ascending distance, one pair
        per metadata object yielded by ``fetch_comics``.
    """
    from comictalker.talkers.comicvine import ComicVineTalker

    cache_dir = pathlib.Path(appdirs.user_cache_dir('quick_tag'))
    cache_dir.mkdir(parents=True, exist_ok=True)
    cv = ComicVineTalker(f"quick_tag/{__version__}", cache_dir)
    cv.parse_settings({
        'comicvine_key': cv_api_key,
        'cv_use_series_start_as_volume': True,
    })

    md_results: list[tuple[int, GenericMetadata]] = []
    results.sort(key=lambda r: r['Distance'])
    for result in results:
        # A result may only carry IDs for other domains; skip it instead of raising KeyError.
        issue_ids = result['IDList'].get('comicvine.gamespot.com', [])
        if not issue_ids:
            continue
        # fetch_comics takes the whole ID list, so it is called once per result.
        # Calling it once per ID would append every issue len(issue_ids) times.
        for md in cv.fetch_comics(issue_ids=issue_ids):
            md_results.append((result['Distance'], md))
    return md_results
def filter_simple_results(results: list[SimpleResult], force_interactive=True, aggressive_filtering=False) -> list[SimpleResult]:
    """Narrow the hash-server results before any metadata is fetched.

    Args:
        results: Simple results from the hash server, in any order.
        force_interactive: When false, a single exact match (``Distance`` of
            0) is returned on its own.
        aggressive_filtering: When true and there are more than 4 results,
            only the results sharing the best distance are kept, provided
            that best distance is below 6.

    Returns:
        The narrowed list, or ``results`` unchanged when no rule applies.
    """
    if not force_interactive:
        exact = [r for r in results if r['Distance'] == 0]
        if len(exact) == 1:
            return exact

    if aggressive_filtering and len(results) > 4:
        # The best group is found by value, not position: the results are not
        # sorted at this point, so the first run of equal distances need not be the best.
        best_distance = min(r['Distance'] for r in results)
        if best_distance < 6:
            return [r for r in results if r['Distance'] == best_distance]

    return results
def display_simple_results(md_results: list[tuple[int, GenericMetadata]], ca: comictaggerlib.cli.ComicArchive, force_interactive=True) -> GenericMetadata:
    """Choose the metadata to tag with, automatically if allowed, else by asking.

    Args:
        md_results: ``(score, metadata)`` candidates.
        ca: The archive being tagged; its filename is parsed so the
            automatic selection can compare series and issue.
        force_interactive: When false, a lone candidate scoring 4 or lower is
            returned directly; failing that, the only candidate scoring below
            10 whose series and issue match the filename is returned.

    Returns:
        The selected metadata.

    Raises:
        SystemExit: With code 4 when there are no candidates or the user
            answers ``q``.
    """
    filename_md = ca.metadata_from_filename(utils.Parser.COMICFN2DICT)

    if not md_results:
        logger.warning('No results found for comic')
        raise SystemExit(4)

    if not force_interactive:
        if len(md_results) == 1 and md_results[0][0] <= 4:
            return md_results[0][1]

        series_match = [
            md
            for score, md in md_results
            if (
                score < 10
                and filename_md.series
                and md.series
                and utils.titles_match(filename_md.series, md.series)
                and IssueString(filename_md.issue).as_string() == IssueString(md.issue).as_string()
            )
        ]
        if len(series_match) == 1:
            return series_match[0]

    # Nothing could be selected automatically: list the candidates and prompt.
    for position, (score, md) in enumerate(md_results, 1):
        print(
            ' {}. {} #{} [{}] ({}/{}) - {} score: {}'.format(
                position,
                md.series,
                md.issue,
                md.publisher,
                md.month,
                md.year,
                md.title,
                score,
            ),
        )

    valid_choices = range(1, len(md_results) + 1)
    while True:
        answer = input(
            f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
        ).casefold()
        if answer.isdigit() and int(answer) in valid_choices:
            return md_results[int(answer) - 1][1]
        if answer == 'q':
            logger.warning('User quit without saving metadata')
            raise SystemExit(4)
def prepare_metadata(md: GenericMetadata, new_md: GenericMetadata, clear_tags: bool, auto_imprint: bool, remove_html_tables: bool) -> GenericMetadata: def prepare_metadata(md: GenericMetadata, new_md: GenericMetadata, clear_tags: bool, auto_imprint: bool, remove_html_tables: bool) -> GenericMetadata:
@ -96,11 +166,11 @@ def prepare_metadata(md: GenericMetadata, new_md: GenericMetadata, clear_tags: b
final_md.overlay(new_md, merge.Mode.OVERLAY, True) final_md.overlay(new_md, merge.Mode.OVERLAY, True)
issue_id = "" issue_id = ''
if final_md.issue_id: if final_md.issue_id:
issue_id = f" [Issue ID {final_md.issue_id}]" issue_id = f" [Issue ID {final_md.issue_id}]"
origin = "" origin = ''
if final_md.data_origin is not None: if final_md.data_origin is not None:
origin = f" using info from {final_md.data_origin.name}" origin = f" using info from {final_md.data_origin.name}"
notes = f"Tagged with quick_tag {__version__}{origin} on {datetime.now():%Y-%m-%d %H:%M:%S}.{issue_id}" notes = f"Tagged with quick_tag {__version__}{origin} on {datetime.now():%Y-%m-%d %H:%M:%S}.{issue_id}"
@ -110,54 +180,59 @@ def prepare_metadata(md: GenericMetadata, new_md: GenericMetadata, clear_tags: b
return final_md.replace( return final_md.replace(
is_empty=False, is_empty=False,
notes=utils.combine_notes(final_md.notes, notes, "Tagged with quick_tag"), notes=utils.combine_notes(final_md.notes, notes, 'Tagged with quick_tag'),
description=cleanup_html(final_md.description, remove_html_tables) or None, description=cleanup_html(final_md.description, remove_html_tables),
) )
def main():
    """Tag one comic archive by matching its cover hashes on the hash server.

    Parses the command line, hashes the archive's cover, searches the server,
    resolves the matches to metadata, lets ``display_simple_results`` pick
    one, and writes it to the archive as ``'cr'`` tags.

    Exit codes (via ``SystemExit``): 0 on success, 1 when full results are
    requested or the path is not a comic archive, 2 when the cover cannot be
    read or the tags cannot be saved.
    """
    manager = settngs.Manager('Simple comictagging script using ImageHash: https://pypi.org/project/ImageHash/')
    manager.add_group('runtime', settings)
    opts, _ = manager.parse_cmdline()
    runtime = opts['runtime']

    url: utils.Url = runtime['url']
    print(url)
    max_hamming_distance: int = runtime['max']
    simple: bool = runtime['simple']

    if not simple:
        logger.error('Full results not implemented yet')
        raise SystemExit(1)

    ca = comicarchive.ComicArchive(runtime['comic_archive'])
    if not ca.seems_to_be_a_comic_archive():
        logger.error('Could not open %s as an archive', ca.path)
        raise SystemExit(1)

    try:
        tags = ca.read_tags('cr')
        cover_index = tags.get_cover_page_index_list()[0]
        cover_image = Image.open(BytesIO(ca.get_page(cover_index)))
    except Exception:
        logger.exception('Unable to read cover image from archive')
        raise SystemExit(2)
    print('Tagging: ', ca.path)

    print("hashing cover")
    ahash = imagehash.average_hash(cover_image)
    dhash = imagehash.dhash(cover_image)
    phash = imagehash.phash(cover_image)

    print("Searching hashes")
    results = SearchHashes(url.url, simple, max_hamming_distance, str(ahash), str(dhash), str(phash))

    print("Retrieving ComicVine data")
    if simple:
        filtered_results = filter_simple_results(results, runtime['force_interactive'], runtime['aggressive_filtering'])
        metadata_results = get_simple_results(filtered_results, runtime['cv_api_key'])
        chosen_result = display_simple_results(metadata_results, ca, runtime['force_interactive'])
    else:
        # Unreachable for now: a false `simple` already exited above.
        metadata_results = get_full_results(results)
        chosen_result = display_full_results(metadata_results)

    final_md = prepare_metadata(GenericMetadata(), chosen_result, clear_tags=False, auto_imprint=True, remove_html_tables=True)
    if ca.write_tags(final_md, 'cr'):
        print(f'successfully saved metadata to {ca.path}')
        raise SystemExit(0)
    logger.error('Failed to save metadata to %s', ca.path)
    raise SystemExit(2)


if __name__ == '__main__':
    main()