Merge branch 'json-output' into develop

This commit is contained in:
Timmy Welch 2023-12-17 18:28:53 -08:00
commit 3b2e763d7d
26 changed files with 1041 additions and 483 deletions

View File

@ -91,7 +91,7 @@ class CoMet:
date_str += f"-{md.month:02}"
assign("date", date_str)
assign("coverImage", md.cover_image)
assign("coverImage", md._cover_image)
# loop thru credits, and build a list for each role that CoMet supports
for credit in metadata.credits:
@ -156,7 +156,7 @@ class CoMet:
_, md.month, md.year = utils.parse_date_str(utils.xlate(get("date")))
md.cover_image = utils.xlate(get("coverImage"))
md._cover_image = utils.xlate(get("coverImage"))
reading_direction = utils.xlate(get("readingDirection"))
if reading_direction is not None and reading_direction == "rtl":

View File

@ -45,11 +45,10 @@ def load_archive_plugins() -> None:
for arch in entry_points(group="comicapi.archiver"):
try:
archiver: type[Archiver] = arch.load()
if archiver.enabled:
if arch.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
if arch.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
except Exception:
logger.warning("Failed to load talker: %s", arch.name)
archivers.extend(builtin)
@ -88,7 +87,7 @@ class ComicArchive:
load_archive_plugins()
for archiver in archivers:
if archiver.is_valid(self.path):
if archiver.enabled and archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
break
@ -425,10 +424,10 @@ class ComicArchive:
# use the coverImage value from the comet_data to mark the cover in this struct
# walk through list of images in file, and find the matching one for md.coverImage
# need to remove the existing one in the default
if self.comet_md.cover_image is not None:
if self.comet_md._cover_image is not None:
cover_idx = 0
for idx, f in enumerate(self.get_page_name_list()):
if self.comet_md.cover_image == f:
if self.comet_md._cover_image == f:
cover_idx = idx
break
if cover_idx != 0:
@ -462,7 +461,7 @@ class ComicArchive:
# Set the coverImage value, if it's not the first page
cover_idx = int(metadata.get_cover_page_index_list()[0])
if cover_idx != 0:
metadata.cover_image = self.get_page_name(cover_idx)
metadata._cover_image = self.get_page_name(cover_idx)
comet_string = CoMet().string_from_metadata(metadata)
write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8"))

View File

@ -109,46 +109,44 @@ class GenericMetadata:
series: str | None = None
series_aliases: set[str] = dataclasses.field(default_factory=set)
issue: str | None = None
issue_count: int | None = None
title: str | None = None
title_aliases: set[str] = dataclasses.field(default_factory=set)
publisher: str | None = None
month: int | None = None
year: int | None = None
day: int | None = None
issue_count: int | None = None
volume: int | None = None
genres: set[str] = dataclasses.field(default_factory=set)
language: str | None = None # 2 letter iso code
description: str | None = None # use same way as Summary in CIX
volume_count: int | None = None
critical_rating: float | None = None # rating in CBL; CommunityRating in CIX
country: str | None = None
genres: set[str] = dataclasses.field(default_factory=set)
description: str | None = None # use same way as Summary in CIX
notes: str | None = None
alternate_series: str | None = None
alternate_number: str | None = None
alternate_count: int | None = None
story_arcs: list[str] = dataclasses.field(default_factory=list)
series_groups: list[str] = dataclasses.field(default_factory=list)
publisher: str | None = None
imprint: str | None = None
notes: str | None = None
day: int | None = None
month: int | None = None
year: int | None = None
language: str | None = None # 2 letter iso code
country: str | None = None
web_link: str | None = None
format: str | None = None
manga: str | None = None
black_and_white: bool | None = None
page_count: int | None = None
maturity_rating: str | None = None
story_arcs: list[str] = dataclasses.field(default_factory=list)
series_groups: list[str] = dataclasses.field(default_factory=list)
critical_rating: float | None = None # rating in CBL; CommunityRating in CIX
scan_info: str | None = None
tags: set[str] = dataclasses.field(default_factory=set)
pages: list[ImageMetadata] = dataclasses.field(default_factory=list)
page_count: int | None = None
characters: set[str] = dataclasses.field(default_factory=set)
teams: set[str] = dataclasses.field(default_factory=set)
locations: set[str] = dataclasses.field(default_factory=set)
alternate_images: list[str] = dataclasses.field(default_factory=list)
credits: list[Credit] = dataclasses.field(default_factory=list)
tags: set[str] = dataclasses.field(default_factory=set)
pages: list[ImageMetadata] = dataclasses.field(default_factory=list)
# Some CoMet-only items
price: float | None = None
@ -156,7 +154,10 @@ class GenericMetadata:
rights: str | None = None
identifier: str | None = None
last_mark: str | None = None
cover_image: str | None = None # url to cover image
# urls to cover image, not generally part of the metadata
_cover_image: str | None = None
_alternate_images: list[str] = dataclasses.field(default_factory=list)
def __post_init__(self) -> None:
for key, value in self.__dict__.items():
@ -191,58 +192,59 @@ class GenericMetadata:
if not new_md.is_empty:
self.is_empty = False
assign("series", new_md.series)
assign("series_id", new_md.series_id)
assign("issue", new_md.issue)
assign("tag_origin", new_md.tag_origin)
assign("issue_id", new_md.issue_id)
assign("series_id", new_md.series_id)
assign("series", new_md.series)
assign("series_aliases", new_md.series_aliases)
assign("issue", new_md.issue)
assign("issue_count", new_md.issue_count)
assign("title", new_md.title)
assign("publisher", new_md.publisher)
assign("day", new_md.day)
assign("month", new_md.month)
assign("year", new_md.year)
assign("title_aliases", new_md.title_aliases)
assign("volume", new_md.volume)
assign("volume_count", new_md.volume_count)
assign("language", new_md.language)
assign("country", new_md.country)
assign("critical_rating", new_md.critical_rating)
assign("genres", new_md.genres)
assign("description", new_md.description)
assign("notes", new_md.notes)
assign("alternate_series", new_md.alternate_series)
assign("alternate_number", new_md.alternate_number)
assign("alternate_count", new_md.alternate_count)
assign("story_arcs", new_md.story_arcs)
assign("series_groups", new_md.series_groups)
assign("publisher", new_md.publisher)
assign("imprint", new_md.imprint)
assign("day", new_md.day)
assign("month", new_md.month)
assign("year", new_md.year)
assign("language", new_md.language)
assign("country", new_md.country)
assign("web_link", new_md.web_link)
assign("format", new_md.format)
assign("manga", new_md.manga)
assign("black_and_white", new_md.black_and_white)
assign("maturity_rating", new_md.maturity_rating)
assign("critical_rating", new_md.critical_rating)
assign("scan_info", new_md.scan_info)
assign("description", new_md.description)
assign("notes", new_md.notes)
assign("tags", new_md.tags)
assign("pages", new_md.pages)
assign("page_count", new_md.page_count)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
self.overlay_credits(new_md.credits)
assign("price", new_md.price)
assign("is_version_of", new_md.is_version_of)
assign("rights", new_md.rights)
assign("identifier", new_md.identifier)
assign("last_mark", new_md.last_mark)
self.overlay_credits(new_md.credits)
# TODO
# not sure if the tags and pages should broken down, or treated
# as whole lists....
# For now, go the easy route, where any overlay
# value wipes out the whole list
assign("series_aliases", new_md.series_aliases)
assign("title_aliases", new_md.title_aliases)
assign("genres", new_md.genres)
assign("story_arcs", new_md.story_arcs)
assign("series_groups", new_md.series_groups)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
assign("tags", new_md.tags)
assign("pages", new_md.pages)
assign("_cover_image", new_md._cover_image)
assign("_alternate_images", new_md._alternate_images)
def overlay_credits(self, new_credits: list[Credit]) -> None:
for c in new_credits:
@ -494,5 +496,5 @@ md_test: GenericMetadata = GenericMetadata(
rights=None,
identifier=None,
last_mark=None,
cover_image=None,
_cover_image=None,
)

View File

@ -153,6 +153,48 @@ def parse_date_str(date_str: str | None) -> tuple[int | None, int | None, int |
return day, month, year
def shorten_path(path: pathlib.Path, path2: pathlib.Path | None = None) -> tuple[pathlib.Path, pathlib.Path]:
if path2:
path2 = path2.absolute()
path = path.absolute()
shortened_path: pathlib.Path = path
relative_path = pathlib.Path(path.anchor)
if path.is_relative_to(path.home()):
relative_path = path.home()
shortened_path = path.relative_to(path.home())
if path.is_relative_to(path.cwd()):
relative_path = path.cwd()
shortened_path = path.relative_to(path.cwd())
if path2 and shortened_path.is_relative_to(path2.parent):
relative_path = path2
shortened_path = shortened_path.relative_to(path2)
return relative_path, shortened_path
def path_to_short_str(original_path: pathlib.Path, renamed_path: pathlib.Path | None = None) -> str:
rel, _original_path = shorten_path(original_path)
path_str = str(_original_path)
if rel.samefile(rel.cwd()):
path_str = f"./{_original_path}"
elif rel.samefile(rel.home()):
path_str = f"~/{_original_path}"
if renamed_path:
rel, path = shorten_path(renamed_path, original_path.parent)
rename_str = f" -> {path}"
if rel.samefile(rel.cwd()):
rename_str = f" -> ./{_original_path}"
elif rel.samefile(rel.home()):
rename_str = f" -> ~/{_original_path}"
path_str += rename_str
return path_str
def get_recursive_filelist(pathlist: list[str]) -> list[str]:
"""Get a recursive list of of all files under all path items in the list"""
@ -301,6 +343,14 @@ def unique_file(file_name: pathlib.Path) -> pathlib.Path:
counter += 1
def parse_version(s: str) -> tuple[int, int, int]:
str_parts = s.split(".")[:3]
parts = [int(x) if x.isdigit() else 0 for x in str_parts]
parts.extend([0] * (3 - len(parts))) # Ensure exactly three elements in the resulting list
return (parts[0], parts[1], parts[2])
_languages: dict[str | None, str | None] = defaultdict(lambda: None)
_countries: dict[str | None, str | None] = defaultdict(lambda: None)

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import pathlib
from PyQt5 import QtCore, QtGui, QtWidgets, uic
@ -23,9 +24,11 @@ class QTextEditLogger(QtCore.QObject, logging.Handler):
class ApplicationLogWindow(QtWidgets.QDialog):
def __init__(self, log_handler: QTextEditLogger, parent: QtCore.QObject | None = None) -> None:
def __init__(
self, log_folder: pathlib.Path, log_handler: QTextEditLogger, parent: QtCore.QObject | None = None
) -> None:
super().__init__(parent)
with (ui_path / "logwindow.ui").open(encoding="utf-8") as uifile:
with (ui_path / "applicationlogwindow.ui").open(encoding="utf-8") as uifile:
uic.loadUi(uifile, self)
self.log_handler = log_handler
@ -37,6 +40,9 @@ class ApplicationLogWindow(QtWidgets.QDialog):
self._button = QtWidgets.QPushButton(self)
self._button.setText("Test Me")
self.log_folder = log_folder
self.lblLogLocation.setText(f'Log Location: <a href="file://{log_folder}">{log_folder}</a>')
layout = self.layout()
layout.addWidget(self._button)

View File

@ -21,11 +21,11 @@ from typing import Callable
from PyQt5 import QtCore, QtGui, QtWidgets, uic
from comicapi.comicarchive import MetaDataStyle
from comicapi.comicarchive import ComicArchive, MetaDataStyle
from comicapi.genericmetadata import GenericMetadata
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.resulttypes import IssueResult, MultipleMatch
from comictaggerlib.resulttypes import IssueResult, Result
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import reduce_widget_font_size
from comictalker.comictalker import ComicTalker
@ -37,7 +37,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
def __init__(
self,
parent: QtWidgets.QWidget,
match_set_list: list[MultipleMatch],
match_set_list: list[Result],
style: int,
fetch_func: Callable[[IssueResult], GenericMetadata],
config: ct_ns,
@ -50,7 +50,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.config = config
self.current_match_set: MultipleMatch = match_set_list[0]
self.current_match_set: Result = match_set_list[0]
self.altCoverWidget = CoverImageWidget(
self.altCoverContainer, CoverImageWidget.AltCoverMode, config.Runtime_Options__config.user_cache_dir, talker
@ -103,7 +103,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.twList.resizeColumnsToContents()
self.twList.selectRow(0)
path = self.current_match_set.ca.path
path = self.current_match_set.original_path
self.setWindowTitle(
"Select correct match or skip ({} of {}): {}".format(
self.current_match_set_idx + 1,
@ -120,18 +120,18 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.twList.setSortingEnabled(False)
for row, match in enumerate(self.current_match_set.matches):
for row, match in enumerate(self.current_match_set.online_results):
self.twList.insertRow(row)
item_text = match["series"]
item_text = match.series
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, (match,))
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
if match["publisher"] is not None:
item_text = str(match["publisher"])
if match.publisher is not None:
item_text = str(match.publisher)
else:
item_text = "Unknown"
item = QtWidgets.QTableWidgetItem(item_text)
@ -141,10 +141,10 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
month_str = ""
year_str = "????"
if match["month"] is not None:
month_str = f"-{int(match['month']):02d}"
if match["year"] is not None:
year_str = str(match["year"])
if match.month is not None:
month_str = f"-{int(match.month):02d}"
if match.year is not None:
year_str = str(match.year)
item_text = year_str + month_str
item = QtWidgets.QTableWidgetItem(item_text)
@ -152,7 +152,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, item)
item_text = match["issue_title"]
item_text = match.issue_title
if item_text is None:
item_text = ""
item = QtWidgets.QTableWidgetItem(item_text)
@ -176,17 +176,15 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
if prev is not None and prev.row() == curr.row():
return None
self.altCoverWidget.set_issue_details(
self.current_match()["issue_id"],
[self.current_match()["image_url"], *self.current_match()["alt_image_urls"]],
)
if self.current_match()["description"] is None:
match = self.current_match()
self.altCoverWidget.set_issue_details(match.issue_id, [match.image_url, *match.alt_image_urls])
if match.description is None:
self.teDescription.setText("")
else:
self.teDescription.setText(self.current_match()["description"])
self.teDescription.setText(match.description)
def set_cover_image(self) -> None:
ca = self.current_match_set.ca
ca = ComicArchive(self.current_match_set.original_path)
self.archiveCoverWidget.set_archive(ca)
def current_match(self) -> IssueResult:
@ -229,7 +227,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
def save_match(self) -> None:
match = self.current_match()
ca = self.current_match_set.ca
ca = ComicArchive(self.current_match_set.original_path)
md = ca.read_metadata(self._style)
if md.is_empty:
@ -241,7 +239,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
)
# now get the particular issue data
ct_md = self.fetch_func(match)
self.current_match_set.md = ct_md = self.fetch_func(match)
if ct_md is None:
QtWidgets.QMessageBox.critical(self, "Network Issue", "Could not retrieve issue details!")
return

View File

@ -16,10 +16,16 @@
# limitations under the License.
from __future__ import annotations
import dataclasses
import functools
import json
import logging
import os
import pathlib
import sys
from collections.abc import Collection
from datetime import datetime
from typing import Any, TextIO
from comicapi import utils
from comicapi.comicarchive import ComicArchive, MetaDataStyle
@ -30,18 +36,32 @@ from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
from comictaggerlib.graphics import graphics_path
from comictaggerlib.issueidentifier import IssueIdentifier
from comictaggerlib.resulttypes import MultipleMatch, OnlineMatchResults
from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.talker_utils import cleanup_html
logger = logging.getLogger(__name__)
class OutputEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, pathlib.Path):
return str(obj)
if not isinstance(obj, str) and isinstance(obj, Collection):
return list(obj)
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
class CLI:
def __init__(self, config: ct_ns, talkers: dict[str, ComicTalker]) -> None:
self.config = config
self.talkers = talkers
self.batch_mode = False
self.output_file = sys.stdout
if config.Runtime_Options__json:
self.output_file = sys.stderr
def current_talker(self) -> ComicTalker:
if self.config.Sources__source in self.talkers:
@ -49,6 +69,56 @@ class CLI:
logger.error("Could not find the '%s' talker", self.config.Sources__source)
raise SystemExit(2)
def output(
self,
*args: Any,
file: TextIO | None = None,
force_output: bool = False,
already_logged: bool = False,
**kwargs: Any,
) -> None:
if file is None:
file = self.output_file
if not args:
log_args: tuple[Any, ...] = ("",)
elif isinstance(args[0], str):
log_args = (args[0].strip("\n"), *args[1:])
else:
log_args = args
if not already_logged:
logger.info(*log_args, **kwargs)
if self.config.Runtime_Options__verbose > 0:
return
if not self.config.Runtime_Options__quiet or force_output:
print(*args, **kwargs, file=file)
def run(self) -> int:
if len(self.config.Runtime_Options__files) < 1:
logger.error("You must specify at least one filename. Use the -h option for more info")
return 1
return_code = 0
results: list[Result] = []
match_results = OnlineMatchResults()
self.batch_mode = len(self.config.Runtime_Options__files) > 1
for f in self.config.Runtime_Options__files:
results.append(self.process_file_cli(self.config.Commands__command, f, match_results))
if results[-1].status != Status.success:
return_code = 3
if self.config.Runtime_Options__json:
print(json.dumps(dataclasses.asdict(results[-1]), cls=OutputEncoder, indent=2))
sys.stdout.flush()
sys.stderr.flush()
self.post_process_matches(match_results)
if self.config.Runtime_Options__online:
self.output(
f"\nFiles tagged with metadata provided by {self.current_talker().name} {self.current_talker().website}",
)
return return_code
def actual_issue_data_fetch(self, issue_id: str) -> GenericMetadata:
# now get the particular issue data
try:
@ -70,122 +140,108 @@ class CLI:
logger.error("The tag save seemed to fail for style: %s!", MetaDataStyle.name[metadata_style])
return False
print("Save complete.")
logger.info("Save complete.")
self.output("Save complete.")
else:
if self.config.Runtime_Options__quiet:
logger.info("dry-run option was set, so nothing was written")
print("dry-run option was set, so nothing was written")
self.output("dry-run option was set, so nothing was written")
else:
logger.info("dry-run option was set, so nothing was written, but here is the final set of tags:")
print("dry-run option was set, so nothing was written, but here is the final set of tags:")
print(f"{md}")
self.output("dry-run option was set, so nothing was written, but here is the final set of tags:")
self.output(f"{md}")
return True
def display_match_set_for_choice(self, label: str, match_set: MultipleMatch) -> None:
print(f"{match_set.ca.path} -- {label}:")
def display_match_set_for_choice(self, label: str, match_set: Result) -> None:
self.output(f"{match_set.original_path} -- {label}:", force_output=True)
# sort match list by year
match_set.matches.sort(key=lambda k: k["year"] or 0)
match_set.online_results.sort(key=lambda k: k.year or 0)
for counter, m in enumerate(match_set.matches, 1):
print(
for counter, m in enumerate(match_set.online_results, 1):
self.output(
" {}. {} #{} [{}] ({}/{}) - {}".format(
counter,
m["series"],
m["issue_number"],
m["publisher"],
m["month"],
m["year"],
m["issue_title"],
)
m.series,
m.issue_number,
m.publisher,
m.month,
m.year,
m.issue_title,
),
force_output=True,
)
if self.config.Runtime_Options__interactive:
while True:
i = input("Choose a match #, or 's' to skip: ")
if (i.isdigit() and int(i) in range(1, len(match_set.matches) + 1)) or i == "s":
if (i.isdigit() and int(i) in range(1, len(match_set.online_results) + 1)) or i == "s":
break
if i != "s":
# save the data!
# we know at this point, that the file is all good to go
ca = match_set.ca
ca = ComicArchive(match_set.original_path)
md = self.create_local_metadata(ca)
ct_md = self.actual_issue_data_fetch(match_set.matches[int(i) - 1]["issue_id"])
ct_md = self.actual_issue_data_fetch(match_set.online_results[int(i) - 1].issue_id)
if self.config.Issue_Identifier__clear_metadata_on_import:
md = ct_md
else:
notes = (
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
)
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
if self.config.Issue_Identifier__auto_imprint:
md.fix_publisher()
match_set.md = md
self.actual_metadata_save(ca, md)
def post_process_matches(self, match_results: OnlineMatchResults) -> None:
def print_header(header: str) -> None:
self.output("", force_output=True)
self.output(header, force_output=True)
self.output("------------------", force_output=True)
# now go through the match results
if self.config.Runtime_Options__summary:
if len(match_results.good_matches) > 0:
print("\nSuccessful matches:\n------------------")
print_header("Successful matches:")
for f in match_results.good_matches:
print(f)
self.output(f, force_output=True)
if len(match_results.no_matches) > 0:
print("\nNo matches:\n------------------")
print_header("No matches:")
for f in match_results.no_matches:
print(f)
self.output(f, force_output=True)
if len(match_results.write_failures) > 0:
print("\nFile Write Failures:\n------------------")
print_header("File Write Failures:")
for f in match_results.write_failures:
print(f)
self.output(f, force_output=True)
if len(match_results.fetch_data_failures) > 0:
print("\nNetwork Data Fetch Failures:\n------------------")
print_header("Network Data Fetch Failures:")
for f in match_results.fetch_data_failures:
print(f)
self.output(f, force_output=True)
if not self.config.Runtime_Options__summary and not self.config.Runtime_Options__interactive:
# just quit if we're not interactive or showing the summary
return
if len(match_results.multiple_matches) > 0:
print("\nArchives with multiple high-confidence matches:\n------------------")
self.output("\nArchives with multiple high-confidence matches:\n------------------", force_output=True)
for match_set in match_results.multiple_matches:
self.display_match_set_for_choice("Multiple high-confidence matches", match_set)
if len(match_results.low_confidence_matches) > 0:
print("\nArchives with low-confidence matches:\n------------------")
self.output("\nArchives with low-confidence matches:\n------------------", force_output=True)
for match_set in match_results.low_confidence_matches:
if len(match_set.matches) == 1:
if len(match_set.online_results) == 1:
label = "Single low-confidence match"
else:
label = "Multiple low-confidence matches"
self.display_match_set_for_choice(label, match_set)
def run(self) -> None:
if len(self.config.Runtime_Options__files) < 1:
logger.error("You must specify at least one filename. Use the -h option for more info")
return
match_results = OnlineMatchResults()
self.batch_mode = len(self.config.Runtime_Options__files) > 1
for f in self.config.Runtime_Options__files:
self.process_file_cli(f, match_results)
sys.stdout.flush()
self.post_process_matches(match_results)
if self.config.Runtime_Options__online:
print(
f"\nFiles tagged with metadata provided by {self.current_talker().name} {self.current_talker().website}"
)
def create_local_metadata(self, ca: ComicArchive) -> GenericMetadata:
md = GenericMetadata()
md.set_default_page_list(ca.get_number_of_pages())
@ -216,7 +272,7 @@ class CLI:
return md
def print(self, ca: ComicArchive) -> None:
def print(self, ca: ComicArchive) -> Result:
if not self.config.Runtime_Options__type:
page_count = ca.get_number_of_pages()
@ -245,116 +301,152 @@ class CLI:
brief += "CoMet "
brief += "]"
print(brief)
self.output(brief)
if self.config.Runtime_Options__quiet:
return
return Result(Action.print, Status.success, ca.path)
print()
self.output()
raw: str | bytes = ""
md = None
if not self.config.Runtime_Options__type or MetaDataStyle.CIX in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.CIX):
print("--------- ComicRack tags ---------")
self.output("--------- ComicRack tags ---------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_cix()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_cix())
md = ca.read_cix()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if not self.config.Runtime_Options__type or MetaDataStyle.CBI in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.CBI):
print("------- ComicBookLover tags -------")
self.output("------- ComicBookLover tags -------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_cbi()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_cbi())
md = ca.read_cbi()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if not self.config.Runtime_Options__type or MetaDataStyle.COMET in self.config.Runtime_Options__type:
if ca.has_metadata(MetaDataStyle.COMET):
print("----------- CoMet tags -----------")
self.output("----------- CoMet tags -----------")
try:
if self.config.Runtime_Options__raw:
raw = ca.read_raw_comet()
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
print(raw)
self.output(raw)
else:
print(ca.read_comet())
md = ca.read_comet()
self.output(md)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
def delete(self, ca: ComicArchive) -> None:
for metadata_style in self.config.Runtime_Options__type:
style_name = MetaDataStyle.name[metadata_style]
if ca.has_metadata(metadata_style):
if not self.config.Runtime_Options__dryrun:
if not ca.remove_metadata(metadata_style):
print(f"{ca.path}: Tag removal seemed to fail!")
else:
print(f"{ca.path}: Removed {style_name} tags.")
return Result(Action.print, Status.success, ca.path, md=md)
def delete_style(self, ca: ComicArchive, style: int) -> Status:
style_name = MetaDataStyle.name[style]
if ca.has_metadata(style):
if not self.config.Runtime_Options__dryrun:
if ca.remove_metadata(style):
self.output(f"{ca.path}: Removed {style_name} tags.")
return Status.success
else:
print(f"{ca.path}: dry-run. {style_name} tags not removed")
self.output(f"{ca.path}: Tag removal seemed to fail!")
return Status.write_failure
else:
print(f"{ca.path}: This archive doesn't have {style_name} tags to remove.")
self.output(f"{ca.path}: dry-run. {style_name} tags not removed")
return Status.success
self.output(f"{ca.path}: This archive doesn't have {style_name} tags to remove.")
return Status.success
def copy(self, ca: ComicArchive) -> None:
def delete(self, ca: ComicArchive) -> Result:
res = Result(Action.delete, Status.success, ca.path)
for metadata_style in self.config.Runtime_Options__type:
dst_style_name = MetaDataStyle.name[metadata_style]
if not self.config.Runtime_Options__overwrite and ca.has_metadata(metadata_style):
print(f"{ca.path}: Already has {dst_style_name} tags. Not overwriting.")
return
if self.config.Commands__copy == metadata_style:
print(f"{ca.path}: Destination and source are same: {dst_style_name}. Nothing to do.")
return
src_style_name = MetaDataStyle.name[self.config.Commands__copy]
if ca.has_metadata(self.config.Commands__copy):
if not self.config.Runtime_Options__dryrun:
try:
md = ca.read_metadata(self.config.Commands__copy)
except Exception as e:
md = GenericMetadata()
logger.error("Failed to load metadata for %s: %s", ca.path, e)
if self.config.Comic_Book_Lover__apply_transform_on_bulk_operation == MetaDataStyle.CBI:
md = CBLTransformer(md, self.config).apply()
if not ca.write_metadata(md, metadata_style):
print(f"{ca.path}: Tag copy seemed to fail!")
else:
print(f"{ca.path}: Copied {src_style_name} tags to {dst_style_name}.")
else:
print(f"{ca.path}: dry-run. {src_style_name} tags not copied")
status = self.delete_style(ca, metadata_style)
if status == Status.success:
res.tags_deleted.append(metadata_style)
else:
print(f"{ca.path}: This archive doesn't have {src_style_name} tags to copy.")
res.status = status
return res
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> None:
def copy_style(self, ca: ComicArchive, md: GenericMetadata, style: int) -> Status:
dst_style_name = MetaDataStyle.name[style]
if not self.config.Runtime_Options__overwrite and ca.has_metadata(style):
self.output(f"{ca.path}: Already has {dst_style_name} tags. Not overwriting.")
return Status.existing_tags
if self.config.Commands__copy == style:
self.output(f"{ca.path}: Destination and source are same: {dst_style_name}. Nothing to do.")
return Status.existing_tags
src_style_name = MetaDataStyle.name[self.config.Commands__copy]
if ca.has_metadata(self.config.Commands__copy):
if not self.config.Runtime_Options__dryrun:
if self.config.Comic_Book_Lover__apply_transform_on_bulk_operation == MetaDataStyle.CBI:
md = CBLTransformer(md, self.config).apply()
if ca.write_metadata(md, style):
self.output(f"{ca.path}: Copied {src_style_name} tags to {dst_style_name}.")
return Status.success
else:
self.output(f"{ca.path}: Tag copy seemed to fail!")
return Status.write_failure
else:
self.output(f"{ca.path}: dry-run. {src_style_name} tags not copied")
return Status.success
self.output(f"{ca.path}: This archive doesn't have {src_style_name} tags to copy.")
return Status.read_failure
def copy(self, ca: ComicArchive) -> Result:
res = Result(Action.copy, Status.success, ca.path)
try:
res.md = ca.read_metadata(self.config.Commands__copy)
except Exception as e:
logger.error("Failed to load metadata for %s: %s", ca.path, e)
return res
for metadata_style in self.config.Runtime_Options__type:
status = self.copy_style(ca, res.md, metadata_style)
if status == Status.success:
res.tags_written.append(metadata_style)
else:
res.status = status
return res
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> Result:
if not self.config.Runtime_Options__overwrite:
for metadata_style in self.config.Runtime_Options__type:
if ca.has_metadata(metadata_style):
print(f"{ca.path}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
return
self.output(f"{ca.path}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
return Result(
Action.save,
original_path=ca.path,
status=Status.existing_tags,
tags_written=self.config.Runtime_Options__type,
)
if self.batch_mode:
print(f"Processing {ca.path}...")
self.output(f"Processing {utils.path_to_short_str(ca.path)}...")
md = self.create_local_metadata(ca)
if md.issue is None or md.issue == "":
if self.config.Auto_Tag__assume_1_if_no_issue_num:
md.issue = "1"
matches: list[IssueResult] = []
# now, search online
if self.config.Runtime_Options__online:
if self.config.Runtime_Options__issue_id is not None:
@ -363,32 +455,52 @@ class CLI:
ct_md = self.current_talker().fetch_comic_data(self.config.Runtime_Options__issue_id)
except TalkerError as e:
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
match_results.fetch_data_failures.append(str(ca.path.absolute()))
return
res = Result(
Action.save,
original_path=ca.path,
status=Status.fetch_data_failure,
tags_written=self.config.Runtime_Options__type,
)
match_results.fetch_data_failures.append(res)
return res
if ct_md is None:
logger.error("No match for ID %s was found.", self.config.Runtime_Options__issue_id)
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
if self.config.Comic_Book_Lover__apply_transform_on_import:
ct_md = CBLTransformer(ct_md, self.config).apply()
else:
if md is None or md.is_empty:
logger.error("No metadata given to search online with!")
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
ii = IssueIdentifier(ca, self.config, self.current_talker())
def myoutput(text: str) -> None:
if self.config.Runtime_Options__verbose:
IssueIdentifier.default_write_output(text)
self.output(text)
# use our overlaid MD struct to search
ii.set_additional_metadata(md)
ii.only_use_additional_meta_data = True
ii.set_output_function(myoutput)
ii.set_output_function(functools.partial(self.output, already_logged=True))
ii.cover_page_index = md.get_cover_page_index_list()[0]
matches = ii.search()
@ -416,35 +528,75 @@ class CLI:
if choices:
if low_confidence:
logger.error("Online search: Multiple low confidence matches. Save aborted")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.low_confidence_matches.append(res)
return res
logger.error("Online search: Multiple good matches. Save aborted")
match_results.multiple_matches.append(MultipleMatch(ca, matches))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.multiple_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.multiple_matches.append(res)
return res
if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
logger.error("Online search: Low confidence match. Save aborted")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.low_confidence_matches.append(res)
return res
if not found_match:
logger.error("Online search: No match found. Save aborted")
match_results.no_matches.append(str(ca.path.absolute()))
return
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.no_matches.append(res)
return res
# we got here, so we have a single match
# now get the particular issue data
ct_md = self.actual_issue_data_fetch(matches[0]["issue_id"])
ct_md = self.actual_issue_data_fetch(matches[0].issue_id)
if ct_md.is_empty:
match_results.fetch_data_failures.append(str(ca.path.absolute()))
return
res = Result(
Action.save,
status=Status.fetch_data_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
tags_written=self.config.Runtime_Options__type,
)
match_results.fetch_data_failures.append(res)
return res
if self.config.Issue_Identifier__clear_metadata_on_import:
md = GenericMetadata()
notes = (
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
+ f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
)
md.overlay(
ct_md.replace(
@ -456,13 +608,24 @@ class CLI:
if self.config.Issue_Identifier__auto_imprint:
md.fix_publisher()
res = Result(
Action.save,
status=Status.success,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
md=md,
tags_written=self.config.Runtime_Options__type,
)
# ok, done building our metadata. time to save
if not self.actual_metadata_save(ca, md):
match_results.write_failures.append(str(ca.path.absolute()))
if self.actual_metadata_save(ca, md):
match_results.good_matches.append(res)
else:
match_results.good_matches.append(str(ca.path.absolute()))
res.status = Status.write_failure
match_results.write_failures.append(res)
return res
def rename(self, ca: ComicArchive) -> None:
def rename(self, ca: ComicArchive) -> Result:
original_path = ca.path
msg_hdr = ""
if self.batch_mode:
@ -472,7 +635,7 @@ class CLI:
if md.series is None:
logger.error(msg_hdr + "Can't rename without series name")
return
return Result(Action.rename, Status.read_failure, original_path)
new_ext = "" # default
if self.config.File_Rename__set_extension_based_on_archive:
@ -492,26 +655,27 @@ class CLI:
new_name = renamer.determine_name(ext=new_ext)
except ValueError:
logger.exception(
msg_hdr + "Invalid format string!\n"
"Your rename template is invalid!\n\n"
"%s\n\n"
"Please consult the template help in the settings "
"and the documentation on the format at "
"https://docs.python.org/3/library/string.html#format-string-syntax",
msg_hdr
+ "Invalid format string!\n"
+ "Your rename template is invalid!\n\n"
+ "%s\n\n"
+ "Please consult the template help in the settings "
+ "and the documentation on the format at "
+ "https://docs.python.org/3/library/string.html#format-string-syntax",
self.config.File_Rename__template,
)
return
return Result(Action.rename, Status.rename_failure, original_path, md=md)
except Exception:
logger.exception("Formatter failure: %s metadata: %s", self.config.File_Rename__template, renamer.metadata)
return
return Result(Action.rename, Status.rename_failure, original_path, md=md)
folder = get_rename_dir(ca, self.config.File_Rename__dir if self.config.File_Rename__move_to_dir else None)
full_path = folder / new_name
if full_path == ca.path:
print(msg_hdr + "Filename is already good!", file=sys.stderr)
return
self.output(msg_hdr + "Filename is already good!")
return Result(Action.rename, Status.success, original_path, full_path, md=md)
suffix = ""
if not self.config.Runtime_Options__dryrun:
@ -520,41 +684,41 @@ class CLI:
ca.rename(utils.unique_file(full_path))
except OSError:
logger.exception("Failed to rename comic archive: %s", ca.path)
return Result(Action.rename, Status.write_failure, original_path, full_path, md=md)
else:
suffix = " (dry-run, no change)"
print(f"renamed '{original_path.name}' -> '{new_name}' {suffix}")
self.output(f"renamed '{original_path.name}' -> '{new_name}' {suffix}")
return Result(Action.rename, Status.success, original_path, md=md)
def export(self, ca: ComicArchive) -> None:
def export(self, ca: ComicArchive) -> Result:
msg_hdr = ""
if self.batch_mode:
msg_hdr = f"{ca.path}: "
if ca.is_zip():
logger.error(msg_hdr + "Archive is already a zip file.")
return
return Result(Action.export, Status.success, ca.path)
filename_path = ca.path
new_file = filename_path.with_suffix(".cbz")
if self.config.Runtime_Options__abort_on_conflict and new_file.exists():
print(msg_hdr + f"{new_file.name} already exists in the that folder.")
return
self.output(msg_hdr + f"{new_file.name} already exists in the that folder.")
return Result(Action.export, Status.write_failure, ca.path)
new_file = utils.unique_file(new_file)
delete_success = False
export_success = False
if not self.config.Runtime_Options__dryrun:
if ca.export_as_zip(new_file):
export_success = True
if export_success := ca.export_as_zip(new_file):
if self.config.Runtime_Options__delete_after_zip_export:
try:
filename_path.unlink(missing_ok=True)
delete_success = True
except OSError:
logger.exception(msg_hdr + "Error deleting original archive after export")
delete_success = False
else:
# last export failed, so remove the zip, if it exists
new_file.unlink(missing_ok=True)
@ -562,8 +726,8 @@ class CLI:
msg = msg_hdr + f"Dry-run: Would try to create {os.path.split(new_file)[1]}"
if self.config.Runtime_Options__delete_after_zip_export:
msg += " and delete original."
print(msg)
return
self.output(msg)
return Result(Action.export, Status.success, ca.path, new_file)
msg = msg_hdr
if export_success:
@ -573,42 +737,40 @@ class CLI:
else:
msg += "Archive failed to export!"
print(msg)
self.output(msg)
def process_file_cli(self, filename: str, match_results: OnlineMatchResults) -> None:
return Result(Action.export, Status.success, ca.path, new_file)
def process_file_cli(self, command: Action, filename: str, match_results: OnlineMatchResults) -> Result:
if not os.path.lexists(filename):
logger.error("Cannot find %s", filename)
return
return Result(command, Status.read_failure, pathlib.Path(filename))
ca = ComicArchive(filename, str(graphics_path / "nocover.png"))
if not ca.seems_to_be_a_comic_archive():
logger.error("Sorry, but %s is not a comic archive!", filename)
return
return Result(Action.rename, Status.read_failure, ca.path)
if not ca.is_writable() and (
self.config.Commands__delete
or self.config.Commands__copy
or self.config.Commands__save
or self.config.Commands__rename
):
if not ca.is_writable() and (command in (Action.delete, Action.copy, Action.save, Action.rename)):
logger.error("This archive is not writable")
return
return Result(command, Status.write_permission_failure, ca.path)
if self.config.Commands__print:
self.print(ca)
if command == Action.print:
return self.print(ca)
elif self.config.Commands__delete:
self.delete(ca)
elif command == Action.delete:
return self.delete(ca)
elif self.config.Commands__copy is not None:
self.copy(ca)
elif command == Action.copy is not None:
return self.copy(ca)
elif self.config.Commands__save:
self.save(ca, match_results)
elif command == Action.save:
return self.save(ca, match_results)
elif self.config.Commands__rename:
self.rename(ca)
elif command == Action.rename:
return self.rename(ca)
elif self.config.Commands__export_to_zip:
self.export(ca)
elif command == Action.export:
return self.export(ca)
return Result(None, Status.read_failure, ca.path) # type: ignore[arg-type]

View File

@ -32,6 +32,7 @@ from comictaggerlib.ctsettings.types import (
metadata_type_single,
parse_metadata_from_string,
)
from comictaggerlib.resulttypes import Action
logger = logging.getLogger(__name__)
@ -112,7 +113,7 @@ def register_runtime(parser: settngs.Manager) -> None:
"-i",
"--interactive",
action="store_true",
help="""Interactively query the user when there are\nmultiple matches for an online search.\n\n""",
help="""Interactively query the user when there are\nmultiple matches for an online search. Disabled json output\n\n""",
file=False,
)
parser.add_setting(
@ -159,6 +160,9 @@ def register_runtime(parser: settngs.Manager) -> None:
parser.add_setting("--darkmode", action="store_true", help="Windows only. Force a dark pallet", file=False)
parser.add_setting("-g", "--glob", action="store_true", help="Windows only. Enable globbing", file=False)
parser.add_setting("--quiet", "-q", action="store_true", help="Don't say much (for print mode).", file=False)
parser.add_setting(
"--json", "-j", action="store_true", help="Output json on stdout. Ignored in interactive mode.", file=False
)
parser.add_setting(
"-t",
@ -187,14 +191,18 @@ def register_commands(parser: settngs.Manager) -> None:
parser.add_setting(
"-p",
"--print",
action="store_true",
dest="command",
action="store_const",
const=Action.print,
help="""Print out tag info from file. Specify type\n(via -t) to get only info of that tag type.\n\n""",
file=False,
)
parser.add_setting(
"-d",
"--delete",
action="store_true",
dest="command",
action="store_const",
const=Action.delete,
help="Deletes the tag block of specified type (via -t).\n",
file=False,
)
@ -209,33 +217,43 @@ def register_commands(parser: settngs.Manager) -> None:
parser.add_setting(
"-s",
"--save",
action="store_true",
dest="command",
action="store_const",
const=Action.save,
help="Save out tags as specified type (via -t).\nMust specify also at least -o, -f, or -m.\n\n",
file=False,
)
parser.add_setting(
"-r",
"--rename",
action="store_true",
dest="command",
action="store_const",
const=Action.print,
help="Rename the file based on specified tag style.",
file=False,
)
parser.add_setting(
"-e",
"--export-to-zip",
action="store_true",
dest="command",
action="store_const",
const=Action.export,
help="Export RAR archive to Zip format.",
file=False,
)
parser.add_setting(
"--only-save-config",
action="store_true",
dest="command",
action="store_const",
const=Action.save_config,
help="Only save the configuration (eg, Comic Vine API key) and quit.",
file=False,
)
parser.add_setting(
"--list-plugins",
action="store_true",
dest="command",
action="store_const",
const=Action.list_plugins,
help="List the available plugins.\n\n",
file=False,
)
@ -251,21 +269,11 @@ def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs
parser.exit(
status=1,
message=f"ComicTagger {ctversion.version}: Copyright (c) 2012-2022 ComicTagger Team\n"
"Distributed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)\n",
+ "Distributed under Apache License 2.0 (http://www.apache.org/licenses/LICENSE-2.0)\n",
)
config[0].Runtime_Options__no_gui = any(
[
config[0].Commands__print,
config[0].Commands__delete,
config[0].Commands__save,
config[0].Commands__copy,
config[0].Commands__rename,
config[0].Commands__export_to_zip,
config[0].Commands__only_save_config,
config[0].Commands__list_plugins,
config[0].Runtime_Options__no_gui,
]
(config[0].Commands__command, config[0].Runtime_Options__no_gui, config[0].Commands__copy)
)
if platform.system() == "Windows" and config[0].Runtime_Options__glob:
@ -277,20 +285,24 @@ def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs
for item in globs:
config[0].Runtime_Options__files.extend(glob.glob(item))
if config[0].Runtime_Options__json and config[0].Runtime_Options__interactive:
config[0].Runtime_Options__json = False
if (
not config[0].Commands__only_save_config
config[0].Commands__command != Action.save_config
and config[0].Runtime_Options__no_gui
and not config[0].Runtime_Options__files
):
parser.exit(message="Command requires at least one filename!\n", status=1)
if config[0].Commands__delete and not config[0].Runtime_Options__type:
if config[0].Commands__command == Action.delete and not config[0].Runtime_Options__type:
parser.exit(message="Please specify the type to delete with -t\n", status=1)
if config[0].Commands__save and not config[0].Runtime_Options__type:
if config[0].Commands__command == Action.save and not config[0].Runtime_Options__type:
parser.exit(message="Please specify the type to save with -t\n", status=1)
if config[0].Commands__copy:
config[0].Commands__command = Action.copy
if not config[0].Runtime_Options__type:
parser.exit(message="Please specify the type to copy to with -t\n", status=1)

View File

@ -50,7 +50,7 @@ def identifier(parser: settngs.Manager) -> None:
parser.add_setting("--series-match-search-thresh", default=90, type=int)
parser.add_setting(
"--clear-metadata",
default=True,
default=False,
help="Clears all existing metadata during import, default is to merges metadata.\nMay be used in conjunction with -o, -f and -m.\n\n",
dest="clear_metadata_on_import",
action=argparse.BooleanOptionalAction,
@ -78,12 +78,6 @@ def identifier(parser: settngs.Manager) -> None:
action=argparse.BooleanOptionalAction,
help="Enables the publisher filter",
)
parser.add_setting(
"--clear-form-before-populating",
default=False,
action=argparse.BooleanOptionalAction,
help="Clears all existing metadata when applying metadata from comic source",
)
def dialog(parser: settngs.Manager) -> None:

View File

@ -36,8 +36,8 @@ def archiver(manager: settngs.Manager) -> None:
)
def register_talker_settings(manager: settngs.Manager) -> None:
for talker in comictaggerlib.ctsettings.talkers.values():
def register_talker_settings(manager: settngs.Manager, talkers: dict[str, ComicTalker]) -> None:
for talker in talkers.values():
def api_options(manager: settngs.Manager) -> None:
# The default needs to be unset or None.
@ -76,10 +76,10 @@ def validate_archive_settings(config: settngs.Config[ct_ns]) -> settngs.Config[c
return config
def validate_talker_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
def validate_talker_settings(config: settngs.Config[ct_ns], talkers: dict[str, ComicTalker]) -> settngs.Config[ct_ns]:
# Apply talker settings from config file
cfg = settngs.normalize_config(config, True, True)
for talker in list(comictaggerlib.ctsettings.talkers.values()):
for talker in list(talkers.values()):
try:
cfg[0][group_for_plugin(talker)] = talker.parse_settings(cfg[0][group_for_plugin(talker)])
except Exception as e:
@ -90,12 +90,12 @@ def validate_talker_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct
return cast(settngs.Config[ct_ns], settngs.get_namespace(cfg, file=True, cmdline=True))
def validate_plugin_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
def validate_plugin_settings(config: settngs.Config[ct_ns], talkers: dict[str, ComicTalker]) -> settngs.Config[ct_ns]:
config = validate_archive_settings(config)
config = validate_talker_settings(config)
config = validate_talker_settings(config, talkers)
return config
def register_plugin_settings(manager: settngs.Manager) -> None:
def register_plugin_settings(manager: settngs.Manager, talkers: dict[str, ComicTalker]) -> None:
manager.add_persistent_group("Archive", archiver, False)
register_talker_settings(manager)
register_talker_settings(manager, talkers)

View File

@ -5,18 +5,13 @@ import settngs
import comicapi.genericmetadata
import comictaggerlib.ctsettings.types
import comictaggerlib.defaults
import comictaggerlib.resulttypes
class settngs_namespace(settngs.TypedNS):
Commands__version: bool
Commands__print: bool
Commands__delete: bool
Commands__command: comictaggerlib.resulttypes.Action
Commands__copy: int
Commands__save: bool
Commands__rename: bool
Commands__export_to_zip: bool
Commands__only_save_config: bool
Commands__list_plugins: bool
Runtime_Options__config: comictaggerlib.ctsettings.types.ComicTaggerPaths
Runtime_Options__verbose: int
@ -36,6 +31,7 @@ class settngs_namespace(settngs.TypedNS):
Runtime_Options__darkmode: bool
Runtime_Options__glob: bool
Runtime_Options__quiet: bool
Runtime_Options__json: bool
Runtime_Options__type: list[int]
Runtime_Options__overwrite: bool
Runtime_Options__no_gui: bool
@ -63,7 +59,6 @@ class settngs_namespace(settngs.TypedNS):
Issue_Identifier__sort_series_by_year: bool
Issue_Identifier__exact_series_matches_first: bool
Issue_Identifier__always_use_publisher_filter: bool
Issue_Identifier__clear_form_before_populating: bool
Filename_Parsing__complicated_parser: bool
Filename_Parsing__remove_c2c: bool

View File

@ -17,7 +17,7 @@ from __future__ import annotations
import io
import logging
import sys
from operator import attrgetter
from typing import Any, Callable
from typing_extensions import NotRequired, TypedDict
@ -102,8 +102,8 @@ class IssueIdentifier:
self.publisher_filter = [s.strip().casefold() for s in config.Issue_Identifier__publisher_filter]
self.additional_metadata = GenericMetadata()
self.output_function: Callable[[str], None] = IssueIdentifier.default_write_output
self.callback: Callable[[int, int], None] | None = None
self.output_function: Callable[[str], None] = print
self.progress_callback: Callable[[int, int], None] | None = None
self.cover_url_callback: Callable[[bytes], None] | None = None
self.search_result = self.result_no_matches
self.cover_page_index = 0
@ -208,7 +208,7 @@ class IssueIdentifier:
return None
def set_progress_callback(self, cb_func: Callable[[int, int], None]) -> None:
self.callback = cb_func
self.progress_callback = cb_func
def set_cover_url_callback(self, cb_func: Callable[[bytes], None]) -> None:
self.cover_url_callback = cb_func
@ -264,16 +264,33 @@ class IssueIdentifier:
return search_keys
@staticmethod
def default_write_output(text: str) -> None:
sys.stdout.write(text)
sys.stdout.flush()
def log_msg(self, msg: Any, newline: bool = True) -> None:
def log_msg(self, msg: Any) -> None:
msg = str(msg)
if newline:
msg += "\n"
self.output_function(msg)
for handler in logging.getLogger().handlers:
handler.flush()
self.output(msg)
def output(self, *args: Any, file: Any = None, **kwargs: Any) -> None:
# We intercept and discard the file argument otherwise everything is passed to self.output_function
# Ensure args[0] is defined and is a string for logger.info
if not args:
log_args: tuple[Any, ...] = ("",)
elif isinstance(args[0], str):
log_args = (args[0].strip("\n"), *args[1:])
else:
log_args = args
log_msg = " ".join([str(x) for x in log_args])
# Always send to logger so that we have a record for troubleshooting
logger.info(log_msg, **kwargs)
# If we are verbose or quiet we don't need to call the output function
if self.config.Runtime_Options__verbose > 0 or self.config.Runtime_Options__quiet:
return
# default output is stdout
self.output_function(*args, **kwargs)
def get_issue_cover_match_score(
self,
@ -281,7 +298,6 @@ class IssueIdentifier:
alt_urls: list[str],
local_cover_hash_list: list[int],
use_remote_alternates: bool = False,
use_log: bool = True,
) -> Score:
# local_cover_hash_list is a list of pre-calculated hashes.
# use_remote_alternates - indicates to use alternate covers from CV
@ -332,10 +348,7 @@ class IssueIdentifier:
if self.cancel:
raise IssueIdentifierCancelled
if use_log and use_remote_alternates:
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]", False)
if use_log:
self.log_msg("[ ", False)
self.log_msg(f"[{len(remote_cover_list) - 1} alt. covers]")
score_list = []
done = False
@ -343,8 +356,8 @@ class IssueIdentifier:
for remote_cover_item in remote_cover_list:
score = ImageHasher.hamming_distance(local_cover_hash, remote_cover_item["hash"])
score_list.append(Score(score=score, url=remote_cover_item["url"], hash=remote_cover_item["hash"]))
if use_log:
self.log_msg(score, False)
self.log_msg(f" - {score:03}")
if score <= self.strong_score_thresh:
# such a good score, we can quit now, since for sure we have a winner
@ -353,9 +366,6 @@ class IssueIdentifier:
if done:
break
if use_log:
self.log_msg(" ]", False)
best_score_item = min(score_list, key=lambda x: x["score"])
return best_score_item
@ -446,8 +456,8 @@ class IssueIdentifier:
self.log_msg("Searching in " + str(len(series_second_round_list)) + " series")
if self.callback is not None:
self.callback(0, len(series_second_round_list))
if self.progress_callback is not None:
self.progress_callback(0, len(series_second_round_list))
# now sort the list by name length
series_second_round_list.sort(key=lambda x: len(x.name), reverse=False)
@ -485,13 +495,12 @@ class IssueIdentifier:
# Do first round of cover matching
counter = len(shortlist)
for series, issue in shortlist:
if self.callback is not None:
self.callback(counter, len(shortlist) * 3)
if self.progress_callback is not None:
self.progress_callback(counter, len(shortlist) * 3)
counter += 1
self.log_msg(
f"Examining covers for ID: {series.id} {series.name} ({series.start_year}) ...",
newline=False,
f"Examining covers for ID: {series.id} {series.name} ({series.start_year}):",
)
# Now check the cover match against the primary image
@ -505,8 +514,8 @@ class IssueIdentifier:
logger.info("Adding cropped cover to the hashlist")
try:
image_url = issue.cover_image or ""
alt_urls = issue.alternate_images
image_url = issue._cover_image or ""
alt_urls = issue._alternate_images
score_item = self.get_issue_cover_match_score(
image_url, alt_urls, hash_list, use_remote_alternates=False
@ -516,28 +525,28 @@ class IssueIdentifier:
self.match_list = []
return self.match_list
match: IssueResult = {
"series": f"{series.name} ({series.start_year})",
"distance": score_item["score"],
"issue_number": keys["issue_number"],
"cv_issue_count": series.count_of_issues,
"url_image_hash": score_item["hash"],
"issue_title": issue.title or "",
"issue_id": issue.issue_id or "",
"series_id": series.id,
"month": issue.month,
"year": issue.year,
"publisher": None,
"image_url": image_url,
"alt_image_urls": alt_urls,
"description": issue.description or "",
}
match = IssueResult(
series=f"{series.name} ({series.start_year})",
distance=score_item["score"],
issue_number=keys["issue_number"],
cv_issue_count=series.count_of_issues,
url_image_hash=score_item["hash"],
issue_title=issue.title or "",
issue_id=issue.issue_id or "",
series_id=series.id,
month=issue.month,
year=issue.year,
publisher=None,
image_url=image_url,
alt_image_urls=alt_urls,
description=issue.description or "",
)
if series.publisher is not None:
match["publisher"] = series.publisher
match.publisher = series.publisher
self.match_list.append(match)
self.log_msg(f" --> {match['distance']}", newline=False)
self.log_msg(f"best score {match.distance:03}")
self.log_msg("")
@ -547,28 +556,27 @@ class IssueIdentifier:
return self.match_list
# sort list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
self.match_list.sort(key=attrgetter("distance"))
lst = []
for i in self.match_list:
lst.append(i["distance"])
lst.append(i.distance)
self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s):", newline=False)
self.log_msg(str(lst))
self.log_msg(f"Compared to covers in {len(self.match_list)} issue(s): {lst}")
def print_match(item: IssueResult) -> None:
self.log_msg(
"-----> {} #{} {} ({}/{}) -- score: {}".format(
item["series"],
item["issue_number"],
item["issue_title"],
item["month"],
item["year"],
item["distance"],
item.series,
item.issue_number,
item.issue_title,
item.month,
item.year,
item.distance,
)
)
best_score: int = self.match_list[0]["distance"]
best_score: int = self.match_list[0].distance
if best_score >= self.min_score_thresh:
# we have 1 or more low-confidence matches (all bad cover scores)
@ -585,14 +593,14 @@ class IssueIdentifier:
second_match_list = []
counter = 2 * len(self.match_list)
for m in self.match_list:
if self.callback is not None:
self.callback(counter, len(self.match_list) * 3)
if self.progress_callback is not None:
self.progress_callback(counter, len(self.match_list) * 3)
counter += 1
self.log_msg(f"Examining alternate covers for ID: {m['series_id']} {m['series']} ...", newline=False)
self.log_msg(f"Examining alternate covers for ID: {m.series_id} {m.series}:")
try:
score_item = self.get_issue_cover_match_score(
m["image_url"],
m["alt_image_urls"],
m.image_url,
m.alt_image_urls,
hash_list,
use_remote_alternates=True,
)
@ -605,7 +613,7 @@ class IssueIdentifier:
if score_item["score"] < self.min_alternate_score_thresh:
second_match_list.append(m)
m["distance"] = score_item["score"]
m.distance = score_item["score"]
if len(second_match_list) == 0:
if len(self.match_list) == 1:
@ -626,17 +634,17 @@ class IssueIdentifier:
self.match_list = second_match_list
# sort new list by image match scores
self.match_list.sort(key=lambda k: k["distance"])
best_score = self.match_list[0]["distance"]
self.match_list.sort(key=attrgetter("distance"))
best_score = self.match_list[0].distance
self.log_msg("[Second round cover matching: best score = {best_score}]")
# now drop down into the rest of the processing
if self.callback is not None:
self.callback(99, 100)
if self.progress_callback is not None:
self.progress_callback(99, 100)
# now pare down list, remove any item more than specified distant from the top scores
for match_item in reversed(self.match_list):
if match_item["distance"] > best_score + self.min_score_distance:
if match_item.distance > best_score + self.min_score_distance:
self.match_list.remove(match_item)
# One more test for the case choosing limited series first issue vs a trade with the same cover:
@ -644,11 +652,11 @@ class IssueIdentifier:
if len(self.match_list) >= 2 and keys["issue_count"] is not None and keys["issue_count"] != 1:
new_list = []
for match in self.match_list:
if match["cv_issue_count"] != 1:
if match.cv_issue_count != 1:
new_list.append(match)
else:
self.log_msg(
f"Removing series {match['series']} [{match['series_id']}] from consideration (only 1 issue)"
f"Removing series {match.series} [{match.series_id}] from consideration (only 1 issue)"
)
if len(new_list) > 0:

View File

@ -222,7 +222,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
# list selection was changed, update the issue cover
issue = self.issue_list[self.issue_id]
if not (issue.issue and issue.year and issue.month and issue.cover_image and issue.title):
if not (issue.issue and issue.year and issue.month and issue._cover_image and issue.title):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
issue = self.talker.fetch_comic_data(issue_id=self.issue_id)
@ -231,7 +231,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
QtWidgets.QApplication.restoreOverrideCursor()
self.issue_number = issue.issue or ""
self.coverWidget.set_issue_details(self.issue_id, [issue.cover_image or "", *issue.alternate_images])
self.coverWidget.set_issue_details(self.issue_id, [issue._cover_image or "", *issue._alternate_images])
if issue.description is None:
self.set_description(self.teDescription, "")
else:

View File

@ -35,6 +35,8 @@ from comictaggerlib import cli, ctsettings
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.ctversion import version
from comictaggerlib.log import setup_logging
from comictaggerlib.resulttypes import Action
from comictalker.comictalker import ComicTalker
if sys.version_info < (3, 10):
import importlib_metadata
@ -106,6 +108,7 @@ class App:
self.config: settngs.Config[ct_ns]
self.initial_arg_parser = ctsettings.initial_commandline_parser()
self.config_load_success = False
self.talkers: dict[str, ComicTalker]
def run(self) -> None:
configure_locale()
@ -119,35 +122,76 @@ class App:
def load_plugins(self, opts: argparse.Namespace) -> None:
comicapi.comicarchive.load_archive_plugins()
ctsettings.talkers = comictalker.get_talkers(version, opts.config.user_cache_dir)
self.talkers = comictalker.get_talkers(version, opts.config.user_cache_dir)
def list_plugins(
self, talkers: list[comictalker.ComicTalker], archivers: list[type[comicapi.comicarchive.Archiver]]
) -> None:
print("Metadata Sources: (ID: Name URL)") # noqa: T201
for talker in talkers:
print(f"{talker.id}: {talker.name} {talker.default_api_url}") # noqa: T201
if self.config[0].Runtime_Options__json:
for talker in talkers:
print( # noqa: T201
json.dumps(
{
"type": "talker",
"id": talker.id,
"name": talker.name,
"website": talker.website,
}
)
)
print("\nComic Archive: (Name: extension, exe)") # noqa: T201
for archiver in archivers:
a = archiver()
print(f"{a.name()}: {a.extension()}, {a.exe}") # noqa: T201
for archiver in archivers:
try:
a = archiver()
print( # noqa: T201
json.dumps(
{
"type": "archiver",
"enabled": a.enabled,
"name": a.name(),
"extension": a.extension(),
"exe": a.exe,
}
)
)
except Exception:
print( # noqa: T201
json.dumps(
{
"type": "archiver",
"enabled": archiver.enabled,
"name": "",
"extension": "",
"exe": archiver.exe,
}
)
)
else:
print("Metadata Sources: (ID: Name URL)") # noqa: T201
for talker in talkers:
print(f"{talker.id}: {talker.name} {talker.website}") # noqa: T201
print("\nComic Archive: (Name: extension, exe)") # noqa: T201
for archiver in archivers:
a = archiver()
print(f"{a.name()}: {a.extension()}, {a.exe}") # noqa: T201
def initialize(self) -> argparse.Namespace:
conf, _ = self.initial_arg_parser.parse_known_args()
conf, _ = self.initial_arg_parser.parse_known_intermixed_args()
assert conf is not None
setup_logging(conf.verbose, conf.config.user_log_dir)
return conf
def register_settings(self) -> None:
self.manager = settngs.Manager(
"A utility for reading and writing metadata to comic archives.\n\n\n"
+ "If no options are given, %(prog)s will run in windowed mode.",
"For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
description="A utility for reading and writing metadata to comic archives.\n\n\n"
+ "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'",
epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
)
ctsettings.register_commandline_settings(self.manager)
ctsettings.register_file_settings(self.manager)
ctsettings.register_plugin_settings(self.manager)
ctsettings.register_plugin_settings(self.manager, getattr(self, "talkers", {}))
def parse_settings(self, config_paths: ctsettings.ComicTaggerPaths, *args: str) -> settngs.Config[ct_ns]:
cfg, self.config_load_success = ctsettings.parse_config(
@ -158,7 +202,7 @@ class App:
config = ctsettings.validate_commandline_settings(config, self.manager)
config = ctsettings.validate_file_settings(config)
config = ctsettings.validate_plugin_settings(config)
config = ctsettings.validate_plugin_settings(config, getattr(self, "talkers", {}))
return config
def initialize_dirs(self, paths: ctsettings.ComicTaggerPaths) -> None:
@ -178,10 +222,7 @@ class App:
# config already loaded
error = None
talkers = ctsettings.talkers
del ctsettings.talkers
if len(talkers) < 1:
if len(self.talkers) < 1:
error = error = (
"Failed to load any talkers, please re-install and check the log located in '"
+ str(self.config[0].Runtime_Options__config.user_log_dir)
@ -198,11 +239,11 @@ class App:
comicapi.utils.load_publishers()
update_publishers(self.config)
if self.config[0].Commands__list_plugins:
self.list_plugins(list(talkers.values()), comicapi.comicarchive.archivers)
if self.config[0].Commands__command == Action.list_plugins:
self.list_plugins(list(self.talkers.values()), comicapi.comicarchive.archivers)
return
if self.config[0].Commands__only_save_config:
if self.config[0].Commands__command == Action.save_config:
if self.config_load_success:
settings_path = self.config[0].Runtime_Options__config.user_config_dir / "settings.json"
if self.config_load_success:
@ -224,7 +265,7 @@ class App:
if not gui.qt_available:
raise gui.import_error
return gui.open_tagger_window(talkers, self.config, error)
return gui.open_tagger_window(self.talkers, self.config, error)
except ImportError:
self.config[0].Runtime_Options__no_gui = True
logger.warning("PyQt5 is not available. ComicTagger is limited to command-line mode.")
@ -233,8 +274,9 @@ class App:
if error and error[1]:
print(f"A fatal error occurred please check the log for more information: {error[0]}") # noqa: T201
raise SystemExit(1)
try:
cli.CLI(self.config[0], talkers).run()
raise SystemExit(cli.CLI(self.config[0], self.talkers).run())
except Exception:
logger.exception("CLI mode failed")

View File

@ -93,15 +93,15 @@ class MatchSelectionWindow(QtWidgets.QDialog):
for row, match in enumerate(self.matches):
self.twList.insertRow(row)
item_text = match["series"]
item_text = match.series
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, (match,))
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
if match["publisher"] is not None:
item_text = str(match["publisher"])
if match.publisher is not None:
item_text = str(match.publisher)
else:
item_text = "Unknown"
item = QtWidgets.QTableWidgetItem(item_text)
@ -111,10 +111,10 @@ class MatchSelectionWindow(QtWidgets.QDialog):
month_str = ""
year_str = "????"
if match["month"] is not None:
month_str = f"-{int(match['month']):02d}"
if match["year"] is not None:
year_str = str(match["year"])
if match.month is not None:
month_str = f"-{int(match.month):02d}"
if match.year is not None:
year_str = str(match.year)
item_text = year_str + month_str
item = QtWidgets.QTableWidgetItem(item_text)
@ -122,7 +122,7 @@ class MatchSelectionWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, item)
item_text = match["issue_title"]
item_text = match.issue_title
if item_text is None:
item_text = ""
item = QtWidgets.QTableWidgetItem(item_text)
@ -146,14 +146,15 @@ class MatchSelectionWindow(QtWidgets.QDialog):
if prev is not None and prev.row() == curr.row():
return
match = self.current_match()
self.altCoverWidget.set_issue_details(
self.current_match()["issue_id"],
[self.current_match()["image_url"], *self.current_match()["alt_image_urls"]],
match.issue_id,
[match.image_url, *match.alt_image_urls],
)
if self.current_match()["description"] is None:
if match.description is None:
self.teDescription.setText("")
else:
self.teDescription.setText(self.current_match()["description"])
self.teDescription.setText(match.description)
def set_cover_image(self) -> None:
self.archiveCoverWidget.set_archive(self.comic_archive)

View File

@ -1,11 +1,55 @@
from __future__ import annotations
from typing_extensions import TypedDict
import dataclasses
import pathlib
import sys
from enum import Enum, auto
from typing import Any
from comicapi.comicarchive import ComicArchive
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
if sys.version_info < (3, 11):
class StrEnum(str, Enum):
"""
Enum where members are also (and must be) strings
"""
def __new__(cls, *values: Any) -> Any:
"values must already be of type `str`"
if len(values) > 3:
raise TypeError(f"too many arguments for str(): {values!r}")
if len(values) == 1:
# it must be a string
if not isinstance(values[0], str):
raise TypeError(f"{values[0]!r} is not a string")
if len(values) >= 2:
# check that encoding argument is a string
if not isinstance(values[1], str):
raise TypeError(f"encoding must be a string, not {values[1]!r}")
if len(values) == 3:
# check that errors argument is a string
if not isinstance(values[2], str):
raise TypeError("errors must be a string, not %r" % (values[2]))
value = str(*values)
member = str.__new__(cls, value)
member._value_ = value
return member
@staticmethod
def _generate_next_value_(name: str, start: int, count: int, last_values: Any) -> str:
"""
Return the lower-cased version of the member name.
"""
return name.lower()
else:
from enum import StrEnum
class IssueResult(TypedDict):
@dataclasses.dataclass
class IssueResult:
series: str
distance: int
issue_number: str
@ -21,18 +65,71 @@ class IssueResult(TypedDict):
alt_image_urls: list[str]
description: str
def __str__(self) -> str:
return f"series: {self.series}; series id: {self.series_id}; issue number: {self.issue_number}; issue id: {self.issue_id}; published: {self.month} {self.year}"
class Action(StrEnum):
print = auto()
delete = auto()
copy = auto()
save = auto()
rename = auto()
export = auto()
save_config = auto()
list_plugins = auto()
class MatchStatus(StrEnum):
good_match = auto()
no_match = auto()
multiple_match = auto()
low_confidence_match = auto()
class Status(StrEnum):
success = auto()
match_failure = auto()
write_failure = auto()
fetch_data_failure = auto()
existing_tags = auto()
read_failure = auto()
write_permission_failure = auto()
rename_failure = auto()
@dataclasses.dataclass
class OnlineMatchResults:
def __init__(self) -> None:
self.good_matches: list[str] = []
self.no_matches: list[str] = []
self.multiple_matches: list[MultipleMatch] = []
self.low_confidence_matches: list[MultipleMatch] = []
self.write_failures: list[str] = []
self.fetch_data_failures: list[str] = []
good_matches: list[Result] = dataclasses.field(default_factory=list)
no_matches: list[Result] = dataclasses.field(default_factory=list)
multiple_matches: list[Result] = dataclasses.field(default_factory=list)
low_confidence_matches: list[Result] = dataclasses.field(default_factory=list)
write_failures: list[Result] = dataclasses.field(default_factory=list)
fetch_data_failures: list[Result] = dataclasses.field(default_factory=list)
class MultipleMatch:
def __init__(self, ca: ComicArchive, match_list: list[IssueResult]) -> None:
self.ca: ComicArchive = ca
self.matches: list[IssueResult] = match_list
@dataclasses.dataclass
class Result:
action: Action
status: Status | None
original_path: pathlib.Path
renamed_path: pathlib.Path | None = None
online_results: list[IssueResult] = dataclasses.field(default_factory=list)
match_status: MatchStatus | None = None
md: GenericMetadata | None = None
tags_deleted: list[int] = dataclasses.field(default_factory=list)
tags_written: list[int] = dataclasses.field(default_factory=list)
def __str__(self) -> str:
if len(self.online_results) == 0:
matches = None
elif len(self.online_results) == 1:
matches = str(self.online_results[0])
else:
matches = "\n" + "".join([f" - {x}" for x in self.online_results])
path_str = utils.path_to_short_str(self.original_path, self.renamed_path)
return f"{path_str}: {matches}"

View File

@ -265,9 +265,11 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
def log_id_output(self, text: str) -> None:
if self.iddialog is not None:
print(text, end=" ") # noqa: T201
self.iddialog.textEdit.append(text.rstrip())
self.iddialog.textEdit.ensureCursorVisible()
self.iddialog.textEdit.insertPlainText(text)
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
def identify_progress(self, cur: int, total: int) -> None:
if self.iddialog is not None:
@ -325,8 +327,8 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
if found_match is not None:
self.iddialog.accept()
self.series_id = utils.xlate(found_match["series_id"]) or ""
self.issue_number = found_match["issue_number"]
self.series_id = utils.xlate(found_match.series_id) or ""
self.issue_number = found_match.issue_number
self.select_by_id()
self.show_issues()

View File

@ -387,7 +387,6 @@ class SettingsWindow(QtWidgets.QDialog):
self.switch_parser()
self.cbxClearFormBeforePopulating.setChecked(self.config[0].Issue_Identifier__clear_form_before_populating)
self.cbxUseFilter.setChecked(self.config[0].Issue_Identifier__always_use_publisher_filter)
self.cbxSortByYear.setChecked(self.config[0].Issue_Identifier__sort_series_by_year)
self.cbxExactMatches.setChecked(self.config[0].Issue_Identifier__exact_series_matches_first)
@ -505,7 +504,6 @@ class SettingsWindow(QtWidgets.QDialog):
self.cbxProtofoliusIssueNumberScheme.isChecked()
)
self.config[0].Issue_Identifier__clear_form_before_populating = self.cbxClearFormBeforePopulating.isChecked()
self.config[0].Issue_Identifier__always_use_publisher_filter = self.cbxUseFilter.isChecked()
self.config[0].Issue_Identifier__sort_series_by_year = self.cbxSortByYear.isChecked()
self.config[0].Issue_Identifier__exact_series_matches_first = self.cbxExactMatches.isChecked()
@ -542,9 +540,7 @@ class SettingsWindow(QtWidgets.QDialog):
QtWidgets.QDialog.accept(self)
def update_talkers_config(self) -> None:
ctsettings.talkers = self.talkers
self.config = ctsettings.plugin.validate_talker_settings(self.config)
del ctsettings.talkers
self.config = ctsettings.plugin.validate_talker_settings(self.config, self.talkers)
def select_rar(self) -> None:
self.select_file(self.leRarExePath, "RAR")

View File

@ -57,7 +57,7 @@ from comictaggerlib.optionalmsgdialog import OptionalMessageDialog
from comictaggerlib.pagebrowser import PageBrowserWindow
from comictaggerlib.pagelisteditor import PageListEditor
from comictaggerlib.renamewindow import RenameWindow
from comictaggerlib.resulttypes import IssueResult, MultipleMatch, OnlineMatchResults
from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status
from comictaggerlib.seriesselectionwindow import SeriesSelectionWindow
from comictaggerlib.settingswindow import SettingsWindow
from comictaggerlib.ui import ui_path
@ -292,6 +292,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
current_logs = ""
root_logger = logging.getLogger()
qapplogwindow = ApplicationLogWindow(
self.config[0].Runtime_Options__config.user_log_dir,
QTextEditLogger(logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s"), logging.DEBUG),
parent=self,
)
@ -1083,12 +1084,12 @@ class TaggerWindow(QtWidgets.QMainWindow):
if self.config[0].Comic_Book_Lover__apply_transform_on_import:
new_metadata = CBLTransformer(new_metadata, self.config[0]).apply()
if self.config[0].Issue_Identifier__clear_form_before_populating:
if self.config[0].Issue_Identifier__clear_metadata_on_import:
self.clear_form()
notes = (
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {new_metadata.issue_id}]"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {new_metadata.issue_id}]"
)
self.metadata.overlay(
new_metadata.replace(
@ -1684,7 +1685,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
ct_md = self.current_talker().fetch_comic_data(match["issue_id"])
ct_md = self.current_talker().fetch_comic_data(match.issue_id)
except TalkerError:
logger.exception("Save aborted.")
@ -1697,7 +1698,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
return ct_md
def auto_tag_log(self, text: str) -> None:
IssueIdentifier.default_write_output(text)
if self.atprogdialog is not None:
self.atprogdialog.textEdit.append(text.rstrip())
self.atprogdialog.textEdit.ensureCursorVisible()
@ -1778,16 +1778,48 @@ class TaggerWindow(QtWidgets.QMainWindow):
if choices:
if low_confidence:
self.auto_tag_log("Online search: Multiple low-confidence matches. Save aborted\n")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
match_results.low_confidence_matches.append(
Result(
Action.save,
Status.match_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
)
)
else:
self.auto_tag_log("Online search: Multiple matches. Save aborted\n")
match_results.multiple_matches.append(MultipleMatch(ca, matches))
match_results.multiple_matches.append(
Result(
Action.save,
Status.match_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.multiple_match,
)
)
elif low_confidence and not dlg.auto_save_on_low:
self.auto_tag_log("Online search: Low confidence match. Save aborted\n")
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
match_results.low_confidence_matches.append(
Result(
Action.save,
Status.match_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
)
)
elif not found_match:
self.auto_tag_log("Online search: No match found. Save aborted\n")
match_results.no_matches.append(str(ca.path.absolute()))
match_results.no_matches.append(
Result(
Action.save,
Status.match_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.no_match,
)
)
else:
# a single match!
if low_confidence:
@ -1796,7 +1828,15 @@ class TaggerWindow(QtWidgets.QMainWindow):
# now get the particular issue data
ct_md = self.actual_issue_data_fetch(matches[0])
if ct_md is None:
match_results.fetch_data_failures.append(str(ca.path.absolute()))
match_results.fetch_data_failures.append(
Result(
Action.save,
Status.fetch_data_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
)
)
if ct_md is not None:
if dlg.cbxRemoveMetadata.isChecked():
@ -1804,7 +1844,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
else:
notes = (
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
)
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
@ -1812,10 +1852,26 @@ class TaggerWindow(QtWidgets.QMainWindow):
md.fix_publisher()
if not ca.write_metadata(md, self.save_data_style):
match_results.write_failures.append(str(ca.path.absolute()))
match_results.write_failures.append(
Result(
Action.save,
Status.write_failure,
ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
)
)
self.auto_tag_log("Save failed ;-(\n")
else:
match_results.good_matches.append(str(ca.path.absolute()))
match_results.good_matches.append(
Result(
Action.save,
Status.success,
ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
)
)
success = True
self.auto_tag_log("Save complete!\n")
ca.load_cache([MetaDataStyle.CBI, MetaDataStyle.CIX])

View File

@ -14,6 +14,16 @@
<string>Log Window</string>
</property>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QLabel" name="lblLogLocation">
<property name="text">
<string>Log Location:</string>
</property>
<property name="openExternalLinks">
<bool>true</bool>
</property>
</widget>
</item>
<item>
<widget class="QTextEdit" name="textEdit">
<property name="readOnly">

View File

@ -642,12 +642,12 @@ class ComicVineTalker(ComicTalker):
series_aliases=series.aliases,
)
if issue.get("image") is None:
md.cover_image = ""
md._cover_image = ""
else:
md.cover_image = issue.get("image", {}).get("super_url", "")
md._cover_image = issue.get("image", {}).get("super_url", "")
for alt in issue.get("associated_images", []):
md.alternate_images.append(alt["original_url"])
md._alternate_images.append(alt["original_url"])
for character in issue.get("character_credits", set()):
md.characters.add(character["name"])

View File

@ -290,7 +290,7 @@ deps =
[flake8]
max-line-length = 120
extend-ignore = E203, E501, A003
extend-ignore = E203, E501, A003, T202
extend-exclude = venv, scripts, build, dist, comictaggerlib/ctversion.py
per-file-ignores =
comictaggerlib/cli.py: T20

View File

@ -181,7 +181,7 @@ comic_issue_result = comicapi.genericmetadata.GenericMetadata(
issue_id=str(cv_issue_result["results"]["id"]),
series=cv_issue_result["results"]["volume"]["name"],
series_id=str(cv_issue_result["results"]["volume"]["id"]),
cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=cv_issue_result["results"]["image"]["super_url"],
issue=cv_issue_result["results"]["issue_number"],
volume=None,
title=cv_issue_result["results"]["name"],
@ -236,7 +236,7 @@ cv_md = comicapi.genericmetadata.GenericMetadata(
rights=None,
identifier=None,
last_mark=None,
cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=cv_issue_result["results"]["image"]["super_url"],
)

View File

@ -1,9 +1,11 @@
from __future__ import annotations
import copy
import datetime
import io
import shutil
import unittest.mock
from argparse import Namespace
from collections.abc import Generator
from typing import Any
@ -15,6 +17,7 @@ from pyrate_limiter import Limiter, RequestRate
import comicapi.comicarchive
import comicapi.genericmetadata
import comictaggerlib.cli
import comictaggerlib.ctsettings
import comictalker
import comictalker.comiccacher
@ -127,10 +130,22 @@ def comicvine_api(monkeypatch, cbz, comic_cache, mock_version, config) -> comict
return cv
@pytest.fixture
def mock_now(monkeypatch):
class mydatetime:
time = datetime.datetime(2022, 4, 16, 15, 52, 26)
@classmethod
def now(cls):
return cls.time
monkeypatch.setattr(comictaggerlib.cli, "datetime", mydatetime)
@pytest.fixture
def mock_version(monkeypatch):
version = "1.4.4a9.dev20"
version_tuple = (1, 4, 4, "dev20")
version = "1.3.2a5"
version_tuple = (1, 3, 2)
monkeypatch.setattr(comictaggerlib.ctversion, "version", version)
monkeypatch.setattr(comictaggerlib.ctversion, "__version__", version)
@ -182,6 +197,24 @@ def config(tmp_path):
yield defaults
@pytest.fixture
def plugin_config(tmp_path):
from comictaggerlib.main import App
ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"))
app = App()
app.load_plugins(ns)
app.register_settings()
defaults = app.parse_settings(ns.config, "")
defaults[0].Runtime_Options__config.user_data_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_cache_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_state_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_log_dir.mkdir(parents=True, exist_ok=True)
yield (defaults, app.talkers)
@pytest.fixture
def comic_cache(config, mock_version) -> Generator[comictalker.comiccacher.ComicCacher, Any, None]:
yield comictalker.comiccacher.ComicCacher(config[0].Runtime_Options__config.user_cache_dir, mock_version[0])

94
tests/integration_test.py Normal file
View File

@ -0,0 +1,94 @@
from __future__ import annotations
import settngs
import comicapi.comicarchive
import comicapi.comicinfoxml
import comicapi.genericmetadata
import comictaggerlib.resulttypes
from comictaggerlib import ctsettings
from comictaggerlib.cli import CLI
from comictalker.comictalker import ComicTalker
def test_save(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
comicvine_api,
md_saved,
mock_now,
) -> None:
# Overwrite the series so it has definitely changed
tmp_comic.write_cix(md_saved.replace(series="nothing"))
md = tmp_comic.read_cix()
# Check that it changed
assert md != md_saved
# Clear the cached metadata
tmp_comic.reset_cache()
# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
# Save
config[0].Commands__command = comictaggerlib.resulttypes.Action.save
# Check online, should be intercepted by comicvine_api
config[0].Runtime_Options__online = True
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
# Save ComicRack tags
config[0].Runtime_Options__type = [comicapi.comicarchive.MetaDataStyle.CIX]
# Search using the correct series since we just put the wrong series name in the CBZ
config[0].Runtime_Options__metadata = comicapi.genericmetadata.GenericMetadata(series=md_saved.series)
# Run ComicTagger
CLI(config[0], talkers).run()
# Read the CBZ
md = tmp_comic.read_cix()
# Validate that we got the correct metadata back
assert md == md_saved
def test_delete(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
comicvine_api,
md_saved,
mock_now,
) -> None:
md = tmp_comic.read_cix()
# Check that the metadata starts correct
assert md == md_saved
# Clear the cached metadata
tmp_comic.reset_cache()
# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
# Delete
config[0].Commands__command = comictaggerlib.resulttypes.Action.delete
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
# Delete ComicRack tags
config[0].Runtime_Options__type = [comicapi.comicarchive.MetaDataStyle.CIX]
# Run ComicTagger
CLI(config[0], talkers).run()
# Read the CBZ
md = tmp_comic.read_cix()
# Currently we set the default page list on load
empty_md = comicapi.genericmetadata.GenericMetadata()
empty_md.set_default_page_list(tmp_comic.get_number_of_pages())
# Validate that we got an empty metadata back
assert md == empty_md

View File

@ -8,6 +8,7 @@ from PIL import Image
import comictaggerlib.issueidentifier
import testing.comicdata
import testing.comicvine
from comictaggerlib.resulttypes import IssueResult
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
@ -51,23 +52,23 @@ def test_search(cbz, config, comicvine_api):
config, definitions = config
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz, config, comicvine_api)
results = ii.search()
cv_expected = {
"series": f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
"distance": 0,
"issue_number": testing.comicvine.cv_issue_result["results"]["issue_number"],
"alt_image_urls": [],
"cv_issue_count": testing.comicvine.cv_volume_result["results"]["count_of_issues"],
"issue_title": testing.comicvine.cv_issue_result["results"]["name"],
"issue_id": str(testing.comicvine.cv_issue_result["results"]["id"]),
"series_id": str(testing.comicvine.cv_volume_result["results"]["id"]),
"month": testing.comicvine.date[1],
"year": testing.comicvine.date[2],
"publisher": testing.comicvine.cv_volume_result["results"]["publisher"]["name"],
"image_url": testing.comicvine.cv_issue_result["results"]["image"]["super_url"],
"description": testing.comicvine.cv_issue_result["results"]["description"],
}
cv_expected = IssueResult(
series=f"{testing.comicvine.cv_volume_result['results']['name']} ({testing.comicvine.cv_volume_result['results']['start_year']})",
distance=0,
issue_number=testing.comicvine.cv_issue_result["results"]["issue_number"],
alt_image_urls=[],
cv_issue_count=testing.comicvine.cv_volume_result["results"]["count_of_issues"],
issue_title=testing.comicvine.cv_issue_result["results"]["name"],
issue_id=str(testing.comicvine.cv_issue_result["results"]["id"]),
series_id=str(testing.comicvine.cv_volume_result["results"]["id"]),
month=testing.comicvine.date[1],
year=testing.comicvine.date[2],
publisher=testing.comicvine.cv_volume_result["results"]["publisher"]["name"],
image_url=testing.comicvine.cv_issue_result["results"]["image"]["super_url"],
description=testing.comicvine.cv_issue_result["results"]["description"],
url_image_hash=1747255366011518976,
)
for r, e in zip(results, [cv_expected]):
del r["url_image_hash"]
assert r == e