diff --git a/comicapi/__pyinstaller/hook-comicapi.py b/comicapi/__pyinstaller/hook-comicapi.py index 0be9c56..6101d69 100644 --- a/comicapi/__pyinstaller/hook-comicapi.py +++ b/comicapi/__pyinstaller/hook-comicapi.py @@ -1,6 +1,6 @@ from __future__ import annotations -from PyInstaller.utils.hooks import collect_data_files +from PyInstaller.utils.hooks import collect_data_files, collect_entry_point -datas = [] +datas, hiddenimports = collect_entry_point("comicapi.archiver") datas += collect_data_files("comicapi.data") diff --git a/comicapi/archivers/__init__.py b/comicapi/archivers/__init__.py new file mode 100644 index 0000000..c445eb5 --- /dev/null +++ b/comicapi/archivers/__init__.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from comicapi.archivers.archiver import Archiver +from comicapi.archivers.folder import FolderArchiver +from comicapi.archivers.rar import RarArchiver +from comicapi.archivers.sevenzip import SevenZipArchiver +from comicapi.archivers.zip import ZipArchiver + + +class UnknownArchiver(Archiver): + def name(self) -> str: + return "Unknown" + + +__all__ = ["Archiver", "UnknownArchiver", "FolderArchiver", "RarArchiver", "ZipArchiver", "SevenZipArchiver"] diff --git a/comicapi/archivers/archiver.py b/comicapi/archivers/archiver.py new file mode 100644 index 0000000..4d7efb7 --- /dev/null +++ b/comicapi/archivers/archiver.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +import pathlib +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class Archiver(Protocol): + + """Archiver Protocol""" + + """The path to the archive""" + path: pathlib.Path + + """ + The name of the executable used for this archiver. This should be the base name of the executable. + For example if 'rar.exe' is needed this should be "rar". + If an executable is not used this should be the empty string. + """ + exe: str = "" + + """ + Whether or not this archiver is enabled. + If external imports are required and are not available this should be false. See rar.py and sevenzip.py. + """ + enabled: bool = True + + def __init__(self): + self.path = pathlib.Path() + + def get_comment(self) -> str: + """ + Returns the comment from the archive as a string. + Should always return a string. If comments are not supported in the archive the empty string should be returned. + """ + return "" + + def set_comment(self, comment: str) -> bool: + """ + Returns True if the comment was successfully set on the archive. + Should always return a boolean. If comments are not supported in the archive False should be returned. + """ + return False + + def supports_comment(self) -> bool: + """ + Returns True if the archive supports comments. + Should always return a boolean. If comments are not supported in the archive False should be returned. + """ + return False + + def read_file(self, archive_file: str) -> bytes: + """ + Reads the named file from the archive. + archive_file should always come from the output of get_filename_list. + Should always return a bytes object. Exceptions should be of the type OSError. + """ + raise NotImplementedError + + def remove_file(self, archive_file: str) -> bool: + """ + Removes the named file from the archive. + archive_file should always come from the output of get_filename_list. + Should always return a boolean. Failures should return False. + + Rebuilding the archive without the named file is a standard way to remove a file. + """ + return False + + def write_file(self, archive_file: str, data: bytes) -> bool: + """ + Writes the named file to the archive. + Should always return a boolean. Failures should return False. + """ + return False + + def get_filename_list(self) -> list[str]: + """ + Returns a list of filenames in the archive. + Should always return a list of string. Failures should return an empty list. + """ + return [] + + def copy_from_archive(self, other_archive: Archiver) -> bool: + """ + Copies the contents of another achive to this archive. + Should always return a boolean. Failures should return False. + """ + return False + + def is_writable(self) -> bool: + """ + Retuns True if the archive is writeable + Should always return a boolean. Failures should return False. + """ + return False + + def extension(self) -> str: + """ + Returns the extension that this archive should use eg ".cbz". + Should always return a string. Failures should return the empty string. + """ + return "" + + def name(self) -> str: + """ + Returns the name of this archive for display purposes eg "CBZ". + Should always return a string. Failures should return the empty string. + """ + return "" + + @classmethod + def is_valid(cls, path: pathlib.Path) -> bool: + """ + Returns True if the given path can be opened by this archive. + Should always return a boolean. Failures should return False. + """ + return False + + @classmethod + def open(cls, path: pathlib.Path) -> Archiver: + """ + Opens the given archive. + Should always return a an Archver. + Should never cause an exception no file operations should take place in this method, + is_valid will always be called before open. + """ + archiver = cls() + archiver.path = path + return archiver diff --git a/comicapi/archivers/folder.py b/comicapi/archivers/folder.py new file mode 100644 index 0000000..ec94a3a --- /dev/null +++ b/comicapi/archivers/folder.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import logging +import os +import pathlib + +from comicapi.archivers import Archiver + +logger = logging.getLogger(__name__) + + +class FolderArchiver(Archiver): + + """Folder implementation""" + + def __init__(self) -> None: + super().__init__() + self.comment_file_name = "ComicTaggerFolderComment.txt" + + def get_comment(self) -> str: + try: + return self.read_file(self.comment_file_name).decode("utf-8") + except OSError: + return "" + + def set_comment(self, comment: str) -> bool: + if (self.path / self.comment_file_name).exists() or comment: + return self.write_file(self.comment_file_name, comment.encode("utf-8")) + return True + + def read_file(self, archive_file: str) -> bytes: + try: + with open(self.path / archive_file, mode="rb") as f: + data = f.read() + except OSError as e: + logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file) + raise + + return data + + def remove_file(self, archive_file: str) -> bool: + try: + (self.path / archive_file).unlink(missing_ok=True) + except OSError as e: + logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file) + return False + else: + return True + + def write_file(self, archive_file: str, data: bytes) -> bool: + try: + file_path = self.path / archive_file + file_path.parent.mkdir(exist_ok=True, parents=True) + with open(self.path / archive_file, mode="wb") as f: + f.write(data) + except OSError as e: + logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file) + return False + else: + return True + + def get_filename_list(self) -> list[str]: + filenames = [] + try: + for root, _dirs, files in os.walk(self.path): + for f in files: + filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/")) + return filenames + except OSError as e: + logger.error("Error listing files in folder archive [%s]: %s", e, self.path) + return [] + + def copy_from_archive(self, other_archive: Archiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + self.write_file(filename, data) + + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.set_comment(comment): + return False + except Exception: + logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path) + return False + else: + return True + + def name(self) -> str: + return "Folder" + + @classmethod + def is_valid(cls, path: pathlib.Path) -> bool: + return path.is_dir() diff --git a/comicapi/archivers/rar.py b/comicapi/archivers/rar.py new file mode 100644 index 0000000..fd3a61f --- /dev/null +++ b/comicapi/archivers/rar.py @@ -0,0 +1,263 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import platform +import shutil +import subprocess +import tempfile +import time + +from comicapi.archivers import Archiver + +try: + from unrar.cffi import rarfile + + rar_support = True +except ImportError: + rar_support = False + + +logger = logging.getLogger(__name__) + +if not rar_support: + logger.error("unrar-cffi unavailable") + + +class RarArchiver(Archiver): + """RAR implementation""" + + enabled = rar_support + exe = "rar" + + def __init__(self) -> None: + super().__init__() + + # windows only, keeps the cmd.exe from popping up + if platform.system() == "Windows": + self.startupinfo = subprocess.STARTUPINFO() # type: ignore + self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore + else: + self.startupinfo = None + + def get_comment(self) -> str: + rarc = self.get_rar_obj() + return rarc.comment.decode("utf-8") if rarc else "" + + def set_comment(self, comment: str) -> bool: + if rar_support and self.exe: + try: + # write comment to temp file + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt" + tmp_file.write_text(comment, encoding="utf-8") + + working_dir = os.path.dirname(os.path.abspath(self.path)) + + # use external program to write comment to Rar archive + proc_args = [ + self.exe, + "c", + f"-w{working_dir}", + "-c-", + f"-z{tmp_file}", + str(self.path), + ] + subprocess.run( + proc_args, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + check=True, + ) + + if platform.system() == "Darwin": + time.sleep(1) + except (subprocess.CalledProcessError, OSError) as e: + logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path) + return False + else: + return True + else: + return False + + def read_file(self, archive_file: str) -> bytes: + + rarc = self.get_rar_obj() + if rarc is None: + return b"" + + tries = 0 + while tries < 7: + try: + tries = tries + 1 + data: bytes = rarc.open(archive_file).read() + entries = [(rarc.getinfo(archive_file), data)] + + if entries[0][0].file_size != len(entries[0][1]): + logger.info( + "Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d", + entries[0][0].file_size, + len(entries[0][1]), + self.path, + archive_file, + tries, + ) + continue + + except OSError as e: + logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries) + time.sleep(1) + except Exception as e: + logger.error( + "Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d", + e, + self.path, + archive_file, + tries, + ) + break + + else: + # Success. Entries is a list of of tuples: ( rarinfo, filedata) + if len(entries) == 1: + return entries[0][1] + + raise OSError + + raise OSError + + def remove_file(self, archive_file: str) -> bool: + if self.exe: + # use external program to remove file from Rar archive + result = subprocess.run( + [self.exe, "d", "-c-", self.path, archive_file], + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error removing file from rar archive [exitcode: %d]: %s :: %s", + result.returncode, + self.path, + archive_file, + ) + return False + return True + else: + return False + + def write_file(self, archive_file: str, data: bytes) -> bool: + if self.exe: + archive_path = pathlib.PurePosixPath(archive_file) + archive_name = archive_path.name + archive_parent = str(archive_path.parent).lstrip("./") + + # use external program to write file to Rar archive + result = subprocess.run( + [self.exe, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path], + input=data, + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + if platform.system() == "Darwin": + time.sleep(1) + if result.returncode != 0: + logger.error( + "Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file + ) + return False + else: + return True + else: + return False + + def get_filename_list(self) -> list[str]: + rarc = self.get_rar_obj() + tries = 0 + if rar_support and rarc: + while tries < 7: + try: + tries = tries + 1 + namelist = [] + for item in rarc.infolist(): + if item.file_size != 0: + namelist.append(item.filename) + + except OSError as e: + logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries) + time.sleep(1) + + else: + return namelist + return [] + + def copy_from_archive(self, other_archive: Archiver) -> bool: + """Replace the current archive with one copied from another archive""" + try: + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_path = pathlib.Path(tmp_dir) + rar_cwd = tmp_path / "rar" + rar_cwd.mkdir(exist_ok=True) + rar_path = (tmp_path / self.path.name).with_suffix(".rar") + + for filename in other_archive.get_filename_list(): + (rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True) + data = other_archive.read_file(filename) + if data is not None: + with open(rar_cwd / filename, mode="w+b") as tmp_file: + tmp_file.write(data) + result = subprocess.run( + [self.exe, "a", "-r", "-c-", str(rar_path.absolute()), "."], + cwd=rar_cwd.absolute(), + startupinfo=self.startupinfo, + stdout=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if result.returncode != 0: + logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path) + return False + + self.path.unlink(missing_ok=True) + shutil.move(rar_path, self.path) + except Exception as e: + logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True + + def is_writable(self) -> bool: + return bool(self.exe and (os.path.exists(self.exe) or shutil.which(self.exe))) + + def extension(self) -> str: + return ".cbr" + + def name(self) -> str: + return "RAR" + + @classmethod + def is_valid(cls, path: pathlib.Path) -> bool: + if rar_support: + return rarfile.is_rarfile(str(path)) + return False + + def get_rar_obj(self) -> rarfile.RarFile | None: + if rar_support: + try: + rarc = rarfile.RarFile(str(self.path)) + except (OSError, rarfile.RarFileError) as e: + logger.error("Unable to get rar object [%s]: %s", e, self.path) + else: + return rarc + + return None diff --git a/comicapi/archivers/sevenzip.py b/comicapi/archivers/sevenzip.py new file mode 100644 index 0000000..be6e059 --- /dev/null +++ b/comicapi/archivers/sevenzip.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import shutil +import tempfile + +from comicapi.archivers import Archiver + +try: + import py7zr + + z7_support = True +except ImportError: + z7_support = False + +logger = logging.getLogger(__name__) + + +class SevenZipArchiver(Archiver): + """7Z implementation""" + + enabled = z7_support + + def __init__(self) -> None: + super().__init__() + + # @todo: Implement Comment? + def get_comment(self) -> str: + return "" + + def set_comment(self, comment: str) -> bool: + return False + + def read_file(self, archive_file: str) -> bytes: + data = b"" + try: + with py7zr.SevenZipFile(self.path, "r") as zf: + data = zf.read(archive_file)[archive_file].read() + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise + + return data + + def remove_file(self, archive_file: str) -> bool: + return self.rebuild([archive_file]) + + def write_file(self, archive_file: str, data: bytes) -> bool: + # At the moment, no other option but to rebuild the whole + # archive w/o the indicated file. Very sucky, but maybe + # another solution can be found + files = self.get_filename_list() + if archive_file in files: + if not self.rebuild([archive_file]): + return False + + try: + # now just add the archive file as a new one + with py7zr.SevenZipFile(self.path, "a") as zf: + zf.writestr(data, archive_file) + return True + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file) + return False + + def get_filename_list(self) -> list[str]: + try: + with py7zr.SevenZipFile(self.path, "r") as zf: + namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory] + + return namelist + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path) + return [] + + def rebuild(self, exclude_list: list[str]) -> bool: + """Zip helper func + + This recompresses the zip archive, without the files in the exclude_list + """ + + try: + # py7zr treats all archives as if they used solid compression + # so we need to get the filename list first to read all the files at once + with py7zr.SevenZipFile(self.path, mode="r") as zin: + targets = [f for f in zin.getnames() if f not in exclude_list] + with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file: + with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout: + with py7zr.SevenZipFile(self.path, mode="r") as zin: + for filename, buffer in zin.read(targets).items(): + zout.writef(buffer, filename) + + self.path.unlink(missing_ok=True) + tmp_file.close() # Required on windows + + shutil.move(tmp_file.name, self.path) + except (py7zr.Bad7zFile, OSError) as e: + logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path) + return False + return True + + def copy_from_archive(self, other_archive: Archiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + with py7zr.SevenZipFile(self.path, "w") as zout: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file( + filename + ) # This will be very inefficient if other_archive is a 7z file + if data is not None: + zout.writestr(data, filename) + except Exception as e: + logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True + + def is_writable(self) -> bool: + return True + + def extension(self) -> str: + return ".cb7" + + def name(self) -> str: + return "Seven Zip" + + @classmethod + def is_valid(cls, path: pathlib.Path) -> bool: + return py7zr.is_7zfile(path) diff --git a/comicapi/archivers/zip.py b/comicapi/archivers/zip.py new file mode 100644 index 0000000..afc7ffa --- /dev/null +++ b/comicapi/archivers/zip.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +import logging +import os +import pathlib +import shutil +import struct +import tempfile +import zipfile +from typing import cast + +from comicapi.archivers import Archiver + +logger = logging.getLogger(__name__) + + +class ZipArchiver(Archiver): + + """ZIP implementation""" + + def __init__(self) -> None: + super().__init__() + + def get_comment(self) -> str: + with zipfile.ZipFile(self.path, "r") as zf: + comment = zf.comment.decode("utf-8") + return comment + + def set_comment(self, comment: str) -> bool: + with zipfile.ZipFile(self.path, mode="a") as zf: + zf.comment = bytes(comment, "utf-8") + return True + + def read_file(self, archive_file: str) -> bytes: + with zipfile.ZipFile(self.path, mode="r") as zf: + try: + data = zf.read(archive_file) + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file) + raise + return data + + def remove_file(self, archive_file: str) -> bool: + return self.rebuild([archive_file]) + + def write_file(self, archive_file: str, data: bytes) -> bool: + # At the moment, no other option but to rebuild the whole + # zip archive w/o the indicated file. Very sucky, but maybe + # another solution can be found + files = self.get_filename_list() + if archive_file in files: + if not self.rebuild([archive_file]): + return False + + try: + # now just add the archive file as a new one + with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf: + zf.writestr(archive_file, data) + return True + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file) + return False + + def get_filename_list(self) -> list[str]: + try: + with zipfile.ZipFile(self.path, mode="r") as zf: + namelist = [file.filename for file in zf.infolist() if not file.is_dir()] + return namelist + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error listing files in zip archive [%s]: %s", e, self.path) + return [] + + def rebuild(self, exclude_list: list[str]) -> bool: + """Zip helper func + + This recompresses the zip archive, without the files in the exclude_list + """ + try: + with zipfile.ZipFile( + tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True + ) as zout: + with zipfile.ZipFile(self.path, mode="r") as zin: + for item in zin.infolist(): + buffer = zin.read(item.filename) + if item.filename not in exclude_list: + zout.writestr(item, buffer) + + # preserve the old comment + zout.comment = zin.comment + + # replace with the new file + self.path.unlink(missing_ok=True) + zout.close() # Required on windows + + shutil.move(cast(str, zout.filename), self.path) + + except (zipfile.BadZipfile, OSError) as e: + logger.error("Error rebuilding zip file [%s]: %s", e, self.path) + return False + return True + + def copy_from_archive(self, other_archive: Archiver) -> bool: + """Replace the current zip with one copied from another archive""" + try: + with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout: + for filename in other_archive.get_filename_list(): + data = other_archive.read_file(filename) + if data is not None: + zout.writestr(filename, data) + + # preserve the old comment + comment = other_archive.get_comment() + if comment is not None: + if not self.write_zip_comment(self.path, comment): + return False + except Exception as e: + logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path) + return False + else: + return True + + def is_writable(self) -> bool: + return True + + def extension(self) -> str: + return ".cbz" + + def name(self) -> str: + return "ZIP" + + @classmethod + def is_valid(cls, path: pathlib.Path) -> bool: + return zipfile.is_zipfile(path) + + def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool: + """ + This is a custom function for writing a comment to a zip file, + since the built-in one doesn't seem to work on Windows and Mac OS/X + + Fortunately, the zip comment is at the end of the file, and it's + easy to manipulate. See this website for more info: + see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure + """ + + # get file size + statinfo = os.stat(filename) + file_length = statinfo.st_size + + try: + with open(filename, mode="r+b") as file: + + # the starting position, relative to EOF + pos = -4 + found = False + + # walk backwards to find the "End of Central Directory" record + while (not found) and (-pos != file_length): + # seek, relative to EOF + file.seek(pos, 2) + value = file.read(4) + + # look for the end of central directory signature + if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]): + found = True + else: + # not found, step back another byte + pos = pos - 1 + + if found: + + # now skip forward 20 bytes to the comment length word + pos += 20 + file.seek(pos, 2) + + # Pack the length of the comment string + fmt = "H" # one 2-byte integer + comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string + + # write out the length + file.write(comment_length) + file.seek(pos + 2, 2) + + # write out the comment itself + file.write(comment.encode("utf-8")) + file.truncate() + else: + raise Exception("Could not find the End of Central Directory record!") + except Exception as e: + logger.error("Error writing comment to zip archive [%s]: %s", e, self.path) + return False + else: + return True diff --git a/comicapi/comicarchive.py b/comicapi/comicarchive.py index da4fd83..b10eaf9 100644 --- a/comicapi/comicarchive.py +++ b/comicapi/comicarchive.py @@ -18,36 +18,24 @@ import io import logging import os import pathlib -import platform import shutil -import struct -import subprocess -import tempfile -import time -import zipfile +import sys from typing import cast import natsort import wordninja from comicapi import filenamelexer, filenameparser, utils +from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver from comicapi.comet import CoMet from comicapi.comicbookinfo import ComicBookInfo from comicapi.comicinfoxml import ComicInfoXml from comicapi.genericmetadata import GenericMetadata, PageType -try: - import py7zr - - z7_support = True -except ImportError: - z7_support = False -try: - from unrar.cffi import rarfile - - rar_support = True -except ImportError: - rar_support = False +if sys.version_info < (3, 10): + from importlib_metadata import entry_points +else: + from importlib.metadata import entry_points try: from PIL import Image @@ -56,12 +44,28 @@ try: except ImportError: pil_available = False - logger = logging.getLogger(__name__) + if not pil_available: logger.error("PIL unavalable") -if not rar_support: - logger.error("unrar-cffi unavailable") + +archivers: list[type[Archiver]] = [] + + +def load_archive_plugins() -> None: + if not archivers: + builtin: list[type[Archiver]] = [] + for arch in entry_points(group="comicapi.archiver"): + try: + archiver: type[Archiver] = arch.load() + if archiver.enabled: + if arch.module.startswith("comicapi"): + builtin.append(archiver) + else: + archivers.append(archiver) + except Exception: + logger.warning("Failed to load talker: %s", arch.name) + archivers.extend(builtin) class MetaDataStyle: @@ -72,614 +76,12 @@ class MetaDataStyle: short_name = ["cbl", "cr", "comet"] -class UnknownArchiver: - - """Unknown implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - self.path = pathlib.Path(path) - - def get_comment(self) -> str: - return "" - - def set_comment(self, comment: str) -> bool: - return False - - def read_file(self, archive_file: str) -> bytes: - raise NotImplementedError - - def remove_file(self, archive_file: str) -> bool: - return False - - def write_file(self, archive_file: str, data: bytes) -> bool: - return False - - def get_filename_list(self) -> list[str]: - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - return False - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - return False - - -class SevenZipArchiver(UnknownArchiver): - - """7Z implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - - # @todo: Implement Comment? - def get_comment(self) -> str: - return "" - - def set_comment(self, comment: str) -> bool: - return False - - def read_file(self, archive_file: str) -> bytes: - data = b"" - try: - with py7zr.SevenZipFile(self.path, "r") as zf: - data = zf.read(archive_file)[archive_file].read() - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file) - raise - - return data - - def remove_file(self, archive_file: str) -> bool: - return self.rebuild([archive_file]) - - def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # archive w/o the indicated file. Very sucky, but maybe - # another solution can be found - files = self.get_filename_list() - if archive_file in files: - if not self.rebuild([archive_file]): - return False - - try: - # now just add the archive file as a new one - with py7zr.SevenZipFile(self.path, "a") as zf: - zf.writestr(data, archive_file) - return True - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file) - return False - - def get_filename_list(self) -> list[str]: - try: - with py7zr.SevenZipFile(self.path, "r") as zf: - namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory] - - return namelist - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path) - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - """Zip helper func - - This recompresses the zip archive, without the files in the exclude_list - """ - - try: - # py7zr treats all archives as if they used solid compression - # so we need to get the filename list first to read all the files at once - with py7zr.SevenZipFile(self.path, mode="r") as zin: - targets = [f for f in zin.getnames() if f not in exclude_list] - with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file: - with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout: - with py7zr.SevenZipFile(self.path, mode="r") as zin: - for filename, buffer in zin.read(targets).items(): - zout.writef(buffer, filename) - - self.path.unlink(missing_ok=True) - tmp_file.close() # Required on windows - - shutil.move(tmp_file.name, self.path) - except (py7zr.Bad7zFile, OSError) as e: - logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path) - return False - return True - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - with py7zr.SevenZipFile(self.path, "w") as zout: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file( - filename - ) # This will be very inefficient if other_archive is a 7z file - if data is not None: - zout.writestr(data, filename) - except Exception as e: - logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - -class ZipArchiver(UnknownArchiver): - - """ZIP implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - - def get_comment(self) -> str: - with zipfile.ZipFile(self.path, "r") as zf: - comment = zf.comment.decode("utf-8") - return comment - - def set_comment(self, comment: str) -> bool: - with zipfile.ZipFile(self.path, mode="a") as zf: - zf.comment = bytes(comment, "utf-8") - return True - - def read_file(self, archive_file: str) -> bytes: - with zipfile.ZipFile(self.path, mode="r") as zf: - try: - data = zf.read(archive_file) - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file) - raise - return data - - def remove_file(self, archive_file: str) -> bool: - return self.rebuild([archive_file]) - - def write_file(self, archive_file: str, data: bytes) -> bool: - # At the moment, no other option but to rebuild the whole - # zip archive w/o the indicated file. Very sucky, but maybe - # another solution can be found - files = self.get_filename_list() - if archive_file in files: - if not self.rebuild([archive_file]): - return False - - try: - # now just add the archive file as a new one - with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf: - zf.writestr(archive_file, data) - return True - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file) - return False - - def get_filename_list(self) -> list[str]: - try: - with zipfile.ZipFile(self.path, mode="r") as zf: - namelist = [file.filename for file in zf.infolist() if not file.is_dir()] - return namelist - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error listing files in zip archive [%s]: %s", e, self.path) - return [] - - def rebuild(self, exclude_list: list[str]) -> bool: - """Zip helper func - - This recompresses the zip archive, without the files in the exclude_list - """ - try: - with zipfile.ZipFile( - tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True - ) as zout: - with zipfile.ZipFile(self.path, mode="r") as zin: - for item in zin.infolist(): - buffer = zin.read(item.filename) - if item.filename not in exclude_list: - zout.writestr(item, buffer) - - # preserve the old comment - zout.comment = zin.comment - - # replace with the new file - self.path.unlink(missing_ok=True) - zout.close() # Required on windows - - shutil.move(cast(str, zout.filename), self.path) - - except (zipfile.BadZipfile, OSError) as e: - logger.error("Error rebuilding zip file [%s]: %s", e, self.path) - return False - return True - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file(filename) - if data is not None: - zout.writestr(filename, data) - - # preserve the old comment - comment = other_archive.get_comment() - if comment is not None: - if not self.write_zip_comment(self.path, comment): - return False - except Exception as e: - logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool: - """ - This is a custom function for writing a comment to a zip file, - since the built-in one doesn't seem to work on Windows and Mac OS/X - - Fortunately, the zip comment is at the end of the file, and it's - easy to manipulate. See this website for more info: - see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure - """ - - # get file size - statinfo = os.stat(filename) - file_length = statinfo.st_size - - try: - with open(filename, mode="r+b") as file: - - # the starting position, relative to EOF - pos = -4 - found = False - - # walk backwards to find the "End of Central Directory" record - while (not found) and (-pos != file_length): - # seek, relative to EOF - file.seek(pos, 2) - value = file.read(4) - - # look for the end of central directory signature - if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]): - found = True - else: - # not found, step back another byte - pos = pos - 1 - - if found: - - # now skip forward 20 bytes to the comment length word - pos += 20 - file.seek(pos, 2) - - # Pack the length of the comment string - fmt = "H" # one 2-byte integer - comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string - - # write out the length - file.write(comment_length) - file.seek(pos + 2, 2) - - # write out the comment itself - file.write(comment.encode("utf-8")) - file.truncate() - else: - raise Exception("Could not find the End of Central Directory record!") - except Exception as e: - logger.error("Error writing comment to zip archive [%s]: %s", e, self.path) - return False - else: - return True - - -class RarArchiver(UnknownArchiver): - """RAR implementation""" - - def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None: - super().__init__(path) - self.rar_exe_path = shutil.which(rar_exe_path) or "" - - # windows only, keeps the cmd.exe from popping up - if platform.system() == "Windows": - self.startupinfo = subprocess.STARTUPINFO() # type: ignore - self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore - else: - self.startupinfo = None - - def get_comment(self) -> str: - rarc = self.get_rar_obj() - return rarc.comment.decode("utf-8") if rarc else "" - - def set_comment(self, comment: str) -> bool: - if rar_support and self.rar_exe_path: - try: - # write comment to temp file - with tempfile.TemporaryDirectory() as tmp_dir: - tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt" - tmp_file.write_text(comment, encoding="utf-8") - - working_dir = os.path.dirname(os.path.abspath(self.path)) - - # use external program to write comment to Rar archive - proc_args = [ - self.rar_exe_path, - "c", - f"-w{working_dir}", - "-c-", - f"-z{tmp_file}", - str(self.path), - ] - subprocess.run( - proc_args, - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - check=True, - ) - - if platform.system() == "Darwin": - time.sleep(1) - except (subprocess.CalledProcessError, OSError) as e: - logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path) - return False - else: - return True - else: - return False - - def read_file(self, archive_file: str) -> bytes: - - rarc = self.get_rar_obj() - if rarc is None: - return b"" - - tries = 0 - while tries < 7: - try: - tries = tries + 1 - data: bytes = rarc.open(archive_file).read() - entries = [(rarc.getinfo(archive_file), data)] - - if entries[0][0].file_size != len(entries[0][1]): - logger.info( - "Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d", - entries[0][0].file_size, - len(entries[0][1]), - self.path, - archive_file, - tries, - ) - continue - - except OSError as e: - logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries) - time.sleep(1) - except Exception as e: - logger.error( - "Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d", - e, - self.path, - archive_file, - tries, - ) - break - - else: - # Success. Entries is a list of of tuples: ( rarinfo, filedata) - if len(entries) == 1: - return entries[0][1] - - raise OSError - - raise OSError - - def remove_file(self, archive_file: str) -> bool: - if self.rar_exe_path: - # use external program to remove file from Rar archive - result = subprocess.run( - [self.rar_exe_path, "d", "-c-", self.path, archive_file], - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - if platform.system() == "Darwin": - time.sleep(1) - if result.returncode != 0: - logger.error( - "Error removing file from rar archive [exitcode: %d]: %s :: %s", - result.returncode, - self.path, - archive_file, - ) - return False - return True - else: - return False - - def write_file(self, archive_file: str, data: bytes) -> bool: - if self.rar_exe_path: - archive_path = pathlib.PurePosixPath(archive_file) - archive_name = archive_path.name - archive_parent = str(archive_path.parent).lstrip("./") - - # use external program to write file to Rar archive - result = subprocess.run( - [self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path], - input=data, - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - if platform.system() == "Darwin": - time.sleep(1) - if result.returncode != 0: - logger.error( - "Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file - ) - return False - else: - return True - else: - return False - - def get_filename_list(self) -> list[str]: - rarc = self.get_rar_obj() - tries = 0 - if rar_support and rarc: - while tries < 7: - try: - tries = tries + 1 - namelist = [] - for item in rarc.infolist(): - if item.file_size != 0: - namelist.append(item.filename) - - except OSError as e: - logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries) - time.sleep(1) - - else: - return namelist - return [] - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current archive with one copied from another archive""" - try: - with tempfile.TemporaryDirectory() as tmp_dir: - tmp_path = pathlib.Path(tmp_dir) - rar_cwd = tmp_path / "rar" - rar_cwd.mkdir(exist_ok=True) - rar_path = (tmp_path / self.path.name).with_suffix(".rar") - - for filename in other_archive.get_filename_list(): - (rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True) - data = other_archive.read_file(filename) - if data is not None: - with open(rar_cwd / filename, mode="w+b") as tmp_file: - tmp_file.write(data) - result = subprocess.run( - [self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."], - cwd=rar_cwd.absolute(), - startupinfo=self.startupinfo, - stdout=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if result.returncode != 0: - logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path) - return False - - self.path.unlink(missing_ok=True) - shutil.move(rar_path, self.path) - except Exception as e: - logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path) - return False - else: - return True - - def get_rar_obj(self) -> rarfile.RarFile | None: - if rar_support: - try: - rarc = rarfile.RarFile(str(self.path)) - except (OSError, rarfile.RarFileError) as e: - logger.error("Unable to get rar object [%s]: %s", e, self.path) - else: - return rarc - - return None - - -class FolderArchiver(UnknownArchiver): - - """Folder implementation""" - - def __init__(self, path: pathlib.Path | str) -> None: - super().__init__(path) - self.comment_file_name = "ComicTaggerFolderComment.txt" - - def get_comment(self) -> str: - try: - return self.read_file(self.comment_file_name).decode("utf-8") - except OSError: - return "" - - def set_comment(self, comment: str) -> bool: - if (self.path / self.comment_file_name).exists() or comment: - return self.write_file(self.comment_file_name, comment.encode("utf-8")) - return True - - def read_file(self, archive_file: str) -> bytes: - try: - with open(self.path / archive_file, mode="rb") as f: - data = f.read() - except OSError as e: - logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file) - raise - - return data - - def remove_file(self, archive_file: str) -> bool: - try: - (self.path / archive_file).unlink(missing_ok=True) - except OSError as e: - logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file) - return False - else: - return True - - def write_file(self, archive_file: str, data: bytes) -> bool: - try: - file_path = self.path / archive_file - file_path.parent.mkdir(exist_ok=True, parents=True) - with open(self.path / archive_file, mode="wb") as f: - f.write(data) - except OSError as e: - logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file) - return False - else: - return True - - def get_filename_list(self) -> list[str]: - filenames = [] - try: - for root, _dirs, files in os.walk(self.path): - for f in files: - filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/")) - return filenames - except OSError as e: - logger.error("Error listing files in folder archive [%s]: %s", e, self.path) - return [] - - def copy_from_archive(self, other_archive: UnknownArchiver) -> bool: - """Replace the current zip with one copied from another archive""" - try: - for filename in other_archive.get_filename_list(): - data = other_archive.read_file(filename) - if data is not None: - self.write_file(filename, data) - - # preserve the old comment - comment = other_archive.get_comment() - if comment is not None: - if not self.set_comment(comment): - return False - except Exception: - logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path) - return False - else: - return True - - class ComicArchive: logo_data = b"" - class ArchiveType: - SevenZip, Zip, Rar, Folder, Pdf, Unknown = list(range(6)) - def __init__( self, path: pathlib.Path | str, - rar_exe_path: str = "rar", default_image_path: pathlib.Path | str | None = None, ) -> None: self.cbi_md: GenericMetadata | None = None @@ -693,42 +95,18 @@ class ComicArchive: self.page_count: int | None = None self.page_list: list[str] = [] - self.rar_exe_path = shutil.which(rar_exe_path or "rar") or "" self.ci_xml_filename = "ComicInfo.xml" self.comet_default_filename = "CoMet.xml" self.reset_cache() self.default_image_path = default_image_path - # Use file extension to decide which archive test we do first - ext = self.path.suffix + self.archiver: Archiver = UnknownArchiver.open(self.path) - self.archive_type = self.ArchiveType.Unknown - self.archiver = UnknownArchiver(self.path) - - if ext in [".cbr", ".rar"]: - if self.rar_test(): - self.archive_type = self.ArchiveType.Rar - self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path) - - elif self.zip_test(): - self.archive_type = self.ArchiveType.Zip - self.archiver = ZipArchiver(self.path) - else: - if self.sevenzip_test(): - self.archive_type = self.ArchiveType.SevenZip - self.archiver = SevenZipArchiver(self.path) - - elif self.zip_test(): - self.archive_type = self.ArchiveType.Zip - self.archiver = ZipArchiver(self.path) - - elif self.rar_test(): - self.archive_type = self.ArchiveType.Rar - self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path) - - elif self.folder_test(): - self.archive_type = self.ArchiveType.Folder - self.archiver = FolderArchiver(self.path) + load_archive_plugins() + for archiver in archivers: + if archiver.is_valid(self.path): + self.archiver = archiver.open(self.path) + break if not ComicArchive.logo_data and self.default_image_path: with open(self.default_image_path, mode="rb") as fd: @@ -760,63 +138,33 @@ class ComicArchive: self.path = new_path self.archiver.path = pathlib.Path(path) - def sevenzip_test(self) -> bool: - return z7_support and py7zr.is_7zfile(self.path) - - def zip_test(self) -> bool: - return zipfile.is_zipfile(self.path) - - def rar_test(self) -> bool: - return rar_support and rarfile.is_rarfile(str(self.path)) - - def folder_test(self) -> bool: - return self.path.is_dir() - - def is_sevenzip(self) -> bool: - return self.archive_type == self.ArchiveType.SevenZip - - def is_zip(self) -> bool: - return self.archive_type == self.ArchiveType.Zip - - def is_rar(self) -> bool: - return self.archive_type == self.ArchiveType.Rar - - def is_pdf(self) -> bool: - return self.archive_type == self.ArchiveType.Pdf - - def is_folder(self) -> bool: - return self.archive_type == self.ArchiveType.Folder - - def is_writable(self, check_rar_status: bool = True) -> bool: - if self.archive_type == self.ArchiveType.Unknown: + def is_writable(self, check_archive_status: bool = True) -> bool: + if isinstance(self.archiver, UnknownArchiver): return False - if check_rar_status and self.is_rar() and not self.rar_exe_path: + if check_archive_status and not self.archiver.is_writable(): return False - if not os.access(self.path, os.W_OK): - return False - - if (self.archive_type != self.ArchiveType.Folder) and (not os.access(self.path.parent, os.W_OK)): + if not (os.access(self.path, os.W_OK) or os.access(self.path.parent, os.W_OK)): return False return True def is_writable_for_style(self, data_style: int) -> bool: + return not (data_style == MetaDataStyle.CBI and not self.archiver.supports_comment) - if (self.is_rar() or self.is_sevenzip()) and data_style == MetaDataStyle.CBI: - return False - - return self.is_writable() + def is_zip(self) -> bool: + return self.archiver.name() == "ZIP" def seems_to_be_a_comic_archive(self) -> bool: - if (self.is_zip() or self.is_rar() or self.is_sevenzip() or self.is_folder()) and ( - self.get_number_of_pages() > 0 - ): + if not (isinstance(self.archiver, UnknownArchiver)) and self.get_number_of_pages() > 0: return True return False + def extension(self) -> str: + return self.archiver.extension() + def read_metadata(self, style: int) -> GenericMetadata: if style == MetaDataStyle.CIX: @@ -937,7 +285,6 @@ class ComicArchive: # seems like some archive creators are on Windows, and don't know about case-sensitivity! if sort_list: - files = cast(list[str], natsort.os_sorted(files)) # make a sub-list of image files @@ -1256,10 +603,10 @@ class ComicArchive: return metadata - def export_as_zip(self, zip_filename: pathlib.Path | str) -> bool: - if self.archive_type == self.ArchiveType.Zip: + def export_as_zip(self, zip_filename: pathlib.Path) -> bool: + if self.archiver.name() == "ZIP": # nothing to do, we're already a zip return True - zip_archiver = ZipArchiver(zip_filename) + zip_archiver = ZipArchiver.open(zip_filename) return zip_archiver.copy_from_archive(self.archiver) diff --git a/comictaggerlib/cli.py b/comictaggerlib/cli.py index db72231..08d826f 100644 --- a/comictaggerlib/cli.py +++ b/comictaggerlib/cli.py @@ -217,14 +217,7 @@ class CLI: if self.batch_mode: brief = f"{ca.path}: " - if ca.is_sevenzip(): - brief += "7Z archive " - elif ca.is_zip(): - brief += "ZIP archive " - elif ca.is_rar(): - brief += "RAR archive " - elif ca.is_folder(): - brief += "Folder archive " + brief += ca.archiver.name() + " archive " brief += f"({page_count: >3} pages)" brief += " tags:[ " @@ -460,12 +453,7 @@ class CLI: new_ext = "" # default if self.options.filename_rename_set_extension_based_on_archive: - if ca.is_sevenzip(): - new_ext = ".cb7" - elif ca.is_zip(): - new_ext = ".cbz" - elif ca.is_rar(): - new_ext = ".cbr" + new_ext = ca.extension() renamer = FileRenamer( md, @@ -572,7 +560,7 @@ class CLI: logger.error("Cannot find %s", filename) return - ca = ComicArchive(filename, self.options.general_rar_exe_path, str(graphics_path / "nocover.png")) + ca = ComicArchive(filename, str(graphics_path / "nocover.png")) if not ca.seems_to_be_a_comic_archive(): logger.error("Sorry, but %s is not a comic archive!", filename) diff --git a/comictaggerlib/ctoptions/__init__.py b/comictaggerlib/ctoptions/__init__.py index 688d84f..255abbf 100644 --- a/comictaggerlib/ctoptions/__init__.py +++ b/comictaggerlib/ctoptions/__init__.py @@ -3,14 +3,17 @@ from __future__ import annotations from comictaggerlib.ctoptions.cmdline import initial_cmd_line_parser, register_commandline, validate_commandline_options from comictaggerlib.ctoptions.file import register_settings, validate_settings from comictaggerlib.ctoptions.talker_plugins import register_talker_settings +from comictaggerlib.ctoptions.plugin import register_plugin_settings, validate_plugin_settings from comictaggerlib.ctoptions.types import ComicTaggerPaths __all__ = [ "initial_cmd_line_parser", "register_commandline", "register_settings", + "register_plugin_settings", "register_talker_settings", "validate_commandline_options", "validate_settings", + "validate_plugin_settings", "ComicTaggerPaths", ] diff --git a/comictaggerlib/ctoptions/cmdline.py b/comictaggerlib/ctoptions/cmdline.py index f0c1c36..4d9d413 100644 --- a/comictaggerlib/ctoptions/cmdline.py +++ b/comictaggerlib/ctoptions/cmdline.py @@ -325,20 +325,16 @@ def validate_commandline_options(options: settngs.Config[settngs.Values], parser else: options[0].runtime_file_list = options[0].runtime_files - # take a crack at finding rar exe, if not set already - if options[0].general_rar_exe_path.strip() in ("", "rar"): + # take a crack at finding rar exe if it's not in the path + if not utils.which("rar"): if platform.system() == "Windows": # look in some likely places for Windows machines if os.path.exists(r"C:\Program Files\WinRAR\Rar.exe"): - options[0].general_rar_exe_path = r"C:\Program Files\WinRAR\Rar.exe" + utils.add_to_path(r"C:\Program Files\WinRAR") elif os.path.exists(r"C:\Program Files (x86)\WinRAR\Rar.exe"): - options[0].general_rar_exe_path = r"C:\Program Files (x86)\WinRAR\Rar.exe" + utils.add_to_path(r"C:\Program Files (x86)\WinRAR") else: if os.path.exists("/opt/homebrew/bin"): utils.add_to_path("/opt/homebrew/bin") - # see if it's in the path of unix user - rarpath = utils.which("rar") - if rarpath is not None: - options[0].general_rar_exe_path = "rar" return options diff --git a/comictaggerlib/ctoptions/file.py b/comictaggerlib/ctoptions/file.py index e2d87fe..c5cab51 100644 --- a/comictaggerlib/ctoptions/file.py +++ b/comictaggerlib/ctoptions/file.py @@ -12,13 +12,6 @@ from comictaggerlib.defaults import DEFAULT_REPLACEMENTS, Replacement, Replaceme def general(parser: settngs.Manager) -> None: # General Settings - parser.add_setting("--rar-exe-path", default="rar", help="The path to the rar program") - parser.add_setting( - "--allow-cbi-in-rar", - default=True, - action=argparse.BooleanOptionalAction, - help="Allows ComicBookLover tags in RAR/CBR files", - ) parser.add_setting("check_for_new_version", default=False, cmdline=False) parser.add_setting("send_usage_stats", default=False, cmdline=False) @@ -59,7 +52,6 @@ def identifier(parser: settngs.Manager) -> None: def dialog(parser: settngs.Manager) -> None: # Show/ask dialog flags - parser.add_setting("ask_about_cbi_in_rar", default=True, cmdline=False) parser.add_setting("show_disclaimer", default=True, cmdline=False) parser.add_setting("dont_notify_about_this_version", default="", cmdline=False) parser.add_setting("ask_about_usage_stats", default=True, cmdline=False) @@ -234,7 +226,7 @@ def autotag(parser: settngs.Manager) -> None: ) -def validate_settings(options: settngs.Config[settngs.Values], parser: settngs.Manager) -> dict[str, dict[str, Any]]: +def validate_settings(options: settngs.Config[settngs.Values]) -> dict[str, dict[str, Any]]: options[0].identifier_publisher_filter = [x.strip() for x in options[0].identifier_publisher_filter if x.strip()] options[0].rename_replacements = Replacements( [Replacement(x[0], x[1], x[2]) for x in options[0].rename_replacements[0]], diff --git a/comictaggerlib/ctoptions/plugin.py b/comictaggerlib/ctoptions/plugin.py new file mode 100644 index 0000000..5276907 --- /dev/null +++ b/comictaggerlib/ctoptions/plugin.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import logging +import os + +import settngs + +import comicapi.comicarchive + +logger = logging.getLogger("comictagger") + + +def archiver(manager: settngs.Manager) -> None: + exe_registered: set[str] = set() + for archiver in comicapi.comicarchive.archivers: + if archiver.exe and archiver.exe not in exe_registered: + manager.add_setting( + f"--{archiver.exe.replace(' ', '-').replace('_', '-').strip().strip('-')}", + default=archiver.exe, + help="Path to the %(default)s executable\n\n", + ) + exe_registered.add(archiver.exe) + + +def validate_plugin_settings(options: settngs.Config) -> settngs.Config: + if "archiver" not in options[1]: + return options + cfg = settngs.normalize_config(options, file=True, cmdline=True, defaults=False) + for archiver in comicapi.comicarchive.archivers: + exe_name = archiver.exe.replace(" ", "-").replace("_", "-").strip().strip("-").replace("-", "_") + if ( + exe_name in cfg[0]["archiver"] + and cfg[0]["archiver"][exe_name] + and cfg[0]["archiver"][exe_name] != archiver.exe + ): + if os.path.basename(cfg[0]["archiver"][exe_name]) == archiver.exe: + comicapi.utils.add_to_path(os.path.dirname(cfg[0]["archiver"][exe_name])) + else: + archiver.exe = cfg[0]["archiver"][exe_name] + + return options + + +def register_plugin_settings(manager: settngs.Manager): + manager.add_persistent_group("archiver", archiver, False) diff --git a/comictaggerlib/fileselectionlist.py b/comictaggerlib/fileselectionlist.py index e5066a5..db2abab 100644 --- a/comictaggerlib/fileselectionlist.py +++ b/comictaggerlib/fileselectionlist.py @@ -193,7 +193,7 @@ class FileSelectionList(QtWidgets.QWidget): QtCore.QCoreApplication.processEvents() first_added = None - rar_added = False + rar_added_ro = False self.twList.setSortingEnabled(False) for idx, f in enumerate(filelist): QtCore.QCoreApplication.processEvents() @@ -206,8 +206,7 @@ class FileSelectionList(QtWidgets.QWidget): row = self.add_path_item(f) if row is not None: ca = self.get_archive_by_row(row) - if ca and ca.is_rar(): - rar_added = True + rar_added_ro = bool(ca and ca.archiver.name() == "RAR" and not ca.archiver.is_writable()) if first_added is None: first_added = row @@ -224,7 +223,7 @@ class FileSelectionList(QtWidgets.QWidget): else: QtWidgets.QMessageBox.information(self, "File/Folder Open", "No readable comic archives were found.") - if rar_added and not utils.which(self.options.general_rar_exe_path or "rar"): + if rar_added_ro: self.rar_ro_message() self.twList.setSortingEnabled(True) @@ -278,7 +277,7 @@ class FileSelectionList(QtWidgets.QWidget): if self.is_list_dupe(path): return self.get_current_list_row(path) - ca = ComicArchive(path, self.options.general_rar_exe_path, str(graphics_path / "nocover.png")) + ca = ComicArchive(path, str(graphics_path / "nocover.png")) if ca.seems_to_be_a_comic_archive(): row: int = self.twList.rowCount() @@ -339,14 +338,7 @@ class FileSelectionList(QtWidgets.QWidget): filename_item.setText(item_text) filename_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text) - if fi.ca.is_sevenzip(): - item_text = "7Z" - elif fi.ca.is_zip(): - item_text = "ZIP" - elif fi.ca.is_rar(): - item_text = "RAR" - else: - item_text = "" + item_text = fi.ca.archiver.name() type_item.setText(item_text) type_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text) diff --git a/comictaggerlib/issueselectionwindow.py b/comictaggerlib/issueselectionwindow.py index fc48fc2..12cc245 100644 --- a/comictaggerlib/issueselectionwindow.py +++ b/comictaggerlib/issueselectionwindow.py @@ -76,6 +76,21 @@ class IssueSelectionWindow(QtWidgets.QDialog): self.url_fetch_thread = None self.issue_list: list[ComicIssue] = [] + # Display talker logo and set url + self.lblIssuesSourceName.setText(talker_api.static_options.attribution_string) + + self.imageIssuesSourceWidget = CoverImageWidget( + self.imageIssuesSourceLogo, + CoverImageWidget.URLMode, + options.runtime_config.user_cache_dir, + talker_api, + False, + ) + gridlayoutIssuesSourceLogo = QtWidgets.QGridLayout(self.imageIssuesSourceLogo) + gridlayoutIssuesSourceLogo.addWidget(self.imageIssuesSourceWidget) + gridlayoutIssuesSourceLogo.setContentsMargins(0, 2, 0, 0) + self.imageIssuesSourceWidget.set_url(talker_api.source_details.logo) + if issue_number is None or issue_number == "": self.issue_number = "1" else: diff --git a/comictaggerlib/log.py b/comictaggerlib/log.py index efdf98b..1bb9f53 100644 --- a/comictaggerlib/log.py +++ b/comictaggerlib/log.py @@ -2,6 +2,10 @@ from __future__ import annotations import logging.handlers import pathlib +import platform +import sys + +from comictaggerlib.ctversion import version logger = logging.getLogger("comictagger") @@ -44,3 +48,10 @@ def setup_logging(verbose: int, log_dir: pathlib.Path) -> None: format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%dT%H:%M:%S", ) + + logger.info( + "ComicTagger Version: %s running on: %s PyInstaller: %s", + version, + platform.system(), + "Yes" if getattr(sys, "frozen", None) else "No", + ) diff --git a/comictaggerlib/main.py b/comictaggerlib/main.py index b2e7561..01e2ee1 100644 --- a/comictaggerlib/main.py +++ b/comictaggerlib/main.py @@ -18,15 +18,14 @@ from __future__ import annotations import argparse import json import logging.handlers -import platform import signal import sys from collections.abc import Mapping import settngs +import comicapi import comictalker.comictalkerapi as ct_api -from comicapi import utils from comictaggerlib import cli, ctoptions from comictaggerlib.ctversion import version from comictaggerlib.log import setup_logging @@ -54,7 +53,7 @@ def update_publishers(options: settngs.Namespace) -> None: json_file = options.runtime_config.user_config_dir / "publishers.json" if json_file.exists(): try: - utils.update_publishers(json.loads(json_file.read_text("utf-8"))) + comicapi.utils.update_publishers(json.loads(json_file.read_text("utf-8"))) except Exception: logger.exception("Failed to load publishers from %s", json_file) # show_exception_box(str(e)) @@ -71,6 +70,7 @@ class App: def run(self) -> None: opts = self.initialize() + self.load_plugins() self.initialize_dirs(opts.config) self.talker_plugins = ct_api.get_talkers(version, opts.config.user_cache_dir) self.register_options() @@ -78,6 +78,9 @@ class App: self.main() + def load_plugins(self) -> None: + comicapi.comicarchive.load_archive_plugins() + def initialize(self) -> argparse.Namespace: opts, _ = self.initial_arg_parser.parse_known_args() assert opts is not None @@ -91,6 +94,7 @@ class App: ) ctoptions.register_commandline(self.manager) ctoptions.register_settings(self.manager) + ctoptions.register_plugin_settings(self.manager) ctoptions.register_talker_settings(self.manager, self.talker_plugins) def parse_options(self, config_paths: ctoptions.ComicTaggerPaths) -> None: @@ -101,7 +105,8 @@ class App: self.options = self.manager.get_namespace(self.options) self.options = ctoptions.validate_commandline_options(self.options, self.manager) - self.options = ctoptions.validate_settings(self.options, self.manager) + self.options = ctoptions.validate_settings(self.options) + self.options = ctoptions.validate_plugin_settings(self.options) self.options = self.options def initialize_dirs(self, paths: ctoptions.ComicTaggerPaths) -> None: @@ -133,18 +138,11 @@ class App: signal.signal(signal.SIGINT, signal.SIG_DFL) - logger.info( - "ComicTagger Version: %s running on: %s PyInstaller: %s", - version, - platform.system(), - "Yes" if getattr(sys, "frozen", None) else "No", - ) - logger.debug("Installed Packages") for pkg in sorted(importlib_metadata.distributions(), key=lambda x: x.name): logger.debug("%s\t%s", pkg.metadata["Name"], pkg.metadata["Version"]) - utils.load_publishers() + comicapi.utils.load_publishers() update_publishers(self.options[0]) if not qt_available and not self.options[0].runtime_no_gui: @@ -165,6 +163,7 @@ class App: f"Failed to load settings, check the log located in '{self.options[0].runtime_config.user_log_dir}' for more details", True, ) + if self.options[0].runtime_no_gui: if error and error[1]: print(f"A fatal error occurred please check the log for more information: {error[0]}") # noqa: T201 @@ -175,3 +174,7 @@ class App: logger.exception("CLI mode failed") else: gui.open_tagger_window(talker_api, self.options, error) + + +def main(): + App().run() diff --git a/comictaggerlib/renamewindow.py b/comictaggerlib/renamewindow.py index c896d70..a856ef5 100644 --- a/comictaggerlib/renamewindow.py +++ b/comictaggerlib/renamewindow.py @@ -73,13 +73,8 @@ class RenameWindow(QtWidgets.QDialog): self.renamer.replacements = self.options[0].rename_replacements new_ext = ca.path.suffix # default - if self.options[0].rename_set_extension_based_on_archive: - if ca.is_sevenzip(): - new_ext = ".cb7" - elif ca.is_zip(): - new_ext = ".cbz" - elif ca.is_rar(): - new_ext = ".cbr" + if self.options[0].filename_rename_set_extension_based_on_archive: + new_ext = ca.extension() if md is None: md = ca.read_metadata(self.data_style) @@ -206,7 +201,7 @@ class RenameWindow(QtWidgets.QDialog): logger.info("%s: Filename is already good!", comic[1]) continue - if not comic[0].is_writable(check_rar_status=False): + if not comic[0].is_writable(check_archive_status=False): continue comic[0].rename(utils.unique_file(full_path)) diff --git a/comictaggerlib/seriesselectionwindow.py b/comictaggerlib/seriesselectionwindow.py index 088ed86..bfaa0d6 100644 --- a/comictaggerlib/seriesselectionwindow.py +++ b/comictaggerlib/seriesselectionwindow.py @@ -161,6 +161,21 @@ class SeriesSelectionWindow(QtWidgets.QDialog): # Load to retrieve settings self.talker_api = talker_api + # Display talker logo and set url + self.lblSourceName.setText(talker_api.static_options.attribution_string) + + self.imageSourceWidget = CoverImageWidget( + self.imageSourceLogo, + CoverImageWidget.URLMode, + options.runtime_config.user_cache_dir, + talker_api, + False, + ) + gridlayoutSourceLogo = QtWidgets.QGridLayout(self.imageSourceLogo) + gridlayoutSourceLogo.addWidget(self.imageSourceWidget) + gridlayoutSourceLogo.setContentsMargins(0, 2, 0, 0) + self.imageSourceWidget.set_url(talker_api.source_details.logo) + # Set the minimum row height to the default. # this way rows will be more consistent when resizeRowsToContents is called self.twList.verticalHeader().setMinimumSectionSize(self.twList.verticalHeader().defaultSectionSize()) diff --git a/comictaggerlib/settingswindow.py b/comictaggerlib/settingswindow.py index ec32e23..07b1a05 100644 --- a/comictaggerlib/settingswindow.py +++ b/comictaggerlib/settingswindow.py @@ -269,7 +269,10 @@ class SettingsWindow(QtWidgets.QDialog): def settings_to_form(self) -> None: # Copy values from settings to form - self.leRarExePath.setText(self.options[0].general_rar_exe_path) + if "archiver" in self.options[1] and "rar" in self.options[1]["archiver"]: + self.leRarExePath.setText(getattr(self.options[0], self.options[1]["archiver"]["rar"].internal_name)) + else: + self.leRarExePath.setEnabled(False) self.sbNameMatchIdentifyThresh.setValue(self.options[0].identifier_series_match_identify_thresh) self.sbNameMatchSearchThresh.setValue(self.options[0].talkers_series_match_search_thresh) self.tePublisherFilter.setPlainText("\n".join(self.options[0].identifier_publisher_filter)) @@ -374,11 +377,12 @@ class SettingsWindow(QtWidgets.QDialog): ) # Copy values from form to settings and save - self.options[0].general_rar_exe_path = str(self.leRarExePath.text()) + if "archiver" in self.options[1] and "rar" in self.options[1]["archiver"]: + setattr(self.options[0], self.options[1]["archiver"]["rar"].internal_name, str(self.leRarExePath.text())) - # make sure rar program is now in the path for the rar class - if self.options[0].general_rar_exe_path: - utils.add_to_path(os.path.dirname(self.options[0].general_rar_exe_path)) + # make sure rar program is now in the path for the rar class + if self.options[0].archivers_rar: + utils.add_to_path(os.path.dirname(str(self.leRarExePath.text()))) if not str(self.leIssueNumPadding.text()).isdigit(): self.leIssueNumPadding.setText("0") diff --git a/comictaggerlib/taggerwindow.py b/comictaggerlib/taggerwindow.py index 0d19458..98b2b05 100644 --- a/comictaggerlib/taggerwindow.py +++ b/comictaggerlib/taggerwindow.py @@ -696,16 +696,7 @@ Have fun! self.lblFilename.setText(filename) - if ca.is_sevenzip(): - self.lblArchiveType.setText("7Z archive") - elif ca.is_zip(): - self.lblArchiveType.setText("ZIP archive") - elif ca.is_rar(): - self.lblArchiveType.setText("RAR archive") - elif ca.is_folder(): - self.lblArchiveType.setText("Folder archive") - else: - self.lblArchiveType.setText("") + self.lblArchiveType.setText(ca.archiver.name() + " archive") page_count = f" ({ca.get_number_of_pages()} pages)" self.lblPageCount.setText(page_count) diff --git a/comictaggerlib/ui/issueselectionwindow.ui b/comictaggerlib/ui/issueselectionwindow.ui index 8e3845c..4d23de9 100644 --- a/comictaggerlib/ui/issueselectionwindow.ui +++ b/comictaggerlib/ui/issueselectionwindow.ui @@ -7,7 +7,7 @@ 0 0 872 - 550 + 670 @@ -74,9 +74,6 @@ Title - - AlignCenter - @@ -93,20 +90,78 @@ - - - - 300 - 450 - - - - - 300 - 450 - - - + + + + + + 300 + 450 + + + + + 300 + 450 + + + + + + + + + 0 + 0 + + + + 2 + + + 1 + + + Qt::Horizontal + + + + + + + + 300 + 16777215 + + + + Data Source: + + + Qt::AlignLeading|Qt::AlignLeft|Qt::AlignTop + + + true + + + + + + + + 300 + 100 + + + + + 300 + 16777215 + + + + + diff --git a/comictaggerlib/ui/seriesselectionwindow.ui b/comictaggerlib/ui/seriesselectionwindow.ui index 4812423..ceafc65 100644 --- a/comictaggerlib/ui/seriesselectionwindow.ui +++ b/comictaggerlib/ui/seriesselectionwindow.ui @@ -7,7 +7,7 @@ 0 0 950 - 480 + 600 @@ -17,23 +17,96 @@ false - + - - - - 300 - 450 - + + + 0 - - - 300 - 450 - + + QLayout::SetMaximumSize - + + 0 + + + + + + 0 + 0 + + + + + 300 + 450 + + + + + 300 + 450 + + + + + + + + 2 + + + 1 + + + Qt::Horizontal + + + + + + + + 0 + 0 + + + + + 300 + 16777215 + + + + Data Source: + + + Qt::AlignLeading|Qt::AlignLeft|Qt::AlignTop + + + true + + + + + + + + 300 + 100 + + + + + 300 + 16777215 + + + + + @@ -85,17 +158,11 @@ Year - - AlignCenter - Issues - - AlignCenter - diff --git a/comictalker/talkerbase.py b/comictalker/talkerbase.py index a6607d6..36d5f3f 100644 --- a/comictalker/talkerbase.py +++ b/comictalker/talkerbase.py @@ -41,6 +41,7 @@ class SourceStaticOptions: def __init__( self, website: str = "", + attribution_string: str = "", # Full string including web link, example: Metadata provided by Example has_issues: bool = False, has_alt_covers: bool = False, requires_apikey: bool = False, @@ -48,6 +49,7 @@ class SourceStaticOptions: has_censored_covers: bool = False, ) -> None: self.website = website + self.attribution_string = attribution_string self.has_issues = has_issues self.has_alt_covers = has_alt_covers self.requires_apikey = requires_apikey diff --git a/comictalker/talkers/comicvine.py b/comictalker/talkers/comicvine.py index e72a44d..07a6f83 100644 --- a/comictalker/talkers/comicvine.py +++ b/comictalker/talkers/comicvine.py @@ -160,9 +160,14 @@ class ComicVineTalker(ComicTalker): cache_folder: pathlib.Path, ): super().__init__(version, cache_folder) - self.source_details = SourceDetails(name="Comic Vine", ident="comicvine") + self.source_details = SourceDetails( + name="Comic Vine", + ident="comicvine", + logo="https://comicvine.gamespot.com/a/bundles/comicvinesite/images/logo.png", + ) self.static_options = SourceStaticOptions( website="https://comicvine.gamespot.com/", + attribution_string="Metadata provided by Comic Vine", has_issues=True, has_alt_covers=True, requires_apikey=True, diff --git a/requirements.txt b/requirements.txt index 3e4ea89..2d29204 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ pycountry pyicu; sys_platform == 'linux' or sys_platform == 'darwin' rapidfuzz>=2.12.0 requests==2.* -settngs==0.3.0 +settngs==0.5.0 text2digits typing_extensions wordninja diff --git a/setup.py b/setup.py index e8d1c3d..a6f8c0f 100644 --- a/setup.py +++ b/setup.py @@ -59,12 +59,18 @@ setup( exclude=["tests", "testing"], ), package_data={"comictaggerlib": ["ui/*", "graphics/*"], "comicapi": ["data/*"]}, - entry_points=dict( - console_scripts=["comictagger=comictaggerlib.main:main"], - pyinstaller40=[ + entry_points={ + "console_scripts": ["comictagger=comictaggerlib.main:main"], + "pyinstaller40": [ "hook-dirs = comictaggerlib.__pyinstaller:get_hook_dirs", ], - ), + "comicapi.archiver": [ + "zip = comicapi.archivers.zip:ZipArchiver", + "sevenzip = comicapi.archivers.sevenzip:SevenZipArchiver", + "rar = comicapi.archivers.rar:RarArchiver", + "folder = comicapi.archivers.folder:FolderArchiver", + ], + }, classifiers=[ "Development Status :: 4 - Beta", "Environment :: Console", diff --git a/tests/comicarchive_test.py b/tests/comicarchive_test.py index b675431..e0949aa 100644 --- a/tests/comicarchive_test.py +++ b/tests/comicarchive_test.py @@ -4,15 +4,17 @@ import platform import shutil import pytest +from importlib_metadata import entry_points import comicapi.comicarchive import comicapi.genericmetadata from testing.filenames import datadir -@pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support") +@pytest.mark.xfail(not comicapi.archivers.rar.rar_support, reason="rar support") def test_getPageNameList(): c = comicapi.comicarchive.ComicArchive(datadir / "fake_cbr.cbr") + assert c.seems_to_be_a_comic_archive() pageNameList = c.get_page_name_list() assert pageNameList == [ @@ -56,24 +58,26 @@ def test_save_cbi(tmp_comic): md = tmp_comic.read_cbi() -@pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support") +@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support") def test_save_cix_rar(tmp_path): cbr_path = datadir / "fake_cbr.cbr" shutil.copy(cbr_path, tmp_path) tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name) + assert tmp_comic.seems_to_be_a_comic_archive() assert tmp_comic.write_cix(comicapi.genericmetadata.md_test) md = tmp_comic.read_cix() assert md.replace(pages=[]) == comicapi.genericmetadata.md_test.replace(pages=[]) -@pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support") +@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support") def test_save_cbi_rar(tmp_path): cbr_path = datadir / "fake_cbr.cbr" shutil.copy(cbr_path, tmp_path) tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name) + assert tmp_comic.seems_to_be_a_comic_archive() assert tmp_comic.write_cbi(comicapi.genericmetadata.md_test) md = tmp_comic.read_cbi() @@ -118,16 +122,8 @@ def test_invalid_zip(tmp_comic): archivers = [ - comicapi.comicarchive.ZipArchiver, - comicapi.comicarchive.FolderArchiver, - pytest.param( - comicapi.comicarchive.SevenZipArchiver, - marks=pytest.mark.xfail(not (comicapi.comicarchive.z7_support), reason="7z support"), - ), - pytest.param( - comicapi.comicarchive.RarArchiver, - marks=pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support"), - ), + pytest.param(x.load(), marks=pytest.mark.xfail(not (x.load().enabled), reason="archiver not enabled")) + for x in entry_points(group="comicapi_archivers") ] @@ -135,7 +131,7 @@ archivers = [ def test_copy_from_archive(archiver, tmp_path, cbz): comic_path = tmp_path / cbz.path.with_suffix("").name - archive = archiver(comic_path) + archive = archiver.open(comic_path) assert archive.copy_from_archive(cbz.archiver)