Merge branch 'plugableArchivers' into develop

This commit is contained in:
Timmy Welch 2023-01-31 19:44:07 -08:00
commit 7c4e5b775b
No known key found for this signature in database
21 changed files with 978 additions and 788 deletions

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from PyInstaller.utils.hooks import collect_data_files
from PyInstaller.utils.hooks import collect_data_files, collect_entry_point
datas = []
datas, hiddenimports = collect_entry_point("comicapi.archiver")
datas += collect_data_files("comicapi.data")

View File

@ -0,0 +1,15 @@
from __future__ import annotations
from comicapi.archivers.archiver import Archiver
from comicapi.archivers.folder import FolderArchiver
from comicapi.archivers.rar import RarArchiver
from comicapi.archivers.sevenzip import SevenZipArchiver
from comicapi.archivers.zip import ZipArchiver
class UnknownArchiver(Archiver):
def name(self) -> str:
return "Unknown"
__all__ = ["Archiver", "UnknownArchiver", "FolderArchiver", "RarArchiver", "ZipArchiver", "SevenZipArchiver"]

View File

@ -0,0 +1,130 @@
from __future__ import annotations
import pathlib
from typing import Protocol, runtime_checkable
@runtime_checkable
class Archiver(Protocol):
"""Archiver Protocol"""
"""The path to the archive"""
path: pathlib.Path
"""
The name of the executable used for this archiver. This should be the base name of the executable.
For example if 'rar.exe' is needed this should be "rar".
If an executable is not used this should be the empty string.
"""
exe: str = ""
"""
Whether or not this archiver is enabled.
If external imports are required and are not available this should be false. See rar.py and sevenzip.py.
"""
enabled: bool = True
def __init__(self):
self.path = pathlib.Path()
def get_comment(self) -> str:
"""
Returns the comment from the archive as a string.
Should always return a string. If comments are not supported in the archive the empty string should be returned.
"""
return ""
def set_comment(self, comment: str) -> bool:
"""
Returns True if the comment was successfully set on the archive.
Should always return a boolean. If comments are not supported in the archive False should be returned.
"""
return False
def supports_comment(self) -> bool:
"""
Returns True if the archive supports comments.
Should always return a boolean. If comments are not supported in the archive False should be returned.
"""
return False
def read_file(self, archive_file: str) -> bytes:
"""
Reads the named file from the archive.
archive_file should always come from the output of get_filename_list.
Should always return a bytes object. Exceptions should be of the type OSError.
"""
raise NotImplementedError
def remove_file(self, archive_file: str) -> bool:
"""
Removes the named file from the archive.
archive_file should always come from the output of get_filename_list.
Should always return a boolean. Failures should return False.
Rebuilding the archive without the named file is a standard way to remove a file.
"""
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
"""
Writes the named file to the archive.
Should always return a boolean. Failures should return False.
"""
return False
def get_filename_list(self) -> list[str]:
"""
Returns a list of filenames in the archive.
Should always return a list of string. Failures should return an empty list.
"""
return []
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""
Copies the contents of another achive to this archive.
Should always return a boolean. Failures should return False.
"""
return False
def is_writable(self) -> bool:
"""
Retuns True if the archive is writeable
Should always return a boolean. Failures should return False.
"""
return False
def extension(self) -> str:
"""
Returns the extension that this archive should use eg ".cbz".
Should always return a string. Failures should return the empty string.
"""
return ""
def name(self) -> str:
"""
Returns the name of this archive for display purposes eg "CBZ".
Should always return a string. Failures should return the empty string.
"""
return ""
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
"""
Returns True if the given path can be opened by this archive.
Should always return a boolean. Failures should return False.
"""
return False
@classmethod
def open(cls, path: pathlib.Path) -> Archiver:
"""
Opens the given archive.
Should always return a an Archver.
Should never cause an exception no file operations should take place in this method,
is_valid will always be called before open.
"""
archiver = cls()
archiver.path = path
return archiver

View File

@ -0,0 +1,97 @@
from __future__ import annotations
import logging
import os
import pathlib
from comicapi.archivers import Archiver
logger = logging.getLogger(__name__)
class FolderArchiver(Archiver):
"""Folder implementation"""
def __init__(self) -> None:
super().__init__()
self.comment_file_name = "ComicTaggerFolderComment.txt"
def get_comment(self) -> str:
try:
return self.read_file(self.comment_file_name).decode("utf-8")
except OSError:
return ""
def set_comment(self, comment: str) -> bool:
if (self.path / self.comment_file_name).exists() or comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
return True
def read_file(self, archive_file: str) -> bytes:
try:
with open(self.path / archive_file, mode="rb") as f:
data = f.read()
except OSError as e:
logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
try:
(self.path / archive_file).unlink(missing_ok=True)
except OSError as e:
logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def write_file(self, archive_file: str, data: bytes) -> bool:
try:
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.path / archive_file, mode="wb") as f:
f.write(data)
except OSError as e:
logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def get_filename_list(self) -> list[str]:
filenames = []
try:
for root, _dirs, files in os.walk(self.path):
for f in files:
filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/"))
return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
return []
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
self.write_file(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
except Exception:
logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
return False
else:
return True
def name(self) -> str:
return "Folder"
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
return path.is_dir()

263
comicapi/archivers/rar.py Normal file
View File

@ -0,0 +1,263 @@
from __future__ import annotations
import logging
import os
import pathlib
import platform
import shutil
import subprocess
import tempfile
import time
from comicapi.archivers import Archiver
try:
from unrar.cffi import rarfile
rar_support = True
except ImportError:
rar_support = False
logger = logging.getLogger(__name__)
if not rar_support:
logger.error("unrar-cffi unavailable")
class RarArchiver(Archiver):
"""RAR implementation"""
enabled = rar_support
exe = "rar"
def __init__(self) -> None:
super().__init__()
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO() # type: ignore
self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
self.startupinfo = None
def get_comment(self) -> str:
rarc = self.get_rar_obj()
return rarc.comment.decode("utf-8") if rarc else ""
def set_comment(self, comment: str) -> bool:
if rar_support and self.exe:
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to write comment to Rar archive
proc_args = [
self.exe,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
subprocess.run(
proc_args,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
if platform.system() == "Darwin":
time.sleep(1)
except (subprocess.CalledProcessError, OSError) as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
else:
return True
else:
return False
def read_file(self, archive_file: str) -> bytes:
rarc = self.get_rar_obj()
if rarc is None:
return b""
tries = 0
while tries < 7:
try:
tries = tries + 1
data: bytes = rarc.open(archive_file).read()
entries = [(rarc.getinfo(archive_file), data)]
if entries[0][0].file_size != len(entries[0][1]):
logger.info(
"Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d",
entries[0][0].file_size,
len(entries[0][1]),
self.path,
archive_file,
tries,
)
continue
except OSError as e:
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
time.sleep(1)
except Exception as e:
logger.error(
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",
e,
self.path,
archive_file,
tries,
)
break
else:
# Success. Entries is a list of of tuples: ( rarinfo, filedata)
if len(entries) == 1:
return entries[0][1]
raise OSError
raise OSError
def remove_file(self, archive_file: str) -> bool:
if self.exe:
# use external program to remove file from Rar archive
result = subprocess.run(
[self.exe, "d", "-c-", self.path, archive_file],
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
archive_file,
)
return False
return True
else:
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
if self.exe:
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
# use external program to write file to Rar archive
result = subprocess.run(
[self.exe, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path],
input=data,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file
)
return False
else:
return True
else:
return False
def get_filename_list(self) -> list[str]:
rarc = self.get_rar_obj()
tries = 0
if rar_support and rarc:
while tries < 7:
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
time.sleep(1)
else:
return namelist
return []
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current archive with one copied from another archive"""
try:
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = pathlib.Path(tmp_dir)
rar_cwd = tmp_path / "rar"
rar_cwd.mkdir(exist_ok=True)
rar_path = (tmp_path / self.path.name).with_suffix(".rar")
for filename in other_archive.get_filename_list():
(rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True)
data = other_archive.read_file(filename)
if data is not None:
with open(rar_cwd / filename, mode="w+b") as tmp_file:
tmp_file.write(data)
result = subprocess.run(
[self.exe, "a", "-r", "-c-", str(rar_path.absolute()), "."],
cwd=rar_cwd.absolute(),
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path)
return False
self.path.unlink(missing_ok=True)
shutil.move(rar_path, self.path)
except Exception as e:
logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def is_writable(self) -> bool:
return bool(self.exe and (os.path.exists(self.exe) or shutil.which(self.exe)))
def extension(self) -> str:
return ".cbr"
def name(self) -> str:
return "RAR"
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
if rar_support:
return rarfile.is_rarfile(str(path))
return False
def get_rar_obj(self) -> rarfile.RarFile | None:
if rar_support:
try:
rarc = rarfile.RarFile(str(self.path))
except (OSError, rarfile.RarFileError) as e:
logger.error("Unable to get rar object [%s]: %s", e, self.path)
else:
return rarc
return None

View File

@ -0,0 +1,131 @@
from __future__ import annotations
import logging
import os
import pathlib
import shutil
import tempfile
from comicapi.archivers import Archiver
try:
import py7zr
z7_support = True
except ImportError:
z7_support = False
logger = logging.getLogger(__name__)
class SevenZipArchiver(Archiver):
"""7Z implementation"""
enabled = z7_support
def __init__(self) -> None:
super().__init__()
# @todo: Implement Comment?
def get_comment(self) -> str:
return ""
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file: str) -> bytes:
data = b""
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read(archive_file)[archive_file].read()
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
if archive_file in files:
if not self.rebuild([archive_file]):
return False
try:
# now just add the archive file as a new one
with py7zr.SevenZipFile(self.path, "a") as zf:
zf.writestr(data, archive_file)
return True
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory]
return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
return []
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
try:
# py7zr treats all archives as if they used solid compression
# so we need to get the filename list first to read all the files at once
with py7zr.SevenZipFile(self.path, mode="r") as zin:
targets = [f for f in zin.getnames() if f not in exclude_list]
with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file:
with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout:
with py7zr.SevenZipFile(self.path, mode="r") as zin:
for filename, buffer in zin.read(targets).items():
zout.writef(buffer, filename)
self.path.unlink(missing_ok=True)
tmp_file.close() # Required on windows
shutil.move(tmp_file.name, self.path)
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path)
return False
return True
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with py7zr.SevenZipFile(self.path, "w") as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(
filename
) # This will be very inefficient if other_archive is a 7z file
if data is not None:
zout.writestr(data, filename)
except Exception as e:
logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def is_writable(self) -> bool:
return True
def extension(self) -> str:
return ".cb7"
def name(self) -> str:
return "Seven Zip"
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
return py7zr.is_7zfile(path)

192
comicapi/archivers/zip.py Normal file
View File

@ -0,0 +1,192 @@
from __future__ import annotations
import logging
import os
import pathlib
import shutil
import struct
import tempfile
import zipfile
from typing import cast
from comicapi.archivers import Archiver
logger = logging.getLogger(__name__)
class ZipArchiver(Archiver):
"""ZIP implementation"""
def __init__(self) -> None:
super().__init__()
def get_comment(self) -> str:
with zipfile.ZipFile(self.path, "r") as zf:
comment = zf.comment.decode("utf-8")
return comment
def set_comment(self, comment: str) -> bool:
with zipfile.ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def read_file(self, archive_file: str) -> bytes:
with zipfile.ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
if archive_file in files:
if not self.rebuild([archive_file]):
return False
try:
# now just add the archive file as a new one
with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
try:
with zipfile.ZipFile(self.path, mode="r") as zf:
namelist = [file.filename for file in zf.infolist() if not file.is_dir()]
return namelist
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
try:
with zipfile.ZipFile(
tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True
) as zout:
with zipfile.ZipFile(self.path, mode="r") as zin:
for item in zin.infolist():
buffer = zin.read(item.filename)
if item.filename not in exclude_list:
zout.writestr(item, buffer)
# preserve the old comment
zout.comment = zin.comment
# replace with the new file
self.path.unlink(missing_ok=True)
zout.close() # Required on windows
shutil.move(cast(str, zout.filename), self.path)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error rebuilding zip file [%s]: %s", e, self.path)
return False
return True
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
zout.writestr(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.write_zip_comment(self.path, comment):
return False
except Exception as e:
logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def is_writable(self) -> bool:
return True
def extension(self) -> str:
return ".cbz"
def name(self) -> str:
return "ZIP"
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
return zipfile.is_zipfile(path)
def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool:
"""
This is a custom function for writing a comment to a zip file,
since the built-in one doesn't seem to work on Windows and Mac OS/X
Fortunately, the zip comment is at the end of the file, and it's
easy to manipulate. See this website for more info:
see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure
"""
# get file size
statinfo = os.stat(filename)
file_length = statinfo.st_size
try:
with open(filename, mode="r+b") as file:
# the starting position, relative to EOF
pos = -4
found = False
# walk backwards to find the "End of Central Directory" record
while (not found) and (-pos != file_length):
# seek, relative to EOF
file.seek(pos, 2)
value = file.read(4)
# look for the end of central directory signature
if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]):
found = True
else:
# not found, step back another byte
pos = pos - 1
if found:
# now skip forward 20 bytes to the comment length word
pos += 20
file.seek(pos, 2)
# Pack the length of the comment string
fmt = "H" # one 2-byte integer
comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string
# write out the length
file.write(comment_length)
file.seek(pos + 2, 2)
# write out the comment itself
file.write(comment.encode("utf-8"))
file.truncate()
else:
raise Exception("Could not find the End of Central Directory record!")
except Exception as e:
logger.error("Error writing comment to zip archive [%s]: %s", e, self.path)
return False
else:
return True

View File

@ -18,36 +18,24 @@ import io
import logging
import os
import pathlib
import platform
import shutil
import struct
import subprocess
import tempfile
import time
import zipfile
import sys
from typing import cast
import natsort
import wordninja
from comicapi import filenamelexer, filenameparser, utils
from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver
from comicapi.comet import CoMet
from comicapi.comicbookinfo import ComicBookInfo
from comicapi.comicinfoxml import ComicInfoXml
from comicapi.genericmetadata import GenericMetadata, PageType
try:
import py7zr
z7_support = True
except ImportError:
z7_support = False
try:
from unrar.cffi import rarfile
rar_support = True
except ImportError:
rar_support = False
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
try:
from PIL import Image
@ -56,12 +44,28 @@ try:
except ImportError:
pil_available = False
logger = logging.getLogger(__name__)
if not pil_available:
logger.error("PIL unavalable")
if not rar_support:
logger.error("unrar-cffi unavailable")
archivers: list[type[Archiver]] = []
def load_archive_plugins() -> None:
if not archivers:
builtin: list[type[Archiver]] = []
for arch in entry_points(group="comicapi.archiver"):
try:
archiver: type[Archiver] = arch.load()
if archiver.enabled:
if arch.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
except Exception:
logger.warning("Failed to load talker: %s", arch.name)
archivers.extend(builtin)
class MetaDataStyle:
@ -72,614 +76,12 @@ class MetaDataStyle:
short_name = ["cbl", "cr", "comet"]
class UnknownArchiver:
"""Unknown implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
self.path = pathlib.Path(path)
def get_comment(self) -> str:
return ""
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file: str) -> bytes:
raise NotImplementedError
def remove_file(self, archive_file: str) -> bool:
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
return False
def get_filename_list(self) -> list[str]:
return []
def rebuild(self, exclude_list: list[str]) -> bool:
return False
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
return False
class SevenZipArchiver(UnknownArchiver):
"""7Z implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
super().__init__(path)
# @todo: Implement Comment?
def get_comment(self) -> str:
return ""
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file: str) -> bytes:
data = b""
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read(archive_file)[archive_file].read()
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
if archive_file in files:
if not self.rebuild([archive_file]):
return False
try:
# now just add the archive file as a new one
with py7zr.SevenZipFile(self.path, "a") as zf:
zf.writestr(data, archive_file)
return True
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory]
return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
return []
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
try:
# py7zr treats all archives as if they used solid compression
# so we need to get the filename list first to read all the files at once
with py7zr.SevenZipFile(self.path, mode="r") as zin:
targets = [f for f in zin.getnames() if f not in exclude_list]
with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file:
with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout:
with py7zr.SevenZipFile(self.path, mode="r") as zin:
for filename, buffer in zin.read(targets).items():
zout.writef(buffer, filename)
self.path.unlink(missing_ok=True)
tmp_file.close() # Required on windows
shutil.move(tmp_file.name, self.path)
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path)
return False
return True
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with py7zr.SevenZipFile(self.path, "w") as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(
filename
) # This will be very inefficient if other_archive is a 7z file
if data is not None:
zout.writestr(data, filename)
except Exception as e:
logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
class ZipArchiver(UnknownArchiver):
"""ZIP implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
super().__init__(path)
def get_comment(self) -> str:
with zipfile.ZipFile(self.path, "r") as zf:
comment = zf.comment.decode("utf-8")
return comment
def set_comment(self, comment: str) -> bool:
with zipfile.ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def read_file(self, archive_file: str) -> bytes:
with zipfile.ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
if archive_file in files:
if not self.rebuild([archive_file]):
return False
try:
# now just add the archive file as a new one
with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
try:
with zipfile.ZipFile(self.path, mode="r") as zf:
namelist = [file.filename for file in zf.infolist() if not file.is_dir()]
return namelist
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
try:
with zipfile.ZipFile(
tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True
) as zout:
with zipfile.ZipFile(self.path, mode="r") as zin:
for item in zin.infolist():
buffer = zin.read(item.filename)
if item.filename not in exclude_list:
zout.writestr(item, buffer)
# preserve the old comment
zout.comment = zin.comment
# replace with the new file
self.path.unlink(missing_ok=True)
zout.close() # Required on windows
shutil.move(cast(str, zout.filename), self.path)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error rebuilding zip file [%s]: %s", e, self.path)
return False
return True
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
zout.writestr(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.write_zip_comment(self.path, comment):
return False
except Exception as e:
logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool:
"""
This is a custom function for writing a comment to a zip file,
since the built-in one doesn't seem to work on Windows and Mac OS/X
Fortunately, the zip comment is at the end of the file, and it's
easy to manipulate. See this website for more info:
see: http://en.wikipedia.org/wiki/Zip_(file_format)#Structure
"""
# get file size
statinfo = os.stat(filename)
file_length = statinfo.st_size
try:
with open(filename, mode="r+b") as file:
# the starting position, relative to EOF
pos = -4
found = False
# walk backwards to find the "End of Central Directory" record
while (not found) and (-pos != file_length):
# seek, relative to EOF
file.seek(pos, 2)
value = file.read(4)
# look for the end of central directory signature
if bytearray(value) == bytearray([0x50, 0x4B, 0x05, 0x06]):
found = True
else:
# not found, step back another byte
pos = pos - 1
if found:
# now skip forward 20 bytes to the comment length word
pos += 20
file.seek(pos, 2)
# Pack the length of the comment string
fmt = "H" # one 2-byte integer
comment_length = struct.pack(fmt, len(comment)) # pack integer in a binary string
# write out the length
file.write(comment_length)
file.seek(pos + 2, 2)
# write out the comment itself
file.write(comment.encode("utf-8"))
file.truncate()
else:
raise Exception("Could not find the End of Central Directory record!")
except Exception as e:
logger.error("Error writing comment to zip archive [%s]: %s", e, self.path)
return False
else:
return True
class RarArchiver(UnknownArchiver):
"""RAR implementation"""
def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None:
super().__init__(path)
self.rar_exe_path = shutil.which(rar_exe_path) or ""
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO() # type: ignore
self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
self.startupinfo = None
def get_comment(self) -> str:
rarc = self.get_rar_obj()
return rarc.comment.decode("utf-8") if rarc else ""
def set_comment(self, comment: str) -> bool:
if rar_support and self.rar_exe_path:
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to write comment to Rar archive
proc_args = [
self.rar_exe_path,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
subprocess.run(
proc_args,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
if platform.system() == "Darwin":
time.sleep(1)
except (subprocess.CalledProcessError, OSError) as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
else:
return True
else:
return False
def read_file(self, archive_file: str) -> bytes:
rarc = self.get_rar_obj()
if rarc is None:
return b""
tries = 0
while tries < 7:
try:
tries = tries + 1
data: bytes = rarc.open(archive_file).read()
entries = [(rarc.getinfo(archive_file), data)]
if entries[0][0].file_size != len(entries[0][1]):
logger.info(
"Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d",
entries[0][0].file_size,
len(entries[0][1]),
self.path,
archive_file,
tries,
)
continue
except OSError as e:
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
time.sleep(1)
except Exception as e:
logger.error(
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",
e,
self.path,
archive_file,
tries,
)
break
else:
# Success. Entries is a list of of tuples: ( rarinfo, filedata)
if len(entries) == 1:
return entries[0][1]
raise OSError
raise OSError
def remove_file(self, archive_file: str) -> bool:
if self.rar_exe_path:
# use external program to remove file from Rar archive
result = subprocess.run(
[self.rar_exe_path, "d", "-c-", self.path, archive_file],
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
archive_file,
)
return False
return True
else:
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
if self.rar_exe_path:
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
# use external program to write file to Rar archive
result = subprocess.run(
[self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path],
input=data,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file
)
return False
else:
return True
else:
return False
def get_filename_list(self) -> list[str]:
rarc = self.get_rar_obj()
tries = 0
if rar_support and rarc:
while tries < 7:
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
time.sleep(1)
else:
return namelist
return []
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current archive with one copied from another archive"""
try:
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = pathlib.Path(tmp_dir)
rar_cwd = tmp_path / "rar"
rar_cwd.mkdir(exist_ok=True)
rar_path = (tmp_path / self.path.name).with_suffix(".rar")
for filename in other_archive.get_filename_list():
(rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True)
data = other_archive.read_file(filename)
if data is not None:
with open(rar_cwd / filename, mode="w+b") as tmp_file:
tmp_file.write(data)
result = subprocess.run(
[self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."],
cwd=rar_cwd.absolute(),
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path)
return False
self.path.unlink(missing_ok=True)
shutil.move(rar_path, self.path)
except Exception as e:
logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def get_rar_obj(self) -> rarfile.RarFile | None:
if rar_support:
try:
rarc = rarfile.RarFile(str(self.path))
except (OSError, rarfile.RarFileError) as e:
logger.error("Unable to get rar object [%s]: %s", e, self.path)
else:
return rarc
return None
class FolderArchiver(UnknownArchiver):
"""Folder implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
super().__init__(path)
self.comment_file_name = "ComicTaggerFolderComment.txt"
def get_comment(self) -> str:
try:
return self.read_file(self.comment_file_name).decode("utf-8")
except OSError:
return ""
def set_comment(self, comment: str) -> bool:
if (self.path / self.comment_file_name).exists() or comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
return True
def read_file(self, archive_file: str) -> bytes:
try:
with open(self.path / archive_file, mode="rb") as f:
data = f.read()
except OSError as e:
logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
try:
(self.path / archive_file).unlink(missing_ok=True)
except OSError as e:
logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def write_file(self, archive_file: str, data: bytes) -> bool:
try:
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.path / archive_file, mode="wb") as f:
f.write(data)
except OSError as e:
logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def get_filename_list(self) -> list[str]:
filenames = []
try:
for root, _dirs, files in os.walk(self.path):
for f in files:
filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/"))
return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
return []
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
self.write_file(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
except Exception:
logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
return False
else:
return True
class ComicArchive:
logo_data = b""
class ArchiveType:
SevenZip, Zip, Rar, Folder, Pdf, Unknown = list(range(6))
def __init__(
self,
path: pathlib.Path | str,
rar_exe_path: str = "rar",
default_image_path: pathlib.Path | str | None = None,
) -> None:
self.cbi_md: GenericMetadata | None = None
@ -693,42 +95,18 @@ class ComicArchive:
self.page_count: int | None = None
self.page_list: list[str] = []
self.rar_exe_path = shutil.which(rar_exe_path or "rar") or ""
self.ci_xml_filename = "ComicInfo.xml"
self.comet_default_filename = "CoMet.xml"
self.reset_cache()
self.default_image_path = default_image_path
# Use file extension to decide which archive test we do first
ext = self.path.suffix
self.archiver: Archiver = UnknownArchiver.open(self.path)
self.archive_type = self.ArchiveType.Unknown
self.archiver = UnknownArchiver(self.path)
if ext in [".cbr", ".rar"]:
if self.rar_test():
self.archive_type = self.ArchiveType.Rar
self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path)
elif self.zip_test():
self.archive_type = self.ArchiveType.Zip
self.archiver = ZipArchiver(self.path)
else:
if self.sevenzip_test():
self.archive_type = self.ArchiveType.SevenZip
self.archiver = SevenZipArchiver(self.path)
elif self.zip_test():
self.archive_type = self.ArchiveType.Zip
self.archiver = ZipArchiver(self.path)
elif self.rar_test():
self.archive_type = self.ArchiveType.Rar
self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path)
elif self.folder_test():
self.archive_type = self.ArchiveType.Folder
self.archiver = FolderArchiver(self.path)
load_archive_plugins()
for archiver in archivers:
if archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
break
if not ComicArchive.logo_data and self.default_image_path:
with open(self.default_image_path, mode="rb") as fd:
@ -760,63 +138,33 @@ class ComicArchive:
self.path = new_path
self.archiver.path = pathlib.Path(path)
def sevenzip_test(self) -> bool:
return z7_support and py7zr.is_7zfile(self.path)
def zip_test(self) -> bool:
return zipfile.is_zipfile(self.path)
def rar_test(self) -> bool:
return rar_support and rarfile.is_rarfile(str(self.path))
def folder_test(self) -> bool:
return self.path.is_dir()
def is_sevenzip(self) -> bool:
return self.archive_type == self.ArchiveType.SevenZip
def is_zip(self) -> bool:
return self.archive_type == self.ArchiveType.Zip
def is_rar(self) -> bool:
return self.archive_type == self.ArchiveType.Rar
def is_pdf(self) -> bool:
return self.archive_type == self.ArchiveType.Pdf
def is_folder(self) -> bool:
return self.archive_type == self.ArchiveType.Folder
def is_writable(self, check_rar_status: bool = True) -> bool:
if self.archive_type == self.ArchiveType.Unknown:
def is_writable(self, check_archive_status: bool = True) -> bool:
if isinstance(self.archiver, UnknownArchiver):
return False
if check_rar_status and self.is_rar() and not self.rar_exe_path:
if check_archive_status and not self.archiver.is_writable():
return False
if not os.access(self.path, os.W_OK):
return False
if (self.archive_type != self.ArchiveType.Folder) and (not os.access(self.path.parent, os.W_OK)):
if not (os.access(self.path, os.W_OK) or os.access(self.path.parent, os.W_OK)):
return False
return True
def is_writable_for_style(self, data_style: int) -> bool:
return not (data_style == MetaDataStyle.CBI and not self.archiver.supports_comment)
if (self.is_rar() or self.is_sevenzip()) and data_style == MetaDataStyle.CBI:
return False
return self.is_writable()
def is_zip(self) -> bool:
return self.archiver.name() == "ZIP"
def seems_to_be_a_comic_archive(self) -> bool:
if (self.is_zip() or self.is_rar() or self.is_sevenzip() or self.is_folder()) and (
self.get_number_of_pages() > 0
):
if not (isinstance(self.archiver, UnknownArchiver)) and self.get_number_of_pages() > 0:
return True
return False
def extension(self) -> str:
return self.archiver.extension()
def read_metadata(self, style: int) -> GenericMetadata:
if style == MetaDataStyle.CIX:
@ -937,7 +285,6 @@ class ComicArchive:
# seems like some archive creators are on Windows, and don't know about case-sensitivity!
if sort_list:
files = cast(list[str], natsort.os_sorted(files))
# make a sub-list of image files
@ -1256,10 +603,10 @@ class ComicArchive:
return metadata
def export_as_zip(self, zip_filename: pathlib.Path | str) -> bool:
if self.archive_type == self.ArchiveType.Zip:
def export_as_zip(self, zip_filename: pathlib.Path) -> bool:
if self.archiver.name() == "ZIP":
# nothing to do, we're already a zip
return True
zip_archiver = ZipArchiver(zip_filename)
zip_archiver = ZipArchiver.open(zip_filename)
return zip_archiver.copy_from_archive(self.archiver)

View File

@ -217,14 +217,7 @@ class CLI:
if self.batch_mode:
brief = f"{ca.path}: "
if ca.is_sevenzip():
brief += "7Z archive "
elif ca.is_zip():
brief += "ZIP archive "
elif ca.is_rar():
brief += "RAR archive "
elif ca.is_folder():
brief += "Folder archive "
brief += ca.archiver.name() + " archive "
brief += f"({page_count: >3} pages)"
brief += " tags:[ "
@ -460,12 +453,7 @@ class CLI:
new_ext = "" # default
if self.options.filename_rename_set_extension_based_on_archive:
if ca.is_sevenzip():
new_ext = ".cb7"
elif ca.is_zip():
new_ext = ".cbz"
elif ca.is_rar():
new_ext = ".cbr"
new_ext = ca.extension()
renamer = FileRenamer(
md,
@ -572,7 +560,7 @@ class CLI:
logger.error("Cannot find %s", filename)
return
ca = ComicArchive(filename, self.options.general_rar_exe_path, str(graphics_path / "nocover.png"))
ca = ComicArchive(filename, str(graphics_path / "nocover.png"))
if not ca.seems_to_be_a_comic_archive():
logger.error("Sorry, but %s is not a comic archive!", filename)

View File

@ -2,13 +2,16 @@ from __future__ import annotations
from comictaggerlib.ctoptions.cmdline import initial_cmd_line_parser, register_commandline, validate_commandline_options
from comictaggerlib.ctoptions.file import register_settings, validate_settings
from comictaggerlib.ctoptions.plugin import register_plugin_settings, validate_plugin_settings
from comictaggerlib.ctoptions.types import ComicTaggerPaths
__all__ = [
"initial_cmd_line_parser",
"register_commandline",
"register_settings",
"register_plugin_settings",
"validate_commandline_options",
"validate_settings",
"validate_plugin_settings",
"ComicTaggerPaths",
]

View File

@ -325,20 +325,16 @@ def validate_commandline_options(options: settngs.Config[settngs.Values], parser
else:
options[0].runtime_file_list = options[0].runtime_files
# take a crack at finding rar exe, if not set already
if options[0].general_rar_exe_path.strip() in ("", "rar"):
# take a crack at finding rar exe if it's not in the path
if not utils.which("rar"):
if platform.system() == "Windows":
# look in some likely places for Windows machines
if os.path.exists(r"C:\Program Files\WinRAR\Rar.exe"):
options[0].general_rar_exe_path = r"C:\Program Files\WinRAR\Rar.exe"
utils.add_to_path(r"C:\Program Files\WinRAR")
elif os.path.exists(r"C:\Program Files (x86)\WinRAR\Rar.exe"):
options[0].general_rar_exe_path = r"C:\Program Files (x86)\WinRAR\Rar.exe"
utils.add_to_path(r"C:\Program Files (x86)\WinRAR")
else:
if os.path.exists("/opt/homebrew/bin"):
utils.add_to_path("/opt/homebrew/bin")
# see if it's in the path of unix user
rarpath = utils.which("rar")
if rarpath is not None:
options[0].general_rar_exe_path = "rar"
return options

View File

@ -12,13 +12,6 @@ from comictaggerlib.defaults import DEFAULT_REPLACEMENTS, Replacement, Replaceme
def general(parser: settngs.Manager) -> None:
# General Settings
parser.add_setting("--rar-exe-path", default="rar", help="The path to the rar program")
parser.add_setting(
"--allow-cbi-in-rar",
default=True,
action=argparse.BooleanOptionalAction,
help="Allows ComicBookLover tags in RAR/CBR files",
)
parser.add_setting("check_for_new_version", default=False, cmdline=False)
parser.add_setting("send_usage_stats", default=False, cmdline=False)
@ -59,7 +52,6 @@ def identifier(parser: settngs.Manager) -> None:
def dialog(parser: settngs.Manager) -> None:
# Show/ask dialog flags
parser.add_setting("ask_about_cbi_in_rar", default=True, cmdline=False)
parser.add_setting("show_disclaimer", default=True, cmdline=False)
parser.add_setting("dont_notify_about_this_version", default="", cmdline=False)
parser.add_setting("ask_about_usage_stats", default=True, cmdline=False)
@ -248,7 +240,7 @@ def autotag(parser: settngs.Manager) -> None:
)
def validate_settings(options: settngs.Config[settngs.Values], parser: settngs.Manager) -> dict[str, dict[str, Any]]:
def validate_settings(options: settngs.Config[settngs.Values]) -> dict[str, dict[str, Any]]:
options[0].identifier_publisher_filter = [x.strip() for x in options[0].identifier_publisher_filter if x.strip()]
options[0].rename_replacements = Replacements(
[Replacement(x[0], x[1], x[2]) for x in options[0].rename_replacements[0]],

View File

@ -0,0 +1,45 @@
from __future__ import annotations
import logging
import os
import settngs
import comicapi.comicarchive
logger = logging.getLogger("comictagger")
def archiver(manager: settngs.Manager) -> None:
exe_registered: set[str] = set()
for archiver in comicapi.comicarchive.archivers:
if archiver.exe and archiver.exe not in exe_registered:
manager.add_setting(
f"--{archiver.exe.replace(' ', '-').replace('_', '-').strip().strip('-')}",
default=archiver.exe,
help="Path to the %(default)s executable\n\n",
)
exe_registered.add(archiver.exe)
def validate_plugin_settings(options: settngs.Config) -> settngs.Config:
if "archiver" not in options[1]:
return options
cfg = settngs.normalize_config(options, file=True, cmdline=True, defaults=False)
for archiver in comicapi.comicarchive.archivers:
exe_name = archiver.exe.replace(" ", "-").replace("_", "-").strip().strip("-").replace("-", "_")
if (
exe_name in cfg[0]["archiver"]
and cfg[0]["archiver"][exe_name]
and cfg[0]["archiver"][exe_name] != archiver.exe
):
if os.path.basename(cfg[0]["archiver"][exe_name]) == archiver.exe:
comicapi.utils.add_to_path(os.path.dirname(cfg[0]["archiver"][exe_name]))
else:
archiver.exe = cfg[0]["archiver"][exe_name]
return options
def register_plugin_settings(manager: settngs.Manager):
manager.add_persistent_group("archiver", archiver, False)

View File

@ -193,7 +193,7 @@ class FileSelectionList(QtWidgets.QWidget):
QtCore.QCoreApplication.processEvents()
first_added = None
rar_added = False
rar_added_ro = False
self.twList.setSortingEnabled(False)
for idx, f in enumerate(filelist):
QtCore.QCoreApplication.processEvents()
@ -206,8 +206,7 @@ class FileSelectionList(QtWidgets.QWidget):
row = self.add_path_item(f)
if row is not None:
ca = self.get_archive_by_row(row)
if ca and ca.is_rar():
rar_added = True
rar_added_ro = bool(ca and ca.archiver.name() == "RAR" and not ca.archiver.is_writable())
if first_added is None:
first_added = row
@ -224,7 +223,7 @@ class FileSelectionList(QtWidgets.QWidget):
else:
QtWidgets.QMessageBox.information(self, "File/Folder Open", "No readable comic archives were found.")
if rar_added and not utils.which(self.options.general_rar_exe_path or "rar"):
if rar_added_ro:
self.rar_ro_message()
self.twList.setSortingEnabled(True)
@ -278,7 +277,7 @@ class FileSelectionList(QtWidgets.QWidget):
if self.is_list_dupe(path):
return self.get_current_list_row(path)
ca = ComicArchive(path, self.options.general_rar_exe_path, str(graphics_path / "nocover.png"))
ca = ComicArchive(path, str(graphics_path / "nocover.png"))
if ca.seems_to_be_a_comic_archive():
row: int = self.twList.rowCount()
@ -339,14 +338,7 @@ class FileSelectionList(QtWidgets.QWidget):
filename_item.setText(item_text)
filename_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
if fi.ca.is_sevenzip():
item_text = "7Z"
elif fi.ca.is_zip():
item_text = "ZIP"
elif fi.ca.is_rar():
item_text = "RAR"
else:
item_text = ""
item_text = fi.ca.archiver.name()
type_item.setText(item_text)
type_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)

View File

@ -68,12 +68,16 @@ class App:
def run(self) -> None:
opts = self.initialize()
self.load_plugins()
self.register_options()
self.parse_options(opts.config)
self.initialize_dirs()
self.main()
def load_plugins(self) -> None:
comicapi.comicarchive.load_archive_plugins()
def initialize(self) -> argparse.Namespace:
opts, _ = self.initial_arg_parser.parse_known_args()
assert opts is not None
@ -87,6 +91,7 @@ class App:
)
ctoptions.register_commandline(self.manager)
ctoptions.register_settings(self.manager)
ctoptions.register_plugin_settings(self.manager)
def parse_options(self, config_paths: ctoptions.ComicTaggerPaths) -> None:
self.options, self.config_load_success = self.manager.parse_config(
@ -95,7 +100,8 @@ class App:
self.options = self.manager.get_namespace(self.options)
self.options = ctoptions.validate_commandline_options(self.options, self.manager)
self.options = ctoptions.validate_settings(self.options, self.manager)
self.options = ctoptions.validate_settings(self.options)
self.options = ctoptions.validate_plugin_settings(self.options)
self.options = self.options
def initialize_dirs(self) -> None:
@ -160,6 +166,7 @@ class App:
f"Failed to load settings, check the log located in '{self.options[0].runtime_config.user_log_dir}' for more details",
True,
)
if self.options[0].runtime_no_gui:
if error and error[1]:
print(f"A fatal error occurred please check the log for more information: {error[0]}") # noqa: T201

View File

@ -73,13 +73,8 @@ class RenameWindow(QtWidgets.QDialog):
self.renamer.replacements = self.options[0].rename_replacements
new_ext = ca.path.suffix # default
if self.options[0].rename_set_extension_based_on_archive:
if ca.is_sevenzip():
new_ext = ".cb7"
elif ca.is_zip():
new_ext = ".cbz"
elif ca.is_rar():
new_ext = ".cbr"
if self.options[0].filename_rename_set_extension_based_on_archive:
new_ext = ca.extension()
if md is None:
md = ca.read_metadata(self.data_style)
@ -206,7 +201,7 @@ class RenameWindow(QtWidgets.QDialog):
logger.info("%s: Filename is already good!", comic[1])
continue
if not comic[0].is_writable(check_rar_status=False):
if not comic[0].is_writable(check_archive_status=False):
continue
comic[0].rename(utils.unique_file(full_path))

View File

@ -269,7 +269,10 @@ class SettingsWindow(QtWidgets.QDialog):
def settings_to_form(self) -> None:
# Copy values from settings to form
self.leRarExePath.setText(self.options[0].general_rar_exe_path)
if "archiver" in self.options[1] and "rar" in self.options[1]["archiver"]:
self.leRarExePath.setText(getattr(self.options[0], self.options[1]["archiver"]["rar"].internal_name))
else:
self.leRarExePath.setEnabled(False)
self.sbNameMatchIdentifyThresh.setValue(self.options[0].identifier_series_match_identify_thresh)
self.sbNameMatchSearchThresh.setValue(self.options[0].comicvine_series_match_search_thresh)
self.tePublisherFilter.setPlainText("\n".join(self.options[0].identifier_publisher_filter))
@ -374,11 +377,12 @@ class SettingsWindow(QtWidgets.QDialog):
)
# Copy values from form to settings and save
self.options[0].general_rar_exe_path = str(self.leRarExePath.text())
if "archiver" in self.options[1] and "rar" in self.options[1]["archiver"]:
setattr(self.options[0], self.options[1]["archiver"]["rar"].internal_name, str(self.leRarExePath.text()))
# make sure rar program is now in the path for the rar class
if self.options[0].general_rar_exe_path:
utils.add_to_path(os.path.dirname(self.options[0].general_rar_exe_path))
# make sure rar program is now in the path for the rar class
if self.options[0].archivers_rar:
utils.add_to_path(os.path.dirname(str(self.leRarExePath.text())))
if not str(self.leIssueNumPadding.text()).isdigit():
self.leIssueNumPadding.setText("0")

View File

@ -696,16 +696,7 @@ Have fun!
self.lblFilename.setText(filename)
if ca.is_sevenzip():
self.lblArchiveType.setText("7Z archive")
elif ca.is_zip():
self.lblArchiveType.setText("ZIP archive")
elif ca.is_rar():
self.lblArchiveType.setText("RAR archive")
elif ca.is_folder():
self.lblArchiveType.setText("Folder archive")
else:
self.lblArchiveType.setText("")
self.lblArchiveType.setText(ca.archiver.name() + " archive")
page_count = f" ({ca.get_number_of_pages()} pages)"
self.lblPageCount.setText(page_count)

View File

@ -8,7 +8,7 @@ pycountry
pyicu; sys_platform == 'linux' or sys_platform == 'darwin'
rapidfuzz>=2.12.0
requests==2.*
settngs==0.3.0
settngs==0.5.0
text2digits
typing_extensions
wordninja

View File

@ -59,12 +59,18 @@ setup(
exclude=["tests", "testing"],
),
package_data={"comictaggerlib": ["ui/*", "graphics/*"], "comicapi": ["data/*"]},
entry_points=dict(
console_scripts=["comictagger=comictaggerlib.main:main"],
pyinstaller40=[
entry_points={
"console_scripts": ["comictagger=comictaggerlib.main:main"],
"pyinstaller40": [
"hook-dirs = comictaggerlib.__pyinstaller:get_hook_dirs",
],
),
"comicapi.archiver": [
"zip = comicapi.archivers.zip:ZipArchiver",
"sevenzip = comicapi.archivers.sevenzip:SevenZipArchiver",
"rar = comicapi.archivers.rar:RarArchiver",
"folder = comicapi.archivers.folder:FolderArchiver",
],
},
classifiers=[
"Development Status :: 4 - Beta",
"Environment :: Console",

View File

@ -4,15 +4,17 @@ import platform
import shutil
import pytest
from importlib_metadata import entry_points
import comicapi.comicarchive
import comicapi.genericmetadata
from testing.filenames import datadir
@pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support")
@pytest.mark.xfail(not comicapi.archivers.rar.rar_support, reason="rar support")
def test_getPageNameList():
c = comicapi.comicarchive.ComicArchive(datadir / "fake_cbr.cbr")
assert c.seems_to_be_a_comic_archive()
pageNameList = c.get_page_name_list()
assert pageNameList == [
@ -56,24 +58,26 @@ def test_save_cbi(tmp_comic):
md = tmp_comic.read_cbi()
@pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support")
@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cix_rar(tmp_path):
cbr_path = datadir / "fake_cbr.cbr"
shutil.copy(cbr_path, tmp_path)
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name)
assert tmp_comic.seems_to_be_a_comic_archive()
assert tmp_comic.write_cix(comicapi.genericmetadata.md_test)
md = tmp_comic.read_cix()
assert md.replace(pages=[]) == comicapi.genericmetadata.md_test.replace(pages=[])
@pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support")
@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cbi_rar(tmp_path):
cbr_path = datadir / "fake_cbr.cbr"
shutil.copy(cbr_path, tmp_path)
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_path / cbr_path.name)
assert tmp_comic.seems_to_be_a_comic_archive()
assert tmp_comic.write_cbi(comicapi.genericmetadata.md_test)
md = tmp_comic.read_cbi()
@ -118,16 +122,8 @@ def test_invalid_zip(tmp_comic):
archivers = [
comicapi.comicarchive.ZipArchiver,
comicapi.comicarchive.FolderArchiver,
pytest.param(
comicapi.comicarchive.SevenZipArchiver,
marks=pytest.mark.xfail(not (comicapi.comicarchive.z7_support), reason="7z support"),
),
pytest.param(
comicapi.comicarchive.RarArchiver,
marks=pytest.mark.xfail(not (comicapi.comicarchive.rar_support and shutil.which("rar")), reason="rar support"),
),
pytest.param(x.load(), marks=pytest.mark.xfail(not (x.load().enabled), reason="archiver not enabled"))
for x in entry_points(group="comicapi_archivers")
]
@ -135,7 +131,7 @@ archivers = [
def test_copy_from_archive(archiver, tmp_path, cbz):
comic_path = tmp_path / cbz.path.with_suffix("").name
archive = archiver(comic_path)
archive = archiver.open(comic_path)
assert archive.copy_from_archive(cbz.archiver)