From e8fa51ad45be5d6d9304502f45b216c684bb9c53 Mon Sep 17 00:00:00 2001 From: Timmy Welch Date: Wed, 1 Jun 2022 21:14:51 -0700 Subject: [PATCH] Ensure comicapi is as consistent as possible --- comicapi/comet.py | 44 ++- comicapi/comicarchive.py | 597 ++++++++++++++++++--------------- comicapi/comicbookinfo.py | 2 - comicapi/comicinfoxml.py | 2 - comictaggerlib/cli.py | 3 +- comictaggerlib/renamewindow.py | 3 +- tests/comicarchive_test.py | 94 ++++-- 7 files changed, 415 insertions(+), 330 deletions(-) diff --git a/comicapi/comet.py b/comicapi/comet.py index d8da089..4bec783 100644 --- a/comicapi/comet.py +++ b/comicapi/comet.py @@ -36,7 +36,6 @@ class CoMet: editor_synonyms = ["editor"] def metadata_from_string(self, string: str) -> GenericMetadata: - tree = ET.ElementTree(ET.fromstring(string)) return self.convert_xml_to_metadata(tree) @@ -127,7 +126,6 @@ class CoMet: return tree def convert_xml_to_metadata(self, tree: ET.ElementTree) -> GenericMetadata: - root = tree.getroot() if root.tag != "comet": @@ -143,24 +141,24 @@ class CoMet: return node.text return None - md.series = get("series") - md.title = get("title") - md.issue = get("issue") - md.volume = get("volume") - md.comments = get("description") - md.publisher = get("publisher") - md.language = get("language") - md.format = get("format") - md.page_count = get("pages") - md.maturity_rating = get("rating") - md.price = get("price") - md.is_version_of = get("isVersionOf") - md.rights = get("rights") - md.identifier = get("identifier") - md.last_mark = get("lastMark") - md.genre = get("genre") # TODO - repeatable field + md.series = utils.xlate(get("series")) + md.title = utils.xlate(get("title")) + md.issue = utils.xlate(get("issue")) + md.volume = utils.xlate(get("volume")) + md.comments = utils.xlate(get("description")) + md.publisher = utils.xlate(get("publisher")) + md.language = utils.xlate(get("language")) + md.format = utils.xlate(get("format")) + md.page_count = utils.xlate(get("pages")) + md.maturity_rating = utils.xlate(get("rating")) + md.price = utils.xlate(get("price")) + md.is_version_of = utils.xlate(get("isVersionOf")) + md.rights = utils.xlate(get("rights")) + md.identifier = utils.xlate(get("identifier")) + md.last_mark = utils.xlate(get("lastMark")) + md.genre = utils.xlate(get("genre")) # TODO - repeatable field - date = get("date") + date = utils.xlate(get("date")) if date is not None: parts = date.split("-") if len(parts) > 0: @@ -168,9 +166,9 @@ class CoMet: if len(parts) > 1: md.month = parts[1] - md.cover_image = get("coverImage") + md.cover_image = utils.xlate(get("coverImage")) - reading_direction = get("readingDirection") + reading_direction = utils.xlate(get("readingDirection")) if reading_direction is not None and reading_direction == "rtl": md.manga = "YesAndRightToLeft" @@ -208,18 +206,16 @@ class CoMet: tree = ET.ElementTree(ET.fromstring(string)) root = tree.getroot() if root.tag != "comet": - raise Exception + return False except ET.ParseError: return False return True def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None: - tree = self.convert_metadata_to_xml(metadata) tree.write(filename, encoding="utf-8") def read_from_external_file(self, filename: str) -> GenericMetadata: - tree = ET.parse(filename) return self.convert_xml_to_metadata(tree) diff --git a/comicapi/comicarchive.py b/comicapi/comicarchive.py index 9d36f4d..b3b6eb5 100644 --- a/comicapi/comicarchive.py +++ b/comicapi/comicarchive.py @@ -19,6 +19,7 @@ import logging import os import pathlib import platform +import shutil import struct import subprocess import tempfile @@ -53,7 +54,9 @@ except ImportError: logger = logging.getLogger(__name__) if not pil_available: - logger.exception("PIL unavalable") + logger.error("PIL unavalable") +if not rar_support: + logger.error("unrar-cffi unavailable") class MetaDataStyle: @@ -69,7 +72,7 @@ class UnknownArchiver: """Unknown implementation""" def __init__(self, path: pathlib.Path | str) -> None: - self.path = path + self.path = pathlib.Path(path) def get_comment(self) -> str: return "" @@ -77,25 +80,31 @@ class UnknownArchiver: def set_comment(self, comment: str) -> bool: return False - def read_file(self, archive_file: str) -> bytes | None: - return None - - def write_file(self, archive_file: str, data: bytes) -> bool: - return False + def read_file(self, archive_file: str) -> bytes: + raise NotImplementedError def remove_file(self, archive_file: str) -> bool: return False + def write_file(self, archive_file: str, data: bytes) -> bool: + return False + def get_filename_list(self) -> list[str]: return [] + def rebuild(self, exclude_list: list[str]) -> bool: + return False + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + return False + class SevenZipArchiver(UnknownArchiver): """7Z implementation""" def __init__(self, path: pathlib.Path | str) -> None: - self.path = pathlib.Path(path) + super().__init__(path) # @todo: Implement Comment? def get_comment(self) -> str: @@ -109,39 +118,30 @@ class SevenZipArchiver(UnknownArchiver): try: with py7zr.SevenZipFile(self.path, "r") as zf: data = zf.read(archive_file)[archive_file].read() - except py7zr.Bad7zFile as e: - logger.error("bad 7zip file [%s]: %s :: %s", e, self.path, archive_file) - raise OSError from e - except OSError as e: - logger.error("bad 7zip file [%s]: %s :: %s", e, self.path, archive_file) - raise OSError from e + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise return data def remove_file(self, archive_file: str) -> bool: - try: - self.rebuild_zip_file([archive_file]) - except (py7zr.Bad7zFile, OSError): - logger.exception("Failed to remove %s from 7zip archive", archive_file) - return False - else: - return True + return self.rebuild([archive_file]) def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # zip archive w/o the indicated file. Very sucky, but maybe + # At the moment, no other option but to rebuild the whole + # archive w/o the indicated file. Very sucky, but maybe # another solution can be found - try: - files = self.get_filename_list() - if archive_file in files: - self.rebuild_zip_file([archive_file]) + files = self.get_filename_list() + if archive_file in files: + self.rebuild([archive_file]) + try: # now just add the archive file as a new one with py7zr.SevenZipFile(self.path, "a") as zf: zf.writestr(data, archive_file) return True - except (py7zr.Bad7zFile, OSError): - logger.exception("Writing zip file failed") + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file) return False def get_filename_list(self) -> list[str]: @@ -151,41 +151,44 @@ class SevenZipArchiver(UnknownArchiver): return namelist except (py7zr.Bad7zFile, OSError) as e: - logger.error("Unable to get 7zip file list [%s]: %s", e, self.path) + logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path) return [] - def rebuild_zip_file(self, exclude_list: list[str]) -> None: + def rebuild(self, exclude_list: list[str]) -> bool: """Zip helper func This recompresses the zip archive, without the files in the exclude_list """ - tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path)) - os.close(tmp_fd) try: - with py7zr.SevenZipFile(self.path, "r") as zin: + # py7zr treats all archives as if they used solid compression + # so we need to get the filename list first to read all the files at once + with py7zr.SevenZipFile(self.path, mode="r") as zin: targets = [f for f in zin.getnames() if f not in exclude_list] - with py7zr.SevenZipFile(self.path, "r") as zin: - with py7zr.SevenZipFile(tmp_name, "w") as zout: - for fname, bio in zin.read(targets).items(): - zout.writef(bio, fname) - except (py7zr.Bad7zFile, OSError): - logger.exception("Error rebuilding 7zip file: %s", self.path) + with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file: + with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout: + with py7zr.SevenZipFile(self.path, mode="r") as zin: + for filename, buffer in zin.read(targets).items(): + zout.writef(buffer, filename) + os.remove(self.path) + shutil.move(tmp_file.name, self.path) + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path) + return False + return True - # replace with the new file - os.remove(self.path) - os.rename(tmp_name, self.path) - - def copy_from_archive(self, otherArchive: UnknownArchiver) -> bool: + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: """Replace the current zip with one copied from another archive""" try: with py7zr.SevenZipFile(self.path, "w") as zout: - for fname in otherArchive.get_filename_list(): - data = otherArchive.read_file(fname) + for filename in other_archive.get_filename_list(): + data = other_archive.read_file( + filename + ) # This will be very inefficient if other_archive is a 7z file if data is not None: - zout.writestr(data, fname) + zout.writestr(data, filename) except Exception as e: - logger.exception("Error while copying to %s: %s", self.path, e) + logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path) return False else: return True @@ -196,7 +199,7 @@ class ZipArchiver(UnknownArchiver): """ZIP implementation""" def __init__(self, path: pathlib.Path | str) -> None: - self.path = pathlib.Path(path) + super().__init__(path) def get_comment(self) -> str: with zipfile.ZipFile(self.path, "r") as zf: @@ -204,68 +207,58 @@ class ZipArchiver(UnknownArchiver): return comment def set_comment(self, comment: str) -> bool: - with zipfile.ZipFile(self.path, "a") as zf: + with zipfile.ZipFile(self.path, mode="a") as zf: zf.comment = bytes(comment, "utf-8") return True def read_file(self, archive_file: str) -> bytes: - with zipfile.ZipFile(self.path, "r") as zf: + with zipfile.ZipFile(self.path, mode="r") as zf: try: data = zf.read(archive_file) - except zipfile.BadZipfile as e: - logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file) - raise OSError from e - except OSError as e: - logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file) - raise OSError from e + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise return data def remove_file(self, archive_file: str) -> bool: - try: - self.rebuild_zip_file([archive_file]) - except (zipfile.BadZipfile, OSError): - logger.exception("Failed to remove %s from zip archive", archive_file) - return False - else: - return True + return self.rebuild([archive_file]) def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # zip archive w/o the indicated file. Very sucky, but maybe + # At the moment, no other option but to rebuild the whole + # zip archive w/o the indicated file. Very sucky, but maybe # another solution can be found - try: - files = self.get_filename_list() - if archive_file in files: - self.rebuild_zip_file([archive_file]) + files = self.get_filename_list() + if archive_file in files: + self.rebuild([archive_file]) + try: # now just add the archive file as a new one with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf: zf.writestr(archive_file, data) return True except (zipfile.BadZipfile, OSError) as e: - logger.error("writing zip file failed [%s]: %s", e, self.path) + logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file) return False def get_filename_list(self) -> list[str]: try: - with zipfile.ZipFile(self.path, "r") as zf: + with zipfile.ZipFile(self.path, mode="r") as zf: namelist = zf.namelist() return namelist except (zipfile.BadZipfile, OSError) as e: - logger.error("Unable to get zipfile list [%s]: %s", e, self.path) + logger.error("Error listing files in zip archive [%s]: %s", e, self.path) return [] - def rebuild_zip_file(self, exclude_list: list[str]) -> None: + def rebuild(self, exclude_list: list[str]) -> bool: """Zip helper func This recompresses the zip archive, without the files in the exclude_list """ - tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path)) - os.close(tmp_fd) - try: - with zipfile.ZipFile(self.path, "r") as zin: - with zipfile.ZipFile(tmp_name, "w", allowZip64=True) as zout: + with zipfile.ZipFile( + tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True + ) as zout: + with zipfile.ZipFile(self.path, mode="r") as zin: for item in zin.infolist(): buffer = zin.read(item.filename) if item.filename not in exclude_list: @@ -273,12 +266,35 @@ class ZipArchiver(UnknownArchiver): # preserve the old comment zout.comment = zin.comment - except (zipfile.BadZipfile, OSError): - logger.exception("Error rebuilding zip file: %s", self.path) - # replace with the new file - os.remove(self.path) - os.rename(tmp_name, self.path) + # replace with the new file + os.remove(self.path) + shutil.move(cast(str, zout.filename), self.path) + + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error rebuilding zip file [%s]: %s", e, self.path) + return False + return True + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + zout.writestr(filename, data) + + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.write_zip_comment(self.path, comment): + return False + except Exception as e: + logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool: """ @@ -295,18 +311,16 @@ class ZipArchiver(UnknownArchiver): file_length = statinfo.st_size try: - with open(filename, "r+b") as fo: + with open(filename, mode="r+b") as fo: # the starting position, relative to EOF pos = -4 - found = False # walk backwards to find the "End of Central Directory" record while (not found) and (-pos != file_length): # seek, relative to EOF fo.seek(pos, 2) - value = fo.read(4) # look for the end of central directory signature @@ -334,29 +348,9 @@ class ZipArchiver(UnknownArchiver): fo.write(comment.encode("utf-8")) fo.truncate() else: - raise Exception("Failed to write comment to zip file!") - except Exception: - logger.exception("Writing comment to %s failed", filename) - return False - else: - return True - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout: - for fname in other_archive.get_filename_list(): - data = other_archive.read_file(fname) - if data is not None: - zout.writestr(fname, data) - - # preserve the old comment - comment = other_archive.get_comment() - if comment is not None: - if not self.write_zip_comment(self.path, comment): - return False - except Exception: - logger.exception("Error while copying to %s", self.path) + raise Exception("Could not find the End of Central Directory record!") + except Exception as e: + logger.error("Error writing comment to zip archive [%s]: %s", e, self.path) return False else: return True @@ -365,15 +359,10 @@ class ZipArchiver(UnknownArchiver): class RarArchiver(UnknownArchiver): """RAR implementation""" - devnull = None - - def __init__(self, path: pathlib.Path | str, rar_exe_path: str) -> None: - self.path = pathlib.Path(path) + def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None: + super().__init__(path) self.rar_exe_path = rar_exe_path - if RarArchiver.devnull is None: - RarArchiver.devnull = open(os.devnull, "bw") - # windows only, keeps the cmd.exe from popping up if platform.system() == "Windows": self.startupinfo = subprocess.STARTUPINFO() # type: ignore @@ -386,30 +375,36 @@ class RarArchiver(UnknownArchiver): return str(rarc.comment) if rarc else "" def set_comment(self, comment: str) -> bool: - if self.rar_exe_path: + if rar_support and self.rar_exe_path: try: # write comment to temp file - tmp_fd, tmp_name = tempfile.mkstemp() - with os.fdopen(tmp_fd, "wb") as f: - f.write(comment.encode("utf-8")) + with tempfile.NamedTemporaryFile() as tmp_file: + tmp_file.write(comment.encode("utf-8")) - working_dir = os.path.dirname(os.path.abspath(self.path)) + working_dir = os.path.dirname(os.path.abspath(self.path)) - # use external program to write comment to Rar archive - proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, str(self.path)] - subprocess.call( - proc_args, - startupinfo=self.startupinfo, - stdout=RarArchiver.devnull, - stdin=RarArchiver.devnull, - stderr=RarArchiver.devnull, - ) + # use external program to write comment to Rar archive + proc_args = [ + self.rar_exe_path, + "c", + f"-w{working_dir}", + "-c-", + f"-z{tmp_file.name}", + str(self.path), + ] + subprocess.run( + proc_args, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) if platform.system() == "Darwin": time.sleep(1) - os.remove(tmp_name) - except (subprocess.CalledProcessError, OSError): - logger.exception("Failed to set a comment") + except (subprocess.CalledProcessError, OSError) as e: + logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path) return False else: return True @@ -431,7 +426,7 @@ class RarArchiver(UnknownArchiver): if entries[0][0].file_size != len(entries[0][1]): logger.info( - "read_file(): [file is not expected size: %d vs %d] %s:%s [attempt # %d]", + "Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d", entries[0][0].file_size, len(entries[0][1]), self.path, @@ -439,19 +434,22 @@ class RarArchiver(UnknownArchiver): tries, ) continue + except OSError as e: - logger.error("read_file(): [%s] %s:%s attempt #%d", e, self.path, archive_file, tries) + logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries) time.sleep(1) except Exception as e: logger.error( - "Unexpected exception in read_file(): [%s] for %s:%s attempt #%d", e, self.path, archive_file, tries + "Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d", + e, + self.path, + archive_file, + tries, ) break else: # Success. Entries is a list of of tuples: ( rarinfo, filedata) - if tries > 1: - logger.info("Attempted read_files() {%d} times", tries) if len(entries) == 1: return entries[0][1] @@ -459,59 +457,52 @@ class RarArchiver(UnknownArchiver): raise OSError - def write_file(self, archive_file: str, data: bytes) -> bool: - + def remove_file(self, archive_file: str) -> bool: if self.rar_exe_path: - try: - tmp_folder = tempfile.mkdtemp() + # use external program to remove file from Rar archive + result = subprocess.run( + [self.rar_exe_path, "d", "-c-", self.path, archive_file], + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) - tmp_file = os.path.join(tmp_folder, archive_file) - - working_dir = os.path.dirname(os.path.abspath(self.path)) - - # TODO: will this break if 'archive_file' is in a subfolder. i.e. "foo/bar.txt" - # will need to create the subfolder above, I guess... - with open(tmp_file, "wb") as f: - f.write(data) - - # use external program to write file to Rar archive - subprocess.call( - [self.rar_exe_path, "a", "-w" + working_dir, "-c-", "-ep", self.path, tmp_file], - startupinfo=self.startupinfo, - stdout=RarArchiver.devnull, - stdin=RarArchiver.devnull, - stderr=RarArchiver.devnull, + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error removing file from rar archive [exitcode: %d]: %s :: %s", + result.returncode, + self.path, + archive_file, ) - - if platform.system() == "Darwin": - time.sleep(1) - os.remove(tmp_file) - os.rmdir(tmp_folder) - except (subprocess.CalledProcessError, OSError) as e: - logger.info(str(e)) - logger.exception("Failed write %s to rar archive", archive_file) return False - else: - return True + return True else: return False - def remove_file(self, archive_file: str) -> bool: + def write_file(self, archive_file: str, data: bytes) -> bool: if self.rar_exe_path: - try: - # use external program to remove file from Rar archive - subprocess.call( - [self.rar_exe_path, "d", "-c-", self.path, archive_file], - startupinfo=self.startupinfo, - stdout=RarArchiver.devnull, - stdin=RarArchiver.devnull, - stderr=RarArchiver.devnull, - ) + archive_path = pathlib.PurePosixPath(archive_file) + archive_name = archive_path.name + archive_parent = str(archive_path.parent).lstrip("./") - if platform.system() == "Darwin": - time.sleep(1) - except (subprocess.CalledProcessError, OSError): - logger.exception("Failed to remove %s from rar archive", archive_file) + # use external program to write file to Rar archive + result = subprocess.run( + [self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path], + input=data, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file + ) return False else: return True @@ -521,27 +512,65 @@ class RarArchiver(UnknownArchiver): def get_filename_list(self) -> list[str]: rarc = self.get_rar_obj() tries = 0 - # while tries < 7: - namelist = [] + if rar_support and rarc: + while tries < 7: + try: + tries = tries + 1 + namelist = [] + for item in rarc.infolist(): + if item.file_size != 0: + namelist.append(item.filename) + + except OSError as e: + logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries) + time.sleep(1) + + else: + return namelist + return [] + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current archive with one copied from another archive""" try: - tries = tries + 1 - for item in rarc.infolist() if rarc else None: - if item.file_size != 0: - namelist.append(item.filename) + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_path = pathlib.Path(tmp_dir) + rar_cwd = tmp_path / "rar" + rar_cwd.mkdir(exist_ok=True) + rar_path = (tmp_path / self.path.name).with_suffix(".rar") - except OSError as e: - logger.error(f"get_filename_list(): [{e}] {self.path} attempt #{tries}".format(str(e), self.path, tries)) - time.sleep(1) + for filename in other_archive.get_filename_list(): + (rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True) + data = other_archive.read_file(filename) + if data is not None: + with open(rar_cwd / filename, mode="w+b") as tmp_file: + tmp_file.write(data) + result = subprocess.run( + [self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."], + cwd=rar_cwd.absolute(), + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if result.returncode != 0: + logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path) + return False - return namelist + shutil.move(rar_path, self.path) + except Exception as e: + logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True def get_rar_obj(self) -> rarfile.RarFile | None: - try: - rarc = rarfile.RarFile(str(self.path)) - except OSError as e: - logger.error("getRARObj(): [%s] %s", e, self.path) - else: - return rarc + if rar_support: + try: + rarc = rarfile.RarFile(str(self.path)) + except (OSError, rarfile.RarFileError) as e: + logger.error("Unable to get rar object [%s]: %s", e, self.path) + else: + return rarc return None @@ -551,63 +580,81 @@ class FolderArchiver(UnknownArchiver): """Folder implementation""" def __init__(self, path: pathlib.Path | str) -> None: - self.path = pathlib.Path(path) + super().__init__(path) self.comment_file_name = "ComicTaggerFolderComment.txt" def get_comment(self) -> str: - return self.read_file(self.comment_file_name).decode("utf-8") + try: + return self.read_file(self.comment_file_name).decode("utf-8") + except OSError: + return "" def set_comment(self, comment: str) -> bool: - return self.write_file(self.comment_file_name, comment.encode("utf-8")) + if (self.path / self.comment_file_name).exists() or comment: + return self.write_file(self.comment_file_name, comment.encode("utf-8")) + return True def read_file(self, archive_file: str) -> bytes: - - data = bytes() - fname = os.path.join(self.path, archive_file) try: - with open(fname, "rb") as f: + with open(self.path / archive_file, mode="rb") as f: data = f.read() - except OSError: - logger.exception("Failed to read: %s", fname) + except OSError as e: + logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file) + raise return data - def write_file(self, archive_file: str, data: bytes) -> bool: - - fname = os.path.join(self.path, archive_file) + def remove_file(self, archive_file: str) -> bool: try: - with open(fname, "wb") as f: - f.write(data) - except OSError: - logger.exception("Failed to write: %s", fname) + os.remove(self.path / archive_file) + except OSError as e: + logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file) return False else: return True - def remove_file(self, archive_file: str) -> bool: - - fname = os.path.join(self.path, archive_file) + def write_file(self, archive_file: str, data: bytes) -> bool: + logger.error("Fuck this: %s", archive_file) try: - os.remove(fname) - except OSError: - logger.exception("Failed to remove: %s", fname) + file_path = self.path / archive_file + file_path.parent.mkdir(exist_ok=True, parents=True) + with open(self.path / archive_file, mode="wb") as f: + f.write(data) + except OSError as e: + logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file) return False else: return True def get_filename_list(self) -> list[str]: - return self.list_files(self.path) + filenames = [] + try: + for root, dirs, files in os.walk(self.path): + for f in files: + filenames.append(os.path.relpath(os.path.join(root, f), self.path)) + return filenames + except OSError as e: + logger.error("Error listing files in folder archive [%s]: %s", e, self.path) + return [] - def list_files(self, folder: pathlib.Path | str) -> list[str]: + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + self.write_file(filename, data) - itemlist = [] - - for item in os.listdir(folder): - itemlist.append(item) - if os.path.isdir(item): - itemlist.extend(self.list_files(os.path.join(folder, item))) - - return itemlist + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.set_comment(comment): + return False + except Exception: + logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path) + return False + else: + return True class ComicArchive: @@ -619,16 +666,16 @@ class ComicArchive: def __init__( self, path: pathlib.Path | str, - rar_exe_path: str = "", + rar_exe_path: str = "rar", default_image_path: pathlib.Path | str | None = None, ) -> None: self.cbi_md: GenericMetadata | None = None self.cix_md: GenericMetadata | None = None self.comet_filename: str | None = None self.comet_md: GenericMetadata | None = None - self.has__cbi: bool | None = None - self.has__cix: bool | None = None - self.has__comet: bool | None = None + self._has_cbi: bool | None = None + self._has_cix: bool | None = None + self._has_comet: bool | None = None self.path = pathlib.Path(path) self.page_count: int | None = None self.page_list: list[str] = [] @@ -666,18 +713,20 @@ class ComicArchive: self.archive_type = self.ArchiveType.Rar self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path) - if not ComicArchive.logo_data: - fname = self.default_image_path - if fname: - with open(fname, "rb") as fd: - ComicArchive.logo_data = fd.read() + elif self.folder_test(): + self.archive_type = self.ArchiveType.Folder + self.archiver = FolderArchiver(self.path) + + if not ComicArchive.logo_data and self.default_image_path: + with open(self.default_image_path, mode="rb") as fd: + ComicArchive.logo_data = fd.read() def reset_cache(self) -> None: """Clears the cached data""" - self.has__cix = None - self.has__cbi = None - self.has__comet = None + self._has_cix = None + self._has_cbi = None + self._has_comet = None self.comet_filename = None self.page_count = None self.page_list = [] @@ -704,6 +753,9 @@ class ComicArchive: return rarfile.is_rarfile(str(self.path)) return False + def folder_test(self) -> bool: + return self.path.is_dir() + def is_sevenzip(self) -> bool: return self.archive_type == self.ArchiveType.SevenZip @@ -742,7 +794,9 @@ class ComicArchive: return self.is_writable() def seems_to_be_a_comic_archive(self) -> bool: - if (self.is_zip() or self.is_rar() or self.is_sevenzip()) and (self.get_number_of_pages() > 0): + if (self.is_zip() or self.is_rar() or self.is_sevenzip() or self.is_folder()) and ( + self.get_number_of_pages() > 0 + ): return True return False @@ -795,7 +849,7 @@ class ComicArchive: try: image_data = self.archiver.read_file(filename) or bytes() except OSError: - logger.exception("Error reading in page. Substituting logo page.") + logger.exception("Error reading in page %d. Substituting logo page.", index) image_data = ComicArchive.logo_data return image_data @@ -834,7 +888,7 @@ class ComicArchive: length_buckets[length] = 1 # sort by most common - sorted_buckets = sorted(iter(length_buckets.items()), key=lambda k_v: (k_v[1], k_v[0]), reverse=True) + sorted_buckets = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True) # statistical mode occurrence is first mode_length = sorted_buckets[0][0] @@ -905,14 +959,14 @@ class ComicArchive: return self.archiver.get_comment() def has_cbi(self) -> bool: - if self.has__cbi is None: + if self._has_cbi is None: if not self.seems_to_be_a_comic_archive(): - self.has__cbi = False + self._has_cbi = False else: comment = self.archiver.get_comment() - self.has__cbi = ComicBookInfo().validate_string(comment) + self._has_cbi = ComicBookInfo().validate_string(comment) - return self.has__cbi + return self._has_cbi def write_cbi(self, metadata: GenericMetadata) -> bool: if metadata is not None: @@ -920,7 +974,7 @@ class ComicArchive: cbi_string = ComicBookInfo().string_from_metadata(metadata) write_success = self.archiver.set_comment(cbi_string) if write_success: - self.has__cbi = True + self._has_cbi = True self.cbi_md = metadata self.reset_cache() return write_success @@ -931,7 +985,7 @@ class ComicArchive: if self.has_cbi(): write_success = self.archiver.set_comment("") if write_success: - self.has__cbi = False + self._has_cbi = False self.cbi_md = None self.reset_cache() return write_success @@ -963,7 +1017,7 @@ class ComicArchive: try: raw_cix = self.archiver.read_file(self.ci_xml_filename) or b"" except OSError as e: - logger.error("Error reading in raw CIX!: %s", e) + logger.error("Error reading in raw CIX! for %s: %s", self.path, e) raw_cix = bytes() return raw_cix @@ -974,7 +1028,7 @@ class ComicArchive: cix_string = ComicInfoXml().string_from_metadata(metadata, xml=raw_cix) write_success = self.archiver.write_file(self.ci_xml_filename, cix_string.encode("utf-8")) if write_success: - self.has__cix = True + self._has_cix = True self.cix_md = metadata self.reset_cache() return write_success @@ -985,22 +1039,22 @@ class ComicArchive: if self.has_cix(): write_success = self.archiver.remove_file(self.ci_xml_filename) if write_success: - self.has__cix = False + self._has_cix = False self.cix_md = None self.reset_cache() return write_success return True def has_cix(self) -> bool: - if self.has__cix is None: + if self._has_cix is None: if not self.seems_to_be_a_comic_archive(): - self.has__cix = False + self._has_cix = False elif self.ci_xml_filename in self.archiver.get_filename_list(): - self.has__cix = True + self._has_cix = True else: - self.has__cix = False - return self.has__cix + self._has_cix = False + return self._has_cix def read_comet(self) -> GenericMetadata: if self.comet_md is None: @@ -1029,7 +1083,6 @@ class ComicArchive: def read_raw_comet(self) -> str: raw_comet = "" if not self.has_comet(): - logger.info("%s doesn't have CoMet data!", self.path) raw_comet = "" else: try: @@ -1055,7 +1108,7 @@ class ComicArchive: comet_string = CoMet().string_from_metadata(metadata) write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8")) if write_success: - self.has__comet = True + self._has_comet = True self.comet_md = metadata self.reset_cache() return write_success @@ -1066,17 +1119,17 @@ class ComicArchive: if self.has_comet(): write_success = self.archiver.remove_file(cast(str, self.comet_filename)) if write_success: - self.has__comet = False + self._has_comet = False self.comet_md = None self.reset_cache() return write_success return True def has_comet(self) -> bool: - if self.has__comet is None: - self.has__comet = False + if self._has_comet is None: + self._has_comet = False if not self.seems_to_be_a_comic_archive(): - return self.has__comet + return self._has_comet # look at all xml files in root, and search for CoMet data, get first for n in self.archiver.get_filename_list(): @@ -1088,20 +1141,20 @@ class ComicArchive: if d: data = d.decode("utf-8") except Exception as e: - logger.warning("Error reading in Comet XML for validation!: %s", e) + logger.warning("Error reading in Comet XML for validation! from %s: %s", self.path, e) if CoMet().validate_string(data): # since we found it, save it! self.comet_filename = n - self.has__comet = True + self._has_comet = True break - return self.has__comet + return self._has_comet def apply_archive_info_to_metadata(self, md: GenericMetadata, calc_page_sizes: bool = False) -> None: md.page_count = self.get_number_of_pages() if calc_page_sizes: - for p in md.pages: + for index, p in enumerate(md.pages): idx = int(p["Image"]) if pil_available: if "ImageSize" not in p or "ImageHeight" not in p or "ImageWidth" not in p: @@ -1118,7 +1171,7 @@ class ComicArchive: p["ImageHeight"] = str(h) p["ImageWidth"] = str(w) except Exception as e: - logger.warning("decoding image failed: %s", e) + logger.warning("Error decoding image [%s] %s :: image %s", e, self.path, index) p["ImageSize"] = str(len(data)) else: diff --git a/comicapi/comicbookinfo.py b/comicapi/comicbookinfo.py index b2cd35a..a1939a3 100644 --- a/comicapi/comicbookinfo.py +++ b/comicapi/comicbookinfo.py @@ -115,7 +115,6 @@ class ComicBookInfo: return metadata def string_from_metadata(self, metadata: GenericMetadata) -> str: - cbi_container = self.create_json_dictionary(metadata) return json.dumps(cbi_container) @@ -165,7 +164,6 @@ class ComicBookInfo: return cbi_container def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None: - cbi_container = self.create_json_dictionary(metadata) with open(filename, "w", encoding="utf-8") as f: diff --git a/comicapi/comicinfoxml.py b/comicapi/comicinfoxml.py index 271c966..80cb381 100644 --- a/comicapi/comicinfoxml.py +++ b/comicapi/comicinfoxml.py @@ -268,11 +268,9 @@ class ComicInfoXml: return md def write_to_external_file(self, filename: str, metadata: GenericMetadata, xml: bytes = b"") -> None: - tree = self.convert_metadata_to_xml(self, metadata, xml) tree.write(filename, encoding="utf-8", xml_declaration=True) def read_from_external_file(self, filename: str) -> GenericMetadata: - tree = ET.parse(filename) return self.convert_xml_to_metadata(tree) diff --git a/comictaggerlib/cli.py b/comictaggerlib/cli.py index 83e213e..d43f05c 100644 --- a/comictaggerlib/cli.py +++ b/comictaggerlib/cli.py @@ -20,6 +20,7 @@ import argparse import json import logging import os +import shutil import sys from pprint import pprint @@ -507,7 +508,7 @@ def process_file_cli( if not opts.dryrun: # rename the file os.makedirs(os.path.dirname(new_abs_path), 0o777, True) - os.rename(filename, new_abs_path) + shutil.move(filename, new_abs_path) else: suffix = " (dry-run, no change)" diff --git a/comictaggerlib/renamewindow.py b/comictaggerlib/renamewindow.py index ae27818..90fe667 100644 --- a/comictaggerlib/renamewindow.py +++ b/comictaggerlib/renamewindow.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging import os +import shutil from typing import TypedDict from PyQt5 import QtCore, QtWidgets, uic @@ -199,7 +200,7 @@ class RenameWindow(QtWidgets.QDialog): continue os.makedirs(os.path.dirname(new_abs_path), 0o777, True) - os.rename(item["archive"].path, new_abs_path) + shutil.move(item["archive"].path, new_abs_path) item["archive"].rename(new_abs_path) diff --git a/tests/comicarchive_test.py b/tests/comicarchive_test.py index 3c44fe9..1161282 100644 --- a/tests/comicarchive_test.py +++ b/tests/comicarchive_test.py @@ -1,20 +1,20 @@ from __future__ import annotations +import pathlib import shutil -from os.path import dirname, join import pytest -from comicapi.comicarchive import ComicArchive, rar_support -from comicapi.genericmetadata import GenericMetadata, md_test +import comicapi.comicarchive +import comicapi.genericmetadata -thisdir = dirname(__file__) +thisdir = pathlib.Path(__file__).parent +cbz_path = thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz" -@pytest.mark.xfail(not rar_support, reason="rar support") +@pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support") def test_getPageNameList(): - ComicArchive.logo_data = b"" - c = ComicArchive(join(thisdir, "data", "fake_cbr.cbr")) + c = comicapi.comicarchive.ComicArchive(thisdir / "data" / "fake_cbr.cbr") pageNameList = c.get_page_name_list() assert pageNameList == [ @@ -28,40 +28,37 @@ def test_getPageNameList(): def test_set_default_page_list(tmpdir): - md = GenericMetadata() - md.overlay(md_test) + md = comicapi.genericmetadata.GenericMetadata() + md.overlay(comicapi.genericmetadata.md_test) md.pages = [] - md.set_default_page_list(len(md_test.pages)) + md.set_default_page_list(len(comicapi.genericmetadata.md_test.pages)) assert isinstance(md.pages[0]["Image"], int) def test_page_type_read(): - c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz") - c = ComicArchive(str(c_path)) + c = comicapi.comicarchive.ComicArchive(cbz_path) md = c.read_cix() assert isinstance(md.pages[0]["Type"], str) -def test_metadat_read(): - c = ComicArchive( - join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz") +def test_metadata_read(): + c = comicapi.comicarchive.ComicArchive( + thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz" ) md = c.read_cix() md_dict = md.__dict__ - md_test_dict = md_test.__dict__ + md_test_dict = comicapi.genericmetadata.md_test.__dict__ assert md_dict == md_test_dict def test_save_cix(tmpdir): - comic_path = tmpdir.mkdir("cbz").join( - "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz" - ) - c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz") - shutil.copy(c_path, comic_path) + comic_path = tmpdir.mkdir("cbz") / cbz_path.name + print(comic_path) + shutil.copy(cbz_path, comic_path) - c = ComicArchive(str(comic_path)) + c = comicapi.comicarchive.ComicArchive(comic_path) md = c.read_cix() md.set_default_page_list(c.get_number_of_pages()) @@ -71,13 +68,12 @@ def test_save_cix(tmpdir): def test_page_type_save(tmpdir): - comic_path = tmpdir.mkdir("cbz").join( - "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz" - ) - c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz") - shutil.copy(c_path, comic_path) + comic_path = tmpdir.mkdir("cbz") / cbz_path.name + print(comic_path) - c = ComicArchive(str(comic_path)) + shutil.copy(cbz_path, comic_path) + + c = comicapi.comicarchive.ComicArchive(comic_path) md = c.read_cix() t = md.pages[0] t["Type"] = "" @@ -85,3 +81,45 @@ def test_page_type_save(tmpdir): assert c.write_cix(md) md = c.read_cix() + + +archivers = [ + comicapi.comicarchive.ZipArchiver, + comicapi.comicarchive.SevenZipArchiver, + comicapi.comicarchive.FolderArchiver, + pytest.param( + comicapi.comicarchive.RarArchiver, + marks=pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support"), + ), +] + + +@pytest.mark.parametrize("archiver", archivers) +def test_copy_to_archive(archiver, tmpdir): + comic_path = tmpdir / cbz_path.with_suffix("").name + + cbz = comicapi.comicarchive.ComicArchive(cbz_path) + archive = archiver(comic_path) + + assert archive.copy_from_archive(cbz.archiver) + + comic_archive = comicapi.comicarchive.ComicArchive(comic_path) + + assert comic_archive.seems_to_be_a_comic_archive() + assert set(cbz.archiver.get_filename_list()) == set(comic_archive.archiver.get_filename_list()) + + md = comic_archive.read_cix() + md_dict = md.__dict__ + md_test_dict = comicapi.genericmetadata.md_test.__dict__ + assert md_dict == md_test_dict + + md = comicapi.genericmetadata.GenericMetadata() + md.overlay(comicapi.genericmetadata.md_test) + md.series = "test" + + assert comic_archive.write_cix(md) + + test_md = comic_archive.read_cix() + md_dict = md.__dict__ + test_md_dict = test_md.__dict__ + assert md_dict == test_md_dict