Ensure that all output goes through a logger before being shown to the user

Adds a CLI option to output JSON
This commit is contained in:
Timmy Welch 2023-12-17 15:51:43 -08:00
parent c5cfd3ebdc
commit bb67ab009e
20 changed files with 726 additions and 400 deletions

View File

@ -91,7 +91,7 @@ class CoMet:
date_str += f"-{md.month:02}"
assign("date", date_str)
assign("coverImage", md.cover_image)
assign("coverImage", md._cover_image)
# loop thru credits, and build a list for each role that CoMet supports
for credit in metadata.credits:
@ -156,7 +156,7 @@ class CoMet:
_, md.month, md.year = utils.parse_date_str(utils.xlate(get("date")))
md.cover_image = utils.xlate(get("coverImage"))
md._cover_image = utils.xlate(get("coverImage"))
reading_direction = utils.xlate(get("readingDirection"))
if reading_direction is not None and reading_direction == "rtl":

View File

@ -45,11 +45,10 @@ def load_archive_plugins() -> None:
for arch in entry_points(group="comicapi.archiver"):
try:
archiver: type[Archiver] = arch.load()
if archiver.enabled:
if arch.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
if arch.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
except Exception:
logger.warning("Failed to load talker: %s", arch.name)
archivers.extend(builtin)
@ -88,7 +87,7 @@ class ComicArchive:
load_archive_plugins()
for archiver in archivers:
if archiver.is_valid(self.path):
if archiver.enabled and archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
break
@ -425,10 +424,10 @@ class ComicArchive:
# use the coverImage value from the comet_data to mark the cover in this struct
# walk through list of images in file, and find the matching one for md.coverImage
# need to remove the existing one in the default
if self.comet_md.cover_image is not None:
if self.comet_md._cover_image is not None:
cover_idx = 0
for idx, f in enumerate(self.get_page_name_list()):
if self.comet_md.cover_image == f:
if self.comet_md._cover_image == f:
cover_idx = idx
break
if cover_idx != 0:
@ -462,7 +461,7 @@ class ComicArchive:
# Set the coverImage value, if it's not the first page
cover_idx = int(metadata.get_cover_page_index_list()[0])
if cover_idx != 0:
metadata.cover_image = self.get_page_name(cover_idx)
metadata._cover_image = self.get_page_name(cover_idx)
comet_string = CoMet().string_from_metadata(metadata)
write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8"))

View File

@ -109,46 +109,44 @@ class GenericMetadata:
series: str | None = None
series_aliases: set[str] = dataclasses.field(default_factory=set)
issue: str | None = None
issue_count: int | None = None
title: str | None = None
title_aliases: set[str] = dataclasses.field(default_factory=set)
publisher: str | None = None
month: int | None = None
year: int | None = None
day: int | None = None
issue_count: int | None = None
volume: int | None = None
genres: set[str] = dataclasses.field(default_factory=set)
language: str | None = None # 2 letter iso code
description: str | None = None # use same way as Summary in CIX
volume_count: int | None = None
critical_rating: float | None = None # rating in CBL; CommunityRating in CIX
country: str | None = None
genres: set[str] = dataclasses.field(default_factory=set)
description: str | None = None # use same way as Summary in CIX
notes: str | None = None
alternate_series: str | None = None
alternate_number: str | None = None
alternate_count: int | None = None
story_arcs: list[str] = dataclasses.field(default_factory=list)
series_groups: list[str] = dataclasses.field(default_factory=list)
publisher: str | None = None
imprint: str | None = None
notes: str | None = None
day: int | None = None
month: int | None = None
year: int | None = None
language: str | None = None # 2 letter iso code
country: str | None = None
web_link: str | None = None
format: str | None = None
manga: str | None = None
black_and_white: bool | None = None
page_count: int | None = None
maturity_rating: str | None = None
story_arcs: list[str] = dataclasses.field(default_factory=list)
series_groups: list[str] = dataclasses.field(default_factory=list)
critical_rating: float | None = None # rating in CBL; CommunityRating in CIX
scan_info: str | None = None
tags: set[str] = dataclasses.field(default_factory=set)
pages: list[ImageMetadata] = dataclasses.field(default_factory=list)
page_count: int | None = None
characters: set[str] = dataclasses.field(default_factory=set)
teams: set[str] = dataclasses.field(default_factory=set)
locations: set[str] = dataclasses.field(default_factory=set)
alternate_images: list[str] = dataclasses.field(default_factory=list)
credits: list[Credit] = dataclasses.field(default_factory=list)
tags: set[str] = dataclasses.field(default_factory=set)
pages: list[ImageMetadata] = dataclasses.field(default_factory=list)
# Some CoMet-only items
price: float | None = None
@ -156,7 +154,10 @@ class GenericMetadata:
rights: str | None = None
identifier: str | None = None
last_mark: str | None = None
cover_image: str | None = None # url to cover image
# urls to cover image, not generally part of the metadata
_cover_image: str | None = None
_alternate_images: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self) -> None:
for key, value in self.__dict__.items():
@ -191,58 +192,61 @@ class GenericMetadata:
if not new_md.is_empty:
self.is_empty = False
assign("series", new_md.series)
assign("series_id", new_md.series_id)
assign("issue", new_md.issue)
assign("tag_origin", new_md.tag_origin)
assign("issue_id", new_md.issue_id)
assign("series_id", new_md.series_id)
assign("series", new_md.series)
assign("series_aliases", new_md.series_aliases)
assign("issue", new_md.issue)
assign("issue_count", new_md.issue_count)
assign("title", new_md.title)
assign("publisher", new_md.publisher)
assign("day", new_md.day)
assign("month", new_md.month)
assign("year", new_md.year)
assign("title_aliases", new_md.title_aliases)
assign("volume", new_md.volume)
assign("volume_count", new_md.volume_count)
assign("language", new_md.language)
assign("country", new_md.country)
assign("critical_rating", new_md.critical_rating)
assign("genres", new_md.genres)
assign("description", new_md.description)
assign("notes", new_md.notes)
assign("alternate_series", new_md.alternate_series)
assign("alternate_number", new_md.alternate_number)
assign("alternate_count", new_md.alternate_count)
assign("story_arcs", new_md.story_arcs)
assign("series_groups", new_md.series_groups)
assign("publisher", new_md.publisher)
assign("imprint", new_md.imprint)
assign("day", new_md.day)
assign("month", new_md.month)
assign("year", new_md.year)
assign("language", new_md.language)
assign("country", new_md.country)
assign("web_link", new_md.web_link)
assign("format", new_md.format)
assign("manga", new_md.manga)
assign("black_and_white", new_md.black_and_white)
assign("maturity_rating", new_md.maturity_rating)
assign("critical_rating", new_md.critical_rating)
assign("scan_info", new_md.scan_info)
assign("description", new_md.description)
assign("notes", new_md.notes)
assign("tags", new_md.tags)
print("before", self.pages, new_md.pages)
assign("pages", new_md.pages)
print("after", self.pages, new_md.pages)
assign("page_count", new_md.page_count)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
self.overlay_credits(new_md.credits)
assign("price", new_md.price)
assign("is_version_of", new_md.is_version_of)
assign("rights", new_md.rights)
assign("identifier", new_md.identifier)
assign("last_mark", new_md.last_mark)
self.overlay_credits(new_md.credits)
# TODO
# not sure if the tags and pages should broken down, or treated
# as whole lists....
# For now, go the easy route, where any overlay
# value wipes out the whole list
assign("series_aliases", new_md.series_aliases)
assign("title_aliases", new_md.title_aliases)
assign("genres", new_md.genres)
assign("story_arcs", new_md.story_arcs)
assign("series_groups", new_md.series_groups)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
assign("tags", new_md.tags)
assign("pages", new_md.pages)
assign("_cover_image", new_md._cover_image)
assign("_alternate_images", new_md._alternate_images)
def overlay_credits(self, new_credits: list[Credit]) -> None:
for c in new_credits:
@ -494,5 +498,5 @@ md_test: GenericMetadata = GenericMetadata(
rights=None,
identifier=None,
last_mark=None,
cover_image=None,
_cover_image=None,
)

View File

@ -153,6 +153,48 @@ def parse_date_str(date_str: str | None) -> tuple[int | None, int | None, int |
return day, month, year
def shorten_path(path: pathlib.Path, path2: pathlib.Path | None = None) -> tuple[pathlib.Path, pathlib.Path]:
    """Split *path* into a ``(base, remainder)`` pair for compact display.

    ``base / remainder`` always reconstructs the absolute original path.
    The base is the home directory or the current working directory when the
    path lies under one of them (cwd wins because it is checked last),
    otherwise the filesystem anchor.  When *path2* is given and the remainder
    lies under it, *path2* becomes the base.
    """
    if path2:
        path2 = path2.absolute()
    path = path.absolute()
    shortened_path: pathlib.Path = path
    relative_path = pathlib.Path(path.anchor)

    if path.is_relative_to(path.home()):
        relative_path = path.home()
        shortened_path = path.relative_to(path.home())

    if path.is_relative_to(path.cwd()):
        relative_path = path.cwd()
        shortened_path = path.relative_to(path.cwd())

    # Fix: the original tested against path2.parent but then called
    # relative_to(path2), raising ValueError whenever the path was inside
    # path2.parent but not inside path2 itself.  Testing the same base that
    # is stripped makes the call infallible.
    if path2 and shortened_path.is_relative_to(path2):
        relative_path = path2
        shortened_path = shortened_path.relative_to(path2)

    return relative_path, shortened_path
def path_to_short_str(original_path: pathlib.Path, renamed_path: pathlib.Path | None = None) -> str:
    """Return a compact display string for *original_path*.

    Paths under the cwd are shown as ``./...`` and paths under the home
    directory as ``~/...``.  When *renamed_path* is given, `` -> new`` is
    appended, shortened relative to the original file's directory.

    NOTE(review): ``Path.samefile`` stats the filesystem, so this raises
    OSError if a base path does not exist — confirm callers only pass
    existing paths.
    """
    rel, _original_path = shorten_path(original_path)
    path_str = str(_original_path)
    if rel.samefile(rel.cwd()):
        path_str = f"./{_original_path}"
    elif rel.samefile(rel.home()):
        path_str = f"~/{_original_path}"

    if renamed_path:
        rel, path = shorten_path(renamed_path, original_path.parent)
        rename_str = f" -> {path}"
        # Fix: the ./ and ~/ fallbacks previously interpolated
        # _original_path, displaying the old file instead of the new name.
        if rel.samefile(rel.cwd()):
            rename_str = f" -> ./{path}"
        elif rel.samefile(rel.home()):
            rename_str = f" -> ~/{path}"
        path_str += rename_str
    return path_str
def get_recursive_filelist(pathlist: list[str]) -> list[str]:
"""Get a recursive list of of all files under all path items in the list"""
@ -301,6 +343,14 @@ def unique_file(file_name: pathlib.Path) -> pathlib.Path:
counter += 1
def parse_version(s: str) -> tuple[int, int, int]:
    """Parse a dotted version string into a ``(major, minor, micro)`` tuple.

    Only the first three components are considered; non-numeric components
    become 0, and missing components are padded with 0.
    """
    components: list[int] = []
    for piece in s.split(".")[:3]:
        components.append(int(piece) if piece.isdigit() else 0)
    while len(components) < 3:  # ensure exactly three elements
        components.append(0)
    return components[0], components[1], components[2]
_languages: dict[str | None, str | None] = defaultdict(lambda: None)
_countries: dict[str | None, str | None] = defaultdict(lambda: None)

View File

@ -21,11 +21,11 @@ from typing import Callable
from PyQt5 import QtCore, QtGui, QtWidgets, uic
from comicapi.comicarchive import MetaDataStyle
from comicapi.comicarchive import ComicArchive, MetaDataStyle
from comicapi.genericmetadata import GenericMetadata
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.resulttypes import IssueResult, MultipleMatch
from comictaggerlib.resulttypes import IssueResult, Result
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
@ -37,7 +37,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
def __init__(
self,
parent: QtWidgets.QWidget,
match_set_list: list[MultipleMatch],
match_set_list: list[Result],
style: int,
fetch_func: Callable[[IssueResult], GenericMetadata],
config: ct_ns,
@ -50,7 +50,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.config = config
self.current_match_set: MultipleMatch = match_set_list[0]
self.current_match_set: Result = match_set_list[0]
self.altCoverWidget = CoverImageWidget(
self.altCoverContainer, CoverImageWidget.AltCoverMode, config.Runtime_Options__config.user_cache_dir, talker
@ -103,7 +103,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.twList.resizeColumnsToContents()
self.twList.selectRow(0)
path = self.current_match_set.ca.path
path = self.current_match_set.original_path
self.setWindowTitle(
"Select correct match or skip ({} of {}): {}".format(
self.current_match_set_idx + 1,
@ -120,18 +120,18 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.twList.setSortingEnabled(False)
for row, match in enumerate(self.current_match_set.matches):
for row, match in enumerate(self.current_match_set.online_results):
self.twList.insertRow(row)
item_text = match["series"]
item_text = match.series
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, (match,))
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
if match["publisher"] is not None:
item_text = str(match["publisher"])
if match.publisher is not None:
item_text = str(match.publisher)
else:
item_text = "Unknown"
item = QtWidgets.QTableWidgetItem(item_text)
@ -141,10 +141,10 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
month_str = ""
year_str = "????"
if match["month"] is not None:
month_str = f"-{int(match['month']):02d}"
if match["year"] is not None:
year_str = str(match["year"])
if match.month is not None:
month_str = f"-{int(match.month):02d}"
if match.year is not None:
year_str = str(match.year)
item_text = year_str + month_str
item = QtWidgets.QTableWidgetItem(item_text)
@ -152,7 +152,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, item)
item_text = match["issue_title"]
item_text = match.issue_title
if item_text is None:
item_text = ""
item = QtWidgets.QTableWidgetItem(item_text)
@ -176,17 +176,15 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
if prev is not None and prev.row() == curr.row():
return None
self.altCoverWidget.set_issue_details(
self.current_match()["issue_id"],
[self.current_match()["image_url"], *self.current_match()["alt_image_urls"]],
)
if self.current_match()["description"] is None:
match = self.current_match()
self.altCoverWidget.set_issue_details(match.issue_id, [match.image_url, *match.alt_image_urls])
if match.description is None:
self.teDescription.setText("")
else:
self.teDescription.setText(self.current_match()["description"])
self.teDescription.setText(match.description)
def set_cover_image(self) -> None:
ca = self.current_match_set.ca
ca = ComicArchive(self.current_match_set.original_path)
self.archiveCoverWidget.set_archive(ca)
def current_match(self) -> IssueResult:
@ -229,7 +227,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
def save_match(self) -> None:
match = self.current_match()
ca = self.current_match_set.ca
ca = ComicArchive(self.current_match_set.original_path)
md = ca.read_metadata(self._style)
if md.is_empty:
@ -241,7 +239,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
)
# now get the particular issue data
ct_md = self.fetch_func(match)
self.current_match_set.md = ct_md = self.fetch_func(match)
if ct_md is None:
QtWidgets.QMessageBox.critical(self, "Network Issue", "Could not retrieve issue details!")
return

View File

@ -16,10 +16,15 @@
# limitations under the License.
from __future__ import annotations
import dataclasses
import json
import logging
import os
import pathlib
import sys
from collections.abc import Collection
from datetime import datetime
from typing import Any, TextIO
from comicapi import utils
from comicapi.comicarchive import ComicArchive, MetaDataStyle
@ -30,18 +35,33 @@ from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
from comictaggerlib.graphics import graphics_path
from comictaggerlib.issueidentifier import IssueIdentifier
from comictaggerlib.resulttypes import MultipleMatch, OnlineMatchResults
from comictaggerlib.resulttypes import IssueResult, OnlineMatchResults, Result, Status
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.talker_utils import cleanup_html
logger = logging.getLogger(__name__)
class OutputEncoder(json.JSONEncoder):
    """JSON encoder for CLI result output.

    Serializes ``pathlib.Path`` objects as strings and any non-string
    Collection (set, tuple, etc.) as a list so that dataclass dumps of
    results are JSON-representable.
    """

    def default(self, obj: Any) -> Any:
        # Removed the stray debug print(obj): default() fires for every
        # non-JSON-native value, and this commit's purpose is to route all
        # user output through a logger.
        if isinstance(obj, pathlib.Path):
            return str(obj)
        if not isinstance(obj, str) and isinstance(obj, Collection):
            return list(obj)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)
class CLI:
def __init__(self, config: ct_ns, talkers: dict[str, ComicTalker]) -> None:
self.config = config
self.talkers = talkers
self.batch_mode = False
self.output_file = sys.stdout
if config.Runtime_Options__json:
self.output_file = sys.stderr
def current_talker(self) -> ComicTalker:
if self.config.Sources__source in self.talkers:
@ -49,6 +69,45 @@ class CLI:
logger.error("Could not find the '%s' talker", self.config.Sources__source)
raise SystemExit(2)
def output(self, *args: Any, file: TextIO | None = None, **kwargs: Any) -> None:
    """Emit user-facing output, mirroring it to the logger.

    Everything is logged; the console print is suppressed when verbose
    logging is on (the log handler already shows it) or when quiet mode is
    set.  *file* defaults to self.output_file (stderr in --json mode so
    stdout stays pure JSON).
    """
    if file is None:
        file = self.output_file
    if not args:
        log_args: tuple[Any, ...] = ("",)
    elif isinstance(args[0], str):
        # Strip trailing newlines from the log copy; the logger adds its own.
        log_args = (args[0].strip("\n"), *args[1:])
    else:
        log_args = args
    # Fix: **kwargs are print() options (sep/end/flush); logging.info()
    # rejects unknown keywords, so they must not be forwarded to the log.
    logger.info(*log_args)

    if self.config.Runtime_Options__verbose > 0:
        return
    if not self.config.Runtime_Options__quiet:
        print(*args, **kwargs, file=file)
def run(self) -> None:
    """Process every file named on the command line, then post-process matches.

    In --json mode each per-file Result is dumped to stdout as JSON
    (all other output goes through self.output, which targets stderr).
    """
    if len(self.config.Runtime_Options__files) < 1:
        logger.error("You must specify at least one filename. Use the -h option for more info")
        return

    results: list[Result] = []
    match_results = OnlineMatchResults()
    self.batch_mode = len(self.config.Runtime_Options__files) > 1

    for f in self.config.Runtime_Options__files:
        results.append(self.process_file_cli(f, match_results))
        if self.config.Runtime_Options__json:
            # Fix: a leftover plain print() of the raw dict duplicated this
            # json.dumps() emission and bypassed the OutputEncoder; only the
            # JSON form is printed to stdout.
            print(json.dumps(dataclasses.asdict(results[-1]), cls=OutputEncoder, indent=2))
        sys.stdout.flush()
        sys.stderr.flush()

    self.post_process_matches(match_results)

    if self.config.Runtime_Options__online:
        self.output(
            f"\nFiles tagged with metadata provided by {self.current_talker().name} {self.current_talker().website}",
        )
def actual_issue_data_fetch(self, issue_id: str) -> GenericMetadata:
# now get the particular issue data
try:
@ -70,47 +129,44 @@ class CLI:
logger.error("The tag save seemed to fail for style: %s!", MetaDataStyle.name[metadata_style])
return False
print("Save complete.")
logger.info("Save complete.")
self.output("Save complete.")
else:
if self.config.Runtime_Options__quiet:
logger.info("dry-run option was set, so nothing was written")
print("dry-run option was set, so nothing was written")
self.output("dry-run option was set, so nothing was written")
else:
logger.info("dry-run option was set, so nothing was written, but here is the final set of tags:")
print("dry-run option was set, so nothing was written, but here is the final set of tags:")
print(f"{md}")
self.output("dry-run option was set, so nothing was written, but here is the final set of tags:")
self.output(f"{md}")
return True
def display_match_set_for_choice(self, label: str, match_set: MultipleMatch) -> None:
print(f"{match_set.ca.path} -- {label}:")
def display_match_set_for_choice(self, label: str, match_set: Result) -> None:
self.output(f"{match_set.original_path} -- {label}:")
# sort match list by year
match_set.matches.sort(key=lambda k: k["year"] or 0)
match_set.online_results.sort(key=lambda k: k.year or 0)
for counter, m in enumerate(match_set.matches, 1):
print(
for counter, m in enumerate(match_set.online_results, 1):
self.output(
" {}. {} #{} [{}] ({}/{}) - {}".format(
counter,
m["series"],
m["issue_number"],
m["publisher"],
m["month"],
m["year"],
m["issue_title"],
m.series,
m.issue_number,
m.publisher,
m.month,
m.year,
m.issue_title,
)
)
if self.config.Runtime_Options__interactive:
while True:
i = input("Choose a match #, or 's' to skip: ")
if (i.isdigit() and int(i) in range(1, len(match_set.matches) + 1)) or i == "s":
if (i.isdigit() and int(i) in range(1, len(match_set.online_results) + 1)) or i == "s":
break
if i != "s":
# save the data!
# we know at this point, that the file is all good to go
ca = match_set.ca
ca = ComicArchive(match_set.original_path)
md = self.create_local_metadata(ca)
ct_md = self.actual_issue_data_fetch(match_set.matches[int(i) - 1]["issue_id"])
ct_md = self.actual_issue_data_fetch(match_set.online_results[int(i) - 1].issue_id)
if self.config.Issue_Identifier__clear_metadata_on_import:
md = ct_md
else:
@ -123,69 +179,57 @@ class CLI:
if self.config.Issue_Identifier__auto_imprint:
md.fix_publisher()
match_set.md = md
self.actual_metadata_save(ca, md)
def post_process_matches(self, match_results: OnlineMatchResults) -> None:
def print_header(header: str) -> None:
self.output()
self.output(header)
self.output("------------------")
# now go through the match results
if self.config.Runtime_Options__summary:
if len(match_results.good_matches) > 0:
print("\nSuccessful matches:\n------------------")
print_header("Successful matches:")
for f in match_results.good_matches:
print(f)
self.output(f)
if len(match_results.no_matches) > 0:
print("\nNo matches:\n------------------")
print_header("No matches:")
for f in match_results.no_matches:
print(f)
self.output(f)
if len(match_results.write_failures) > 0:
print("\nFile Write Failures:\n------------------")
print_header("File Write Failures:")
for f in match_results.write_failures:
print(f)
self.output(f)
if len(match_results.fetch_data_failures) > 0:
print("\nNetwork Data Fetch Failures:\n------------------")
print_header("Network Data Fetch Failures:")
for f in match_results.fetch_data_failures:
print(f)
self.output(f)
if not self.config.Runtime_Options__summary and not self.config.Runtime_Options__interactive:
# just quit if we're not interactive or showing the summary
return
if len(match_results.multiple_matches) > 0:
print("\nArchives with multiple high-confidence matches:\n------------------")
self.output("\nArchives with multiple high-confidence matches:\n------------------")
for match_set in match_results.multiple_matches:
self.display_match_set_for_choice("Multiple high-confidence matches", match_set)
if len(match_results.low_confidence_matches) > 0:
print("\nArchives with low-confidence matches:\n------------------")
self.output("\nArchives with low-confidence matches:\n------------------")
for match_set in match_results.low_confidence_matches:
if len(match_set.matches) == 1:
if len(match_set.online_results) == 1:
label = "Single low-confidence match"
else:
label = "Multiple low-confidence matches"
self.display_match_set_for_choice(label, match_set)
def run(self) -> None:
if len(self.config.Runtime_Options__files) < 1:
logger.error("You must specify at least one filename. Use the -h option for more info")
return
match_results = OnlineMatchResults()
self.batch_mode = len(self.config.Runtime_Options__files) > 1
for f in self.config.Runtime_Options__files:
self.process_file_cli(f, match_results)
sys.stdout.flush()
self.post_process_matches(match_results)
if self.config.Runtime_Options__online:
print(
f"\nFiles tagged with metadata provided by {self.current_talker().name} {self.current_talker().website}"
)
def create_local_metadata(self, ca: ComicArchive) -> GenericMetadata:
md = GenericMetadata()
md.set_default_page_list(ca.get_number_of_pages())
@ -216,7 +260,7 @@ class CLI:
return md
def print(self, ca: ComicArchive) -> None:
def print(self, ca: ComicArchive) -> Result:
if not self.config.Runtime_Options__type:
page_count = ca.get_number_of_pages()
@ -245,116 +289,153 @@ class CLI:
brief += "CoMet "
brief += "]"
print(brief)
self.output(brief)
if self.config.Runtime_Options__quiet:
return
return Result(ca.path, None, [], Status.success)
print()
self.output()
raw: str | bytes = ""
md = None
if not self.config.Runtime_Options__type or MetaDataStyle.CIX in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.CIX):
print("--------- ComicRack tags ---------")
self.output("--------- ComicRack tags ---------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_cix()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_cix())
md = ca.read_cix()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if not self.config.Runtime_Options__type or MetaDataStyle.CBI in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.CBI):
print("------- ComicBookLover tags -------")
self.output("------- ComicBookLover tags -------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_cbi()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_cbi())
md = ca.read_cbi()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if not self.config.Runtime_Options__type or MetaDataStyle.COMET in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.COMET):
print("----------- CoMet tags -----------")
self.output("----------- CoMet tags -----------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_comet()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_comet())
md = ca.read_comet()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
def delete(self, ca: ComicArchive) -> None:
for metadata_style in self.config.Runtime_Options__type:
style_name = MetaDataStyle.name[metadata_style]
if ca.has_metadata(metadata_style):
if not self.config.Runtime_Options__dryrun:
if not ca.remove_metadata(metadata_style):
print(f"{ca.path}: Tag removal seemed to fail!")
else:
print(f"{ca.path}: Removed {style_name} tags.")
return Result(ca.path, None, [], Status.success, md=md)
def delete_style(self, ca: ComicArchive, style: int) -> Status:
style_name = MetaDataStyle.name[style]
if ca.has_metadata(style):
if not self.config.Runtime_Options__dryrun:
if ca.remove_metadata(style):
self.output(f"{ca.path}: Removed {style_name} tags.")
return Status.success
else:
print(f"{ca.path}: dry-run. {style_name} tags not removed")
self.output(f"{ca.path}: Tag removal seemed to fail!")
return Status.write_failure
else:
print(f"{ca.path}: This archive doesn't have {style_name} tags to remove.")
self.output(f"{ca.path}: dry-run. {style_name} tags not removed")
return Status.success
self.output(f"{ca.path}: This archive doesn't have {style_name} tags to remove.")
return Status.success
def copy(self, ca: ComicArchive) -> None:
def delete(self, ca: ComicArchive) -> Result:
res = Result(ca.path, None, status=Status.success)
for metadata_style in self.config.Runtime_Options__type:
dst_style_name = MetaDataStyle.name[metadata_style]
if not self.config.Runtime_Options__overwrite and ca.has_metadata(metadata_style):
print(f"{ca.path}: Already has {dst_style_name} tags. Not overwriting.")
return
if self.config.Commands__copy == metadata_style:
print(f"{ca.path}: Destination and source are same: {dst_style_name}. Nothing to do.")
return
src_style_name = MetaDataStyle.name[self.config.Commands__copy]
if ca.has_metadata(self.config.Commands__copy):
if not self.config.Runtime_Options__dryrun:
try:
md = ca.read_metadata(self.config.Commands__copy)
except Exception as e:
md = GenericMetadata()
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if self.config.Comic_Book_Lover__apply_transform_on_bulk_operation == MetaDataStyle.CBI:
md = CBLTransformer(md, self.config).apply()
if not ca.write_metadata(md, metadata_style):
print(f"{ca.path}: Tag copy seemed to fail!")
else:
print(f"{ca.path}: Copied {src_style_name} tags to {dst_style_name}.")
else:
print(f"{ca.path}: dry-run. {src_style_name} tags not copied")
status = self.delete_style(ca, metadata_style)
if status == Status.success:
res.tags_removed.append(metadata_style)
else:
print(f"{ca.path}: This archive doesn't have {src_style_name} tags to copy.")
res.status = status
return res
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> None:
def copy_style(self, ca: ComicArchive, md: GenericMetadata, style: int) -> Status:
    """Write *md* onto *ca* as *style*, honoring overwrite/dry-run settings.

    Returns a Status describing the outcome; Status.read_failure means the
    configured source style had no tags to copy.
    """
    dst_style_name = MetaDataStyle.name[style]
    if not self.config.Runtime_Options__overwrite and ca.has_metadata(style):
        self.output(f"{ca.path}: Already has {dst_style_name} tags. Not overwriting.")
        return Status.existing_tags
    if self.config.Commands__copy == style:
        self.output(f"{ca.path}: Destination and source are same: {dst_style_name}. Nothing to do.")
        return Status.existing_tags

    src_style_name = MetaDataStyle.name[self.config.Commands__copy]
    if not ca.has_metadata(self.config.Commands__copy):
        self.output(f"{ca.path}: This archive doesn't have {src_style_name} tags to copy.")
        return Status.read_failure

    if self.config.Runtime_Options__dryrun:
        self.output(f"{ca.path}: dry-run. {src_style_name} tags not copied")
        return Status.success

    if self.config.Comic_Book_Lover__apply_transform_on_bulk_operation == MetaDataStyle.CBI:
        md = CBLTransformer(md, self.config).apply()
    if ca.write_metadata(md, style):
        self.output(f"{ca.path}: Copied {src_style_name} tags to {dst_style_name}.")
        return Status.success
    self.output(f"{ca.path}: Tag copy seemed to fail!")
    return Status.write_failure
def copy(self, ca: ComicArchive) -> Result:
    """Copy tags from the configured source style to each requested style.

    Reads the source metadata once, then delegates each destination style to
    copy_style(), recording successes in res.tags_saved and the last
    non-success status in res.status.
    """
    res = Result(ca.path, None)
    try:
        res.md = ca.read_metadata(self.config.Commands__copy)
    except Exception as e:
        # NOTE(review): on read failure res is returned with its default
        # status unchanged — presumably the Result default; confirm callers
        # treat this as a failure.
        logger.error("Failed to load metadata for %s: %s", ca.path, e)
        return res
    for metadata_style in self.config.Runtime_Options__type:
        status = self.copy_style(ca, res.md, metadata_style)
        if status == Status.success:
            res.tags_saved.append(metadata_style)
        else:
            res.status = status
    return res
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> Result:
if not self.config.Runtime_Options__overwrite:
for metadata_style in self.config.Runtime_Options__type:
if ca.has_metadata(metadata_style):
print(f"{ca.path}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
return
self.output(f"{ca.path}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
return Result(
original_path=ca.path,
renamed_path=None,
online_results=[],
status=Status.existing_tags,
tags_saved=self.config.Runtime_Options__type,
)
if self.batch_mode:
print(f"Processing {ca.path}...")
self.output(f"Processing {utils.path_to_short_str(ca.path)}...")
md = self.create_local_metadata(ca)
if md.issue is None or md.issue == "":
if self.config.Auto_Tag__assume_1_if_no_issue_num:
md.issue = "1"
matches: list[IssueResult] = []
# now, search online
if self.config.Runtime_Options__online:
if self.config.Runtime_Options__issue_id is not None:
@ -363,32 +444,53 @@ class CLI:
ct_md = self.current_talker().fetch_comic_data(self.config.Runtime_Options__issue_id)
except TalkerError as e:
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
match_results.fetch_data_failures.append(str(ca.path.absolute()))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=[],
status=Status.fetch_data_failure,
tags_saved=self.config.Runtime_Options__type,
)
match_results.fetch_data_failures.append(res)
return res
if ct_md is None:
logger.error("No match for ID %s was found.", self.config.Runtime_Options__issue_id)
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=[],
status=Status.no_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
if self.config.Comic_Book_Lover__apply_transform_on_import:
ct_md = CBLTransformer(ct_md, self.config).apply()
else:
if md is None or md.is_empty:
logger.error("No metadata given to search online with!")
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=[],
status=Status.no_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
ii = IssueIdentifier(ca, self.config, self.current_talker())
def myoutput(text: str) -> None:
if self.config.Runtime_Options__verbose:
IssueIdentifier.default_write_output(text)
self.output(text)
# use our overlaid MD struct to search
ii.set_additional_metadata(md)
ii.only_use_additional_meta_data = True
ii.set_output_function(myoutput)
# ii.set_output_function(myoutput)
ii.cover_page_index = md.get_cover_page_index_list()[0]
matches = ii.search()
@ -416,35 +518,70 @@ class CLI:
if choices:
if low_confidence:
logger.error("Online search: Multiple low confidence matches. Save aborted")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.low_confidence_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.low_confidence_matches.append(res)
return res
logger.error("Online search: Multiple good matches. Save aborted")
match_results.multiple_matches.append(MultipleMatch(ca, matches))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.multiple_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.multiple_matches.append(res)
return res
if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
logger.error("Online search: Low confidence match. Save aborted")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.low_confidence_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.low_confidence_matches.append(res)
return res
if not found_match:
logger.error("Online search: No match found. Save aborted")
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.no_match,
tags_saved=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
# we got here, so we have a single match
# now get the particular issue data
ct_md = self.actual_issue_data_fetch(matches[0]["issue_id"])
ct_md = self.actual_issue_data_fetch(matches[0].issue_id)
if ct_md.is_empty:
match_results.fetch_data_failures.append(str(ca.path.absolute()))
return
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.fetch_data_failure,
tags_saved=self.config.Runtime_Options__type,
)
match_results.fetch_data_failures.append(res)
return res
if self.config.Issue_Identifier__clear_metadata_on_import:
md = GenericMetadata()
notes = (
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
+ f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
)
md.overlay(
ct_md.replace(
@ -456,13 +593,23 @@ class CLI:
if self.config.Issue_Identifier__auto_imprint:
md.fix_publisher()
res = Result(
original_path=ca.path,
renamed_path=None,
online_results=matches,
status=Status.success,
md=md,
tags_saved=self.config.Runtime_Options__type,
)
# ok, done building our metadata. time to save
if not self.actual_metadata_save(ca, md):
match_results.write_failures.append(str(ca.path.absolute()))
if self.actual_metadata_save(ca, md):
match_results.good_matches.append(res)
else:
match_results.good_matches.append(str(ca.path.absolute()))
res.status = Status.write_failure
match_results.write_failures.append(res)
return res
def rename(self, ca: ComicArchive) -> None:
def rename(self, ca: ComicArchive) -> Result:
original_path = ca.path
msg_hdr = ""
if self.batch_mode:
@ -472,7 +619,7 @@ class CLI:
if md.series is None:
logger.error(msg_hdr + "Can't rename without series name")
return
return Result(original_path, None, [], Status.no_match)
new_ext = "" # default
if self.config.File_Rename__set_extension_based_on_archive:
@ -492,26 +639,27 @@ class CLI:
new_name = renamer.determine_name(ext=new_ext)
except ValueError:
logger.exception(
msg_hdr + "Invalid format string!\n"
"Your rename template is invalid!\n\n"
"%s\n\n"
"Please consult the template help in the settings "
"and the documentation on the format at "
"https://docs.python.org/3/library/string.html#format-string-syntax",
msg_hdr
+ "Invalid format string!\n"
+ "Your rename template is invalid!\n\n"
+ "%s\n\n"
+ "Please consult the template help in the settings "
+ "and the documentation on the format at "
+ "https://docs.python.org/3/library/string.html#format-string-syntax",
self.config.File_Rename__template,
)
return
return Result(original_path, None, [], Status.rename_failure, md=md)
except Exception:
logger.exception("Formatter failure: %s metadata: %s", self.config.File_Rename__template, renamer.metadata)
return
return Result(original_path, None, [], Status.rename_failure, md=md)
folder = get_rename_dir(ca, self.config.File_Rename__dir if self.config.File_Rename__move_to_dir else None)
full_path = folder / new_name
if full_path == ca.path:
print(msg_hdr + "Filename is already good!", file=sys.stderr)
return
self.output(msg_hdr + "Filename is already good!")
return Result(original_path, full_path, [], Status.existing_tags, md=md)
suffix = ""
if not self.config.Runtime_Options__dryrun:
@ -520,10 +668,12 @@ class CLI:
ca.rename(utils.unique_file(full_path))
except OSError:
logger.exception("Failed to rename comic archive: %s", ca.path)
return Result(original_path, full_path, [], Status.write_failure, md=md)
else:
suffix = " (dry-run, no change)"
print(f"renamed '{original_path.name}' -> '{new_name}' {suffix}")
self.output(f"renamed '{original_path.name}' -> '{new_name}' {suffix}")
return Result(original_path, None, [], Status.success, md=md)
def export(self, ca: ComicArchive) -> None:
msg_hdr = ""
@ -538,7 +688,7 @@ class CLI:
new_file = filename_path.with_suffix(".cbz")
if self.config.Runtime_Options__abort_on_conflict and new_file.exists():
print(msg_hdr + f"{new_file.name} already exists in the that folder.")
self.output(msg_hdr + f"{new_file.name} already exists in the that folder.")
return
new_file = utils.unique_file(new_file)
@ -562,7 +712,7 @@ class CLI:
msg = msg_hdr + f"Dry-run: Would try to create {os.path.split(new_file)[1]}"
if self.config.Runtime_Options__delete_after_zip_export:
msg += " and delete original."
print(msg)
self.output(msg)
return
msg = msg_hdr
@ -573,18 +723,18 @@ class CLI:
else:
msg += "Archive failed to export!"
print(msg)
self.output(msg)
def process_file_cli(self, filename: str, match_results: OnlineMatchResults) -> None:
def process_file_cli(self, filename: str, match_results: OnlineMatchResults) -> Result:
if not os.path.lexists(filename):
logger.error("Cannot find %s", filename)
return
return Result(pathlib.Path(filename), None, [], Status.read_failure)
ca = ComicArchive(filename, str(graphics_path / "nocover.png"))
if not ca.seems_to_be_a_comic_archive():
logger.error("Sorry, but %s is not a comic archive!", filename)
return
return Result(ca.path, None, [], Status.read_failure)
if not ca.is_writable() and (
self.config.Commands__delete
@ -593,22 +743,22 @@ class CLI:
or self.config.Commands__rename
):
logger.error("This archive is not writable")
return
return Result(ca.path, None, [], Status.write_permission_failure)
if self.config.Commands__print:
self.print(ca)
return self.print(ca)
elif self.config.Commands__delete:
self.delete(ca)
return self.delete(ca)
elif self.config.Commands__copy is not None:
self.copy(ca)
return self.copy(ca)
elif self.config.Commands__save:
self.save(ca, match_results)
return self.save(ca, match_results)
elif self.config.Commands__rename:
self.rename(ca)
return self.rename(ca)
elif self.config.Commands__export_to_zip:
self.export(ca)
return self.export(ca)

View File

@ -159,6 +159,7 @@ def register_runtime(parser: settngs.Manager) -> None:
parser.add_setting("--darkmode", action="store_true", help="Windows only. Force a dark pallet", file=False)
parser.add_setting("-g", "--glob", action="store_true", help="Windows only. Enable globbing", file=False)
parser.add_setting("--quiet", "-q", action="store_true", help="Don't say much (for print mode).", file=False)
parser.add_setting("--json", "-j", action="store_true", help="Output json on stdout", file=False)
parser.add_setting(
"-t",

View File

@ -50,7 +50,7 @@ def identifier(parser: settngs.Manager) -> None:
parser.add_setting("--series-match-search-thresh", default=90, type=int)
parser.add_setting(
"--clear-metadata",
default=True,
default=False,
help="Clears all existing metadata during import, default is to merges metadata.\nMay be used in conjunction with -o, -f and -m.\n\n",
dest="clear_metadata_on_import",
action=argparse.BooleanOptionalAction,
@ -78,12 +78,6 @@ def identifier(parser: settngs.Manager) -> None:
action=argparse.BooleanOptionalAction,
help="Enables the publisher filter",
)
parser.add_setting(
"--clear-form-before-populating",
default=False,
action=argparse.BooleanOptionalAction,
help="Clears all existing metadata when applying metadata from comic source",
)
def dialog(parser: settngs.Manager) -> None:

View File

@ -36,6 +36,7 @@ class settngs_namespace(settngs.TypedNS):
Runtime_Options__darkmode: bool
Runtime_Options__glob: bool
Runtime_Options__quiet: bool
Runtime_Options__json: bool
Runtime_Options__type: list[int]
Runtime_Options__overwrite: bool
Runtime_Options__no_gui: bool
@ -63,7 +64,6 @@ class settngs_namespace(settngs.TypedNS):
Issue_Identifier__sort_series_by_year: bool
Issue_Identifier__exact_series_matches_first: bool
Issue_Identifier__always_use_publisher_filter: bool
Issue_Identifier__clear_form_before_populating: bool
Filename_Parsing__complicated_parser: bool
Filename_Parsing__remove_c2c: bool

View File

@ -17,7 +17,7 @@ from __future__ import annotations
import io
import logging
import sys
from operator import attrgetter
from typing import Any, Callable
from typing_extensions import NotRequired, TypedDict
@ -102,8 +102,8 @@ class IssueIdentifier:
self.publisher_filter = [s.strip().casefold() for s in config.Issue_Identifier__publisher_filter]
self.additional_metadata = GenericMetadata()
self.output_function: Callable[[str], None] = IssueIdentifier.default_write_output
self.callback: Callable[[int, int], None] | None = None
self.output_function: Callable[[str], None] = lambda x: print(x)
self.progress_callback: Callable[[int, int], None] | None = None
self.cover_url_callback: Callable[[bytes], None] | None = None
self.search_result = self.result_no_matches
self.cover_page_index = 0
@ -208,7 +208,7 @@ class IssueIdentifier:
return None
def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
self.callback = cb_func
self.progress_callback = cb_func
def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
self.cover_url_callback = cb_func
@ -264,16 +264,32 @@ class IssueIdentifier:
return search_keys
@staticmethod
def default_write_output(text: str) -> None:
sys.stdout.write(text)
sys.stdout.flush()
def log_msg(self, msg: Any, newline: bool = True) -> None:
def log_msg(self, msg: Any) -> None:
msg = str(msg)
if newline:
msg += "\n"
self.output_function(msg)
for handler in logging.getLogger().handlers:
handler.flush()
self.output(msg)
def output(self, *args: Any, file: Any = None, **kwargs: Any) -> None:
# We intercept the file argument otherwise everything is passed to self.output_function
# Ensure args[0] is defined and is a string for logger.info
if not args:
log_args: tuple[Any, ...] = ("",)
elif isinstance(args[0], str):
log_args = (args[0].strip("\n"), *args[1:])
else:
log_args = args
log_msg = " ".join([str(x) for x in log_args])
# Always send to logger so that we have a record for troubleshooting
logger.info(log_msg, **kwargs)
# If we are verbose or quiet we don't need to call the output function
if self.config.Runtime_Options__verbose > 0 or self.config.Runtime_Options__quiet:
return
self.output_function(*args, **kwargs)
def get_issue_cover_match_score(
self,
@ -281,7 +297,6 @@ class IssueIdentifier:
alt_urls: list[str],
local_cover_hash_list: list[int],
use_remote_alternates: bool = False,
use_log: bool = True,
) -> Score:
# local_cover_hash_list is a list of pre-calculated hashes.
# use_remote_alternates - indicates to use alternate covers from CV
@ -332,10 +347,8 @@ class IssueIdentifier:
if self.cancel:
raise IssueIdentifierCancelled
if use_log and use_remote_alternates:
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]", False)
if use_log:
self.log_msg("[ ", False)
if use_remote_alternates:
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]")
score_list = []
done = False
@ -343,8 +356,8 @@ class IssueIdentifier:
for remote_cover_item in remote_cover_list:
score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["hash"])
score_list.append(Score(score=score, url=remote_cover_item["url"], hash=remote_cover_item["hash"]))
if use_log:
self.log_msg(score, False)
self.log_msg(f" - {score:03}")
if score <= self.strong_score_thresh:
# such a good score, we can quit now, since for sure we have a winner
@ -353,9 +366,6 @@ class IssueIdentifier:
if done:
break
if use_log:
self.log_msg(" ]", False)
best_score_item = min(score_list, key=lambda x: x["score"])
return best_score_item
@ -446,8 +456,8 @@ class IssueIdentifier:
self.log_msg("Searching in " + str(len(series_second_round_list)) + " series")
if self.callback is not None:
self.callback(0, len(series_second_round_list))
if self.progress_callback is not None:
self.progress_callback(0, len(series_second_round_list))
# now sort the list by name length
series_second_round_list.sort(key=lambda x: len(x.name), reverse=False)
@ -485,13 +495,12 @@ class IssueIdentifier:
# Do first round of cover matching
counter = len(shortlist)
for series, issue in shortlist:
if self.callback is not None:
self.callback(counter, len(shortlist) * 3)
if self.progress_callback is not None:
self.progress_callback(counter, len(shortlist) * 3)
counter += 1
self.log_msg(
f"Examining covers for ID: {series.id} {series.name} ({series.start_year}) ...",
newline=False,
f"Examining covers for ID: {series.id} {series.name} ({series.start_year}):",
)
# Now check the cover match against the primary image
@ -505,8 +514,8 @@ class IssueIdentifier:
logger.info("Adding cropped cover to the hashlist")
try:
image_url = issue.cover_image or ""
alt_urls = issue.alternate_images
image_url = issue._cover_image or ""
alt_urls = issue._alternate_images
score_item = self.get_issue_cover_match_score(
image_url, alt_urls, hash_list, use_remote_alternates=False
@ -516,28 +525,28 @@ class IssueIdentifier:
self.match_list = []
return self.match_list
match: IssueResult = {
"series": f"{series.name} ({series.start_year})",
"distance": score_item["score"],
"issue_number": keys["issue_number"],
"cv_issue_count": series.count_of_issues,
"url_image_hash": score_item["hash"],
"issue_title": issue.title or "",
"issue_id": issue.issue_id or "",
"series_id": series.id,
"month": issue.month,
"year": issue.year,
"publisher": None,
"image_url": image_url,
"alt_image_urls": alt_urls,
"description": issue.description or "",
}
match = IssueResult(
series=f"{series.name} ({series.start_year})",
distance=score_item["score"],
issue_number=keys["issue_number"],
cv_issue_count=series.count_of_issues,
url_image_hash=score_item["hash"],
issue_title=issue.title or "",
issue_id=issue.issue_id or "",
series_id=series.id,
month=issue.month,
year=issue.year,
publisher=None,
image_url=image_url,
alt_image_urls=alt_urls,
description=issue.description or "",
)
if series.publisher is not None:
match["publisher"] = series.publisher
match.publisher = series.publisher
self.match_list.append(match)
self.log_msg(f" --> {match['distance']}", newline=False)
self.log_msg(f"best score {match.distance:03}")
self.log_msg("")
@ -547,28 +556,27 @@ class IssueIdentifier:
return self.match_list
# sort list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
self.match_list.sort(key=attrgetter("distance"))
lst = []
for i in self.match_list:
lst.append(i["distance"])
lst.append(i.distance)
self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s):", newline=False)
self.log_msg(str(lst))
self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s): {lst}")
def print_match(item: IssueResult) -> None:
self.log_msg(
"-----> {} #{} {} ({}/{}) -- score: {}".format(
item["series"],
item["issue_number"],
item["issue_title"],
item["month"],
item["year"],
item["distance"],
item.series,
item.issue_number,
item.issue_title,
item.month,
item.year,
item.distance,
)
)
best_score: int = self.match_list[0]["distance"]
best_score: int = self.match_list[0].distance
if best_score >= self.min_score_thresh:
# we have 1 or more low-confidence matches (all bad cover scores)
@ -585,14 +593,14 @@ class IssueIdentifier:
second_match_list = []
counter = 2 * len(self.match_list)
for m in self.match_list:
if self.callback is not None:
self.callback(counter, len(self.match_list) * 3)
if self.progress_callback is not None:
self.progress_callback(counter, len(self.match_list) * 3)
counter += 1
self.log_msg(f"Examining alternate covers for ID: {m['series_id']} {m['series']} ...", newline=False)
self.log_msg(f"Examining alternate covers for ID: {m.series_id} {m.series}:")
try:
score_item = self.get_issue_cover_match_score(
m["image_url"],
m["alt_image_urls"],
m.image_url,
m.alt_image_urls,
hash_list,
use_remote_alternates=True,
)
@ -605,7 +613,7 @@ class IssueIdentifier:
if score_item["score"] < self.min_alternate_score_thresh:
second_match_list.append(m)
m["distance"] = score_item["score"]
m.distance = score_item["score"]
if len(second_match_list) == 0:
if len(self.match_list) == 1:
@ -626,17 +634,17 @@ class IssueIdentifier:
self.match_list = second_match_list
# sort new list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
best_score = self.match_list[0]["distance"]
self.match_list.sort(key=attrgetter("distance"))
best_score = self.match_list[0].distance
self.log_msg("[Second round cover matching: best score = {best_score}]")
# now drop down into the rest of the processing
if self.callback is not None:
self.callback(99, 100)
if self.progress_callback is not None:
self.progress_callback(99, 100)
# now pare down list, remove any item more than specified distant from the top scores
for match_item in reversed(self.match_list):
if match_item["distance"] > best_score + self.min_score_distance:
if match_item.distance > best_score + self.min_score_distance:
self.match_list.remove(match_item)
# One more test for the case choosing limited series first issue vs a trade with the same cover:
@ -644,11 +652,11 @@ class IssueIdentifier:
if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
new_list = []
for match in self.match_list:
if match["cv_issue_count"] != 1:
if match.cv_issue_count != 1:
new_list.append(match)
else:
self.log_msg(
f"Removing series {match['series']} [{match['series_id']}] from consideration (only 1 issue)"
f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
)
if len(new_list) > 0:

View File

@ -222,7 +222,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
# list selection was changed, update the issue cover
issue = self.issue_list[self.issue_id]
if not (issue.issue and issue.year and issue.month and issue.cover_image and issue.title):
if not (issue.issue and issue.year and issue.month and issue._cover_image and issue.title):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
issue = self.talker.fetch_comic_data(issue_id=self.issue_id)
@ -231,7 +231,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
QtWidgets.QApplication.restoreOverrideCursor()
self.issue_number = issue.issue or ""
self.coverWidget.set_issue_details(self.issue_id, [issue.cover_image or "", *issue.alternate_images])
self.coverWidget.set_issue_details(self.issue_id, [issue._cover_image or "", *issue._alternate_images])
if issue.description is None:
self.set_description(self.teDescription, "")
else:

View File

@ -124,26 +124,67 @@ class App:
def list_plugins(
self, talkers: list[comictalker.ComicTalker], archivers: list[type[comicapi.comicarchive.Archiver]]
) -> None:
print("Metadata Sources: (ID: Name URL)") # noqa: T201
for talker in talkers:
print(f"{talker.id}: {talker.name} {talker.default_api_url}") # noqa: T201
if self.config[0].Runtime_Options__json:
for talker in talkers:
print( # noqa: T201
json.dumps(
{
"type": "talker",
"id": talker.id,
"name": talker.name,
"website": talker.website,
}
)
)
print("\nComic Archive: (Name: extension, exe)") # noqa: T201
for archiver in archivers:
a = archiver()
print(f"{a.name()}: {a.extension()}, {a.exe}") # noqa: T201
for archiver in archivers:
try:
a = archiver()
print( # noqa: T201
json.dumps(
{
"type": "archiver",
"enabled": a.enabled,
"name": a.name(),
"extension": a.extension(),
"exe": a.exe,
}
)
)
except Exception:
print( # noqa: T201
json.dumps(
{
"type": "archiver",
"enabled": archiver.enabled,
"name": "",
"extension": "",
"exe": archiver.exe,
}
)
)
else:
print("Metadata Sources: (ID: Name URL)") # noqa: T201
for talker in talkers:
print(f"{talker.id}: {talker.name} {talker.website}") # noqa: T201
print("\nComic Archive: (Name: extension, exe)") # noqa: T201
for archiver in archivers:
a = archiver()
print(f"{a.name()}: {a.extension()}, {a.exe}") # noqa: T201
def initialize(self) -> argparse.Namespace:
conf, _ = self.initial_arg_parser.parse_known_args()
conf, _ = self.initial_arg_parser.parse_known_intermixed_args()
assert conf is not None
setup_logging(conf.verbose, conf.config.user_log_dir)
return conf
def register_settings(self) -> None:
self.manager = settngs.Manager(
"A utility for reading and writing metadata to comic archives.\n\n\n"
+ "If no options are given, %(prog)s will run in windowed mode.",
"For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
description="A utility for reading and writing metadata to comic archives.\n\n\n"
+ "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'",
epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
)
ctsettings.register_commandline_settings(self.manager)
ctsettings.register_file_settings(self.manager)
@ -233,6 +274,7 @@ class App:
if error and error[1]:
print(f"A fatal error occurred please check the log for more information: {error[0]}") # noqa: T201
raise SystemExit(1)
try:
cli.CLI(self.config[0], talkers).run()
except Exception:

View File

@ -93,15 +93,15 @@ class MatchSelectionWindow(QtWidgets.QDialog):
for row, match in enumerate(self.matches):
self.twList.insertRow(row)
item_text = match["series"]
item_text = match.series
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, (match,))
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
if match["publisher"] is not None:
item_text = str(match["publisher"])
if match.publisher is not None:
item_text = str(match.publisher)
else:
item_text = "Unknown"
item = QtWidgets.QTableWidgetItem(item_text)
@ -111,10 +111,10 @@ class MatchSelectionWindow(QtWidgets.QDialog):
month_str = ""
year_str = "????"
if match["month"] is not None:
month_str = f"-{int(match['month']):02d}"
if match["year"] is not None:
year_str = str(match["year"])
if match.month is not None:
month_str = f"-{int(match.month):02d}"
if match.year is not None:
year_str = str(match.year)
item_text = year_str + month_str
item = QtWidgets.QTableWidgetItem(item_text)
@ -122,7 +122,7 @@ class MatchSelectionWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, item)
item_text = match["issue_title"]
item_text = match.issue_title
if item_text is None:
item_text = ""
item = QtWidgets.QTableWidgetItem(item_text)
@ -146,14 +146,15 @@ class MatchSelectionWindow(QtWidgets.QDialog):
if prev is not None and prev.row() == curr.row():
return
match = self.current_match()
self.altCoverWidget.set_issue_details(
self.current_match()["issue_id"],
[self.current_match()["image_url"], *self.current_match()["alt_image_urls"]],
match.issue_id,
[match.image_url, *match.alt_image_urls],
)
if self.current_match()["description"] is None:
if match.description is None:
self.teDescription.setText("")
else:
self.teDescription.setText(self.current_match()["description"])
self.teDescription.setText(match.description)
def set_cover_image(self) -> None:
self.archiveCoverWidget.set_archive(self.comic_archive)

View File

@ -1,11 +1,55 @@
from __future__ import annotations
from typing_extensions import TypedDict
import dataclasses
import pathlib
import sys
from enum import Enum, auto
from typing import Any
from comicapi.comicarchive import ComicArchive
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
if sys.version_info < (3, 11):
class StrEnum(str, Enum):
"""
Enum where members are also (and must be) strings
"""
def __new__(cls, *values: Any) -> Any:
"values must already be of type `str`"
if len(values) > 3:
raise TypeError(f"too many arguments for str(): {values!r}")
if len(values) == 1:
# it must be a string
if not isinstance(values[0], str):
raise TypeError(f"{values[0]!r} is not a string")
if len(values) >= 2:
# check that encoding argument is a string
if not isinstance(values[1], str):
raise TypeError(f"encoding must be a string, not {values[1]!r}")
if len(values) == 3:
# check that errors argument is a string
if not isinstance(values[2], str):
raise TypeError("errors must be a string, not %r" % (values[2]))
value = str(*values)
member = str.__new__(cls, value)
member._value_ = value
return member
@staticmethod
def _generate_next_value_(name: str, start: int, count: int, last_values: Any) -> str:
"""
Return the lower-cased version of the member name.
"""
return name.lower()
else:
from enum import StrEnum
class IssueResult(TypedDict):
@dataclasses.dataclass
class IssueResult:
series: str
distance: int
issue_number: str
@ -21,18 +65,52 @@ class IssueResult(TypedDict):
alt_image_urls: list[str]
description: str
def __str__(self) -> str:
return f"series: {self.series}; series id: {self.series_id}; issue number: {self.issue_number}; issue id: {self.issue_id}; published: {self.month} {self.year}"
class MatchStatus(StrEnum):
good_match = auto()
no_match = auto()
multiple_match = auto()
low_confidence_match = auto()
class Status(StrEnum):
write_failure = auto()
fetch_data_failure = auto()
existing_tags = auto()
read_failure = auto()
write_permission_failure = auto()
@dataclasses.dataclass
class OnlineMatchResults:
def __init__(self) -> None:
self.good_matches: list[str] = []
self.no_matches: list[str] = []
self.multiple_matches: list[MultipleMatch] = []
self.low_confidence_matches: list[MultipleMatch] = []
self.write_failures: list[str] = []
self.fetch_data_failures: list[str] = []
good_matches: list[Result] = dataclasses.field(default_factory=list)
no_matches: list[Result] = dataclasses.field(default_factory=list)
multiple_matches: list[Result] = dataclasses.field(default_factory=list)
low_confidence_matches: list[Result] = dataclasses.field(default_factory=list)
write_failures: list[Result] = dataclasses.field(default_factory=list)
fetch_data_failures: list[Result] = dataclasses.field(default_factory=list)
class MultipleMatch:
def __init__(self, ca: ComicArchive, match_list: list[IssueResult]) -> None:
self.ca: ComicArchive = ca
self.matches: list[IssueResult] = match_list
@dataclasses.dataclass
class Result:
original_path: pathlib.Path
renamed_path: pathlib.Path | None
online_results: list[IssueResult] = dataclasses.field(default_factory=list)
match_status: MatchStatus | None = None
status: Status | None = None
md: GenericMetadata | None = None
tags_removed: list[int] = dataclasses.field(default_factory=list)
tags_saved: list[int] = dataclasses.field(default_factory=list)
def __str__(self) -> str:
if len(self.online_results) == 0:
matches = None
elif len(self.online_results) == 1:
matches = str(self.online_results[0])
else:
matches = "\n" + "".join([f" - {x}" for x in self.online_results])
path_str = utils.path_to_short_str(self.original_path, self.renamed_path)
return f"{path_str}: {matches}"

View File

@ -325,8 +325,8 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
if found_match is not None:
self.iddialog.accept()
self.series_id = utils.xlate(found_match["series_id"]) or ""
self.issue_number = found_match["issue_number"]
self.series_id = utils.xlate(found_match.series_id) or ""
self.issue_number = found_match.issue_number
self.select_by_id()
self.show_issues()

View File

@ -389,7 +389,6 @@ class SettingsWindow(QtWidgets.QDialog):
self.switch_parser()
self.cbxClearFormBeforePopulating.setChecked(self.config[0].Issue_Identifier__clear_form_before_populating)
self.cbxUseFilter.setChecked(self.config[0].Issue_Identifier__always_use_publisher_filter)
self.cbxSortByYear.setChecked(self.config[0].Issue_Identifier__sort_series_by_year)
self.cbxExactMatches.setChecked(self.config[0].Issue_Identifier__exact_series_matches_first)
@ -507,7 +506,6 @@ class SettingsWindow(QtWidgets.QDialog):
self.cbxProtofoliusIssueNumberScheme.isChecked()
)
self.config[0].Issue_Identifier__clear_form_before_populating = self.cbxClearFormBeforePopulating.isChecked()
self.config[0].Issue_Identifier__always_use_publisher_filter = self.cbxUseFilter.isChecked()
self.config[0].Issue_Identifier__sort_series_by_year = self.cbxSortByYear.isChecked()
self.config[0].Issue_Identifier__exact_series_matches_first = self.cbxExactMatches.isChecked()

View File

@ -57,7 +57,7 @@ from comictaggerlib.optionalmsgdialog import OptionalMessageDialog
from comictaggerlib.pagebrowser import PageBrowserWindow
from comictaggerlib.pagelisteditor import PageListEditor
from comictaggerlib.renamewindow import RenameWindow
from comictaggerlib.resulttypes import IssueResult, MultipleMatch, OnlineMatchResults
from comictaggerlib.resulttypes import IssueResult, OnlineMatchResults, Result
from comictaggerlib.seriesselectionwindow import SeriesSelectionWindow
from comictaggerlib.settingswindow import SettingsWindow
from comictaggerlib.ui import ui_path
@ -292,6 +292,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
current_logs = ""
root_logger = logging.getLogger()
qapplogwindow = ApplicationLogWindow(
self.config[0].Runtime_Options__config.user_log_dir,
QTextEditLogger(logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s"), logging.DEBUG),
parent=self,
)
@ -1083,7 +1084,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
if self.config[0].Comic_Book_Lover__apply_transform_on_import:
new_metadata = CBLTransformer(new_metadata, self.config[0]).apply()
if self.config[0].Issue_Identifier__clear_form_before_populating:
if self.config[0].Issue_Identifier__clear_metadata_on_import:
self.clear_form()
notes = (
@ -1684,7 +1685,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
ct_md = self.current_talker().fetch_comic_data(match["issue_id"])
ct_md = self.current_talker().fetch_comic_data(match.issue_id)
except TalkerError:
logger.exception("Save aborted.")
@ -1697,7 +1698,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
return ct_md
def auto_tag_log(self, text: str) -> None:
IssueIdentifier.default_write_output(text)
print(text)
if self.atprogdialog is not None:
self.atprogdialog.textEdit.append(text.rstrip())
self.atprogdialog.textEdit.ensureCursorVisible()
@ -1778,16 +1779,16 @@ class TaggerWindow(QtWidgets.QMainWindow):
if choices:
if low_confidence:
self.auto_tag_log("Online search: Multiple low-confidence matches. Save aborted\n")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
match_results.low_confidence_matches.append(Result(ca.path, None, matches))
else:
self.auto_tag_log("Online search: Multiple matches. Save aborted\n")
match_results.multiple_matches.append(MultipleMatch(ca, matches))
match_results.multiple_matches.append(Result(ca.path, None, matches))
elif low_confidence and not dlg.auto_save_on_low:
self.auto_tag_log("Online search: Low confidence match. Save aborted\n")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
match_results.low_confidence_matches.append(Result(ca.path, None, matches))
elif not found_match:
self.auto_tag_log("Online search: No match found. Save aborted\n")
match_results.no_matches.append(str(ca.path.absolute()))
match_results.no_matches.append(Result(ca.path, None, matches))
else:
# a single match!
if low_confidence:
@ -1796,7 +1797,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
# now get the particular issue data
ct_md = self.actual_issue_data_fetch(matches[0])
if ct_md is None:
match_results.fetch_data_failures.append(str(ca.path.absolute()))
match_results.fetch_data_failures.append(Result(ca.path, None, matches))
if ct_md is not None:
if dlg.cbxRemoveMetadata.isChecked():
@ -1812,10 +1813,10 @@ class TaggerWindow(QtWidgets.QMainWindow):
md.fix_publisher()
if not ca.write_metadata(md, self.save_data_style):
match_results.write_failures.append(str(ca.path.absolute()))
match_results.write_failures.append(Result(ca.path, None, matches))
self.auto_tag_log("Save failed ;-(\n")
else:
match_results.good_matches.append(str(ca.path.absolute()))
match_results.good_matches.append(Result(ca.path, None, matches))
success = True
self.auto_tag_log("Save complete!\n")
ca.load_cache([MetaDataStyle.CBI, MetaDataStyle.CIX])

View File

@ -642,12 +642,12 @@ class ComicVineTalker(ComicTalker):
series_aliases=series.aliases,
)
if issue.get("image") is None:
md.cover_image = ""
md._cover_image = ""
else:
md.cover_image = issue.get("image", {}).get("super_url", "")
md._cover_image = issue.get("image", {}).get("super_url", "")
for alt in issue.get("associated_images", []):
md.alternate_images.append(alt["original_url"])
md._alternate_images.append(alt["original_url"])
for character in issue.get("character_credits", set()):
md.characters.add(character["name"])

View File

@ -181,7 +181,7 @@ comic_issue_result = comicapi.genericmetadata.GenericMetadata(
issue_id=str(cv_issue_result["results"]["id"]),
series=cv_issue_result["results"]["volume"]["name"],
series_id=str(cv_issue_result["results"]["volume"]["id"]),
cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=cv_issue_result["results"]["image"]["super_url"],
issue=cv_issue_result["results"]["issue_number"],
volume=None,
title=cv_issue_result["results"]["name"],
@ -236,7 +236,7 @@ cv_md = comicapi.genericmetadata.GenericMetadata(
rights=None,
identifier=None,
last_mark=None,
cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=cv_issue_result["results"]["image"]["super_url"],
)

View File

@ -8,6 +8,7 @@ from PIL import Image
import comictaggerlib.issueidentifier
import testing.comicdata
import testing.comicvine
from comictaggerlib.resulttypes import IssueResult
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
@ -51,23 +52,24 @@ def test_search(cbz, config, comicvine_api):
config, definitions = config
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
results = ii.search()
cv_expected = {
"series": f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
"distance": 0,
"issue_number": testing.comicvine.cv_issue_result["results"]["issue_number"],
"alt_image_urls": [],
"cv_issue_count": testing.comicvine.cv_volume_result["results"]["count_of_issues"],
"issue_title": testing.comicvine.cv_issue_result["results"]["name"],
"issue_id": str(testing.comicvine.cv_issue_result["results"]["id"]),
"series_id": str(testing.comicvine.cv_volume_result["results"]["id"]),
"month": testing.comicvine.date[1],
"year": testing.comicvine.date[2],
"publisher": testing.comicvine.cv_volume_result["results"]["publisher"]["name"],
"image_url": testing.comicvine.cv_issue_result["results"]["image"]["super_url"],
"description": testing.comicvine.cv_issue_result["results"]["description"],
}
cv_expected = IssueResult(
series=f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
distance=0,
issue_number=testing.comicvine.cv_issue_result["results"]["issue_number"],
alt_image_urls=[],
cv_issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
issue_title=testing.comicvine.cv_issue_result["results"]["name"],
issue_id=str(testing.comicvine.cv_issue_result["results"]["id"]),
series_id=str(testing.comicvine.cv_volume_result["results"]["id"]),
month=testing.comicvine.date[1],
year=testing.comicvine.date[2],
publisher=testing.comicvine.cv_volume_result["results"]["publisher"]["name"],
image_url=testing.comicvine.cv_issue_result["results"]["image"]["super_url"],
description=testing.comicvine.cv_issue_result["results"]["description"],
url_image_hash=1747255366011518976,
)
for r, e in zip(results, [cv_expected]):
del r["url_image_hash"]
# del r["url_image_hash"]
assert r == e