Compare commits

2 Commits

4ec2e2d11b...529f52c6cc

| Author | SHA1 | Date |
|---|---|---|
| | 529f52c6cc | |
| | 2010d1c3c6 | |
@@ -2,34 +2,34 @@ from __future__ import annotations

import pathlib
from collections.abc import Collection
from typing import Protocol, runtime_checkable
from typing import ClassVar, Protocol, runtime_checkable


@runtime_checkable
class Archiver(Protocol):
"""Archiver Protocol"""

"""The path to the archive"""
path: pathlib.Path
"""The path to the archive"""

exe: ClassVar[str] = ""
"""
The name of the executable used for this archiver. This should be the base name of the executable.
For example if 'rar.exe' is needed this should be "rar".
If an executable is not used this should be the empty string.
"""
exe: str = ""

enabled: ClassVar[bool] = True
"""
Whether or not this archiver is enabled.
If external imports are required and are not available this should be false. See rar.py and sevenzip.py.
"""
enabled: bool = True

hashable: bool = True
"""
If self.path is a single file that can be hashed.
For example directories cannot be hashed.
"""
hashable: bool = True

supported_extensions: Collection[str] = set()

@@ -39,21 +39,21 @@ class Archiver(Protocol):

def get_comment(self) -> str:
"""
Returns the comment from the current archive as a string.
Should always return a string. If comments are not supported in the archive the empty string should be returned.
If comments are not supported in the archive the empty string should be returned.
"""
return ""
raise NotImplementedError

def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
"""
Returns True if the comment was successfully set on the current archive.
Should always return a boolean. If comments are not supported in the archive False should be returned.
Should raise an exception if a comment cannot be set
"""
return False
raise NotImplementedError

def supports_comment(self) -> bool:
"""
Returns True if the current archive supports comments.
Should always return a boolean. If comments are not supported in the archive False should be returned.
Should always return a boolean.
MUST NOT cause an exception.
"""
return False

@@ -65,63 +65,59 @@ class Archiver(Protocol):

"""
raise NotImplementedError

def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
"""
Removes the named file from the current archive.
archive_file should always come from the output of get_filename_list.
Should always return a boolean. Failures should return False.
archive_file will always come from the output of get_filename_list.

Rebuilding the archive without the named file is a standard way to remove a file.
"""
return False
raise NotImplementedError

def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
"""
Writes the named file to the current archive.
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError

def get_filename_list(self) -> list[str]:
"""
Returns a list of filenames in the current archive.
Should always return a list of string. Failures should return an empty list.
Should always return a list of string.
"""
return []
raise NotImplementedError

def supports_files(self) -> bool:
"""
Returns True if the current archive supports arbitrary non-picture files.
Should always return a boolean.
If arbitrary non-picture files are not supported in the archive False should be returned.
MUST NOT cause an exception.
"""
return False
raise NotImplementedError

def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""
Copies the contents of another achive to the current archive.
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError

def is_writable(self) -> bool:
"""
Retuns True if the current archive is writeable
Should always return a boolean. Failures should return False.
"""
return False
raise NotImplementedError

def extension(self) -> str:
"""
Returns the extension that this archiver should use eg ".cbz".
Should always return a string. Failures should return the empty string.
MUST NOT cause an exception.
"""
return ""

def name(self) -> str:
"""
Returns the name of this archiver for display purposes eg "CBZ".
Should always return a string. Failures should return the empty string.
MUST NOT cause an exception.
"""
return ""

@@ -130,6 +126,7 @@ class Archiver(Protocol):

"""
Returns True if the given path can be opened by this archiver.
Should always return a boolean. Failures should return False.
MUST NOT cause an exception.
"""
return False

@@ -138,8 +135,10 @@ class Archiver(Protocol):

"""
Opens the given archive.
Should always return a an Archver.
Should never cause an exception no file operations should take place in this method,
is_valid will always be called before open.
Should validate that file can be opened.
NOTE: is_7zfile from py7zr does not validate that py7zr can open the file
MUST not keep file open.
"""
archiver = cls()
archiver.path = path
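The protocol above now expects archiver plugins to raise on failure instead of returning booleans, and to declare `exe` and `enabled` as class-level attributes. As a rough illustration of that contract (not part of this changeset), a hypothetical in-memory archiver might look like the sketch below; the class name and its dict-backed storage are invented for the example, and only the parts of the protocol touched by this diff are shown.

```python
from __future__ import annotations

import pathlib
from typing import ClassVar


class InMemoryArchiver:
    """Illustrative sketch only: keeps files in a dict instead of on disk."""

    enabled: ClassVar[bool] = True
    exe: ClassVar[str] = ""  # no external executable is needed
    hashable: bool = False
    supported_extensions: ClassVar[set[str]] = set()

    def __init__(self) -> None:
        self.path = pathlib.Path()
        self._files: dict[str, bytes] = {}
        self._comment = ""

    def get_comment(self) -> str:
        return self._comment

    def supports_comment(self) -> bool:
        return True

    def set_comment(self, comment: str) -> None:
        # raise on failure rather than returning False
        self._comment = comment

    def read_file(self, archive_file: str) -> bytes:
        try:
            return self._files[archive_file]
        except KeyError as e:
            raise OSError(f"Error reading archive: {self.path} :: {archive_file}") from e

    def write_file(self, archive_file: str, data: bytes) -> None:
        self._files[archive_file] = data

    def remove_file(self, archive_file: str) -> None:
        self._files.pop(archive_file, None)

    def get_filename_list(self) -> list[str]:
        return list(self._files)

    def supports_files(self) -> bool:
        return True
```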
@@ -25,47 +25,50 @@ class FolderArchiver(Archiver):

except OSError:
return ""

def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
self._filename_list = []
if comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
(self.path / self.comment_file_name).unlink(missing_ok=True)
return True
try:
if comment:
file_path = self.path / self.comment_file_name
file_path.parent.mkdir(exist_ok=True, parents=True)
file_path.write_text(comment, encoding="utf-8")
else:
(self.path / self.comment_file_name).unlink(missing_ok=True)
except OSError as e:
logger.error(
"Error writing comment for folder archive [%s]: %s :: %s", e, self.path, self.comment_file_name
)
raise OSError(
f"Error writing comment for folder archive [{e}]: {self.path} :: {self.comment_file_name}"
) from e

def supports_comment(self) -> bool:
return True

def read_file(self, archive_file: str) -> bytes:
try:
data = (self.path / archive_file).read_bytes()
return (self.path / archive_file).read_bytes()
except OSError as e:
logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file)
raise
raise OSError(f"Error reading folder archive [{e}]: {self.path} :: {archive_file}") from e

return data

def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._filename_list = []
try:
(self.path / archive_file).unlink(missing_ok=True)
except OSError as e:
logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
raise OSError(f"Error removing file for folder archive [{e}]: {self.path} :: {archive_file}") from e

def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
self._filename_list = []
try:
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.path / archive_file, mode="wb") as f:
f.write(data)
file_path.write_bytes(data)
except OSError as e:
logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
raise OSError(f"Error writing folder archive [{e}]: {self.path} :: {archive_file}") from e

def get_filename_list(self) -> list[str]:
if self._filename_list:

@@ -79,12 +82,12 @@ class FolderArchiver(Archiver):

return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in folder archive [{e}]: {self.path}") from e

def supports_files(self) -> bool:
return True

def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:

@@ -94,15 +97,14 @@ class FolderArchiver(Archiver):

self.write_file(filename, data)

# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
except Exception:
logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
return False
else:
return True
self.set_comment(other_archive.get_comment())
except Exception as e:
logger.exception(
"Error while copying to folder archive [%s]: from %s to %s", e, other_archive.path, self.path
)
raise OSError(
f"Error while copying to folder archive [{e}]: from {str(other_archive)!r} to {str(self.path)!r}"
) from e

def is_writable(self) -> bool:
return True
@@ -49,47 +49,47 @@ class RarArchiver(Archiver):

rarc = self.get_rar_obj()
return (rarc.comment if rarc else "") or ""

def set_comment(self, comment: str) -> bool:
def set_comment(self, comment: str) -> None:
self._reset()
if rar_support and self.exe:
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")
if not (rar_support and self.exe):
return

working_dir = os.path.dirname(os.path.abspath(self.path))
try:
# write comment to temp file
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = pathlib.Path(tmp_dir) / "rar_comment.txt"
tmp_file.write_text(comment, encoding="utf-8")

# use external program to write comment to Rar archive
proc_args = [
self.exe,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
result = subprocess.run(
proc_args,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
cwd=tmp_dir,
)
if result.returncode != 0:
logger.error(
"Error writing comment to rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
result.stderr,
)
return False
except OSError as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
return True
return False
working_dir = os.path.dirname(os.path.abspath(self.path))

# use external program to write comment to Rar archive
proc_args = [
self.exe,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file}",
str(self.path),
]
result = subprocess.run(
proc_args,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
cwd=tmp_dir,
)
except Exception as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
raise OSError(f"Error writing comment to rar archive [{e}]: {self.path}")
if result.returncode != 0:
logger.error(
"Error writing comment to rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
result.stderr,
)
raise OSError(f"Error writing comment to rar archive [exitcode: {result.returncode}]: {self.path}")

def supports_comment(self) -> bool:
return True

@@ -100,9 +100,11 @@ class RarArchiver(Archiver):

return b""

tries = 0
error = None
entries = []
while tries < 7:
tries += 1
try:
tries = tries + 1
data: bytes = rarc.open(archive_file).read()
entries = [(rarc.getinfo(archive_file), data)]

@@ -115,10 +117,24 @@ class RarArchiver(Archiver):

archive_file,
tries,
)
error = OSError(
'"Error reading rar archive [file is not expected size: {:d} vs {:d}] {} :: {} :: tries #{:d}"'.format(
entries[0][0].file_size,
len(entries[0][1]),
self.path,
archive_file,
tries,
)
)
continue

except OSError as e:
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
logger.error(
"Error reading file from rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries
)
error = OSError(
f"Error reading file from rar archive [{e}]: {self.path} :: {archive_file} :: tries#{tries}"
)
except Exception as e:
logger.error(
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",

@@ -127,22 +143,30 @@ class RarArchiver(Archiver):

archive_file,
tries,
)
break
raise RuntimeError(
f"Unexpected exception reading file from rar archive [{e}]: {self.path} :: {archive_file} :: tries#{tries}"
)

else:
# Success. Entries is a list of of tuples: ( rarinfo, filedata)
if error is None:
# Success, return early. Entries is a list of of tuples: ( rarinfo, filedata)
if len(entries) == 1:
return entries[0][1]
raise OSError(
f"Error reading file from rar archive [File not found]: {self.path} :: {archive_file} :: tries#{tries}"
)

raise OSError
if error is None:
# Somehow we have success but exited the loop
raise RuntimeError("Something failed")
raise error

raise OSError

def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._reset()
if self.exe:
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to remove file from Rar archive
if not self.exe:
return
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to remove file from Rar archive
try:
result = subprocess.run(
[self.exe, "d", f"-w{working_dir}", "-c-", self.path, archive_file],
startupinfo=STARTUPINFO,

@@ -151,26 +175,35 @@ class RarArchiver(Archiver):

encoding="utf-8",
cwd=self.path.absolute().parent,
)
except Exception as e:
raise OSError(f"Error removing file from rar archive [{e}]: {self.path}:: {archive_file}")

if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
archive_file,
)
raise RuntimeError(
"Error removing file from rar archive [exitcode: {:d}]: {} :: {}".format(
result.returncode,
self.path,
archive_file,
)
return False
return True
return False
)

def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
self._reset()
if self.exe:
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
working_dir = os.path.dirname(os.path.abspath(self.path))
if not self.exe:
return

archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
working_dir = os.path.dirname(os.path.abspath(self.path))

try:
# use external program to write file to Rar archive
result = subprocess.run(
[

@@ -188,45 +221,53 @@ class RarArchiver(Archiver):

capture_output=True,
cwd=self.path.absolute().parent,
)

if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s :: %s",
result.returncode,
self.path,
archive_file,
result.stderr,
)
return False
return True
return False
except Exception as e:
raise OSError(f"Error writing file to rar archive [{e}]: {self.path}:: {archive_file}")
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s :: %s",
result.returncode,
self.path,
archive_file,
result.stderr,
)
raise OSError(
f"Error writing file to rar archive [exitcode: {result.returncode}]: {self.path}:: {archive_file}"
)

def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
rarc = self.get_rar_obj()
tries = 0
if rar_support and rarc:
while tries < 7:
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
if not (rar_support and rarc):
return []

except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
error = None
while tries < 7:
tries += 1
try:
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)

else:
self._filename_list = namelist
return namelist
return []
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
error = OSError(f"Error listing files in rar archive [{e}]: {self.path} :: tries#{tries}")
else:
self._filename_list = namelist
return self._filename_list

if error is None:
# Somehow we have success but exited the loop
raise RuntimeError("Something failed")
raise error

def supports_files(self) -> bool:
return True

def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current archive with one copied from another archive"""
self._reset()
try:

@@ -251,22 +292,22 @@ class RarArchiver(Archiver):

capture_output=True,
encoding="utf-8",
)
if result.returncode != 0:
logger.error(
"Error while copying to rar archive [exitcode: %d]: %s: %s",
result.returncode,
self.path,
result.stderr,
)
return False

self.path.unlink(missing_ok=True)
shutil.move(rar_path, self.path)
except Exception as e:
logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(f"Error listing files in rar archive [{e}]: from {other_archive.path} to {self.path}") from e
if result.returncode != 0:
logger.error(
"Error while copying to rar archive [exitcode: %d]: %s: %s",
result.returncode,
self.path,
result.stderr,
)
raise OSError(
f"Error while copying to rar archive [exitcode: {result.returncode}]: {self.path}: {result.stderr}"
)

@classmethod
@functools.cache
@@ -28,46 +28,37 @@ class SevenZipArchiver(Archiver):

super().__init__()
self._filename_list: list[str] = []

# @todo: Implement Comment?
def get_comment(self) -> str:
return ""

def set_comment(self, comment: str) -> bool:
return False

def read_file(self, archive_file: str) -> bytes:
data = b""
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read([archive_file])[archive_file].read()
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
logger.error("Error reading file in 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error reading file in 7zip archive [{e}]: {self.path} :: {archive_file}") from e

return data

def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
self._filename_list = []
return self.rebuild([archive_file])

def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
# At the moment, no other option but to rebuild the whole
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
self._filename_list = []
if archive_file in files:
if not self.rebuild([archive_file]):
return False
self.rebuild([archive_file])

try:
# now just add the archive file as a new one
with py7zr.SevenZipFile(self.path, "a") as zf:
zf.writestr(data, archive_file)
return True
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
logger.error("Error writing file in 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error writing file in 7zip archive [{e}]: {self.path} :: {archive_file}") from e

def get_filename_list(self) -> list[str]:
if self._filename_list:

@@ -80,12 +71,12 @@ class SevenZipArchiver(Archiver):

return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in 7zip archive [{e}]: {self.path}") from e

def supports_files(self) -> bool:
return True

def rebuild(self, exclude_list: list[str]) -> bool:
def rebuild(self, exclude_list: list[str]) -> None:
"""Zip helper func

This recompresses the zip archive, without the files in the exclude_list

@@ -108,11 +99,10 @@ class SevenZipArchiver(Archiver):

shutil.move(tmp_file.name, self.path)
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path)
return False
return True
logger.error("Error rebuilding 7zip archive [%s]: %s", e, self.path)
raise OSError(f"Error rebuilding 7zip archive [{e}]: {self.path}") from e

def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:

@@ -125,9 +115,7 @@ class SevenZipArchiver(Archiver):

zout.writestr(data, filename)
except Exception as e:
logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(f"Error while copying to 7zip archive [{e}]: from {other_archive.path} to {self.path}") from e

def is_writable(self) -> bool:
return True
@@ -40,33 +40,35 @@ class ZipArchiver(Archiver):

comment = zf.comment.decode("utf-8", errors="replace")
return comment

def set_comment(self, comment: str) -> bool:
with ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def set_comment(self, comment: str) -> None:
try:
with ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
except Exception as e:
logger.error("Error writing zip comment [%s]: %s", e, self.path)
raise OSError(f"Error writing zip comment [{e}]: {self.path}") from e

def read_file(self, archive_file: str) -> bytes:
with ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
logger.exception("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
logger.exception("Error reading file in zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error reading file in zip archive [{e}]: {self.path} :: {archive_file}") from e
return data

def remove_file(self, archive_file: str) -> bool:
def remove_file(self, archive_file: str) -> None:
files = self.get_filename_list()
self._filename_list = []
try:
with ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
if archive_file in files:
zf.repack([zf.remove(archive_file)])
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
logger.error("Error removing file in zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise OSError(f"Error removing file in zip archive [{e}]: {self.path} :: {archive_file}") from e

def write_file(self, archive_file: str, data: bytes) -> bool:
def write_file(self, archive_file: str, data: bytes) -> None:
files = self.get_filename_list()
self._filename_list = []

@@ -76,10 +78,9 @@ class ZipArchiver(Archiver):

if archive_file in files:
zf.repack([zf.remove(archive_file)])
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
raise OSError(f"Error writing zip archive [{e}]: {self.path} :: {archive_file}") from e

def get_filename_list(self) -> list[str]:
if self._filename_list:

@@ -90,7 +91,7 @@ class ZipArchiver(Archiver):

return self._filename_list
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
raise OSError(f"Error listing files in zip archive [{e}]: {self.path}") from e

def supports_files(self) -> bool:
return True

@@ -125,7 +126,7 @@ class ZipArchiver(Archiver):

return False
return True

def copy_from_archive(self, other_archive: Archiver) -> bool:
def copy_from_archive(self, other_archive: Archiver) -> None:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:

@@ -136,15 +137,12 @@ class ZipArchiver(Archiver):

zout.writestr(filename, data)

# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
self.set_comment(other_archive.get_comment())
except Exception as e:
logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
raise OSError(
f"Error while copying to zip archive [{e}]: from {str(other_archive)!r} to {str(self.path)!r}"
) from e

def is_writable(self) -> bool:
return True
@@ -122,6 +122,8 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab


class ComicArchive:
"""Exceptions from tags/archive should already be logged. Caller must handle display to user and recovery"""

logo_data = b""
pil_available: bool | None = None

@@ -208,7 +210,7 @@ class ComicArchive:

return True

def is_zip(self) -> bool:
return self.archiver.name() == "ZIP"
return self.archiver.extension() == ".cbz"

def seems_to_be_a_comic_archive(self) -> bool:
if (

@@ -238,15 +240,15 @@ class ComicArchive:

return ""
return tags[tag_id].read_raw_tags(self.archiver)

def write_tags(self, metadata: GenericMetadata, tag_id: str) -> bool:
def write_tags(self, metadata: GenericMetadata, tag_id: str) -> None:
if tag_id in self.md:
del self.md[tag_id]
if not tags[tag_id].enabled:
logger.warning("%s tags not enabled", tags[tag_id].name())
return False
return

self.apply_archive_info_to_metadata(metadata, True, True, hash_archive=self.hash_archive)
return tags[tag_id].write_tags(metadata, self.archiver)
tags[tag_id].write_tags(metadata, self.archiver)

def has_tags(self, tag_id: str) -> bool:
if tag_id in self.md:

@@ -255,12 +257,12 @@ class ComicArchive:

return False
return tags[tag_id].has_tags(self.archiver)

def remove_tags(self, tag_id: str) -> bool:
def remove_tags(self, tag_id: str) -> None:
if tag_id in self.md:
del self.md[tag_id]
if not tags[tag_id].enabled:
return False
return tags[tag_id].remove_tags(self.archiver)
return
tags[tag_id].remove_tags(self.archiver)

def get_page(self, index: int) -> bytes:
image_data = b""

@@ -464,10 +466,17 @@ class ComicArchive:

metadata.is_empty = False
return metadata

def export_as_zip(self, zip_filename: pathlib.Path) -> bool:
if self.archiver.name() == "ZIP":
# nothing to do, we're already a zip
return True
def export_as(self, new_filename: pathlib.Path, extension: str = ".zip") -> None:
"""
Unconditionally creates a new file. Does not check the current archive.

zip_archiver = ZipArchiver.open(zip_filename)
return zip_archiver.copy_from_archive(self.archiver)
If extension cannot be find reverts to .zip
"""
zip_archiver = UnknownArchiver.open(new_filename)
for archiver in archivers:
if extension in archiver.supported_extensions:
zip_archiver = archiver.open(new_filename)
if isinstance(zip_archiver, UnknownArchiver):
zip_archiver = ZipArchiver.open(new_filename)

zip_archiver.copy_from_archive(self.archiver)
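With `write_tags`, `remove_tags`, and the new `export_as` raising on failure, callers switch from checking truthiness to wrapping calls in `try`/`except` and letting the already-logged exception drive recovery. A minimal sketch of the calling pattern, assuming an existing `example.cbz` (the filename is hypothetical, not taken from this changeset):

```python
import logging
import pathlib

from comicapi.comicarchive import ComicArchive

logger = logging.getLogger(__name__)

ca = ComicArchive(pathlib.Path("example.cbz"))  # hypothetical input file
md = ca.read_tags("cr")

try:
    # both calls now raise instead of returning False
    ca.write_tags(md, "cr")
    ca.export_as(pathlib.Path("example-export.cbz"), ".cbz")
except Exception:
    # the tag/archiver layer has already logged the details;
    # the caller only decides how to report or recover
    logger.exception("Saving or exporting example.cbz failed")
```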
@@ -94,11 +94,12 @@ class ComicRack(Tag):

and self.file in archive.get_filename_list()
and self._validate_bytes(archive.read_file(self.file))
)
except Exception:
return False
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e

def remove_tags(self, archive: Archiver) -> bool:
return self.has_tags(archive) and archive.remove_file(self.file)
def remove_tags(self, archive: Archiver) -> None:
if self.has_tags(archive):
archive.remove_file(self.file)

def read_tags(self, archive: Archiver) -> GenericMetadata:
if self.has_tags(archive):

@@ -106,8 +107,8 @@ class ComicRack(Tag):

metadata = archive.read_file(self.file) or b""
if self._validate_bytes(metadata):
return self._metadata_from_bytes(metadata)
except Exception:
...
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e
return GenericMetadata()

def read_raw_tags(self, archive: Archiver) -> str:

@@ -116,22 +117,20 @@ class ComicRack(Tag):

b = archive.read_file(self.file)
# ET.fromstring is used as xml can declare the encoding
return ET.tostring(ET.fromstring(b), encoding="unicode", xml_declaration=True)
except Exception:
...
except Exception as e:
raise RuntimeError(f"Failed to Read {self.id} tags from {archive.path}({archive.name()})") from e
return ""

def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> None:
if self.supports_tags(archive):
xml = b""
try: # read_file can cause an exception
if self.has_tags(archive):
xml = archive.read_file(self.file)
return archive.write_file(self.file, self._bytes_from_metadata(metadata, xml))
except Exception:
...
else:
logger.warning("Archive %s(%s) does not support '%s' metadata", archive.path, archive.name(), self.name())
return False
except Exception as e:
raise RuntimeError(f"Failed to write {self.id} tags to {archive.path}({archive.name()})") from e
logger.warning("Archive %s(%s) does not support '%s' metadata", archive.path, archive.name(), self.name())

def name(self) -> str:
return "Comic Rack"
@@ -1,126 +1,144 @@

from __future__ import annotations

from typing import ClassVar

from comicapi.archivers import Archiver
from comicapi.genericmetadata import GenericMetadata


class Tag:
id: ClassVar[str] = ""
"""
ID form this tag format.
Currently known used IDs are cr, cix, comet, cbi, metroninfo and acbf.
You can use an existing ID to override it's behaiour. It is not recommended to do so.
"""

enabled: bool = False
id: str = ""
"""When set to False it will be excluded from selection in ComicTagger"""

supported_attributes: set[str] = {
"data_origin",
"issue_id",
"series_id",
"original_hash",
"series",
"series_aliases",
"issue",
"issue_count",
"title",
"title_aliases",
"volume",
"volume_count",
"genres",
"description",
"notes",
"alternate_series",
"alternate_number",
"alternate_count",
"gtin",
"story_arcs",
"series_groups",
"publisher",
"imprint",
"day",
"month",
"year",
"language",
"country",
"web_link",
"format",
"manga",
"black_and_white",
"maturity_rating",
"critical_rating",
"scan_info",
"tags",
"pages",
"pages.type",
"pages.bookmark",
"pages.double_page",
"pages.image_index",
"pages.size",
"pages.height",
"pages.width",
"page_count",
"characters",
"teams",
"locations",
"credits",
"credits.person",
"credits.role",
"credits.primary",
"credits.language",
"price",
"is_version_of",
"rights",
"identifier",
"last_mark",
}
"""Set of GenericMetadata attributes this tag format can handle"""
version: str
"""Current version of ComicTagger"""

def __init__(self, version: str) -> None:
self.version: str = version
self.supported_attributes = {
"data_origin",
"issue_id",
"series_id",
"original_hash",
"series",
"series_aliases",
"issue",
"issue_count",
"title",
"title_aliases",
"volume",
"volume_count",
"genres",
"description",
"notes",
"alternate_series",
"alternate_number",
"alternate_count",
"gtin",
"story_arcs",
"series_groups",
"publisher",
"imprint",
"day",
"month",
"year",
"language",
"country",
"web_link",
"format",
"manga",
"black_and_white",
"maturity_rating",
"critical_rating",
"scan_info",
"tags",
"pages",
"pages.type",
"pages.bookmark",
"pages.double_page",
"pages.image_index",
"pages.size",
"pages.height",
"pages.width",
"page_count",
"characters",
"teams",
"locations",
"credits",
"credits.person",
"credits.role",
"credits.primary",
"credits.language",
"price",
"is_version_of",
"rights",
"identifier",
"last_mark",
}

def supports_credit_role(self, role: str) -> bool:
"""
Return True if this tag format can handle this credit role.
Should always return a bool.
MUST NOT cause an exception.
"""
self.supported_attributes
return False

def supports_tags(self, archive: Archiver) -> bool:
"""
Checks the given archive for the ability to save these tags.
Should always return a bool. Failures should return False.
Should always return a bool.
Typically consists of a call to either `archive.supports_comment` or `archive.supports_file`
"""
return False
raise NotImplementedError

def has_tags(self, archive: Archiver) -> bool:
"""
Checks the given archive for tags.
Should always return a bool. Failures should return False.
Should always return a bool.
"""
return False
raise NotImplementedError

def remove_tags(self, archive: Archiver) -> bool:
def remove_tags(self, archive: Archiver) -> None:
"""
Removes the tags from the given archive.
Should always return a bool. Failures should return False.
"""
return False
raise NotImplementedError

def read_tags(self, archive: Archiver) -> GenericMetadata:
"""
Returns a GenericMetadata representing the tags saved in the given archive.
Should always return a GenericMetadata. Failures should return an empty metadata object.
"""
return GenericMetadata()
raise NotImplementedError

def read_raw_tags(self, archive: Archiver) -> str:
"""
Returns the raw tags as a string.
If the tags are a binary format a roughly similar text format should be used.
Should always return a string. Failures should return the empty string.
"""
return ""
raise NotImplementedError

def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> None:
"""
Saves the given metadata to the given archive.
Should always return a bool. Failures should return False.
Should always return a bool
"""
return False
raise NotImplementedError

def name(self) -> str:
"""
Returns the name of these tags for display purposes eg "Comic Rack".
Should always return a string. Failures should return the empty string.
Should always return a string.
MUST NOT cause an exception.
"""
return ""
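To show what the reworked base class expects of a concrete tag format — class-level `id` and `enabled`, plus `NotImplementedError` for anything a subclass does not override — here is a hedged sketch of a hypothetical comment-backed tag. The class, its `example` ID, and the exact import path of `Tag` are assumptions made for illustration, not part of this changeset.

```python
from __future__ import annotations

from typing import ClassVar

from comicapi.archivers import Archiver
from comicapi.genericmetadata import GenericMetadata
from comicapi.tags import Tag  # assumed import path for the base class shown above


class CommentNotesTag(Tag):
    """Illustrative sketch only: stores the notes field in the archive comment."""

    id: ClassVar[str] = "example"  # hypothetical ID, not one of the known formats
    enabled: bool = True

    def supports_tags(self, archive: Archiver) -> bool:
        return archive.supports_comment()

    def has_tags(self, archive: Archiver) -> bool:
        return archive.get_comment().startswith("example:")

    def read_tags(self, archive: Archiver) -> GenericMetadata:
        md = GenericMetadata()
        if self.has_tags(archive):
            md.notes = archive.get_comment()[len("example:"):]
            md.is_empty = False
        return md

    def read_raw_tags(self, archive: Archiver) -> str:
        return archive.get_comment()

    def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> None:
        # archivers now raise on failure, so there is nothing to translate here
        archive.set_comment("example:" + (metadata.notes or ""))

    def remove_tags(self, archive: Archiver) -> None:
        if self.has_tags(archive):
            archive.set_comment("")

    def name(self) -> str:
        return "Example Comment Tag"
```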
@@ -265,14 +265,15 @@ class AutoTagMatchWindow(QtWidgets.QDialog):

QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
md = prepare_metadata(md, ct_md, self.config)
for tag_id in self._tags:
success = ca.write_tags(md, tag_id)
QtWidgets.QApplication.restoreOverrideCursor()
if not success:
try:
ca.write_tags(md, tag_id)
except Exception as e:
QtWidgets.QMessageBox.warning(
self,
"Write Error",
f"Saving {tags[tag_id].name()} the tags to the archive seemed to fail!",
f"Saving {tags[tag_id].name()} the tags to the archive seemed to fail! {e}",
)
break
QtWidgets.QApplication.restoreOverrideCursor()

ca.reset_cache()
@@ -147,8 +147,10 @@ class CLI:

if not self.config.Runtime_Options__dryrun:
for tag_id in self.config.Runtime_Options__tags_write:
# write out the new data
if not ca.write_tags(md, tag_id):
logger.error("The tag save seemed to fail for: %s!", tags[tag_id].name())
try:
ca.write_tags(md, tag_id)
except Exception:
# Error is already displayed in the log
return False

self.output("Save complete.")

@@ -352,19 +354,21 @@ class CLI:

def delete_tags(self, ca: ComicArchive, tag_id: str) -> Status:
tag_name = tags[tag_id].name()

if ca.has_tags(tag_id):
if not self.config.Runtime_Options__dryrun:
if ca.remove_tags(tag_id):
self.output(f"{ca.path}: Removed {tag_name} tags.")
return Status.success
else:
self.output(f"{ca.path}: Tag removal seemed to fail!")
return Status.write_failure
else:
self.output(f"{ca.path}: dry-run. {tag_name} tags not removed")
return Status.success
self.output(f"{ca.path}: This archive doesn't have {tag_name} tags to remove.")
return Status.success
if not ca.has_tags(tag_id):
self.output(f"{ca.path}: This archive doesn't have {tag_name} tags to remove.")
return Status.success
if self.config.Runtime_Options__dryrun:
self.output(f"{ca.path}: dry-run. {tag_name} tags would be removed")
return Status.success

try:
ca.remove_tags(tag_id)
self.output(f"{ca.path}: Removed {tag_name} tags.")
return Status.success
except Exception:
self.output(f"{ca.path}: Tag removal seemed to fail!")

return Status.write_failure

def delete(self, ca: ComicArchive) -> Result:
res = Result(Action.delete, Status.success, ca.path)

@@ -386,18 +390,21 @@ class CLI:

self.output(f"{ca.path}: Destination and source are same: {dst_tag_name}. Nothing to do.")
return Status.existing_tags

if not self.config.Runtime_Options__dryrun:
if self.config.Metadata_Options__apply_transform_on_bulk_operation and dst_tag_id == "cbi":
md = CBLTransformer(md, self.config).apply()

if ca.write_tags(md, dst_tag_id):
self.output(f"{ca.path}: Copied {source_names} tags to {dst_tag_name}.")
else:
self.output(f"{ca.path}: Tag copy seemed to fail!")
return Status.write_failure
else:
if self.config.Runtime_Options__dryrun:
self.output(f"{ca.path}: dry-run. {source_names} tags not copied")
return Status.success
return Status.success

if self.config.Metadata_Options__apply_transform_on_bulk_operation and dst_tag_id == "cbi":
md = CBLTransformer(md, self.config).apply()

try:
ca.write_tags(md, dst_tag_id)
self.output(f"{ca.path}: Copied {source_names} tags to {dst_tag_name}.")
return Status.success
except Exception:
self.output(f"{ca.path}: Tag copy seemed to fail!")

return Status.write_failure

def copy(self, ca: ComicArchive) -> Result:
res = Result(Action.copy, Status.success, ca.path)

@@ -770,25 +777,25 @@ class CLI:

delete_success = False
export_success = False
if not self.config.Runtime_Options__dryrun:
if ca.export_as_zip(new_file):
export_success = True
if self.config.Runtime_Options__delete_original:
try:
filename_path.unlink(missing_ok=True)
delete_success = True
except OSError:
logger.exception("%sError deleting original archive after export", msg_hdr)
else:
# last export failed, so remove the zip, if it exists
new_file.unlink(missing_ok=True)
else:
if self.config.Runtime_Options__dryrun:
msg = msg_hdr + f"Dry-run: Would try to create {os.path.split(new_file)[1]}"
if self.config.Runtime_Options__delete_original:
msg += " and delete original."
self.output(msg)
return Result(Action.export, Status.success, ca.path, new_file)

try:
ca.export_as(new_file)
export_success = True
if self.config.Runtime_Options__delete_original:
try:
filename_path.unlink(missing_ok=False)
delete_success = True
except OSError:
logger.exception("%sError deleting original archive after export", msg_hdr)
except Exception:
new_file.unlink(missing_ok=True)

msg = msg_hdr
if export_success:
msg += f"Archive exported successfully to: {os.path.split(new_file)[1]}"
@@ -533,7 +533,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

def repackage_archive(self) -> None:
ca_list = self.fileSelectionList.get_selected_archive_list()
non_zip_count = 0
to_zip = []
to_zip: list[ComicArchive] = []
largest_page_size = 0
for ca in ca_list:
largest_page_size = max(largest_page_size, len(ca.get_page_name_list()))

@@ -581,7 +581,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

new_archives_to_add = []
archives_to_remove = []
skipped_list = []
failed_list = []
failed_list: list[Exception] = []
success_count = 0
logger.debug("Exporting %d comics to zip", len(to_zip))

@@ -607,7 +607,8 @@ class TaggerWindow(QtWidgets.QMainWindow):

if export:
logger.debug("Exporting %s to %s", ca.path, export_name)
if ca.export_as_zip(export_name):
try:
ca.export_as(export_name)
success_count += 1
if EW.addToList:
new_archives_to_add.append(str(export_name))

@@ -615,9 +616,8 @@ class TaggerWindow(QtWidgets.QMainWindow):

archives_to_remove.append(ca)
ca.path.unlink(missing_ok=True)

else:
# last export failed, so remove the zip, if it exists
failed_list.append(ca.path)
except Exception as e:
failed_list.append(OSError(f"Failed to export {ca.path} to {export_name}: {e}"))
if export_name.exists():
export_name.unlink(missing_ok=True)

@@ -633,11 +633,9 @@ class TaggerWindow(QtWidgets.QMainWindow):

for f in skipped_list:
summary += f"\t{f}\n"
if failed_list:
summary += (
f"\n\nThe following {len(failed_list)} archive(s) failed to export due to read/write errors:\n"
)
for f in failed_list:
summary += f"\t{f}\n"
summary += f"\n\nThe following {len(failed_list)} archive(s) failed to export:\n"
for ex in failed_list:
summary += f"\t{ex}\n"

logger.info(summary)
dlg = LogWindow(self)

@@ -1198,8 +1196,9 @@ class TaggerWindow(QtWidgets.QMainWindow):

failed_tag: str = ""
# Save each tag
for tag_id in self.selected_write_tags:
success = self.comic_archive.write_tags(self.metadata, tag_id)
if not success:
try:
self.comic_archive.write_tags(self.metadata, tag_id)
except Exception:
failed_tag = tags[tag_id].name()
break

@@ -1225,7 +1224,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

"Read Failed!",
f"One or more of the selected read tags failed to load for {self.comic_archive.path}, check log for details",
)
logger.error("Failed to load metadata for %s: %s", self.ca.path, error)
logger.error("Failed to load metadata for %s: %s", self.comic_archive.path, error)

self.fileSelectionList.update_current_row()
self.update_ui_for_archive()

@@ -1611,7 +1610,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

progdialog.setMinimumDuration(300)
center_window_on_parent(progdialog)

failed_list = []
failed_list: list[Exception] = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
if prog_idx % 10 == 0:

@@ -1622,10 +1621,13 @@ class TaggerWindow(QtWidgets.QMainWindow):

progdialog.setLabelText(str(ca.path))
for tag_id in tag_ids:
if ca.has_tags(tag_id) and ca.is_writable():
if ca.remove_tags(tag_id):
try:
ca.remove_tags(tag_id)
success_count += 1
else:
failed_list.append(ca.path)
except Exception as e:
failed_list.append(
OSError(f"Failed to remove {tags[tag_id].name()} from {ca.path}: {e}")
)
# Abandon any further tag removals to prevent any greater damage to archive
break
ca.reset_cache()

@@ -1640,8 +1642,8 @@ class TaggerWindow(QtWidgets.QMainWindow):

summary = f"Successfully removed {success_count} tags in archive(s)."
if failed_list:
summary += f"\n\nThe remove operation failed in the following {len(failed_list)} archive(s):\n"
for f in failed_list:
summary += f"\t{f}\n"
for ex in failed_list:
summary += f"\t{ex}\n"

dlg = LogWindow(self)
dlg.set_text(summary)

@@ -1686,13 +1688,13 @@ class TaggerWindow(QtWidgets.QMainWindow):

return

if has_src_count != 0:
src_tags = ", ".join([tags[tag_id].name() for tag_id in src_tag_ids])
dst_tags = ", ".join([tags[tag_id].name() for tag_id in dest_tag_ids])
reply = QtWidgets.QMessageBox.question(
self,
"Copy Tags",
f"Are you sure you wish to copy the combined (with overlay order) tags of "
f"{', '.join([tags[tag_id].name() for tag_id in src_tag_ids])} "
f"to {', '.join([tags[tag_id].name() for tag_id in dest_tag_ids])} tags in "
f"{has_src_count} archive(s)?",
f"{src_tags} to {dst_tags} tags in {has_src_count} archive(s)?",
QtWidgets.QMessageBox.StandardButton.Yes,
QtWidgets.QMessageBox.StandardButton.No,
)

@@ -1705,7 +1707,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()

failed_list = []
failed_list: list[Exception] = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
if prog_idx % 10 == 0:

@@ -1713,7 +1715,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

ca_saved = False
md, error = self.read_selected_tags(src_tag_ids, ca)
if error is not None:
failed_list.append(ca.path)
failed_list.append(error)
continue
if md.is_empty:
continue

@@ -1730,12 +1732,15 @@ class TaggerWindow(QtWidgets.QMainWindow):

if tag_id == "cbi" and self.config[0].Metadata_Options__apply_transform_on_bulk_operation:
md = CBLTransformer(md, self.config[0]).apply()

if ca.write_tags(md, tag_id):
try:
ca.write_tags(md, tag_id)
if not ca_saved:
success_count += 1
ca_saved = True
else:
failed_list.append(ca.path)
except Exception as e:
failed_list.append(
OSError(f"Failed to copy {src_tags} to {dst_tags} tags for {ca.path}: {e}")
)

ca.reset_cache()
ca.load_cache({*self.selected_read_tags, *self.selected_write_tags})

@@ -1749,8 +1754,8 @@ class TaggerWindow(QtWidgets.QMainWindow):

summary = f"Successfully copied tags in {success_count} archive(s)."
if failed_list:
summary += f"\n\nThe copy operation failed in the following {len(failed_list)} archive(s):\n"
for f in failed_list:
summary += f"\t{f}\n"
for ex in failed_list:
summary += f"\t{ex}\n"

dlg = LogWindow(self)
dlg.set_text(summary)

@@ -1778,7 +1783,7 @@ class TaggerWindow(QtWidgets.QMainWindow):

"Aborting...",
f"One or more of the read tags failed to load for {ca.path}. Aborting to prevent any possible further damage. Check log for details.",
)
logger.error("Failed to load tags from %s: %s", self.ca.path, error)
logger.error("Failed to load tags from %s: %s", ca.path, error)
return False, match_results

if md.is_empty:

@@ -1929,9 +1934,11 @@ class TaggerWindow(QtWidgets.QMainWindow):

def write_Tags() -> bool:
for tag_id in self.selected_write_tags:
# write out the new data
if not ca.write_tags(md, tag_id):
try:
ca.write_tags(md, tag_id)
except Exception as e:
self.auto_tag_log(
f"{tags[tag_id].name()} save failed! Aborting any additional tag saves.\n"
f"{tags[tag_id].name()} save failed! {e}\nAborting any additional tag saves.\n"
)
return False
return True
@ -44,7 +44,8 @@ def test_read_tags(cbz, md_saved):
assert md == md_saved


def test_write_cr(tmp_comic):
def test_write_cr(tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
md.apply_default_page_list(tmp_comic.get_page_name_list())

@ -70,7 +71,8 @@ def test_save_cr_rar(tmp_path, md_saved):
assert md == md_saved


def test_page_type_write(tmp_comic):
def test_page_type_write(tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")
t = md.pages[0]
t.type = ""
@ -80,12 +82,13 @@ def test_page_type_write(tmp_comic):
md = tmp_comic.read_tags("cr")


def test_invalid_zip(tmp_comic: comicapi.comicarchive.ComicArchive):
with open(tmp_comic.path, mode="b+r") as f:
def test_invalid_zip(tmp_comic_path):
with open(tmp_comic_path, mode="b+r") as f:
# Corrupting the first file only breaks the first file. If it is never read then no exception will be raised
f.seek(-10, os.SEEK_END)  # seek to a probably bad place in the Central Directory and write some bytes
f.write(b"PK\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000")

tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
result = tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cr")  # This is not the first file
assert result
assert not tmp_comic.seems_to_be_a_comic_archive()  # Calls archiver.is_valid
@ -121,7 +124,8 @@ def test_copy_from_archive(archiver, tmp_path, cbz, md_saved):
assert md == md_saved


def test_rename(tmp_comic, tmp_path):
def test_rename(tmp_comic_path, tmp_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
old_path = tmp_comic.path
tmp_comic.rename(tmp_path / "test.cbz")
assert not old_path.exists()
@ -129,8 +133,9 @@ def test_rename(tmp_comic, tmp_path):
assert tmp_comic.path != old_path


def test_rename_ro_dest(tmp_comic, tmp_path):
old_path = tmp_comic.path
def test_rename_ro_dest(tmp_comic_path, tmp_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)

dest = tmp_path / "tmp"
dest.mkdir(mode=0o000)
with pytest.raises(OSError):
@ -138,6 +143,6 @@ def test_rename_ro_dest(tmp_comic, tmp_path):
raise OSError("Windows sucks")
tmp_comic.rename(dest / "test.cbz")
dest.chmod(mode=0o777)
assert old_path.exists()
assert tmp_comic_path.exists()
assert tmp_comic.path.exists()
assert tmp_comic.path == old_path
assert tmp_comic.path == tmp_comic_path
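The corruption trick in test_invalid_zip above works by clobbering bytes near the end of the zip, where the central directory and end-of-central-directory record live. A standalone sketch of that idea using only the standard library; the function names, the exact offset, and the junk bytes are illustrative, and the validity check shown here is an assumption rather than ComicTagger's actual is_valid implementation:

    import os
    import zipfile

    def corrupt_zip_tail(path):
        # Overwrite bytes near the end of the file, damaging the central
        # directory / EOCD record while leaving member data untouched.
        with open(path, "r+b") as f:
            f.seek(-10, os.SEEK_END)
            f.write(b"PK" + b"\x00" * 20)

    def zip_seems_valid(path) -> bool:
        # testzip() returns None when every member's CRC checks out; a damaged
        # central directory typically raises BadZipFile before that point.
        try:
            with zipfile.ZipFile(path) as zf:
                return zf.testzip() is None
        except zipfile.BadZipFile:
            return False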
@ -3,6 +3,7 @@ from __future__ import annotations
import copy
import datetime
import io
import pathlib
import shutil
import unittest.mock
from argparse import Namespace
@ -33,17 +34,20 @@ except ImportError:

@pytest.fixture
def cbz():
yield comicapi.comicarchive.ComicArchive(filenames.cbz_path)
yield comicapi.comicarchive.ComicArchive(
str(filenames.cbz_path)
)  # When testing these always refer to a file on a filesystem


@pytest.fixture
def tmp_comic(tmp_path):
shutil.copy(filenames.cbz_path, tmp_path)
yield comicapi.comicarchive.ComicArchive(tmp_path / filenames.cbz_path.name)
def tmp_comic_path(tmp_path: pathlib.Path):
shutil.copy(str(filenames.cbz_path), str(tmp_path))  # When testing these always refer to a file on a filesystem
yield (tmp_path / filenames.cbz_path.name)
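The old tmp_comic fixture yielded a ready-made ComicArchive; its replacement, tmp_comic_path, yields only a filesystem path, so each test constructs its own archive and can construct a fresh one after the file changes on disk. A hypothetical test skeleton using the new fixture (the test name and assertion are illustrative):

    import comicapi.comicarchive

    def test_something(tmp_comic_path):  # hypothetical test name
        comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
        md = comic.read_tags("cr")  # ComicRack tags, as in the tests above
        assert not md.is_empty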
@pytest.fixture
def cbz_double_cover(tmp_path, tmp_comic):
def cbz_double_cover(tmp_path, tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
cover = Image.open(io.BytesIO(tmp_comic.get_page(0)))

other_page = Image.open(io.BytesIO(tmp_comic.get_page(tmp_comic.get_number_of_pages() - 1)))
@ -53,7 +57,6 @@ def cbz_double_cover(tmp_path, tmp_comic):
double_cover.paste(cover, (cover.width, 0))

tmp_comic.archiver.write_file("double_cover.jpg", double_cover.tobytes("jpeg", "RGB"))
yield tmp_comic


@pytest.fixture(autouse=True)
@ -12,10 +12,11 @@ from comictalker.comictalker import ComicTalker

def test_save(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
# Overwrite the series so it has definitely changed
tmp_comic.write_tags(md_saved.replace(series="nothing"), "cr")

@ -24,9 +25,6 @@ def test_save(
# Check that it changed
assert md != md_saved

# Clear the cached tags
tmp_comic.reset_cache()

# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
@ -37,7 +35,7 @@ def test_save(
# Check online, should be intercepted by comicvine_api
config[0].Auto_Tag__online = True
# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]
# Read and save ComicRack tags
config[0].Runtime_Options__tags_read = ["cr"]
config[0].Runtime_Options__tags_write = ["cr"]
@ -46,6 +44,9 @@ def test_save(
# Run ComicTagger
CLI(config[0], talkers).run()

# tmp_comic is invalid; it can't handle outside changes, so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)

# Read the CBZ
md = tmp_comic.read_tags("cr")
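The re-construction of tmp_comic after the CLI run matters because, as the comment notes, a ComicArchive does not track changes made to the file from outside. A condensed sketch of the run-then-reload pattern; the helper name is illustrative, and the CLI import path is an assumption:

    import comicapi.comicarchive
    from comictaggerlib.cli import CLI  # assumption: import location of CLI

    def run_and_reload(config, talkers, comic_path):
        # Point the in-process CLI at the temp file, run it, then return a
        # fresh ComicArchive, since an archive opened before the run will not
        # see changes made to the file on disk.
        config[0].Runtime_Options__files = [comic_path]
        CLI(config[0], talkers).run()
        return comicapi.comicarchive.ComicArchive(comic_path)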
@ -68,18 +69,16 @@ def test_save(

def test_delete(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")

# Check that the metadata starts correct
assert md == md_saved

# Clear the cached metadata
tmp_comic.reset_cache()

# Setup the app
config = plugin_config[0]
talkers = plugin_config[1]
@ -88,12 +87,15 @@ def test_delete(
config[0].Commands__command = comictaggerlib.resulttypes.Action.delete

# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]
# Delete ComicRack tags
config[0].Runtime_Options__tags_write = ["cr"]
# Run ComicTagger
CLI(config[0], talkers).run()

# tmp_comic is invalid; it can't handle outside changes, so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)

# Read the CBZ
md = tmp_comic.read_tags("cr")

@ -106,10 +108,11 @@ def test_delete(

def test_rename(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
tmp_comic_path,
md_saved,
mock_now,
) -> None:
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
md = tmp_comic.read_tags("cr")

# Check that the metadata starts correct
@ -126,7 +129,7 @@ def test_rename(
config[0].Commands__command = comictaggerlib.resulttypes.Action.rename

# Use the temporary comic we created
config[0].Runtime_Options__files = [tmp_comic.path]
config[0].Runtime_Options__files = [tmp_comic_path]

# Set the template
config[0].File_Rename__template = "{series}"
@ -135,8 +138,11 @@ def test_rename(
# Run ComicTagger
CLI(config[0], talkers).run()

# tmp_comic is invalid; it can't handle outside changes, so we need a new one
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)

# Update the comic path
tmp_comic.path = tmp_comic.path.parent / (md.series + ".cbz")
tmp_comic.path = tmp_comic.path.parent / ((md.series or "comic") + ".cbz")

# Read the CBZ
md = tmp_comic.read_tags("cr")
@ -5,6 +5,7 @@ import io
import pytest
from PIL import Image

import comicapi.comicarchive
import comictaggerlib.imagehasher
import comictaggerlib.issueidentifier
import testing.comicdata
@ -13,15 +14,16 @@ from comicapi.genericmetadata import ImageHash
from comictaggerlib.resulttypes import IssueResult


def test_crop(cbz_double_cover, config, tmp_path, comicvine_api):
def test_crop(cbz_double_cover, config, tmp_path, comicvine_api, tmp_comic_path):
tmp_comic = comicapi.comicarchive.ComicArchive(tmp_comic_path)
config, definitions = config

ii = comictaggerlib.issueidentifier.IssueIdentifier(cbz_double_cover, config, comicvine_api)
ii = comictaggerlib.issueidentifier.IssueIdentifier(tmp_comic, config, comicvine_api)

im = Image.open(io.BytesIO(cbz_double_cover.archiver.read_file("double_cover.jpg")))
im = Image.open(io.BytesIO(tmp_comic.archiver.read_file("double_cover.jpg")))

cropped = ii._crop_double_page(im)
original = cbz_double_cover.get_page(0)
original = tmp_comic.get_page(0)

original_hash = comictaggerlib.imagehasher.ImageHasher(data=original).average_hash()
cropped_hash = comictaggerlib.imagehasher.ImageHasher(image=cropped).average_hash()
@ -5,6 +5,7 @@ from importlib_metadata import entry_points

import comicapi.genericmetadata
import testing.comicdata
from comicapi.archivers.zip import ZipArchiver
from comictaggerlib.md import prepare_metadata

tags = []
@ -20,11 +21,12 @@ if not tags:


@pytest.mark.parametrize("tag_type", tags)
def test_metadata(mock_version, tmp_comic, md_saved, tag_type):
def test_metadata(mock_version, tmp_comic_path, md_saved, tag_type):
archiver = ZipArchiver.open(tmp_comic_path)
tag = tag_type(mock_version[0])
supported_attributes = tag.supported_attributes
tag.write_tags(comicapi.genericmetadata.md_test, tmp_comic.archiver)
written_metadata = tag.read_tags(tmp_comic.archiver)
tag.write_tags(comicapi.genericmetadata.md_test, archiver)
written_metadata = tag.read_tags(archiver)
md = md_saved._get_clean_metadata(*supported_attributes)

# Hack back in the pages variable because CoMet supports identifying the cover by the filename