This commit is contained in:
lordwelch 2022-05-17 13:57:04 -07:00
parent a375a63328
commit 2942c64bae
9 changed files with 515 additions and 457 deletions

View File

@ -16,6 +16,7 @@
import logging
import xml.etree.ElementTree as ET
from typing import Any
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
@ -33,19 +34,16 @@ class CoMet:
cover_synonyms = ["cover", "covers", "coverartist", "cover artist"]
editor_synonyms = ["editor"]
def metadata_from_string(self, string: str) -> GenericMetadata:
    """Parse a CoMet XML document into metadata.

    Args:
        string: The XML text to parse.

    Returns:
        Whatever ``self.convert_xml_to_metadata`` produces for the parsed tree.

    Raises:
        xml.etree.ElementTree.ParseError: If ``string`` is not well-formed XML.
    """
    root = ET.fromstring(string)
    # convert_xml_to_metadata works on a tree, so wrap the parsed root element.
    return self.convert_xml_to_metadata(ET.ElementTree(root))
def string_from_metadata(self, metadata: GenericMetadata) -> str:
    """Serialize metadata to CoMet XML text, including the XML declaration.

    Args:
        metadata: The metadata handed to ``self.convert_metadata_to_xml``.

    Returns:
        The UTF-8 decoded XML document as a ``str``.
    """
    tree = self.convert_metadata_to_xml(metadata)
    # ET.tostring returns bytes when an explicit encoding is requested;
    # decode once here so callers always receive text.
    xml_bytes = ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True)
    return xml_bytes.decode("utf-8")
def convert_metadata_to_xml(self, metadata):
def convert_metadata_to_xml(self, metadata: GenericMetadata) -> ET.ElementTree:
# shorthand for the metadata
md = metadata
@ -57,7 +55,7 @@ class CoMet:
root.attrib["xsi:schemaLocation"] = "http://www.denvog.com http://www.denvog.com/comet/comet.xsd"
# helper func
def assign(comet_entry, md_entry):
def assign(comet_entry: str, md_entry: Any) -> None:
if md_entry is not None:
ET.SubElement(root, comet_entry).text = str(md_entry)
@ -127,41 +125,41 @@ class CoMet:
tree = ET.ElementTree(root)
return tree
def convert_xml_to_metadata(self, tree):
def convert_xml_to_metadata(self, tree: ET.ElementTree) -> GenericMetadata:
root = tree.getroot()
if root.tag != "comet":
raise "1"
raise Exception("Not a CoMet file")
metadata = GenericMetadata()
md = metadata
# Helper function
def xlate(tag):
def get(tag: str) -> Any:
node = root.find(tag)
if node is not None:
return node.text
return None
md.series = xlate("series")
md.title = xlate("title")
md.issue = xlate("issue")
md.volume = xlate("volume")
md.comments = xlate("description")
md.publisher = xlate("publisher")
md.language = xlate("language")
md.format = xlate("format")
md.page_count = xlate("pages")
md.maturity_rating = xlate("rating")
md.price = xlate("price")
md.is_version_of = xlate("isVersionOf")
md.rights = xlate("rights")
md.identifier = xlate("identifier")
md.last_mark = xlate("lastMark")
md.genre = xlate("genre") # TODO - repeatable field
md.series = get("series")
md.title = get("title")
md.issue = get("issue")
md.volume = get("volume")
md.comments = get("description")
md.publisher = get("publisher")
md.language = get("language")
md.format = get("format")
md.page_count = get("pages")
md.maturity_rating = get("rating")
md.price = get("price")
md.is_version_of = get("isVersionOf")
md.rights = get("rights")
md.identifier = get("identifier")
md.last_mark = get("lastMark")
md.genre = get("genre") # TODO - repeatable field
date = xlate("date")
date = get("date")
if date is not None:
parts = date.split("-")
if len(parts) > 0:
@ -169,9 +167,9 @@ class CoMet:
if len(parts) > 1:
md.month = parts[1]
md.cover_image = xlate("coverImage")
md.cover_image = get("coverImage")
reading_direction = xlate("readingDirection")
reading_direction = get("readingDirection")
if reading_direction is not None and reading_direction == "rtl":
md.manga = "YesAndRightToLeft"
@ -179,7 +177,7 @@ class CoMet:
char_list = []
for n in root:
if n.tag == "character":
char_list.append(n.text.strip())
char_list.append((n.text or "").strip())
md.characters = utils.list_to_string(char_list)
# Now extract the credit info
@ -194,17 +192,17 @@ class CoMet:
n.tag == "editor",
]
):
metadata.add_credit(n.text.strip(), n.tag.title())
metadata.add_credit((n.text or "").strip(), n.tag.title())
if n.tag == "coverDesigner":
metadata.add_credit(n.text.strip(), "Cover")
metadata.add_credit((n.text or "").strip(), "Cover")
metadata.is_empty = False
return metadata
# verify that the string actually contains CoMet data in XML format
def validate_string(self, string):
def validate_string(self, string: str) -> bool:
try:
tree = ET.ElementTree(ET.fromstring(string))
root = tree.getroot()
@ -215,12 +213,12 @@ class CoMet:
return True
def write_to_external_file(self, filename, metadata):
def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None:
tree = self.convert_metadata_to_xml(metadata)
tree.write(filename, encoding="utf-8")
def read_from_external_file(self, filename):
def read_from_external_file(self, filename: str) -> GenericMetadata:
tree = ET.parse(filename)
return self.convert_xml_to_metadata(tree)

View File

@ -17,6 +17,7 @@
import io
import logging
import os
import pathlib
import platform
import struct
import subprocess
@ -42,7 +43,9 @@ try:
except ImportError:
pil_available = False
from comicapi import filenamelexer, filenameparser
from typing import List, Optional, Union, cast
from comicapi import filenamelexer, filenameparser, utils
from comicapi.comet import CoMet
from comicapi.comicbookinfo import ComicBookInfo
from comicapi.comicinfoxml import ComicInfoXml
@ -62,22 +65,48 @@ class MetaDataStyle:
name = ["ComicBookLover", "ComicRack", "CoMet"]
class SevenZipArchiver:
class UnknownArchiver:
    """Fallback archiver used when the archive type is not recognised.

    Every operation is a no-op that reports "nothing there" or "not written".
    The concrete archivers in this module subclass it and override the
    methods they support.
    """

    def __init__(self, path: Union[pathlib.Path, str]) -> None:
        # Normalise to pathlib.Path, as the subclasses do, so ``self.path``
        # has one type regardless of which archiver is in use.
        self.path = pathlib.Path(path)

    def get_comment(self) -> str:
        """Return the archive comment; always empty here."""
        return ""

    def set_comment(self, comment: str) -> bool:
        """Report that the comment was not written."""
        return False

    def read_file(self, archive_file: str) -> Optional[bytes]:
        """Return ``None``: no file can be read from an unknown archive."""
        return None

    def write_file(self, archive_file: str, data: bytes) -> bool:
        """Report that the file was not written."""
        return False

    def remove_file(self, archive_file: str) -> bool:
        """Report that the file was not removed."""
        return False

    def get_filename_list(self) -> list[str]:
        """Return an empty file listing."""
        return []
class SevenZipArchiver(UnknownArchiver):
"""7Z implementation"""
def __init__(self, path):
self.path = path
def __init__(self, path: Union[pathlib.Path, str]) -> None:
self.path = pathlib.Path(path)
# @todo: Implement Comment?
def get_comment(self):
def get_comment(self) -> str:
return ""
def set_comment(self, comment):
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file):
data = ""
def read_file(self, archive_file: str) -> bytes:
data = bytes()
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read(archive_file)[archive_file].read()
@ -90,7 +119,7 @@ class SevenZipArchiver:
return data
def remove_file(self, archive_file):
def remove_file(self, archive_file: str) -> bool:
try:
self.rebuild_zip_file([archive_file])
except:
@ -99,7 +128,7 @@ class SevenZipArchiver:
else:
return True
def write_file(self, archive_file, data):
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
@ -116,17 +145,17 @@ class SevenZipArchiver:
logger.exception("Writing zip file failed")
return False
def get_filename_list(self):
def get_filename_list(self) -> list[str]:
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
namelist = zf.getnames()
namelist: list[str] = zf.getnames()
return namelist
except Exception as e:
logger.error("Unable to get 7zip file list [%s]: %s", e, self.path)
return []
def rebuild_zip_file(self, exclude_list):
def rebuild_zip_file(self, exclude_list: list[str]) -> None:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
@ -148,7 +177,7 @@ class SevenZipArchiver:
os.remove(self.path)
os.rename(tmp_name, self.path)
def copy_from_archive(self, otherArchive):
def copy_from_archive(self, otherArchive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with py7zr.SevenZipFile(self.path, "w") as zout:
@ -163,26 +192,25 @@ class SevenZipArchiver:
return True
class ZipArchiver:
class ZipArchiver(UnknownArchiver):
"""ZIP implementation"""
def __init__(self, path):
self.path = path
def __init__(self, path: Union[pathlib.Path, str]) -> None:
self.path = pathlib.Path(path)
def get_comment(self) -> str:
    """Return the ZIP archive comment decoded as UTF-8.

    Raises:
        UnicodeDecodeError: If the stored comment is not valid UTF-8.
        Errors raised by ``zipfile.ZipFile`` while opening ``self.path``
        propagate unchanged.
    """
    with zipfile.ZipFile(self.path, "r") as zf:
        # zipfile exposes the comment as bytes; callers expect text.
        return zf.comment.decode("utf-8")
def set_comment(self, comment: str) -> bool:
    """Store ``comment`` (encoded as UTF-8) as the ZIP archive comment.

    Returns:
        Always ``True``; failures surface as exceptions from ``zipfile``.
    """
    # Append mode keeps the existing members and rewrites the end-of-archive
    # record (which holds the comment) when the file is closed.
    with zipfile.ZipFile(self.path, "a") as zf:
        zf.comment = comment.encode("utf-8")
    return True
def read_file(self, archive_file):
def read_file(self, archive_file: str) -> bytes:
with zipfile.ZipFile(self.path, "r") as zf:
try:
data = zf.read(archive_file)
except zipfile.BadZipfile as e:
@ -193,7 +221,7 @@ class ZipArchiver:
raise IOError from e
return data
def remove_file(self, archive_file):
def remove_file(self, archive_file: str) -> bool:
try:
self.rebuild_zip_file([archive_file])
except:
@ -202,7 +230,7 @@ class ZipArchiver:
else:
return True
def write_file(self, archive_file, data):
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
@ -219,7 +247,7 @@ class ZipArchiver:
logger.error("writing zip file failed [%s]: %s", e, self.path)
return False
def get_filename_list(self):
def get_filename_list(self) -> List[str]:
try:
with zipfile.ZipFile(self.path, "r") as zf:
namelist = zf.namelist()
@ -228,7 +256,7 @@ class ZipArchiver:
logger.error("Unable to get zipfile list [%s]: %s", e, self.path)
return []
def rebuild_zip_file(self, exclude_list):
def rebuild_zip_file(self, exclude_list: List[str]) -> None:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
@ -253,7 +281,7 @@ class ZipArchiver:
os.remove(self.path)
os.rename(tmp_name, self.path)
def write_zip_comment(self, filename, comment):
def write_zip_comment(self, filename: Union[pathlib.Path, str], comment: str) -> bool:
"""
This is a custom function for writing a comment to a zip file,
since the built-in one doesn't seem to work on Windows and Mac OS/X
@ -304,7 +332,7 @@ class ZipArchiver:
fo.seek(pos + 2, 2)
# write out the comment itself
fo.write(bytes(comment))
fo.write(comment.encode("utf-8"))
fo.truncate()
else:
raise Exception("Failed to write comment to zip file!")
@ -314,7 +342,7 @@ class ZipArchiver:
else:
return True
def copy_from_archive(self, other_archive):
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout:
@ -335,13 +363,13 @@ class ZipArchiver:
return True
class RarArchiver:
class RarArchiver(UnknownArchiver):
"""RAR implementation"""
devnull = None
def __init__(self, path, rar_exe_path):
self.path = path
def __init__(self, path: Union[pathlib.Path, str], rar_exe_path: str) -> None:
self.path = pathlib.Path(path)
self.rar_exe_path = rar_exe_path
if RarArchiver.devnull is None:
@ -349,17 +377,17 @@ class RarArchiver:
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO()
self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.startupinfo = subprocess.STARTUPINFO() # type: ignore
self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
self.startupinfo = None
def get_comment(self) -> str:
    """Return the RAR archive comment, or ``""`` when there is none.

    An empty string is returned both when the archive could not be opened
    (``get_rar_obj`` returned ``None``) and when the archive has no comment.
    """
    rarc = self.get_rar_obj()
    # rarfile documents ``comment`` as None for archives without one;
    # str(None) would hand callers the literal text "None".
    if rarc is None or not rarc.comment:
        return ""
    return str(rarc.comment)
def set_comment(self, comment):
if self.rar_exe_path is not None:
def set_comment(self, comment: str) -> bool:
if self.rar_exe_path:
try:
# write comment to temp file
tmp_fd, tmp_name = tempfile.mkstemp()
@ -369,7 +397,7 @@ class RarArchiver:
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to write comment to Rar archive
proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, self.path]
proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, str(self.path)]
subprocess.call(
proc_args,
startupinfo=self.startupinfo,
@ -389,15 +417,17 @@ class RarArchiver:
else:
return False
def read_file(self, archive_file):
def read_file(self, archive_file: str) -> bytes:
rarc = self.get_rar_obj()
if rarc is None:
return bytes()
tries = 0
while tries < 7:
try:
tries = tries + 1
data = rarc.open(archive_file).read()
data: bytes = rarc.open(archive_file).read()
entries = [(rarc.getinfo(archive_file), data)]
if entries[0][0].file_size != len(entries[0][1]):
@ -430,9 +460,9 @@ class RarArchiver:
raise IOError
def write_file(self, archive_file, data):
def write_file(self, archive_file: str, data: bytes) -> bool:
if self.rar_exe_path is not None:
if self.rar_exe_path:
try:
tmp_folder = tempfile.mkdtemp()
@ -467,8 +497,8 @@ class RarArchiver:
else:
return False
def remove_file(self, archive_file):
if self.rar_exe_path is not None:
def remove_file(self, archive_file: str) -> bool:
if self.rar_exe_path:
try:
# use external program to remove file from Rar archive
subprocess.call(
@ -489,14 +519,14 @@ class RarArchiver:
else:
return False
def get_filename_list(self):
def get_filename_list(self) -> list[str]:
rarc = self.get_rar_obj()
tries = 0
# while tries < 7:
namelist = []
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
for item in rarc.infolist() if rarc else None:
if item.file_size != 0:
namelist.append(item.filename)
@ -504,45 +534,36 @@ class RarArchiver:
logger.error(f"get_filename_list(): [{e}] {self.path} attempt #{tries}".format(str(e), self.path, tries))
time.sleep(1)
else:
# Success
return namelist
return None
def get_rar_obj(self) -> Optional[rarfile.RarFile]:
    """Open ``self.path`` with rarfile.

    Returns:
        The opened ``rarfile.RarFile``, or ``None`` if opening raised an
        ``OSError`` (which is logged). Other exceptions propagate.
    """
    try:
        # rarfile is given a plain string path rather than a pathlib.Path.
        rarc = rarfile.RarFile(str(self.path))
    except (OSError, IOError) as e:
        logger.error("getRARObj(): [%s] %s", e, self.path)
    else:
        return rarc

    return None
class FolderArchiver:
class FolderArchiver(UnknownArchiver):
"""Folder implementation"""
def __init__(self, path):
self.path = path
def __init__(self, path: Union[pathlib.Path, str]) -> None:
self.path = pathlib.Path(path)
self.comment_file_name = "ComicTaggerFolderComment.txt"
def get_comment(self) -> str:
    """Return the folder's comment text.

    The comment lives in the file named by ``self.comment_file_name``; its
    bytes are decoded as UTF-8.
    """
    raw = self.read_file(self.comment_file_name)
    return raw.decode("utf-8")
def set_comment(self, comment: str) -> bool:
    """Write ``comment`` (UTF-8 encoded) to the folder's comment file.

    Returns:
        The result of ``self.write_file`` for ``self.comment_file_name``.
    """
    payload = comment.encode("utf-8")
    return self.write_file(self.comment_file_name, payload)
def read_file(self, archive_file):
def read_file(self, archive_file: str) -> bytes:
data = ""
data = bytes()
fname = os.path.join(self.path, archive_file)
try:
with open(fname, "rb") as f:
@ -552,7 +573,7 @@ class FolderArchiver:
return data
def write_file(self, archive_file, data):
def write_file(self, archive_file: str, data: bytes) -> bool:
fname = os.path.join(self.path, archive_file)
try:
@ -564,7 +585,7 @@ class FolderArchiver:
else:
return True
def remove_file(self, archive_file):
def remove_file(self, archive_file: str) -> bool:
fname = os.path.join(self.path, archive_file)
try:
@ -575,10 +596,10 @@ class FolderArchiver:
else:
return True
def get_filename_list(self):
def get_filename_list(self) -> list[str]:
return self.list_files(self.path)
def list_files(self, folder):
def list_files(self, folder: Union[pathlib.Path, str]) -> list[str]:
itemlist = []
@ -590,49 +611,28 @@ class FolderArchiver:
return itemlist
class UnknownArchiver:
"""Unknown implementation"""
def __init__(self, path):
self.path = path
def get_comment(self):
return ""
def set_comment(self, comment):
return False
def read_file(self, archive_file):
return ""
def write_file(self, archive_file, data):
return False
def remove_file(self, archive_file):
return False
def get_filename_list(self):
return []
class ComicArchive:
logo_data = None
logo_data = bytes()
class ArchiveType:
SevenZip, Zip, Rar, Folder, Pdf, Unknown = list(range(6))
def __init__(self, path, rar_exe_path=None, default_image_path=None):
self.cbi_md = None
self.cix_md = None
self.comet_filename = None
self.comet_md = None
self.has__cbi = None
self.has__cix = None
self.has__comet = None
self.path = path
self.page_count = None
self.page_list = None
def __init__(
self,
path: Union[pathlib.Path, str],
rar_exe_path: str = "",
default_image_path: Union[pathlib.Path, str, None] = None,
) -> None:
self.cbi_md: Optional[GenericMetadata] = None
self.cix_md: Optional[GenericMetadata] = None
self.comet_filename: Optional[str] = None
self.comet_md: Optional[GenericMetadata] = None
self.has__cbi: Optional[bool] = None
self.has__cix: Optional[bool] = None
self.has__comet: Optional[bool] = None
self.path = pathlib.Path(path)
self.page_count: Optional[int] = None
self.page_list: list[str] = []
self.rar_exe_path = rar_exe_path
self.ci_xml_filename = "ComicInfo.xml"
@ -641,7 +641,7 @@ class ComicArchive:
self.default_image_path = default_image_path
# Use file extension to decide which archive test we do first
ext = os.path.splitext(path)[1].lower()
ext = self.path.suffix
self.archive_type = self.ArchiveType.Unknown
self.archiver = UnknownArchiver(self.path)
@ -667,12 +667,13 @@ class ComicArchive:
self.archive_type = self.ArchiveType.Rar
self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path)
if ComicArchive.logo_data is None:
if not ComicArchive.logo_data:
fname = self.default_image_path
if fname:
with open(fname, "rb") as fd:
ComicArchive.logo_data = fd.read()
def reset_cache(self):
def reset_cache(self) -> None:
"""Clears the cached data"""
self.has__cix = None
@ -680,47 +681,47 @@ class ComicArchive:
self.has__comet = None
self.comet_filename = None
self.page_count = None
self.page_list = None
self.page_list = []
self.cix_md = None
self.cbi_md = None
self.comet_md = None
def load_cache(self, style_list: List[int]) -> None:
    """Call ``read_metadata`` once for each style in ``style_list``.

    The return values are discarded; the calls are made for whatever
    caching ``read_metadata`` performs.
    """
    for style in style_list:
        self.read_metadata(style)
def rename(self, path: Union[pathlib.Path, str]) -> None:
    """Point this archive and its archiver at a new path.

    Only the stored paths are updated; nothing is moved on disk here.
    """
    # Path objects are immutable, so one instance can safely be shared.
    new_path = pathlib.Path(path)
    self.path = new_path
    self.archiver.path = new_path
def sevenzip_test(self):
def sevenzip_test(self) -> bool:
return py7zr.is_7zfile(self.path)
def zip_test(self):
def zip_test(self) -> bool:
return zipfile.is_zipfile(self.path)
def rar_test(self) -> bool:
    """Return True if rarfile recognises ``self.path`` as a RAR archive.

    Any ordinary exception raised by the check is treated as "not a RAR".
    """
    try:
        return bool(rarfile.is_rarfile(str(self.path)))
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit; those now propagate.
        return False
def is_sevenzip(self):
def is_sevenzip(self) -> bool:
return self.archive_type == self.ArchiveType.SevenZip
def is_zip(self):
def is_zip(self) -> bool:
return self.archive_type == self.ArchiveType.Zip
def is_rar(self):
def is_rar(self) -> bool:
return self.archive_type == self.ArchiveType.Rar
def is_pdf(self):
def is_pdf(self) -> bool:
return self.archive_type == self.ArchiveType.Pdf
def is_folder(self):
def is_folder(self) -> bool:
return self.archive_type == self.ArchiveType.Folder
def is_writable(self, check_rar_status=True):
def is_writable(self, check_rar_status: bool = True) -> bool:
if self.archive_type == self.ArchiveType.Unknown:
return False
@ -730,27 +731,25 @@ class ComicArchive:
if not os.access(self.path, os.W_OK):
return False
if (self.archive_type != self.ArchiveType.Folder) and (
not os.access(os.path.dirname(os.path.abspath(self.path)), os.W_OK)
):
if (self.archive_type != self.ArchiveType.Folder) and (not os.access(self.path.parent, os.W_OK)):
return False
return True
def is_writable_for_style(self, data_style):
def is_writable_for_style(self, data_style: int) -> bool:
if (self.is_rar() or self.is_sevenzip()) and data_style == MetaDataStyle.CBI:
return False
return self.is_writable()
def seems_to_be_a_comic_archive(self) -> bool:
    """Return True for a zip, rar or 7z archive that has at least one page."""
    is_archive = self.is_zip() or self.is_rar() or self.is_sevenzip()
    # The page count is only computed for a supported archive type, as before.
    return bool(is_archive and self.get_number_of_pages() > 0)
def read_metadata(self, style):
def read_metadata(self, style: int) -> GenericMetadata:
if style == MetaDataStyle.CIX:
return self.read_cix()
@ -760,8 +759,8 @@ class ComicArchive:
return self.read_comet()
return GenericMetadata()
def write_metadata(self, metadata, style):
retcode = None
def write_metadata(self, metadata: GenericMetadata, style: int) -> bool:
retcode = False
if style == MetaDataStyle.CIX:
retcode = self.write_cix(metadata)
if style == MetaDataStyle.CBI:
@ -770,7 +769,7 @@ class ComicArchive:
retcode = self.write_comet(metadata)
return retcode
def has_metadata(self, style):
def has_metadata(self, style: int) -> bool:
if style == MetaDataStyle.CIX:
return self.has_cix()
if style == MetaDataStyle.CBI:
@ -779,7 +778,7 @@ class ComicArchive:
return self.has_comet()
return False
def remove_metadata(self, style):
def remove_metadata(self, style: int) -> bool:
retcode = True
if style == MetaDataStyle.CIX:
retcode = self.remove_cix()
@ -789,21 +788,21 @@ class ComicArchive:
retcode = self.remove_co_met()
return retcode
def get_page(self, index):
image_data = None
def get_page(self, index: int) -> bytes:
image_data = bytes()
filename = self.get_page_name(index)
if filename is not None:
if filename:
try:
image_data = self.archiver.read_file(filename)
image_data = self.archiver.read_file(filename) or bytes()
except IOError:
logger.exception("Error reading in page. Substituting logo page.")
image_data = ComicArchive.logo_data
return image_data
def get_page_name(self, index):
def get_page_name(self, index: int) -> str:
if index is None:
return None
@ -811,11 +810,11 @@ class ComicArchive:
num_pages = len(page_list)
if num_pages == 0 or index >= num_pages:
return None
return ""
return page_list[index]
def get_scanner_page_index(self):
def get_scanner_page_index(self) -> Optional[int]:
scanner_page_index = None
# make a guess at the scanner page
@ -827,7 +826,7 @@ class ComicArchive:
return None
# count the length of every filename, and count occurrences
length_buckets = {}
length_buckets: dict[int, int] = {}
for name in name_list:
fname = os.path.split(name)[1]
length = len(fname)
@ -863,15 +862,15 @@ class ComicArchive:
return scanner_page_index
def get_page_name_list(self, sort_list=True):
if self.page_list is None:
def get_page_name_list(self, sort_list: bool = True) -> List[str]:
if not self.page_list:
# get the list file names in the archive, and sort
files = self.archiver.get_filename_list()
files: list[str] = self.archiver.get_filename_list()
# seems like some archive creators are on Windows, and don't know about case-sensitivity!
if sort_list:
files = natsort.natsorted(files, alg=natsort.ns.IC | natsort.ns.I | natsort.ns.U)
files = cast(list[str], natsort.natsorted(files, alg=natsort.ns.IC | natsort.ns.I | natsort.ns.U))
# make a sub-list of image files
self.page_list = []
@ -884,30 +883,30 @@ class ComicArchive:
return self.page_list
def get_number_of_pages(self) -> int:
    """Return the number of pages, computing and caching it on first use.

    The count is the length of ``get_page_name_list()`` and is stored in
    ``self.page_count``; later calls return the stored value.
    """
    if self.page_count is None:
        self.page_count = len(self.get_page_name_list())
    return self.page_count
def read_cbi(self):
def read_cbi(self) -> GenericMetadata:
if self.cbi_md is None:
raw_cbi = self.read_raw_cbi()
if raw_cbi is None:
self.cbi_md = GenericMetadata()
else:
if raw_cbi:
self.cbi_md = ComicBookInfo().metadata_from_string(raw_cbi)
else:
self.cbi_md = GenericMetadata()
self.cbi_md.set_default_page_list(self.get_number_of_pages())
return self.cbi_md
def read_raw_cbi(self) -> str:
    """Return the raw ComicBookInfo text, or ``""`` if the archive has none.

    The CBI payload is read from the archive comment via the archiver.
    """
    if not self.has_cbi():
        return ""

    return self.archiver.get_comment()
def has_cbi(self):
def has_cbi(self) -> bool:
if self.has__cbi is None:
if not self.seems_to_be_a_comic_archive():
self.has__cbi = False
@ -917,7 +916,7 @@ class ComicArchive:
return self.has__cbi
def write_cbi(self, metadata):
def write_cbi(self, metadata: GenericMetadata) -> bool:
if metadata is not None:
self.apply_archive_info_to_metadata(metadata)
cbi_string = ComicBookInfo().string_from_metadata(metadata)
@ -930,7 +929,7 @@ class ComicArchive:
return False
def remove_cbi(self):
def remove_cbi(self) -> bool:
if self.has_cbi():
write_success = self.archiver.set_comment("")
if write_success:
@ -940,13 +939,13 @@ class ComicArchive:
return write_success
return True
def read_cix(self):
def read_cix(self) -> GenericMetadata:
if self.cix_md is None:
raw_cix = self.read_raw_cix()
if raw_cix is None or raw_cix == "":
self.cix_md = GenericMetadata()
else:
if raw_cix:
self.cix_md = ComicInfoXml().metadata_from_string(raw_cix)
else:
self.cix_md = GenericMetadata()
# validate the existing page list (make sure count is correct)
if len(self.cix_md.pages) != 0:
@ -960,22 +959,20 @@ class ComicArchive:
return self.cix_md
def read_raw_cix(self) -> bytes:
    """Return the raw bytes of the ComicInfo XML file.

    Returns:
        The file content, or ``b""`` when the archive has no CIX data, the
        archiver returns nothing, or reading raises ``IOError`` (logged).
    """
    if not self.has_cix():
        return b""
    try:
        # An archiver may return None; normalise that to empty bytes.
        raw_cix = self.archiver.read_file(self.ci_xml_filename) or b""
    except IOError as e:
        logger.error("Error reading in raw CIX!: %s", e)
        raw_cix = b""
    return raw_cix
def write_cix(self, metadata):
def write_cix(self, metadata: GenericMetadata) -> bool:
if metadata is not None:
self.apply_archive_info_to_metadata(metadata, calc_page_sizes=True)
raw_cix = self.read_raw_cix()
if raw_cix == "":
raw_cix = None
cix_string = ComicInfoXml().string_from_metadata(metadata, xml=raw_cix)
write_success = self.archiver.write_file(self.ci_xml_filename, cix_string.encode("utf-8"))
if write_success:
@ -986,7 +983,7 @@ class ComicArchive:
return False
def remove_cix(self):
def remove_cix(self) -> bool:
if self.has_cix():
write_success = self.archiver.remove_file(self.ci_xml_filename)
if write_success:
@ -996,7 +993,7 @@ class ComicArchive:
return write_success
return True
def has_cix(self):
def has_cix(self) -> bool:
if self.has__cix is None:
if not self.seems_to_be_a_comic_archive():
@ -1007,7 +1004,7 @@ class ComicArchive:
self.has__cix = False
return self.has__cix
def read_comet(self):
def read_comet(self) -> GenericMetadata:
if self.comet_md is None:
raw_comet = self.read_raw_comet()
if raw_comet is None or raw_comet == "":
@ -1031,19 +1028,21 @@ class ComicArchive:
return self.comet_md
def read_raw_comet(self) -> str:
    """Return the CoMet XML text stored in the archive.

    Returns:
        The UTF-8 decoded content of ``self.comet_filename``, or ``""`` when
        the archive has no CoMet data, the file is empty, or reading or
        decoding fails (the failure is logged).
    """
    raw_comet = ""
    if not self.has_comet():
        logger.info("%s doesn't have CoMet data!", self.path)
        # Return "" rather than None so the result matches the declared
        # ``str`` return type, as read_raw_cbi and read_raw_cix do.
        return raw_comet

    try:
        raw_bytes = self.archiver.read_file(cast(str, self.comet_filename))
        if raw_bytes:
            raw_comet = raw_bytes.decode("utf-8")
    except Exception:
        # Was a bare ``except:``; KeyboardInterrupt/SystemExit now propagate.
        logger.exception("Error reading in raw CoMet!")
    return raw_comet
def write_comet(self, metadata):
def write_comet(self, metadata: GenericMetadata) -> bool:
if metadata is not None:
if not self.has_comet():
@ -1056,7 +1055,7 @@ class ComicArchive:
metadata.cover_image = self.get_page_name(cover_idx)
comet_string = CoMet().string_from_metadata(metadata)
write_success = self.archiver.write_file(self.comet_filename, comet_string)
write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8"))
if write_success:
self.has__comet = True
self.comet_md = metadata
@ -1065,9 +1064,9 @@ class ComicArchive:
return False
def remove_co_met(self):
def remove_co_met(self) -> bool:
if self.has_comet():
write_success = self.archiver.remove_file(self.comet_filename)
write_success = self.archiver.remove_file(cast(str, self.comet_filename))
if write_success:
self.has__comet = False
self.comet_md = None
@ -1075,7 +1074,7 @@ class ComicArchive:
return write_success
return True
def has_comet(self):
def has_comet(self) -> bool:
if self.has__comet is None:
self.has__comet = False
if not self.seems_to_be_a_comic_archive():
@ -1086,9 +1085,11 @@ class ComicArchive:
if os.path.dirname(n) == "" and os.path.splitext(n)[1].lower() == ".xml":
# read in XML file, and validate it
try:
data = self.archiver.read_file(n)
except Exception as e:
data = ""
d = self.archiver.read_file(n)
if d:
data = d.decode("utf-8")
except Exception as e:
logger.warning("Error reading in Comet XML for validation!: %s", e)
if CoMet().validate_string(data):
# since we found it, save it!
@ -1098,7 +1099,7 @@ class ComicArchive:
return self.has__comet
def apply_archive_info_to_metadata(self, md, calc_page_sizes=False):
def apply_archive_info_to_metadata(self, md: GenericMetadata, calc_page_sizes: bool = False) -> None:
md.page_count = self.get_number_of_pages()
if calc_page_sizes:
@ -1107,7 +1108,7 @@ class ComicArchive:
if pil_available:
if "ImageSize" not in p or "ImageHeight" not in p or "ImageWidth" not in p:
data = self.get_page(idx)
if data is not None:
if data:
try:
if isinstance(data, bytes):
im = Image.open(io.BytesIO(data))
@ -1128,44 +1129,48 @@ class ComicArchive:
p["ImageSize"] = str(len(data))
def metadata_from_filename(
self, complicated_parser=False, remove_c2c=False, remove_fcbd=False, remove_publisher=False
):
self,
complicated_parser: bool = False,
remove_c2c: bool = False,
remove_fcbd: bool = False,
remove_publisher: bool = False,
) -> GenericMetadata:
metadata = GenericMetadata()
if complicated_parser:
lex = filenamelexer.Lex(self.path)
lex = filenamelexer.Lex(self.path.name)
p = filenameparser.Parse(
lex.items, remove_c2c=remove_c2c, remove_fcbd=remove_fcbd, remove_publisher=remove_publisher
)
metadata.alternate_number = p.filename_info["alternate"] or None
metadata.issue = p.filename_info["issue"] or None
metadata.issue_count = p.filename_info["issue_count"] or None
metadata.publisher = p.filename_info["publisher"] or None
metadata.series = p.filename_info["series"] or None
metadata.title = p.filename_info["title"] or None
metadata.volume = p.filename_info["volume"] or None
metadata.volume_count = p.filename_info["volume_count"] or None
metadata.year = p.filename_info["year"] or None
metadata.alternate_number = utils.xlate(p.filename_info["alternate"])
metadata.issue = utils.xlate(p.filename_info["issue"])
metadata.issue_count = utils.xlate(p.filename_info["issue_count"])
metadata.publisher = utils.xlate(p.filename_info["publisher"])
metadata.series = utils.xlate(p.filename_info["series"])
metadata.title = utils.xlate(p.filename_info["title"])
metadata.volume = utils.xlate(p.filename_info["volume"])
metadata.volume_count = utils.xlate(p.filename_info["volume_count"])
metadata.year = utils.xlate(p.filename_info["year"])
metadata.scan_info = p.filename_info["remainder"] or None
metadata.scan_info = utils.xlate(p.filename_info["remainder"])
metadata.format = "FCBD" if p.filename_info["fcbd"] else None
if p.filename_info["annual"]:
metadata.format = "Annual"
else:
fnp = filenameparser.FileNameParser()
fnp.parse_filename(self.path)
fnp.parse_filename(str(self.path))
if fnp.issue:
metadata.issue = fnp.issue
if fnp.series:
metadata.series = fnp.series
if fnp.volume:
metadata.volume = fnp.volume
metadata.volume = utils.xlate(fnp.volume, True)
if fnp.year:
metadata.year = fnp.year
metadata.year = utils.xlate(fnp.year, True)
if fnp.issue_count:
metadata.issue_count = fnp.issue_count
metadata.issue_count = utils.xlate(fnp.issue_count, True)
if fnp.remainder:
metadata.scan_info = fnp.remainder
@ -1173,7 +1178,7 @@ class ComicArchive:
return metadata
def export_as_zip(self, zipfilename):
def export_as_zip(self, zipfilename: str) -> bool:
if self.archive_type == self.ArchiveType.Zip:
# nothing to do, we're already a zip
return True

View File

@ -18,17 +18,65 @@ import json
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Literal, TypedDict, Union
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
logger = logging.getLogger(__name__)
CBILiteralType = Literal[
"series",
"title",
"issue",
"publisher",
"publicationMonth",
"publicationYear",
"numberOfIssues",
"comments",
"genre",
"volume",
"numberOfVolumes",
"language",
"country",
"rating",
"credits",
"tags",
]
class Credits(TypedDict):
person: str
role: str
primary: bool
class ComicBookInfoJson(TypedDict, total=False):
    """Typed view of the ``"ComicBookInfo/1.0"`` object inside a CBI container.

    Declared with ``total=False``, so every key is optional.
    """

    # Identification
    series: str
    title: str
    publisher: str

    # Publication date
    publicationMonth: int
    publicationYear: int

    # Numbering
    issue: int
    numberOfIssues: int
    volume: int
    numberOfVolumes: int

    # Classification
    rating: int
    genre: str
    language: str
    country: str

    # Collections and free text
    credits: list[Credits]
    tags: list[str]
    comments: str
# Top-level CBI document. The functional TypedDict form is required here because
# the key "ComicBookInfo/1.0" is not a valid Python identifier.
CBIContainer = TypedDict("CBIContainer", {"appID": str, "lastModified": str, "ComicBookInfo/1.0": ComicBookInfoJson})
class ComicBookInfo:
def metadata_from_string(self, string):
def metadata_from_string(self, string: str) -> GenericMetadata:
cbi_container = json.loads(str(string, "utf-8"))
cbi_container = json.loads(string)
metadata = GenericMetadata()
@ -66,12 +114,12 @@ class ComicBookInfo:
return metadata
def string_from_metadata(self, metadata: GenericMetadata) -> str:
    """Serialize ``metadata`` to ComicBookInfo JSON text.

    The container dictionary is built by ``create_json_dictionary`` and dumped
    with ``json.dumps`` using its default settings.
    """
    return json.dumps(self.create_json_dictionary(metadata))
def validate_string(self, string):
def validate_string(self, string: Union[bytes, str]) -> bool:
"""Verify that the string actually contains CBI data in JSON format"""
try:
@ -81,20 +129,21 @@ class ComicBookInfo:
return "ComicBookInfo/1.0" in cbi_container
def create_json_dictionary(self, metadata):
def create_json_dictionary(self, metadata: GenericMetadata) -> CBIContainer:
"""Create the dictionary that we will convert to JSON text"""
cbi = {}
cbi_container = {
cbi_container = CBIContainer(
{
"appID": "ComicTagger/" + "1.0.0",
"lastModified": str(datetime.now()),
"ComicBookInfo/1.0": cbi,
} # TODO: ctversion.version,
"ComicBookInfo/1.0": {},
}
) # TODO: ctversion.version,
# helper func
def assign(cbi_entry: CBILiteralType, md_entry: Any) -> None:
    """Store ``md_entry`` under ``cbi_entry`` in the ``"ComicBookInfo/1.0"`` payload.

    ``None`` and the empty string are skipped, so unset fields are left out of
    the document. Other falsy values (``0``, ``False``, empty lists) are stored.
    """
    # The earlier test `x is not None or isinstance(x, str) and x != ""` parsed
    # as `A or (B and C)`, which is just `x is not None`, so "" slipped through.
    if md_entry is None or (isinstance(md_entry, str) and md_entry == ""):
        return
    cbi_container["ComicBookInfo/1.0"][cbi_entry] = md_entry
assign("series", utils.xlate(metadata.series))
assign("title", utils.xlate(metadata.title))
@ -115,7 +164,7 @@ class ComicBookInfo:
return cbi_container
def write_to_external_file(self, filename, metadata):
def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None:
cbi_container = self.create_json_dictionary(metadata)

View File

@ -16,9 +16,12 @@
import logging
import xml.etree.ElementTree as ET
from collections import OrderedDict
from typing import Any, List, Optional, cast
from xml.etree.ElementTree import ElementTree
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.genericmetadata import GenericMetadata, ImageMetadata
from comicapi.issuestring import IssueString
logger = logging.getLogger(__name__)
@ -34,7 +37,7 @@ class ComicInfoXml:
cover_synonyms = ["cover", "covers", "coverartist", "cover artist"]
editor_synonyms = ["editor"]
def get_parseable_credits(self):
def get_parseable_credits(self) -> List[str]:
parsable_credits = []
parsable_credits.extend(self.writer_synonyms)
parsable_credits.extend(self.penciller_synonyms)
@ -45,17 +48,19 @@ class ComicInfoXml:
parsable_credits.extend(self.editor_synonyms)
return parsable_credits
def metadata_from_string(self, string: bytes) -> GenericMetadata:
    """Parse ComicInfo XML held in ``string`` and convert it to metadata.

    The parsed root element is wrapped in an ``ElementTree`` and handed to
    ``convert_xml_to_metadata``, whose result is returned unchanged.
    """
    root = ET.fromstring(string)
    return self.convert_xml_to_metadata(ET.ElementTree(root))
def string_from_metadata(self, metadata: GenericMetadata, xml: bytes = b"") -> str:
    """Render ``metadata`` as a ComicInfo XML string with an XML declaration.

    ``xml`` is forwarded to ``convert_metadata_to_xml``. The document is
    encoded as UTF-8 and decoded back to ``str`` before being returned.
    """
    # The instance is passed explicitly as the first positional argument,
    # which `convert_metadata_to_xml` names `filename`.
    root = self.convert_metadata_to_xml(self, metadata, xml).getroot()
    encoded = ET.tostring(root, encoding="utf-8", xml_declaration=True)
    return str(encoded.decode("utf-8"))
def convert_metadata_to_xml(self, filename, metadata, xml=None):
def convert_metadata_to_xml(
self, filename: "ComicInfoXml", metadata: GenericMetadata, xml: bytes = b""
) -> ElementTree:
# shorthand for the metadata
md = metadata
@ -69,7 +74,7 @@ class ComicInfoXml:
root.attrib["xmlns:xsd"] = "http://www.w3.org/2001/XMLSchema"
# helper func
def assign(cix_entry, md_entry):
def assign(cix_entry: str, md_entry: Any) -> None:
if md_entry is not None and md_entry:
et_entry = root.find(cix_entry)
if et_entry is not None:
@ -171,11 +176,8 @@ class ComicInfoXml:
pages_node = ET.SubElement(root, "Pages")
for page_dict in md.pages:
page = page_dict
if "Image" in page:
page["Image"] = str(page["Image"])
page_node = ET.SubElement(pages_node, "Page")
page_node.attrib = dict(sorted(page_dict.items()))
page_node.attrib = OrderedDict(sorted((k, str(v)) for k, v in page_dict.items()))
ET.indent(root)
@ -183,14 +185,14 @@ class ComicInfoXml:
tree = ET.ElementTree(root)
return tree
def convert_xml_to_metadata(self, tree):
def convert_xml_to_metadata(self, tree: ElementTree) -> GenericMetadata:
root = tree.getroot()
if root.tag != "ComicInfo":
raise "1"
raise Exception("Not a ComicInfo file")
def get(name):
def get(name: str) -> Optional[str]:
tag = root.find(name)
if tag is None:
return None
@ -256,20 +258,21 @@ class ComicInfoXml:
pages_node = root.find("Pages")
if pages_node is not None:
for page in pages_node:
if "Image" in page.attrib:
page.attrib["Image"] = int(page.attrib["Image"])
md.pages.append(page.attrib)
p: dict[str, Any] = page.attrib
if "Image" in p:
p["Image"] = int(p["Image"])
md.pages.append(cast(ImageMetadata, p))
md.is_empty = False
return md
def write_to_external_file(self, filename: str, metadata: GenericMetadata, xml: bytes = b"") -> None:
    """Write ``metadata`` as UTF-8 ComicInfo XML (with declaration) to ``filename``.

    ``xml`` is forwarded to ``convert_metadata_to_xml``.
    """
    # The instance is passed explicitly as the `filename` argument of
    # `convert_metadata_to_xml`, as the other call sites in this class do.
    self.convert_metadata_to_xml(self, metadata, xml).write(filename, encoding="utf-8", xml_declaration=True)
def read_from_external_file(self, filename: str) -> GenericMetadata:
    """Parse the XML document at ``filename`` and convert it to metadata."""
    return self.convert_xml_to_metadata(ET.parse(filename))

View File

@ -2,6 +2,7 @@ import calendar
import os
import unicodedata
from enum import Enum, auto
from typing import Any, Callable, Optional, Set
class ItemType(Enum):
@ -73,26 +74,26 @@ key = {
class Item:
    """A single lexed token: its type, where it starts, and its text."""

    def __init__(self, typ: ItemType, pos: int, val: str) -> None:
        # Type of the token.
        self.typ: ItemType = typ
        # Start index of the token (the lexer passes its `start` here).
        self.pos: int = pos
        # Text of the token, or the message for error items.
        self.val: str = val

    def __repr__(self) -> str:
        """Return ``"<val>: index: <pos>: <typ>"`` for debugging output."""
        return f"{self.val}: index: {self.pos}: {self.typ}"
class Lexer:
def __init__(self, string: str) -> None:
    """Set up a lexer over ``string`` with no state function selected yet."""
    # The string being scanned
    self.input: str = string
    # The next lexing function to enter
    self.state: Optional[Callable[[Lexer], Optional[Callable]]] = None
    # Current position in the input
    self.pos: int = -1
    # Start position of this item
    self.start: int = 0
    # Position of most recent item returned by nextItem
    self.lastPos: int = 0
    # Nesting depth of ( ) exprs
    self.paren_depth: int = 0
    # Nesting depth of { }
    self.brace_depth: int = 0
    # Nesting depth of [ ]
    self.sbrace_depth: int = 0
    # Items emitted so far, in order
    self.items: list[Item] = []
# Next returns the next rune in the input.
def get(self) -> str:
@ -110,20 +111,20 @@ class Lexer:
return self.input[self.pos + 1]
def backup(self) -> None:
    """Step the current position back by one."""
    self.pos = self.pos - 1
# Emit appends the pending text (from `start` through `pos`, inclusive) to
# `items` as an Item of type t, then starts the next item right after it.
def emit(self, t: ItemType) -> None:
    end = self.pos + 1
    self.items.append(Item(t, self.start, self.input[self.start : end]))
    self.start = end
# Ignore skips over the pending input before this point.
def ignore(self) -> None:
    """Discard the pending text by moving ``start`` up to the current ``pos``."""
    current = self.pos
    self.start = current
# Accept consumes the next rune if it's from the valid set.
def accept(self, valid: str):
def accept(self, valid: str) -> bool:
if self.get() in valid:
return True
@ -131,17 +132,12 @@ class Lexer:
return False
# AcceptRun consumes a run of runes from the valid set.
def accept_run(self, valid: str) -> None:
    """Read characters while they are contained in ``valid``.

    The first character that does not match is pushed back with ``backup``.
    """
    while True:
        if self.get() not in valid:
            break
    # Un-read the character that ended the run.
    self.backup()
# Errorf returns an error token and terminates the scan by passing
# Back a nil pointer that will be the next state, terminating self.nextItem.
def errorf(self, message: str):
self.items.append(Item(ItemType.Error, self.start, message))
# NextItem returns the next item from the input.
# Called by the parser, not in the lexing goroutine.
# def next_item(self) -> Item:
@ -149,7 +145,7 @@ class Lexer:
# self.lastPos = item.pos
# return item
def scan_number(self):
def scan_number(self) -> bool:
digits = "0123456789"
self.accept_run(digits)
@ -171,21 +167,28 @@ class Lexer:
return True
# Runs the state machine for the lexer.
def run(self) -> None:
    """Start at ``lex_filename`` and keep calling the current state function.

    Each state function receives this lexer and returns the next state; the
    loop ends when a state function returns ``None``.
    """
    self.state = lex_filename
    while True:
        step = self.state
        if step is None:
            break
        self.state = step(self)
# Errorf appends an Error item carrying `message` (positioned at the lexer's
# current `start`) and returns None. A state function that returns this value
# hands None back as the next state, which terminates the scan.
def errorf(lex: Lexer, message: str) -> Optional[Callable[[Lexer], Optional[Callable]]]:
    error_item = Item(ItemType.Error, lex.start, message)
    lex.items.append(error_item)
    return None
# Scans the elements inside action delimiters.
def lex_filename(lex: Lexer):
def lex_filename(lex: Lexer) -> Optional[Callable[[Lexer], Optional[Callable]]]:
r = lex.get()
if r == eof:
if lex.paren_depth != 0:
return lex.errorf("unclosed left paren")
return errorf(lex, "unclosed left paren")
if lex.brace_depth != 0:
return lex.errorf("unclosed left paren")
return errorf(lex, "unclosed left paren")
lex.emit(ItemType.EOF)
return None
elif is_space(r):
@ -230,7 +233,7 @@ def lex_filename(lex: Lexer):
lex.emit(ItemType.RightParen)
lex.paren_depth -= 1
if lex.paren_depth < 0:
return lex.errorf("unexpected right paren " + r)
return errorf(lex, "unexpected right paren " + r)
elif r == "{":
lex.emit(ItemType.LeftBrace)
@ -239,7 +242,7 @@ def lex_filename(lex: Lexer):
lex.emit(ItemType.RightBrace)
lex.brace_depth -= 1
if lex.brace_depth < 0:
return lex.errorf("unexpected right brace " + r)
return errorf(lex, "unexpected right brace " + r)
elif r == "[":
lex.emit(ItemType.LeftSBrace)
@ -248,17 +251,17 @@ def lex_filename(lex: Lexer):
lex.emit(ItemType.RightSBrace)
lex.sbrace_depth -= 1
if lex.sbrace_depth < 0:
return lex.errorf("unexpected right brace " + r)
return errorf(lex, "unexpected right brace " + r)
elif is_symbol(r):
# L.backup()
lex.emit(ItemType.Symbol)
else:
return lex.errorf("unrecognized character in action: " + r)
return errorf(lex, "unrecognized character in action: " + r)
return lex_filename
def lex_operator(lex: Lexer) -> Callable:
    """Consume a run of ``-|:;`` characters and emit them as one Operator item.

    Returns ``lex_filename`` as the next state.
    """
    operator_chars = "-|:;"
    lex.accept_run(operator_chars)
    lex.emit(ItemType.Operator)
    return lex_filename
@ -266,7 +269,7 @@ def lex_operator(lex: Lexer):
# LexSpace scans a run of space characters.
# One space has already been seen.
def lex_space(lex: Lexer):
def lex_space(lex: Lexer) -> Callable:
while is_space(lex.peek()):
lex.get()
@ -275,7 +278,7 @@ def lex_space(lex: Lexer):
# Lex_text scans an alphanumeric.
def lex_text(lex: Lexer):
def lex_text(lex: Lexer) -> Callable:
while True:
r = lex.get()
if is_alpha_numeric(r):
@ -306,7 +309,7 @@ def lex_text(lex: Lexer):
return lex_filename
def cal(value: str):
def cal(value: str) -> Set[Any]:
month_abbr = [i for i, x in enumerate(calendar.month_abbr) if x == value.title()]
month_name = [i for i, x in enumerate(calendar.month_name) if x == value.title()]
day_abbr = [i for i, x in enumerate(calendar.day_abbr) if x == value.title()]
@ -314,9 +317,9 @@ def cal(value: str):
return set(month_abbr + month_name + day_abbr + day_name)
def lex_number(lex: Lexer):
def lex_number(lex: Lexer) -> Optional[Callable[[Lexer], Optional[Callable]]]:
if not lex.scan_number():
return lex.errorf("bad number syntax: " + lex.input[lex.start : lex.pos])
return errorf(lex, "bad number syntax: " + lex.input[lex.start : lex.pos])
# Complex number logic removed. Messes with math operations without space
if lex.input[lex.start] == "#":
@ -330,24 +333,24 @@ def lex_number(lex: Lexer):
return lex_filename
def is_space(character: str) -> bool:
    """Report whether ``character`` is an underscore, a space, or a tab."""
    # Underscore is treated as spacing here, alongside space and tab.
    spacing = "_ \t"
    return character in spacing
# IsAlphaNumeric reports whether the character is alphabetic or numeric.
# Underscore is not accepted here.
def is_alpha_numeric(character: str) -> bool:
    if character.isalpha():
        return True
    return character.isnumeric()
def is_operator(character: str) -> bool:
    """Report whether ``character`` is one of ``- | : ; /`` or a backslash."""
    operators = "-|:;/\\"
    return character in operators
def is_symbol(character: str) -> bool:
    """Report whether ``character`` is Unicode punctuation or a Unicode symbol.

    That is, whether its general category starts with ``P`` or ``S``.
    """
    major_class = unicodedata.category(character)[0]
    return major_class in "PS"
def Lex(filename: str) -> Lexer:
    """Lex the base name of ``filename`` and return the finished ``Lexer``.

    Only ``os.path.basename(filename)`` is scanned; the lexer is run to
    completion before it is returned.
    """
    base_name = os.path.basename(filename)
    lexer = Lexer(string=base_name)
    lexer.run()
    return lexer

View File

@ -24,7 +24,7 @@ import logging
import os
import re
from operator import itemgetter
from typing import TypedDict
from typing import Callable, Match, Optional, TypedDict
from urllib.parse import unquote
from text2digits import text2digits
@ -38,7 +38,7 @@ logger = logging.getLogger(__name__)
class FileNameParser:
def __init__(self):
def __init__(self) -> None:
self.series = ""
self.volume = ""
self.year = ""
@ -46,10 +46,10 @@ class FileNameParser:
self.remainder = ""
self.issue = ""
def repl(self, m: Match[str]) -> str:
    """Return a run of spaces as long as the matched text.

    Used as the replacement callback for ``re.sub`` in ``fix_spaces``, so the
    substituted string keeps the same length.
    """
    matched = m.group(0)
    return len(matched) * " "
def fix_spaces(self, string, remove_dashes=True):
def fix_spaces(self, string: str, remove_dashes: bool = True) -> str:
if remove_dashes:
placeholders = [r"[-_]", r" +"]
else:
@ -58,7 +58,7 @@ class FileNameParser:
string = re.sub(ph, self.repl, string)
return string # .strip()
def get_issue_count(self, filename, issue_end):
def get_issue_count(self, filename: str, issue_end: int) -> str:
count = ""
filename = filename[issue_end:]
@ -79,7 +79,7 @@ class FileNameParser:
return count.lstrip("0")
def get_issue_number(self, filename):
def get_issue_number(self, filename: str) -> tuple[str, int, int]:
"""Returns a tuple of issue number string, and start and end indexes in the filename
(The indexes will be used to split the string up for further parsing)
"""
@ -161,7 +161,7 @@ class FileNameParser:
return issue, start, end
def get_series_name(self, filename, issue_start):
def get_series_name(self, filename: str, issue_start: int) -> tuple[str, str]:
"""Use the issue number string index to split the filename string"""
if issue_start != 0:
@ -223,7 +223,7 @@ class FileNameParser:
return series, volume.strip()
def get_year(self, filename, issue_end):
def get_year(self, filename: str, issue_end: int) -> str:
filename = filename[issue_end:]
@ -236,7 +236,7 @@ class FileNameParser:
year = re.sub(r"[^0-9]", "", year)
return year
def get_remainder(self, filename, year, count, volume, issue_end):
def get_remainder(self, filename: str, year: str, count: str, volume: str, issue_end: int) -> str:
"""Make a guess at where the non-interesting stuff begins"""
remainder = ""
@ -261,7 +261,7 @@ class FileNameParser:
return remainder.strip()
def parse_filename(self, filename):
def parse_filename(self, filename: str) -> None:
# remove the path
filename = os.path.basename(filename)
@ -325,12 +325,12 @@ class Parser:
def __init__(
self,
lexer_result: list[filenamelexer.Item],
first_is_alt=False,
remove_c2c=False,
remove_fcbd=False,
remove_publisher=False,
):
self.state = None
first_is_alt: bool = False,
remove_c2c: bool = False,
remove_fcbd: bool = False,
remove_publisher: bool = False,
) -> None:
self.state: Optional[Callable[[Parser], Optional[Callable]]] = None
self.pos = -1
self.firstItem = True
@ -384,16 +384,16 @@ class Parser:
return self.input[self.pos - 1]
# Backup steps back one Item.
def backup(self) -> None:
    """Move the parser position back by one."""
    self.pos = self.pos - 1
def run(self) -> None:
    """Drive the parser state machine, starting at ``parse``.

    Each state function is called with this parser and returns the next
    state; the loop stops once a state function returns ``None``.
    """
    self.state = parse
    while True:
        step = self.state
        if step is None:
            break
        self.state = step(self)
def parse(p: Parser):
def parse(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]:
item: filenamelexer.Item = p.get()
# We're done, time to do final processing
@ -644,7 +644,7 @@ def parse(p: Parser):
# TODO: What about more esoteric numbers???
def parse_issue_number(p: Parser):
def parse_issue_number(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]:
item = p.input[p.pos]
if "issue" in p.filename_info:
@ -677,7 +677,7 @@ def parse_issue_number(p: Parser):
return parse
def parse_series(p: Parser):
def parse_series(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]:
item = p.input[p.pos]
series: list[list[filenamelexer.Item]] = [[]]
@ -812,7 +812,7 @@ def parse_series(p: Parser):
return parse
def resolve_year(p: Parser):
def resolve_year(p: Parser) -> None:
if len(p.year_candidates) > 0:
# Sort by likely_year boolean
p.year_candidates.sort(key=itemgetter(0))
@ -842,7 +842,7 @@ def resolve_year(p: Parser):
p.title_parts.remove(selected_year)
def parse_finish(p: Parser):
def parse_finish(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]:
resolve_year(p)
# If we don't have an issue try to find it in the series
@ -924,13 +924,14 @@ def parse_finish(p: Parser):
"publisher",
]:
if s not in p.filename_info:
p.filename_info[s] = ""
p.filename_info[s] = "" # type: ignore
for s in ["fcbd", "c2c", "annual"]:
if s not in p.filename_info:
p.filename_info[s] = False
p.filename_info[s] = False # type: ignore
return None
def get_remainder(p: Parser):
def get_remainder(p: Parser) -> str:
remainder = ""
rem = []
@ -988,7 +989,7 @@ def get_remainder(p: Parser):
return remainder.strip()
def parse_info_specifier(p: Parser):
def parse_info_specifier(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]:
item = p.input[p.pos]
index = p.pos
@ -1009,6 +1010,7 @@ def parse_info_specifier(p: Parser):
# 'of' is only special if it is inside a parenthesis.
elif item.val.lower() == "of":
i = get_number(p, index)
if i is not None:
if p.in_something > 0:
if p.issue_number_at is None:
# TODO: Figure out what to do here if it ever happens
@ -1034,7 +1036,6 @@ def parse_info_specifier(p: Parser):
pass
else:
# Lets 'The Wrath of Foobar-Man, Part 1 of 2' parse correctly as the title
if i is not None:
p.pos = [ind for ind, x in enumerate(p.input) if x == i][0]
if not p.in_something:
@ -1043,7 +1044,7 @@ def parse_info_specifier(p: Parser):
# Gets 03 in '03 of 6'
def get_number(p: Parser, index: int):
def get_number(p: Parser, index: int) -> Optional[filenamelexer.Item]:
# Go backward through the filename to see if we can find what this is of eg '03 (of 6)' or '008 title 03 (of 6)'
rev = p.input[:index]
rev.reverse()
@ -1064,7 +1065,7 @@ def get_number(p: Parser, index: int):
return None
def join_title(lst: list[filenamelexer.Item]):
def join_title(lst: list[filenamelexer.Item]) -> str:
title = ""
for i, item in enumerate(lst):
if i + 1 == len(lst) and item.val == ",": # We ignore commas on the end
@ -1094,11 +1095,11 @@ def join_title(lst: list[filenamelexer.Item]):
def Parse(
lexer_result: list[filenamelexer.Item],
first_is_alt=False,
remove_c2c=False,
remove_fcbd=False,
remove_publisher=False,
):
first_is_alt: bool = False,
remove_c2c: bool = False,
remove_fcbd: bool = False,
remove_publisher: bool = False,
) -> Parser:
p = Parser(
lexer_result=lexer_result,
first_is_alt=first_is_alt,

View File

@ -21,7 +21,7 @@ possible, however lossy it might be
# limitations under the License.
import logging
from typing import List, TypedDict
from typing import Any, List, Optional, TypedDict
from comicapi import utils
@ -73,76 +73,75 @@ class GenericMetadata:
cover_synonyms = ["cover", "covers", "coverartist", "cover artist"]
editor_synonyms = ["editor"]
def __init__(self) -> None:
    """Create an empty metadata record.

    Every scalar field starts as ``None``; ``credits``, ``tags`` and ``pages``
    start as empty lists, and ``is_empty`` is ``True``.
    """
    # Bookkeeping
    self.is_empty: bool = True
    self.tag_origin: Optional[str] = None

    # Core identification and dates
    self.series: Optional[str] = None
    self.issue: Optional[str] = None
    self.title: Optional[str] = None
    self.publisher: Optional[str] = None
    self.month: Optional[int] = None
    self.year: Optional[int] = None
    self.day: Optional[int] = None
    self.issue_count: Optional[int] = None
    self.volume: Optional[int] = None
    self.genre: Optional[str] = None
    self.language: Optional[str] = None  # 2 letter iso code
    self.comments: Optional[str] = None  # use same way as Summary in CIX

    self.volume_count: Optional[int] = None
    self.critical_rating: Optional[str] = None
    self.country: Optional[str] = None

    # Alternate numbering, format and ratings
    self.alternate_series: Optional[str] = None
    self.alternate_number: Optional[str] = None
    self.alternate_count: Optional[int] = None
    self.imprint: Optional[str] = None
    self.notes: Optional[str] = None
    self.web_link: Optional[str] = None
    self.format: Optional[str] = None
    self.manga: Optional[str] = None
    self.black_and_white: Optional[bool] = None
    self.page_count: Optional[int] = None
    self.maturity_rating: Optional[str] = None
    self.community_rating: Optional[str] = None

    self.story_arc: Optional[str] = None
    self.series_group: Optional[str] = None
    self.scan_info: Optional[str] = None

    self.characters: Optional[str] = None
    self.teams: Optional[str] = None
    self.locations: Optional[str] = None

    # List-valued fields; each instance gets its own fresh list
    self.credits: List[CreditMetadata] = []
    self.tags: List[str] = []
    self.pages: List[ImageMetadata] = []

    # Some CoMet-only items
    self.price: Optional[str] = None
    self.is_version_of: Optional[str] = None
    self.rights: Optional[str] = None
    self.identifier: Optional[str] = None
    self.last_mark: Optional[str] = None
    self.cover_image: Optional[str] = None
def overlay(self, new_md):
def overlay(self, new_md: "GenericMetadata") -> None:
"""Overlay a metadata object on this one
That is, when the new object has non-None values, over-write them
to this one.
"""
def assign(cur: str, new: Any) -> None:
    """Copy ``new`` onto attribute ``cur`` of this object unless it is ``None``.

    An empty string clears the attribute (sets it to ``None``); any other
    non-``None`` value is stored as given.
    """
    if new is None:
        return
    is_blank = isinstance(new, str) and len(new) == 0
    setattr(self, cur, None if is_blank else new)
new_md: GenericMetadata
if not new_md.is_empty:
self.is_empty = False
@ -199,7 +198,7 @@ class GenericMetadata:
if len(new_md.pages) > 0:
assign("pages", new_md.pages)
def overlay_credits(self, new_credits):
def overlay_credits(self, new_credits: List[CreditMetadata]) -> None:
for c in new_credits:
primary = bool("primary" in c and c["primary"])
@ -212,7 +211,7 @@ class GenericMetadata:
else:
self.add_credit(c["person"], c["role"], primary)
def set_default_page_list(self, count):
def set_default_page_list(self, count: int) -> None:
# generate a default page list, with the first page marked as the cover
for i in range(count):
page_dict = ImageMetadata(Image=i)
@ -220,7 +219,7 @@ class GenericMetadata:
page_dict["Type"] = PageType.FrontCover
self.pages.append(page_dict)
def get_archive_page_index(self, pagenum):
def get_archive_page_index(self, pagenum: int) -> int:
# convert the displayed page number to the page index of the file in
# the archive
if pagenum < len(self.pages):
@ -228,7 +227,7 @@ class GenericMetadata:
return 0
def get_cover_page_index_list(self):
def get_cover_page_index_list(self) -> list[int]:
# return a list of archive page indices of cover pages
coverlist = []
for p in self.pages:
@ -240,7 +239,7 @@ class GenericMetadata:
return coverlist
def add_credit(self, person, role, primary=False):
def add_credit(self, person: str, role: str, primary: bool = False) -> None:
credit: CreditMetadata = {"person": person, "role": role, "primary": primary}
@ -256,7 +255,7 @@ class GenericMetadata:
if not found:
self.credits.append(credit)
def get_primary_credit(self, role):
def get_primary_credit(self, role: str) -> str:
primary = ""
for credit in self.credits:
if (primary == "" and credit["role"].lower() == role.lower()) or (
@ -265,16 +264,16 @@ class GenericMetadata:
primary = credit["person"]
return primary
def __str__(self):
vals = []
def __str__(self) -> str:
vals: list[tuple[str, Any]] = []
if self.is_empty:
return "No metadata"
def add_string(tag: str, val: Any) -> None:
    """Append ``(tag, val)`` to ``vals`` unless ``val`` is ``None`` or renders as ``""``."""
    if val is None or str(val) == "":
        return
    vals.append((tag, val))
def add_attr_string(tag: str) -> None:
    """Look up attribute ``tag`` on this object and pass it to ``add_string``."""
    current = getattr(self, tag)
    add_string(tag, current)
add_attr_string("series")

View File

@ -22,12 +22,13 @@ comics industry throws at us.
import logging
import unicodedata
from typing import Optional
logger = logging.getLogger(__name__)
class IssueString:
def __init__(self, text):
def __init__(self, text: Optional[str]) -> None:
# break up the issue number string into 2 parts: the numeric and suffix string.
# (assumes that the numeric portion is always first)
@ -84,7 +85,7 @@ class IssueString:
else:
self.suffix = text
def as_string(self, pad=0):
def as_string(self, pad: int = 0) -> str:
# return the float, left side zero-padded, with suffix attached
if self.num is None:
return self.suffix
@ -112,7 +113,7 @@ class IssueString:
return num_s
def as_float(self):
def as_float(self) -> Optional[float]:
# return the float, with no suffix
if len(self.suffix) == 1 and self.suffix.isnumeric():
return (self.num or 0) + unicodedata.numeric(self.suffix)
@ -120,7 +121,7 @@ class IssueString:
return 0.5
return self.num
def as_int(self):
def as_int(self) -> Optional[int]:
# return the int version of the float
if self.num is None:
return None

View File

@ -14,15 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
import locale
import logging
import os
import platform
import re
import sys
import unicodedata
from collections import defaultdict
from typing import Any, List, Optional, Union
import pycountry
@ -33,26 +32,26 @@ class UtilsVars:
already_fixed_encoding = False
def get_actual_preferred_encoding() -> str:
    """Return the locale's preferred encoding, forced to ``"utf-8"`` on macOS.

    ``locale.getpreferredencoding()`` is always called; its answer is replaced
    when ``platform.system()`` reports ``"Darwin"``.
    """
    detected = locale.getpreferredencoding()
    return "utf-8" if platform.system() == "Darwin" else detected
def fix_output_encoding():
if not UtilsVars.already_fixed_encoding:
# this reads the environment and inits the right locale
locale.setlocale(locale.LC_ALL, "")
# def fix_output_encoding() -> None:
# if not UtilsVars.already_fixed_encoding:
# # this reads the environment and inits the right locale
# locale.setlocale(locale.LC_ALL, "")
# try to make stdout/stderr encodings happy for unicode printing
preferred_encoding = get_actual_preferred_encoding()
sys.stdout = codecs.getwriter(preferred_encoding)(sys.stdout)
sys.stderr = codecs.getwriter(preferred_encoding)(sys.stderr)
UtilsVars.already_fixed_encoding = True
# # try to make stdout/stderr encodings happy for unicode printing
# preferred_encoding = get_actual_preferred_encoding()
# sys.stdout = codecs.getwriter(preferred_encoding)(sys.stdout)
# sys.stderr = codecs.getwriter(preferred_encoding)(sys.stderr)
# UtilsVars.already_fixed_encoding = True
def get_recursive_filelist(pathlist):
def get_recursive_filelist(pathlist: List[str]) -> List[str]:
"""Get a recursive list of all files under all path items in the list"""
filelist = []
@ -75,7 +74,7 @@ def get_recursive_filelist(pathlist):
return filelist
def list_to_string(lst):
def list_to_string(lst: List[Union[str, Any]]) -> str:
string = ""
if lst is not None:
for item in lst:
@ -85,7 +84,7 @@ def list_to_string(lst):
return string
def add_to_path(dirname):
def add_to_path(dirname: str) -> None:
if dirname is not None and dirname != "":
# verify that path doesn't already contain the given dirname
@ -97,10 +96,10 @@ def add_to_path(dirname):
os.environ["PATH"] = dirname + os.pathsep + os.environ["PATH"]
def which(program):
def which(program: str) -> Optional[str]:
"""Returns path of the executable, if it exists"""
def is_exe(fpath):
def is_exe(fpath: str) -> bool:
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
fpath, _ = os.path.split(program)
@ -116,7 +115,7 @@ def which(program):
return None
def xlate(data, is_int=False):
def xlate(data: Any, is_int: bool = False) -> Any:
if data is None or data == "":
return None
if is_int:
@ -130,7 +129,7 @@ def xlate(data, is_int=False):
return str(data)
def remove_articles(text):
def remove_articles(text: str) -> str:
text = text.lower()
articles = [
"&",
@ -168,7 +167,7 @@ def remove_articles(text):
return new_text
def sanitize_title(text):
def sanitize_title(text: str) -> str:
# normalize unicode and convert to ascii. Does not work for everything eg ½ to 12 not 1/2
# this will probably cause issues with titles in other character sets e.g. chinese, japanese
text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
@ -183,7 +182,7 @@ def sanitize_title(text):
return text
def unique_file(file_name):
def unique_file(file_name: str) -> str:
counter = 1
file_name_parts = os.path.splitext(file_name)
while True:
@ -193,9 +192,9 @@ def unique_file(file_name):
counter += 1
languages = defaultdict(lambda: None)
languages: dict[Optional[str], Optional[str]] = defaultdict(lambda: None)
countries = defaultdict(lambda: None)
countries: dict[Optional[str], Optional[str]] = defaultdict(lambda: None)
for c in pycountry.countries:
if "alpha_2" in c._fields:
@ -206,11 +205,11 @@ for lng in pycountry.languages:
languages[lng.alpha_2] = lng.name
def get_language_from_iso(iso: Optional[str]) -> Optional[str]:
    """Return the language name stored under ``iso`` in the ``languages`` table.

    ``languages`` is a ``defaultdict`` whose default is ``None``, so an unknown
    code (or ``None``) yields ``None`` rather than raising ``KeyError``.
    """
    # Index rather than .get(): the defaultdict lookup is kept exactly as before.
    name = languages[iso]
    return name
def get_language(string):
def get_language(string: Optional[str]) -> Optional[str]:
if string is None:
return None
@ -218,7 +217,7 @@ def get_language(string):
if lang is None:
try:
return pycountry.languages.lookup(string).name
return str(pycountry.languages.lookup(string).name)
except:
return None
return lang