From 2942c64bae9d8c0ab67147766cc5d4258ace055e Mon Sep 17 00:00:00 2001 From: lordwelch Date: Tue, 17 May 2022 13:57:04 -0700 Subject: [PATCH] Typed --- comicapi/comet.py | 70 +++--- comicapi/comicarchive.py | 423 ++++++++++++++++++------------------ comicapi/comicbookinfo.py | 77 +++++-- comicapi/comicinfoxml.py | 43 ++-- comicapi/filenamelexer.py | 73 ++++--- comicapi/filenameparser.py | 115 +++++----- comicapi/genericmetadata.py | 111 +++++----- comicapi/issuestring.py | 9 +- comicapi/utils.py | 51 +++-- 9 files changed, 515 insertions(+), 457 deletions(-) diff --git a/comicapi/comet.py b/comicapi/comet.py index f823374..686cc2e 100644 --- a/comicapi/comet.py +++ b/comicapi/comet.py @@ -16,6 +16,7 @@ import logging import xml.etree.ElementTree as ET +from typing import Any from comicapi import utils from comicapi.genericmetadata import GenericMetadata @@ -33,19 +34,16 @@ class CoMet: cover_synonyms = ["cover", "covers", "coverartist", "cover artist"] editor_synonyms = ["editor"] - def metadata_from_string(self, string): + def metadata_from_string(self, string: str) -> GenericMetadata: tree = ET.ElementTree(ET.fromstring(string)) return self.convert_xml_to_metadata(tree) - def string_from_metadata(self, metadata): - - header = '\n' - + def string_from_metadata(self, metadata: GenericMetadata) -> str: tree = self.convert_metadata_to_xml(metadata) - return header + ET.tostring(tree.getroot()) + return str(ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True).decode("utf-8")) - def convert_metadata_to_xml(self, metadata): + def convert_metadata_to_xml(self, metadata: GenericMetadata) -> ET.ElementTree: # shorthand for the metadata md = metadata @@ -57,7 +55,7 @@ class CoMet: root.attrib["xsi:schemaLocation"] = "http://www.denvog.com http://www.denvog.com/comet/comet.xsd" # helper func - def assign(comet_entry, md_entry): + def assign(comet_entry: str, md_entry: Any) -> None: if md_entry is not None: ET.SubElement(root, comet_entry).text = str(md_entry) @@ -127,41 +125,41 @@ class CoMet: tree = ET.ElementTree(root) return tree - def convert_xml_to_metadata(self, tree): + def convert_xml_to_metadata(self, tree: ET.ElementTree) -> GenericMetadata: root = tree.getroot() if root.tag != "comet": - raise "1" + raise Exception("Not a CoMet file") metadata = GenericMetadata() md = metadata # Helper function - def xlate(tag): + def get(tag: str) -> Any: node = root.find(tag) if node is not None: return node.text return None - md.series = xlate("series") - md.title = xlate("title") - md.issue = xlate("issue") - md.volume = xlate("volume") - md.comments = xlate("description") - md.publisher = xlate("publisher") - md.language = xlate("language") - md.format = xlate("format") - md.page_count = xlate("pages") - md.maturity_rating = xlate("rating") - md.price = xlate("price") - md.is_version_of = xlate("isVersionOf") - md.rights = xlate("rights") - md.identifier = xlate("identifier") - md.last_mark = xlate("lastMark") - md.genre = xlate("genre") # TODO - repeatable field + md.series = get("series") + md.title = get("title") + md.issue = get("issue") + md.volume = get("volume") + md.comments = get("description") + md.publisher = get("publisher") + md.language = get("language") + md.format = get("format") + md.page_count = get("pages") + md.maturity_rating = get("rating") + md.price = get("price") + md.is_version_of = get("isVersionOf") + md.rights = get("rights") + md.identifier = get("identifier") + md.last_mark = get("lastMark") + md.genre = get("genre") # TODO - repeatable field - date = 
xlate("date") + date = get("date") if date is not None: parts = date.split("-") if len(parts) > 0: @@ -169,9 +167,9 @@ class CoMet: if len(parts) > 1: md.month = parts[1] - md.cover_image = xlate("coverImage") + md.cover_image = get("coverImage") - reading_direction = xlate("readingDirection") + reading_direction = get("readingDirection") if reading_direction is not None and reading_direction == "rtl": md.manga = "YesAndRightToLeft" @@ -179,7 +177,7 @@ class CoMet: char_list = [] for n in root: if n.tag == "character": - char_list.append(n.text.strip()) + char_list.append((n.text or "").strip()) md.characters = utils.list_to_string(char_list) # Now extract the credit info @@ -194,17 +192,17 @@ class CoMet: n.tag == "editor", ] ): - metadata.add_credit(n.text.strip(), n.tag.title()) + metadata.add_credit((n.text or "").strip(), n.tag.title()) if n.tag == "coverDesigner": - metadata.add_credit(n.text.strip(), "Cover") + metadata.add_credit((n.text or "").strip(), "Cover") metadata.is_empty = False return metadata # verify that the string actually contains CoMet data in XML format - def validate_string(self, string): + def validate_string(self, string: str) -> bool: try: tree = ET.ElementTree(ET.fromstring(string)) root = tree.getroot() @@ -215,12 +213,12 @@ class CoMet: return True - def write_to_external_file(self, filename, metadata): + def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None: tree = self.convert_metadata_to_xml(metadata) tree.write(filename, encoding="utf-8") - def read_from_external_file(self, filename): + def read_from_external_file(self, filename: str) -> GenericMetadata: tree = ET.parse(filename) return self.convert_xml_to_metadata(tree) diff --git a/comicapi/comicarchive.py b/comicapi/comicarchive.py index b4cc108..3842d21 100644 --- a/comicapi/comicarchive.py +++ b/comicapi/comicarchive.py @@ -17,6 +17,7 @@ import io import logging import os +import pathlib import platform import struct import subprocess @@ -42,7 +43,9 @@ try: except ImportError: pil_available = False -from comicapi import filenamelexer, filenameparser +from typing import List, Optional, Union, cast + +from comicapi import filenamelexer, filenameparser, utils from comicapi.comet import CoMet from comicapi.comicbookinfo import ComicBookInfo from comicapi.comicinfoxml import ComicInfoXml @@ -62,22 +65,48 @@ class MetaDataStyle: name = ["ComicBookLover", "ComicRack", "CoMet"] -class SevenZipArchiver: +class UnknownArchiver: + + """Unknown implementation""" + + def __init__(self, path: Union[pathlib.Path, str]) -> None: + self.path = path + + def get_comment(self) -> str: + return "" + + def set_comment(self, comment: str) -> bool: + return False + + def read_file(self, archive_file: str) -> Optional[bytes]: + return None + + def write_file(self, archive_file: str, data: bytes) -> bool: + return False + + def remove_file(self, archive_file: str) -> bool: + return False + + def get_filename_list(self) -> list[str]: + return [] + + +class SevenZipArchiver(UnknownArchiver): """7Z implementation""" - def __init__(self, path): - self.path = path + def __init__(self, path: Union[pathlib.Path, str]) -> None: + self.path = pathlib.Path(path) # @todo: Implement Comment? 
- def get_comment(self): + def get_comment(self) -> str: return "" - def set_comment(self, comment): + def set_comment(self, comment: str) -> bool: return False - def read_file(self, archive_file): - data = "" + def read_file(self, archive_file: str) -> bytes: + data = bytes() try: with py7zr.SevenZipFile(self.path, "r") as zf: data = zf.read(archive_file)[archive_file].read() @@ -90,7 +119,7 @@ class SevenZipArchiver: return data - def remove_file(self, archive_file): + def remove_file(self, archive_file: str) -> bool: try: self.rebuild_zip_file([archive_file]) except: @@ -99,7 +128,7 @@ class SevenZipArchiver: else: return True - def write_file(self, archive_file, data): + def write_file(self, archive_file: str, data: bytes) -> bool: # At the moment, no other option but to rebuild the whole # zip archive w/o the indicated file. Very sucky, but maybe # another solution can be found @@ -116,17 +145,17 @@ class SevenZipArchiver: logger.exception("Writing zip file failed") return False - def get_filename_list(self): + def get_filename_list(self) -> list[str]: try: with py7zr.SevenZipFile(self.path, "r") as zf: - namelist = zf.getnames() + namelist: list[str] = zf.getnames() return namelist except Exception as e: logger.error("Unable to get 7zip file list [%s]: %s", e, self.path) return [] - def rebuild_zip_file(self, exclude_list): + def rebuild_zip_file(self, exclude_list: list[str]) -> None: """Zip helper func This recompresses the zip archive, without the files in the exclude_list @@ -148,7 +177,7 @@ class SevenZipArchiver: os.remove(self.path) os.rename(tmp_name, self.path) - def copy_from_archive(self, otherArchive): + def copy_from_archive(self, otherArchive: UnknownArchiver) -> bool: """Replace the current zip with one copied from another archive""" try: with py7zr.SevenZipFile(self.path, "w") as zout: @@ -163,26 +192,25 @@ class SevenZipArchiver: return True -class ZipArchiver: +class ZipArchiver(UnknownArchiver): """ZIP implementation""" - def __init__(self, path): - self.path = path + def __init__(self, path: Union[pathlib.Path, str]) -> None: + self.path = pathlib.Path(path) - def get_comment(self): + def get_comment(self) -> str: with zipfile.ZipFile(self.path, "r") as zf: - comment = zf.comment + comment = zf.comment.decode("utf-8") return comment - def set_comment(self, comment): + def set_comment(self, comment: str) -> bool: with zipfile.ZipFile(self.path, "a") as zf: zf.comment = bytes(comment, "utf-8") return True - def read_file(self, archive_file): + def read_file(self, archive_file: str) -> bytes: with zipfile.ZipFile(self.path, "r") as zf: - try: data = zf.read(archive_file) except zipfile.BadZipfile as e: @@ -193,7 +221,7 @@ class ZipArchiver: raise IOError from e return data - def remove_file(self, archive_file): + def remove_file(self, archive_file: str) -> bool: try: self.rebuild_zip_file([archive_file]) except: @@ -202,7 +230,7 @@ class ZipArchiver: else: return True - def write_file(self, archive_file, data): + def write_file(self, archive_file: str, data: bytes) -> bool: # At the moment, no other option but to rebuild the whole # zip archive w/o the indicated file. 
Very sucky, but maybe # another solution can be found @@ -219,7 +247,7 @@ class ZipArchiver: logger.error("writing zip file failed [%s]: %s", e, self.path) return False - def get_filename_list(self): + def get_filename_list(self) -> List[str]: try: with zipfile.ZipFile(self.path, "r") as zf: namelist = zf.namelist() @@ -228,7 +256,7 @@ class ZipArchiver: logger.error("Unable to get zipfile list [%s]: %s", e, self.path) return [] - def rebuild_zip_file(self, exclude_list): + def rebuild_zip_file(self, exclude_list: List[str]) -> None: """Zip helper func This recompresses the zip archive, without the files in the exclude_list @@ -253,7 +281,7 @@ class ZipArchiver: os.remove(self.path) os.rename(tmp_name, self.path) - def write_zip_comment(self, filename, comment): + def write_zip_comment(self, filename: Union[pathlib.Path, str], comment: str) -> bool: """ This is a custom function for writing a comment to a zip file, since the built-in one doesn't seem to work on Windows and Mac OS/X @@ -304,7 +332,7 @@ class ZipArchiver: fo.seek(pos + 2, 2) # write out the comment itself - fo.write(bytes(comment)) + fo.write(comment.encode("utf-8")) fo.truncate() else: raise Exception("Failed to write comment to zip file!") @@ -314,7 +342,7 @@ class ZipArchiver: else: return True - def copy_from_archive(self, other_archive): + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: """Replace the current zip with one copied from another archive""" try: with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout: @@ -335,13 +363,13 @@ class ZipArchiver: return True -class RarArchiver: +class RarArchiver(UnknownArchiver): """RAR implementation""" devnull = None - def __init__(self, path, rar_exe_path): - self.path = path + def __init__(self, path: Union[pathlib.Path, str], rar_exe_path: str) -> None: + self.path = pathlib.Path(path) self.rar_exe_path = rar_exe_path if RarArchiver.devnull is None: @@ -349,17 +377,17 @@ class RarArchiver: # windows only, keeps the cmd.exe from popping up if platform.system() == "Windows": - self.startupinfo = subprocess.STARTUPINFO() - self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + self.startupinfo = subprocess.STARTUPINFO() # type: ignore + self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore else: self.startupinfo = None - def get_comment(self): + def get_comment(self) -> str: rarc = self.get_rar_obj() - return rarc.comment + return str(rarc.comment) if rarc else "" - def set_comment(self, comment): - if self.rar_exe_path is not None: + def set_comment(self, comment: str) -> bool: + if self.rar_exe_path: try: # write comment to temp file tmp_fd, tmp_name = tempfile.mkstemp() @@ -369,7 +397,7 @@ class RarArchiver: working_dir = os.path.dirname(os.path.abspath(self.path)) # use external program to write comment to Rar archive - proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, self.path] + proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, str(self.path)] subprocess.call( proc_args, startupinfo=self.startupinfo, @@ -389,15 +417,17 @@ class RarArchiver: else: return False - def read_file(self, archive_file): + def read_file(self, archive_file: str) -> bytes: rarc = self.get_rar_obj() + if rarc is None: + return bytes() tries = 0 while tries < 7: try: tries = tries + 1 - data = rarc.open(archive_file).read() + data: bytes = rarc.open(archive_file).read() entries = [(rarc.getinfo(archive_file), data)] if entries[0][0].file_size != len(entries[0][1]): @@ -430,9 +460,9 @@ 
class RarArchiver: raise IOError - def write_file(self, archive_file, data): + def write_file(self, archive_file: str, data: bytes) -> bool: - if self.rar_exe_path is not None: + if self.rar_exe_path: try: tmp_folder = tempfile.mkdtemp() @@ -467,8 +497,8 @@ class RarArchiver: else: return False - def remove_file(self, archive_file): - if self.rar_exe_path is not None: + def remove_file(self, archive_file: str) -> bool: + if self.rar_exe_path: try: # use external program to remove file from Rar archive subprocess.call( @@ -489,14 +519,14 @@ class RarArchiver: else: return False - def get_filename_list(self): + def get_filename_list(self) -> list[str]: rarc = self.get_rar_obj() tries = 0 # while tries < 7: + namelist = [] try: tries = tries + 1 - namelist = [] - for item in rarc.infolist(): + for item in rarc.infolist() if rarc else None: if item.file_size != 0: namelist.append(item.filename) @@ -504,45 +534,36 @@ class RarArchiver: logger.error(f"get_filename_list(): [{e}] {self.path} attempt #{tries}".format(str(e), self.path, tries)) time.sleep(1) - else: - # Success - return namelist + return namelist - return None - - def get_rar_obj(self): - tries = 0 + def get_rar_obj(self) -> Optional[rarfile.RarFile]: try: - tries = tries + 1 - rarc = rarfile.RarFile(self.path) - + rarc = rarfile.RarFile(str(self.path)) except (OSError, IOError) as e: - logger.error("getRARObj(): [%s] %s attempt #%s", e, self.path, tries) - time.sleep(1) - + logger.error("getRARObj(): [%s] %s", e, self.path) else: return rarc return None -class FolderArchiver: +class FolderArchiver(UnknownArchiver): """Folder implementation""" - def __init__(self, path): - self.path = path + def __init__(self, path: Union[pathlib.Path, str]) -> None: + self.path = pathlib.Path(path) self.comment_file_name = "ComicTaggerFolderComment.txt" - def get_comment(self): - return self.read_file(self.comment_file_name) + def get_comment(self) -> str: + return self.read_file(self.comment_file_name).decode("utf-8") - def set_comment(self, comment): - return self.write_file(self.comment_file_name, comment) + def set_comment(self, comment: str) -> bool: + return self.write_file(self.comment_file_name, comment.encode("utf-8")) - def read_file(self, archive_file): + def read_file(self, archive_file: str) -> bytes: - data = "" + data = bytes() fname = os.path.join(self.path, archive_file) try: with open(fname, "rb") as f: @@ -552,7 +573,7 @@ class FolderArchiver: return data - def write_file(self, archive_file, data): + def write_file(self, archive_file: str, data: bytes) -> bool: fname = os.path.join(self.path, archive_file) try: @@ -564,7 +585,7 @@ class FolderArchiver: else: return True - def remove_file(self, archive_file): + def remove_file(self, archive_file: str) -> bool: fname = os.path.join(self.path, archive_file) try: @@ -575,10 +596,10 @@ class FolderArchiver: else: return True - def get_filename_list(self): + def get_filename_list(self) -> list[str]: return self.list_files(self.path) - def list_files(self, folder): + def list_files(self, folder: Union[pathlib.Path, str]) -> list[str]: itemlist = [] @@ -590,49 +611,28 @@ class FolderArchiver: return itemlist -class UnknownArchiver: - - """Unknown implementation""" - - def __init__(self, path): - self.path = path - - def get_comment(self): - return "" - - def set_comment(self, comment): - return False - - def read_file(self, archive_file): - return "" - - def write_file(self, archive_file, data): - return False - - def remove_file(self, archive_file): - return False - - def 
get_filename_list(self): - return [] - - class ComicArchive: - logo_data = None + logo_data = bytes() class ArchiveType: SevenZip, Zip, Rar, Folder, Pdf, Unknown = list(range(6)) - def __init__(self, path, rar_exe_path=None, default_image_path=None): - self.cbi_md = None - self.cix_md = None - self.comet_filename = None - self.comet_md = None - self.has__cbi = None - self.has__cix = None - self.has__comet = None - self.path = path - self.page_count = None - self.page_list = None + def __init__( + self, + path: Union[pathlib.Path, str], + rar_exe_path: str = "", + default_image_path: Union[pathlib.Path, str, None] = None, + ) -> None: + self.cbi_md: Optional[GenericMetadata] = None + self.cix_md: Optional[GenericMetadata] = None + self.comet_filename: Optional[str] = None + self.comet_md: Optional[GenericMetadata] = None + self.has__cbi: Optional[bool] = None + self.has__cix: Optional[bool] = None + self.has__comet: Optional[bool] = None + self.path = pathlib.Path(path) + self.page_count: Optional[int] = None + self.page_list: list[str] = [] self.rar_exe_path = rar_exe_path self.ci_xml_filename = "ComicInfo.xml" @@ -641,7 +641,7 @@ class ComicArchive: self.default_image_path = default_image_path # Use file extension to decide which archive test we do first - ext = os.path.splitext(path)[1].lower() + ext = self.path.suffix self.archive_type = self.ArchiveType.Unknown self.archiver = UnknownArchiver(self.path) @@ -667,12 +667,13 @@ class ComicArchive: self.archive_type = self.ArchiveType.Rar self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path) - if ComicArchive.logo_data is None: + if not ComicArchive.logo_data: fname = self.default_image_path - with open(fname, "rb") as fd: - ComicArchive.logo_data = fd.read() + if fname: + with open(fname, "rb") as fd: + ComicArchive.logo_data = fd.read() - def reset_cache(self): + def reset_cache(self) -> None: """Clears the cached data""" self.has__cix = None @@ -680,47 +681,47 @@ class ComicArchive: self.has__comet = None self.comet_filename = None self.page_count = None - self.page_list = None + self.page_list = [] self.cix_md = None self.cbi_md = None self.comet_md = None - def load_cache(self, style_list): + def load_cache(self, style_list: List[int]) -> None: for style in style_list: self.read_metadata(style) - def rename(self, path): - self.path = path - self.archiver.path = path + def rename(self, path: Union[pathlib.Path, str]) -> None: + self.path = pathlib.Path(path) + self.archiver.path = pathlib.Path(path) - def sevenzip_test(self): + def sevenzip_test(self) -> bool: return py7zr.is_7zfile(self.path) - def zip_test(self): + def zip_test(self) -> bool: return zipfile.is_zipfile(self.path) - def rar_test(self): + def rar_test(self) -> bool: try: - return rarfile.is_rarfile(self.path) + return bool(rarfile.is_rarfile(str(self.path))) except: return False - def is_sevenzip(self): + def is_sevenzip(self) -> bool: return self.archive_type == self.ArchiveType.SevenZip - def is_zip(self): + def is_zip(self) -> bool: return self.archive_type == self.ArchiveType.Zip - def is_rar(self): + def is_rar(self) -> bool: return self.archive_type == self.ArchiveType.Rar - def is_pdf(self): + def is_pdf(self) -> bool: return self.archive_type == self.ArchiveType.Pdf - def is_folder(self): + def is_folder(self) -> bool: return self.archive_type == self.ArchiveType.Folder - def is_writable(self, check_rar_status=True): + def is_writable(self, check_rar_status: bool = True) -> bool: if self.archive_type == self.ArchiveType.Unknown: return False @@ 
-730,27 +731,25 @@ class ComicArchive: if not os.access(self.path, os.W_OK): return False - if (self.archive_type != self.ArchiveType.Folder) and ( - not os.access(os.path.dirname(os.path.abspath(self.path)), os.W_OK) - ): + if (self.archive_type != self.ArchiveType.Folder) and (not os.access(self.path.parent, os.W_OK)): return False return True - def is_writable_for_style(self, data_style): + def is_writable_for_style(self, data_style: int) -> bool: if (self.is_rar() or self.is_sevenzip()) and data_style == MetaDataStyle.CBI: return False return self.is_writable() - def seems_to_be_a_comic_archive(self): + def seems_to_be_a_comic_archive(self) -> bool: if (self.is_zip() or self.is_rar() or self.is_sevenzip()) and (self.get_number_of_pages() > 0): return True return False - def read_metadata(self, style): + def read_metadata(self, style: int) -> GenericMetadata: if style == MetaDataStyle.CIX: return self.read_cix() @@ -760,8 +759,8 @@ class ComicArchive: return self.read_comet() return GenericMetadata() - def write_metadata(self, metadata, style): - retcode = None + def write_metadata(self, metadata: GenericMetadata, style: int) -> bool: + retcode = False if style == MetaDataStyle.CIX: retcode = self.write_cix(metadata) if style == MetaDataStyle.CBI: @@ -770,7 +769,7 @@ class ComicArchive: retcode = self.write_comet(metadata) return retcode - def has_metadata(self, style): + def has_metadata(self, style: int) -> bool: if style == MetaDataStyle.CIX: return self.has_cix() if style == MetaDataStyle.CBI: @@ -779,7 +778,7 @@ class ComicArchive: return self.has_comet() return False - def remove_metadata(self, style): + def remove_metadata(self, style: int) -> bool: retcode = True if style == MetaDataStyle.CIX: retcode = self.remove_cix() @@ -789,21 +788,21 @@ class ComicArchive: retcode = self.remove_co_met() return retcode - def get_page(self, index): - image_data = None + def get_page(self, index: int) -> bytes: + image_data = bytes() filename = self.get_page_name(index) - if filename is not None: + if filename: try: - image_data = self.archiver.read_file(filename) + image_data = self.archiver.read_file(filename) or bytes() except IOError: logger.exception("Error reading in page. Substituting logo page.") image_data = ComicArchive.logo_data return image_data - def get_page_name(self, index): + def get_page_name(self, index: int) -> str: if index is None: return None @@ -811,11 +810,11 @@ class ComicArchive: num_pages = len(page_list) if num_pages == 0 or index >= num_pages: - return None + return "" return page_list[index] - def get_scanner_page_index(self): + def get_scanner_page_index(self) -> Optional[int]: scanner_page_index = None # make a guess at the scanner page @@ -827,7 +826,7 @@ class ComicArchive: return None # count the length of every filename, and count occurrences - length_buckets = {} + length_buckets: dict[int, int] = {} for name in name_list: fname = os.path.split(name)[1] length = len(fname) @@ -863,15 +862,15 @@ class ComicArchive: return scanner_page_index - def get_page_name_list(self, sort_list=True): - if self.page_list is None: + def get_page_name_list(self, sort_list: bool = True) -> List[str]: + if not self.page_list: # get the list file names in the archive, and sort - files = self.archiver.get_filename_list() + files: list[str] = self.archiver.get_filename_list() # seems like some archive creators are on Windows, and don't know about case-sensitivity! 
if sort_list: - files = natsort.natsorted(files, alg=natsort.ns.IC | natsort.ns.I | natsort.ns.U) + files = cast(list[str], natsort.natsorted(files, alg=natsort.ns.IC | natsort.ns.I | natsort.ns.U)) # make a sub-list of image files self.page_list = [] @@ -884,30 +883,30 @@ class ComicArchive: return self.page_list - def get_number_of_pages(self): + def get_number_of_pages(self) -> int: if self.page_count is None: self.page_count = len(self.get_page_name_list()) return self.page_count - def read_cbi(self): + def read_cbi(self) -> GenericMetadata: if self.cbi_md is None: raw_cbi = self.read_raw_cbi() - if raw_cbi is None: - self.cbi_md = GenericMetadata() - else: + if raw_cbi: self.cbi_md = ComicBookInfo().metadata_from_string(raw_cbi) + else: + self.cbi_md = GenericMetadata() self.cbi_md.set_default_page_list(self.get_number_of_pages()) return self.cbi_md - def read_raw_cbi(self): + def read_raw_cbi(self) -> str: if not self.has_cbi(): - return None + return "" return self.archiver.get_comment() - def has_cbi(self): + def has_cbi(self) -> bool: if self.has__cbi is None: if not self.seems_to_be_a_comic_archive(): self.has__cbi = False @@ -917,7 +916,7 @@ class ComicArchive: return self.has__cbi - def write_cbi(self, metadata): + def write_cbi(self, metadata: GenericMetadata) -> bool: if metadata is not None: self.apply_archive_info_to_metadata(metadata) cbi_string = ComicBookInfo().string_from_metadata(metadata) @@ -930,7 +929,7 @@ class ComicArchive: return False - def remove_cbi(self): + def remove_cbi(self) -> bool: if self.has_cbi(): write_success = self.archiver.set_comment("") if write_success: @@ -940,13 +939,13 @@ class ComicArchive: return write_success return True - def read_cix(self): + def read_cix(self) -> GenericMetadata: if self.cix_md is None: raw_cix = self.read_raw_cix() - if raw_cix is None or raw_cix == "": - self.cix_md = GenericMetadata() - else: + if raw_cix: self.cix_md = ComicInfoXml().metadata_from_string(raw_cix) + else: + self.cix_md = GenericMetadata() # validate the existing page list (make sure count is correct) if len(self.cix_md.pages) != 0: @@ -960,22 +959,20 @@ class ComicArchive: return self.cix_md - def read_raw_cix(self): + def read_raw_cix(self) -> bytes: if not self.has_cix(): - return None + return b"" try: - raw_cix = self.archiver.read_file(self.ci_xml_filename) + raw_cix = self.archiver.read_file(self.ci_xml_filename) or b"" except IOError as e: logger.error("Error reading in raw CIX!: %s", e) - raw_cix = "" + raw_cix = bytes() return raw_cix - def write_cix(self, metadata): + def write_cix(self, metadata: GenericMetadata) -> bool: if metadata is not None: self.apply_archive_info_to_metadata(metadata, calc_page_sizes=True) raw_cix = self.read_raw_cix() - if raw_cix == "": - raw_cix = None cix_string = ComicInfoXml().string_from_metadata(metadata, xml=raw_cix) write_success = self.archiver.write_file(self.ci_xml_filename, cix_string.encode("utf-8")) if write_success: @@ -986,7 +983,7 @@ class ComicArchive: return False - def remove_cix(self): + def remove_cix(self) -> bool: if self.has_cix(): write_success = self.archiver.remove_file(self.ci_xml_filename) if write_success: @@ -996,7 +993,7 @@ class ComicArchive: return write_success return True - def has_cix(self): + def has_cix(self) -> bool: if self.has__cix is None: if not self.seems_to_be_a_comic_archive(): @@ -1007,7 +1004,7 @@ class ComicArchive: self.has__cix = False return self.has__cix - def read_comet(self): + def read_comet(self) -> GenericMetadata: if self.comet_md is None: raw_comet = 
self.read_raw_comet() if raw_comet is None or raw_comet == "": @@ -1031,19 +1028,21 @@ class ComicArchive: return self.comet_md - def read_raw_comet(self): + def read_raw_comet(self) -> str: + raw_comet = "" if not self.has_comet(): logger.info("%s doesn't have CoMet data!", self.path) - return None + raw_comet = "" try: - raw_comet = self.archiver.read_file(self.comet_filename) + raw_bytes = self.archiver.read_file(cast(str, self.comet_filename)) + if raw_bytes: + raw_comet = raw_bytes.decode("utf-8") except: logger.exception("Error reading in raw CoMet!") - raw_comet = "" return raw_comet - def write_comet(self, metadata): + def write_comet(self, metadata: GenericMetadata) -> bool: if metadata is not None: if not self.has_comet(): @@ -1056,7 +1055,7 @@ class ComicArchive: metadata.cover_image = self.get_page_name(cover_idx) comet_string = CoMet().string_from_metadata(metadata) - write_success = self.archiver.write_file(self.comet_filename, comet_string) + write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8")) if write_success: self.has__comet = True self.comet_md = metadata @@ -1065,9 +1064,9 @@ class ComicArchive: return False - def remove_co_met(self): + def remove_co_met(self) -> bool: if self.has_comet(): - write_success = self.archiver.remove_file(self.comet_filename) + write_success = self.archiver.remove_file(cast(str, self.comet_filename)) if write_success: self.has__comet = False self.comet_md = None @@ -1075,7 +1074,7 @@ class ComicArchive: return write_success return True - def has_comet(self): + def has_comet(self) -> bool: if self.has__comet is None: self.has__comet = False if not self.seems_to_be_a_comic_archive(): @@ -1086,9 +1085,11 @@ class ComicArchive: if os.path.dirname(n) == "" and os.path.splitext(n)[1].lower() == ".xml": # read in XML file, and validate it try: - data = self.archiver.read_file(n) - except Exception as e: data = "" + d = self.archiver.read_file(n) + if d: + data = d.decode("utf-8") + except Exception as e: logger.warning("Error reading in Comet XML for validation!: %s", e) if CoMet().validate_string(data): # since we found it, save it! 
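A minimal usage sketch of the typed ComicArchive API annotated above (illustrative only; "example.cbz" is a made-up path and is assumed to exist):

from comicapi.comicarchive import ComicArchive, MetaDataStyle

ca = ComicArchive("example.cbz")                      # accepts str or pathlib.Path
if ca.seems_to_be_a_comic_archive():
    md = ca.read_metadata(MetaDataStyle.CIX)          # always a GenericMetadata, never None
    print(md.series, md.issue, ca.get_number_of_pages())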
@@ -1098,7 +1099,7 @@ class ComicArchive: return self.has__comet - def apply_archive_info_to_metadata(self, md, calc_page_sizes=False): + def apply_archive_info_to_metadata(self, md: GenericMetadata, calc_page_sizes: bool = False) -> None: md.page_count = self.get_number_of_pages() if calc_page_sizes: @@ -1107,7 +1108,7 @@ class ComicArchive: if pil_available: if "ImageSize" not in p or "ImageHeight" not in p or "ImageWidth" not in p: data = self.get_page(idx) - if data is not None: + if data: try: if isinstance(data, bytes): im = Image.open(io.BytesIO(data)) @@ -1128,44 +1129,48 @@ class ComicArchive: p["ImageSize"] = str(len(data)) def metadata_from_filename( - self, complicated_parser=False, remove_c2c=False, remove_fcbd=False, remove_publisher=False - ): + self, + complicated_parser: bool = False, + remove_c2c: bool = False, + remove_fcbd: bool = False, + remove_publisher: bool = False, + ) -> GenericMetadata: metadata = GenericMetadata() if complicated_parser: - lex = filenamelexer.Lex(self.path) + lex = filenamelexer.Lex(self.path.name) p = filenameparser.Parse( lex.items, remove_c2c=remove_c2c, remove_fcbd=remove_fcbd, remove_publisher=remove_publisher ) - metadata.alternate_number = p.filename_info["alternate"] or None - metadata.issue = p.filename_info["issue"] or None - metadata.issue_count = p.filename_info["issue_count"] or None - metadata.publisher = p.filename_info["publisher"] or None - metadata.series = p.filename_info["series"] or None - metadata.title = p.filename_info["title"] or None - metadata.volume = p.filename_info["volume"] or None - metadata.volume_count = p.filename_info["volume_count"] or None - metadata.year = p.filename_info["year"] or None + metadata.alternate_number = utils.xlate(p.filename_info["alternate"]) + metadata.issue = utils.xlate(p.filename_info["issue"]) + metadata.issue_count = utils.xlate(p.filename_info["issue_count"]) + metadata.publisher = utils.xlate(p.filename_info["publisher"]) + metadata.series = utils.xlate(p.filename_info["series"]) + metadata.title = utils.xlate(p.filename_info["title"]) + metadata.volume = utils.xlate(p.filename_info["volume"]) + metadata.volume_count = utils.xlate(p.filename_info["volume_count"]) + metadata.year = utils.xlate(p.filename_info["year"]) - metadata.scan_info = p.filename_info["remainder"] or None + metadata.scan_info = utils.xlate(p.filename_info["remainder"]) metadata.format = "FCBD" if p.filename_info["fcbd"] else None if p.filename_info["annual"]: metadata.format = "Annual" else: fnp = filenameparser.FileNameParser() - fnp.parse_filename(self.path) + fnp.parse_filename(str(self.path)) if fnp.issue: metadata.issue = fnp.issue if fnp.series: metadata.series = fnp.series if fnp.volume: - metadata.volume = fnp.volume + metadata.volume = utils.xlate(fnp.volume, True) if fnp.year: - metadata.year = fnp.year + metadata.year = utils.xlate(fnp.year, True) if fnp.issue_count: - metadata.issue_count = fnp.issue_count + metadata.issue_count = utils.xlate(fnp.issue_count, True) if fnp.remainder: metadata.scan_info = fnp.remainder @@ -1173,7 +1178,7 @@ class ComicArchive: return metadata - def export_as_zip(self, zipfilename): + def export_as_zip(self, zipfilename: str) -> bool: if self.archive_type == self.ArchiveType.Zip: # nothing to do, we're already a zip return True diff --git a/comicapi/comicbookinfo.py b/comicapi/comicbookinfo.py index 2c8a645..6bdcebb 100644 --- a/comicapi/comicbookinfo.py +++ b/comicapi/comicbookinfo.py @@ -18,17 +18,65 @@ import json import logging from collections import defaultdict 
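For orientation, a small sketch of the round trip these annotations (and the CBIContainer TypedDict declared just below) describe; the field values are invented:

from comicapi.comicbookinfo import ComicBookInfo
from comicapi.genericmetadata import GenericMetadata

md = GenericMetadata()
md.series = "Example Series"
md.issue = "1"

json_text = ComicBookInfo().string_from_metadata(md)          # JSON str keyed by "ComicBookInfo/1.0"
round_trip = ComicBookInfo().metadata_from_string(json_text)  # now accepts str rather than bytes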
from datetime import datetime +from typing import Any, Literal, TypedDict, Union from comicapi import utils from comicapi.genericmetadata import GenericMetadata logger = logging.getLogger(__name__) +CBILiteralType = Literal[ + "series", + "title", + "issue", + "publisher", + "publicationMonth", + "publicationYear", + "numberOfIssues", + "comments", + "genre", + "volume", + "numberOfVolumes", + "language", + "country", + "rating", + "credits", + "tags", +] + + +class Credits(TypedDict): + person: str + role: str + primary: bool + + +class ComicBookInfoJson(TypedDict, total=False): + series: str + title: str + publisher: str + publicationMonth: int + publicationYear: int + issue: int + numberOfIssues: int + volume: int + numberOfVolumes: int + rating: int + genre: str + language: str + country: str + credits: list[Credits] + tags: list[str] + comments: str + + +CBIContainer = TypedDict("CBIContainer", {"appID": str, "lastModified": str, "ComicBookInfo/1.0": ComicBookInfoJson}) + class ComicBookInfo: - def metadata_from_string(self, string): + def metadata_from_string(self, string: str) -> GenericMetadata: - cbi_container = json.loads(str(string, "utf-8")) + cbi_container = json.loads(string) metadata = GenericMetadata() @@ -66,12 +114,12 @@ class ComicBookInfo: return metadata - def string_from_metadata(self, metadata): + def string_from_metadata(self, metadata: GenericMetadata) -> str: cbi_container = self.create_json_dictionary(metadata) return json.dumps(cbi_container) - def validate_string(self, string): + def validate_string(self, string: Union[bytes, str]) -> bool: """Verify that the string actually contains CBI data in JSON format""" try: @@ -81,20 +129,21 @@ class ComicBookInfo: return "ComicBookInfo/1.0" in cbi_container - def create_json_dictionary(self, metadata): + def create_json_dictionary(self, metadata: GenericMetadata) -> CBIContainer: """Create the dictionary that we will convert to JSON text""" - cbi = {} - cbi_container = { - "appID": "ComicTagger/" + "1.0.0", - "lastModified": str(datetime.now()), - "ComicBookInfo/1.0": cbi, - } # TODO: ctversion.version, + cbi_container = CBIContainer( + { + "appID": "ComicTagger/" + "1.0.0", + "lastModified": str(datetime.now()), + "ComicBookInfo/1.0": {}, + } + ) # TODO: ctversion.version, # helper func - def assign(cbi_entry, md_entry): + def assign(cbi_entry: CBILiteralType, md_entry: Any) -> None: if md_entry is not None or isinstance(md_entry, str) and md_entry != "": - cbi[cbi_entry] = md_entry + cbi_container["ComicBookInfo/1.0"][cbi_entry] = md_entry assign("series", utils.xlate(metadata.series)) assign("title", utils.xlate(metadata.title)) @@ -115,7 +164,7 @@ class ComicBookInfo: return cbi_container - def write_to_external_file(self, filename, metadata): + def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None: cbi_container = self.create_json_dictionary(metadata) diff --git a/comicapi/comicinfoxml.py b/comicapi/comicinfoxml.py index 827cb9d..fd619c2 100644 --- a/comicapi/comicinfoxml.py +++ b/comicapi/comicinfoxml.py @@ -16,9 +16,12 @@ import logging import xml.etree.ElementTree as ET +from collections import OrderedDict +from typing import Any, List, Optional, cast +from xml.etree.ElementTree import ElementTree from comicapi import utils -from comicapi.genericmetadata import GenericMetadata +from comicapi.genericmetadata import GenericMetadata, ImageMetadata from comicapi.issuestring import IssueString logger = logging.getLogger(__name__) @@ -34,7 +37,7 @@ class ComicInfoXml: cover_synonyms = 
["cover", "covers", "coverartist", "cover artist"] editor_synonyms = ["editor"] - def get_parseable_credits(self): + def get_parseable_credits(self) -> List[str]: parsable_credits = [] parsable_credits.extend(self.writer_synonyms) parsable_credits.extend(self.penciller_synonyms) @@ -45,17 +48,19 @@ class ComicInfoXml: parsable_credits.extend(self.editor_synonyms) return parsable_credits - def metadata_from_string(self, string): + def metadata_from_string(self, string: bytes) -> GenericMetadata: tree = ET.ElementTree(ET.fromstring(string)) return self.convert_xml_to_metadata(tree) - def string_from_metadata(self, metadata, xml=None): + def string_from_metadata(self, metadata: GenericMetadata, xml: bytes = b"") -> str: tree = self.convert_metadata_to_xml(self, metadata, xml) - tree_str = ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True).decode() - return tree_str + tree_str = ET.tostring(tree.getroot(), encoding="utf-8", xml_declaration=True).decode("utf-8") + return str(tree_str) - def convert_metadata_to_xml(self, filename, metadata, xml=None): + def convert_metadata_to_xml( + self, filename: "ComicInfoXml", metadata: GenericMetadata, xml: bytes = b"" + ) -> ElementTree: # shorthand for the metadata md = metadata @@ -69,7 +74,7 @@ class ComicInfoXml: root.attrib["xmlns:xsd"] = "http://www.w3.org/2001/XMLSchema" # helper func - def assign(cix_entry, md_entry): + def assign(cix_entry: str, md_entry: Any) -> None: if md_entry is not None and md_entry: et_entry = root.find(cix_entry) if et_entry is not None: @@ -171,11 +176,8 @@ class ComicInfoXml: pages_node = ET.SubElement(root, "Pages") for page_dict in md.pages: - page = page_dict - if "Image" in page: - page["Image"] = str(page["Image"]) page_node = ET.SubElement(pages_node, "Page") - page_node.attrib = dict(sorted(page_dict.items())) + page_node.attrib = OrderedDict(sorted((k, str(v)) for k, v in page_dict.items())) ET.indent(root) @@ -183,14 +185,14 @@ class ComicInfoXml: tree = ET.ElementTree(root) return tree - def convert_xml_to_metadata(self, tree): + def convert_xml_to_metadata(self, tree: ElementTree) -> GenericMetadata: root = tree.getroot() if root.tag != "ComicInfo": - raise "1" + raise Exception("Not a ComicInfo file") - def get(name): + def get(name: str) -> Optional[str]: tag = root.find(name) if tag is None: return None @@ -256,20 +258,21 @@ class ComicInfoXml: pages_node = root.find("Pages") if pages_node is not None: for page in pages_node: - if "Image" in page.attrib: - page.attrib["Image"] = int(page.attrib["Image"]) - md.pages.append(page.attrib) + p: dict[str, Any] = page.attrib + if "Image" in p: + p["Image"] = int(p["Image"]) + md.pages.append(cast(ImageMetadata, p)) md.is_empty = False return md - def write_to_external_file(self, filename, metadata, xml=None): + def write_to_external_file(self, filename: str, metadata: GenericMetadata, xml: bytes = b"") -> None: tree = self.convert_metadata_to_xml(self, metadata, xml) tree.write(filename, encoding="utf-8", xml_declaration=True) - def read_from_external_file(self, filename): + def read_from_external_file(self, filename: str) -> GenericMetadata: tree = ET.parse(filename) return self.convert_xml_to_metadata(tree) diff --git a/comicapi/filenamelexer.py b/comicapi/filenamelexer.py index ed3f27a..03de6ec 100644 --- a/comicapi/filenamelexer.py +++ b/comicapi/filenamelexer.py @@ -2,6 +2,7 @@ import calendar import os import unicodedata from enum import Enum, auto +from typing import Any, Callable, Optional, Set class ItemType(Enum): @@ -73,26 +74,26 @@ 
key = { class Item: - def __init__(self, typ: ItemType, pos: int, val: str): + def __init__(self, typ: ItemType, pos: int, val: str) -> None: self.typ: ItemType = typ self.pos: int = pos self.val: str = val - def __repr__(self): + def __repr__(self) -> str: return f"{self.val}: index: {self.pos}: {self.typ}" class Lexer: - def __init__(self, string): + def __init__(self, string: str) -> None: self.input: str = string # The string being scanned - self.state = None # The next lexing function to enter + self.state: Optional[Callable[[Lexer], Optional[Callable]]] = None # The next lexing function to enter self.pos: int = -1 # Current position in the input self.start: int = 0 # Start position of this item self.lastPos: int = 0 # Position of most recent item returned by nextItem self.paren_depth: int = 0 # Nesting depth of ( ) exprs self.brace_depth: int = 0 # Nesting depth of { } self.sbrace_depth: int = 0 # Nesting depth of [ ] - self.items = [] + self.items: list[Item] = [] # Next returns the next rune in the input. def get(self) -> str: @@ -110,20 +111,20 @@ class Lexer: return self.input[self.pos + 1] - def backup(self): + def backup(self) -> None: self.pos -= 1 # Emit passes an item back to the client. - def emit(self, t: ItemType): + def emit(self, t: ItemType) -> None: self.items.append(Item(t, self.start, self.input[self.start : self.pos + 1])) self.start = self.pos + 1 # Ignore skips over the pending input before this point. - def ignore(self): + def ignore(self) -> None: self.start = self.pos # Accept consumes the next rune if it's from the valid se: - def accept(self, valid: str): + def accept(self, valid: str) -> bool: if self.get() in valid: return True @@ -131,17 +132,12 @@ class Lexer: return False # AcceptRun consumes a run of runes from the valid set. - def accept_run(self, valid: str): + def accept_run(self, valid: str) -> None: while self.get() in valid: pass self.backup() - # Errorf returns an error token and terminates the scan by passing - # Back a nil pointer that will be the next state, terminating self.nextItem. - def errorf(self, message: str): - self.items.append(Item(ItemType.Error, self.start, message)) - # NextItem returns the next item from the input. # Called by the parser, not in the lexing goroutine. # def next_item(self) -> Item: @@ -149,7 +145,7 @@ class Lexer: # self.lastPos = item.pos # return item - def scan_number(self): + def scan_number(self) -> bool: digits = "0123456789" self.accept_run(digits) @@ -171,21 +167,28 @@ class Lexer: return True # Runs the state machine for the lexer. - def run(self): + def run(self) -> None: self.state = lex_filename while self.state is not None: self.state = self.state(self) +# Errorf returns an error token and terminates the scan by passing +# Back a nil pointer that will be the next state, terminating self.nextItem. +def errorf(lex: Lexer, message: str) -> Optional[Callable[[Lexer], Optional[Callable]]]: + lex.items.append(Item(ItemType.Error, lex.start, message)) + return None + + # Scans the elements inside action delimiters. 
-def lex_filename(lex: Lexer): +def lex_filename(lex: Lexer) -> Optional[Callable[[Lexer], Optional[Callable]]]: r = lex.get() if r == eof: if lex.paren_depth != 0: - return lex.errorf("unclosed left paren") + return errorf(lex, "unclosed left paren") if lex.brace_depth != 0: - return lex.errorf("unclosed left paren") + return errorf(lex, "unclosed left paren") lex.emit(ItemType.EOF) return None elif is_space(r): @@ -230,7 +233,7 @@ def lex_filename(lex: Lexer): lex.emit(ItemType.RightParen) lex.paren_depth -= 1 if lex.paren_depth < 0: - return lex.errorf("unexpected right paren " + r) + return errorf(lex, "unexpected right paren " + r) elif r == "{": lex.emit(ItemType.LeftBrace) @@ -239,7 +242,7 @@ def lex_filename(lex: Lexer): lex.emit(ItemType.RightBrace) lex.brace_depth -= 1 if lex.brace_depth < 0: - return lex.errorf("unexpected right brace " + r) + return errorf(lex, "unexpected right brace " + r) elif r == "[": lex.emit(ItemType.LeftSBrace) @@ -248,17 +251,17 @@ def lex_filename(lex: Lexer): lex.emit(ItemType.RightSBrace) lex.sbrace_depth -= 1 if lex.sbrace_depth < 0: - return lex.errorf("unexpected right brace " + r) + return errorf(lex, "unexpected right brace " + r) elif is_symbol(r): # L.backup() lex.emit(ItemType.Symbol) else: - return lex.errorf("unrecognized character in action: " + r) + return errorf(lex, "unrecognized character in action: " + r) return lex_filename -def lex_operator(lex: Lexer): +def lex_operator(lex: Lexer) -> Callable: lex.accept_run("-|:;") lex.emit(ItemType.Operator) return lex_filename @@ -266,7 +269,7 @@ def lex_operator(lex: Lexer): # LexSpace scans a run of space characters. # One space has already been seen. -def lex_space(lex: Lexer): +def lex_space(lex: Lexer) -> Callable: while is_space(lex.peek()): lex.get() @@ -275,7 +278,7 @@ def lex_space(lex: Lexer): # Lex_text scans an alphanumeric. -def lex_text(lex: Lexer): +def lex_text(lex: Lexer) -> Callable: while True: r = lex.get() if is_alpha_numeric(r): @@ -306,7 +309,7 @@ def lex_text(lex: Lexer): return lex_filename -def cal(value: str): +def cal(value: str) -> Set[Any]: month_abbr = [i for i, x in enumerate(calendar.month_abbr) if x == value.title()] month_name = [i for i, x in enumerate(calendar.month_name) if x == value.title()] day_abbr = [i for i, x in enumerate(calendar.day_abbr) if x == value.title()] @@ -314,9 +317,9 @@ def cal(value: str): return set(month_abbr + month_name + day_abbr + day_name) -def lex_number(lex: Lexer): +def lex_number(lex: Lexer) -> Optional[Callable[[Lexer], Optional[Callable]]]: if not lex.scan_number(): - return lex.errorf("bad number syntax: " + lex.input[lex.start : lex.pos]) + return errorf(lex, "bad number syntax: " + lex.input[lex.start : lex.pos]) # Complex number logic removed. Messes with math operations without space if lex.input[lex.start] == "#": @@ -330,24 +333,24 @@ def lex_number(lex: Lexer): return lex_filename -def is_space(character: str): +def is_space(character: str) -> bool: return character in "_ \t" # IsAlphaNumeric reports whether r is an alphabetic, digit, or underscore. 
-def is_alpha_numeric(character: str): +def is_alpha_numeric(character: str) -> bool: return character.isalpha() or character.isnumeric() -def is_operator(character: str): +def is_operator(character: str) -> bool: return character in "-|:;/\\" -def is_symbol(character: str): +def is_symbol(character: str) -> bool: return unicodedata.category(character)[0] in "PS" -def Lex(filename: str): +def Lex(filename: str) -> Lexer: lex = Lexer(string=os.path.basename(filename)) lex.run() return lex diff --git a/comicapi/filenameparser.py b/comicapi/filenameparser.py index e4f829d..efea26a 100644 --- a/comicapi/filenameparser.py +++ b/comicapi/filenameparser.py @@ -24,7 +24,7 @@ import logging import os import re from operator import itemgetter -from typing import TypedDict +from typing import Callable, Match, Optional, TypedDict from urllib.parse import unquote from text2digits import text2digits @@ -38,7 +38,7 @@ logger = logging.getLogger(__name__) class FileNameParser: - def __init__(self): + def __init__(self) -> None: self.series = "" self.volume = "" self.year = "" @@ -46,10 +46,10 @@ class FileNameParser: self.remainder = "" self.issue = "" - def repl(self, m): + def repl(self, m: Match[str]) -> str: return " " * len(m.group()) - def fix_spaces(self, string, remove_dashes=True): + def fix_spaces(self, string: str, remove_dashes: bool = True) -> str: if remove_dashes: placeholders = [r"[-_]", r" +"] else: @@ -58,7 +58,7 @@ class FileNameParser: string = re.sub(ph, self.repl, string) return string # .strip() - def get_issue_count(self, filename, issue_end): + def get_issue_count(self, filename: str, issue_end: int) -> str: count = "" filename = filename[issue_end:] @@ -79,7 +79,7 @@ class FileNameParser: return count.lstrip("0") - def get_issue_number(self, filename): + def get_issue_number(self, filename: str) -> tuple[str, int, int]: """Returns a tuple of issue number string, and start and end indexes in the filename (The indexes will be used to split the string up for further parsing) """ @@ -161,7 +161,7 @@ class FileNameParser: return issue, start, end - def get_series_name(self, filename, issue_start): + def get_series_name(self, filename: str, issue_start: int) -> tuple[str, str]: """Use the issue number string index to split the filename string""" if issue_start != 0: @@ -223,7 +223,7 @@ class FileNameParser: return series, volume.strip() - def get_year(self, filename, issue_end): + def get_year(self, filename: str, issue_end: int) -> str: filename = filename[issue_end:] @@ -236,7 +236,7 @@ class FileNameParser: year = re.sub(r"[^0-9]", "", year) return year - def get_remainder(self, filename, year, count, volume, issue_end): + def get_remainder(self, filename: str, year: str, count: str, volume: str, issue_end: int) -> str: """Make a guess at where the the non-interesting stuff begins""" remainder = "" @@ -261,7 +261,7 @@ class FileNameParser: return remainder.strip() - def parse_filename(self, filename): + def parse_filename(self, filename: str) -> None: # remove the path filename = os.path.basename(filename) @@ -325,12 +325,12 @@ class Parser: def __init__( self, lexer_result: list[filenamelexer.Item], - first_is_alt=False, - remove_c2c=False, - remove_fcbd=False, - remove_publisher=False, - ): - self.state = None + first_is_alt: bool = False, + remove_c2c: bool = False, + remove_fcbd: bool = False, + remove_publisher: bool = False, + ) -> None: + self.state: Optional[Callable[[Parser], Optional[Callable]]] = None self.pos = -1 self.firstItem = True @@ -384,16 +384,16 @@ class Parser: 
return self.input[self.pos - 1] # Backup steps back one Item. - def backup(self): + def backup(self) -> None: self.pos -= 1 - def run(self): + def run(self) -> None: self.state = parse while self.state is not None: self.state = self.state(self) -def parse(p: Parser): +def parse(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]: item: filenamelexer.Item = p.get() # We're done, time to do final processing @@ -644,7 +644,7 @@ def parse(p: Parser): # TODO: What about more esoteric numbers??? -def parse_issue_number(p: Parser): +def parse_issue_number(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]: item = p.input[p.pos] if "issue" in p.filename_info: @@ -677,7 +677,7 @@ def parse_issue_number(p: Parser): return parse -def parse_series(p: Parser): +def parse_series(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]: item = p.input[p.pos] series: list[list[filenamelexer.Item]] = [[]] @@ -812,7 +812,7 @@ def parse_series(p: Parser): return parse -def resolve_year(p: Parser): +def resolve_year(p: Parser) -> None: if len(p.year_candidates) > 0: # Sort by likely_year boolean p.year_candidates.sort(key=itemgetter(0)) @@ -842,7 +842,7 @@ def resolve_year(p: Parser): p.title_parts.remove(selected_year) -def parse_finish(p: Parser): +def parse_finish(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]: resolve_year(p) # If we don't have an issue try to find it in the series @@ -924,13 +924,14 @@ def parse_finish(p: Parser): "publisher", ]: if s not in p.filename_info: - p.filename_info[s] = "" + p.filename_info[s] = "" # type: ignore for s in ["fcbd", "c2c", "annual"]: if s not in p.filename_info: - p.filename_info[s] = False + p.filename_info[s] = False # type: ignore + return None -def get_remainder(p: Parser): +def get_remainder(p: Parser) -> str: remainder = "" rem = [] @@ -988,7 +989,7 @@ def get_remainder(p: Parser): return remainder.strip() -def parse_info_specifier(p: Parser): +def parse_info_specifier(p: Parser) -> Optional[Callable[[Parser], Optional[Callable]]]: item = p.input[p.pos] index = p.pos @@ -1009,32 +1010,32 @@ def parse_info_specifier(p: Parser): # 'of' is only special if it is inside a parenthesis. 
elif item.val.lower() == "of": i = get_number(p, index) - if p.in_something > 0: - if p.issue_number_at is None: - # TODO: Figure out what to do here if it ever happens - p.filename_info["issue_count"] = str(int(t2do.convert(number.val))) - p.used_items.append(item) - p.used_items.append(number) + if i is not None: + if p.in_something > 0: + if p.issue_number_at is None: + # TODO: Figure out what to do here if it ever happens + p.filename_info["issue_count"] = str(int(t2do.convert(number.val))) + p.used_items.append(item) + p.used_items.append(number) - # This is definitely the issue number - elif p.issue_number_at == i.pos: - p.filename_info["issue_count"] = str(int(t2do.convert(number.val))) - p.used_items.append(item) - p.used_items.append(number) + # This is definitely the issue number + elif p.issue_number_at == i.pos: + p.filename_info["issue_count"] = str(int(t2do.convert(number.val))) + p.used_items.append(item) + p.used_items.append(number) - # This is not for the issue number it is not in either the issue or the title, assume it is the volume number and count - elif p.issue_number_at != i.pos and i not in p.series_parts and i not in p.title_parts: - p.filename_info["volume"] = i.val - p.filename_info["volume_count"] = str(int(t2do.convert(number.val))) - p.used_items.append(i) - p.used_items.append(item) - p.used_items.append(number) + # This is not for the issue number it is not in either the issue or the title, assume it is the volume number and count + elif p.issue_number_at != i.pos and i not in p.series_parts and i not in p.title_parts: + p.filename_info["volume"] = i.val + p.filename_info["volume_count"] = str(int(t2do.convert(number.val))) + p.used_items.append(i) + p.used_items.append(item) + p.used_items.append(number) + else: + # TODO: Figure out what to do here if it ever happens + pass else: - # TODO: Figure out what to do here if it ever happens - pass - else: - # Lets 'The Wrath of Foobar-Man, Part 1 of 2' parse correctly as the title - if i is not None: + # Lets 'The Wrath of Foobar-Man, Part 1 of 2' parse correctly as the title p.pos = [ind for ind, x in enumerate(p.input) if x == i][0] if not p.in_something: @@ -1043,7 +1044,7 @@ def parse_info_specifier(p: Parser): # Gets 03 in '03 of 6' -def get_number(p: Parser, index: int): +def get_number(p: Parser, index: int) -> Optional[filenamelexer.Item]: # Go backward through the filename to see if we can find what this is of eg '03 (of 6)' or '008 title 03 (of 6)' rev = p.input[:index] rev.reverse() @@ -1064,7 +1065,7 @@ def get_number(p: Parser, index: int): return None -def join_title(lst: list[filenamelexer.Item]): +def join_title(lst: list[filenamelexer.Item]) -> str: title = "" for i, item in enumerate(lst): if i + 1 == len(lst) and item.val == ",": # We ignore commas on the end @@ -1094,11 +1095,11 @@ def join_title(lst: list[filenamelexer.Item]): def Parse( lexer_result: list[filenamelexer.Item], - first_is_alt=False, - remove_c2c=False, - remove_fcbd=False, - remove_publisher=False, -): + first_is_alt: bool = False, + remove_c2c: bool = False, + remove_fcbd: bool = False, + remove_publisher: bool = False, +) -> Parser: p = Parser( lexer_result=lexer_result, first_is_alt=first_is_alt, diff --git a/comicapi/genericmetadata.py b/comicapi/genericmetadata.py index dc0ddc4..5d5e6ec 100644 --- a/comicapi/genericmetadata.py +++ b/comicapi/genericmetadata.py @@ -21,7 +21,7 @@ possible, however lossy it might be # limitations under the License. 
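As a quick illustration of the credit helpers whose annotations follow below (the names are invented):

from comicapi.genericmetadata import GenericMetadata

md = GenericMetadata()
md.add_credit("Jane Writer", "Writer", primary=True)
md.add_credit("Joe Penciller", "Penciller")
print(md.get_primary_credit("writer"))    # -> "Jane Writer"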
import logging -from typing import List, TypedDict +from typing import Any, List, Optional, TypedDict from comicapi import utils @@ -73,76 +73,75 @@ class GenericMetadata: cover_synonyms = ["cover", "covers", "coverartist", "cover artist"] editor_synonyms = ["editor"] - def __init__(self): + def __init__(self) -> None: - self.is_empty = True - self.tag_origin = None + self.is_empty: bool = True + self.tag_origin: Optional[str] = None - self.series = None - self.issue = None - self.title = None - self.publisher = None - self.month = None - self.year = None - self.day = None - self.issue_count = None - self.volume = None - self.genre = None - self.language = None # 2 letter iso code - self.comments = None # use same way as Summary in CIX + self.series: Optional[str] = None + self.issue: Optional[str] = None + self.title: Optional[str] = None + self.publisher: Optional[str] = None + self.month: Optional[int] = None + self.year: Optional[int] = None + self.day: Optional[int] = None + self.issue_count: Optional[int] = None + self.volume: Optional[int] = None + self.genre: Optional[str] = None + self.language: Optional[str] = None # 2 letter iso code + self.comments: Optional[str] = None # use same way as Summary in CIX - self.volume_count = None - self.critical_rating = None - self.country = None + self.volume_count: Optional[int] = None + self.critical_rating: Optional[str] = None + self.country: Optional[str] = None - self.alternate_series = None - self.alternate_number = None - self.alternate_count = None - self.imprint = None - self.notes = None - self.web_link = None - self.format = None - self.manga = None - self.black_and_white = None - self.page_count = None - self.maturity_rating = None - self.community_rating = None + self.alternate_series: Optional[str] = None + self.alternate_number: Optional[str] = None + self.alternate_count: Optional[int] = None + self.imprint: Optional[str] = None + self.notes: Optional[str] = None + self.web_link: Optional[str] = None + self.format: Optional[str] = None + self.manga: Optional[str] = None + self.black_and_white: Optional[bool] = None + self.page_count: Optional[int] = None + self.maturity_rating: Optional[str] = None + self.community_rating: Optional[str] = None - self.story_arc = None - self.series_group = None - self.scan_info = None + self.story_arc: Optional[str] = None + self.series_group: Optional[str] = None + self.scan_info: Optional[str] = None - self.characters = None - self.teams = None - self.locations = None + self.characters: Optional[str] = None + self.teams: Optional[str] = None + self.locations: Optional[str] = None self.credits: List[CreditMetadata] = [] self.tags: List[str] = [] self.pages: List[ImageMetadata] = [] # Some CoMet-only items - self.price = None - self.is_version_of = None - self.rights = None - self.identifier = None - self.last_mark = None - self.cover_image = None + self.price: Optional[str] = None + self.is_version_of: Optional[str] = None + self.rights: Optional[str] = None + self.identifier: Optional[str] = None + self.last_mark: Optional[str] = None + self.cover_image: Optional[str] = None - def overlay(self, new_md): + def overlay(self, new_md: "GenericMetadata") -> None: """Overlay a metadata object on this one That is, when the new object has non-None values, over-write them to this one. 
""" - def assign(cur, new): + def assign(cur: str, new: Any) -> None: if new is not None: if isinstance(new, str) and len(new) == 0: setattr(self, cur, None) else: setattr(self, cur, new) - new_md: GenericMetadata if not new_md.is_empty: self.is_empty = False @@ -199,7 +198,7 @@ class GenericMetadata: if len(new_md.pages) > 0: assign("pages", new_md.pages) - def overlay_credits(self, new_credits): + def overlay_credits(self, new_credits: List[CreditMetadata]) -> None: for c in new_credits: primary = bool("primary" in c and c["primary"]) @@ -212,7 +211,7 @@ class GenericMetadata: else: self.add_credit(c["person"], c["role"], primary) - def set_default_page_list(self, count): + def set_default_page_list(self, count: int) -> None: # generate a default page list, with the first page marked as the cover for i in range(count): page_dict = ImageMetadata(Image=i) @@ -220,7 +219,7 @@ class GenericMetadata: page_dict["Type"] = PageType.FrontCover self.pages.append(page_dict) - def get_archive_page_index(self, pagenum): + def get_archive_page_index(self, pagenum: int) -> int: # convert the displayed page number to the page index of the file in # the archive if pagenum < len(self.pages): @@ -228,7 +227,7 @@ class GenericMetadata: return 0 - def get_cover_page_index_list(self): + def get_cover_page_index_list(self) -> list[int]: # return a list of archive page indices of cover pages coverlist = [] for p in self.pages: @@ -240,7 +239,7 @@ class GenericMetadata: return coverlist - def add_credit(self, person, role, primary=False): + def add_credit(self, person: str, role: str, primary: bool = False) -> None: credit: CreditMetadata = {"person": person, "role": role, "primary": primary} @@ -256,7 +255,7 @@ class GenericMetadata: if not found: self.credits.append(credit) - def get_primary_credit(self, role): + def get_primary_credit(self, role: str) -> str: primary = "" for credit in self.credits: if (primary == "" and credit["role"].lower() == role.lower()) or ( @@ -265,16 +264,16 @@ class GenericMetadata: primary = credit["person"] return primary - def __str__(self): - vals = [] + def __str__(self) -> str: + vals: list[tuple[str, Any]] = [] if self.is_empty: return "No metadata" - def add_string(tag, val): + def add_string(tag: str, val: Any) -> None: if val is not None and str(val) != "": vals.append((tag, val)) - def add_attr_string(tag): + def add_attr_string(tag: str) -> None: add_string(tag, getattr(self, tag)) add_attr_string("series") diff --git a/comicapi/issuestring.py b/comicapi/issuestring.py index 6775d07..3567761 100644 --- a/comicapi/issuestring.py +++ b/comicapi/issuestring.py @@ -22,12 +22,13 @@ comics industry throws at us. import logging import unicodedata +from typing import Optional logger = logging.getLogger(__name__) class IssueString: - def __init__(self, text): + def __init__(self, text: Optional[str]) -> None: # break up the issue number string into 2 parts: the numeric and suffix string. 
         # (assumes that the numeric portion is always first)
@@ -84,7 +85,7 @@ class IssueString:
         else:
             self.suffix = text
 
-    def as_string(self, pad=0):
+    def as_string(self, pad: int = 0) -> str:
         # return the float, left side zero-padded, with suffix attached
         if self.num is None:
             return self.suffix
@@ -112,7 +113,7 @@ class IssueString:
 
         return num_s
 
-    def as_float(self):
+    def as_float(self) -> Optional[float]:
         # return the float, with no suffix
         if len(self.suffix) == 1 and self.suffix.isnumeric():
             return (self.num or 0) + unicodedata.numeric(self.suffix)
@@ -120,7 +121,7 @@ class IssueString:
             return 0.5
         return self.num
 
-    def as_int(self):
+    def as_int(self) -> Optional[int]:
         # return the int version of the float
         if self.num is None:
             return None
diff --git a/comicapi/utils.py b/comicapi/utils.py
index bbe5a7a..f432079 100644
--- a/comicapi/utils.py
+++ b/comicapi/utils.py
@@ -14,15 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import codecs
 import locale
 import logging
 import os
 import platform
 import re
-import sys
 import unicodedata
 from collections import defaultdict
+from typing import Any, List, Optional, Union
 
 import pycountry
 
@@ -33,26 +32,26 @@ class UtilsVars:
     already_fixed_encoding = False
 
 
-def get_actual_preferred_encoding():
+def get_actual_preferred_encoding() -> str:
     preferred_encoding = locale.getpreferredencoding()
     if platform.system() == "Darwin":
         preferred_encoding = "utf-8"
     return preferred_encoding
 
 
-def fix_output_encoding():
-    if not UtilsVars.already_fixed_encoding:
-        # this reads the environment and inits the right locale
-        locale.setlocale(locale.LC_ALL, "")
+# def fix_output_encoding() -> None:
+#     if not UtilsVars.already_fixed_encoding:
+#         # this reads the environment and inits the right locale
+#         locale.setlocale(locale.LC_ALL, "")
 
-        # try to make stdout/stderr encodings happy for unicode printing
-        preferred_encoding = get_actual_preferred_encoding()
-        sys.stdout = codecs.getwriter(preferred_encoding)(sys.stdout)
-        sys.stderr = codecs.getwriter(preferred_encoding)(sys.stderr)
-        UtilsVars.already_fixed_encoding = True
+#     # try to make stdout/stderr encodings happy for unicode printing
+#     preferred_encoding = get_actual_preferred_encoding()
+#     sys.stdout = codecs.getwriter(preferred_encoding)(sys.stdout)
+#     sys.stderr = codecs.getwriter(preferred_encoding)(sys.stderr)
+#     UtilsVars.already_fixed_encoding = True
 
 
-def get_recursive_filelist(pathlist):
+def get_recursive_filelist(pathlist: List[str]) -> List[str]:
     """Get a recursive list of of all files under all path items in the list"""
 
     filelist = []
@@ -75,7 +74,7 @@ def get_recursive_filelist(pathlist):
     return filelist
 
 
-def list_to_string(lst):
+def list_to_string(lst: List[Union[str, Any]]) -> str:
     string = ""
     if lst is not None:
         for item in lst:
@@ -85,7 +84,7 @@ def list_to_string(lst):
     return string
 
 
-def add_to_path(dirname):
+def add_to_path(dirname: str) -> None:
     if dirname is not None and dirname != "":
 
         # verify that path doesn't already contain the given dirname
@@ -97,10 +96,10 @@ def add_to_path(dirname):
         os.environ["PATH"] = dirname + os.pathsep + os.environ["PATH"]
 
 
-def which(program):
+def which(program: str) -> Optional[str]:
     """Returns path of the executable, if it exists"""
 
-    def is_exe(fpath):
+    def is_exe(fpath: str) -> bool:
         return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
 
     fpath, _ = os.path.split(program)
@@ -116,7 +115,7 @@ def which(program):
     return None
 
 
-def xlate(data, is_int=False):
+def xlate(data: Any, is_int: bool = False) -> Any:
     if data is None or data == "":
         return None
     if is_int:
@@ -130,7 +129,7 @@
     return str(data)
 
 
-def remove_articles(text):
+def remove_articles(text: str) -> str:
     text = text.lower()
     articles = [
         "&",
@@ -168,7 +167,7 @@ def remove_articles(text):
     return new_text
 
 
-def sanitize_title(text):
+def sanitize_title(text: str) -> str:
     # normalize unicode and convert to ascii. Does not work for everything eg ½ to 1⁄2 not 1/2
     # this will probably cause issues with titles in other character sets e.g. chinese, japanese
     text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
@@ -183,7 +182,7 @@ def sanitize_title(text):
     return text
 
 
-def unique_file(file_name):
+def unique_file(file_name: str) -> str:
     counter = 1
     file_name_parts = os.path.splitext(file_name)
     while True:
@@ -193,9 +192,9 @@
         counter += 1
 
 
-languages = defaultdict(lambda: None)
+languages: dict[Optional[str], Optional[str]] = defaultdict(lambda: None)
 
-countries = defaultdict(lambda: None)
+countries: dict[Optional[str], Optional[str]] = defaultdict(lambda: None)
 
 for c in pycountry.countries:
     if "alpha_2" in c._fields:
@@ -206,11 +205,11 @@ for lng in pycountry.languages:
     languages[lng.alpha_2] = lng.name
 
 
-def get_language_from_iso(iso: str):
+def get_language_from_iso(iso: Optional[str]) -> Optional[str]:
     return languages[iso]
 
 
-def get_language(string):
+def get_language(string: Optional[str]) -> Optional[str]:
     if string is None:
         return None
 
@@ -218,7 +217,7 @@
     if lang is None:
 
         try:
-            return pycountry.languages.lookup(string).name
+            return str(pycountry.languages.lookup(string).name)
         except:
             return None
     return lang
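
The reworked "of" handling in parse_info_specifier() earlier in this patch is easier to follow with a concrete run. The sketch below is illustrative only: it assumes the filenamelexer.Lex() helper and the Parser.filename_info dict from the same modules, which are only partially visible in this excerpt, and the filename is made up.

    from comicapi import filenamelexer, filenameparser

    # Tokenize a filename, then hand the items to the typed Parse() entry point.
    # Lex() and .items are assumed from the accompanying filenamelexer module.
    lexer = filenamelexer.Lex("The Wrath of Foobar-Man, Part 1 of 2 (2021).cbz")
    parser = filenameparser.Parse(
        lexer.items,
        first_is_alt=False,
        remove_c2c=False,
        remove_fcbd=False,
        remove_publisher=False,
    )

    # With the branch above, an "of" that belongs to the title stays in the title,
    # while "xx of yy" outside the title still fills issue_count, or volume and
    # volume_count when it is not the issue number.
    print(parser.filename_info)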
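
Since GenericMetadata.overlay() and its assign() helper now carry annotations, a short usage sketch may help. It only touches attributes and methods that appear in the hunks above; the sample values are invented and the expected results follow from the quoted code rather than captured output.

    from comicapi.genericmetadata import GenericMetadata

    base = GenericMetadata()
    base.is_empty = False
    base.series = "Foobar-Man"
    base.issue = "1"
    base.add_credit("A. Writer", "Writer", primary=True)

    new = GenericMetadata()
    new.is_empty = False
    new.title = "The Wrath of Foobar-Man"
    new.series = ""  # assign() turns empty strings into None on overlay

    base.overlay(new)
    # Non-None values from `new` win and an empty string clears the field:
    # base.title is expected to become "The Wrath of Foobar-Man" and
    # base.series to become None.
    print(base.get_primary_credit("Writer"))  # expected: "A. Writer"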
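
The IssueString changes are annotation-only, so behaviour should stay what the quoted method bodies already describe. A minimal sketch of the now-Optional return values (expected values, not verified output):

    from comicapi.issuestring import IssueString

    issue = IssueString("15AU")
    print(issue.as_float())        # expected 15.0; the "AU" suffix is kept separately
    print(issue.as_int())          # expected 15
    print(issue.as_string(pad=3))  # expected "015AU": zero-padded number, suffix attached

    missing = IssueString(None)
    print(missing.as_float())      # expected None, matching Optional[float]
    print(missing.as_string())     # expected "": only the (empty) suffix is left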
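
Finally, a hedged sketch of the utils helpers whose signatures change above. The return values are what the quoted code suggests rather than captured output, and the language lookups depend on the installed pycountry data.

    from comicapi import utils

    print(utils.xlate(None))               # None: missing or empty input is normalised away
    print(utils.xlate(3))                  # "3": the default path stringifies the value
    print(utils.xlate("42", is_int=True))  # expected 42 via the is_int branch (not shown in full here)

    print(utils.get_language_from_iso("en"))  # expected "English" from the pycountry-backed table
    print(utils.get_language("eng"))          # falls back to pycountry.languages.lookup()
    print(utils.get_language(None))           # None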