diff --git a/comicapi/archivers/__init__.py b/comicapi/archivers/__init__.py new file mode 100644 index 0000000..8758c55 --- /dev/null +++ b/comicapi/archivers/__init__.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from comicapi.archivers.unknown import UnknownArchiver + +__all__ = ["UnknownArchiver"] +from comicapi.archivers.folder import FolderArchiver +from comicapi.archivers.rar import RarArchiver, rar_support +from comicapi.archivers.sevenzip import SevenZipArchiver, z7_support +from comicapi.archivers.zip import ZipArchiver + +__all__ = [ + "UnknownArchiver", + "FolderArchiver", + "RarArchiver", + "rar_support", + "ZipArchiver", + "SevenZipArchiver", + "z7_support", +] diff --git a/comicapi/archivers/folder.py b/comicapi/archivers/folder.py new file mode 100644 index 0000000..4b0186f --- /dev/null +++ b/comicapi/archivers/folder.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import logging +import os +import pathlib + +from comicapi.archivers import UnknownArchiver + +logger = logging.getLogger(__name__) + + +class FolderArchiver(UnknownArchiver): + + """Folder implementation""" + + def __init__(self, path: pathlib.Path | str) -> None: + super().__init__(path) + self.comment_file_name = "ComicTaggerFolderComment.txt" + + def get_comment(self) -> str: + try: + return self.read_file(self.comment_file_name).decode("utf-8") + except OSError: + return "" + + def set_comment(self, comment: str) -> bool: + if (self.path / self.comment_file_name).exists() or comment: + return self.write_file(self.comment_file_name, comment.encode("utf-8")) + return True + + def read_file(self, archive_file: str) -> bytes: + try: + with open(self.path / archive_file, mode="rb") as f: + data = f.read() + except OSError as e: + logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file) + raise + + return data + + def remove_file(self, archive_file: str) -> bool: + try: + (self.path / archive_file).unlink(missing_ok=True) + except OSError as e: + logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file) + return False + else: + return True + + def write_file(self, archive_file: str, data: bytes) -> bool: + try: + file_path = self.path / archive_file + file_path.parent.mkdir(exist_ok=True, parents=True) + with open(self.path / archive_file, mode="wb") as f: + f.write(data) + except OSError as e: + logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file) + return False + else: + return True + + def get_filename_list(self) -> list[str]: + filenames = [] + try: + for root, _dirs, files in os.walk(self.path): + for f in files: + filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/")) + return filenames + except OSError as e: + logger.error("Error listing files in folder archive [%s]: %s", e, self.path) + return [] + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + self.write_file(filename, data) + + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.set_comment(comment): + return False + except Exception: + logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path) + return False + else: + return True diff --git a/comicapi/archivers/rar.py b/comicapi/archivers/rar.py new file mode 100644 index 0000000..db04b8b --- /dev/null +++ b/comicapi/archivers/rar.py @@ -0,0 +1,246 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import platform +import shutil +import subprocess +import tempfile +import time + +from comicapi.archivers import UnknownArchiver + +try: + from unrar.cffi import rarfile + + rar_support = True +except ImportError: + rar_support = False + + +logger = logging.getLogger(__name__) + +if not rar_support: + logger.error("unrar-cffi unavailable") + + +class RarArchiver(UnknownArchiver): + """RAR implementation""" + + def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None: + super().__init__(path) + self.rar_exe_path = shutil.which(rar_exe_path) or "" + + # windows only, keeps the cmd.exe from popping up + if platform.system() == "Windows": + self.startupinfo = subprocess.STARTUPINFO() # type: ignore + self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore + else: + self.startupinfo = None + + def get_comment(self) -> str: + rarc = self.get_rar_obj() + return rarc.comment.decode("utf-8") if rarc else "" + + def set_comment(self, comment: str) -> bool: + if rar_support and self.rar_exe_path: + try: + # write comment to temp file + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt" + tmp_file.write_text(comment, encoding="utf-8") + + working_dir = os.path.dirname(os.path.abspath(self.path)) + + # use external program to write comment to Rar archive + proc_args = [ + self.rar_exe_path, + "c", + f"-w{working_dir}", + "-c-", + f"-z{tmp_file}", + str(self.path), + ] + subprocess.run( + proc_args, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + + if platform.system() == "Darwin": + time.sleep(1) + except (subprocess.CalledProcessError, OSError) as e: + logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path) + return False + else: + return True + else: + return False + + def read_file(self, archive_file: str) -> bytes: + + rarc = self.get_rar_obj() + if rarc is None: + return b"" + + tries = 0 + while tries < 7: + try: + tries = tries + 1 + data: bytes = rarc.open(archive_file).read() + entries = [(rarc.getinfo(archive_file), data)] + + if entries[0][0].file_size != len(entries[0][1]): + logger.info( + "Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d", + entries[0][0].file_size, + len(entries[0][1]), + self.path, + archive_file, + tries, + ) + continue + + except OSError as e: + logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries) + time.sleep(1) + except Exception as e: + logger.error( + "Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d", + e, + self.path, + archive_file, + tries, + ) + break + + else: + # Success. Entries is a list of of tuples: ( rarinfo, filedata) + if len(entries) == 1: + return entries[0][1] + + raise OSError + + raise OSError + + def remove_file(self, archive_file: str) -> bool: + if self.rar_exe_path: + # use external program to remove file from Rar archive + result = subprocess.run( + [self.rar_exe_path, "d", "-c-", self.path, archive_file], + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error removing file from rar archive [exitcode: %d]: %s :: %s", + result.returncode, + self.path, + archive_file, + ) + return False + return True + else: + return False + + def write_file(self, archive_file: str, data: bytes) -> bool: + if self.rar_exe_path: + archive_path = pathlib.PurePosixPath(archive_file) + archive_name = archive_path.name + archive_parent = str(archive_path.parent).lstrip("./") + + # use external program to write file to Rar archive + result = subprocess.run( + [self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path], + input=data, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file + ) + return False + else: + return True + else: + return False + + def get_filename_list(self) -> list[str]: + rarc = self.get_rar_obj() + tries = 0 + if rar_support and rarc: + while tries < 7: + try: + tries = tries + 1 + namelist = [] + for item in rarc.infolist(): + if item.file_size != 0: + namelist.append(item.filename) + + except OSError as e: + logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries) + time.sleep(1) + + else: + return namelist + return [] + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current archive with one copied from another archive""" + try: + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_path = pathlib.Path(tmp_dir) + rar_cwd = tmp_path / "rar" + rar_cwd.mkdir(exist_ok=True) + rar_path = (tmp_path / self.path.name).with_suffix(".rar") + + for filename in other_archive.get_filename_list(): + (rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True) + data = other_archive.read_file(filename) + if data is not None: + with open(rar_cwd / filename, mode="w+b") as tmp_file: + tmp_file.write(data) + result = subprocess.run( + [self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."], + cwd=rar_cwd.absolute(), + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if result.returncode != 0: + logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path) + return False + + self.path.unlink(missing_ok=True) + shutil.move(rar_path, self.path) + except Exception as e: + logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True + + def get_rar_obj(self) -> rarfile.RarFile | None: + if rar_support: + try: + rarc = rarfile.RarFile(str(self.path)) + except (OSError, rarfile.RarFileError) as e: + logger.error("Unable to get rar object [%s]: %s", e, self.path) + else: + return rarc + + return None diff --git a/comicapi/archivers/sevenzip.py b/comicapi/archivers/sevenzip.py new file mode 100644 index 0000000..9e0f3f0 --- /dev/null +++ b/comicapi/archivers/sevenzip.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import shutil +import tempfile + +from comicapi.archivers import UnknownArchiver + +try: + import py7zr + + z7_support = True +except ImportError: + z7_support = False + +logger = logging.getLogger(__name__) + + +class SevenZipArchiver(UnknownArchiver): + + """7Z implementation""" + + def __init__(self, path: pathlib.Path | str) -> None: + super().__init__(path) + + # @todo: Implement Comment? + def get_comment(self) -> str: + return "" + + def set_comment(self, comment: str) -> bool: + return False + + def read_file(self, archive_file: str) -> bytes: + data = b"" + try: + with py7zr.SevenZipFile(self.path, "r") as zf: + data = zf.read(archive_file)[archive_file].read() + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise + + return data + + def remove_file(self, archive_file: str) -> bool: + return self.rebuild([archive_file]) + + def write_file(self, archive_file: str, data: bytes) -> bool: + # At the moment, no other option but to rebuild the whole + # archive w/o the indicated file. Very sucky, but maybe + # another solution can be found + files = self.get_filename_list() + if archive_file in files: + if not self.rebuild([archive_file]): + return False + + try: + # now just add the archive file as a new one + with py7zr.SevenZipFile(self.path, "a") as zf: + zf.writestr(data, archive_file) + return True + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file) + return False + + def get_filename_list(self) -> list[str]: + try: + with py7zr.SevenZipFile(self.path, "r") as zf: + namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory] + + return namelist + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path) + return [] + + def rebuild(self, exclude_list: list[str]) -> bool: + """Zip helper func + + This recompresses the zip archive, without the files in the exclude_list + """ + + try: + # py7zr treats all archives as if they used solid compression + # so we need to get the filename list first to read all the files at once + with py7zr.SevenZipFile(self.path, mode="r") as zin: + targets = [f for f in zin.getnames() if f not in exclude_list] + with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file: + with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout: + with py7zr.SevenZipFile(self.path, mode="r") as zin: + for filename, buffer in zin.read(targets).items(): + zout.writef(buffer, filename) + + self.path.unlink(missing_ok=True) + tmp_file.close() # Required on windows + + shutil.move(tmp_file.name, self.path) + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path) + return False + return True + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + with py7zr.SevenZipFile(self.path, "w") as zout: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file( + filename + ) # This will be very inefficient if other_archive is a 7z file + if data is not None: + zout.writestr(data, filename) + except Exception as e: + logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True diff --git a/comicapi/archivers/unknown.py b/comicapi/archivers/unknown.py new file mode 100644 index 0000000..5f79720 --- /dev/null +++ b/comicapi/archivers/unknown.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import pathlib + + +class UnknownArchiver: + + """Unknown implementation""" + + def __init__(self, path: pathlib.Path | str) -> None: + self.path = pathlib.Path(path) + + def get_comment(self) -> str: + return "" + + def set_comment(self, comment: str) -> bool: + return False + + def read_file(self, archive_file: str) -> bytes: + raise NotImplementedError + + def remove_file(self, archive_file: str) -> bool: + return False + + def write_file(self, archive_file: str, data: bytes) -> bool: + return False + + def get_filename_list(self) -> list[str]: + return [] + + def rebuild(self, exclude_list: list[str]) -> bool: + return False + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + return False diff --git a/comicapi/archivers/zip.py b/comicapi/archivers/zip.py new file mode 100644 index 0000000..8d3d976 --- /dev/null +++ b/comicapi/archivers/zip.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import shutil +import struct +import tempfile +import zipfile +from typing import cast + +from comicapi.archivers import UnknownArchiver + +logger = logging.getLogger(__name__) + + +class ZipArchiver(UnknownArchiver): + + """ZIP implementation""" + + def __init__(self, path: pathlib.Path | str) -> None: + super().__init__(path) + + def get_comment(self) -> str: + with zipfile.ZipFile(self.path, "r") as zf: + comment = zf.comment.decode("utf-8") + return comment + + def set_comment(self, comment: str) -> bool: + with zipfile.ZipFile(self.path, mode="a") as zf: + zf.comment = bytes(comment, "utf-8") + return True + + def read_file(self, archive_file: str) -> bytes: + with zipfile.ZipFile(self.path, mode="r") as zf: + try: + data = zf.read(archive_file) + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise + return data + + def remove_file(self, archive_file: str) -> bool: + return self.rebuild([archive_file]) + + def write_file(self, archive_file: str, data: bytes) -> bool: + # At the moment, no other option but to rebuild the whole + # zip archive w/o the indicated file. Very sucky, but maybe + # another solution can be found + files = self.get_filename_list() + if archive_file in files: + if not self.rebuild([archive_file]): + return False + + try: + # now just add the archive file as a new one + with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf: + zf.writestr(archive_file, data) + return True + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file) + return False + + def get_filename_list(self) -> list[str]: + try: + with zipfile.ZipFile(self.path, mode="r") as zf: + namelist = [file.filename for file in zf.infolist() if not file.is_dir()] + return namelist + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error listing files in zip archive [%s]: %s", e, self.path) + return [] + + def rebuild(self, exclude_list: list[str]) -> bool: + """Zip helper func + + This recompresses the zip archive, without the files in the exclude_list + """ + try: + with zipfile.ZipFile( + tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True + ) as zout: + with zipfile.ZipFile(self.path, mode="r") as zin: + for item in zin.infolist(): + buffer = zin.read(item.filename) + if item.filename not in exclude_list: + zout.writestr(item, buffer) + + # preserve the old comment + zout.comment = zin.comment + + # replace with the new file + self.path.unlink(missing_ok=True) + zout.close() # Required on windows + + shutil.move(cast(str, zout.filename), self.path) + + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error rebuilding zip file [%s]: %s", e, self.path) + return False + return True + + def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + zout.writestr(filename, data) + + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.write_zip_comment(self.path, comment): + return False + except Exception as e: + logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True + + def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool: + """ + This is a custom function for writing a comment to a zip file, + since the built-in one doesn't seem to work on Windows and Mac OS/X + + Fortunately, the zip comment is at the end of the file, and it's + easy to manipulate. See this website for more info: + see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure + """ + + # get file size + statinfo = os.stat(filename) + file_length = statinfo.st_size + + try: + with open(filename, mode="r+b") as file: + + # the starting position, relative to EOF + pos = -4 + found = False + + # walk backwards to find the "End of Central Directory" record + while (not found) and (-pos != file_length): + # seek, relative to EOF + file.seek(pos, 2) + value = file.read(4) + + # look for the end of central directory signature + if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]): + found = True + else: + # not found, step back another byte + pos = pos - 1 + + if found: + + # now skip forward 20 bytes to the comment length word + pos += 20 + file.seek(pos, 2) + + # Pack the length of the comment string + fmt = "H" # one 2-byte integer + comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string + + # write out the length + file.write(comment_length) + file.seek(pos + 2, 2) + + # write out the comment itself + file.write(comment.encode("utf-8")) + file.truncate() + else: + raise Exception("Could not find the End of Central Directory record!") + except Exception as e: + logger.error("Error writing comment to zip archive [%s]: %s", e, self.path) + return False + else: + return True diff --git a/comicapi/comicarchive.py b/comicapi/comicarchive.py index da4fd83..1009b5d 100644 --- a/comicapi/comicarchive.py +++ b/comicapi/comicarchive.py @@ -18,12 +18,7 @@ import io import logging import os import pathlib -import platform import shutil -import struct -import subprocess -import tempfile -import time import zipfile from typing import cast @@ -31,6 +26,7 @@ import natsort import wordninja from comicapi import filenamelexer, filenameparser, utils +from comicapi.archivers import FolderArchiver, RarArchiver, SevenZipArchiver, UnknownArchiver, ZipArchiver from comicapi.comet import CoMet from comicapi.comicbookinfo import ComicBookInfo from comicapi.comicinfoxml import ComicInfoXml @@ -58,10 +54,9 @@ except ImportError: logger = logging.getLogger(__name__) + if not pil_available: logger.error("PIL unavalable") -if not rar_support: - logger.error("unrar-cffi unavailable") class MetaDataStyle: @@ -72,604 +67,6 @@ class MetaDataStyle: short_name = ["cbl", "cr", "comet"] -class UnknownArchiver: - - """Unknown implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - self.path = pathlib.Path(path) - - def get_comment(self) -> str: - return "" - - def set_comment(self, comment: str) -> bool: - return False - - def read_file(self, archive_file: str) -> bytes: - raise NotImplementedError - - def remove_file(self, archive_file: str) -> bool: - return False - - def write_file(self, archive_file: str, data: bytes) -> bool: - return False - - def get_filename_list(self) -> list[str]: - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - return False - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - return False - - -class SevenZipArchiver(UnknownArchiver): - - """7Z implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - - # @todo: Implement Comment? - def get_comment(self) -> str: - return "" - - def set_comment(self, comment: str) -> bool: - return False - - def read_file(self, archive_file: str) -> bytes: - data = b"" - try: - with py7zr.SevenZipFile(self.path, "r") as zf: - data = zf.read(archive_file)[archive_file].read() - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file) - raise - - return data - - def remove_file(self, archive_file: str) -> bool: - return self.rebuild([archive_file]) - - def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # archive w/o the indicated file. Very sucky, but maybe - # another solution can be found - files = self.get_filename_list() - if archive_file in files: - if not self.rebuild([archive_file]): - return False - - try: - # now just add the archive file as a new one - with py7zr.SevenZipFile(self.path, "a") as zf: - zf.writestr(data, archive_file) - return True - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file) - return False - - def get_filename_list(self) -> list[str]: - try: - with py7zr.SevenZipFile(self.path, "r") as zf: - namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory] - - return namelist - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path) - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - """Zip helper func - - This recompresses the zip archive, without the files in the exclude_list - """ - - try: - # py7zr treats all archives as if they used solid compression - # so we need to get the filename list first to read all the files at once - with py7zr.SevenZipFile(self.path, mode="r") as zin: - targets = [f for f in zin.getnames() if f not in exclude_list] - with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file: - with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout: - with py7zr.SevenZipFile(self.path, mode="r") as zin: - for filename, buffer in zin.read(targets).items(): - zout.writef(buffer, filename) - - self.path.unlink(missing_ok=True) - tmp_file.close() # Required on windows - - shutil.move(tmp_file.name, self.path) - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path) - return False - return True - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - with py7zr.SevenZipFile(self.path, "w") as zout: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file( - filename - ) # This will be very inefficient if other_archive is a 7z file - if data is not None: - zout.writestr(data, filename) - except Exception as e: - logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - -class ZipArchiver(UnknownArchiver): - - """ZIP implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - - def get_comment(self) -> str: - with zipfile.ZipFile(self.path, "r") as zf: - comment = zf.comment.decode("utf-8") - return comment - - def set_comment(self, comment: str) -> bool: - with zipfile.ZipFile(self.path, mode="a") as zf: - zf.comment = bytes(comment, "utf-8") - return True - - def read_file(self, archive_file: str) -> bytes: - with zipfile.ZipFile(self.path, mode="r") as zf: - try: - data = zf.read(archive_file) - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file) - raise - return data - - def remove_file(self, archive_file: str) -> bool: - return self.rebuild([archive_file]) - - def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # zip archive w/o the indicated file. Very sucky, but maybe - # another solution can be found - files = self.get_filename_list() - if archive_file in files: - if not self.rebuild([archive_file]): - return False - - try: - # now just add the archive file as a new one - with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf: - zf.writestr(archive_file, data) - return True - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file) - return False - - def get_filename_list(self) -> list[str]: - try: - with zipfile.ZipFile(self.path, mode="r") as zf: - namelist = [file.filename for file in zf.infolist() if not file.is_dir()] - return namelist - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error listing files in zip archive [%s]: %s", e, self.path) - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - """Zip helper func - - This recompresses the zip archive, without the files in the exclude_list - """ - try: - with zipfile.ZipFile( - tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True - ) as zout: - with zipfile.ZipFile(self.path, mode="r") as zin: - for item in zin.infolist(): - buffer = zin.read(item.filename) - if item.filename not in exclude_list: - zout.writestr(item, buffer) - - # preserve the old comment - zout.comment = zin.comment - - # replace with the new file - self.path.unlink(missing_ok=True) - zout.close() # Required on windows - - shutil.move(cast(str, zout.filename), self.path) - - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error rebuilding zip file [%s]: %s", e, self.path) - return False - return True - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file(filename) - if data is not None: - zout.writestr(filename, data) - - # preserve the old comment - comment = other_archive.get_comment() - if comment is not None: - if not self.write_zip_comment(self.path, comment): - return False - except Exception as e: - logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool: - """ - This is a custom function for writing a comment to a zip file, - since the built-in one doesn't seem to work on Windows and Mac OS/X - - Fortunately, the zip comment is at the end of the file, and it's - easy to manipulate. See this website for more info: - see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure - """ - - # get file size - statinfo = os.stat(filename) - file_length = statinfo.st_size - - try: - with open(filename, mode="r+b") as file: - - # the starting position, relative to EOF - pos = -4 - found = False - - # walk backwards to find the "End of Central Directory" record - while (not found) and (-pos != file_length): - # seek, relative to EOF - file.seek(pos, 2) - value = file.read(4) - - # look for the end of central directory signature - if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]): - found = True - else: - # not found, step back another byte - pos = pos - 1 - - if found: - - # now skip forward 20 bytes to the comment length word - pos += 20 - file.seek(pos, 2) - - # Pack the length of the comment string - fmt = "H" # one 2-byte integer - comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string - - # write out the length - file.write(comment_length) - file.seek(pos + 2, 2) - - # write out the comment itself - file.write(comment.encode("utf-8")) - file.truncate() - else: - raise Exception("Could not find the End of Central Directory record!") - except Exception as e: - logger.error("Error writing comment to zip archive [%s]: %s", e, self.path) - return False - else: - return True - - -class RarArchiver(UnknownArchiver): - """RAR implementation""" - - def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None: - super().__init__(path) - self.rar_exe_path = shutil.which(rar_exe_path) or "" - - # windows only, keeps the cmd.exe from popping up - if platform.system() == "Windows": - self.startupinfo = subprocess.STARTUPINFO() # type: ignore - self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore - else: - self.startupinfo = None - - def get_comment(self) -> str: - rarc = self.get_rar_obj() - return rarc.comment.decode("utf-8") if rarc else "" - - def set_comment(self, comment: str) -> bool: - if rar_support and self.rar_exe_path: - try: - # write comment to temp file - with tempfile.TemporaryDirectory() as tmp_dir: - tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt" - tmp_file.write_text(comment, encoding="utf-8") - - working_dir = os.path.dirname(os.path.abspath(self.path)) - - # use external program to write comment to Rar archive - proc_args = [ - self.rar_exe_path, - "c", - f"-w{working_dir}", - "-c-", - f"-z{tmp_file}", - str(self.path), - ] - subprocess.run( - proc_args, - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=True, - ) - - if platform.system() == "Darwin": - time.sleep(1) - except (subprocess.CalledProcessError, OSError) as e: - logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path) - return False - else: - return True - else: - return False - - def read_file(self, archive_file: str) -> bytes: - - rarc = self.get_rar_obj() - if rarc is None: - return b"" - - tries = 0 - while tries < 7: - try: - tries = tries + 1 - data: bytes = rarc.open(archive_file).read() - entries = [(rarc.getinfo(archive_file), data)] - - if entries[0][0].file_size != len(entries[0][1]): - logger.info( - "Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d", - entries[0][0].file_size, - len(entries[0][1]), - self.path, - archive_file, - tries, - ) - continue - - except OSError as e: - logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries) - time.sleep(1) - except Exception as e: - logger.error( - "Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d", - e, - self.path, - archive_file, - tries, - ) - break - - else: - # Success. Entries is a list of of tuples: ( rarinfo, filedata) - if len(entries) == 1: - return entries[0][1] - - raise OSError - - raise OSError - - def remove_file(self, archive_file: str) -> bool: - if self.rar_exe_path: - # use external program to remove file from Rar archive - result = subprocess.run( - [self.rar_exe_path, "d", "-c-", self.path, archive_file], - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - if platform.system() == "Darwin": - time.sleep(1) - if result.returncode != 0: - logger.error( - "Error removing file from rar archive [exitcode: %d]: %s :: %s", - result.returncode, - self.path, - archive_file, - ) - return False - return True - else: - return False - - def write_file(self, archive_file: str, data: bytes) -> bool: - if self.rar_exe_path: - archive_path = pathlib.PurePosixPath(archive_file) - archive_name = archive_path.name - archive_parent = str(archive_path.parent).lstrip("./") - - # use external program to write file to Rar archive - result = subprocess.run( - [self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path], - input=data, - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - if platform.system() == "Darwin": - time.sleep(1) - if result.returncode != 0: - logger.error( - "Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file - ) - return False - else: - return True - else: - return False - - def get_filename_list(self) -> list[str]: - rarc = self.get_rar_obj() - tries = 0 - if rar_support and rarc: - while tries < 7: - try: - tries = tries + 1 - namelist = [] - for item in rarc.infolist(): - if item.file_size != 0: - namelist.append(item.filename) - - except OSError as e: - logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries) - time.sleep(1) - - else: - return namelist - return [] - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current archive with one copied from another archive""" - try: - with tempfile.TemporaryDirectory() as tmp_dir: - tmp_path = pathlib.Path(tmp_dir) - rar_cwd = tmp_path / "rar" - rar_cwd.mkdir(exist_ok=True) - rar_path = (tmp_path / self.path.name).with_suffix(".rar") - - for filename in other_archive.get_filename_list(): - (rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True) - data = other_archive.read_file(filename) - if data is not None: - with open(rar_cwd / filename, mode="w+b") as tmp_file: - tmp_file.write(data) - result = subprocess.run( - [self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."], - cwd=rar_cwd.absolute(), - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if result.returncode != 0: - logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path) - return False - - self.path.unlink(missing_ok=True) - shutil.move(rar_path, self.path) - except Exception as e: - logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - def get_rar_obj(self) -> rarfile.RarFile | None: - if rar_support: - try: - rarc = rarfile.RarFile(str(self.path)) - except (OSError, rarfile.RarFileError) as e: - logger.error("Unable to get rar object [%s]: %s", e, self.path) - else: - return rarc - - return None - - -class FolderArchiver(UnknownArchiver): - - """Folder implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - self.comment_file_name = "ComicTaggerFolderComment.txt" - - def get_comment(self) -> str: - try: - return self.read_file(self.comment_file_name).decode("utf-8") - except OSError: - return "" - - def set_comment(self, comment: str) -> bool: - if (self.path / self.comment_file_name).exists() or comment: - return self.write_file(self.comment_file_name, comment.encode("utf-8")) - return True - - def read_file(self, archive_file: str) -> bytes: - try: - with open(self.path / archive_file, mode="rb") as f: - data = f.read() - except OSError as e: - logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file) - raise - - return data - - def remove_file(self, archive_file: str) -> bool: - try: - (self.path / archive_file).unlink(missing_ok=True) - except OSError as e: - logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file) - return False - else: - return True - - def write_file(self, archive_file: str, data: bytes) -> bool: - try: - file_path = self.path / archive_file - file_path.parent.mkdir(exist_ok=True, parents=True) - with open(self.path / archive_file, mode="wb") as f: - f.write(data) - except OSError as e: - logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file) - return False - else: - return True - - def get_filename_list(self) -> list[str]: - filenames = [] - try: - for root, _dirs, files in os.walk(self.path): - for f in files: - filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/")) - return filenames - except OSError as e: - logger.error("Error listing files in folder archive [%s]: %s", e, self.path) - return [] - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file(filename) - if data is not None: - self.write_file(filename, data) - - # preserve the old comment - comment = other_archive.get_comment() - if comment is not None: - if not self.set_comment(comment): - return False - except Exception: - logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path) - return False - else: - return True - - class ComicArchive: logo_data = b""