605 lines
22 KiB
Python
605 lines
22 KiB
Python
"""A class to represent a single comic, be it file or folder of images"""
|
|
# Copyright 2012-2014 Anthony Beville
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import sys
|
|
from typing import cast
|
|
|
|
import natsort
|
|
import wordninja
|
|
|
|
from comicapi import filenamelexer, filenameparser, utils
|
|
from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver
|
|
from comicapi.comet import CoMet
|
|
from comicapi.comicbookinfo import ComicBookInfo
|
|
from comicapi.comicinfoxml import ComicInfoXml
|
|
from comicapi.genericmetadata import GenericMetadata, PageType
|
|
|
|
if sys.version_info < (3, 10):
|
|
from importlib_metadata import entry_points
|
|
else:
|
|
from importlib.metadata import entry_points
|
|
|
|
# Pillow is optional: remember whether it could be imported so that code
# needing image dimensions can check the flag before touching ``Image``.
try:
    from PIL import Image

    pil_available = True
except ImportError:
    pil_available = False

logger = logging.getLogger(__name__)

if not pil_available:
    # Fixed the misspelled message ("unavalable") so it can be found when searching logs.
    logger.error("PIL unavailable")
|
|
|
|
# Registry of archiver classes, filled on first use by load_archive_plugins().
archivers: list[type[Archiver]] = []


def load_archive_plugins() -> None:
    """Populate the module-level ``archivers`` registry from entry points.

    Entry points in the ``comicapi.archiver`` group are loaded once; later
    calls are no-ops while the registry is non-empty. Enabled archivers from
    modules whose name starts with ``comicapi`` are appended after all other
    enabled archivers, so third-party plugins are tried first.
    A plugin that fails to load is logged and skipped.
    """
    if archivers:
        return

    builtin: list[type[Archiver]] = []
    for arch in entry_points(group="comicapi.archiver"):
        try:
            archiver: type[Archiver] = arch.load()
            if archiver.enabled:
                if arch.module.startswith("comicapi"):
                    builtin.append(archiver)
                else:
                    archivers.append(archiver)
        except Exception:
            # Best effort: one broken plugin must not prevent the others from
            # loading. The message used to say "talker", which is wrong here.
            logger.warning("Failed to load archiver: %s", arch.name, exc_info=True)
    archivers.extend(builtin)
|
|
|
|
|
|
class MetaDataStyle:
    """Integer identifiers for the supported metadata styles."""

    CBI, CIX, COMET = range(3)

    # Both lists are ordered so that they can be indexed by the constants above.
    name = ["ComicBookLover", "ComicRack", "CoMet"]
    short_name = ["cbl", "cr", "comet"]
|
|
|
|
|
|
class ComicArchive:
|
|
logo_data = b""
|
|
|
|
def __init__(self, path: pathlib.Path | str, default_image_path: pathlib.Path | str | None = None) -> None:
    """Bind this object to the comic at *path* and choose an archiver for it.

    *path* is stored as an absolute path. The first registered archiver whose
    ``is_valid`` accepts the path is opened; otherwise ``UnknownArchiver`` is
    kept. If *default_image_path* is given and no logo has been loaded yet,
    its bytes are read into the class-level ``ComicArchive.logo_data``.
    """
    # Cached metadata objects, one per style.
    self.cbi_md: GenericMetadata | None = None
    self.cix_md: GenericMetadata | None = None
    self.comet_md: GenericMetadata | None = None
    # Cached "is this style present?" answers (None means not checked yet).
    self._has_cbi: bool | None = None
    self._has_cix: bool | None = None
    self._has_comet: bool | None = None
    self.comet_filename: str | None = None
    # Cached page information.
    self.page_count: int | None = None
    self.page_list: list[str] = []

    self.path = pathlib.Path(path).absolute()

    self.ci_xml_filename = "ComicInfo.xml"
    self.comet_default_filename = "CoMet.xml"
    self.reset_cache()
    self.default_image_path = default_image_path

    # Start from the fallback archiver, then let a registered one take over.
    self.archiver: Archiver = UnknownArchiver.open(self.path)

    load_archive_plugins()
    for candidate in archivers:
        if candidate.is_valid(self.path):
            self.archiver = candidate.open(self.path)
            break

    # The logo is shared by all instances, so it is only read once.
    if self.default_image_path and not ComicArchive.logo_data:
        with open(self.default_image_path, mode="rb") as logo_file:
            ComicArchive.logo_data = logo_file.read()
|
|
|
|
def reset_cache(self) -> None:
    """Forget everything cached about the archive's pages and metadata."""
    # Presence flags: None means "not checked yet".
    self._has_cix = self._has_cbi = self._has_comet = None
    self.comet_filename = None

    # Page information.
    self.page_count = None
    self.page_list = []

    # Parsed metadata objects.
    self.cix_md = self.cbi_md = self.comet_md = None
|
|
|
|
def load_cache(self, style_list: list[int]) -> None:
    """Read the metadata for every style in *style_list*, discarding the results."""
    for wanted_style in style_list:
        self.read_metadata(wanted_style)
|
|
|
|
def rename(self, path: pathlib.Path | str) -> None:
    """Move the comic to *path* and point this object (and its archiver) at it.

    *path* is made absolute first. Nothing happens when it already equals the
    current path. Missing parent directories of the destination are created.
    """
    new_path = pathlib.Path(path).absolute()
    if new_path == self.path:
        return

    os.makedirs(new_path.parent, 0o777, True)
    shutil.move(self.path, new_path)
    self.path = new_path
    # Give the archiver the same absolute path as self.path. It used to get
    # the raw argument, so a relative *path* left the two out of sync.
    self.archiver.path = new_path
|
|
|
|
def is_writable(self, check_archive_status: bool = True) -> bool:
    """Return whether the comic can be written to.

    Unknown archive types are never writable. When *check_archive_status* is
    true the archiver itself must also report being writable. Finally, either
    the file or its parent directory must be writable according to the OS.
    """
    if isinstance(self.archiver, UnknownArchiver):
        return False

    if check_archive_status and not self.archiver.is_writable():
        return False

    return os.access(self.path, os.W_OK) or os.access(self.path.parent, os.W_OK)
|
|
|
|
def is_writable_for_style(self, data_style: int) -> bool:
    """Return whether metadata of *data_style* can be stored by this archiver.

    Only the CBI style is restricted: it needs an archiver that supports comments.
    """
    if data_style == MetaDataStyle.CBI:
        return bool(self.archiver.supports_comment)
    return True
|
|
|
|
def is_zip(self) -> bool:
    """Return True when the archiver reports its name as "ZIP"."""
    archiver_name = self.archiver.name()
    return archiver_name == "ZIP"
|
|
|
|
def seems_to_be_a_comic_archive(self) -> bool:
    """Return True for a recognised archive type that contains at least one page."""
    if isinstance(self.archiver, UnknownArchiver):
        return False

    return self.get_number_of_pages() > 0
|
|
|
|
def extension(self) -> str:
    """Return the extension reported by the underlying archiver."""
    archive_extension = self.archiver.extension()
    return archive_extension
|
|
|
|
def read_metadata(self, style: int) -> GenericMetadata:
    """Read the metadata stored in the given *style*.

    An unrecognised style yields a fresh, empty ``GenericMetadata``.
    """
    readers = (
        (MetaDataStyle.CIX, self.read_cix),
        (MetaDataStyle.CBI, self.read_cbi),
        (MetaDataStyle.COMET, self.read_comet),
    )
    for known_style, reader in readers:
        if style == known_style:
            return reader()
    return GenericMetadata()
|
|
|
|
def write_metadata(self, metadata: GenericMetadata, style: int) -> bool:
    """Write *metadata* in the given *style*.

    Returns the writer's result, or False when *style* is not recognised.
    """
    writers = (
        (MetaDataStyle.CIX, self.write_cix),
        (MetaDataStyle.CBI, self.write_cbi),
        (MetaDataStyle.COMET, self.write_comet),
    )
    retcode = False
    # Every style is checked in turn (no early exit), as three independent tests.
    for known_style, writer in writers:
        if style == known_style:
            retcode = writer(metadata)
    return retcode
|
|
|
|
def has_metadata(self, style: int) -> bool:
    """Return whether the archive holds metadata in the given *style*.

    An unrecognised style is reported as absent.
    """
    checkers = (
        (MetaDataStyle.CIX, self.has_cix),
        (MetaDataStyle.CBI, self.has_cbi),
        (MetaDataStyle.COMET, self.has_comet),
    )
    for known_style, checker in checkers:
        if style == known_style:
            return checker()
    return False
|
|
|
|
def remove_metadata(self, style: int) -> bool:
    """Remove the metadata stored in the given *style*.

    Returns the remover's result; an unrecognised style counts as success.
    """
    removers = (
        (MetaDataStyle.CIX, self.remove_cix),
        (MetaDataStyle.CBI, self.remove_cbi),
        (MetaDataStyle.COMET, self.remove_co_met),
    )
    for known_style, remover in removers:
        if style == known_style:
            return remover()
    return True
|
|
|
|
def get_page(self, index: int) -> bytes:
    """Return the raw bytes of the page at *index*.

    Empty bytes are returned when there is no such page or the archiver
    yields nothing. If reading raises, the error is logged and the shared
    logo image is returned in its place.
    """
    filename = self.get_page_name(index)
    if not filename:
        return b""

    try:
        return self.archiver.read_file(filename) or b""
    except Exception:
        logger.error("Error reading in page %d. Substituting logo page.", index)
        return ComicArchive.logo_data
|
|
|
|
def get_page_name(self, index: int) -> str:
    """Return the archive file name of the page at *index*.

    An empty string is returned when *index* is None or out of range.
    Negative indices count from the end, as with list indexing.
    """
    if index is None:
        return ""

    page_list = self.get_page_name_list()
    num_pages = len(page_list)

    # Bound the index on both sides: a negative index below -num_pages used
    # to escape the range check and raise IndexError.
    if not -num_pages <= index < num_pages:
        return ""

    return page_list[index]
|
|
|
|
def get_scanner_page_index(self) -> int | None:
    """Guess whether the last page is a scanner credit page.

    Returns the index of the last page when its file name stands out from the
    most common naming pattern, otherwise None. With fewer than five pages no
    guess is made.
    """
    name_list = self.get_page_name_list()
    count = self.get_number_of_pages()

    # too few pages to really know
    if count < 5:
        return None

    base_names = [os.path.split(name)[1] for name in name_list]

    # tally how often each file-name length occurs
    length_buckets: dict[int, int] = {}
    for base in base_names:
        length_buckets[len(base)] = length_buckets.get(len(base), 0) + 1

    # most frequent length first; ties go to the longer length
    ranked = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True)
    mode_length = ranked[0][0]

    # only the final image file is a candidate
    final_name = os.path.split(name_list[count - 1])[1]

    prefix = os.path.commonprefix([base for base in base_names if len(base) == mode_length])

    if mode_length <= 7 and prefix == "":
        # probably all numbers: a longer final name is the odd one out
        if len(final_name) > mode_length:
            return count - 1
        return None

    # otherwise see whether the last page breaks the shared prefix
    if not final_name.startswith(prefix):
        return count - 1

    return None
|
|
|
|
def get_page_name_list(self, sort_list: bool = True) -> list[str]:
    """Return the names of the image files in the archive, caching the result.

    A non-empty cached list is returned as is. Otherwise the archive's file
    names are fetched, naturally sorted when *sort_list* is true, and reduced
    to image files whose base name does not start with a dot.
    """
    if self.page_list:
        return self.page_list

    names: list[str] = self.archiver.get_filename_list()

    # seems like some archive creators are on Windows, and don't know about case-sensitivity!
    if sort_list:
        names = cast(list[str], natsort.os_sorted(names))

    image_suffixes = [".jpg", ".jpeg", ".png", ".gif", ".webp"]
    self.page_list = [
        entry
        for entry in names
        if os.path.splitext(entry)[1].casefold() in image_suffixes and os.path.basename(entry)[0] != "."
    ]

    return self.page_list
|
|
|
|
def get_number_of_pages(self) -> int:
    """Return the number of pages, counting them only on the first call."""
    if self.page_count is not None:
        return self.page_count

    self.page_count = len(self.get_page_name_list())
    return self.page_count
|
|
|
|
def read_cbi(self) -> GenericMetadata:
    """Return the ComicBookInfo metadata, parsing it on first use.

    When the archive has no CBI data an empty ``GenericMetadata`` is used.
    A default page list sized to the archive is set on the new object.
    """
    if self.cbi_md is not None:
        return self.cbi_md

    raw_cbi = self.read_raw_cbi()
    self.cbi_md = ComicBookInfo().metadata_from_string(raw_cbi) if raw_cbi else GenericMetadata()
    self.cbi_md.set_default_page_list(self.get_number_of_pages())

    return self.cbi_md
|
|
|
|
def read_raw_cbi(self) -> str:
    """Return the archive comment holding the CBI data, or "" when there is none."""
    return self.archiver.get_comment() if self.has_cbi() else ""
|
|
|
|
def has_cbi(self) -> bool:
    """Return whether the archive comment holds valid ComicBookInfo data (cached)."""
    if self._has_cbi is not None:
        return self._has_cbi

    if self.seems_to_be_a_comic_archive():
        comment = self.archiver.get_comment()
        self._has_cbi = ComicBookInfo().validate_string(comment)
    else:
        self._has_cbi = False

    return self._has_cbi
|
|
|
|
def write_cbi(self, metadata: GenericMetadata) -> bool:
    """Store *metadata* as ComicBookInfo data in the archive comment.

    Returns the archiver's result, or False when *metadata* is None or an
    error occurs (errors are logged, not raised). After a write attempt that
    does not raise, all cached state is discarded.
    """
    if metadata is None:
        return False

    try:
        self.apply_archive_info_to_metadata(metadata)
        cbi_string = ComicBookInfo().string_from_metadata(metadata)
        write_success = self.archiver.set_comment(cbi_string)
        # The old code set _has_cbi/cbi_md here on success, but reset_cache()
        # wiped them straight away, so those dead stores are gone.
        self.reset_cache()
        return write_success
    except Exception as e:
        logger.error("Error saving CBI! for %s: %s", self.path, e)

    return False
|
|
|
|
def remove_cbi(self) -> bool:
    """Remove the ComicBookInfo data by blanking the archive comment.

    Returns True when there was nothing to remove, otherwise the archiver's
    result. All cached state is discarded after an attempted removal.
    """
    if not self.has_cbi():
        return True

    write_success = self.archiver.set_comment("")
    # reset_cache() clears every cached flag and metadata object, so the old
    # per-field updates that ran just before it were dead stores.
    self.reset_cache()
    return write_success
|
|
|
|
def read_cix(self) -> GenericMetadata:
    """Return the ComicInfo.xml metadata, parsing it on first use.

    When the archive has no ComicInfo.xml an empty ``GenericMetadata`` is
    used. A stored page list whose length disagrees with the number of images
    in the archive is discarded and replaced by a default one.
    """
    if self.cix_md is not None:
        return self.cix_md

    raw_cix = self.read_raw_cix()
    if raw_cix:
        self.cix_md = ComicInfoXml().metadata_from_string(raw_cix)
    else:
        self.cix_md = GenericMetadata()

    md = self.cix_md
    # pages array doesn't match the number of images actually in the archive,
    # so the stored data cannot be trusted
    if len(md.pages) != 0 and len(md.pages) != self.get_number_of_pages():
        md.pages = []

    if len(md.pages) == 0:
        md.set_default_page_list(self.get_number_of_pages())

    return self.cix_md
|
|
|
|
def read_raw_cix(self) -> bytes:
    """Return the raw bytes of ComicInfo.xml.

    Empty bytes are returned when the file is absent, empty, or cannot be
    read (read errors are logged).
    """
    if not self.has_cix():
        return b""

    try:
        return self.archiver.read_file(self.ci_xml_filename) or b""
    except Exception as e:
        logger.error("Error reading in raw CIX! for %s: %s", self.path, e)
        return b""
|
|
|
|
def write_cix(self, metadata: GenericMetadata) -> bool:
    """Store *metadata* as ComicInfo.xml inside the archive.

    Any existing ComicInfo.xml is passed to the serializer along with the new
    metadata. Returns the archiver's result, or False when *metadata* is None
    or an error occurs (errors are logged, not raised). After a write attempt
    that does not raise, all cached state is discarded.
    """
    if metadata is None:
        return False

    try:
        self.apply_archive_info_to_metadata(metadata, calc_page_sizes=True)
        raw_cix = self.read_raw_cix()
        cix_string = ComicInfoXml().string_from_metadata(metadata, xml=raw_cix)
        write_success = self.archiver.write_file(self.ci_xml_filename, cix_string.encode("utf-8"))
        # The old code set _has_cix/cix_md here on success, but reset_cache()
        # wiped them straight away, so those dead stores are gone.
        self.reset_cache()
        return write_success
    except Exception as e:
        logger.error("Error saving CIX! for %s: %s", self.path, e)

    return False
|
|
|
|
def remove_cix(self) -> bool:
    """Delete ComicInfo.xml from the archive.

    Returns True when there was nothing to remove, otherwise the archiver's
    result. All cached state is discarded after an attempted removal.
    """
    if not self.has_cix():
        return True

    write_success = self.archiver.remove_file(self.ci_xml_filename)
    # reset_cache() clears every cached flag and metadata object, so the old
    # per-field updates that ran just before it were dead stores.
    self.reset_cache()
    return write_success
|
|
|
|
def has_cix(self) -> bool:
    """Return whether the archive contains a ComicInfo.xml entry (cached)."""
    if self._has_cix is None:
        # Only something that looks like a comic is searched for the file.
        self._has_cix = bool(self.seems_to_be_a_comic_archive()) and (
            self.ci_xml_filename in self.archiver.get_filename_list()
        )
    return self._has_cix
|
|
|
|
def read_comet(self) -> GenericMetadata:
    """Return the CoMet metadata, parsing it on first use.

    When the archive has no CoMet data an empty ``GenericMetadata`` is used.
    A default page list is set, and if the metadata names a cover image that
    is not the first page, the front-cover mark is moved to that page.
    """
    if self.comet_md is not None:
        return self.comet_md

    raw_comet = self.read_raw_comet()
    if raw_comet is None or raw_comet == "":
        self.comet_md = GenericMetadata()
    else:
        self.comet_md = CoMet().metadata_from_string(raw_comet)

    md = self.comet_md
    md.set_default_page_list(self.get_number_of_pages())

    # use the coverImage value from the comet data to mark the cover:
    # find the page whose name matches it (falling back to the first page)
    if md.cover_image is not None:
        cover_idx = next(
            (idx for idx, page_name in enumerate(self.get_page_name_list()) if md.cover_image == page_name),
            0,
        )
        if cover_idx != 0:
            # remove the mark from the first page before moving it
            del md.pages[0]["Type"]
            md.pages[cover_idx]["Type"] = PageType.FrontCover

    return self.comet_md
|
|
|
|
def read_raw_comet(self) -> str:
    """Return the CoMet XML decoded as UTF-8.

    An empty string is returned when there is no CoMet file, the file is
    empty, or reading it raises ``OSError`` (which is logged).
    """
    if not self.has_comet():
        return ""

    raw_comet = ""
    try:
        raw_bytes = self.archiver.read_file(cast(str, self.comet_filename))
        if raw_bytes:
            raw_comet = raw_bytes.decode("utf-8")
    except OSError as e:
        logger.exception("Error reading in raw CoMet!: %s", e)
    return raw_comet
|
|
|
|
def write_comet(self, metadata: GenericMetadata) -> bool:
    """Store *metadata* as a CoMet XML file inside the archive.

    An existing CoMet file is overwritten; otherwise the default file name is
    used. Returns the archiver's result, or False when *metadata* is None.
    All cached state is discarded after the write attempt.
    """
    if metadata is None:
        return False

    if not self.has_comet():
        self.comet_filename = self.comet_default_filename

    self.apply_archive_info_to_metadata(metadata)
    # Set the coverImage value, if it's not the first page
    cover_idx = int(metadata.get_cover_page_index_list()[0])
    if cover_idx != 0:
        metadata.cover_image = self.get_page_name(cover_idx)

    comet_string = CoMet().string_from_metadata(metadata)
    write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8"))
    # The old code set _has_comet/comet_md here on success, but reset_cache()
    # wiped them straight away, so those dead stores are gone.
    self.reset_cache()
    return write_success
|
|
|
|
def remove_co_met(self) -> bool:
    """Delete the CoMet XML file from the archive.

    Returns True when there was nothing to remove, otherwise the archiver's
    result. All cached state is discarded after an attempted removal.
    """
    if not self.has_comet():
        return True

    write_success = self.archiver.remove_file(cast(str, self.comet_filename))
    # reset_cache() clears every cached flag and metadata object, so the old
    # per-field updates that ran just before it were dead stores.
    self.reset_cache()
    return write_success
|
|
|
|
def has_comet(self) -> bool:
    """Return whether the archive contains a CoMet XML file (cached).

    On the first call every ``.xml`` file in the archive root is read and
    validated; the first valid one is remembered in ``self.comet_filename``.
    """
    if self._has_comet is not None:
        return self._has_comet

    self._has_comet = False
    if not self.seems_to_be_a_comic_archive():
        return self._has_comet

    # look at all xml files in root, and search for CoMet data, get first
    for entry in self.archiver.get_filename_list():
        in_root = os.path.dirname(entry) == ""
        if not (in_root and os.path.splitext(entry)[1].casefold() == ".xml"):
            continue

        # read in XML file, and validate it
        data = ""
        try:
            payload = self.archiver.read_file(entry)
            if payload:
                data = payload.decode("utf-8")
        except Exception as e:
            logger.warning("Error reading in Comet XML for validation! from %s: %s", self.path, e)

        if CoMet().validate_string(data):
            # since we found it, save it!
            self.comet_filename = entry
            self._has_comet = True
            break

    return self._has_comet
|
|
|
|
def apply_archive_info_to_metadata(self, md: GenericMetadata, calc_page_sizes: bool = False) -> None:
    """Copy facts about the archive into *md*.

    ``md.page_count`` is always set. When *calc_page_sizes* is true, every
    page entry missing size information gets ``ImageSize`` (byte length) and,
    if PIL is available and the image decodes, ``ImageHeight`` and
    ``ImageWidth``. Decoding failures are logged and only ``ImageSize`` is set.
    """
    md.page_count = self.get_number_of_pages()

    if not calc_page_sizes:
        return

    for index, p in enumerate(md.pages):
        idx = int(p["Image"])

        if not pil_available:
            # Without PIL only the byte size can be recorded.
            if "ImageSize" not in p:
                data = self.get_page(idx)
                p["ImageSize"] = str(len(data))
            continue

        if "ImageSize" in p and "ImageHeight" in p and "ImageWidth" in p:
            continue

        data = self.get_page(idx)
        if not data:
            continue

        try:
            # Page data is bytes, so the old StringIO fallback is dropped; any
            # failure here still lands in the handler below. The context
            # manager closes the image once its size has been read.
            with Image.open(io.BytesIO(data)) as im:
                w, h = im.size

            p["ImageSize"] = str(len(data))
            p["ImageHeight"] = str(h)
            p["ImageWidth"] = str(w)
        except Exception as e:
            logger.warning("Error decoding image [%s] %s :: image %s", e, self.path, index)
            p["ImageSize"] = str(len(data))
|
|
|
|
def metadata_from_filename(
    self,
    complicated_parser: bool = False,
    remove_c2c: bool = False,
    remove_fcbd: bool = False,
    remove_publisher: bool = False,
    split_words: bool = False,
) -> GenericMetadata:
    """Build metadata by parsing the comic's file name.

    With *split_words* the file stem is first split into words by wordninja.
    *complicated_parser* selects the lexer-based parser, which receives the
    three ``remove_*`` options; otherwise ``FileNameParser`` is used and only
    the fields it found are set. The result is always marked as not empty.
    """
    metadata = GenericMetadata()

    if split_words:
        filename = " ".join(wordninja.split(self.path.stem)) + self.path.suffix
    else:
        filename = self.path.name

    if complicated_parser:
        lexer = filenamelexer.Lex(filename)
        parsed = filenameparser.Parse(
            lexer.items, remove_c2c=remove_c2c, remove_fcbd=remove_fcbd, remove_publisher=remove_publisher
        )
        info = parsed.filename_info

        # (metadata attribute, key in the parser's filename_info), in assignment order
        field_map = (
            ("alternate_number", "alternate"),
            ("issue", "issue"),
            ("issue_count", "issue_count"),
            ("publisher", "publisher"),
            ("series", "series"),
            ("title", "title"),
            ("volume", "volume"),
            ("volume_count", "volume_count"),
            ("year", "year"),
            ("scan_info", "remainder"),
        )
        for attribute, key in field_map:
            setattr(metadata, attribute, utils.xlate(info[key]))

        # "Annual" takes precedence over "FCBD" when both are flagged.
        metadata.format = "FCBD" if info["fcbd"] else None
        if info["annual"]:
            metadata.format = "Annual"
    else:
        simple = filenameparser.FileNameParser()
        simple.parse_filename(filename)

        if simple.issue:
            metadata.issue = simple.issue
        if simple.series:
            metadata.series = simple.series
        if simple.volume:
            metadata.volume = utils.xlate(simple.volume, True)
        if simple.year:
            metadata.year = utils.xlate(simple.year, True)
        if simple.issue_count:
            metadata.issue_count = utils.xlate(simple.issue_count, True)
        if simple.remainder:
            metadata.scan_info = simple.remainder

    metadata.is_empty = False

    return metadata
|
|
|
|
def export_as_zip(self, zip_filename: pathlib.Path) -> bool:
    """Copy this archive's contents into a new zip at *zip_filename*.

    Returns True without doing anything when the archive is already a zip,
    otherwise the result of the copy.
    """
    if self.archiver.name() != "ZIP":
        target = ZipArchiver.open(zip_filename)
        return target.copy_from_archive(self.archiver)

    # nothing to do, we're already a zip
    return True
|