Compare commits
10 commits: db3d5d6a01 ... 4c9096a11b

Author | SHA1 | Date
---|---|---
 | 4c9096a11b |
 | c9c0c99a2a |
 | 58f71cf6d9 |
 | befffc98b1 |
 | 006f3cbd1f |
 | 582224abec |
 | acb59f9e83 |
 | fab30f3f29 |
 | 2cb6caea8d |
 | ffdf7d71e1 |
@@ -10,7 +10,7 @@ import comictaggerlib.main
 def generate() -> str:
     app = comictaggerlib.main.App()
     app.load_plugins(app.initial_arg_parser.parse_known_args()[0])
-    app.register_settings()
+    app.register_settings(True)
     imports, types = settngs.generate_dict(app.manager.definitions)
     imports2, types2 = settngs.generate_ns(app.manager.definitions)
     i = imports.splitlines()
@@ -24,7 +24,6 @@ import pathlib
 import shutil
 import sys
 from collections.abc import Iterable
-from typing import TYPE_CHECKING

 from comicapi import utils
 from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver
@@ -32,16 +31,13 @@ from comicapi.genericmetadata import GenericMetadata
 from comicapi.tags import Tag
 from comictaggerlib.ctversion import version

-if TYPE_CHECKING:
-    from importlib.metadata import EntryPoint
-
 logger = logging.getLogger(__name__)

 archivers: list[type[Archiver]] = []
 tags: dict[str, Tag] = {}


-def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
+def load_archive_plugins(local_plugins: Iterable[type[Archiver]] = tuple()) -> None:
     if archivers:
         return
     if sys.version_info < (3, 10):
@@ -52,7 +48,7 @@ def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
     archive_plugins: list[type[Archiver]] = []
     # A list is used first matching plugin wins

-    for ep in itertools.chain(local_plugins, entry_points(group="comicapi.archiver")):
+    for ep in itertools.chain(entry_points(group="comicapi.archiver")):
         try:
             spec = importlib.util.find_spec(ep.module)
         except ValueError:
@@ -70,11 +66,12 @@ def load_archive_plugins(local_plugins: Iterable[EntryPoint] = tuple()) -> None:
         else:
             logger.exception("Failed to load archive plugin: %s", ep.name)
     archivers.clear()
+    archivers.extend(local_plugins)
     archivers.extend(archive_plugins)
     archivers.extend(builtin)


-def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterable[EntryPoint] = tuple()) -> None:
+def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterable[type[Tag]] = tuple()) -> None:
     if tags:
         return
     if sys.version_info < (3, 10):
@@ -84,7 +81,7 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
     builtin: dict[str, Tag] = {}
     tag_plugins: dict[str, tuple[Tag, str]] = {}
     # A dict is used, last plugin wins
-    for ep in itertools.chain(entry_points(group="comicapi.tags"), local_plugins):
+    for ep in entry_points(group="comicapi.tags"):
         location = "Unknown"
         try:
             _spec = importlib.util.find_spec(ep.module)
@@ -109,6 +106,9 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
             tag_plugins[tag.id] = (tag(version), location)
         except Exception:
             logger.exception("Failed to load tag plugin: %s from %s", ep.name, location)
+    # A dict is used, last plugin wins
+    for tag in local_plugins:
+        tag_plugins[tag.id] = (tag(version), "Local")

     for tag_id in set(builtin.keys()).intersection(tag_plugins):
         location = tag_plugins[tag_id][1]
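The signature change above means local plugins are now handed to the loaders as concrete classes rather than importlib EntryPoints, and they are registered ahead of entry-point and builtin plugins. A minimal sketch of the new call style (FolderArchiver is hypothetical, not part of this diff):

```python
# Sketch only: FolderArchiver stands in for a real local plugin class.
from comicapi import comicarchive
from comicapi.archivers import Archiver


class FolderArchiver(Archiver):
    """Stand-in local plugin; a real one implements the Archiver interface."""


# Local classes are extended into the archiver list first, and since the
# first matching plugin wins, they take precedence over entry-point plugins.
comicarchive.load_archive_plugins(local_plugins=[FolderArchiver])
```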
@@ -94,6 +94,19 @@ class PageMetadata:
             return False
         return self.archive_index == other.archive_index

+    def _get_clean_metadata(self, *attributes: str) -> PageMetadata:
+        return PageMetadata(
+            filename=self.filename if "filename" in attributes else "",
+            type=self.type if "type" in attributes else "",
+            bookmark=self.bookmark if "bookmark" in attributes else "",
+            display_index=self.display_index if "display_index" in attributes else 0,
+            archive_index=self.archive_index if "archive_index" in attributes else 0,
+            double_page=self.double_page if "double_page" in attributes else None,
+            byte_size=self.byte_size if "byte_size" in attributes else None,
+            height=self.height if "height" in attributes else None,
+            width=self.width if "width" in attributes else None,
+        )
+

 Credit = merge.Credit
@@ -206,14 +219,23 @@ class GenericMetadata:
         tmp.__dict__.update(kwargs)
         return tmp

-    def get_clean_metadata(self, *attributes: str) -> GenericMetadata:
+    def _get_clean_metadata(self, *attributes: str) -> GenericMetadata:
         new_md = GenericMetadata()
         list_handled = []
         for attr in sorted(attributes):
             if "." in attr:
                 lst, _, name = attr.partition(".")
                 if lst in list_handled:
                     continue
                 old_value = getattr(self, lst)
                 new_value = getattr(new_md, lst)
                 if old_value:
+                    if hasattr(old_value[0], "_get_clean_metadata"):
+                        list_attributes = [x.removeprefix(lst + ".") for x in attributes if x.startswith(lst)]
+                        for x in old_value:
+                            new_value.append(x._get_clean_metadata(*list_attributes))
+                        list_handled.append(lst)
+                        continue
                     if not new_value:
                         for x in old_value:
                             new_value.append(x.__class__())
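With dotted attribute names, GenericMetadata._get_clean_metadata now strips the list prefix and forwards the remainder to each element's own _get_clean_metadata, so a spec like "pages.type" keeps only the type field of every page. A hedged sketch of the per-element behavior (field values are illustrative):

```python
# Sketch of the intended behavior; fields not named in *attributes are
# reset to their empty defaults ("" / 0 / None).
from comicapi.genericmetadata import PageMetadata

page = PageMetadata(
    filename="p001.jpg", type="FrontCover", bookmark="", display_index=0, archive_index=3
)
clean = page._get_clean_metadata("type", "archive_index")
# kept:  clean.type == "FrontCover", clean.archive_index == 3
# reset: clean.filename == "", clean.double_page is None, ...
```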
@@ -1,323 +0,0 @@
-"""A class to encapsulate CoMet data"""
-
-#
-# Copyright 2012-2014 ComicTagger Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import annotations
-
-import logging
-import os
-import xml.etree.ElementTree as ET
-from typing import Any
-
-from comicapi import utils
-from comicapi.archivers import Archiver
-from comicapi.comicarchive import ComicArchive
-from comicapi.genericmetadata import GenericMetadata, PageMetadata, PageType
-from comicapi.tags import Tag
-
-logger = logging.getLogger(__name__)
-
-
-class CoMet(Tag):
-    enabled = True
-
-    id = "comet"
-
-    def __init__(self, version: str) -> None:
-        super().__init__(version)
-
-        self.comet_filename = "CoMet.xml"
-        self.file = "CoMet.xml"
-        self.supported_attributes = {
-            "series",
-            "issue",
-            "title",
-            "volume",
-            "genres",
-            "description",
-            "publisher",
-            "language",
-            "format",
-            "maturity_rating",
-            "month",
-            "year",
-            "page_count",
-            "characters",
-            "credits",
-            "credits.person",
-            "credits.primary",
-            "credits.role",
-            "price",
-            "is_version_of",
-            "rights",
-            "identifier",
-            "last_mark",
-            "pages.type",  # This is required for setting the cover image none of the other types will be saved
-            "pages",
-        }
-
-    def supports_credit_role(self, role: str) -> bool:
-        return role.casefold() in self._get_parseable_credits()
-
-    def supports_tags(self, archive: Archiver) -> bool:
-        return archive.supports_files()
-
-    def has_tags(self, archive: Archiver) -> bool:
-        if not self.supports_tags(archive):
-            return False
-        has_tags = False
-        # look at all xml files in root, and search for CoMet data, get first
-        for n in archive.get_filename_list():
-            if os.path.dirname(n) == "" and os.path.splitext(n)[1].casefold() == ".xml":
-                # read in XML file, and validate it
-                data = b""
-                try:
-                    data = archive.read_file(n)
-                except Exception as e:
-                    logger.warning("Error reading in Comet XML for validation! from %s: %s", archive.path, e)
-                if self._validate_bytes(data):
-                    # since we found it, save it!
-                    self.file = n
-                    has_tags = True
-                    break
-        return has_tags
-
-    def remove_tags(self, archive: Archiver) -> bool:
-        return self.has_tags(archive) and archive.remove_file(self.file)
-
-    def read_tags(self, archive: Archiver) -> GenericMetadata:
-        if self.has_tags(archive):
-            metadata = archive.read_file(self.file) or b""
-            if self._validate_bytes(metadata):
-                return self._metadata_from_bytes(metadata, archive)
-        return GenericMetadata()
-
-    def read_raw_tags(self, archive: Archiver) -> str:
-        if self.has_tags(archive):
-            return ET.tostring(ET.fromstring(archive.read_file(self.file)), encoding="unicode", xml_declaration=True)
-        return ""
-
-    def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
-        if self.supports_tags(archive):
-            success = True
-            xml = b""
-            if self.has_tags(archive):
-                xml = archive.read_file(self.file)
-                if self.file != self.comet_filename:
-                    success = self.remove_tags(archive)
-
-            return success and archive.write_file(self.comet_filename, self._bytes_from_metadata(metadata, xml))
-        else:
-            logger.warning(f"Archive ({archive.name()}) does not support {self.name()} metadata")
-            return False
-
-    def name(self) -> str:
-        return "Comic Metadata (CoMet)"
-
-    @classmethod
-    def _get_parseable_credits(cls) -> list[str]:
-        parsable_credits: list[str] = []
-        parsable_credits.extend(GenericMetadata.writer_synonyms)
-        parsable_credits.extend(GenericMetadata.penciller_synonyms)
-        parsable_credits.extend(GenericMetadata.inker_synonyms)
-        parsable_credits.extend(GenericMetadata.colorist_synonyms)
-        parsable_credits.extend(GenericMetadata.letterer_synonyms)
-        parsable_credits.extend(GenericMetadata.cover_synonyms)
-        parsable_credits.extend(GenericMetadata.editor_synonyms)
-        return parsable_credits
-
-    def _metadata_from_bytes(self, string: bytes, archive: Archiver) -> GenericMetadata:
-        tree = ET.ElementTree(ET.fromstring(string))
-        return self._convert_xml_to_metadata(tree, archive)
-
-    def _bytes_from_metadata(self, metadata: GenericMetadata, xml: bytes = b"") -> bytes:
-        tree = self._convert_metadata_to_xml(metadata, xml)
-        return ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True)
-
-    def _convert_metadata_to_xml(self, metadata: GenericMetadata, xml: bytes = b"") -> ET.ElementTree:
-        # shorthand for the metadata
-        md = metadata
-
-        if xml:
-            root = ET.fromstring(xml)
-        else:
-            # build a tree structure
-            root = ET.Element("comet")
-            root.attrib["xmlns:comet"] = "http://www.denvog.com/comet/"
-            root.attrib["xmlns:xsi"] = "http://www.w3.org/2001/XMLSchema-instance"
-            root.attrib["xsi:schemaLocation"] = "http://www.denvog.com http://www.denvog.com/comet/comet.xsd"
-
-        # helper func
-        def assign(comet_entry: str, md_entry: Any) -> None:
-            if md_entry is not None:
-                ET.SubElement(root, comet_entry).text = str(md_entry)
-
-        # title is manditory
-        assign("title", md.title or "")
-        assign("series", md.series)
-        assign("issue", md.issue)  # must be int??
-        assign("volume", md.volume)
-        assign("description", md.description)
-        assign("publisher", md.publisher)
-        assign("pages", md.page_count)
-        assign("format", md.format)
-        assign("language", md.language)
-        assign("rating", md.maturity_rating)
-        assign("price", md.price)
-        assign("isVersionOf", md.is_version_of)
-        assign("rights", md.rights)
-        assign("identifier", md.identifier)
-        assign("lastMark", md.last_mark)
-        assign("genre", ",".join(md.genres))  # TODO repeatable
-
-        for c in md.characters:
-            assign("character", c.strip())
-
-        if md.manga is not None and md.manga == "YesAndRightToLeft":
-            assign("readingDirection", "rtl")
-
-        if md.year is not None:
-            date_str = f"{md.year:04}"
-            if md.month is not None:
-                date_str += f"-{md.month:02}"
-            assign("date", date_str)
-
-        cover_index = md.get_cover_page_index_list()[0]
-        assign("coverImage", md.pages[cover_index].filename)
-
-        # loop thru credits, and build a list for each role that CoMet supports
-        for credit in metadata.credits:
-            if credit.role.casefold() in set(GenericMetadata.writer_synonyms):
-                ET.SubElement(root, "writer").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.penciller_synonyms):
-                ET.SubElement(root, "penciller").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.inker_synonyms):
-                ET.SubElement(root, "inker").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.colorist_synonyms):
-                ET.SubElement(root, "colorist").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.letterer_synonyms):
-                ET.SubElement(root, "letterer").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.cover_synonyms):
-                ET.SubElement(root, "coverDesigner").text = str(credit.person)
-
-            if credit.role.casefold() in set(GenericMetadata.editor_synonyms):
-                ET.SubElement(root, "editor").text = str(credit.person)
-
-        ET.indent(root)
-
-        # wrap it in an ElementTree instance, and save as XML
-        tree = ET.ElementTree(root)
-        return tree
-
-    def _convert_xml_to_metadata(self, tree: ET.ElementTree, archive: Archiver) -> GenericMetadata:
-        root = tree.getroot()
-
-        if root.tag != "comet":
-            raise Exception("Not a CoMet file")
-
-        metadata = GenericMetadata()
-        md = metadata
-
-        # Helper function
-        def get(tag: str) -> Any:
-            node = root.find(tag)
-            if node is not None:
-                return node.text
-            return None
-
-        md.series = utils.xlate(get("series"))
-        md.title = utils.xlate(get("title"))
-        md.issue = utils.xlate(get("issue"))
-        md.volume = utils.xlate_int(get("volume"))
-        md.description = utils.xlate(get("description"))
-        md.publisher = utils.xlate(get("publisher"))
-        md.language = utils.xlate(get("language"))
-        md.format = utils.xlate(get("format"))
-        md.page_count = utils.xlate_int(get("pages"))
-        md.maturity_rating = utils.xlate(get("rating"))
-        md.price = utils.xlate_float(get("price"))
-        md.is_version_of = utils.xlate(get("isVersionOf"))
-        md.rights = utils.xlate(get("rights"))
-        md.identifier = utils.xlate(get("identifier"))
-        md.last_mark = utils.xlate(get("lastMark"))
-
-        _, md.month, md.year = utils.parse_date_str(utils.xlate(get("date")))
-
-        ca = ComicArchive(archive)
-        cover_filename = utils.xlate(get("coverImage"))
-        page_list = ca.get_page_name_list()
-        if cover_filename in page_list:
-            cover_index = page_list.index(cover_filename)
-            md.pages = [
-                PageMetadata(
-                    archive_index=cover_index,
-                    display_index=0,
-                    filename=cover_filename,
-                    type=PageType.FrontCover,
-                    bookmark="",
-                )
-            ]
-
-        reading_direction = utils.xlate(get("readingDirection"))
-        if reading_direction is not None and reading_direction == "rtl":
-            md.manga = "YesAndRightToLeft"
-
-        # loop for genre tags
-        for n in root:
-            if n.tag == "genre":
-                md.genres.add((n.text or "").strip())
-
-        # loop for character tags
-        for n in root:
-            if n.tag == "character":
-                md.characters.add((n.text or "").strip())
-
-        # Now extract the credit info
-        for n in root:
-            if any(
-                [
-                    n.tag == "writer",
-                    n.tag == "penciller",
-                    n.tag == "inker",
-                    n.tag == "colorist",
-                    n.tag == "letterer",
-                    n.tag == "editor",
-                ]
-            ):
-                metadata.add_credit((n.text or "").strip(), n.tag.title())
-
-            if n.tag == "coverDesigner":
-                metadata.add_credit((n.text or "").strip(), "Cover")
-
-        metadata.is_empty = False
-
-        return metadata
-
-    # verify that the string actually contains CoMet data in XML format
-    def _validate_bytes(self, string: bytes) -> bool:
-        try:
-            tree = ET.ElementTree(ET.fromstring(string))
-            root = tree.getroot()
-            if root.tag != "comet":
-                return False
-        except ET.ParseError:
-            return False
-
-        return True
@@ -1,229 +0,0 @@
-"""A class to encapsulate the ComicBookInfo data"""
-
-# Copyright 2012-2014 ComicTagger Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import annotations
-
-import json
-import logging
-from datetime import datetime
-from typing import Any, Literal, TypedDict
-
-from comicapi import utils
-from comicapi.archivers import Archiver
-from comicapi.genericmetadata import Credit, GenericMetadata
-from comicapi.tags import Tag
-
-logger = logging.getLogger(__name__)
-
-_CBILiteralType = Literal[
-    "series",
-    "title",
-    "issue",
-    "publisher",
-    "publicationMonth",
-    "publicationYear",
-    "numberOfIssues",
-    "comments",
-    "genre",
-    "volume",
-    "numberOfVolumes",
-    "language",
-    "country",
-    "rating",
-    "credits",
-    "tags",
-]
-
-
-class credit(TypedDict):
-    person: str
-    role: str
-    primary: bool
-
-
-class _ComicBookInfoJson(TypedDict, total=False):
-    series: str
-    title: str
-    publisher: str
-    publicationMonth: int
-    publicationYear: int
-    issue: int
-    numberOfIssues: int
-    volume: int
-    numberOfVolumes: int
-    rating: int
-    genre: str
-    language: str
-    country: str
-    credits: list[credit]
-    tags: list[str]
-    comments: str
-
-
-_CBIContainer = TypedDict("_CBIContainer", {"appID": str, "lastModified": str, "ComicBookInfo/1.0": _ComicBookInfoJson})
-
-
-class ComicBookInfo(Tag):
-    enabled = True
-
-    id = "cbi"
-
-    def __init__(self, version: str) -> None:
-        super().__init__(version)
-
-        self.supported_attributes = {
-            "series",
-            "issue",
-            "issue_count",
-            "title",
-            "volume",
-            "volume_count",
-            "genres",
-            "description",
-            "publisher",
-            "month",
-            "year",
-            "language",
-            "country",
-            "critical_rating",
-            "tags",
-            "credits",
-            "credits.person",
-            "credits.primary",
-            "credits.role",
-        }
-
-    def supports_credit_role(self, role: str) -> bool:
-        return True
-
-    def supports_tags(self, archive: Archiver) -> bool:
-        return archive.supports_comment()
-
-    def has_tags(self, archive: Archiver) -> bool:
-        return self.supports_tags(archive) and self._validate_string(archive.get_comment())
-
-    def remove_tags(self, archive: Archiver) -> bool:
-        return archive.set_comment("")
-
-    def read_tags(self, archive: Archiver) -> GenericMetadata:
-        if self.has_tags(archive):
-            comment = archive.get_comment()
-            if self._validate_string(comment):
-                return self._metadata_from_string(comment)
-        return GenericMetadata()
-
-    def read_raw_tags(self, archive: Archiver) -> str:
-        if self.has_tags(archive):
-            return json.dumps(json.loads(archive.get_comment()), indent=2)
-        return ""
-
-    def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
-        if self.supports_tags(archive):
-            return archive.set_comment(self._string_from_metadata(metadata))
-        else:
-            logger.warning(f"Archive ({archive.name()}) does not support {self.name()} metadata")
-            return False
-
-    def name(self) -> str:
-        return "ComicBookInfo"
-
-    def _metadata_from_string(self, string: str) -> GenericMetadata:
-        cbi_container: _CBIContainer = json.loads(string)
-
-        metadata = GenericMetadata()
-
-        cbi = cbi_container["ComicBookInfo/1.0"]
-
-        metadata.series = utils.xlate(cbi.get("series"))
-        metadata.title = utils.xlate(cbi.get("title"))
-        metadata.issue = utils.xlate(cbi.get("issue"))
-        metadata.publisher = utils.xlate(cbi.get("publisher"))
-        metadata.month = utils.xlate_int(cbi.get("publicationMonth"))
-        metadata.year = utils.xlate_int(cbi.get("publicationYear"))
-        metadata.issue_count = utils.xlate_int(cbi.get("numberOfIssues"))
-        metadata.description = utils.xlate(cbi.get("comments"))
-        metadata.genres = set(utils.split(cbi.get("genre"), ","))
-        metadata.volume = utils.xlate_int(cbi.get("volume"))
-        metadata.volume_count = utils.xlate_int(cbi.get("numberOfVolumes"))
-        metadata.language = utils.xlate(cbi.get("language"))
-        metadata.country = utils.xlate(cbi.get("country"))
-        metadata.critical_rating = utils.xlate_int(cbi.get("rating"))
-
-        metadata.credits = [
-            Credit(
-                person=x["person"] if "person" in x else "",
-                role=x["role"] if "role" in x else "",
-                primary=x["primary"] if "primary" in x else False,
-            )
-            for x in cbi.get("credits", [])
-        ]
-        metadata.tags.update(cbi.get("tags", set()))
-
-        # need the language string to be ISO
-        if metadata.language:
-            metadata.language = utils.get_language_iso(metadata.language)
-
-        metadata.is_empty = False
-
-        return metadata
-
-    def _string_from_metadata(self, metadata: GenericMetadata) -> str:
-        cbi_container = self._create_json_dictionary(metadata)
-        return json.dumps(cbi_container)
-
-    def _validate_string(self, string: bytes | str) -> bool:
-        """Verify that the string actually contains CBI data in JSON format"""
-
-        try:
-            cbi_container = json.loads(string)
-        except json.JSONDecodeError:
-            return False
-
-        return "ComicBookInfo/1.0" in cbi_container
-
-    def _create_json_dictionary(self, metadata: GenericMetadata) -> _CBIContainer:
-        """Create the dictionary that we will convert to JSON text"""
-
-        cbi_container = _CBIContainer(
-            {
-                "appID": "ComicTagger/1.0.0",
-                "lastModified": str(datetime.now()),
-                "ComicBookInfo/1.0": {},
-            }
-        )  # TODO: ctversion.version,
-
-        # helper func
-        def assign(cbi_entry: _CBILiteralType, md_entry: Any) -> None:
-            if md_entry is not None or isinstance(md_entry, str) and md_entry != "":
-                cbi_container["ComicBookInfo/1.0"][cbi_entry] = md_entry
-
-        assign("series", utils.xlate(metadata.series))
-        assign("title", utils.xlate(metadata.title))
-        assign("issue", utils.xlate(metadata.issue))
-        assign("publisher", utils.xlate(metadata.publisher))
-        assign("publicationMonth", utils.xlate_int(metadata.month))
-        assign("publicationYear", utils.xlate_int(metadata.year))
-        assign("numberOfIssues", utils.xlate_int(metadata.issue_count))
-        assign("comments", utils.xlate(metadata.description))
-        assign("genre", utils.xlate(",".join(metadata.genres)))
-        assign("volume", utils.xlate_int(metadata.volume))
-        assign("numberOfVolumes", utils.xlate_int(metadata.volume_count))
-        assign("language", utils.xlate(utils.get_language_from_iso(metadata.language)))
-        assign("country", utils.xlate(metadata.country))
-        assign("rating", utils.xlate_int(metadata.critical_rating))
-        assign("credits", [credit(person=c.person, role=c.role, primary=c.primary) for c in metadata.credits])
-        assign("tags", list(metadata.tags))
-
-        return cbi_container
@@ -88,7 +88,7 @@ if sys.version_info < (3, 11):
             cls._lower_members = {x.casefold(): x for x in cls}  # type: ignore[attr-defined]
             return cls._lower_members.get(value.casefold(), None)  # type: ignore[attr-defined]

-        def __str__(self):
+        def __str__(self) -> str:
             return self.value

 else:
@@ -36,6 +36,7 @@ from comictaggerlib.filerenamer import FileRenamer, get_rename_dir
 from comictaggerlib.graphics import graphics_path
 from comictaggerlib.issueidentifier import IssueIdentifier
 from comictaggerlib.md import prepare_metadata
+from comictaggerlib.quick_tag import QuickTag
 from comictaggerlib.resulttypes import Action, IssueResult, MatchStatus, OnlineMatchResults, Result, Status
 from comictalker.comictalker import ComicTalker, TalkerError
@@ -397,6 +398,153 @@ class CLI:
         res.status = status
         return res

+    def try_quick_tag(self, ca: ComicArchive, md: GenericMetadata) -> GenericMetadata | None:
+        if not self.config.Runtime_Options__enable_quick_tag:
+            self.output("skipping quick tag")
+            return None
+        self.output("starting quick tag")
+        try:
+            qt = QuickTag(
+                self.config.Quick_Tag__url,
+                str(utils.parse_url(self.current_talker().website).host),
+                self.current_talker(),
+                self.config,
+                self.output,
+            )
+            ct_md = qt.id_comic(
+                ca,
+                md,
+                self.config.Quick_Tag__simple,
+                set(self.config.Quick_Tag__hash),
+                self.config.Quick_Tag__exact_only,
+                self.config.Runtime_Options__interactive,
+                self.config.Quick_Tag__aggressive_filtering,
+                self.config.Quick_Tag__max,
+            )
+            if ct_md is None:
+                ct_md = GenericMetadata()
+            return ct_md
+        except Exception:
+            logger.exception("Quick Tagging failed")
+            return None
+
+    def normal_tag(
+        self, ca: ComicArchive, tags_read: list[str], md: GenericMetadata, match_results: OnlineMatchResults
+    ) -> tuple[GenericMetadata, list[IssueResult], Result | None, OnlineMatchResults]:
+        # ct_md, results, matches, match_results
+        if md is None or md.is_empty:
+            logger.error("No metadata given to search online with!")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                match_status=MatchStatus.no_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.no_matches.append(res)
+            return GenericMetadata(), [], res, match_results
+
+        ii = IssueIdentifier(ca, self.config, self.current_talker())
+
+        ii.set_output_function(functools.partial(self.output, already_logged=True))
+        if not self.config.Auto_Tag__use_year_when_identifying:
+            md.year = None
+        if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
+            md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
+        result, matches = ii.identify(ca, md)
+
+        found_match = False
+        choices = False
+        low_confidence = False
+
+        if result == IssueIdentifier.result_no_matches:
+            pass
+        elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
+            low_confidence = True
+            found_match = True
+        elif result == IssueIdentifier.result_found_match_but_not_first_page:
+            found_match = True
+        elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
+            low_confidence = True
+            choices = True
+        elif result == IssueIdentifier.result_one_good_match:
+            found_match = True
+        elif result == IssueIdentifier.result_multiple_good_matches:
+            choices = True
+
+        if choices:
+            if low_confidence:
+                logger.error("Online search: Multiple low confidence matches. Save aborted")
+                res = Result(
+                    Action.save,
+                    status=Status.match_failure,
+                    original_path=ca.path,
+                    online_results=matches,
+                    match_status=MatchStatus.low_confidence_match,
+                    tags_written=self.config.Runtime_Options__tags_write,
+                    tags_read=tags_read,
+                )
+                match_results.low_confidence_matches.append(res)
+                return GenericMetadata(), matches, res, match_results
+
+            logger.error("Online search: Multiple good matches. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.multiple_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.multiple_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+        if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
+            logger.error("Online search: Low confidence match. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.low_confidence_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.low_confidence_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+        if not found_match:
+            logger.error("Online search: No match found. Save aborted")
+            res = Result(
+                Action.save,
+                status=Status.match_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.no_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.no_matches.append(res)
+            return GenericMetadata(), matches, res, match_results
+
+        # we got here, so we have a single match
+
+        # now get the particular issue data
+        ct_md = self.fetch_metadata(matches[0].issue_id)
+        if ct_md.is_empty:
+            res = Result(
+                Action.save,
+                status=Status.fetch_data_failure,
+                original_path=ca.path,
+                online_results=matches,
+                match_status=MatchStatus.good_match,
+                tags_written=self.config.Runtime_Options__tags_write,
+                tags_read=tags_read,
+            )
+            match_results.fetch_data_failures.append(res)
+            return GenericMetadata(), matches, res, match_results
+        return ct_md, matches, None, match_results
+
     def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> tuple[Result, OnlineMatchResults]:
         if self.config.Runtime_Options__skip_existing_tags:
             for tag_id in self.config.Runtime_Options__tags_write:
@@ -455,117 +603,34 @@ class CLI:
                 return res, match_results

         else:
-            if md is None or md.is_empty:
-                logger.error("No metadata given to search online with!")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    match_status=MatchStatus.no_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.no_matches.append(res)
-                return res, match_results
-
-            ii = IssueIdentifier(ca, self.config, self.current_talker())
-
-            ii.set_output_function(functools.partial(self.output, already_logged=True))
-            if not self.config.Auto_Tag__use_year_when_identifying:
-                md.year = None
-            if self.config.Auto_Tag__ignore_leading_numbers_in_filename and md.series is not None:
-                md.series = re.sub(r"^([\d.]+)(.*)", r"\2", md.series)
-            result, matches = ii.identify(ca, md)
-
-            found_match = False
-            choices = False
-            low_confidence = False
-
-            if result == IssueIdentifier.result_no_matches:
-                pass
-            elif result == IssueIdentifier.result_found_match_but_bad_cover_score:
-                low_confidence = True
-                found_match = True
-            elif result == IssueIdentifier.result_found_match_but_not_first_page:
-                found_match = True
-            elif result == IssueIdentifier.result_multiple_matches_with_bad_image_scores:
-                low_confidence = True
-                choices = True
-            elif result == IssueIdentifier.result_one_good_match:
-                found_match = True
-            elif result == IssueIdentifier.result_multiple_good_matches:
-                choices = True
-
-            if choices:
-                if low_confidence:
-                    logger.error("Online search: Multiple low confidence matches. Save aborted")
-                    res = Result(
-                        Action.save,
-                        status=Status.match_failure,
-                        original_path=ca.path,
-                        online_results=matches,
-                        match_status=MatchStatus.low_confidence_match,
-                        tags_written=self.config.Runtime_Options__tags_write,
-                        tags_read=tags_read,
-                    )
-                    match_results.low_confidence_matches.append(res)
-                    return res, match_results
-
-                logger.error("Online search: Multiple good matches. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.multiple_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.multiple_matches.append(res)
-                return res, match_results
-            if low_confidence and self.config.Runtime_Options__abort_on_low_confidence:
-                logger.error("Online search: Low confidence match. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.low_confidence_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.low_confidence_matches.append(res)
-                return res, match_results
-            if not found_match:
-                logger.error("Online search: No match found. Save aborted")
-                res = Result(
-                    Action.save,
-                    status=Status.match_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.no_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.no_matches.append(res)
-                return res, match_results
-
-            # we got here, so we have a single match
-
-            # now get the particular issue data
-            ct_md = self.fetch_metadata(matches[0].issue_id)
-            if ct_md.is_empty:
-                res = Result(
-                    Action.save,
-                    status=Status.fetch_data_failure,
-                    original_path=ca.path,
-                    online_results=matches,
-                    match_status=MatchStatus.good_match,
-                    tags_written=self.config.Runtime_Options__tags_write,
-                    tags_read=tags_read,
-                )
-                match_results.fetch_data_failures.append(res)
-                return res, match_results
+            qt_md = self.try_quick_tag(ca, md)
+            if qt_md is None or qt_md.is_empty:
+                if qt_md is not None:
+                    self.output("Failed to find match via quick tag")
+                ct_md, matches, res, match_results = self.normal_tag(ca, tags_read, md, match_results)  # type: ignore[assignment]
+                if res is not None:
+                    return res, match_results
+            else:
+                self.output("Successfully matched via quick tag")
+                ct_md = qt_md
+                matches = [
+                    IssueResult(
+                        series=ct_md.series or "",
+                        distance=-1,
+                        issue_number=ct_md.issue or "",
+                        issue_count=ct_md.issue_count,
+                        url_image_hash=-1,
+                        issue_title=ct_md.title or "",
+                        issue_id=ct_md.issue_id or "",
+                        series_id=ct_md.issue_id or "",
+                        month=ct_md.month,
+                        year=ct_md.year,
+                        publisher=None,
+                        image_url=ct_md._cover_image or "",
+                        alt_image_urls=[],
+                        description=ct_md.description or "",
+                    )
+                ]

             res = Result(
                 Action.save,
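Taken together, save() now tries the quick tagger first and only falls back to the original identification logic (moved into normal_tag()) when that fails. A condensed paraphrase of the resulting control flow, not a verbatim excerpt:

```python
# Condensed sketch of the new save() branch shown above.
qt_md = self.try_quick_tag(ca, md)            # None when disabled or on error
if qt_md is None or qt_md.is_empty:
    if qt_md is not None:
        self.output("Failed to find match via quick tag")
    # fall back to the classic online identification path
    ct_md, matches, res, match_results = self.normal_tag(ca, tags_read, md, match_results)
    if res is not None:                       # identification failed; propagate the failure
        return res, match_results
else:
    self.output("Successfully matched via quick tag")
    ct_md = qt_md                             # a synthetic IssueResult is built from ct_md
```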
@@ -104,6 +104,9 @@ def save_file(
         filename: A pathlib.Path object to save the json dictionary to
     """
     file_options = settngs.clean_config(config, file=True)
+    if "Quick Tag" in file_options and "url" in file_options["Quick Tag"]:
+        file_options["Quick Tag"]["url"] = str(file_options["Quick Tag"]["url"])
+
     try:
         if not filename.exists():
             filename.parent.mkdir(exist_ok=True, parents=True)
@@ -27,7 +27,7 @@ import settngs

 from comicapi import utils
 from comicapi.comicarchive import tags
-from comictaggerlib import ctversion
+from comictaggerlib import ctversion, quick_tag
 from comictaggerlib.ctsettings.settngs_namespace import SettngsNS as ct_ns
 from comictaggerlib.ctsettings.types import ComicTaggerPaths, tag
 from comictaggerlib.resulttypes import Action
@@ -51,6 +51,12 @@ def initial_commandline_parser() -> argparse.ArgumentParser:
         default=0,
         help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
     )
+    parser.add_argument(
+        "--enable-quick-tag",
+        action=argparse.BooleanOptionalAction,
+        default=False,
+        help='Enable the expiremental "quick tagger"',
+    )
     return parser


@@ -70,6 +76,13 @@ def register_runtime(parser: settngs.Manager) -> None:
         help="Be noisy when doing what it does. Use a second time to enable debug logs.\nShort option cannot be combined with other options.",
         file=False,
     )
+    parser.add_setting(
+        "--enable-quick-tag",
+        action=argparse.BooleanOptionalAction,
+        default=False,
+        help='Enable the expiremental "quick tagger"',
+        file=False,
+    )
     parser.add_setting("-q", "--quiet", action="store_true", help="Don't say much (for print mode).", file=False)
     parser.add_setting(
         "-j",
@@ -240,9 +253,11 @@ def register_commands(parser: settngs.Manager) -> None:
     )


-def register_commandline_settings(parser: settngs.Manager) -> None:
+def register_commandline_settings(parser: settngs.Manager, enable_quick_tag: bool) -> None:
     parser.add_group("Commands", register_commands, True)
     parser.add_persistent_group("Runtime Options", register_runtime)
+    if enable_quick_tag:
+        parser.add_group("Quick Tag", quick_tag.settings)


 def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs.Manager) -> settngs.Config[ct_ns]:
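The flag is registered twice: once on the initial argparse parser (so plugin loading and group registration can see it early) and once on the settngs runtime group, and the "Quick Tag" settings group only exists when the flag was passed up front. A minimal sketch of the gating, assuming the names in this diff:

```python
# Sketch: the second argument decides whether the "Quick Tag" group exists,
# mirroring App.register_settings(conf.enable_quick_tag) later in this compare.
import settngs

from comictaggerlib import ctsettings

manager = settngs.Manager(description="ComicTagger", epilog="")
ctsettings.register_commandline_settings(manager, True)  # adds the "Quick Tag" group
```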
@@ -27,8 +27,8 @@ def general(parser: settngs.Manager) -> None:
 def internal(parser: settngs.Manager) -> None:
     # automatic settings
     parser.add_setting("install_id", default=uuid.uuid4().hex, cmdline=False)
-    parser.add_setting("write_tags", default=["cbi"], cmdline=False)
-    parser.add_setting("read_tags", default=["cbi"], cmdline=False)
+    parser.add_setting("write_tags", default=["cr"], cmdline=False)
+    parser.add_setting("read_tags", default=["cr"], cmdline=False)
     parser.add_setting("last_opened_folder", default="", cmdline=False)
     parser.add_setting("window_width", default=0, cmdline=False)
     parser.add_setting("window_height", default=0, cmdline=False)
@@ -356,7 +356,7 @@ def migrate_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
     elif isinstance(write_Tags, str):
         config[0].internal__write_tags = [write_Tags]
     else:
-        config[0].internal__write_tags = ["cbi"]
+        config[0].internal__write_tags = ["cr"]

     read_tags = config[0].internal__read_tags
     if not isinstance(read_tags, list):
@@ -365,7 +365,7 @@ def migrate_settings(config: settngs.Config[ct_ns]) -> settngs.Config[ct_ns]:
     elif isinstance(read_tags, str):
         config[0].internal__read_tags = [read_tags]
     else:
-        config[0].internal__read_tags = ["cbi"]
+        config[0].internal__read_tags = ["cr"]

     return config
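The default tag format moves from "cbi" to "cr" in both the setting defaults and the migration fallback, which lines up with the ComicBookInfo and CoMet implementations being deleted earlier in this compare. A sketch of what the migration now does to a legacy value:

```python
# Sketch of the migrate_settings fallback with the new default.
write_tags = None                  # an unset or corrupt legacy value
if isinstance(write_tags, list):
    pass                           # already in the new list shape
elif isinstance(write_tags, str):
    write_tags = [write_tags]      # wrap a legacy bare string, value preserved
else:
    write_tags = ["cr"]            # fall back to the new default, no longer ["cbi"]
```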
@@ -5,17 +5,52 @@
 from __future__ import annotations

 import configparser
 import importlib.metadata
 import importlib.util
 import itertools
 import logging
 import pathlib
 import platform
 import re
-from collections.abc import Generator
-from typing import Any, NamedTuple
+import sys
+from collections.abc import Generator, Iterable
+from typing import Any, NamedTuple, TypeVar
+
+if sys.version_info < (3, 10):
+    import importlib_metadata
+else:
+    import importlib.metadata as importlib_metadata
+import tomli

 logger = logging.getLogger(__name__)

 NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
 PLUGIN_GROUPS = frozenset(("comictagger.talker", "comicapi.archiver", "comicapi.tags"))
+icu_available = importlib.util.find_spec("icu") is not None
+
+
+def _custom_key(tup: Any) -> Any:
+    import natsort
+
+    lst = []
+    for x in natsort.os_sort_keygen()(tup):
+        ret = x
+        if len(x) > 1 and isinstance(x[1], int) and isinstance(x[0], str) and x[0] == "":
+            ret = ("a", *x[1:])
+
+        lst.append(ret)
+    return tuple(lst)
+
+
+T = TypeVar("T")
+
+
+def os_sorted(lst: Iterable[T]) -> Iterable[T]:
+    import natsort
+
+    key = _custom_key
+    if icu_available or platform.system() == "Windows":
+        key = natsort.os_sort_keygen()
+    return sorted(lst, key=key)


 class FailedToLoadPlugin(Exception):
@@ -47,9 +82,12 @@ class Plugin(NamedTuple):

     package: str
     version: str
-    entry_point: importlib.metadata.EntryPoint
+    entry_point: importlib_metadata.EntryPoint
     path: pathlib.Path

+    def load(self) -> LoadedPlugin:
+        return LoadedPlugin(self, self.entry_point.load())
+

 class LoadedPlugin(NamedTuple):
     """Represents a plugin after being imported."""
@@ -71,11 +109,11 @@ class LoadedPlugin(NamedTuple):
 class Plugins(NamedTuple):
     """Classified plugins."""

-    archivers: list[Plugin]
-    tags: list[Plugin]
-    talkers: list[Plugin]
+    archivers: list[LoadedPlugin]
+    tags: list[LoadedPlugin]
+    talkers: list[LoadedPlugin]

-    def all_plugins(self) -> Generator[Plugin]:
+    def all_plugins(self) -> Generator[LoadedPlugin]:
         """Return an iterator over all :class:`LoadedPlugin`s."""
         yield from self.archivers
         yield from self.tags
@@ -83,13 +121,24 @@ class Plugins(NamedTuple):

     def versions_str(self) -> str:
         """Return a user-displayed list of plugin versions."""
-        return ", ".join(sorted({f"{plugin.package}: {plugin.version}" for plugin in self.all_plugins()}))
+        return ", ".join(sorted({f"{plugin.plugin.package}: {plugin.plugin.version}" for plugin in self.all_plugins()}))


-def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:
+def _find_ep_plugin(plugin_path: pathlib.Path) -> None | Generator[Plugin]:
+    logger.debug("Checking for distributions in %s", plugin_path)
+    for dist in importlib_metadata.distributions(path=[str(plugin_path)]):
+        logger.debug("found distribution %s", dist.name)
+        eps = dist.entry_points
+        for group in PLUGIN_GROUPS:
+            for ep in eps.select(group=group):
+                logger.debug("found EntryPoint group %s %s=%s", group, ep.name, ep.value)
+                yield Plugin(plugin_path.name, dist.version, ep, plugin_path)
+    return None
+
+
+def _find_cfg_plugin(setup_cfg_path: pathlib.Path) -> Generator[Plugin]:
     cfg = configparser.ConfigParser(interpolation=None)
-    cfg.read(plugin_path / "setup.cfg")
+    cfg.read(setup_cfg_path)

     for group in PLUGIN_GROUPS:
         for plugin_s in cfg.get("options.entry_points", group, fallback="").splitlines():
@@ -98,8 +147,43 @@ def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:

             name, _, entry_str = plugin_s.partition("=")
             name, entry_str = name.strip(), entry_str.strip()
-            ep = importlib.metadata.EntryPoint(name, entry_str, group)
-            yield Plugin(plugin_path.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, plugin_path)
+            ep = importlib_metadata.EntryPoint(name, entry_str, group)
+            yield Plugin(
+                setup_cfg_path.parent.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, setup_cfg_path.parent
+            )
+
+
+def _find_pyproject_plugin(pyproject_path: pathlib.Path) -> Generator[Plugin]:
+    cfg = tomli.loads(pyproject_path.read_text())
+
+    for group in PLUGIN_GROUPS:
+        cfg["project"]["entry-points"]
+        for plugins in cfg.get("project", {}).get("entry-points", {}).get(group, {}):
+            if not plugins:
+                continue
+            for name, entry_str in plugins.items():
+                ep = importlib_metadata.EntryPoint(name, entry_str, group)
+                yield Plugin(
+                    pyproject_path.parent.name,
+                    cfg.get("project", {}).get("version", "0.0.1"),
+                    ep,
+                    pyproject_path.parent,
+                )
+
+
+def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin]:
+    gen = _find_ep_plugin(plugin_path)
+    if gen is not None:
+        yield from gen
+        return
+
+    if (plugin_path / "setup.cfg").is_file():
+        yield from _find_cfg_plugin(plugin_path / "setup.cfg")
+        return
+
+    if (plugin_path / "pyproject.cfg").is_file():
+        yield from _find_pyproject_plugin(plugin_path / "setup.cfg")
+        return


 def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) -> None:
@@ -118,30 +202,48 @@ def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) ->

 def find_plugins(plugin_folder: pathlib.Path) -> Plugins:
     """Discovers all plugins (but does not load them)."""
-    ret: list[Plugin] = []
-    for plugin_path in plugin_folder.glob("*/setup.cfg"):
-        try:
-            ret.extend(_find_local_plugins(plugin_path.parent))
-        except Exception as err:
-            FailedToLoadPlugin(plugin_path.parent.name, err)
+    ret: list[LoadedPlugin] = []

-    # for determinism, sort the list
-    ret.sort()
+    dirs = {x.parent for x in plugin_folder.glob("*/setup.cfg")}
+    dirs.update({x.parent for x in plugin_folder.glob("*/pyproject.toml")})
+
+    zips = [x for x in plugin_folder.glob("*.zip") if x.is_file()]
+
+    for plugin_path in itertools.chain(os_sorted(zips), os_sorted(dirs)):
+        logger.debug("looking for plugins in %s", plugin_path)
+        try:
+            sys.path.append(str(plugin_path))
+            for plugin in _find_local_plugins(plugin_path):
+                logger.debug("Attempting to load %s from %s", plugin.entry_point.name, plugin.path)
+                ret.append(plugin.load())
+            # sys.path.remove(str(plugin_path))
+        except Exception as err:
+            FailedToLoadPlugin(plugin_path.name, err)
+        finally:
+            sys.path.remove(str(plugin_path))
+            for mod in list(sys.modules.values()):
+                if (
+                    mod is not None
+                    and hasattr(mod, "__spec__")
+                    and mod.__spec__
+                    and str(plugin_path) in (mod.__spec__.origin or "")
+                ):
+                    sys.modules.pop(mod.__name__)

     return _classify_plugins(ret)


-def _classify_plugins(plugins: list[Plugin]) -> Plugins:
+def _classify_plugins(plugins: list[LoadedPlugin]) -> Plugins:
     archivers = []
     tags = []
     talkers = []

     for p in plugins:
-        if p.entry_point.group == "comictagger.talker":
+        if p.plugin.entry_point.group == "comictagger.talker":
             talkers.append(p)
-        elif p.entry_point.group == "comicapi.tags":
+        elif p.plugin.entry_point.group == "comicapi.tags":
             tags.append(p)
-        elif p.entry_point.group == "comicapi.archiver":
+        elif p.plugin.entry_point.group == "comicapi.archiver":
             archivers.append(p)
         else:
             logger.warning(NotImplementedError(f"what plugin type? {p}"))
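find_plugins now scans zip files as well as directories containing a setup.cfg or pyproject.toml, loads each discovered plugin immediately, and removes the temporary sys.path entry and imported modules afterwards. A hedged usage sketch (the plugin_finder import path and plugin directory are assumptions, not stated in this diff):

```python
# Sketch only; import path and directory are illustrative.
import pathlib

from comictaggerlib.ctsettings import plugin_finder

plugins = plugin_finder.find_plugins(pathlib.Path("~/.ComicTagger/plugins").expanduser())
print(plugins.versions_str())          # e.g. "myplugin: 0.0.1"
for loaded in plugins.all_plugins():   # LoadedPlugin(plugin=Plugin(...), obj=<loaded object>)
    print(loaded.plugin.entry_point.group, loaded.plugin.package)
```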
@@ -3,6 +3,7 @@ from __future__ import annotations
 import typing

 import settngs
+import urllib3.util.url

 import comicapi.genericmetadata
 import comicapi.merge
@@ -19,6 +20,7 @@ class SettngsNS(settngs.TypedNS):

     Runtime_Options__config: comictaggerlib.ctsettings.types.ComicTaggerPaths
     Runtime_Options__verbose: int
+    Runtime_Options__enable_quick_tag: bool
     Runtime_Options__quiet: bool
     Runtime_Options__json: bool
     Runtime_Options__raw: bool
@@ -37,6 +39,13 @@ class SettngsNS(settngs.TypedNS):
     Runtime_Options__skip_existing_tags: bool
     Runtime_Options__files: list[str]

+    Quick_Tag__url: urllib3.util.url.Url
+    Quick_Tag__max: int
+    Quick_Tag__simple: bool
+    Quick_Tag__aggressive_filtering: bool
+    Quick_Tag__hash: list[comictaggerlib.quick_tag.HashType]
+    Quick_Tag__exact_only: bool
+
     internal__install_id: str
     internal__write_tags: list[str]
     internal__read_tags: list[str]
@@ -132,6 +141,7 @@ class Commands(typing.TypedDict):
 class Runtime_Options(typing.TypedDict):
     config: comictaggerlib.ctsettings.types.ComicTaggerPaths
     verbose: int
+    enable_quick_tag: bool
     quiet: bool
     json: bool
     raw: bool
@@ -151,6 +161,15 @@ class Runtime_Options(typing.TypedDict):
     files: list[str]


+class Quick_Tag(typing.TypedDict):
+    url: urllib3.util.url.Url
+    max: int
+    simple: bool
+    aggressive_filtering: bool
+    hash: list[comictaggerlib.quick_tag.HashType]
+    exact_only: bool
+
+
 class internal(typing.TypedDict):
     install_id: str
     write_tags: list[str]
@@ -263,6 +282,7 @@ SettngsDict = typing.TypedDict(
     {
         "Commands": Commands,
         "Runtime Options": Runtime_Options,
+        "Quick Tag": Quick_Tag,
         "internal": internal,
         "Issue Identifier": Issue_Identifier,
         "Filename Parsing": Filename_Parsing,
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import argparse
+import logging
 import pathlib
 import sys
 import types
@@ -15,6 +16,8 @@ from comicapi import utils
 from comicapi.comicarchive import tags
 from comicapi.genericmetadata import REMOVE, GenericMetadata

+logger = logging.getLogger(__name__)
+
 if sys.version_info < (3, 10):

     @typing.no_type_check
@@ -198,44 +201,48 @@ def parse_metadata_from_string(mdstr: str) -> GenericMetadata:

     md = GenericMetadata()

-    if not mdstr:
-        return md
-    if mdstr[0] == "@":
-        p = pathlib.Path(mdstr[1:])
-        if not p.is_file():
-            raise argparse.ArgumentTypeError("Invalid filepath")
-        mdstr = p.read_text()
-    if mdstr[0] != "{":
-        mdstr = "{" + mdstr + "}"
-
-    md_dict = yaml.safe_load(mdstr)
-
-    empty = True
-    # Map the dict to the metadata object
-    for key, value in md_dict.items():
-        if hasattr(md, key):
-            t = get_type(key)
-            if value is None:
-                value = REMOVE
-            elif isinstance(t, tuple):
-                if value == "":
-                    value = t[0]()
-                else:
-                    if isinstance(value, str):
-                        value = [value]
-                    if not isinstance(value, Collection):
-                        raise argparse.ArgumentTypeError(f"Invalid syntax for tag '{key}'")
-                    values = list(value)
-                    for idx, v in enumerate(values):
-                        if not isinstance(v, t[1]):
-                            values[idx] = convert_value(t[1], v)
-                    value = t[0](values)
-            else:
-                value = convert_value(t, value)
-
-            empty = False
-            setattr(md, key, value)
-        else:
-            raise argparse.ArgumentTypeError(f"'{key}' is not a valid tag name")
-    md.is_empty = empty
+    try:
+        if not mdstr:
+            return md
+        if mdstr[0] == "@":
+            p = pathlib.Path(mdstr[1:])
+            if not p.is_file():
+                raise argparse.ArgumentTypeError("Invalid filepath")
+            mdstr = p.read_text()
+        if mdstr[0] != "{":
+            mdstr = "{" + mdstr + "}"
+
+        md_dict = yaml.safe_load(mdstr)
+
+        empty = True
+        # Map the dict to the metadata object
+        for key, value in md_dict.items():
+            if hasattr(md, key):
+                t = get_type(key)
+                if value is None:
+                    value = REMOVE
+                elif isinstance(t, tuple):
+                    if value == "":
+                        value = t[0]()
+                    else:
+                        if isinstance(value, str):
+                            value = [value]
+                        if not isinstance(value, Collection):
+                            raise argparse.ArgumentTypeError(f"Invalid syntax for tag '{key}'")
+                        values = list(value)
+                        for idx, v in enumerate(values):
+                            if not isinstance(v, t[1]):
+                                values[idx] = convert_value(t[1], v)
+                        value = t[0](values)
+                else:
+                    value = convert_value(t, value)
+
+                empty = False
+                setattr(md, key, value)
+            else:
+                raise argparse.ArgumentTypeError(f"'{key}' is not a valid tag name")
+        md.is_empty = empty
+    except Exception as e:
+        logger.exception("Unable to read metadata from the commandline '%s'", mdstr)
+        raise Exception("Unable to read metadata from the commandline") from e
     return md
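The body is unchanged apart from the added try/except, which now logs any failure and re-raises it as a generic exception. A quick usage sketch of the accepted input syntax, with illustrative values:

```python
# A bare key: value list is wrapped in braces and parsed as a YAML flow mapping.
md = parse_metadata_from_string("series: Example Series, issue: '2'")
# internally equivalent to parsing "{series: Example Series, issue: '2'}"

md_from_file = parse_metadata_from_string("@/path/to/metadata.txt")  # leading @ reads a file
```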
@@ -73,24 +73,23 @@ class ImageHasher:

         return result

-    def average_hash2(self) -> None:
-        """
-        # Got this one from somewhere on the net. Not a clue how the 'convolve2d' works!
+    def difference_hash(self) -> int:
+        try:
+            image = self.image.resize((self.width + 1, self.height), Image.Resampling.LANCZOS).convert("L")
+        except Exception:
+            logger.exception("difference_hash error")
+            return 0

-        from numpy import array
-        from scipy.signal import convolve2d
+        pixels = list(image.getdata())
+        diff = ""
+        for y in range(self.height):
+            for x in range(self.width):
+                idx = x + (self.width + 1 * y)
+                diff += str(int(pixels[idx] < pixels[idx + 1]))

-        im = self.image.resize((self.width, self.height), Image.ANTIALIAS).convert('L')
+        result = int(diff, 2)

-        in_data = array((im.getdata())).reshape(self.width, self.height)
-        filt = array([[0,1,0],[1,-4,1],[0,1,0]])
-        filt_data = convolve2d(in_data,filt,mode='same',boundary='symm').flatten()
-
-        result = reduce(lambda x, (y, z): x | (z << y),
-                        enumerate(map(lambda i: 0 if i < 0 else 1, filt_data)),
-                        0)
         return result
-        """

     def p_hash(self) -> int:
         """
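The replacement implements a classic row-wise dHash: the image is resized to (width + 1, height) and each bit records whether a pixel is darker than its right-hand neighbour. A hedged sketch of how two such hashes are typically compared (not part of this diff; the threshold is illustrative):

```python
# Similarity between two difference hashes is the Hamming distance:
# the number of differing bits, lower meaning more similar covers.
def hamming_distance(h1: int, h2: int) -> int:
    return bin(h1 ^ h2).count("1")

# e.g. treat the images as matching when the distance is small:
# similar = hamming_distance(hash_a, hash_b) <= 10
```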
@ -44,7 +44,6 @@ if sys.version_info < (3, 10):
|
||||
import importlib_metadata
|
||||
else:
|
||||
import importlib.metadata as importlib_metadata
|
||||
|
||||
logger = logging.getLogger("comictagger")
|
||||
|
||||
|
||||
@ -117,26 +116,20 @@ class App:
        conf = self.initialize()
        self.initialize_dirs(conf.config)
        self.load_plugins(conf)
        self.register_settings()
        self.register_settings(conf.enable_quick_tag)
        self.config = self.parse_settings(conf.config)

        self.main()

    def load_plugins(self, opts: argparse.Namespace) -> None:
        local_plugins = plugin_finder.find_plugins(opts.config.user_plugin_dir)
        self._extend_plugin_paths(local_plugins)

        comicapi.comicarchive.load_archive_plugins(local_plugins=[p.entry_point for p in local_plugins.archivers])
        comicapi.comicarchive.load_tag_plugins(
            version=version, local_plugins=[p.entry_point for p in local_plugins.tags]
        )
        comicapi.comicarchive.load_archive_plugins(local_plugins=[p.obj for p in local_plugins.archivers])
        comicapi.comicarchive.load_tag_plugins(version=version, local_plugins=[p.obj for p in local_plugins.tags])
        self.talkers = comictalker.get_talkers(
            version, opts.config.user_cache_dir, local_plugins=[p.entry_point for p in local_plugins.talkers]
            version, opts.config.user_cache_dir, local_plugins=[p.obj for p in local_plugins.talkers]
        )

    def _extend_plugin_paths(self, plugins: plugin_finder.Plugins) -> None:
        sys.path.extend(str(p.path.absolute()) for p in plugins.all_plugins())

    def list_plugins(
        self,
        talkers: Collection[comictalker.ComicTalker],
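The new halves of these pairs show the changed contract: load_archive_plugins, load_tag_plugins and get_talkers now receive already-imported plugin classes (the plugin's .obj) instead of EntryPoint objects. A minimal sketch of what that enables; MyArchiver is a hypothetical local plugin, not part of this diff:

import comicapi.comicarchive
from comicapi.archivers import Archiver

class MyArchiver(Archiver):  # hypothetical local plugin class
    """Stub; a real plugin implements the Archiver interface."""

# the class itself is passed through; no entry-point loading step remains
comicapi.comicarchive.load_archive_plugins(local_plugins=[MyArchiver])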
@ -215,13 +208,13 @@ class App:
        setup_logging(conf.verbose, conf.config.user_log_dir)
        return conf

    def register_settings(self) -> None:
    def register_settings(self, enable_quick_tag: bool) -> None:
        self.manager = settngs.Manager(
            description="A utility for reading and writing metadata to comic archives.\n\n\n"
            + "If no options are given, %(prog)s will run in windowed mode.\nPlease keep the '-v' option separated '-so -v' not '-sov'",
            epilog="For more help visit the wiki at: https://github.com/comictagger/comictagger/wiki",
        )
        ctsettings.register_commandline_settings(self.manager)
        ctsettings.register_commandline_settings(self.manager, enable_quick_tag)
        ctsettings.register_file_settings(self.manager)
        ctsettings.register_plugin_settings(self.manager, getattr(self, "talkers", {}))

391 comictaggerlib/quick_tag.py Normal file
@ -0,0 +1,391 @@
from __future__ import annotations

import argparse
import itertools
import logging
from enum import auto
from io import BytesIO
from typing import Callable, TypedDict, cast
from urllib.parse import urljoin

import requests
import settngs

from comicapi import comicarchive, utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib.ctsettings.settngs_namespace import SettngsNS
from comictaggerlib.imagehasher import ImageHasher
from comictalker import ComicTalker

logger = logging.getLogger(__name__)

__version__ = "0.1"


class HashType(utils.StrEnum):
    AHASH = auto()
    DHASH = auto()
    PHASH = auto()


class SimpleResult(TypedDict):
    Distance: int
    # Mapping of domains (eg comicvine.gamespot.com) to IDs
    IDList: dict[str, list[str]]


class Hash(TypedDict):
    Hash: int
    Kind: str


class Result(TypedDict):
    # Mapping of domains (eg comicvine.gamespot.com) to IDs
    IDs: dict[str, list[str]]
    Distance: int
    Hash: Hash


def ihash(types: str) -> list[HashType]:
    result: list[HashType] = []
    types = types.casefold()
    choices = ", ".join(HashType)
    for typ in utils.split(types, ","):
        if typ not in list(HashType):
            raise argparse.ArgumentTypeError(f"invalid choice: {typ} (choose from {choices.upper()})")
        result.append(HashType[typ.upper()])

    if not result:
        raise argparse.ArgumentTypeError(f"invalid choice: {types} (choose from {choices.upper()})")
    return result

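Since ihash is used as the argparse type for --hash below, a quick illustration of its behavior (values are examples):

selected = ihash("ahash,phash")
# -> [HashType.AHASH, HashType.PHASH]

# an unrecognized kind raises argparse.ArgumentTypeError, e.g.:
# ihash("md5") -> invalid choice: md5 (choose from AHASH, DHASH, PHASH)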
def settings(manager: settngs.Manager) -> None:
    manager.add_setting(
        "--url",
        "-u",
        default="https://comic-hasher.narnian.us",
        type=utils.parse_url,
        help="Website to use for searching cover hashes",
    )
    manager.add_setting(
        "--max",
        default=8,
        type=int,
        help="Maximum score to allow. Lower score means more accurate",
    )
    manager.add_setting(
        "--simple",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Whether to retrieve simple results or full results",
    )
    manager.add_setting(
        "--aggressive-filtering",
        default=False,
        action=argparse.BooleanOptionalAction,
        help="Will filter out worse matches if better matches are found",
    )
    manager.add_setting(
        "--hash",
        default="ahash, dhash, phash",
        type=ihash,
        help="Pick what hashes you want to use to search (default: %(default)s)",
    )
    manager.add_setting(
        "--exact-only",
        default=True,
        action=argparse.BooleanOptionalAction,
        help="Skip non-exact matches if we have exact matches",
    )

class QuickTag:
    def __init__(
        self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[[str], None]
    ):
        self.output = output
        self.url = url
        self.talker = talker
        self.domain = domain
        self.config = config

    def id_comic(
        self,
        ca: comicarchive.ComicArchive,
        tags: GenericMetadata,
        simple: bool,
        hashes: set[HashType],
        exact_only: bool,
        interactive: bool,
        aggressive_filtering: bool,
        max_hamming_distance: int,
    ) -> GenericMetadata | None:
        if not ca.seems_to_be_a_comic_archive():
            raise Exception(f"{ca.path} is not an archive")
        from PIL import Image

        cover_index = tags.get_cover_page_index_list()[0]
        cover_image = Image.open(BytesIO(ca.get_page(cover_index)))

        self.output(f"Tagging: {ca.path}")

        self.output("hashing cover")
        phash = dhash = ahash = ""
        hasher = ImageHasher(image=cover_image)
        if HashType.AHASH in hashes:
            ahash = hex(hasher.average_hash())[2:]
        if HashType.DHASH in hashes:
            dhash = hex(hasher.difference_hash())[2:]
        if HashType.PHASH in hashes:
            phash = hex(hasher.p_hash())[2:]

        logger.info(f"Searching with {ahash=}, {dhash=}, {phash=}")

        self.output("Searching hashes")
        results = self.SearchHashes(simple, max_hamming_distance, ahash, dhash, phash, exact_only)
        logger.debug(f"{results=}")

        if simple:
            filtered_simple_results = self.filter_simple_results(
                cast(list[SimpleResult], results), interactive, aggressive_filtering
            )
            metadata_simple_results = self.get_simple_results(filtered_simple_results)
            chosen_result = self.display_simple_results(metadata_simple_results, tags, interactive)
        else:
            filtered_results = self.filter_results(cast(list[Result], results), interactive, aggressive_filtering)
            metadata_results = self.get_results(filtered_results)
            chosen_result = self.display_results(metadata_results, tags, interactive)

        return self.talker.fetch_comic_data(issue_id=chosen_result.issue_id)

    def SearchHashes(
        self, simple: bool, max_hamming_distance: int, ahash: str, dhash: str, phash: str, exact_only: bool
    ) -> list[SimpleResult] | list[Result]:

        resp = requests.get(
            urljoin(self.url.url, "/match_cover_hash"),
            params={
                "simple": str(simple),
                "max": str(max_hamming_distance),
                "ahash": ahash,
                "dhash": dhash,
                "phash": phash,
                "exactOnly": str(exact_only),
            },
        )
        if resp.status_code != 200:
            try:
                text = resp.json()["msg"]
            except Exception:
                text = resp.text
            if text == "No hashes found":
                return []
            logger.error("message from server: %s", text)
            raise Exception(f"Failed to retrieve results from the server: {text}")
        return resp.json()["results"]

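Given the SimpleResult and Result TypedDicts above, the JSON body the server is expected to return looks roughly like this; the IDs, distances and hash values are illustrative only:

# simple=True -> {"results": [SimpleResult, ...]}
simple_payload = {
    "results": [
        {"Distance": 0, "IDList": {"comicvine.gamespot.com": ["12345"]}},
    ]
}

# simple=False -> {"results": [Result, ...]}
full_payload = {
    "results": [
        {
            "IDs": {"comicvine.gamespot.com": ["12345"]},
            "Distance": 2,
            "Hash": {"Hash": 0x8F3A1C5E9B2D4F01, "Kind": "phash"},
        },
    ]
}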
    def get_mds(self, results: list[SimpleResult] | list[Result]) -> list[GenericMetadata]:
        md_results: list[GenericMetadata] = []
        results.sort(key=lambda r: r["Distance"])
        all_ids = set()
        for res in results:
            all_ids.update(res.get("IDList", res.get("IDs", {})).get(self.domain, []))  # type: ignore[attr-defined]

        self.output(f"Retrieving basic {self.talker.name} data")
        # Try to do a bulk fetch of basic issue data
        if hasattr(self.talker, "fetch_comics"):
            md_results = self.talker.fetch_comics(issue_ids=list(all_ids))
        else:
            for md_id in all_ids:
                md_results.append(self.talker.fetch_comic_data(issue_id=md_id))
        return md_results

    def get_simple_results(self, results: list[SimpleResult]) -> list[tuple[int, GenericMetadata]]:
        md_results = []
        mds = self.get_mds(results)

        # Re-associate the md to the distance
        for res in results:
            for md in mds:
                if md.issue_id in res["IDList"].get(self.domain, []):
                    md_results.append((res["Distance"], md))
        return md_results

    def get_results(self, results: list[Result]) -> list[tuple[int, Hash, GenericMetadata]]:
        md_results = []
        mds = self.get_mds(results)

        # Re-associate the md to the distance
        for res in results:
            for md in mds:
                if md.issue_id in res["IDs"].get(self.domain, []):
                    md_results.append((res["Distance"], res["Hash"], md))
        return md_results

    def filter_simple_results(
        self, results: list[SimpleResult], interactive: bool, aggressive_filtering: bool
    ) -> list[SimpleResult]:
        # If there is a single exact match return it
        exact = [r for r in results if r["Distance"] == 0]
        if len(exact) == 1:
            logger.info("Exact result found. Ignoring any others")
            return exact

        # If there are more than 4 results and any are better than 6 return the first group of results
        if len(results) > 4:
            dist: list[tuple[int, list[SimpleResult]]] = []
            filtered_results: list[SimpleResult] = []
            for distance, group in itertools.groupby(results, key=lambda r: r["Distance"]):
                dist.append((distance, list(group)))
            if aggressive_filtering and dist[0][0] < 6:
                logger.info(f"Aggressive filtering is enabled. Dropping matches above {dist[0]}")
                for _, res in dist[:1]:
                    filtered_results.extend(res)
                logger.debug(f"{filtered_results=}")
                return filtered_results
        return results

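itertools.groupby only groups adjacent items, so this distance-grouping trick relies on results arriving sorted by Distance. A minimal illustration with made-up distances:

import itertools

results = [{"Distance": 1}, {"Distance": 1}, {"Distance": 3}, {"Distance": 7}]
dist = [(d, list(g)) for d, g in itertools.groupby(results, key=lambda r: r["Distance"])]
# dist == [(1, [... 2 items ...]), (3, [... 1 item ...]), (7, [... 1 item ...])]

# dist[0] is the best (lowest-distance) group; aggressive filtering keeps only that group
best = dist[0][1] if dist[0][0] < 6 else results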
    def filter_results(self, results: list[Result], interactive: bool, aggressive_filtering: bool) -> list[Result]:
        ahash_results = sorted([r for r in results if r["Hash"]["Kind"] == "ahash"], key=lambda r: r["Distance"])
        dhash_results = sorted([r for r in results if r["Hash"]["Kind"] == "dhash"], key=lambda r: r["Distance"])
        phash_results = sorted([r for r in results if r["Hash"]["Kind"] == "phash"], key=lambda r: r["Distance"])
        hash_results = [phash_results, dhash_results, ahash_results]

        # If any of the hash types have a single exact match return it. Prefer phash for no particular reason
        for hashed_result in hash_results:
            exact = [r for r in hashed_result if r["Distance"] == 0]
            if len(exact) == 1:
                logger.info(f"Exact {exact[0]['Hash']['Kind']} result found. Ignoring any others")
                return exact

        results_filtered = False
        # If any of the hash types have more than 4 results and they have results better than 6 return the first group of results for each hash type
        for i, hashed_results in enumerate(hash_results):
            filtered_results: list[Result] = []
            if len(hashed_results) > 4:
                dist: list[tuple[int, list[Result]]] = []
                for distance, group in itertools.groupby(hashed_results, key=lambda r: r["Distance"]):
                    dist.append((distance, list(group)))
                if aggressive_filtering and dist[0][0] < 6:
                    logger.info(
                        f"Aggressive filtering is enabled. Dropping {dist[0][1][0]['Hash']['Kind']} matches above {dist[0][0]}"
                    )
                    for _, res in dist[:1]:
                        filtered_results.extend(res)

            if filtered_results:
                hash_results[i] = filtered_results
                results_filtered = True
        if results_filtered:
            logger.debug(f"filtered_results={list(itertools.chain(*hash_results))}")
        return list(itertools.chain(*hash_results))

    def display_simple_results(
        self, md_results: list[tuple[int, GenericMetadata]], tags: GenericMetadata, interactive: bool
    ) -> GenericMetadata:
        if len(md_results) < 1:
            return GenericMetadata()
        if len(md_results) == 1 and md_results[0][0] <= 4:
            self.output("Found a single match <=4. Assuming it's correct")
            return md_results[0][1]
        series_match: list[GenericMetadata] = []
        for score, md in md_results:
            if (
                score < 10
                and tags.series
                and md.series
                and utils.titles_match(tags.series, md.series)
                and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
            ):
                series_match.append(md)
        if len(series_match) == 1:
            self.output(f"Found match with series name {series_match[0].series!r}")
            return series_match[0]

        if not interactive:
            return GenericMetadata()

        md_results.sort(key=lambda r: (r[0], len(r[1].publisher or "")))
        for counter, r in enumerate(md_results, 1):
            self.output(
                " {:2}. score: {} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
                    counter,
                    r[0],
                    r[1].publisher,
                    r[1].month or 0,
                    r[1].year or 0,
                    r[1].series,
                    r[1].issue,
                    r[1].title,
                ),
            )
        while True:
            i = input(
                f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
            ).casefold()
            if i.isdigit() and int(i) in range(1, len(md_results) + 1):
                break
            if i == "q":
                logger.warning("User quit without saving metadata")
                return GenericMetadata()

        return md_results[int(i) - 1][1]

    def display_results(
        self,
        md_results: list[tuple[int, Hash, GenericMetadata]],
        tags: GenericMetadata,
        interactive: bool,
    ) -> GenericMetadata:
        if len(md_results) < 1:
            return GenericMetadata()
        if len(md_results) == 1 and md_results[0][0] <= 4:
            self.output("Found a single match <=4. Assuming it's correct")
            return md_results[0][2]
        series_match: dict[str, tuple[int, Hash, GenericMetadata]] = {}
        for score, cover_hash, md in md_results:
            if (
                score < 10
                and tags.series
                and md.series
                and utils.titles_match(tags.series, md.series)
                and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
            ):
                assert md.issue_id
                series_match[md.issue_id] = (score, cover_hash, md)

        if len(series_match) == 1:
            score, cover_hash, md = list(series_match.values())[0]
            self.output(f"Found {cover_hash['Kind']} {score=} match with series name {md.series!r}")
            return md
        if not interactive:
            return GenericMetadata()
        md_results.sort(key=lambda r: (r[0], len(r[2].publisher or ""), r[1]["Kind"]))
        for counter, r in enumerate(md_results, 1):
            self.output(
                " {:2}. score: {} {}: {:064b} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
                    counter,
                    r[0],
                    r[1]["Kind"],
                    r[1]["Hash"],
                    r[2].publisher or "",
                    r[2].month or 0,
                    r[2].year or 0,
                    r[2].series or "",
                    r[2].issue or "",
                    r[2].title or "",
                ),
            )
        while True:
            i = input(
                f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
            ).casefold()
            if i.isdigit() and int(i) in range(1, len(md_results) + 1):
                break
            if i == "q":
                self.output("User quit without saving metadata")
                return GenericMetadata()

        return md_results[int(i) - 1][2]

@ -228,8 +228,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
        if tag_id not in tags:
            config[0].internal__read_tags.remove(tag_id)

        self.selected_write_tags: list[str] = config[0].internal__write_tags
        self.selected_read_tags: list[str] = config[0].internal__read_tags
        self.selected_write_tags: list[str] = config[0].internal__write_tags or ["cr"]
        self.selected_read_tags: list[str] = config[0].internal__read_tags or ["cr"]

        self.setAcceptDrops(True)
        self.view_tag_actions, self.remove_tag_actions = self.tag_actions()

@ -9,9 +9,9 @@ from collections.abc import Sequence
from packaging.version import InvalidVersion, parse

if sys.version_info < (3, 10):
    from importlib_metadata import EntryPoint, entry_points
    from importlib_metadata import entry_points
else:
    from importlib.metadata import entry_points, EntryPoint
    from importlib.metadata import entry_points

from comictalker.comictalker import ComicTalker, TalkerError

@ -24,14 +24,14 @@ __all__ = [


def get_talkers(
    version: str, cache: pathlib.Path, local_plugins: Sequence[EntryPoint] = tuple()
    version: str, cache: pathlib.Path, local_plugins: Sequence[type[ComicTalker]] = tuple()
) -> dict[str, ComicTalker]:
    """Returns all comic talker instances"""
    talkers: dict[str, ComicTalker] = {}
    ct_version = parse(version)

    # A dict is used, last plugin wins
    for talker in itertools.chain(entry_points(group="comictagger.talker"), local_plugins):
    for talker in itertools.chain(entry_points(group="comictagger.talker")):
        try:
            talker_cls = talker.load()
            obj = talker_cls(version, cache)
@ -56,4 +56,26 @@ def get_talkers(
        except Exception:
            logger.exception("Failed to load talker: %s", talker.name)

    # A dict is used, last plugin wins
    for talker_cls in local_plugins:
        try:
            obj = talker_cls(version, cache)
            try:
                if ct_version >= parse(talker_cls.comictagger_min_ver):
                    talkers[talker_cls.id] = obj
                else:
                    logger.error(
                        f"Minimum ComicTagger version required of {talker_cls.comictagger_min_ver} for talker {talker_cls.id} is not met, will NOT load talker"
                    )
            except InvalidVersion:
                logger.warning(
                    f"Invalid minimum required ComicTagger version number for talker: {talker_cls.id} - version: {talker_cls.comictagger_min_ver}, will load talker anyway"
                )
                # Attempt to use the talker anyway
                # TODO flag this problem for later display to the user
                talkers[talker_cls.id] = obj

        except Exception:
            logger.exception("Failed to load talker: %s", talker_cls.id)

    return talkers

@ -295,4 +295,9 @@ class ComicCacher:
            set_slots += key + " = ?"

        sql_ins = f"INSERT OR REPLACE INTO {tablename} ({keys}) VALUES ({ins_slots})"
        if not data.get("complete", True):
            sql_ins += f" ON CONFLICT DO UPDATE SET {set_slots} WHERE complete != ?"
            vals.extend(vals)
            vals.append(True)  # If the cache is complete and this isn't complete we don't update it

        cur.execute(sql_ins, vals)

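Reading the assembled statement with illustrative table and column names (the real keys come from the record being cached), the values list is doubled so it fills both the VALUES and the SET placeholders, and a final True feeds the WHERE guard:

# INSERT OR REPLACE INTO Series (id, data) VALUES (?, ?)
#     ON CONFLICT DO UPDATE SET id = ?, data = ? WHERE complete != ?
vals = ["4050-12345", b"{...}"]  # illustrative id and blob
vals.extend(vals)   # -> [id, data, id, data]
vals.append(True)   # a complete row is never overwritten by an incomplete one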
@ -21,7 +21,7 @@ from urllib.parse import urlsplit
logger = logging.getLogger(__name__)


def fix_url(url: str) -> str:
def fix_url(url: str | None) -> str:
    if not url:
        return ""
    tmp_url = urlsplit(url)
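The widened annotation matches what the body already does: falsy input short-circuits to an empty string.

assert fix_url(None) == ""
assert fix_url("") == ""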
@ -43,6 +43,8 @@ except ImportError:
import requests

logger = logging.getLogger(__name__)

TWITTER_TOO_MANY_REQUESTS = 420


class CVTypeID:
    Volume = "4050"  # CV uses volume to mean series
@ -157,8 +159,8 @@ class CVResult(TypedDict, Generic[T]):

# https://comicvine.gamespot.com/forums/api-developers-2334/api-rate-limiting-1746419/
# "Space out your requests so AT LEAST one second passes between each and you can make requests all day."
custom_limiter = Limiter(RequestRate(10, 10))
default_limiter = Limiter(RequestRate(1, 5))
custom_limiter = Limiter(RequestRate(10, 10), RequestRate(200, 1 * 60 * 60))
default_limiter = Limiter(RequestRate(1, 10), RequestRate(100, 1 * 60 * 60))


class ComicVineTalker(ComicTalker):
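pyrate-limiter composes multiple RequestRate windows, and a request must satisfy all of them: under the new defaults a shared default-key client is held to 1 request per 10 seconds and 100 per hour, while a personal key allows 10 per 10 seconds and 200 per hour. A sketch of how such a bucket is consumed, mirroring the ratelimit usage later in this file:

from pyrate_limiter import Limiter, RequestRate

default_limiter = Limiter(RequestRate(1, 10), RequestRate(100, 1 * 60 * 60))

with default_limiter.ratelimit("cv", delay=True):  # blocks until both windows allow it
    ...  # perform one API request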
@ -171,7 +173,7 @@ class ComicVineTalker(ComicTalker):
        f"<a href='{website}'>{name}</a> has the largest collection of comic book data available through "
        f"its public facing API. "
        f"<p>NOTE: Using the default API key will severely limit access times. A personal API "
        f"key will allow for a <b>5 times increase</b> in online search speed. See the "
        f"key will allow for a <b>10 times increase</b> in online search speed. See the "
        "<a href='https://github.com/comictagger/comictagger/wiki/UserGuide#comic-vine'>Wiki page</a> for "
        "more information.</p>"
    )
@ -410,11 +412,140 @@ class ComicVineTalker(ComicTalker):

        return formatted_filtered_issues_result

    def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]:
        # before we search online, look in our cache, since we might already have this info
        cvc = ComicCacher(self.cache_folder, self.version)
        cached_results: list[GenericMetadata] = []
        needed_issues: list[int] = []
        for issue_id in issue_ids:
            cached_issue = cvc.get_issue_info(issue_id, self.id)

            if cached_issue and cached_issue[1]:
                cached_results.append(
                    self._map_comic_issue_to_metadata(
                        json.loads(cached_issue[0].data),
                        self._fetch_series([int(cached_issue[0].series_id)])[0][0],
                    ),
                )
            else:
                needed_issues.append(int(issue_id))  # CV uses integers for its IDs

        if not needed_issues:
            return cached_results
        issue_filter = ""
        for iid in needed_issues:
            issue_filter += str(iid) + "|"
        flt = "id:" + issue_filter.rstrip("|")

        issue_url = urljoin(self.api_url, "issues/")
        params: dict[str, Any] = {
            "api_key": self.api_key,
            "format": "json",
            "filter": flt,
        }
        cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params)

        issue_results = cv_response["results"]
        page = 1
        offset = 0
        current_result_count = cv_response["number_of_page_results"]
        total_result_count = cv_response["number_of_total_results"]

        # see if we need to keep asking for more pages...
        while current_result_count < total_result_count:
            page += 1
            offset += cv_response["number_of_page_results"]

            params["offset"] = offset
            cv_response = self._get_cv_content(issue_url, params)

            issue_results.extend(cv_response["results"])
            current_result_count += cv_response["number_of_page_results"]

        series_info = {s[0].id: s[0] for s in self._fetch_series([int(i["volume"]["id"]) for i in issue_results])}

        for issue in issue_results:
            cvc.add_issues_info(
                self.id,
                [
                    Issue(
                        id=str(issue["id"]),
                        series_id=str(issue["volume"]["id"]),
                        data=json.dumps(issue).encode("utf-8"),
                    ),
                ],
                False,  # The /issues/ endpoint never provides credits
            )
            cached_results.append(
                self._map_comic_issue_to_metadata(issue, series_info[str(issue["volume"]["id"])]),
            )

        return cached_results

    def _fetch_series(self, series_ids: list[int]) -> list[tuple[ComicSeries, bool]]:
        # before we search online, look in our cache, since we might already have this info
        cvc = ComicCacher(self.cache_folder, self.version)
        cached_results: list[tuple[ComicSeries, bool]] = []
        needed_series: list[int] = []
        for series_id in series_ids:
            cached_series = cvc.get_series_info(str(series_id), self.id)
            if cached_series is not None:
                cached_results.append((self._format_series(json.loads(cached_series[0].data)), cached_series[1]))
            else:
                needed_series.append(series_id)

        if needed_series == []:
            return cached_results

        series_filter = ""
        for vid in needed_series:
            series_filter += str(vid) + "|"
        flt = "id:" + series_filter.rstrip("|")  # CV uses volume to mean series

        series_url = urljoin(self.api_url, "volumes/")  # CV uses volume to mean series
        params: dict[str, Any] = {
            "api_key": self.api_key,
            "format": "json",
            "filter": flt,
        }
        cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params)

        series_results = cv_response["results"]
        page = 1
        offset = 0
        current_result_count = cv_response["number_of_page_results"]
        total_result_count = cv_response["number_of_total_results"]

        # see if we need to keep asking for more pages...
        while current_result_count < total_result_count:
            page += 1
            offset += cv_response["number_of_page_results"]

            params["offset"] = offset
            cv_response = self._get_cv_content(series_url, params)

            series_results.extend(cv_response["results"])
            current_result_count += cv_response["number_of_page_results"]

        if series_results:
            for series in series_results:
                cvc.add_series_info(
                    self.id,
                    Series(id=str(series["id"]), data=json.dumps(series).encode("utf-8")),
                    True,
                )
                cached_results.append((self._format_series(series), True))

        return cached_results

    def _get_cv_content(self, url: str, params: dict[str, Any]) -> CVResult[T]:
        """
        Get the content from the CV server.
        """
        with self.limiter.ratelimit("cv", delay=True):
        ratelimit_key = url
        if self.api_key == self.default_api_key:
            ratelimit_key = "cv"
        with self.limiter.ratelimit(ratelimit_key, delay=True):
            cv_response: CVResult[T] = self._get_url_content(url, params)

            if cv_response["status_code"] != 1:
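The bulk endpoints batch IDs into a single ComicVine filter expression and then page through results via offset. A small illustration of the filter string; the IDs are made up:

needed_issues = [140529, 140530, 140544]
flt = "id:" + "|".join(str(i) for i in needed_issues)
# flt == "id:140529|140530|140544"
# sent as ?filter=id:140529|140530|140544 to /issues/, with ?offset advanced by
# number_of_page_results until number_of_total_results entries have been fetched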
@ -439,7 +570,7 @@ class ComicVineTalker(ComicTalker):
                time.sleep(1)
                logger.debug(str(resp.status_code))

        if resp.status_code == requests.status_codes.codes.TOO_MANY_REQUESTS:
        if resp.status_code in (requests.status_codes.codes.TOO_MANY_REQUESTS, TWITTER_TOO_MANY_REQUESTS):
            logger.info(f"{self.name} rate limit encountered. Waiting for 10 seconds\n")
            time.sleep(10)
            limit_counter += 1

@ -50,6 +50,7 @@ install_requires =
    requests==2.*
    settngs==0.10.4
    text2digits
    tomli
    typing-extensions>=4.3.0
    wordninja
python_requires = >=3.9
@ -66,8 +67,6 @@ comicapi.archiver =
    folder = comicapi.archivers.folder:FolderArchiver
comicapi.tags =
    cr = comicapi.tags.comicrack:ComicRack
    cbi = comicapi.tags.comicbookinfo:ComicBookInfo
    comet = comicapi.tags.comet:CoMet
comictagger.talker =
    comicvine = comictalker.talkers.comicvine:ComicVineTalker
pyinstaller40 =
@ -88,7 +87,7 @@ QTW =
all =
    PyQt5
    PyQtWebEngine
    comicinfoxml>=0.2.0
    comicinfoxml==0.4.*
    gcd-talker>0.1.0
    metron-talker>0.1.5
    pillow-avif-plugin>=1.4.1
@ -96,10 +95,12 @@ all =
    py7zr
    rarfile>=4.0
    pyicu;sys_platform == 'linux' or sys_platform == 'darwin'
archived_tags =
    ct-archived-tags
avif =
    pillow-avif-plugin>=1.4.1
cix =
    comicinfoxml>=0.2.0
    comicinfoxml==0.4.*
gcd =
    gcd-talker>=0.1.0
jxl =
@ -52,15 +52,6 @@ def test_write_cr(tmp_comic):
    md = tmp_comic.read_tags("cr")


def test_write_cbi(tmp_comic):
    md = tmp_comic.read_tags("cr")
    md.apply_default_page_list(tmp_comic.get_page_name_list())

    assert tmp_comic.write_tags(md, "cbi")

    md = tmp_comic.read_tags("cbi")


@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cr_rar(tmp_path, md_saved):
    cbr_path = datadir / "fake_cbr.cbr"
@ -78,20 +69,6 @@ def test_save_cr_rar(tmp_path, md_saved):
    assert md == md_saved


@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cbi_rar(tmp_path, md_saved):
    cbr_path = pathlib.Path(str(datadir)) / "fake_cbr.cbr"
    shutil.copy(cbr_path, tmp_path)

    tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name)
    assert tmp_comic.seems_to_be_a_comic_archive()
    assert tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cbi")

    md = tmp_comic.read_tags("cbi")
    supported_attributes = comicapi.comicarchive.tags["cbi"].supported_attributes
    assert md.get_clean_metadata(*supported_attributes) == md_saved.get_clean_metadata(*supported_attributes)


def test_page_type_write(tmp_comic):
    md = tmp_comic.read_tags("cr")
    t = md.pages[0]

@ -28,10 +28,32 @@ def test_search_results(comic_cache):
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
    comic_cache.add_series_info(
        series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info)),
        series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
        source="test",
        complete=True,
    )
    vi = series_info.copy()
    cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)
    assert vi == cache_result


@pytest.mark.parametrize("series_info", search_results)
def test_cache_overwrite(comic_cache, series_info):
    vi = series_info.copy()
    comic_cache.add_series_info(
        series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
        source="test",
        complete=True,
    )  # Populate the cache

    # Try to insert an incomplete series with different data
    series_info["name"] = "test 3"
    comic_cache.add_series_info(
        series=comictalker.comiccacher.Series(id=series_info["id"], data=json.dumps(series_info).encode("utf-8")),
        source="test",
        complete=False,
    )
    cache_result = json.loads(comic_cache.get_series_info(series_id=series_info["id"], source="test")[0].data)

    # Validate that the Series marked complete is still in the cache
    assert vi == cache_result

@ -197,7 +197,7 @@ def config(tmp_path):
    from comictaggerlib.main import App

    app = App()
    app.register_settings()
    app.register_settings(False)

    defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "")
    defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
@ -214,7 +214,7 @@ def plugin_config(tmp_path):
    ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"))
    app = App()
    app.load_plugins(ns)
    app.register_settings()
    app.register_settings(False)

    defaults = app.parse_settings(ns.config, "")
    defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
@ -9,12 +9,15 @@ from comictaggerlib.md import prepare_metadata

tags = []

for x in entry_points(group="comicapi.tag"):
for x in entry_points(group="comicapi.tags"):
    tag = x.load()
    supported = tag.enabled
    exe_found = True
    tags.append(pytest.param(tag, marks=pytest.mark.xfail(not supported, reason="tags not enabled")))

if not tags:
    raise Exception("No tags found")


@pytest.mark.parametrize("tag_type", tags)
def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
@ -22,20 +25,24 @@ def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
    supported_attributes = tag.supported_attributes
    tag.write_tags(comicapi.genericmetadata.md_test, tmp_comic.archiver)
    written_metadata = tag.read_tags(tmp_comic.archiver)
    md = md_saved.get_clean_metadata(*supported_attributes)
    md = md_saved._get_clean_metadata(*supported_attributes)

    # Hack back in the pages variable because CoMet supports identifying the cover by the filename
    if tag.id == "comet":
        md.pages = [
            comicapi.genericmetadata.ImageMetadata(
                image_index=0, filename="!cover.jpg", type=comicapi.genericmetadata.PageType.FrontCover
            comicapi.genericmetadata.PageMetadata(
                archive_index=0,
                bookmark="",
                display_index=0,
                filename="!cover.jpg",
                type=comicapi.genericmetadata.PageType.FrontCover,
            )
        ]
        written_metadata = written_metadata.get_clean_metadata(*supported_attributes).replace(
        written_metadata = written_metadata._get_clean_metadata(*supported_attributes).replace(
            pages=written_metadata.pages
        )
    else:
        written_metadata = written_metadata.get_clean_metadata(*supported_attributes)
        written_metadata = written_metadata._get_clean_metadata(*supported_attributes)

    assert written_metadata == md