Files
comictagger/comicapi/comicarchive.py
2025-09-01 18:33:10 -07:00

629 lines
22 KiB
Python

"""A class to represent a single comic, be it file or folder of images"""
# Copyright 2012-2014 ComicTagger Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import hashlib
import importlib.util
import inspect
import io
import itertools
import logging
import os
import pathlib
import shutil
import sys
from collections.abc import Collection, Iterable
from comicapi import utils
from comicapi.comic import ComicFile, UnknownArchiver, WrongType
from comicapi.genericmetadata import FileHash, GenericMetadata
from comicapi.tags import Tag
from comicapi.tags.tag import TagLocation
from comictaggerlib.ctversion import version
logger = logging.getLogger(__name__)
archivers: list[type[ComicFile]] = []
loaded_tags: dict[str, Tag] = {}
def load_archive_plugins(local_plugins: Iterable[type[ComicFile]] = tuple()) -> None:
if archivers:
return
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
builtin: list[type[ComicFile]] = []
archive_plugins: list[type[ComicFile]] = []
# A list is used first matching plugin wins
for ep in itertools.chain(entry_points(group="comicapi.archiver")):
try:
spec = importlib.util.find_spec(ep.module)
except ValueError:
spec = None
try:
archiver: type[ComicFile] = ep.load()
if not archiver.enabled:
logger.info("Archiver %r (%s) is disabled. Refusing to load archiver plugin", archiver.name, ep.name)
continue
if ep.module.startswith("comicapi"):
builtin.append(archiver)
else:
archive_plugins.append(archiver)
except Exception:
if spec and spec.has_location:
logger.exception("Failed to load archive plugin: %s from %s", ep.name, spec.origin)
else:
logger.exception("Failed to load archive plugin: %s", ep.name)
archivers.clear()
archivers.extend(local_plugins)
archivers.extend(archive_plugins)
archivers.extend(builtin)
__custom_tags: dict[str, type[Tag]] = {}
def custom_tag(comic_file: type[ComicFile]) -> type[Tag]:
tag_id = f"custom_{comic_file.__name__.lower()}"
if tag_id in __custom_tags:
return __custom_tags[tag_id]
class ClassName(Tag):
id = tag_id
name = comic_file.name
enabled = comic_file.enabled
location = TagLocation.CUSTOM
supported_attributes = comic_file.supported_attributes
_comic_file = comic_file
ClassName.__name__ = comic_file.__name__ + "Tag"
return __custom_tags.setdefault(tag_id, ClassName)
def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterable[type[Tag]] = tuple()) -> None:
if loaded_tags:
return
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
builtin: dict[str, Tag] = {}
tag_plugins: dict[str, tuple[Tag, str]] = {}
custom_tag_plugins: dict[str, Tag] = {}
# A dict is used, last plugin wins
for ep in entry_points(group="comicapi.tags"):
location = "Unknown"
try:
_spec = importlib.util.find_spec(ep.module)
if _spec and _spec.has_location and _spec.origin:
location = _spec.origin
except ValueError:
location = "Unknown"
try:
tagClass: type[Tag] = ep.load()
tag = tagClass() # tags are instantiated only because it makes typing simpler
if not tag.enabled:
logger.info("Tag %r (%s) is disabled. Refusing to load tag plugin", tag.name, ep.name)
continue
if ep.module.startswith("comicapi"):
builtin[tag.id] = tag
else:
if tag.id in tag_plugins:
logger.warning(
"Plugin %s from %s is overriding the existing plugin for %s tags",
ep.module,
location,
tag.id,
)
tag_plugins[tag.id] = (tag, location)
except Exception:
logger.exception("Failed to load tag plugin: %s from %s", ep.name, location)
# A dict is used, last plugin wins
for tagClass in local_plugins:
tag = tagClass() # tags are instantiated only because it makes typing simpler
if not tag.enabled:
logger.info("Local Tag %r (%s) is disabled. Refusing to load tag plugin", tag.name, tag.id)
continue
tag_plugins[tag.id] = (tag, "Local")
for archive in archivers:
if TagLocation.CUSTOM in archive.tag_locations:
tag = custom_tag(archive)()
custom_tag_plugins[tag.id] = tag
for tag_id in set(builtin.keys()).intersection(tag_plugins):
location = tag_plugins[tag_id][1]
logger.warning("Builtin plugin for %s tags are being overridden by a plugin from %s", tag_id, location)
loaded_tags.clear()
loaded_tags.update(builtin)
loaded_tags.update({s[0]: s[1][0] for s in tag_plugins.items()})
loaded_tags.update(custom_tag_plugins)
class ComicArchive:
logo_data = b""
def __init__(
self,
path: pathlib.Path | ComicFile,
default_image_path: pathlib.Path | None = None,
hash_archive: str = "",
) -> None:
self.md: dict[str, GenericMetadata] = {}
self.page_count: int | None = None
self.page_list: list[str] = []
self.hash_archive = hash_archive
self.Archiver: type[ComicFile] = UnknownArchiver
self.archiver: ComicFile | None = None
self.reset_cache()
self.default_image_path = default_image_path
if isinstance(path, pathlib.Path):
self.path = pathlib.Path(path).absolute()
load_archive_plugins()
load_tag_plugins()
tried_archivers = []
for archiver in archivers:
if self.path.suffix not in archiver.supported_extensions:
continue
tried_archivers.append(archiver)
try:
archiver.check_path(self.path)
self.Archiver = archiver
break
except WrongType:
continue
if self.Archiver == UnknownArchiver:
for archiver in archivers:
if archiver in tried_archivers:
continue
try:
archiver.check_path(self.path)
self.Archiver = archiver
break
except WrongType:
continue
else:
self.path = path.path
self.archiver = path
self.Archiver = type(path)
if not ComicArchive.logo_data and self.default_image_path:
with self.default_image_path.open(mode="rb") as fd:
ComicArchive.logo_data = fd.read()
def reset_cache(self) -> None:
"""Clears the cached data"""
self.page_count = None
self.page_list.clear()
self.md.clear()
def _open_archive(self) -> ComicFile:
if self.Archiver is UnknownArchiver:
raise Exception("Archive not opened")
if self.archiver is None:
self.archiver = self.Archiver(self.path)
return self.archiver
def get_supported_tags(self, tags: Collection[Tag] = loaded_tags.values()) -> list[Tag]:
allowed_locations = self.Archiver.tag_locations - {TagLocation.CUSTOM}
allowed_tags = [tag for tag in tags if tag.location in allowed_locations]
if TagLocation.CUSTOM in self.Archiver.tag_locations:
allowed_tags.append(custom_tag(self.Archiver)())
return allowed_tags
def _supported_tag(self, tag: Tag) -> None:
if tag.location not in self.Archiver.tag_locations:
raise Exception(f"{tag.name} tags Not Supported for Comic {self.Archiver.name}")
if tag.location == TagLocation.CUSTOM:
if tag._comic_file != self.Archiver: # type: ignore[attr-defined]
raise Exception(f"{tag.name} tags are only supported on {tag._comic_file.name} not {self.Archiver.name}") # type: ignore[attr-defined]
def rename(self, path: pathlib.Path) -> None:
if self.archiver is not None:
# self.archiver.close()
self.archiver = None
new_path = path.absolute()
if new_path == self.path:
return
os.makedirs(new_path.parent, 0o777, True)
shutil.move(self.path, new_path)
self.path = new_path
def is_writable(self, check_archive_status: bool = True) -> bool:
if not (os.access(self.path, os.W_OK) or os.access(self.path.parent, os.W_OK)):
return False
if check_archive_status:
self.archiver = self._open_archive()
if not self.archiver.is_writable():
return False
return True
def is_zip(self) -> bool:
return self.Archiver.extension == ".cbz"
def seems_to_be_a_comic_archive(self) -> bool:
if self.Archiver is UnknownArchiver:
return False
try:
self.Archiver.check_path(self.path)
return self.get_number_of_pages() > 0
except Exception:
...
return False
def extension(self) -> str:
return self.Archiver.extension
def read_tags(self, tag: Tag) -> GenericMetadata:
self._supported_tag(tag)
if tag.id in self.md:
return self.md[tag.id]
md = GenericMetadata()
if tag.location == TagLocation.COMMENT:
a = self._open_archive()
comment = a.read_comment()
if not tag.validate_tags(comment.encode(encoding="utf-8")):
return md
md = tag.load_tags(comment.encode(encoding="utf-8"))
if tag.location == TagLocation.FILE:
filename = self._find_file(tag)
if filename == "":
return md
a = self._open_archive()
file_content = a.read_file(filename)
if not tag.validate_tags(file_content):
return md
md = tag.load_tags(file_content)
if tag.location == TagLocation.CUSTOM:
a = self._open_archive()
if not a.has_tags():
return md
md = a.load_tags()
md.apply_default_page_list(self.get_page_name_list())
return md
def _find_file(self, tag: Tag) -> str:
a = self._open_archive()
filenames = a.get_filename_list()
if not filenames:
return ""
if tag.filename_match[0] == "*":
for name in filenames:
if name.endswith(tag.filename_match[1:]):
return name
return ""
for name in filenames:
if name == tag.filename_match:
return name
return ""
def read_raw_tags(self, tag: Tag) -> str:
self._supported_tag(tag)
if tag.location == TagLocation.COMMENT:
a = self._open_archive()
content = a.read_comment()
return tag.display_tags(content.encode(encoding="utf-8"))
if tag.location == TagLocation.FILE:
filename = self._find_file(tag)
if filename == "":
return ""
a = self._open_archive()
file_content = a.read_file(filename)
return tag.display_tags(file_content)
if tag.location == TagLocation.CUSTOM:
a = self._open_archive()
return a.display_tags()
return ""
def write_tags(self, version: str, metadata: GenericMetadata, tag: Tag) -> None:
self._supported_tag(tag)
if tag.id in self.md:
del self.md[tag.id]
self.apply_archive_info_to_metadata(metadata, True, True, hash_archive=self.hash_archive)
if tag.location == TagLocation.COMMENT:
a = self._open_archive()
content = a.read_comment()
return a.write_comment(tag.create_tags(version, metadata, content.encode(encoding="utf-8")).decode("utf-8"))
if tag.location == TagLocation.FILE:
filename = self._find_file(tag)
file_content = b""
a = self._open_archive()
if filename:
file_content = a.read_file(filename)
else:
filename = tag.filename
return a.write_file(filename, tag.create_tags(version, metadata, file_content))
if tag.location == TagLocation.CUSTOM:
a = self._open_archive()
return a.write_tags(version, metadata)
def has_tags(self, tag: Tag) -> bool:
self._supported_tag(tag)
if tag.location == TagLocation.COMMENT:
a = self._open_archive()
comment = a.read_comment()
return tag.validate_tags(comment.encode(encoding="utf-8"))
if tag.location == TagLocation.FILE:
filename = self._find_file(tag)
if filename == "":
return False
a = self._open_archive()
file_content = a.read_file(filename)
return tag.validate_tags(file_content)
if tag.location == TagLocation.CUSTOM:
a = self._open_archive()
return a.has_tags()
return False
def remove_tags(self, tag: Tag) -> None:
self._supported_tag(tag)
if tag.id in self.md:
del self.md[tag.id]
if tag.location == TagLocation.COMMENT:
a = self._open_archive()
return a.write_comment("")
if tag.location == TagLocation.FILE:
filename = self._find_file(tag)
if filename == "":
return
a = self._open_archive()
return a.remove_files([filename])
if tag.location == TagLocation.CUSTOM:
a = self._open_archive()
return a.remove_tags()
def load_cache(self, loaded_tags: Iterable[Tag]) -> None:
for tag in loaded_tags:
try:
md = self.read_tags(tag)
if not md.is_empty:
self.md[tag.id] = md
except Exception:
...
def get_page(self, index: int) -> bytes:
image_data = b""
filename = self.get_page_name(index)
if filename:
try:
a = self._open_archive()
image_data = a.read_file(filename)
except Exception:
logger.exception("Error reading in page %d. Substituting logo page.", index)
image_data = ComicArchive.logo_data
return image_data
def get_page_name(self, index: int) -> str:
page_list = self.get_page_name_list()
num_pages = len(page_list)
if num_pages == 0 or index >= num_pages:
return ""
return page_list[index]
def get_scanner_page_index(self) -> int | None:
scanner_page_index = None
# make a guess at the scanner page
name_list = self.get_page_name_list()
count = self.get_number_of_pages()
# too few pages to really know
if count < 5:
return None
# count the length of every filename, and count occurrences
length_buckets: dict[int, int] = {}
for name in name_list:
fname = os.path.split(name)[1]
length = len(fname)
if length in length_buckets:
length_buckets[length] += 1
else:
length_buckets[length] = 1
# sort by most common
sorted_buckets = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True)
# statistical mode occurrence is first
mode_length = sorted_buckets[0][0]
# we are only going to consider the final image file:
final_name = os.path.split(name_list[count - 1])[1]
common_length_list = []
for name in name_list:
if len(os.path.split(name)[1]) == mode_length:
common_length_list.append(os.path.split(name)[1])
prefix = os.path.commonprefix(common_length_list)
if mode_length <= 7 and prefix == "":
# probably all numbers
if len(final_name) > mode_length:
scanner_page_index = count - 1
# see if the last page doesn't start with the same prefix as most others
elif not final_name.startswith(prefix):
scanner_page_index = count - 1
return scanner_page_index
def get_page_name_list(self) -> list[str]:
if not self.page_list:
utils.initialize_pil()
a = self._open_archive()
self.page_list = utils.get_page_name_list(a.get_filename_list())
return self.page_list
def get_number_of_pages(self) -> int:
if self.page_count is None:
self.page_count = len(self.get_page_name_list())
return self.page_count
def apply_archive_info_to_metadata(
self,
md: GenericMetadata,
calc_page_sizes: bool = False,
detect_double_page: bool = False,
*,
hash_archive: str = "",
) -> None:
hash_archive = hash_archive
md.page_count = self.get_number_of_pages()
md.apply_default_page_list(self.get_page_name_list())
if not self.seems_to_be_a_comic_archive():
return
if hash_archive in hashlib.algorithms_available and not md.original_hash:
if self.path.is_dir():
return
hasher = getattr(hashlib, hash_archive, hash_archive)
try:
with self.path.open("b+r") as archive:
digest = utils.file_digest(archive, hasher)
if len(inspect.signature(digest.hexdigest).parameters) > 0:
length = digest.name.rpartition("_")[2]
if not length.isdigit():
length = "128"
md.original_hash = FileHash(digest.name, digest.hexdigest(int(length) // 8)) # type: ignore[call-arg]
else:
md.original_hash = FileHash(digest.name, digest.hexdigest())
except Exception:
logger.exception("Failed to calculate original hash for '%s'", self.path)
if not calc_page_sizes:
return
for p in md.pages:
if p.byte_size is None or p.height is None or p.width is None or p.double_page is None:
try:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
if not data or not utils.initialize_pil():
continue
from PIL import Image
im = Image.open(io.BytesIO(data))
w, h = im.size
p.height = h
p.width = w
if detect_double_page:
p.double_page = p.is_double_page()
except Exception as e:
logger.exception("Error decoding image [%s] %s :: image %s", e, self.path, p.archive_index)
def metadata_from_filename(
self,
parser: utils.Parser = utils.Parser.ORIGINAL,
remove_c2c: bool = False,
remove_fcbd: bool = False,
remove_publisher: bool = False,
split_words: bool = False,
allow_issue_start_with_letter: bool = False,
protofolius_issue_number_scheme: bool = False,
) -> GenericMetadata:
metadata = GenericMetadata()
filename_info = utils.parse_filename(
self.path.name,
parser=parser,
remove_c2c=remove_c2c,
remove_fcbd=remove_fcbd,
remove_publisher=remove_publisher,
split_words=split_words,
allow_issue_start_with_letter=allow_issue_start_with_letter,
protofolius_issue_number_scheme=protofolius_issue_number_scheme,
)
metadata.alternate_number = utils.xlate(filename_info.get("alternate", None))
metadata.issue = utils.xlate(filename_info.get("issue", None))
metadata.issue_count = utils.xlate_int(filename_info.get("issue_count", None))
metadata.publisher = utils.xlate(filename_info.get("publisher", None))
metadata.series = utils.xlate(filename_info.get("series", None))
metadata.title = utils.xlate(filename_info.get("title", None))
metadata.volume = utils.xlate_int(filename_info.get("volume", None))
metadata.volume_count = utils.xlate_int(filename_info.get("volume_count", None))
metadata.year = utils.xlate_int(filename_info.get("year", None))
metadata.scan_info = utils.xlate(filename_info.get("remainder", None))
if filename_info.get("fcbd", None):
metadata.format = "FCBD"
metadata.tags.add("FCBD")
if filename_info.get("c2c", None):
metadata.tags.add("c2c")
if filename_info.get("annual", None):
metadata.format = "Annual"
if filename_info.get("format", None):
metadata.format = filename_info["format"]
metadata.is_empty = False
return metadata
# def export_as(self, new_filename: pathlib.Path, extension: str = ".cbz") -> None:
# """
# Copies all content from the current archive to
# """
# export_archiver: ComicFile = UnknownArchiver(new_filename)
# for archiver in archivers:
# if extension == archiver.extension:
# export_archiver = archiver(new_filename)
# if isinstance(export_archiver, UnknownArchiver):
# if extension == ".cbz":
# export_archiver = cast(ComicFile, ZipComic(new_filename))
# else:
# raise Exception(f"Cannot export as {extension}")
# a = self._open_archive()
# export_archiver.write_files(a.read_files(a.get_filename_list()), filenames=a.get_filename_list())
# if TagLocation.COMMENT in export_archiver.tag_locations and TagLocation.COMMENT in a.tag_locations:
# export_archiver.write_comment(a.read_comment())