Compare commits

...

2 Commits

Author SHA1 Message Date
529f52c6cc Update comictaggerlib and comicapi for exception handling
Some checks failed
CI / lint (ubuntu-latest, 3.9) (push) Has been cancelled
CI / build-and-test (macos-13, 3.13) (push) Has been cancelled
CI / build-and-test (macos-13, 3.9) (push) Has been cancelled
CI / build-and-test (macos-14, 3.13) (push) Has been cancelled
CI / build-and-test (macos-14, 3.9) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04, 3.13) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04, 3.9) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04-arm, 3.13) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04-arm, 3.9) (push) Has been cancelled
CI / build-and-test (windows-latest, 3.13) (push) Has been cancelled
CI / build-and-test (windows-latest, 3.9) (push) Has been cancelled
2025-07-18 17:36:03 -07:00
2010d1c3c6 Fix Archive and Tag plugin definitions for exception handling 2025-07-18 17:35:32 -07:00
16 changed files with 508 additions and 421 deletions

View File

@ -2,34 +2,34 @@ from __future__ import annotations
import pathlib
from collections.abc import Collection
from typing import Protocol, runtime_checkable
from typing import ClassVar, Protocol, runtime_checkable
@runtime_checkable
class Archiver(Protocol):
"""Archiver Protocol"""
"""The path to the archive"""
path: pathlib.Path
"""The path to the archive"""
exe: ClassVar[str] = ""
"""
The name of the executable used for this archiver. This should be the base name of the executable.
For example if 'rar.exe' is needed this should be "rar".
If an executable is not used this should be the empty string.
"""
exe: str = ""
enabled: ClassVar[bool] = True
"""
Whether or not this archiver is enabled.
If external imports are required and are not available this should be false. See rar.py and sevenzip.py.
"""
enabled: bool = True
hashable: bool = True
"""
If self.path is a single file that can be hashed.
For example directories cannot be hashed.
"""
hashable: bool = True
supported_extensions: Collection[str] = set()
@ -39,21 +39,21 @@ class Archiver(Protocol):
def get_comment(self) -> str:
"""
Returns the comment from the current archive as a string.
Should always return a string. If comments are not supported in the archive the empty string should be returned.
If comments are not supported in the archive the empty string should be returned.
"""
return ""
raise NotImplementedError
def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
"""
Returns True if the comment was successfully set on the current archive.
Should always return a boolean. If comments are not supported in the archive False should be returned.
Should raise an exception if a comment cannot be set
"""
return False
raise NotImplementedError
def supports_comment(self) -> bool:
"""
Returns True if the current archive supports comments.
Should always return a boolean. If comments are not supported in the archive False should be returned.
Should always return a boolean.
MUST NOT cause an exception.
"""
return False
@ -65,63 +65,59 @@ class Archiver(Protocol):
"""
raise NotImplementedError
def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
"""
Removes the named file from the current archive.
archive_file should always come from the output of get_filename_list.
Should always return a boolean. Failures should return False.
archive_file will always come from the output of get_filename_list.
Rebuilding the archive without the named file is a standard way to remove a file.
"""
return False
raise NotImplementedError
def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
"""
Writes the named file to the current archive.
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError
def get_filename_list(self) -> list[str]:
"""
Returns a list of filenames in the current archive.
Should always return a list of string. Failures should return an empty list.
Should always return a list of string.
"""
return []
raise NotImplementedError
def supports_files(self) -> bool:
"""
Returns True if the current archive supports arbitrary non-picture files.
Should always return a boolean.
If arbitrary non-picture files are not supported in the archive False should be returned.
MUST NOT cause an exception.
"""
return False
raise NotImplementedError
def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""
Copies the contents of another achive to the current archive.
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError
def is_writable(self) -> bool:
"""
Retuns True if the current archive is writeable
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError
def extension(self) -> str:
"""
Returns the extension that this archiver should use eg ".cbz".
Should always return a string. Failures should return the empty string.
MUST NOT cause an exception.
"""
return ""
def name(self) -> str:
"""
Returns the name of this archiver for display purposes eg "CBZ".
Should always return a string. Failures should return the empty string.
MUST NOT cause an exception.
"""
return ""
@ -130,6 +126,7 @@ class Archiver(Protocol):
"""
Returns True if the given path can be opened by this archiver.
Should always return a boolean. Failures should return False.
MUST NOT cause an exception.
"""
return False
@ -138,8 +135,10 @@ class Archiver(Protocol):
"""
Opens the given archive.
Should always return a an Archver.
Should never cause an exception no file operations should take place in this method,
is_valid will always be called before open.
Should validate that file can be opened.
NOTE: is_7zfile from py7zr does not validate that py7zr can open the file
MUST not keep file open.
"""
archiver = cls()
archiver.path = path

View File

@ -25,47 +25,50 @@ class FolderArchiver(Archiver):
except OSError:
return ""
def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
self._filename_list = []
if comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
(self.path / self.comment_file_name).unlink(missing_ok=True)
return True
try:
if comment:
file_path = self.path / self.comment_file_name
file_path.parent.mkdir(exist_ok=True, parents=True)
file_path.write_text(comment, encoding="utf-8")
else:
(self.path / self.comment_file_name).unlink(missing_ok=True)
except OSError as e:
logger.error(
"Error writing comment for folder archive [%s]: %s :: %s", e, self.path, self.comment_file_name
)
raise OSError(
f"Error writing comment for folder archive [{e}]: {self.path} :: {self.comment_file_name}"
) from e
def supports_comment(self) -> bool:
return True
def read_file(self, archive_file: str) -> bytes:
try:
data = (self.path / archive_file).read_bytes()
return (self.path / archive_file).read_bytes()
except OSError as e:
logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file)
raise
raise OSError(f"Error reading folder archive [{e}]: {self.path} :: {archive_file}") from e
return data
def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._filename_list = []
try:
(self.path / archive_file).unlink(missing_ok=True)
except OSError as e:
logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
raise OSError(f"Error removing file for folder archive [{e}]: {self.path} :: {archive_file}") from e
def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
self._filename_list = []
try:
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.path / archive_file, mode="wb") as f:
f.write(data)
file_path.write_bytes(data)
except OSError as e:
logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
raise OSError(f"Error writing folder archive [{e}]: {self.path} :: {archive_file}") from e
def get_filename_list(self) -> list[str]:
if self._filename_list:
@ -79,12 +82,12 @@ class FolderArchiver(Archiver):
return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in folder archive [{e}]: {self.path}") from e
def supports_files(self) -> bool:
return True
def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
@ -94,15 +97,14 @@ class FolderArchiver(Archiver):
self.write_file(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
except Exception:
logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
return False
else:
return True
self.set_comment(other_archive.get_comment())
except Exception as e:
logger.exception(
"Error while copying to folder archive [%s]: from %s to %s", e, other_archive.path, self.path
)
raise OSError(
f"Error while copying to folder archive [{e}]: from {str(other_archive)!r} to {str(self.path)!r}"
) from e
def is_writable(self) -> bool:
return True

View File

@ -49,47 +49,47 @@ class RarArchiver(Archiver):
rarc = self.get_rar_obj()
return (rarc.comment if rarc else "") or ""
def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
self._reset()
if rar_support and self.exe:
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")
if not (rar_support and self.exe):
return
working_dir = os.path.dirname(os.path.abspath(self.path))
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")
# use external program to write comment to Rar archive
proc_args = [
self.exe,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
result = subprocess.run(
proc_args,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
cwd=tmp_dir,
)
if result.returncode != 0:
logger.error(
"Error writing comment to rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
result.stderr,
)
return False
except OSError as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
return True
return False
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to write comment to Rar archive
proc_args = [
self.exe,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
result = subprocess.run(
proc_args,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
cwd=tmp_dir,
)
except Exception as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
raise OSError(f"Error writing comment to rar archive [{e}]: {self.path}")
if result.returncode != 0:
logger.error(
"Error writing comment to rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
result.stderr,
)
raise OSError(f"Error writing comment to rar archive [exitcode: {result.returncode}]: {self.path}")
def supports_comment(self) -> bool:
return True
@ -100,9 +100,11 @@ class RarArchiver(Archiver):
return b""
tries = 0
error = None
entries = []
while tries < 7:
tries += 1
try:
tries = tries + 1
data: bytes = rarc.open(archive_file).read()
entries = [(rarc.getinfo(archive_file), data)]
@ -115,10 +117,24 @@ class RarArchiver(Archiver):
archive_file,
tries,
)
error = OSError(
'"Error reading rar archive [file is not expected size: {:d} vs {:d}] {} :: {} :: tries #{:d}"'.format(
entries[0][0].file_size,
len(entries[0][1]),
self.path,
archive_file,
tries,
)
)
continue
except OSError as e:
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
logger.error(
"Error reading file from rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries
)
error = OSError(
f"Error reading file from rar archive [{e}]: {self.path} :: {archive_file} :: tries#{tries}"
)
except Exception as e:
logger.error(
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",
@ -127,22 +143,30 @@ class RarArchiver(Archiver):
archive_file,
tries,
)
break
raise RuntimeError(
f"Unexpected exception reading file from rar archive [{e}]: {self.path} :: {archive_file} :: tries#{tries}"
)
else:
# Success. Entries is a list of of tuples: ( rarinfo, filedata)
if error is None:
# Success, return early. Entries is a list of of tuples: ( rarinfo, filedata)
if len(entries) == 1:
return entries[0][1]
raise OSError(
f"Error reading file from rar archive [File not found]: {self.path} :: {archive_file} :: tries#{tries}"
)
raise OSError
if error is None:
# Somehow we have success but exited the loop
raise RuntimeError("Something failed")
raise error
raise OSError
def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._reset()
if self.exe:
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to remove file from Rar archive
if not self.exe:
return
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to remove file from Rar archive
try:
result = subprocess.run(
[self.exe, "d", f"-w{working_dir}", "-c-", self.path, archive_file],
startupinfo=STARTUPINFO,
@ -151,26 +175,35 @@ class RarArchiver(Archiver):
encoding="utf-8",
cwd=self.path.absolute().parent,
)
except Exception as e:
raise OSError(f"Error removing file from rar archive [{e}]: {self.path}:: {archive_file}")
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
archive_file,
)
raise RuntimeError(
"Error removing file from rar archive [exitcode: {:d}]: {} :: {}".format(
result.returncode,
self.path,
archive_file,
)
return False
return True
return False
)
def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
self._reset()
if self.exe:
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
working_dir = os.path.dirname(os.path.abspath(self.path))
if not self.exe:
return
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
working_dir = os.path.dirname(os.path.abspath(self.path))
try:
# use external program to write file to Rar archive
result = subprocess.run(
[
@ -188,45 +221,53 @@ class RarArchiver(Archiver):
capture_output=True,
cwd=self.path.absolute().parent,
)
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s :: %s",
result.returncode,
self.path,
archive_file,
result.stderr,
)
return False
return True
return False
except Exception as e:
raise OSError(f"Error writing file to rar archive [{e}]: {self.path}:: {archive_file}")
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s :: %s",
result.returncode,
self.path,
archive_file,
result.stderr,
)
raise OSError(
f"Error writing file to rar archive [exitcode: {result.returncode}]: {self.path}:: {archive_file}"
)
def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
rarc = self.get_rar_obj()
tries = 0
if rar_support and rarc:
while tries < 7:
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
if not (rar_support and rarc):
return []
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
error = None
while tries < 7:
tries += 1
try:
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
else:
self._filename_list = namelist
return namelist
return []
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
error = OSError(f"Error listing files in rar archive [{e}]: {self.path} :: tries#{tries}")
else:
self._filename_list = namelist
return self._filename_list
if error is None:
# Somehow we have success but exited the loop
raise RuntimeError("Something failed")
raise error
def supports_files(self) -> bool:
return True
def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current archive with one copied from another archive"""
self._reset()
try:
@ -251,22 +292,22 @@ class RarArchiver(Archiver):
capture_output=True,
encoding="utf-8",
)
if result.returncode != 0:
logger.error(
"Error while copying to rar archive [exitcode: %d]: %s: %s",
result.returncode,
self.path,
result.stderr,
)
return False
self.path.unlink(missing_ok=True)
shutil.move(rar_path, self.path)
except Exception as e:
logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(f"Error listing files in rar archive [{e}]: from {other_archive.path} to {self.path}") from e
if result.returncode != 0:
logger.error(
"Error while copying to rar archive [exitcode: %d]: %s: %s",
result.returncode,
self.path,
result.stderr,
)
raise OSError(
f"Error while copying to rar archive [exitcode: {result.returncode}]: {self.path}: {result.stderr}"
)
@classmethod
@functools.cache

View File

@ -28,46 +28,37 @@ class SevenZipArchiver(Archiver):
super().__init__()
self._filename_list: list[str] = []
# @todo: Implement Comment?
def get_comment(self) -> str:
return ""
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file: str) -> bytes:
data = b""
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read([archive_file])[archive_file].read()
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
logger.error("Error reading file in 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error reading file in 7zip archive [{e}]: {self.path} :: {archive_file}") from e
return data
def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._filename_list = []
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
# At the moment, no other option but to rebuild the whole
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
self._filename_list = []
if archive_file in files:
if not self.rebuild([archive_file]):
return False
self.rebuild([archive_file])
try:
# now just add the archive file as a new one
with py7zr.SevenZipFile(self.path, "a") as zf:
zf.writestr(data, archive_file)
return True
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
logger.error("Error writing file in 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error writing file in 7zip archive [{e}]: {self.path} :: {archive_file}") from e
def get_filename_list(self) -> list[str]:
if self._filename_list:
@ -80,12 +71,12 @@ class SevenZipArchiver(Archiver):
return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in 7zip archive [{e}]: {self.path}") from e
def supports_files(self) -> bool:
return True
def rebuild(self, exclude_list: list[str]) -> bool:
def rebuild(self, exclude_list: list[str]) -> None:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
@ -108,11 +99,10 @@ class SevenZipArchiver(Archiver):
shutil.move(tmp_file.name, self.path)
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path)
return False
return True
logger.error("Error rebuilding 7zip archive [%s]: %s", e, self.path)
raise OSError(f"Error rebuilding 7zip archive [{e}]: {self.path}") from e
def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
@ -125,9 +115,7 @@ class SevenZipArchiver(Archiver):
zout.writestr(data, filename)
except Exception as e:
logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(f"Error while copying to 7zip archive [{e}]: from {other_archive.path} to {self.path}") from e
def is_writable(self) -> bool:
return True

View File

@ -40,33 +40,35 @@ class ZipArchiver(Archiver):
comment = zf.comment.decode("utf-8", errors="replace")
return comment
def set_comment(self, comment: str) -> bool:
with ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def set_comment(self, comment: str) -> None:
try:
with ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
except Exception as e:
logger.error("Error writing zip comment [%s]: %s", e, self.path)
raise OSError(f"Error writing zip comment [{e}]: {self.path}") from e
def read_file(self, archive_file: str) -> bytes:
with ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
logger.exception("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
logger.exception("Error reading file in zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error reading file in zip archive [{e}]: {self.path} :: {archive_file}") from e
return data
def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
files = self.get_filename_list()
self._filename_list = []
try:
with ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
if archive_file in files:
zf.repack([zf.remove(archive_file)])
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
logger.error("Error removing file in zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error removing file in zip archive [{e}]: {self.path} :: {archive_file}") from e
def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
files = self.get_filename_list()
self._filename_list = []
@ -76,10 +78,9 @@ class ZipArchiver(Archiver):
if archive_file in files:
zf.repack([zf.remove(archive_file)])
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
raise OSError(f"Error writing zip archive [{e}]: {self.path} :: {archive_file}") from e
def get_filename_list(self) -> list[str]:
if self._filename_list:
@ -90,7 +91,7 @@ class ZipArchiver(Archiver):
return self._filename_list
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in zip archive [{e}]: {self.path}") from e
def supports_files(self) -> bool:
return True
@ -125,7 +126,7 @@ class ZipArchiver(Archiver):
return False
return True
def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
@ -136,15 +137,12 @@ class ZipArchiver(Archiver):
zout.writestr(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
self.set_comment(other_archive.get_comment())
except Exception as e:
logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(
f"Error while copying to zip archive [{e}]: from {str(other_archive)!r} to {str(self.path)!r}"
) from e
def is_writable(self) -> bool:
return True

View File

@ -122,6 +122,8 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
class ComicArchive:
"""Exceptions from tags/archive should already be logged. Caller must handle display to user and recovery"""
logo_data = b""
pil_available: bool | None = None
@ -208,7 +210,7 @@ class ComicArchive:
return True
def is_zip(self) -> bool:
return self.archiver.name() == "ZIP"
return self.archiver.extension() == ".cbz"
def seems_to_be_a_comic_archive(self) -> bool:
if (
@ -238,15 +240,15 @@ class ComicArchive:
return ""
return tags[tag_id].read_raw_tags(self.archiver)
def write_tags(self, metadata: GenericMetadata, tag_id: str) -> bool:
def write_tags(self, metadata: GenericMetadata, tag_id: str) -> None:
if tag_id in self.md:
del self.md[tag_id]
if not tags[tag_id].enabled:
logger.warning("%s tags not enabled", tags[tag_id].name())
return False
return
self.apply_archive_info_to_metadata(metadata, True, True, hash_archive=self.hash_archive)
return tags[tag_id].write_tags(metadata, self.archiver)
tags[tag_id].write_tags(metadata, self.archiver)
def has_tags(self, tag_id: str) -> bool:
if tag_id in self.md:
@ -255,12 +257,12 @@ class ComicArchive:
return False
return tags[tag_id].has_tags(self.archiver)
def remove_tags(self, tag_id: str) -> bool:
def remove_tags(self, tag_id: str) -> None:
if tag_id in self.md:
del self.md[tag_id]
if not tags[tag_id].enabled:
return False
return tags[tag_id].remove_tags(self.archiver)
return
tags[tag_id].remove_tags(self.archiver)
def get_page(self, index: int) -> bytes:
image_data = b""
@ -464,10 +466,17 @@ class ComicArchive:
metadata.is_empty = False
return metadata
def export_as_zip(self, zip_filename: pathlib.Path) -> bool:
if self.archiver.name() == "ZIP":
# nothing to do, we're already a zip
return True
def export_as(self, new_filename: pathlib.Path, extension: str = ".zip") -> None:
"""
Unconditionally creates a new file. Does not check the current archive.
zip_archiver = ZipArchiver.open(zip_filename)
return zip_archiver.copy_from_archive(self.archiver)
If extension cannot be find reverts to .zip
"""
zip_archiver = UnknownArchiver.open(new_filename)
for archiver in archivers:
if extension in archiver.supported_extensions:
zip_archiver = archiver.open(new_filename)
if isinstance(zip_archiver, UnknownArchiver):
zip_archiver = ZipArchiver.open(new_filename)
zip_archiver.copy_from_archive(self.archiver)

View File

@ -94,11 +94,12 @@ class ComicRack(Tag):
and self.file in archive.get_filename_list()
and self._validate_bytes(archive.read_file(self.file))
)
except Exception:
return False
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e
def remove_tags(self, archive: Archiver) -> bool:
return self.has_tags(archive) and archive.remove_file(self.file)
def remove_tags(self, archive: Archiver) -> None:
if self.has_tags(archive):
archive.remove_file(self.file)
def read_tags(self, archive: Archiver) -> GenericMetadata:
if self.has_tags(archive):
@ -106,8 +107,8 @@ class ComicRack(Tag):
metadata = archive.read_file(self.file) or b""
if self._validate_bytes(metadata):
return self._metadata_from_bytes(metadata)
except Exception:
...
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e
return GenericMetadata()
def read_raw_tags(self, archive: Archiver) -> str:
@ -116,22 +117,20 @@ class ComicRack(Tag):
b = archive.read_file(self.file)
# ET.fromstring is used as xml can declare the encoding
return ET.tostring(ET.fromstring(b), encoding="unicode", xml_declaration=True)
except Exception:
...
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e
return ""
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> None:
if self.supports_tags(archive):
xml = b""
try: # read_file can cause an exception
if self.has_tags(archive):
xml = archive.read_file(self.file)
return archive.write_file(self.file, self._bytes_from_metadata(metadata, xml))
except Exception:
...
else:
logger.warning("Archive %s(%s) does not support '%s' metadata", archive.path, archive.name(), self.name())
return False
except Exception as e:
raise RuntimeError(f"Failed to write {self.id} tags to {archive.path}({archive.name()})") from e
logger.warning("Archive %s(%s) does not support '%s' metadata", archive.path, archive.name(), self.name())
def name(self) -> str:
return "Comic Rack"

View File

@ -1,126 +1,144 @@
from __future__ import annotations
from typing import ClassVar
from comicapi.archivers import Archiver
from comicapi.genericmetadata import GenericMetadata
class Tag:
id: ClassVar[str] = ""
"""
ID form this tag format.
Currently known used IDs are cr, cix, comet, cbi, metroninfo and acbf.
You can use an existing ID to override it's behaiour. It is not recommended to do so.
"""
enabled: bool = False
id: str = ""
"""When set to False it will be excluded from selection in ComicTagger"""
supported_attributes: set[str] = {
"data_origin",
"issue_id",
"series_id",
"original_hash",
"series",
"series_aliases",
"issue",
"issue_count",
"title",
"title_aliases",
"volume",
"volume_count",
"genres",
"description",
"notes",
"alternate_series",
"alternate_number",
"alternate_count",
"gtin",
"story_arcs",
"series_groups",
"publisher",
"imprint",
"day",
"month",
"year",
"language",
"country",
"web_link",
"format",
"manga",
"black_and_white",
"maturity_rating",
"critical_rating",
"scan_info",
"tags",
"pages",
"pages.type",
"pages.bookmark",
"pages.double_page",
"pages.image_index",
"pages.size",
"pages.height",
"pages.width",
"page_count",
"characters",
"teams",
"locations",
"credits",
"credits.person",
"credits.role",
"credits.primary",
"credits.language",
"price",
"is_version_of",
"rights",
"identifier",
"last_mark",
}
"""Set of GenericMetadata attributes this tag format can handle"""
version: str
"""Current version of ComicTagger"""
def __init__(self, version: str) -> None:
self.version: str = version
self.supported_attributes = {
"data_origin",
"issue_id",
"series_id",
"original_hash",
"series",
"series_aliases",
"issue",
"issue_count",
"title",
"title_aliases",
"volume",
"volume_count",
"genres",
"description",
"notes",
"alternate_series",
"alternate_number",
"alternate_count",
"gtin",
"story_arcs",
"series_groups",
"publisher",
"imprint",
"day",
"month",
"year",
"language",
"country",
"web_link",
"format",
"manga",
"black_and_white",
"maturity_rating",
"critical_rating",
"scan_info",
"tags",
"pages",
"pages.type",
"pages.bookmark",
"pages.double_page",
"pages.image_index",
"pages.size",
"pages.height",
"pages.width",
"page_count",
"characters",
"teams",
"locations",
"credits",
"credits.person",
"credits.role",
"credits.primary",
"credits.language",
"price",
"is_version_of",
"rights",
"identifier",
"last_mark",
}
def supports_credit_role(self, role: str) -> bool:
"""
Return True if this tag format can handle this credit role.
Should always return a bool.
MUST NOT cause an exception.
"""
self.supported_attributes
return False
def supports_tags(self, archive: Archiver) -> bool:
"""
Checks the given archive for the ability to save these tags.
Should always return a bool. Failures should return False.
Should always return a bool.
Typically consists of a call to either `archive.supports_comment` or `archive.supports_file`
"""
return False
raise NotImplementedError
def has_tags(self, archive: Archiver) -> bool:
"""
Checks the given archive for tags.
Should always return a bool. Failures should return False.
Should always return a bool.
"""
return False
raise NotImplementedError
def remove_tags(self, archive: Archiver) -> bool:
def remove_tags(self, archive: Archiver) -> None:
"""
Removes the tags from the given archive.
Should always return a bool. Failures should return False.
"""
return False
raise NotImplementedError
def read_tags(self, archive: Archiver) -> GenericMetadata:
"""
Returns a GenericMetadata representing the tags saved in the given archive.
Should always return a GenericMetadata. Failures should return an empty metadata object.
"""
return GenericMetadata()
raise NotImplementedError
def read_raw_tags(self, archive: Archiver) -> str:
"""
Returns the raw tags as a string.
If the tags are a binary format a roughly similar text format should be used.
Should always return a string. Failures should return the empty string.
"""
return ""
raise NotImplementedError
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> None:
"""
Saves the given metadata to the given archive.
Should always return a bool. Failures should return False.
Should always return a bool
"""
return False
raise NotImplementedError
def name(self) -> str:
"""
Returns the name of these tags for display purposes eg "Comic Rack".
Should always return a string. Failures should return the empty string.
Should always return a string.
MUST NOT cause an exception.
"""
return ""

View File

@ -265,14 +265,15 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
md = prepare_metadata(md, ct_md, self.config)
for tag_id in self._tags:
success = ca.write_tags(md, tag_id)
QtWidgets.QApplication.restoreOverrideCursor()
if not success:
try:
ca.write_tags(md, tag_id)
except Exception as e:
QtWidgets.QMessageBox.warning(
self,
"Write Error",
f"Saving {tags[tag_id].name()} the tags to the archive seemed to fail!",
f"Saving {tags[tag_id].name()} the tags to the archive seemed to fail! {e}",
)
break
QtWidgets.QApplication.restoreOverrideCursor()
ca.reset_cache()

View File

@ -147,8 +147,10 @@ class CLI:
if not self.config.Runtime_Options__dryrun:
for tag_id in self.config.Runtime_Options__tags_write:
# write out the new data
if not ca.write_tags(md, tag_id):
logger.error("The tag save seemed to fail for: %s!", tags[tag_id].name())
try:
ca.write_tags(md, tag_id)
except Exception:
# Error is already displayed in the log
return False
self.output("Save complete.")
@ -352,19 +354,21 @@ class CLI:
def delete_tags(self, ca: ComicArchive, tag_id: str) -> Status:
tag_name = tags[tag_id].name()
if ca.has_tags(tag_id):
if not self.config.Runtime_Options__dryrun:
if ca.remove_tags(tag_id):
self.output(f"{ca.path}: Removed {tag_name} tags.")
return Status.success
else:
self.output(f"{ca.path}: Tag removal seemed to fail!")
return Status.write_failure
else:
self.output(f"{ca.path}: dry-run. {tag_name} tags not removed")
return Status.success
self.output(f"{ca.path}: This archive doesn't have {tag_name} tags to remove.")
return Status.success
if not ca.has_tags(tag_id):
self.output(f"{ca.path}: This archive doesn't have {tag_name} tags to remove.")
return Status.success
if self.config.Runtime_Options__dryrun:
self.output(f"{ca.path}: dry-run. {tag_name} tags would be removed")
return Status.success
try:
ca.remove_tags(tag_id)
self.output(f"{ca.path}: Removed {tag_name} tags.")
return Status.success
except Exception:
self.output(f"{ca.path}: Tag removal seemed to fail!")
return Status.write_failure
def delete(self, ca: ComicArchive) -> Result:
res = Result(Action.delete, Status.success, ca.path)
@ -386,18 +390,21 @@ class CLI:
self.output(f"{ca.path}: Destination and source are same: {dst_tag_name}. Nothing to do.")
return Status.existing_tags
if not self.config.Runtime_Options__dryrun:
if self.config.Metadata_Options__apply_transform_on_bulk_operation and dst_tag_id == "cbi":
md = CBLTransformer(md, self.config).apply()
if ca.write_tags(md, dst_tag_id):
self.output(f"{ca.path}: Copied {source_names} tags to {dst_tag_name}.")
else:
self.output(f"{ca.path}: Tag copy seemed to fail!")
return Status.write_failure
else:
if self.config.Runtime_Options__dryrun:
self.output(f"{ca.path}: dry-run. {source_names} tags not copied")
return Status.success
return Status.success
if self.config.Metadata_Options__apply_transform_on_bulk_operation and dst_tag_id == "cbi":
md = CBLTransformer(md, self.config).apply()
try:
ca.write_tags(md, dst_tag_id)
self.output(f"{ca.path}: Copied {source_names} tags to {dst_tag_name}.")
return Status.success
except Exception:
self.output(f"{ca.path}: Tag copy seemed to fail!")
return Status.write_failure
def copy(self, ca: ComicArchive) -> Result:
res = Result(Action.copy, Status.success, ca.path)
@ -770,25 +777,25 @@ class CLI:
delete_success = False
export_success = False
if not self.config.Runtime_Options__dryrun:
if ca.export_as_zip(new_file):
export_success = True
if self.config.Runtime_Options__delete_original:
try:
filename_path.unlink(missing_ok=True)
delete_success = True
except OSError:
logger.exception("%sError deleting original archive after export", msg_hdr)
else:
# last export failed, so remove the zip, if it exists
new_file.unlink(missing_ok=True)
else:
if self.config.Runtime_Options__dryrun:
msg = msg_hdr + f"Dry-run: Would try to create {os.path.split(new_file)[1]}"
if self.config.Runtime_Options__delete_original:
msg += " and delete original."
self.output(msg)
return Result(Action.export, Status.success, ca.path, new_file)
try:
ca.export_as(new_file)
export_success = True
if self.config.Runtime_Options__delete_original:
try:
filename_path.unlink(missing_ok=False)
delete_success = True
except OSError:
logger.exception("%sError deleting original archive after export", msg_hdr)
except Exception:
new_file.unlink(missing_ok=True)
msg = msg_hdr
if export_success:
msg += f"Archive exported successfully to: {os.path.split(new_file)[1]}"

View File

@ -533,7 +533,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
def repackage_archive(self) -> None:
ca_list = self.fileSelectionList.get_selected_archive_list()
non_zip_count = 0
to_zip = []
to_zip: list[ComicArchive] = []
largest_page_size = 0
for ca in ca_list:
largest_page_size = max(largest_page_size, len(ca.get_page_name_list()))
@ -581,7 +581,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
new_archives_to_add = []
archives_to_remove = []
skipped_list = []
failed_list = []
failed_list: list[Exception] = []
success_count = 0
logger.debug("Exporting %d comics to zip", len(to_zip))
@ -607,7 +607,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
if export:
logger.debug("Exporting %s to %s", ca.path, export_name)
if ca.export_as_zip(export_name):
try:
ca.export_as(export_name)
success_count += 1
if EW.addToList:
new_archives_to_add.append(str(export_name))
@ -615,9 +616,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
archives_to_remove.append(ca)
ca.path.unlink(missing_ok=True)
else:
# last export failed, so remove the zip, if it exists
failed_list.append(ca.path)
except Exception as e:
failed_list.append(OSError(f"Failed to export {ca.path} to {export_name}: {e}"))
if export_name.exists():
export_name.unlink(missing_ok=True)
@ -633,11 +633,9 @@ class TaggerWindow(QtWidgets.QMainWindow):
for f in skipped_list:
summary += f"\t{f}\n"
if failed_list:
summary += (
f"\n\nThe following {len(failed_list)} archive(s) failed to export due to read/write errors:\n"
)
for f in failed_list:
summary += f"\t{f}\n"
summary += f"\n\nThe following {len(failed_list)} archive(s) failed to export:\n"
for ex in failed_list:
summary += f"\t{ex}\n"
logger.info(summary)
dlg = LogWindow(self)
@ -1198,8 +1196,9 @@ class TaggerWindow(QtWidgets.QMainWindow):
failed_tag: str = ""
# Save each tag
for tag_id in self.selected_write_tags:
success = self.comic_archive.write_tags(self.metadata, tag_id)
if not success:
try:
self.comic_archive.write_tags(self.metadata, tag_id)
except Exception:
failed_tag = tags[tag_id].name()
break
@ -1225,7 +1224,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
"Read Failed!",
f"One or more of the selected read tags failed to load for {self.comic_archive.path}, check log for details",
)
logger.error("Failed to load metadata for %s: %s", self.ca.path, error)
logger.error("Failed to load metadata for %s: %s", self.comic_archive.path, error)
self.fileSelectionList.update_current_row()
self.update_ui_for_archive()
@ -1611,7 +1610,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
progdialog.setMinimumDuration(300)
center_window_on_parent(progdialog)
failed_list = []
failed_list: list[Exception] = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
if prog_idx % 10 == 0:
@ -1622,10 +1621,13 @@ class TaggerWindow(QtWidgets.QMainWindow):
progdialog.setLabelText(str(ca.path))
for tag_id in tag_ids:
if ca.has_tags(tag_id) and ca.is_writable():
if ca.remove_tags(tag_id):
try:
ca.remove_tags(tag_id)
success_count += 1
else:
failed_list.append(ca.path)
except Exception as e:
failed_list.append(
OSError(f"Failed to remove {tags[tag_id].name()} from {ca.path}: {e}")
)
# Abandon any further tag removals to prevent any greater damage to archive
break
ca.reset_cache()
@ -1640,8 +1642,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
summary = f"Successfully removed {success_count} tags in archive(s)."
if failed_list:
summary += f"\n\nThe remove operation failed in the following {len(failed_list)} archive(s):\n"
for f in failed_list:
summary += f"\t{f}\n"
for ex in failed_list:
summary += f"\t{ex}\n"
dlg = LogWindow(self)
dlg.set_text(summary)
@ -1686,13 +1688,13 @@ class TaggerWindow(QtWidgets.QMainWindow):
return
if has_src_count != 0:
src_tags = ", ".join([tags[tag_id].name() for tag_id in src_tag_ids])
dst_tags = ", ".join([tags[tag_id].name() for tag_id in dest_tag_ids])
reply = QtWidgets.QMessageBox.question(
self,
"Copy Tags",
f"Are you sure you wish to copy the combined (with overlay order) tags of "
f"{', '.join([tags[tag_id].name() for tag_id in src_tag_ids])} "
f"to {', '.join([tags[tag_id].name() for tag_id in dest_tag_ids])} tags in "
f"{has_src_count} archive(s)?",
f"{src_tags} to {dst_tags} tags in {has_src_count} archive(s)?",
QtWidgets.QMessageBox.StandardButton.Yes,
QtWidgets.QMessageBox.StandardButton.No,
)
@ -1705,7 +1707,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
failed_list = []
failed_list: list[Exception] = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
if prog_idx % 10 == 0:
@ -1713,7 +1715,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
ca_saved = False
md, error = self.read_selected_tags(src_tag_ids, ca)
if error is not None:
failed_list.append(ca.path)
failed_list.append(error)
continue
if md.is_empty:
continue
@ -1730,12 +1732,15 @@ class TaggerWindow(QtWidgets.QMainWindow):
if tag_id == "cbi" and self.config[0].Metadata_Options__apply_transform_on_bulk_operation:
md = CBLTransformer(md, self.config[0]).apply()
if ca.write_tags(md, tag_id):
try:
ca.write_tags(md, tag_id)
if not ca_saved:
success_count += 1
ca_saved = True
else:
failed_list.append(ca.path)
except Exception as e:
failed_list.append(
OSError(f"Failed to copy {src_tags} to {dst_tags} tags for {ca.path}: {e}")
)
ca.reset_cache()
ca.load_cache({*self.selected_read_tags, *self.selected_write_tags})
@ -1749,8 +1754,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
summary = f"Successfully copied tags in {success_count} archive(s)."
if failed_list:
summary += f"\n\nThe copy operation failed in the following {len(failed_list)} archive(s):\n"
for f in failed_list:
summary += f"\t{f}\n"
for ex in failed_list:
summary += f"\t{ex}\n"
dlg = LogWindow(self)
dlg.set_text(summary)
@ -1778,7 +1783,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
"Aborting...",
f"One or more of the read tags failed to load for {ca.path}. Aborting to prevent any possible further damage. Check log for details.",
)
logger.error("Failed to load tags from %s: %s", self.ca.path, error)
logger.error("Failed to load tags from %s: %s", ca.path, error)
return False, match_results
if md.is_empty:
@ -1929,9 +1934,11 @@ class TaggerWindow(QtWidgets.QMainWindow):
def write_Tags() -> bool:
for tag_id in self.selected_write_tags:
# write out the new data
if not ca.write_tags(md, tag_id):
try:
ca.write_tags(md, tag_id)
except Exception as e:
self.auto_tag_log(
f"{tags[tag_id].name()} save failed! Aborting any additional tag saves.\n"
f"{tags[tag_id].name()} save failed! {e}\nAborting any additional tag saves.\n"
)
return False
return True

View File

@ -44,7 +44,8 @@ def test_read_tags(cbz, md_saved):
assert md == md_saved
def test_write_cr(tmp_comic):
def test_write_cr(tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
md.apply_default_page_list(tmp_comic.get_page_name_list())
@ -70,7 +71,8 @@ def test_save_cr_rar(tmp_path, md_saved):
assert md == md_saved
def test_page_type_write(tmp_comic):
def test_page_type_write(tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
t = md.pages[0]
t.type = ""
@ -80,12 +82,13 @@ def test_page_type_write(tmp_comic):
md = tmp_comic.read_tags("cr")
def test_invalid_zip(tmp_comic: comicapi.comicarchive.ComicArchive):
with open(tmp_comic.path, mode="b+r") as f:
def test_invalid_zip(tmp_comic_path):
with open(tmp_comic_path, mode="b+r") as f:
# Corrupting the first file only breaks the first file. If it is never read then no exception will be raised
f.seek(-10, os.SEEK_END) # seek to a probably bad place in th Central Directory and write some bytes
f.write(b"PK\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000")
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
result = tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cr") # This is not the first file
assert result
assert not tmp_comic.seems_to_be_a_comic_archive() # Calls archiver.is_valid
@ -121,7 +124,8 @@ def test_copy_from_archive(archiver, tmp_path, cbz, md_saved):
assert md == md_saved
def test_rename(tmp_comic, tmp_path):
def test_rename(tmp_comic_path, tmp_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
old_path = tmp_comic.path
tmp_comic.rename(tmp_path / "test.cbz")
assert not old_path.exists()
@ -129,8 +133,9 @@ def test_rename(tmp_comic, tmp_path):
assert tmp_comic.path != old_path
def test_rename_ro_dest(tmp_comic, tmp_path):
old_path = tmp_comic.path
def test_rename_ro_dest(tmp_comic_path, tmp_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
dest = tmp_path / "tmp"
dest.mkdir(mode=0o000)
with pytest.raises(OSError):
@ -138,6 +143,6 @@ def test_rename_ro_dest(tmp_comic, tmp_path):
raise OSError("Windows sucks")
tmp_comic.rename(dest / "test.cbz")
dest.chmod(mode=0o777)
assert old_path.exists()
assert tmp_comic_path.exists()
assert tmp_comic.path.exists()
assert tmp_comic.path == old_path
assert tmp_comic.path == tmp_comic_path

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import copy
import datetime
import io
import pathlib
import shutil
import unittest.mock
from argparse import Namespace
@ -33,17 +34,20 @@ except ImportError:
@pytest.fixture
def cbz():
yield comicapi.comicarchive.ComicArchive(filenames.cbz_path)
yield comicapi.comicarchive.ComicArchive(
str(filenames.cbz_path)
) # When testing these always refer to a file on a filesystem
@pytest.fixture
def tmp_comic(tmp_path):
shutil.copy(filenames.cbz_path, tmp_path)
yield comicapi.comicarchive.ComicArchive(tmp_path / filenames.cbz_path.name)
def tmp_comic_path(tmp_path: pathlib.Path):
shutil.copy(str(filenames.cbz_path), str(tmp_path)) # When testing these always refer to a file on a filesystem
yield (tmp_path / filenames.cbz_path.name)
@pytest.fixture
def cbz_double_cover(tmp_path, tmp_comic):
def cbz_double_cover(tmp_path, tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
cover = Image.open(io.BytesIO(tmp_comic.get_page(0)))
other_page = Image.open(io.BytesIO(tmp_comic.get_page(tmp_comic.get_number_of_pages() - 1)))
@ -53,7 +57,6 @@ def cbz_double_cover(tmp_path, tmp_comic):
double_cover.paste(cover, (cover.width, 0))
tmp_comic.archiver.write_file("double_cover.jpg", double_cover.tobytes("jpeg", "RGB"))
yield tmp_comic
@pytest.fixture(autouse=True)

View File

@ -12,10 +12,11 @@ from comictalker.comictalker import ComicTalker
def test_save(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
# Overwrite the series so it has definitely changed
tmp_comic.write_tags(md_saved.replace(series="nothing"), "cr")
@ -24,9 +25,6 @@ def test_save(
# Check that it changed
assert md != md_saved
# Clear the cached tags
tmp_comic.reset_cache()
# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
@ -37,7 +35,7 @@ def test_save(
# Check online, should be intercepted by comicvine_api
config[0].Auto_Tag__online = True
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]
# Read and save ComicRack tags
config[0].Runtime_Options__tags_read = ["cr"]
config[0].Runtime_Options__tags_write = ["cr"]
@ -46,6 +44,9 @@ def test_save(
# Run ComicTagger
CLI(config[0], talkers).run()
# tmp_comic is invalid it can't handle outside changes so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
# Read the CBZ
md = tmp_comic.read_tags("cr")
@ -68,18 +69,16 @@ def test_save(
def test_delete(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
# Check that the metadata starts correct
assert md == md_saved
# Clear the cached metadata
tmp_comic.reset_cache()
# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
@ -88,12 +87,15 @@ def test_delete(
config[0].Commands__command = comictaggerlib.resulttypes.Action.delete
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]
# Delete ComicRack tags
config[0].Runtime_Options__tags_write = ["cr"]
# Run ComicTagger
CLI(config[0], talkers).run()
# tmp_comic is invalid it can't handle outside changes so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
# Read the CBZ
md = tmp_comic.read_tags("cr")
@ -106,10 +108,11 @@ def test_delete(
def test_rename(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
# Check that the metadata starts correct
@ -126,7 +129,7 @@ def test_rename(
config[0].Commands__command = comictaggerlib.resulttypes.Action.rename
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]
# Set the template
config[0].File_Rename__template = "{series}"
@ -135,8 +138,11 @@ def test_rename(
# Run ComicTagger
CLI(config[0], talkers).run()
# tmp_comic is invalid it can't handle outside changes so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
# Update the comic path
tmp_comic.path = tmp_comic.path.parent / (md.series + ".cbz")
tmp_comic.path = tmp_comic.path.parent / ((md.series or "comic") + ".cbz")
# Read the CBZ
md = tmp_comic.read_tags("cr")

View File

@ -5,6 +5,7 @@ import io
import pytest
from PIL import Image
import comicapi.comicarchive
import comictaggerlib.imagehasher
import comictaggerlib.issueidentifier
import testing.comicdata
@ -13,15 +14,16 @@ from comicapi.genericmetadata import ImageHash
from comictaggerlib.resulttypes import IssueResult
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api, tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
config, definitions = config
ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
ii = comictaggerlib.issueidentifier.IssueIdentifier(tmp_comic, config, comicvine_api)
im = Image.open(io.BytesIO(cbz_double_cover.archiver.read_file("double_cover.jpg")))
im = Image.open(io.BytesIO(tmp_comic.archiver.read_file("double_cover.jpg")))
cropped = ii._crop_double_page(im)
original = cbz_double_cover.get_page(0)
original = tmp_comic.get_page(0)
original_hash = comictaggerlib.imagehasher.ImageHasher(data=original).average_hash()
cropped_hash = comictaggerlib.imagehasher.ImageHasher(image=cropped).average_hash()

View File

@ -5,6 +5,7 @@ from importlib_metadata import entry_points
import comicapi.genericmetadata
import testing.comicdata
from comicapi.archivers.zip import ZipArchiver
from comictaggerlib.md import prepare_metadata
tags = []
@ -20,11 +21,12 @@ if not tags:
@pytest.mark.parametrize("tag_type", tags)
def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
def test_metadata(mock_version, tmp_comic_path, md_saved, tag_type):
archiver = ZipArchiver.open(tmp_comic_path)
tag = tag_type(mock_version[0])
supported_attributes = tag.supported_attributes
tag.write_tags(comicapi.genericmetadata.md_test, tmp_comic.archiver)
written_metadata = tag.read_tags(tmp_comic.archiver)
tag.write_tags(comicapi.genericmetadata.md_test, archiver)
written_metadata = tag.read_tags(archiver)
md = md_saved._get_clean_metadata(*supported_attributes)
# Hack back in the pages variable because CoMet supports identifying the cover by the filename