Compare commits

...

10 Commits

Author SHA1 Message Date
Timmy Welch
4c9096a11b Implement the most basic local plugin isolation possible
Remove modules belonging to local plugins after loading
Remove sys.path entry after loading

This means that multiple local plugins can be installed with the same import path and should work correctly
This does not allow loading a local plugin that has the same import path as an installed plugin
2024-09-15 17:09:33 -07:00
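
A minimal sketch of the isolation this commit describes, assuming plugin_path is a directory containing an importable plugin module (the names here are illustrative, not ComicTagger's actual API):

import importlib
import pathlib
import sys

def load_isolated(plugin_path: pathlib.Path, module_name: str):
    # Make the plugin directory importable only for the duration of the load
    sys.path.append(str(plugin_path))
    try:
        return importlib.import_module(module_name)
    finally:
        # Remove the sys.path entry after loading
        sys.path.remove(str(plugin_path))
        # Drop modules that were loaded from this path so a different local
        # plugin using the same import path can still be loaded later
        for mod in list(sys.modules.values()):
            spec = getattr(mod, "__spec__", None)
            if spec is not None and str(plugin_path) in (spec.origin or ""):
                sys.modules.pop(mod.__name__)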
Timmy Welch
c9c0c99a2a Increase rate limits on CV to cover the 200 requests/Hr restriction
Add twitter's alternative to HTTP code 429
2024-09-12 13:56:57 -07:00
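
For reference, the status code in question is 420 ("Enhance Your Calm"), Twitter's legacy stand-in for 429. A hedged sketch of the general pattern, not ComicTagger's actual rate-limit code:

import time

import requests

RATE_LIMIT_CODES = {429, 420}  # 420 is Twitter's alternative to HTTP 429

def get_with_backoff(url: str, retries: int = 3) -> requests.Response:
    resp = requests.get(url)
    for attempt in range(retries):
        if resp.status_code not in RATE_LIMIT_CODES:
            break
        # Honor Retry-After when present (assumed to be in seconds),
        # otherwise back off exponentially
        time.sleep(float(resp.headers.get("Retry-After", 2**attempt)))
        resp = requests.get(url)
    return resp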
Timmy Welch
58f71cf6d9 Remove archived tags from tests 2024-09-12 13:17:06 -07:00
Timmy Welch
befffc98b1 Catch all exceptions when parsing metadata from the CLI 2024-09-12 13:11:30 -07:00
Timmy Welch
006f3cbd1f Remove comet and cbl tags 2024-09-12 12:09:07 -07:00
Timmy Welch
582224abec Fixes for quick-tag 2024-09-12 11:51:38 -07:00
Timmy Welch
acb59f9e83 Fix saving settings 2024-08-24 12:19:11 -07:00
Timmy Welch
fab30f3f29 Add experimental quick-tag 2024-08-18 19:16:55 -07:00
Timmy Welch
2cb6caea8d Ignore update with incomplete data when complete data is already cached 2024-08-16 17:05:28 -07:00
Timmy Welch
ffdf7d71e1 Fix tests 2024-08-16 12:50:14 -07:00
26 changed files with 1045 additions and 815 deletions

View File

@@ -10,7 +10,7 @@ import comictaggerlib.main
def generate() -> str:
app = comictaggerlib.main.App()
app.load_plugins(app.initial_arg_parser.parse_known_args()[0])
app.register_settings()
app.register_settings(True)
imports, types = settngs.generate_dict(app.manager.definitions)
imports2, types2 = settngs.generate_ns(app.manager.definitions)
i = imports.splitlines()

View File

@@ -24,7 +24,6 @@ import pathlib
import shutil
import sys
from collections.abc import Iterable
from typing import TYPE_CHECKING
from comicapi import utils
from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver
@@ -32,16 +31,13 @@ from comicapi.genericmetadata import GenericMetadata
from comicapi.tags import Tag
from comictaggerlib.ctversion import version
if TYPE_CHECKING:
from importlib.metadata import EntryPoint
logger = logging.getLogger(__name__)
archivers: list[type[Archiver]] = []
tags: dict[str, Tag] = {}
def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
def load_archive_plugins(local_plugins: Iterable[type[Archiver]] = tuple()) -> None:
if archivers:
return
if sys.version_info < (3, 10):
@@ -52,7 +48,7 @@ def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
archive_plugins: list[type[Archiver]] = []
# A list is used, first matching plugin wins
for ep in itertools.chain(local_plugins, entry_points(group="comicapi.archiver")):
for ep in itertools.chain(entry_points(group="comicapi.archiver")):
try:
spec = importlib.util.find_spec(ep.module)
except ValueError:
@@ -70,11 +66,12 @@ def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
else:
logger.exception("Failed to load archive plugin: %s", ep.name)
archivers.clear()
archivers.extend(local_plugins)
archivers.extend(archive_plugins)
archivers.extend(builtin)
def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterable[EntryPoint] = tuple()) -> None:
def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterable[type[Tag]] = tuple()) -> None:
if tags:
return
if sys.version_info < (3, 10):
@@ -84,7 +81,7 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
builtin: dict[str, Tag] = {}
tag_plugins: dict[str, tuple[Tag, str]] = {}
# A dict is used, last plugin wins
for ep in itertools.chain(entry_points(group="comicapi.tags"), local_plugins):
for ep in entry_points(group="comicapi.tags"):
location = "Unknown"
try:
_spec = importlib.util.find_spec(ep.module)
@@ -109,6 +106,9 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
tag_plugins[tag.id] = (tag(version), location)
except Exception:
logger.exception("Failed to load tag plugin: %s from %s", ep.name, location)
# A dict is used, last plugin wins
for tag in local_plugins:
tag_plugins[tag.id] = (tag(version), "Local")
for tag_id in set(builtin.keys()).intersection(tag_plugins):
location = tag_plugins[tag_id][1]
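
Note the signature change above: local plugins now arrive as already-loaded classes rather than EntryPoints, and they are added after the entry-point plugins, so they win over an installed plugin with the same id. A sketch of the new calling convention, assuming comicapi is importable and that Tag subclasses can be instantiated with just a version string; MyTag is hypothetical:

from comicapi import comicarchive
from comicapi.tags import Tag

class MyTag(Tag):  # stand-in for a locally discovered tag plugin
    enabled = True
    id = "mytag"

comicarchive.load_tag_plugins(local_plugins=[MyTag])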

View File

@@ -94,6 +94,19 @@ class PageMetadata:
return False
return self.archive_index == other.archive_index
def _get_clean_metadata(self, *attributes: str) -> PageMetadata:
return PageMetadata(
filename=self.filename if "filename" in attributes else "",
type=self.type if "type" in attributes else "",
bookmark=self.bookmark if "bookmark" in attributes else "",
display_index=self.display_index if "display_index" in attributes else 0,
archive_index=self.archive_index if "archive_index" in attributes else 0,
double_page=self.double_page if "double_page" in attributes else None,
byte_size=self.byte_size if "byte_size" in attributes else None,
height=self.height if "height" in attributes else None,
width=self.width if "width" in attributes else None,
)
Credit = merge.Credit
@@ -206,14 +219,23 @@ class GenericMetadata:
tmp.__dict__.update(kwargs)
return tmp
def get_clean_metadata(self, *attributes: str) -> GenericMetadata:
def _get_clean_metadata(self, *attributes: str) -> GenericMetadata:
new_md = GenericMetadata()
list_handled = []
for attr in sorted(attributes):
if "." in attr:
lst, _, name = attr.partition(".")
if lst in list_handled:
continue
old_value = getattr(self, lst)
new_value = getattr(new_md, lst)
if old_value:
if hasattr(old_value[0], "_get_clean_metadata"):
list_attributes = [x.removeprefix(lst + ".") for x in attributes if x.startswith(lst)]
for x in old_value:
new_value.append(x._get_clean_metadata(*list_attributes))
list_handled.append(lst)
continue
if not new_value:
for x in old_value:
new_value.append(x.__class__())
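
The dotted attribute names handle list fields: "credits.person", for example, means keep only the person portion of each credit. A compact stand-in (hypothetical simplified classes, not ComicTagger's real types) showing the idea:

from __future__ import annotations

from dataclasses import dataclass, field

@dataclass
class MiniCredit:
    person: str = ""
    role: str = ""

    def _get_clean_metadata(self, *attributes: str) -> MiniCredit:
        # Keep whitelisted fields; blank out everything else
        return MiniCredit(
            person=self.person if "person" in attributes else "",
            role=self.role if "role" in attributes else "",
        )

@dataclass
class MiniMetadata:
    credits: list[MiniCredit] = field(default_factory=list)

md = MiniMetadata(credits=[MiniCredit("Claremont", "Writer")])
attrs = ["credits.person"]
# Strip the "credits." prefix, then clean each list item individually
list_attrs = [a.removeprefix("credits.") for a in attrs if a.startswith("credits")]
print([c._get_clean_metadata(*list_attrs) for c in md.credits])
# [MiniCredit(person='Claremont', role='')]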

View File

@@ -1,323 +0,0 @@
"""A class to encapsulate CoMet data"""
#
# Copyright 2012-2014 ComicTagger Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import os
import xml.etree.ElementTree as ET
from typing import Any
from comicapi import utils
from comicapi.archivers import Archiver
from comicapi.comicarchive import ComicArchive
from comicapi.genericmetadata import GenericMetadata, PageMetadata, PageType
from comicapi.tags import Tag
logger = logging.getLogger(__name__)
class CoMet(Tag):
enabled = True
id = "comet"
def __init__(self, version: str) -> None:
super().__init__(version)
self.comet_filename = "CoMet.xml"
self.file = "CoMet.xml"
self.supported_attributes = {
"series",
"issue",
"title",
"volume",
"genres",
"description",
"publisher",
"language",
"format",
"maturity_rating",
"month",
"year",
"page_count",
"characters",
"credits",
"credits.person",
"credits.primary",
"credits.role",
"price",
"is_version_of",
"rights",
"identifier",
"last_mark",
"pages.type", # This is required for setting the cover image none of the other types will be saved
"pages",
}
def supports_credit_role(self, role: str) -> bool:
return role.casefold() in self._get_parseable_credits()
def supports_tags(self, archive: Archiver) -> bool:
return archive.supports_files()
def has_tags(self, archive: Archiver) -> bool:
if not self.supports_tags(archive):
return False
has_tags = False
# look at all xml files in root, and search for CoMet data, get first
for n in archive.get_filename_list():
if os.path.dirname(n) == "" and os.path.splitext(n)[1].casefold() == ".xml":
# read in XML file, and validate it
data = b""
try:
data = archive.read_file(n)
except Exception as e:
logger.warning("Error reading in Comet XML for validation! from %s: %s", archive.path, e)
if self._validate_bytes(data):
# since we found it, save it!
self.file = n
has_tags = True
break
return has_tags
def remove_tags(self, archive: Archiver) -> bool:
return self.has_tags(archive) and archive.remove_file(self.file)
def read_tags(self, archive: Archiver) -> GenericMetadata:
if self.has_tags(archive):
metadata = archive.read_file(self.file) or b""
if self._validate_bytes(metadata):
return self._metadata_from_bytes(metadata, archive)
return GenericMetadata()
def read_raw_tags(self, archive: Archiver) -> str:
if self.has_tags(archive):
return ET.tostring(ET.fromstring(archive.read_file(self.file)), encoding="unicode", xml_declaration=True)
return ""
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
if self.supports_tags(archive):
success = True
xml = b""
if self.has_tags(archive):
xml = archive.read_file(self.file)
if self.file != self.comet_filename:
success = self.remove_tags(archive)
return success and archive.write_file(self.comet_filename, self._bytes_from_metadata(metadata, xml))
else:
logger.warning(f"Archive ({archive.name()}) does not support {self.name()} metadata")
return False
def name(self) -> str:
return "Comic Metadata (CoMet)"
@classmethod
def _get_parseable_credits(cls) -> list[str]:
parsable_credits: list[str] = []
parsable_credits.extend(GenericMetadata.writer_synonyms)
parsable_credits.extend(GenericMetadata.penciller_synonyms)
parsable_credits.extend(GenericMetadata.inker_synonyms)
parsable_credits.extend(GenericMetadata.colorist_synonyms)
parsable_credits.extend(GenericMetadata.letterer_synonyms)
parsable_credits.extend(GenericMetadata.cover_synonyms)
parsable_credits.extend(GenericMetadata.editor_synonyms)
return parsable_credits
def _metadata_from_bytes(self, string: bytes, archive: Archiver) -> GenericMetadata:
tree = ET.ElementTree(ET.fromstring(string))
return self._convert_xml_to_metadata(tree, archive)
def _bytes_from_metadata(self, metadata: GenericMetadata, xml: bytes = b"") -> bytes:
tree = self._convert_metadata_to_xml(metadata, xml)
return ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True)
def _convert_metadata_to_xml(self, metadata: GenericMetadata, xml: bytes = b"") -> ET.ElementTree:
# shorthand for the metadata
md = metadata
if xml:
root = ET.fromstring(xml)
else:
# build a tree structure
root = ET.Element("comet")
root.attrib["xmlns:comet"] = "http://www.denvog.com/comet/"
root.attrib["xmlns:xsi"] = "http://www.w3.org/2001/XMLSchema-instance"
root.attrib["xsi:schemaLocation"] = "http://www.denvog.com http://www.denvog.com/comet/comet.xsd"
# helper func
def assign(comet_entry: str, md_entry: Any) -> None:
if md_entry is not None:
ET.SubElement(root, comet_entry).text = str(md_entry)
# title is mandatory
assign("title", md.title or "")
assign("series", md.series)
assign("issue", md.issue) # must be int??
assign("volume", md.volume)
assign("description", md.description)
assign("publisher", md.publisher)
assign("pages", md.page_count)
assign("format", md.format)
assign("language", md.language)
assign("rating", md.maturity_rating)
assign("price", md.price)
assign("isVersionOf", md.is_version_of)
assign("rights", md.rights)
assign("identifier", md.identifier)
assign("lastMark", md.last_mark)
assign("genre", ",".join(md.genres)) # TODO repeatable
for c in md.characters:
assign("character", c.strip())
if md.manga is not None and md.manga == "YesAndRightToLeft":
assign("readingDirection", "rtl")
if md.year is not None:
date_str = f"{md.year:04}"
if md.month is not None:
date_str += f"-{md.month:02}"
assign("date", date_str)
cover_index = md.get_cover_page_index_list()[0]
assign("coverImage", md.pages[cover_index].filename)
# loop thru credits, and build a list for each role that CoMet supports
for credit in metadata.credits:
if credit.role.casefold() in set(GenericMetadata.writer_synonyms):
ET.SubElement(root, "writer").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.penciller_synonyms):
ET.SubElement(root, "penciller").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.inker_synonyms):
ET.SubElement(root, "inker").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.colorist_synonyms):
ET.SubElement(root, "colorist").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.letterer_synonyms):
ET.SubElement(root, "letterer").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.cover_synonyms):
ET.SubElement(root, "coverDesigner").text = str(credit.person)
if credit.role.casefold() in set(GenericMetadata.editor_synonyms):
ET.SubElement(root, "editor").text = str(credit.person)
ET.indent(root)
# wrap it in an ElementTree instance, and save as XML
tree = ET.ElementTree(root)
return tree
def _convert_xml_to_metadata(self, tree: ET.ElementTree, archive: Archiver) -> GenericMetadata:
root = tree.getroot()
if root.tag != "comet":
raise Exception("Not a CoMet file")
metadata = GenericMetadata()
md = metadata
# Helper function
def get(tag: str) -> Any:
node = root.find(tag)
if node is not None:
return node.text
return None
md.series = utils.xlate(get("series"))
md.title = utils.xlate(get("title"))
md.issue = utils.xlate(get("issue"))
md.volume = utils.xlate_int(get("volume"))
md.description = utils.xlate(get("description"))
md.publisher = utils.xlate(get("publisher"))
md.language = utils.xlate(get("language"))
md.format = utils.xlate(get("format"))
md.page_count = utils.xlate_int(get("pages"))
md.maturity_rating = utils.xlate(get("rating"))
md.price = utils.xlate_float(get("price"))
md.is_version_of = utils.xlate(get("isVersionOf"))
md.rights = utils.xlate(get("rights"))
md.identifier = utils.xlate(get("identifier"))
md.last_mark = utils.xlate(get("lastMark"))
_, md.month, md.year = utils.parse_date_str(utils.xlate(get("date")))
ca = ComicArchive(archive)
cover_filename = utils.xlate(get("coverImage"))
page_list = ca.get_page_name_list()
if cover_filename in page_list:
cover_index = page_list.index(cover_filename)
md.pages = [
PageMetadata(
archive_index=cover_index,
display_index=0,
filename=cover_filename,
type=PageType.FrontCover,
bookmark="",
)
]
reading_direction = utils.xlate(get("readingDirection"))
if reading_direction is not None and reading_direction == "rtl":
md.manga = "YesAndRightToLeft"
# loop for genre tags
for n in root:
if n.tag == "genre":
md.genres.add((n.text or "").strip())
# loop for character tags
for n in root:
if n.tag == "character":
md.characters.add((n.text or "").strip())
# Now extract the credit info
for n in root:
if any(
[
n.tag == "writer",
n.tag == "penciller",
n.tag == "inker",
n.tag == "colorist",
n.tag == "letterer",
n.tag == "editor",
]
):
metadata.add_credit((n.text or "").strip(), n.tag.title())
if n.tag == "coverDesigner":
metadata.add_credit((n.text or "").strip(), "Cover")
metadata.is_empty = False
return metadata
# verify that the string actually contains CoMet data in XML format
def _validate_bytes(self, string: bytes) -> bool:
try:
tree = ET.ElementTree(ET.fromstring(string))
root = tree.getroot()
if root.tag != "comet":
return False
except ET.ParseError:
return False
return True

View File

@@ -1,229 +0,0 @@
"""A class to encapsulate the ComicBookInfo data"""
# Copyright 2012-2014 ComicTagger Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
from datetime import datetime
from typing import Any, Literal, TypedDict
from comicapi import utils
from comicapi.archivers import Archiver
from comicapi.genericmetadata import Credit, GenericMetadata
from comicapi.tags import Tag
logger = logging.getLogger(__name__)
_CBILiteralType = Literal[
"series",
"title",
"issue",
"publisher",
"publicationMonth",
"publicationYear",
"numberOfIssues",
"comments",
"genre",
"volume",
"numberOfVolumes",
"language",
"country",
"rating",
"credits",
"tags",
]
class credit(TypedDict):
person: str
role: str
primary: bool
class _ComicBookInfoJson(TypedDict, total=False):
series: str
title: str
publisher: str
publicationMonth: int
publicationYear: int
issue: int
numberOfIssues: int
volume: int
numberOfVolumes: int
rating: int
genre: str
language: str
country: str
credits: list[credit]
tags: list[str]
comments: str
_CBIContainer = TypedDict("_CBIContainer", {"appID": str, "lastModified": str, "ComicBookInfo/1.0": _ComicBookInfoJson})
class ComicBookInfo(Tag):
enabled = True
id = "cbi"
def __init__(self, version: str) -> None:
super().__init__(version)
self.supported_attributes = {
"series",
"issue",
"issue_count",
"title",
"volume",
"volume_count",
"genres",
"description",
"publisher",
"month",
"year",
"language",
"country",
"critical_rating",
"tags",
"credits",
"credits.person",
"credits.primary",
"credits.role",
}
def supports_credit_role(self, role: str) -> bool:
return True
def supports_tags(self, archive: Archiver) -> bool:
return archive.supports_comment()
def has_tags(self, archive: Archiver) -> bool:
return self.supports_tags(archive) and self._validate_string(archive.get_comment())
def remove_tags(self, archive: Archiver) -> bool:
return archive.set_comment("")
def read_tags(self, archive: Archiver) -> GenericMetadata:
if self.has_tags(archive):
comment = archive.get_comment()
if self._validate_string(comment):
return self._metadata_from_string(comment)
return GenericMetadata()
def read_raw_tags(self, archive: Archiver) -> str:
if self.has_tags(archive):
return json.dumps(json.loads(archive.get_comment()), indent=2)
return ""
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
if self.supports_tags(archive):
return archive.set_comment(self._string_from_metadata(metadata))
else:
logger.warning(f"Archive ({archive.name()}) does not support {self.name()} metadata")
return False
def name(self) -> str:
return "ComicBookInfo"
def _metadata_from_string(self, string: str) -> GenericMetadata:
cbi_container: _CBIContainer = json.loads(string)
metadata = GenericMetadata()
cbi = cbi_container["ComicBookInfo/1.0"]
metadata.series = utils.xlate(cbi.get("series"))
metadata.title = utils.xlate(cbi.get("title"))
metadata.issue = utils.xlate(cbi.get("issue"))
metadata.publisher = utils.xlate(cbi.get("publisher"))
metadata.month = utils.xlate_int(cbi.get("publicationMonth"))
metadata.year = utils.xlate_int(cbi.get("publicationYear"))
metadata.issue_count = utils.xlate_int(cbi.get("numberOfIssues"))
metadata.description = utils.xlate(cbi.get("comments"))
metadata.genres = set(utils.split(cbi.get("genre"), ","))
metadata.volume = utils.xlate_int(cbi.get("volume"))
metadata.volume_count = utils.xlate_int(cbi.get("numberOfVolumes"))
metadata.language = utils.xlate(cbi.get("language"))
metadata.country = utils.xlate(cbi.get("country"))
metadata.critical_rating = utils.xlate_int(cbi.get("rating"))
metadata.credits = [
Credit(
person=x["person"] if "person" in x else "",
role=x["role"] if "role" in x else "",
primary=x["primary"] if "primary" in x else False,
)
for x in cbi.get("credits", [])
]
metadata.tags.update(cbi.get("tags", set()))
# need the language string to be ISO
if metadata.language:
metadata.language = utils.get_language_iso(metadata.language)
metadata.is_empty = False
return metadata
def _string_from_metadata(self, metadata: GenericMetadata) -> str:
cbi_container = self._create_json_dictionary(metadata)
return json.dumps(cbi_container)
def _validate_string(self, string: bytes | str) -> bool:
"""Verify that the string actually contains CBI data in JSON format"""
try:
cbi_container = json.loads(string)
except json.JSONDecodeError:
return False
return "ComicBookInfo/1.0" in cbi_container
def _create_json_dictionary(self, metadata: GenericMetadata) -> _CBIContainer:
"""Create the dictionary that we will convert to JSON text"""
cbi_container = _CBIContainer(
{
"appID": "ComicTagger/1.0.0",
"lastModified": str(datetime.now()),
"ComicBookInfo/1.0": {},
}
) # TODO: ctversion.version,
# helper func
def assign(cbi_entry: _CBILiteralType, md_entry: Any) -> None:
if md_entry is not None or isinstance(md_entry, str) and md_entry != "":
cbi_container["ComicBookInfo/1.0"][cbi_entry] = md_entry
assign("series", utils.xlate(metadata.series))
assign("title", utils.xlate(metadata.title))
assign("issue", utils.xlate(metadata.issue))
assign("publisher", utils.xlate(metadata.publisher))
assign("publicationMonth", utils.xlate_int(metadata.month))
assign("publicationYear", utils.xlate_int(metadata.year))
assign("numberOfIssues", utils.xlate_int(metadata.issue_count))
assign("comments", utils.xlate(metadata.description))
assign("genre", utils.xlate(",".join(metadata.genres)))
assign("volume", utils.xlate_int(metadata.volume))
assign("numberOfVolumes", utils.xlate_int(metadata.volume_count))
assign("language", utils.xlate(utils.get_language_from_iso(metadata.language)))
assign("country", utils.xlate(metadata.country))
assign("rating", utils.xlate_int(metadata.critical_rating))
assign("credits", [credit(person=c.person, role=c.role, primary=c.primary) for c in metadata.credits])
assign("tags", list(metadata.tags))
return cbi_container

View File

@@ -88,7 +88,7 @@ if sys.version_info < (3, 11):
cls._lower_members = {x.casefold(): x for x in cls} # type: ignore[attr-defined]
return cls._lower_members.get(value.casefold(), None) # type: ignore[attr-defined]
def __str__(self):
def __str__(self) -> str:
return self.value
else:

View File

@@ -36,6 +36,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
from comictaggerlib.graphics import graphics_path
from comictaggerlib.issueidentifier import IssueIdentifier
from comictaggerlib.md import prepare_metadata
from comictaggerlib.quick_tag import QuickTag
from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status
from comictalker.comictalker import ComicTalker, TalkerError
@@ -397,6 +398,153 @@ class CLI:
res.status = status
return res
def try_quick_tag(self, ca: ComicArchive, md: GenericMetadata) -> GenericMetadata | None:
if not self.config.Runtime_Options__enable_quick_tag:
self.output("skipping quick tag")
return None
self.output("starting quick tag")
try:
qt = QuickTag(
self.config.Quick_Tag__url,
str(utils.parse_url(self.current_talker().website).host),
self.current_talker(),
self.config,
self.output,
)
ct_md = qt.id_comic(
ca,
md,
self.config.Quick_Tag__simple,
set(self.config.Quick_Tag__hash),
self.config.Quick_Tag__exact_only,
self.config.Runtime_Options__interactive,
self.config.Quick_Tag__aggressive_filtering,
self.config.Quick_Tag__max,
)
if ct_md is None:
ct_md = GenericMetadata()
return ct_md
except Exception:
logger.exception("Quick Tagging failed")
return None
def normal_tag(
self, ca: ComicArchive, tags_read: list[str], md: GenericMetadata, match_results: OnlineMatchResults
) -> tuple[GenericMetadata, list[IssueResult], Result | None, OnlineMatchResults]:
# ct_md, results, matches, match_results
if md is None or md.is_empty:
logger.error("No metadata given to search online with!")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.no_matches.append(res)
return GenericMetadata(), [], res, match_results
ii = IssueIdentifier(ca, self.config, self.current_talker())
ii.set_output_function(functools.partial(self.output, already_logged=True))
if not self.config.Auto_Tag__use_year_when_identifying:
md.year = None
if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
result, matches = ii.identify(ca, md)
found_match = False
choices = False
low_confidence = False
if result == IssueIdentifier.result_no_matches:
pass
elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
low_confidence = True
found_match = True
elif result == IssueIdentifier.result_found_match_but_not_first_page:
found_match = True
elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
low_confidence = True
choices = True
elif result == IssueIdentifier.result_one_good_match:
found_match = True
elif result == IssueIdentifier.result_multiple_good_matches:
choices = True
if choices:
if low_confidence:
logger.error("Online search: Multiple low confidence matches. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.low_confidence_matches.append(res)
return GenericMetadata(), matches, res, match_results
logger.error("Online search: Multiple good matches. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.multiple_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.multiple_matches.append(res)
return GenericMetadata(), matches, res, match_results
if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
logger.error("Online search: Low confidence match. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.low_confidence_matches.append(res)
return GenericMetadata(), matches, res, match_results
if not found_match:
logger.error("Online search: No match found. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.no_matches.append(res)
return GenericMetadata(), matches, res, match_results
# we got here, so we have a single match
# now get the particular issue data
ct_md = self.fetch_metadata(matches[0].issue_id)
if ct_md.is_empty:
res = Result(
Action.save,
status=Status.fetch_data_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.fetch_data_failures.append(res)
return GenericMetadata(), matches, res, match_results
return ct_md, matches, None, match_results
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> tuple[Result, OnlineMatchResults]:
if self.config.Runtime_Options__skip_existing_tags:
for tag_id in self.config.Runtime_Options__tags_write:
@@ -455,117 +603,34 @@ class CLI:
return res, match_results
else:
if md is None or md.is_empty:
logger.error("No metadata given to search online with!")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.no_matches.append(res)
return res, match_results
ii = IssueIdentifier(ca, self.config, self.current_talker())
ii.set_output_function(functools.partial(self.output, already_logged=True))
if not self.config.Auto_Tag__use_year_when_identifying:
md.year = None
if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
result, matches = ii.identify(ca, md)
found_match = False
choices = False
low_confidence = False
if result == IssueIdentifier.result_no_matches:
pass
elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
low_confidence = True
found_match = True
elif result == IssueIdentifier.result_found_match_but_not_first_page:
found_match = True
elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
low_confidence = True
choices = True
elif result == IssueIdentifier.result_one_good_match:
found_match = True
elif result == IssueIdentifier.result_multiple_good_matches:
choices = True
if choices:
if low_confidence:
logger.error("Online search: Multiple low confidence matches. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.low_confidence_matches.append(res)
qt_md = self.try_quick_tag(ca, md)
if qt_md is None or qt_md.is_empty:
if qt_md is not None:
self.output("Failed to find match via quick tag")
ct_md, matches, res, match_results = self.normal_tag(ca, tags_read, md, match_results) # type: ignore[assignment]
if res is not None:
return res, match_results
logger.error("Online search: Multiple good matches. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.multiple_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.multiple_matches.append(res)
return res, match_results
if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
logger.error("Online search: Low confidence match. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.low_confidence_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.low_confidence_matches.append(res)
return res, match_results
if not found_match:
logger.error("Online search: No match found. Save aborted")
res = Result(
Action.save,
status=Status.match_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.no_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.no_matches.append(res)
return res, match_results
# we got here, so we have a single match
# now get the particular issue data
ct_md = self.fetch_metadata(matches[0].issue_id)
if ct_md.is_empty:
res = Result(
Action.save,
status=Status.fetch_data_failure,
original_path=ca.path,
online_results=matches,
match_status=MatchStatus.good_match,
tags_written=self.config.Runtime_Options__tags_write,
tags_read=tags_read,
)
match_results.fetch_data_failures.append(res)
return res, match_results
else:
self.output("Successfully matched via quick tag")
ct_md = qt_md
matches = [
IssueResult(
series=ct_md.series or "",
distance=-1,
issue_number=ct_md.issue or "",
issue_count=ct_md.issue_count,
url_image_hash=-1,
issue_title=ct_md.title or "",
issue_id=ct_md.issue_id or "",
series_id=ct_md.issue_id or "",
month=ct_md.month,
year=ct_md.year,
publisher=None,
image_url=ct_md._cover_image or "",
alt_image_urls=[],
description=ct_md.description or "",
)
]
res = Result(
Action.save,

View File

@@ -104,6 +104,9 @@ def save_file(
filename: A pathlib.Path object to save the json dictionary to
"""
file_options = settngs.clean_config(config, file=True)
if "Quick Tag" in file_options and "url" in file_options["Quick Tag"]:
file_options["Quick Tag"]["url"] = str(file_options["Quick Tag"]["url"])
try:
if not filename.exists():
filename.parent.mkdir(exist_ok=True, parents=True)
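
The new Quick Tag guard is likely needed because the url setting is a urllib3 Url, which is a namedtuple: dumped to JSON directly it becomes an array of components rather than a string that parse_url can read back on the next load. A quick illustration, assuming urllib3 is installed:

import json

from urllib3.util import parse_url

url = parse_url("https://comic-hasher.narnian.us")
print(json.dumps(url))       # a namedtuple serializes as a list of URL components
print(json.dumps(str(url)))  # "https://comic-hasher.narnian.us"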

View File

@@ -27,7 +27,7 @@ import settngs
from comicapi import utils
from comicapi.comicarchive import tags
from comictaggerlib import ctversion
from comictaggerlib import ctversion, quick_tag
from comictaggerlib.ctsettings.settngs_namespace import SettngsNS as ct_ns
from comictaggerlib.ctsettings.types import ComicTaggerPaths, tag
from comictaggerlib.resulttypes import Action
@@ -51,6 +51,12 @@ def initial_commandline_parser() -> argparse.ArgumentParser:
default=0,
help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
)
parser.add_argument(
"--enable-quick-tag",
action=argparse.BooleanOptionalAction,
default=False,
help='Enable the experimental "quick tagger"',
)
return parser
@@ -70,6 +76,13 @@ def register_runtime(parser: settngs.Manager) -> None:
help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
file=False,
)
parser.add_setting(
"--enable-quick-tag",
action=argparse.BooleanOptionalAction,
default=False,
help='Enable the experimental "quick tagger"',
file=False,
)
parser.add_setting("-q", "--quiet", action="store_true", help="Don't say much (for print mode).", file=False)
parser.add_setting(
"-j",
@@ -240,9 +253,11 @@ def register_commands(parser: settngs.Manager) -> None:
)
def register_commandline_settings(parser: settngs.Manager) -> None:
def register_commandline_settings(parser: settngs.Manager, enable_quick_tag: bool) -> None:
parser.add_group("Commands", register_commands, True)
parser.add_persistent_group("Runtime Options", register_runtime)
if enable_quick_tag:
parser.add_group("Quick Tag", quick_tag.settings)
def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs.Manager) -> settngs.Config[ct_ns]:

View File

@@ -27,8 +27,8 @@ def general(parser: settngs.Manager) -> None:
def internal(parser: settngs.Manager) -> None:
# automatic settings
parser.add_setting("install_id", default=uuid.uuid4().hex, cmdline=False)
parser.add_setting("write_tags", default=["cbi"], cmdline=False)
parser.add_setting("read_tags", default=["cbi"], cmdline=False)
parser.add_setting("write_tags", default=["cr"], cmdline=False)
parser.add_setting("read_tags", default=["cr"], cmdline=False)
parser.add_setting("last_opened_folder", default="", cmdline=False)
parser.add_setting("window_width", default=0, cmdline=False)
parser.add_setting("window_height", default=0, cmdline=False)
@@ -356,7 +356,7 @@ def migrate_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
elif isinstance(write_Tags, str):
config[0].internal__write_tags = [write_Tags]
else:
config[0].internal__write_tags = ["cbi"]
config[0].internal__write_tags = ["cr"]
read_tags = config[0].internal__read_tags
if not isinstance(read_tags, list):
@@ -365,7 +365,7 @@ def migrate_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
elif isinstance(read_tags, str):
config[0].internal__read_tags = [read_tags]
else:
config[0].internal__read_tags = ["cbi"]
config[0].internal__read_tags = ["cr"]
return config

View File

@@ -5,17 +5,52 @@
from __future__ import annotations
import configparser
import importlib.metadata
import importlib.util
import itertools
import logging
import pathlib
import platform
import re
from collections.abc import Generator
from typing import Any, NamedTuple
import sys
from collections.abc import Generator, Iterable
from typing import Any, NamedTuple, TypeVar
if sys.version_info < (3, 10):
import importlib_metadata
else:
import importlib.metadata as importlib_metadata
import tomli
logger = logging.getLogger(__name__)
NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
PLUGIN_GROUPS = frozenset(("comictagger.talker", "comicapi.archiver", "comicapi.tags"))
icu_available = importlib.util.find_spec("icu") is not None
def _custom_key(tup: Any) -> Any:
import natsort
lst = []
for x in natsort.os_sort_keygen()(tup):
ret = x
if len(x) > 1 and isinstance(x[1], int) and isinstance(x[0], str) and x[0] == "":
ret = ("a", *x[1:])
lst.append(ret)
return tuple(lst)
T = TypeVar("T")
def os_sorted(lst: Iterable[T]) -> Iterable[T]:
import natsort
key = _custom_key
if icu_available or platform.system() == "Windows":
key = natsort.os_sort_keygen()
return sorted(lst, key=key)
class FailedToLoadPlugin(Exception):
@@ -47,9 +82,12 @@ class Plugin(NamedTuple):
package: str
version: str
entry_point: importlib.metadata.EntryPoint
entry_point: importlib_metadata.EntryPoint
path: pathlib.Path
def load(self) -> LoadedPlugin:
return LoadedPlugin(self, self.entry_point.load())
class LoadedPlugin(NamedTuple):
"""Represents a plugin after being imported."""
@@ -71,11 +109,11 @@ class LoadedPlugin(NamedTuple):
class Plugins(NamedTuple):
"""Classified plugins."""
archivers: list[Plugin]
tags: list[Plugin]
talkers: list[Plugin]
archivers: list[LoadedPlugin]
tags: list[LoadedPlugin]
talkers: list[LoadedPlugin]
def all_plugins(self) -> Generator[Plugin]:
def all_plugins(self) -> Generator[LoadedPlugin]:
"""Return an iterator over all :class:`LoadedPlugin`s."""
yield from self.archivers
yield from self.tags
@@ -83,13 +121,24 @@
def versions_str(self) -> str:
"""Return a user-displayed list of plugin versions."""
return ", ".join(sorted({f"{plugin.package}: {plugin.version}" for plugin in self.all_plugins()}))
return ", ".join(sorted({f"{plugin.plugin.package}: {plugin.plugin.version}" for plugin in self.all_plugins()}))
def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:
def _find_ep_plugin(plugin_path: pathlib.Path) -> None | Generator[Plugin]:
logger.debug("Checking for distributions in %s", plugin_path)
for dist in importlib_metadata.distributions(path=[str(plugin_path)]):
logger.debug("found distribution %s", dist.name)
eps = dist.entry_points
for group in PLUGIN_GROUPS:
for ep in eps.select(group=group):
logger.debug("found EntryPoint group %s %s=%s", group, ep.name, ep.value)
yield Plugin(plugin_path.name, dist.version, ep, plugin_path)
return None
def _find_cfg_plugin(setup_cfg_path: pathlib.Path) -> Generator[Plugin]:
cfg = configparser.ConfigParser(interpolation=None)
cfg.read(plugin_path / "setup.cfg")
cfg.read(setup_cfg_path)
for group in PLUGIN_GROUPS:
for plugin_s in cfg.get("options.entry_points", group, fallback="").splitlines():
@@ -98,8 +147,43 @@ def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:
name, _, entry_str = plugin_s.partition("=")
name, entry_str = name.strip(), entry_str.strip()
ep = importlib.metadata.EntryPoint(name, entry_str, group)
yield Plugin(plugin_path.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, plugin_path)
ep = importlib_metadata.EntryPoint(name, entry_str, group)
yield Plugin(
setup_cfg_path.parent.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, setup_cfg_path.parent
)
def _find_pyproject_plugin(pyproject_path: pathlib.Path) -> Generator[Plugin]:
cfg = tomli.loads(pyproject_path.read_text())
for group in PLUGIN_GROUPS:
cfg["project"]["entry-points"]
for plugins in cfg.get("project", {}).get("entry-points", {}).get(group, {}):
if not plugins:
continue
for name, entry_str in plugins.items():
ep = importlib_metadata.EntryPoint(name, entry_str, group)
yield Plugin(
pyproject_path.parent.name,
cfg.get("project", {}).get("version", "0.0.1"),
ep,
pyproject_path.parent,
)
def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:
gen = _find_ep_plugin(plugin_path)
if gen is not None:
yield from gen
return
if (plugin_path / "setup.cfg").is_file():
yield from _find_cfg_plugin(plugin_path / "setup.cfg")
return
if (plugin_path / "pyproject.cfg").is_file():
yield from _find_pyproject_plugin(plugin_path / "setup.cfg")
return
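
For reference, a sketch of the pyproject.toml entry-points table that _find_pyproject_plugin parses; the plugin name and entry point are hypothetical:

import tomli  # the same TOML parser used above

PYPROJECT = """
[project]
name = "mytagplugin"
version = "0.1.0"

[project.entry-points."comicapi.tags"]
mytag = "mytagplugin:MyTag"
"""

cfg = tomli.loads(PYPROJECT)
for name, entry_str in cfg["project"]["entry-points"]["comicapi.tags"].items():
    print(name, "=", entry_str)  # mytag = mytagplugin:MyTag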
def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) -> None:
@@ -118,30 +202,48 @@ def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) ->
def find_plugins(plugin_folder: pathlib.Path) -> Plugins:
"""Discovers all plugins (but does not load them)."""
ret: list[Plugin] = []
for plugin_path in plugin_folder.glob("*/setup.cfg"):
try:
ret.extend(_find_local_plugins(plugin_path.parent))
except Exception as err:
FailedToLoadPlugin(plugin_path.parent.name, err)
ret: list[LoadedPlugin] = []
# for determinism, sort the list
ret.sort()
dirs = {x.parent for x in plugin_folder.glob("*/setup.cfg")}
dirs.update({x.parent for x in plugin_folder.glob("*/pyproject.toml")})
zips = [x for x in plugin_folder.glob("*.zip") if x.is_file()]
for plugin_path in itertools.chain(os_sorted(zips), os_sorted(dirs)):
logger.debug("looking for plugins in %s", plugin_path)
try:
sys.path.append(str(plugin_path))
for plugin in _find_local_plugins(plugin_path):
logger.debug("Attempting to load %s from %s", plugin.entry_point.name, plugin.path)
ret.append(plugin.load())
# sys.path.remove(str(plugin_path))
except Exception as err:
FailedToLoadPlugin(plugin_path.name, err)
finally:
sys.path.remove(str(plugin_path))
for mod in list(sys.modules.values()):
if (
mod is not None
and hasattr(mod, "__spec__")
and mod.__spec__
and str(plugin_path) in (mod.__spec__.origin or "")
):
sys.modules.pop(mod.__name__)
return _classify_plugins(ret)
def _classify_plugins(plugins: list[Plugin]) -> Plugins:
def _classify_plugins(plugins: list[LoadedPlugin]) -> Plugins:
archivers = []
tags = []
talkers = []
for p in plugins:
if p.entry_point.group == "comictagger.talker":
if p.plugin.entry_point.group == "comictagger.talker":
talkers.append(p)
elif p.entry_point.group == "comicapi.tags":
elif p.plugin.entry_point.group == "comicapi.tags":
tags.append(p)
elif p.entry_point.group == "comicapi.archiver":
elif p.plugin.entry_point.group == "comicapi.archiver":
archivers.append(p)
else:
logger.warning(NotImplementedError(f"what plugin type? {p}"))

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import typing
import settngs
import urllib3.util.url
import comicapi.genericmetadata
import comicapi.merge
@@ -19,6 +20,7 @@ class SettngsNS(settngs.TypedNS):
Runtime_Options__config: comictaggerlib.ctsettings.types.ComicTaggerPaths
Runtime_Options__verbose: int
Runtime_Options__enable_quick_tag: bool
Runtime_Options__quiet: bool
Runtime_Options__json: bool
Runtime_Options__raw: bool
@@ -37,6 +39,13 @@ class SettngsNS(settngs.TypedNS):
Runtime_Options__skip_existing_tags: bool
Runtime_Options__files: list[str]
Quick_Tag__url: urllib3.util.url.Url
Quick_Tag__max: int
Quick_Tag__simple: bool
Quick_Tag__aggressive_filtering: bool
Quick_Tag__hash: list[comictaggerlib.quick_tag.HashType]
Quick_Tag__exact_only: bool
internal__install_id: str
internal__write_tags: list[str]
internal__read_tags: list[str]
@@ -132,6 +141,7 @@ class Commands(typing.TypedDict):
class Runtime_Options(typing.TypedDict):
config: comictaggerlib.ctsettings.types.ComicTaggerPaths
verbose: int
enable_quick_tag: bool
quiet: bool
json: bool
raw: bool
@@ -151,6 +161,15 @@ class Runtime_Options(typing.TypedDict):
files: list[str]
class Quick_Tag(typing.TypedDict):
url: urllib3.util.url.Url
max: int
simple: bool
aggressive_filtering: bool
hash: list[comictaggerlib.quick_tag.HashType]
exact_only: bool
class internal(typing.TypedDict):
install_id: str
write_tags: list[str]
@@ -263,6 +282,7 @@ SettngsDict = typing.TypedDict(
{
"Commands": Commands,
"Runtime Options": Runtime_Options,
"Quick Tag": Quick_Tag,
"internal": internal,
"Issue Identifier": Issue_Identifier,
"Filename Parsing": Filename_Parsing,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import argparse
import logging
import pathlib
import sys
import types
@@ -15,6 +16,8 @@ from comicapi import utils
from comicapi.comicarchive import tags
from comicapi.genericmetadata import REMOVE, GenericMetadata
logger = logging.getLogger(__name__)
if sys.version_info < (3, 10):
@typing.no_type_check
@@ -198,44 +201,48 @@ def parse_metadata_from_string(mdstr: str) -> GenericMetadata:
md = GenericMetadata()
if not mdstr:
return md
if mdstr[0] == "@":
p = pathlib.Path(mdstr[1:])
if not p.is_file():
raise argparse.ArgumentTypeError("Invalid filepath")
mdstr = p.read_text()
if mdstr[0] != "{":
mdstr = "{" + mdstr + "}"
try:
if not mdstr:
return md
if mdstr[0] == "@":
p = pathlib.Path(mdstr[1:])
if not p.is_file():
raise argparse.ArgumentTypeError("Invalid filepath")
mdstr = p.read_text()
if mdstr[0] != "{":
mdstr = "{" + mdstr + "}"
md_dict = yaml.safe_load(mdstr)
md_dict = yaml.safe_load(mdstr)
empty = True
# Map the dict to the metadata object
for key, value in md_dict.items():
if hasattr(md, key):
t = get_type(key)
if value is None:
value = REMOVE
elif isinstance(t, tuple):
if value == "":
value = t[0]()
empty = True
# Map the dict to the metadata object
for key, value in md_dict.items():
if hasattr(md, key):
t = get_type(key)
if value is None:
value = REMOVE
elif isinstance(t, tuple):
if value == "":
value = t[0]()
else:
if isinstance(value, str):
value = [value]
if not isinstance(value, Collection):
raise argparse.ArgumentTypeError(f"Invalid syntax for tag '{key}'")
values = list(value)
for idx, v in enumerate(values):
if not isinstance(v, t[1]):
values[idx] = convert_value(t[1], v)
value = t[0](values)
else:
if isinstance(value, str):
value = [value]
if not isinstance(value, Collection):
raise argparse.ArgumentTypeError(f"Invalid syntax for tag '{key}'")
values = list(value)
for idx, v in enumerate(values):
if not isinstance(v, t[1]):
values[idx] = convert_value(t[1], v)
value = t[0](values)
else:
value = convert_value(t, value)
value = convert_value(t, value)
empty = False
setattr(md, key, value)
else:
raise argparse.ArgumentTypeError(f"'{key}' is not a valid tag name")
md.is_empty = empty
empty = False
setattr(md, key, value)
else:
raise argparse.ArgumentTypeError(f"'{key}' is not a valid tag name")
md.is_empty = empty
except Exception as e:
logger.exception("Unable to read metadata from the commandline '%s'", mdstr)
raise Exception("Unable to read metadata from the commandline") from e
return md

View File

@@ -73,24 +73,23 @@ class ImageHasher:
return result
def average_hash2(self) -> None:
"""
# Got this one from somewhere on the net. Not a clue how the 'convolve2d' works!
from numpy import array
from scipy.signal import convolve2d
im = self.image.resize((self.width, self.height), Image.ANTIALIAS).convert('L')
in_data = array((im.getdata())).reshape(self.width, self.height)
filt = array([[0,1,0],[1,-4,1],[0,1,0]])
filt_data = convolve2d(in_data,filt,mode='same',boundary='symm').flatten()
result = reduce(lambda x, (y, z): x | (z << y),
enumerate(map(lambda i: 0 if i < 0 else 1, filt_data)),
0)
return result
"""
def difference_hash(self) -> int:
try:
image = self.image.resize((self.width + 1, self.height), Image.Resampling.LANCZOS).convert("L")
except Exception:
logger.exception("difference_hash error")
return 0
pixels = list(image.getdata())
diff = ""
for y in range(self.height):
for x in range(self.width):
idx = x + (self.width + 1) * y
diff += str(int(pixels[idx] < pixels[idx + 1]))
result = int(diff, 2)
return result
def p_hash(self) -> int:
"""

View File

@@ -44,7 +44,6 @@ if sys.version_info < (3, 10):
import importlib_metadata
else:
import importlib.metadata as importlib_metadata
logger = logging.getLogger("comictagger")
@@ -117,26 +116,20 @@ class App:
conf = self.initialize()
self.initialize_dirs(conf.config)
self.load_plugins(conf)
self.register_settings()
self.register_settings(conf.enable_quick_tag)
self.config = self.parse_settings(conf.config)
self.main()
def load_plugins(self, opts: argparse.Namespace) -> None:
local_plugins = plugin_finder.find_plugins(opts.config.user_plugin_dir)
self._extend_plugin_paths(local_plugins)
comicapi.comicarchive.load_archive_plugins(local_plugins=[p.entry_point for p in local_plugins.archivers])
comicapi.comicarchive.load_tag_plugins(
version=version, local_plugins=[p.entry_point for p in local_plugins.tags]
)
comicapi.comicarchive.load_archive_plugins(local_plugins=[p.obj for p in local_plugins.archivers])
comicapi.comicarchive.load_tag_plugins(version=version, local_plugins=[p.obj for p in local_plugins.tags])
self.talkers = comictalker.get_talkers(
version, opts.config.user_cache_dir, local_plugins=[p.entry_point for p in local_plugins.talkers]
version, opts.config.user_cache_dir, local_plugins=[p.obj for p in local_plugins.talkers]
)
def _extend_plugin_paths(self, plugins: plugin_finder.Plugins) -> None:
sys.path.extend(str(p.path.absolute()) for p in plugins.all_plugins())
def list_plugins(
self,
talkers: Collection[comictalker.ComicTalker],
@@ -215,13 +208,13 @@ class App:
setup_logging(conf.verbose, conf.config.user_log_dir)
return conf
def register_settings(self) -> None:
def register_settings(self, enable_quick_tag: bool) -> None:
self.manager = settngs.Manager(
description="A utility for reading and writing metadata to comic archives.\n\n\n"
+ "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'",
epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
)
ctsettings.register_commandline_settings(self.manager)
ctsettings.register_commandline_settings(self.manager, enable_quick_tag)
ctsettings.register_file_settings(self.manager)
ctsettings.register_plugin_settings(self.manager, getattr(self, "talkers", {}))

comictaggerlib/quick_tag.py (new file, 391 lines)
View File

@@ -0,0 +1,391 @@
from __future__ import annotations
import argparse
import itertools
import logging
from enum import auto
from io import BytesIO
from typing import Callable, TypedDict, cast
from urllib.parse import urljoin
import requests
import settngs
from comicapi import comicarchive, utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib.ctsettings.settngs_namespace import SettngsNS
from comictaggerlib.imagehasher import ImageHasher
from comictalker import ComicTalker
logger = logging.getLogger(__name__)
__version__ = "0.1"
class HashType(utils.StrEnum):
AHASH = auto()
DHASH = auto()
PHASH = auto()
class SimpleResult(TypedDict):
Distance: int
# Mapping of domains (eg comicvine.gamespot.com) to IDs
IDList: dict[str, list[str]]
class Hash(TypedDict):
Hash: int
Kind: str
class Result(TypedDict):
# Mapping of domains (eg comicvine.gamespot.com) to IDs
IDs: dict[str, list[str]]
Distance: int
Hash: Hash
def ihash(types: str) -> list[HashType]:
result: list[HashType] = []
types = types.casefold()
choices = ", ".join(HashType)
for typ in utils.split(types, ","):
if typ not in list(HashType):
raise argparse.ArgumentTypeError(f"invalid choice: {typ} (choose from {choices.upper()})")
result.append(HashType[typ.upper()])
if not result:
raise argparse.ArgumentTypeError(f"invalid choice: {types} (choose from {choices.upper()})")
return result
def settings(manager: settngs.Manager) -> None:
manager.add_setting(
"--url",
"-u",
default="https://comic-hasher.narnian.us",
type=utils.parse_url,
help="Website to use for searching cover hashes",
)
manager.add_setting(
"--max",
default=8,
type=int,
help="Maximum score to allow. Lower score means more accurate",
)
manager.add_setting(
"--simple",
default=False,
action=argparse.BooleanOptionalAction,
help="Whether to retrieve simple results or full results",
)
manager.add_setting(
"--aggressive-filtering",
default=False,
action=argparse.BooleanOptionalAction,
help="Will filter out worse matches if better matches are found",
)
manager.add_setting(
"--hash",
default="ahash, dhash, phash",
type=ihash,
help="Pick what hashes you want to use to search (default: %(default)s)",
)
manager.add_setting(
"--exact-only",
default=True,
action=argparse.BooleanOptionalAction,
help="Skip non-exact matches if we have exact matches",
)
class QuickTag:
def __init__(
self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[[str], None]
):
self.output = output
self.url = url
self.talker = talker
self.domain = domain
self.config = config
def id_comic(
self,
ca: comicarchive.ComicArchive,
tags: GenericMetadata,
simple: bool,
hashes: set[HashType],
exact_only: bool,
interactive: bool,
aggressive_filtering: bool,
max_hamming_distance: int,
) -> GenericMetadata | None:
if not ca.seems_to_be_a_comic_archive():
raise Exception(f"{ca.path} is not an archive")
from PIL import Image
cover_index = tags.get_cover_page_index_list()[0]
cover_image = Image.open(BytesIO(ca.get_page(cover_index)))
self.output(f"Tagging: {ca.path}")
self.output("hashing cover")
phash = dhash = ahash = ""
hasher = ImageHasher(image=cover_image)
if HashType.AHASH in hashes:
ahash = hex(hasher.average_hash())[2:]
if HashType.DHASH in hashes:
dhash = hex(hasher.difference_hash())[2:]
if HashType.PHASH in hashes:
phash = hex(hasher.p_hash())[2:]
logger.info(f"Searching with {ahash=}, {dhash=}, {phash=}")
self.output("Searching hashes")
results = self.SearchHashes(simple, max_hamming_distance, ahash, dhash, phash, exact_only)
logger.debug(f"{results=}")
if simple:
filtered_simple_results = self.filter_simple_results(
cast(list[SimpleResult], results), interactive, aggressive_filtering
)
metadata_simple_results = self.get_simple_results(filtered_simple_results)
chosen_result = self.display_simple_results(metadata_simple_results, tags, interactive)
else:
filtered_results = self.filter_results(cast(list[Result], results), interactive, aggressive_filtering)
metadata_results = self.get_results(filtered_results)
chosen_result = self.display_results(metadata_results, tags, interactive)
return self.talker.fetch_comic_data(issue_id=chosen_result.issue_id)
def SearchHashes(
self, simple: bool, max_hamming_distance: int, ahash: str, dhash: str, phash: str, exact_only: bool
) -> list[SimpleResult] | list[Result]:
resp = requests.get(
urljoin(self.url.url, "/match_cover_hash"),
params={
"simple": str(simple),
"max": str(max_hamming_distance),
"ahash": ahash,
"dhash": dhash,
"phash": phash,
"exactOnly": str(exact_only),
},
)
if resp.status_code != 200:
try:
text = resp.json()["msg"]
except Exception:
text = resp.text
if text == "No hashes found":
return []
logger.error("message from server: %s", text)
raise Exception(f"Failed to retrieve results from the server: {text}")
return resp.json()["results"]
def get_mds(self, results: list[SimpleResult] | list[Result]) -> list[GenericMetadata]:
md_results: list[GenericMetadata] = []
results.sort(key=lambda r: r["Distance"])
all_ids = set()
for res in results:
all_ids.update(res.get("IDList", res.get("IDs", {})).get(self.domain, [])) # type: ignore[attr-defined]
self.output(f"Retrieving basic {self.talker.name} data")
# Try to do a bulk fetch of basic issue data
if hasattr(self.talker, "fetch_comics"):
md_results = self.talker.fetch_comics(issue_ids=list(all_ids))
else:
for md_id in all_ids:
md_results.append(self.talker.fetch_comic_data(issue_id=md_id))
return md_results
def get_simple_results(self, results: list[SimpleResult]) -> list[tuple[int, GenericMetadata]]:
md_results = []
mds = self.get_mds(results)
# Re-associate the md to the distance
for res in results:
for md in mds:
if md.issue_id in res["IDList"].get(self.domain, []):
md_results.append((res["Distance"], md))
return md_results
def get_results(self, results: list[Result]) -> list[tuple[int, Hash, GenericMetadata]]:
md_results = []
mds = self.get_mds(results)
# Re-associate the md to the distance
for res in results:
for md in mds:
if md.issue_id in res["IDs"].get(self.domain, []):
md_results.append((res["Distance"], res["Hash"], md))
return md_results
def filter_simple_results(
self, results: list[SimpleResult], interactive: bool, aggressive_filtering: bool
) -> list[SimpleResult]:
# If there is a single exact match return it
exact = [r for r in results if r["Distance"] == 0]
if len(exact) == 1:
logger.info("Exact result found. Ignoring any others")
return exact
# If there are more than 4 results and any are better than 6, return the first group of results
if len(results) > 4:
dist: list[tuple[int, list[SimpleResult]]] = []
filtered_results: list[SimpleResult] = []
for distance, group in itertools.groupby(results, key=lambda r: r["Distance"]):
dist.append((distance, list(group)))
if aggressive_filtering and dist[0][0] < 6:
logger.info(f"Aggressive filtering is enabled. Dropping matches above {dist[0]}")
for _, res in dist[:1]:
filtered_results.extend(res)
logger.debug(f"{filtered_results=}")
return filtered_results
return results
def filter_results(self, results: list[Result], interactive: bool, aggressive_filtering: bool) -> list[Result]:
ahash_results = sorted([r for r in results if r["Hash"]["Kind"] == "ahash"], key=lambda r: r["Distance"])
dhash_results = sorted([r for r in results if r["Hash"]["Kind"] == "dhash"], key=lambda r: r["Distance"])
phash_results = sorted([r for r in results if r["Hash"]["Kind"] == "phash"], key=lambda r: r["Distance"])
hash_results = [phash_results, dhash_results, ahash_results]
# If any of the hash types have a single exact match return it. Prefer phash for no particular reason
for hashed_result in hash_results:
exact = [r for r in hashed_result if r["Distance"] == 0]
if len(exact) == 1:
logger.info(f"Exact {exact[0]['Hash']['Kind']} result found. Ignoring any others")
return exact
results_filtered = False
# If any of the hash types have more than 4 results and results better than 6, return the first group of results for each hash type
for i, hashed_results in enumerate(hash_results):
filtered_results: list[Result] = []
if len(hashed_results) > 4:
dist: list[tuple[int, list[Result]]] = []
for distance, group in itertools.groupby(hashed_results, key=lambda r: r["Distance"]):
dist.append((distance, list(group)))
if aggressive_filtering and dist[0][0] < 6:
logger.info(
f"Aggressive filtering is enabled. Dropping {dist[0][1][0]['Hash']['Kind']} matches above {dist[0][0]}"
)
for _, res in dist[:1]:
filtered_results.extend(res)
if filtered_results:
hash_results[i] = filtered_results
results_filtered = True
if results_filtered:
logger.debug(f"filtered_results={list(itertools.chain(*hash_results))}")
return list(itertools.chain(*hash_results))
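# Illustrative aside (not part of this file): both filters above lean on
# itertools.groupby over distance-sorted results, so dist[0] is always the
# best-scoring group. A tiny sketch with made-up distances:
import itertools
distances = [2, 2, 3, 5, 5, 7]  # hypothetical Hamming distances, pre-sorted
dist = [(d, list(g)) for d, g in itertools.groupby(distances)]
# dist == [(2, [2, 2]), (3, [3]), (5, [5, 5]), (7, [7])]
if len(distances) > 4 and dist[0][0] < 6:
filtered = dist[0][1]  # [2, 2] -- every worse group is dropped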
def display_simple_results(
self, md_results: list[tuple[int, GenericMetadata]], tags: GenericMetadata, interactive: bool
) -> GenericMetadata:
if len(md_results) < 1:
return GenericMetadata()
if len(md_results) == 1 and md_results[0][0] <= 4:
self.output("Found a single match <=4. Assuming it's correct")
return md_results[0][1]
series_match: list[GenericMetadata] = []
for score, md in md_results:
if (
score < 10
and tags.series
and md.series
and utils.titles_match(tags.series, md.series)
and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
):
series_match.append(md)
if len(series_match) == 1:
self.output(f"Found match with series name {series_match[0].series!r}")
return series_match[0]
if not interactive:
return GenericMetadata()
md_results.sort(key=lambda r: (r[0], len(r[1].publisher or "")))
for counter, r in enumerate(md_results, 1):
self.output(
" {:2}. score: {} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
counter,
r[0],
r[1].publisher or "",
r[1].month or 0,
r[1].year or 0,
r[1].series or "",
r[1].issue or "",
r[1].title or "",
),
)
while True:
i = input(
f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
).casefold()
if i.isdigit() and int(i) in range(1, len(md_results) + 1):
break
if i == "q":
logger.warning("User quit without saving metadata")
return GenericMetadata()
return md_results[int(i) - 1][1]
def display_results(
self,
md_results: list[tuple[int, Hash, GenericMetadata]],
tags: GenericMetadata,
interactive: bool,
) -> GenericMetadata:
if len(md_results) < 1:
return GenericMetadata()
if len(md_results) == 1 and md_results[0][0] <= 4:
self.output("Found a single match <=4. Assuming it's correct")
return md_results[0][2]
series_match: dict[str, tuple[int, Hash, GenericMetadata]] = {}
for score, cover_hash, md in md_results:
if (
score < 10
and tags.series
and md.series
and utils.titles_match(tags.series, md.series)
and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
):
assert md.issue_id
series_match[md.issue_id] = (score, cover_hash, md)
if len(series_match) == 1:
score, cover_hash, md = list(series_match.values())[0]
self.output(f"Found {cover_hash['Kind']} {score=} match with series name {md.series!r}")
return md
if not interactive:
return GenericMetadata()
md_results.sort(key=lambda r: (r[0], len(r[2].publisher or ""), r[1]["Kind"]))
for counter, r in enumerate(md_results, 1):
self.output(
" {:2}. score: {} {}: {:064b} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
counter,
r[0],
r[1]["Kind"],
r[1]["Hash"],
r[2].publisher or "",
r[2].month or 0,
r[2].year or 0,
r[2].series or "",
r[2].issue or "",
r[2].title or "",
),
)
while True:
i = input(
f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
).casefold()
if i.isdigit() and int(i) in range(1, len(md_results) + 1):
break
if i == "q":
self.output("User quit without saving metadata")
return GenericMetadata()
return md_results[int(i) - 1][2]

View File

@ -228,8 +228,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
if tag_id not in tags:
config[0].internal__read_tags.remove(tag_id)
self.selected_write_tags: list[str] = config[0].internal__write_tags
self.selected_read_tags: list[str] = config[0].internal__read_tags
self.selected_write_tags: list[str] = config[0].internal__write_tags or ["cr"]
self.selected_read_tags: list[str] = config[0].internal__read_tags or ["cr"]
self.setAcceptDrops(True)
self.view_tag_actions, self.remove_tag_actions = self.tag_actions()

View File

@ -9,9 +9,9 @@ from collections.abc import Sequence
from packaging.version import InvalidVersion, parse
if sys.version_info < (3, 10):
from importlib_metadata import EntryPoint, entry_points
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points, EntryPoint
from importlib.metadata import entry_points
from comictalker.comictalker import ComicTalker, TalkerError
@ -24,14 +24,14 @@ __all__ = [
def get_talkers(
version: str, cache: pathlib.Path, local_plugins: Sequence[EntryPoint] = tuple()
version: str, cache: pathlib.Path, local_plugins: Sequence[type[ComicTalker]] = tuple()
) -> dict[str, ComicTalker]:
"""Returns all comic talker instances"""
talkers: dict[str, ComicTalker] = {}
ct_version = parse(version)
# A dict is used, last plugin wins
for talker in itertools.chain(entry_points(group="comictagger.talker"), local_plugins):
for talker in itertools.chain(entry_points(group="comictagger.talker")):
try:
talker_cls = talker.load()
obj = talker_cls(version, cache)
@ -56,4 +56,26 @@ def get_talkers(
except Exception:
logger.exception("Failed to load talker: %s", talker.name)
# A dict is used, last plugin wins
for talker_cls in local_plugins:
try:
obj = talker_cls(version, cache)
try:
if ct_version >= parse(talker_cls.comictagger_min_ver):
talkers[talker_cls.id] = obj
else:
logger.error(
f"Talker {talker_cls.id} requires ComicTagger {talker_cls.comictagger_min_ver} or newer, will NOT load talker"
)
except InvalidVersion:
logger.warning(
f"Invalid minimum required ComicTagger version number for talker: {talker_cls.id} - version: {talker_cls.comictagger_min_ver}, will load talker anyway"
)
# Attempt to use the talker anyway
# TODO flag this problem for later display to the user
talkers[talker_cls.id] = obj
except Exception:
logger.exception("Failed to load talker: %s", talker_cls.id)
return talkers
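The version gate in the loop above reduces to a small predicate: packaging's parse() returns comparable Version objects, and an unparseable minimum is treated leniently. A minimal sketch, with hypothetical version strings:
from packaging.version import InvalidVersion, parse
def meets_minimum(current: str, minimum: str) -> bool:
try:
return parse(current) >= parse(minimum)
except InvalidVersion:
return True  # mirror the lenient fallback above: load the talker anyway
print(meets_minimum("1.6.0", "1.5.0"))  # True
print(meets_minimum("1.4.0", "1.5.0"))  # False
print(meets_minimum("1.6.0", "not-a-version"))  # True: invalid minimum, load anyway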

View File

@ -295,4 +295,9 @@ class ComicCacher:
set_slots += key + " = ?"
sql_ins = f"INSERT OR REPLACE INTO {tablename} ({keys}) VALUES ({ins_slots})"
if not data.get("complete", True):
sql_ins += f" ON CONFLICT DO UPDATE SET {set_slots} WHERE complete != ?"
vals.extend(vals)  # the parameters are needed twice: once for VALUES and once for the UPDATE SET clause
vals.append(True)  # only update if the existing cached row is not complete
cur.execute(sql_ins, vals)
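The clause built above means an incomplete record can never clobber a complete one. A self-contained sketch of the same SQLite pattern, using a hypothetical table rather than ComicCacher's real schema (a target-less ON CONFLICT DO UPDATE needs SQLite 3.35+):
import sqlite3
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE series (id TEXT PRIMARY KEY, data TEXT, complete BOOL)")
con.execute("INSERT OR REPLACE INTO series (id, data, complete) VALUES (?, ?, ?)", ("1", "full record", True))
# An incomplete row only lands if the existing row is itself not complete:
sql = (
"INSERT INTO series (id, data, complete) VALUES (?, ?, ?)"
" ON CONFLICT DO UPDATE SET id = ?, data = ?, complete = ? WHERE complete != ?"
)
con.execute(sql, ("1", "partial record", False) * 2 + (True,))
print(con.execute("SELECT data FROM series WHERE id = '1'").fetchone())  # ('full record',)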

View File

@ -21,7 +21,7 @@ from urllib.parse import urlsplit
logger = logging.getLogger(__name__)
def fix_url(url: str) -> str:
def fix_url(url: str | None) -> str:
if not url:
return ""
tmp_url = urlsplit(url)

View File

@ -43,6 +43,8 @@ except ImportError:
import requests
logger = logging.getLogger(__name__)
TWITTER_TOO_MANY_REQUESTS = 420  # Twitter's non-standard "Enhance Your Calm" rate-limit status
class CVTypeID:
Volume = "4050" # CV uses volume to mean series
@ -157,8 +159,8 @@ class CVResult(TypedDict, Generic[T]):
# https://comicvine.gamespot.com/forums/api-developers-2334/api-rate-limiting-1746419/
# "Space out your requests so AT LEAST one second passes between each and you can make requests all day."
custom_limiter = Limiter(RequestRate(10, 10))
default_limiter = Limiter(RequestRate(1, 5))
custom_limiter = Limiter(RequestRate(10, 10), RequestRate(200, 1 * 60 * 60))
default_limiter = Limiter(RequestRate(1, 10), RequestRate(100, 1 * 60 * 60))
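# Illustrative aside (not part of this file): pyrate-limiter enforces every
# RequestRate in a Limiter together, which is how the per-request spacing and
# the hourly CV cap compose above. Scaled down to small, visible numbers:
import time
from pyrate_limiter import Limiter, RequestRate
demo_limiter = Limiter(RequestRate(3, 2), RequestRate(5, 10))  # 3 per 2s AND 5 per 10s
start = time.monotonic()
for i in range(7):
with demo_limiter.ratelimit("demo", delay=True):  # blocks until both windows allow
print(f"request {i} at t={time.monotonic() - start:.1f}s")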
class ComicVineTalker(ComicTalker):
@ -171,7 +173,7 @@ class ComicVineTalker(ComicTalker):
f"<a href='{website}'>{name}</a> has the largest collection of comic book data available through "
f"its public facing API. "
f"<p>NOTE: Using the default API key will serverly limit access times. A personal API "
f"key will allow for a <b>5 times increase</b> in online search speed. See the "
f"key will allow for a <b>10 times increase</b> in online search speed. See the "
"<a href='https://github.com/comictagger/comictagger/wiki/UserGuide#comic-vine'>Wiki page</a> for "
"more information.</p>"
)
@ -410,11 +412,140 @@ class ComicVineTalker(ComicTalker):
return formatted_filtered_issues_result
def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_results: list[GenericMetadata] = []
needed_issues: list[int] = []
for issue_id in issue_ids:
cached_issue = cvc.get_issue_info(issue_id, self.id)
if cached_issue and cached_issue[1]:
cached_results.append(
self._map_comic_issue_to_metadata(
json.loads(cached_issue[0].data),
self._fetch_series([int(cached_issue[0].series_id)])[0][0],
),
)
else:
needed_issues.append(int(issue_id))  # CV uses integers for its IDs
if not needed_issues:
return cached_results
flt = "id:" + "|".join(map(str, needed_issues))
issue_url = urljoin(self.api_url, "issues/")
params: dict[str, Any] = {
"api_key": self.api_key,
"format": "json",
"filter": flt,
}
cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params)
issue_results = cv_response["results"]
page = 1
offset = 0
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self._get_cv_content(issue_url, params)
issue_results.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
series_info = {s[0].id: s[0] for s in self._fetch_series([int(i["volume"]["id"]) for i in issue_results])}
for issue in issue_results:
cvc.add_issues_info(
self.id,
[
Issue(
id=str(issue["id"]),
series_id=str(issue["volume"]["id"]),
data=json.dumps(issue).encode("utf-8"),
),
],
False, # The /issues/ endpoint never provides credits
)
cached_results.append(
self._map_comic_issue_to_metadata(issue, series_info[str(issue["volume"]["id"])]),
)
return cached_results
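# Illustrative aside (not part of this file): fetch_comics and _fetch_series
# share the same offset-paging loop over the CV response envelope. A generic
# sketch, where get_page stands in for _get_cv_content:
def fetch_all(get_page, url, params):
response = get_page(url, params)
results = response["results"]
seen = response["number_of_page_results"]
total = response["number_of_total_results"]
offset = 0
while seen < total:  # keep asking until the running count reaches the total
offset += response["number_of_page_results"]
params["offset"] = offset
response = get_page(url, params)
results.extend(response["results"])
seen += response["number_of_page_results"]
return results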
def _fetch_series(self, series_ids: list[int]) -> list[tuple[ComicSeries, bool]]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_results: list[tuple[ComicSeries, bool]] = []
needed_series: list[int] = []
for series_id in series_ids:
cached_series = cvc.get_series_info(str(series_id), self.id)
if cached_series is not None:
cached_results.append((self._format_series(json.loads(cached_series[0].data)), cached_series[1]))
else:
needed_series.append(series_id)
if not needed_series:
return cached_results
flt = "id:" + "|".join(map(str, needed_series))  # CV uses volume to mean series
series_url = urljoin(self.api_url, "volumes/") # CV uses volume to mean series
params: dict[str, Any] = {
"api_key": self.api_key,
"format": "json",
"filter": flt,
}
cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params)
series_results = cv_response["results"]
page = 1
offset = 0
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self._get_cv_content(series_url, params)
series_results.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
if series_results:
for series in series_results:
cvc.add_series_info(
self.id,
Series(id=str(series["id"]), data=json.dumps(series).encode("utf-8")),
True,
)
cached_results.append((self._format_series(series), True))
return cached_results
def _get_cv_content(self, url: str, params: dict[str, Any]) -> CVResult[T]:
"""
Get the content from the CV server.
"""
with self.limiter.ratelimit("cv", delay=True):
ratelimit_key = url
if self.api_key == self.default_api_key:
ratelimit_key = "cv"
with self.limiter.ratelimit(ratelimit_key, delay=True):
cv_response: CVResult[T] = self._get_url_content(url, params)
if cv_response["status_code"] != 1:
@ -439,7 +570,7 @@ class ComicVineTalker(ComicTalker):
time.sleep(1)
logger.debug(str(resp.status_code))
if resp.status_code == requests.status_codes.codes.TOO_MANY_REQUESTS:
if resp.status_code in (requests.status_codes.codes.TOO_MANY_REQUESTS, TWITTER_TOO_MANY_REQUESTS):
logger.info(f"{self.name} rate limit encountered. Waiting for 10 seconds\n")
time.sleep(10)
limit_counter += 1
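# Illustrative aside (not part of this file): a compact sketch of the retry
# loop above; the attempt cap and raised error are hypothetical, but the
# 429/420 handling mirrors this change.
import time
import requests
TWITTER_TOO_MANY_REQUESTS = 420
def get_with_retry(url, params, attempts=3):
for _ in range(attempts):
resp = requests.get(url, params=params)
if resp.status_code in (requests.status_codes.codes.TOO_MANY_REQUESTS, TWITTER_TOO_MANY_REQUESTS):
time.sleep(10)  # back off, then retry
continue
return resp
raise RuntimeError(f"still rate limited after {attempts} attempts")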

View File

@ -50,6 +50,7 @@ install_requires =
requests==2.*
settngs==0.10.4
text2digits
tomli
typing-extensions>=4.3.0
wordninja
python_requires = >=3.9
@ -66,8 +67,6 @@ comicapi.archiver =
folder = comicapi.archivers.folder:FolderArchiver
comicapi.tags =
cr = comicapi.tags.comicrack:ComicRack
cbi = comicapi.tags.comicbookinfo:ComicBookInfo
comet = comicapi.tags.comet:CoMet
comictagger.talker =
comicvine = comictalker.talkers.comicvine:ComicVineTalker
pyinstaller40 =
@ -88,7 +87,7 @@ QTW =
all =
PyQt5
PyQtWebEngine
comicinfoxml>=0.2.0
comicinfoxml==0.4.*
gcd-talker>0.1.0
metron-talker>0.1.5
pillow-avif-plugin>=1.4.1
@ -96,10 +95,12 @@ all =
py7zr
rarfile>=4.0
pyicu;sys_platform == 'linux' or sys_platform == 'darwin'
archived_tags =
ct-archived-tags
avif =
pillow-avif-plugin>=1.4.1
cix =
comicinfoxml>=0.2.0
comicinfoxml==0.4.*
gcd =
gcd-talker>=0.1.0
jxl =

View File

@ -52,15 +52,6 @@ def test_write_cr(tmp_comic):
md = tmp_comic.read_tags("cr")
def test_write_cbi(tmp_comic):
md = tmp_comic.read_tags("cr")
md.apply_default_page_list(tmp_comic.get_page_name_list())
assert tmp_comic.write_tags(md, "cbi")
md = tmp_comic.read_tags("cbi")
@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cr_rar(tmp_path, md_saved):
cbr_path = datadir / "fake_cbr.cbr"
@ -78,20 +69,6 @@ def test_save_cr_rar(tmp_path, md_saved):
assert md == md_saved
@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cbi_rar(tmp_path, md_saved):
cbr_path = pathlib.Path(str(datadir)) / "fake_cbr.cbr"
shutil.copy(cbr_path, tmp_path)
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name)
assert tmp_comic.seems_to_be_a_comic_archive()
assert tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cbi")
md = tmp_comic.read_tags("cbi")
supported_attributes = comicapi.comicarchive.tags["cbi"].supported_attributes
assert md.get_clean_metadata(*supported_attributes) == md_saved.get_clean_metadata(*supported_attributes)
def test_page_type_write(tmp_comic):
md = tmp_comic.read_tags("cr")
t = md.pages[0]

View File

@ -28,10 +28,32 @@ def test_search_results(comic_cache):
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
comic_cache.add_series_info(
series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info)),
series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
source="test",
complete=True,
)
vi = series_info.copy()
cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)
assert vi == cache_result
@pytest.mark.parametrize("series_info", search_results)
def test_cache_overwrite(comic_cache, series_info):
vi = series_info.copy()
comic_cache.add_series_info(
series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
source="test",
complete=True,
) # Populate the cache
# Try to insert an incomplete series with different data
series_info["name"] = "test 3"
comic_cache.add_series_info(
series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
source="test",
complete=False,
)
cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)
# Validate that the Series marked complete is still in the cache
assert vi == cache_result

View File

@ -197,7 +197,7 @@ def config(tmp_path):
from comictaggerlib.main import App
app = App()
app.register_settings()
app.register_settings(False)
defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "")
defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
@ -214,7 +214,7 @@ def plugin_config(tmp_path):
ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"))
app = App()
app.load_plugins(ns)
app.register_settings()
app.register_settings(False)
defaults = app.parse_settings(ns.config, "")
defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)

View File

@ -9,12 +9,15 @@ from comictaggerlib.md import prepare_metadata
tags = []
for x in entry_points(group="comicapi.tag"):
for x in entry_points(group="comicapi.tags"):
tag = x.load()
supported = tag.enabled
exe_found = True
tags.append(pytest.param(tag, marks=pytest.mark.xfail(not supported, reason="tags not enabled")))
if not tags:
raise Exception("No tags found")
@pytest.mark.parametrize("tag_type", tags)
def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
@ -22,20 +25,24 @@ def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
supported_attributes = tag.supported_attributes
tag.write_tags(comicapi.genericmetadata.md_test, tmp_comic.archiver)
written_metadata = tag.read_tags(tmp_comic.archiver)
md = md_saved.get_clean_metadata(*supported_attributes)
md = md_saved._get_clean_metadata(*supported_attributes)
# Hack back in the pages variable because CoMet supports identifying the cover by the filename
if tag.id == "comet":
md.pages = [
comicapi.genericmetadata.ImageMetadata(
image_index=0, filename="!cover.jpg", type=comicapi.genericmetadata.PageType.FrontCover
comicapi.genericmetadata.PageMetadata(
archive_index=0,
bookmark="",
display_index=0,
filename="!cover.jpg",
type=comicapi.genericmetadata.PageType.FrontCover,
)
]
written_metadata = written_metadata.get_clean_metadata(*supported_attributes).replace(
written_metadata = written_metadata._get_clean_metadata(*supported_attributes).replace(
pages=written_metadata.pages
)
else:
written_metadata = written_metadata.get_clean_metadata(*supported_attributes)
written_metadata = written_metadata._get_clean_metadata(*supported_attributes)
assert written_metadata == md