Ensure comicapi is as consistent as possible

This commit is contained in:
Timmy Welch 2022-06-01 21:14:51 -07:00
parent fd4c453854
commit e8fa51ad45
7 changed files with 415 additions and 330 deletions

View File

@ -36,7 +36,6 @@ class CoMet:
editor_synonyms = ["editor"]
def metadata_from_string(self, string: str) -> GenericMetadata:
tree = ET.ElementTree(ET.fromstring(string))
return self.convert_xml_to_metadata(tree)
@ -127,7 +126,6 @@ class CoMet:
return tree
def convert_xml_to_metadata(self, tree: ET.ElementTree) -> GenericMetadata:
root = tree.getroot()
if root.tag != "comet":
@ -143,24 +141,24 @@ class CoMet:
return node.text
return None
md.series = get("series")
md.title = get("title")
md.issue = get("issue")
md.volume = get("volume")
md.comments = get("description")
md.publisher = get("publisher")
md.language = get("language")
md.format = get("format")
md.page_count = get("pages")
md.maturity_rating = get("rating")
md.price = get("price")
md.is_version_of = get("isVersionOf")
md.rights = get("rights")
md.identifier = get("identifier")
md.last_mark = get("lastMark")
md.genre = get("genre") # TODO - repeatable field
md.series = utils.xlate(get("series"))
md.title = utils.xlate(get("title"))
md.issue = utils.xlate(get("issue"))
md.volume = utils.xlate(get("volume"))
md.comments = utils.xlate(get("description"))
md.publisher = utils.xlate(get("publisher"))
md.language = utils.xlate(get("language"))
md.format = utils.xlate(get("format"))
md.page_count = utils.xlate(get("pages"))
md.maturity_rating = utils.xlate(get("rating"))
md.price = utils.xlate(get("price"))
md.is_version_of = utils.xlate(get("isVersionOf"))
md.rights = utils.xlate(get("rights"))
md.identifier = utils.xlate(get("identifier"))
md.last_mark = utils.xlate(get("lastMark"))
md.genre = utils.xlate(get("genre")) # TODO - repeatable field
date = get("date")
date = utils.xlate(get("date"))
if date is not None:
parts = date.split("-")
if len(parts) > 0:
@ -168,9 +166,9 @@ class CoMet:
if len(parts) > 1:
md.month = parts[1]
md.cover_image = get("coverImage")
md.cover_image = utils.xlate(get("coverImage"))
reading_direction = get("readingDirection")
reading_direction = utils.xlate(get("readingDirection"))
if reading_direction is not None and reading_direction == "rtl":
md.manga = "YesAndRightToLeft"
@ -208,18 +206,16 @@ class CoMet:
tree = ET.ElementTree(ET.fromstring(string))
root = tree.getroot()
if root.tag != "comet":
raise Exception
return False
except ET.ParseError:
return False
return True
def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None:
tree = self.convert_metadata_to_xml(metadata)
tree.write(filename, encoding="utf-8")
def read_from_external_file(self, filename: str) -> GenericMetadata:
tree = ET.parse(filename)
return self.convert_xml_to_metadata(tree)

View File

@ -19,6 +19,7 @@ import logging
import os
import pathlib
import platform
import shutil
import struct
import subprocess
import tempfile
@ -53,7 +54,9 @@ except ImportError:
logger = logging.getLogger(__name__)
if not pil_available:
logger.exception("PIL unavalable")
logger.error("PIL unavalable")
if not rar_support:
logger.error("unrar-cffi unavailable")
class MetaDataStyle:
@ -69,7 +72,7 @@ class UnknownArchiver:
"""Unknown implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
self.path = path
self.path = pathlib.Path(path)
def get_comment(self) -> str:
return ""
@ -77,25 +80,31 @@ class UnknownArchiver:
def set_comment(self, comment: str) -> bool:
return False
def read_file(self, archive_file: str) -> bytes | None:
return None
def write_file(self, archive_file: str, data: bytes) -> bool:
return False
def read_file(self, archive_file: str) -> bytes:
raise NotImplementedError
def remove_file(self, archive_file: str) -> bool:
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
return False
def get_filename_list(self) -> list[str]:
return []
def rebuild(self, exclude_list: list[str]) -> bool:
return False
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
return False
class SevenZipArchiver(UnknownArchiver):
"""7Z implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
self.path = pathlib.Path(path)
super().__init__(path)
# @todo: Implement Comment?
def get_comment(self) -> str:
@ -109,39 +118,30 @@ class SevenZipArchiver(UnknownArchiver):
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
data = zf.read(archive_file)[archive_file].read()
except py7zr.Bad7zFile as e:
logger.error("bad 7zip file [%s]: %s :: %s", e, self.path, archive_file)
raise OSError from e
except OSError as e:
logger.error("bad 7zip file [%s]: %s :: %s", e, self.path, archive_file)
raise OSError from e
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error reading 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
try:
self.rebuild_zip_file([archive_file])
except (py7zr.Bad7zFile, OSError):
logger.exception("Failed to remove %s from 7zip archive", archive_file)
return False
else:
return True
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# At the moment, no other option but to rebuild the whole
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
try:
files = self.get_filename_list()
if archive_file in files:
self.rebuild_zip_file([archive_file])
files = self.get_filename_list()
if archive_file in files:
self.rebuild([archive_file])
try:
# now just add the archive file as a new one
with py7zr.SevenZipFile(self.path, "a") as zf:
zf.writestr(data, archive_file)
return True
except (py7zr.Bad7zFile, OSError):
logger.exception("Writing zip file failed")
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error writing 7zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
@ -151,41 +151,44 @@ class SevenZipArchiver(UnknownArchiver):
return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Unable to get 7zip file list [%s]: %s", e, self.path)
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
return []
def rebuild_zip_file(self, exclude_list: list[str]) -> None:
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path))
os.close(tmp_fd)
try:
with py7zr.SevenZipFile(self.path, "r") as zin:
# py7zr treats all archives as if they used solid compression
# so we need to get the filename list first to read all the files at once
with py7zr.SevenZipFile(self.path, mode="r") as zin:
targets = [f for f in zin.getnames() if f not in exclude_list]
with py7zr.SevenZipFile(self.path, "r") as zin:
with py7zr.SevenZipFile(tmp_name, "w") as zout:
for fname, bio in zin.read(targets).items():
zout.writef(bio, fname)
except (py7zr.Bad7zFile, OSError):
logger.exception("Error rebuilding 7zip file: %s", self.path)
with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file:
with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout:
with py7zr.SevenZipFile(self.path, mode="r") as zin:
for filename, buffer in zin.read(targets).items():
zout.writef(buffer, filename)
os.remove(self.path)
shutil.move(tmp_file.name, self.path)
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error rebuilding 7zip file [%s]: %s", e, self.path)
return False
return True
# replace with the new file
os.remove(self.path)
os.rename(tmp_name, self.path)
def copy_from_archive(self, otherArchive: UnknownArchiver) -> bool:
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with py7zr.SevenZipFile(self.path, "w") as zout:
for fname in otherArchive.get_filename_list():
data = otherArchive.read_file(fname)
for filename in other_archive.get_filename_list():
data = other_archive.read_file(
filename
) # This will be very inefficient if other_archive is a 7z file
if data is not None:
zout.writestr(data, fname)
zout.writestr(data, filename)
except Exception as e:
logger.exception("Error while copying to %s: %s", self.path, e)
logger.error("Error while copying to 7zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
@ -196,7 +199,7 @@ class ZipArchiver(UnknownArchiver):
"""ZIP implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
self.path = pathlib.Path(path)
super().__init__(path)
def get_comment(self) -> str:
with zipfile.ZipFile(self.path, "r") as zf:
@ -204,68 +207,58 @@ class ZipArchiver(UnknownArchiver):
return comment
def set_comment(self, comment: str) -> bool:
with zipfile.ZipFile(self.path, "a") as zf:
with zipfile.ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def read_file(self, archive_file: str) -> bytes:
with zipfile.ZipFile(self.path, "r") as zf:
with zipfile.ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except zipfile.BadZipfile as e:
logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file)
raise OSError from e
except OSError as e:
logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file)
raise OSError from e
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def remove_file(self, archive_file: str) -> bool:
try:
self.rebuild_zip_file([archive_file])
except (zipfile.BadZipfile, OSError):
logger.exception("Failed to remove %s from zip archive", archive_file)
return False
else:
return True
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
try:
files = self.get_filename_list()
if archive_file in files:
self.rebuild_zip_file([archive_file])
files = self.get_filename_list()
if archive_file in files:
self.rebuild([archive_file])
try:
# now just add the archive file as a new one
with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("writing zip file failed [%s]: %s", e, self.path)
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def get_filename_list(self) -> list[str]:
try:
with zipfile.ZipFile(self.path, "r") as zf:
with zipfile.ZipFile(self.path, mode="r") as zf:
namelist = zf.namelist()
return namelist
except (zipfile.BadZipfile, OSError) as e:
logger.error("Unable to get zipfile list [%s]: %s", e, self.path)
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
def rebuild_zip_file(self, exclude_list: list[str]) -> None:
def rebuild(self, exclude_list: list[str]) -> bool:
"""Zip helper func
This recompresses the zip archive, without the files in the exclude_list
"""
tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path))
os.close(tmp_fd)
try:
with zipfile.ZipFile(self.path, "r") as zin:
with zipfile.ZipFile(tmp_name, "w", allowZip64=True) as zout:
with zipfile.ZipFile(
tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True
) as zout:
with zipfile.ZipFile(self.path, mode="r") as zin:
for item in zin.infolist():
buffer = zin.read(item.filename)
if item.filename not in exclude_list:
@ -273,12 +266,35 @@ class ZipArchiver(UnknownArchiver):
# preserve the old comment
zout.comment = zin.comment
except (zipfile.BadZipfile, OSError):
logger.exception("Error rebuilding zip file: %s", self.path)
# replace with the new file
os.remove(self.path)
os.rename(tmp_name, self.path)
# replace with the new file
os.remove(self.path)
shutil.move(cast(str, zout.filename), self.path)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error rebuilding zip file [%s]: %s", e, self.path)
return False
return True
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
zout.writestr(filename, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.write_zip_comment(self.path, comment):
return False
except Exception as e:
logger.error("Error while copying to zip archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool:
"""
@ -295,18 +311,16 @@ class ZipArchiver(UnknownArchiver):
file_length = statinfo.st_size
try:
with open(filename, "r+b") as fo:
with open(filename, mode="r+b") as fo:
# the starting position, relative to EOF
pos = -4
found = False
# walk backwards to find the "End of Central Directory" record
while (not found) and (-pos != file_length):
# seek, relative to EOF
fo.seek(pos, 2)
value = fo.read(4)
# look for the end of central directory signature
@ -334,29 +348,9 @@ class ZipArchiver(UnknownArchiver):
fo.write(comment.encode("utf-8"))
fo.truncate()
else:
raise Exception("Failed to write comment to zip file!")
except Exception:
logger.exception("Writing comment to %s failed", filename)
return False
else:
return True
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout:
for fname in other_archive.get_filename_list():
data = other_archive.read_file(fname)
if data is not None:
zout.writestr(fname, data)
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.write_zip_comment(self.path, comment):
return False
except Exception:
logger.exception("Error while copying to %s", self.path)
raise Exception("Could not find the End of Central Directory record!")
except Exception as e:
logger.error("Error writing comment to zip archive [%s]: %s", e, self.path)
return False
else:
return True
@ -365,15 +359,10 @@ class ZipArchiver(UnknownArchiver):
class RarArchiver(UnknownArchiver):
"""RAR implementation"""
devnull = None
def __init__(self, path: pathlib.Path | str, rar_exe_path: str) -> None:
self.path = pathlib.Path(path)
def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None:
super().__init__(path)
self.rar_exe_path = rar_exe_path
if RarArchiver.devnull is None:
RarArchiver.devnull = open(os.devnull, "bw")
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO() # type: ignore
@ -386,30 +375,36 @@ class RarArchiver(UnknownArchiver):
return str(rarc.comment) if rarc else ""
def set_comment(self, comment: str) -> bool:
if self.rar_exe_path:
if rar_support and self.rar_exe_path:
try:
# write comment to temp file
tmp_fd, tmp_name = tempfile.mkstemp()
with os.fdopen(tmp_fd, "wb") as f:
f.write(comment.encode("utf-8"))
with tempfile.NamedTemporaryFile() as tmp_file:
tmp_file.write(comment.encode("utf-8"))
working_dir = os.path.dirname(os.path.abspath(self.path))
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to write comment to Rar archive
proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, str(self.path)]
subprocess.call(
proc_args,
startupinfo=self.startupinfo,
stdout=RarArchiver.devnull,
stdin=RarArchiver.devnull,
stderr=RarArchiver.devnull,
)
# use external program to write comment to Rar archive
proc_args = [
self.rar_exe_path,
"c",
f"-w{working_dir}",
"-c-",
f"-z{tmp_file.name}",
str(self.path),
]
subprocess.run(
proc_args,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
if platform.system() == "Darwin":
time.sleep(1)
os.remove(tmp_name)
except (subprocess.CalledProcessError, OSError):
logger.exception("Failed to set a comment")
except (subprocess.CalledProcessError, OSError) as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
else:
return True
@ -431,7 +426,7 @@ class RarArchiver(UnknownArchiver):
if entries[0][0].file_size != len(entries[0][1]):
logger.info(
"read_file(): [file is not expected size: %d vs %d] %s:%s [attempt # %d]",
"Error reading rar archive [file is not expected size: %d vs %d] %s :: %s :: tries #%d",
entries[0][0].file_size,
len(entries[0][1]),
self.path,
@ -439,19 +434,22 @@ class RarArchiver(UnknownArchiver):
tries,
)
continue
except OSError as e:
logger.error("read_file(): [%s] %s:%s attempt #%d", e, self.path, archive_file, tries)
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
time.sleep(1)
except Exception as e:
logger.error(
"Unexpected exception in read_file(): [%s] for %s:%s attempt #%d", e, self.path, archive_file, tries
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",
e,
self.path,
archive_file,
tries,
)
break
else:
# Success. Entries is a list of of tuples: ( rarinfo, filedata)
if tries > 1:
logger.info("Attempted read_files() {%d} times", tries)
if len(entries) == 1:
return entries[0][1]
@ -459,59 +457,52 @@ class RarArchiver(UnknownArchiver):
raise OSError
def write_file(self, archive_file: str, data: bytes) -> bool:
def remove_file(self, archive_file: str) -> bool:
if self.rar_exe_path:
try:
tmp_folder = tempfile.mkdtemp()
# use external program to remove file from Rar archive
result = subprocess.run(
[self.rar_exe_path, "d", "-c-", self.path, archive_file],
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
tmp_file = os.path.join(tmp_folder, archive_file)
working_dir = os.path.dirname(os.path.abspath(self.path))
# TODO: will this break if 'archive_file' is in a subfolder. i.e. "foo/bar.txt"
# will need to create the subfolder above, I guess...
with open(tmp_file, "wb") as f:
f.write(data)
# use external program to write file to Rar archive
subprocess.call(
[self.rar_exe_path, "a", "-w" + working_dir, "-c-", "-ep", self.path, tmp_file],
startupinfo=self.startupinfo,
stdout=RarArchiver.devnull,
stdin=RarArchiver.devnull,
stderr=RarArchiver.devnull,
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
result.returncode,
self.path,
archive_file,
)
if platform.system() == "Darwin":
time.sleep(1)
os.remove(tmp_file)
os.rmdir(tmp_folder)
except (subprocess.CalledProcessError, OSError) as e:
logger.info(str(e))
logger.exception("Failed write %s to rar archive", archive_file)
return False
else:
return True
return True
else:
return False
def remove_file(self, archive_file: str) -> bool:
def write_file(self, archive_file: str, data: bytes) -> bool:
if self.rar_exe_path:
try:
# use external program to remove file from Rar archive
subprocess.call(
[self.rar_exe_path, "d", "-c-", self.path, archive_file],
startupinfo=self.startupinfo,
stdout=RarArchiver.devnull,
stdin=RarArchiver.devnull,
stderr=RarArchiver.devnull,
)
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
archive_parent = str(archive_path.parent).lstrip("./")
if platform.system() == "Darwin":
time.sleep(1)
except (subprocess.CalledProcessError, OSError):
logger.exception("Failed to remove %s from rar archive", archive_file)
# use external program to write file to Rar archive
result = subprocess.run(
[self.rar_exe_path, "a", f"-si{archive_name}", f"-ap{archive_parent}", "-c-", "-ep", self.path],
input=data,
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s", result.returncode, self.path, archive_file
)
return False
else:
return True
@ -521,27 +512,65 @@ class RarArchiver(UnknownArchiver):
def get_filename_list(self) -> list[str]:
rarc = self.get_rar_obj()
tries = 0
# while tries < 7:
namelist = []
if rar_support and rarc:
while tries < 7:
try:
tries = tries + 1
namelist = []
for item in rarc.infolist():
if item.file_size != 0:
namelist.append(item.filename)
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
time.sleep(1)
else:
return namelist
return []
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current archive with one copied from another archive"""
try:
tries = tries + 1
for item in rarc.infolist() if rarc else None:
if item.file_size != 0:
namelist.append(item.filename)
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = pathlib.Path(tmp_dir)
rar_cwd = tmp_path / "rar"
rar_cwd.mkdir(exist_ok=True)
rar_path = (tmp_path / self.path.name).with_suffix(".rar")
except OSError as e:
logger.error(f"get_filename_list(): [{e}] {self.path} attempt #{tries}".format(str(e), self.path, tries))
time.sleep(1)
for filename in other_archive.get_filename_list():
(rar_cwd / filename).parent.mkdir(exist_ok=True, parents=True)
data = other_archive.read_file(filename)
if data is not None:
with open(rar_cwd / filename, mode="w+b") as tmp_file:
tmp_file.write(data)
result = subprocess.run(
[self.rar_exe_path, "a", "-r", "-c-", str(rar_path.absolute()), "."],
cwd=rar_cwd.absolute(),
startupinfo=self.startupinfo,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
logger.error("Error while copying to rar archive [exitcode: %d]: %s", result.returncode, self.path)
return False
return namelist
shutil.move(rar_path, self.path)
except Exception as e:
logger.exception("Error while copying to rar archive [%s]: from %s to %s", e, other_archive.path, self.path)
return False
else:
return True
def get_rar_obj(self) -> rarfile.RarFile | None:
try:
rarc = rarfile.RarFile(str(self.path))
except OSError as e:
logger.error("getRARObj(): [%s] %s", e, self.path)
else:
return rarc
if rar_support:
try:
rarc = rarfile.RarFile(str(self.path))
except (OSError, rarfile.RarFileError) as e:
logger.error("Unable to get rar object [%s]: %s", e, self.path)
else:
return rarc
return None
@ -551,63 +580,81 @@ class FolderArchiver(UnknownArchiver):
"""Folder implementation"""
def __init__(self, path: pathlib.Path | str) -> None:
self.path = pathlib.Path(path)
super().__init__(path)
self.comment_file_name = "ComicTaggerFolderComment.txt"
def get_comment(self) -> str:
return self.read_file(self.comment_file_name).decode("utf-8")
try:
return self.read_file(self.comment_file_name).decode("utf-8")
except OSError:
return ""
def set_comment(self, comment: str) -> bool:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
if (self.path / self.comment_file_name).exists() or comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
return True
def read_file(self, archive_file: str) -> bytes:
data = bytes()
fname = os.path.join(self.path, archive_file)
try:
with open(fname, "rb") as f:
with open(self.path / archive_file, mode="rb") as f:
data = f.read()
except OSError:
logger.exception("Failed to read: %s", fname)
except OSError as e:
logger.error("Error reading folder archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
def write_file(self, archive_file: str, data: bytes) -> bool:
fname = os.path.join(self.path, archive_file)
def remove_file(self, archive_file: str) -> bool:
try:
with open(fname, "wb") as f:
f.write(data)
except OSError:
logger.exception("Failed to write: %s", fname)
os.remove(self.path / archive_file)
except OSError as e:
logger.error("Error removing file for folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def remove_file(self, archive_file: str) -> bool:
fname = os.path.join(self.path, archive_file)
def write_file(self, archive_file: str, data: bytes) -> bool:
logger.error("Fuck this: %s", archive_file)
try:
os.remove(fname)
except OSError:
logger.exception("Failed to remove: %s", fname)
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(self.path / archive_file, mode="wb") as f:
f.write(data)
except OSError as e:
logger.error("Error writing folder archive [%s]: %s :: %s", e, self.path, archive_file)
return False
else:
return True
def get_filename_list(self) -> list[str]:
return self.list_files(self.path)
filenames = []
try:
for root, dirs, files in os.walk(self.path):
for f in files:
filenames.append(os.path.relpath(os.path.join(root, f), self.path))
return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
return []
def list_files(self, folder: pathlib.Path | str) -> list[str]:
def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
"""Replace the current zip with one copied from another archive"""
try:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
self.write_file(filename, data)
itemlist = []
for item in os.listdir(folder):
itemlist.append(item)
if os.path.isdir(item):
itemlist.extend(self.list_files(os.path.join(folder, item)))
return itemlist
# preserve the old comment
comment = other_archive.get_comment()
if comment is not None:
if not self.set_comment(comment):
return False
except Exception:
logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
return False
else:
return True
class ComicArchive:
@ -619,16 +666,16 @@ class ComicArchive:
def __init__(
self,
path: pathlib.Path | str,
rar_exe_path: str = "",
rar_exe_path: str = "rar",
default_image_path: pathlib.Path | str | None = None,
) -> None:
self.cbi_md: GenericMetadata | None = None
self.cix_md: GenericMetadata | None = None
self.comet_filename: str | None = None
self.comet_md: GenericMetadata | None = None
self.has__cbi: bool | None = None
self.has__cix: bool | None = None
self.has__comet: bool | None = None
self._has_cbi: bool | None = None
self._has_cix: bool | None = None
self._has_comet: bool | None = None
self.path = pathlib.Path(path)
self.page_count: int | None = None
self.page_list: list[str] = []
@ -666,18 +713,20 @@ class ComicArchive:
self.archive_type = self.ArchiveType.Rar
self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path)
if not ComicArchive.logo_data:
fname = self.default_image_path
if fname:
with open(fname, "rb") as fd:
ComicArchive.logo_data = fd.read()
elif self.folder_test():
self.archive_type = self.ArchiveType.Folder
self.archiver = FolderArchiver(self.path)
if not ComicArchive.logo_data and self.default_image_path:
with open(self.default_image_path, mode="rb") as fd:
ComicArchive.logo_data = fd.read()
def reset_cache(self) -> None:
"""Clears the cached data"""
self.has__cix = None
self.has__cbi = None
self.has__comet = None
self._has_cix = None
self._has_cbi = None
self._has_comet = None
self.comet_filename = None
self.page_count = None
self.page_list = []
@ -704,6 +753,9 @@ class ComicArchive:
return rarfile.is_rarfile(str(self.path))
return False
def folder_test(self) -> bool:
return self.path.is_dir()
def is_sevenzip(self) -> bool:
return self.archive_type == self.ArchiveType.SevenZip
@ -742,7 +794,9 @@ class ComicArchive:
return self.is_writable()
def seems_to_be_a_comic_archive(self) -> bool:
if (self.is_zip() or self.is_rar() or self.is_sevenzip()) and (self.get_number_of_pages() > 0):
if (self.is_zip() or self.is_rar() or self.is_sevenzip() or self.is_folder()) and (
self.get_number_of_pages() > 0
):
return True
return False
@ -795,7 +849,7 @@ class ComicArchive:
try:
image_data = self.archiver.read_file(filename) or bytes()
except OSError:
logger.exception("Error reading in page. Substituting logo page.")
logger.exception("Error reading in page %d. Substituting logo page.", index)
image_data = ComicArchive.logo_data
return image_data
@ -834,7 +888,7 @@ class ComicArchive:
length_buckets[length] = 1
# sort by most common
sorted_buckets = sorted(iter(length_buckets.items()), key=lambda k_v: (k_v[1], k_v[0]), reverse=True)
sorted_buckets = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True)
# statistical mode occurrence is first
mode_length = sorted_buckets[0][0]
@ -905,14 +959,14 @@ class ComicArchive:
return self.archiver.get_comment()
def has_cbi(self) -> bool:
if self.has__cbi is None:
if self._has_cbi is None:
if not self.seems_to_be_a_comic_archive():
self.has__cbi = False
self._has_cbi = False
else:
comment = self.archiver.get_comment()
self.has__cbi = ComicBookInfo().validate_string(comment)
self._has_cbi = ComicBookInfo().validate_string(comment)
return self.has__cbi
return self._has_cbi
def write_cbi(self, metadata: GenericMetadata) -> bool:
if metadata is not None:
@ -920,7 +974,7 @@ class ComicArchive:
cbi_string = ComicBookInfo().string_from_metadata(metadata)
write_success = self.archiver.set_comment(cbi_string)
if write_success:
self.has__cbi = True
self._has_cbi = True
self.cbi_md = metadata
self.reset_cache()
return write_success
@ -931,7 +985,7 @@ class ComicArchive:
if self.has_cbi():
write_success = self.archiver.set_comment("")
if write_success:
self.has__cbi = False
self._has_cbi = False
self.cbi_md = None
self.reset_cache()
return write_success
@ -963,7 +1017,7 @@ class ComicArchive:
try:
raw_cix = self.archiver.read_file(self.ci_xml_filename) or b""
except OSError as e:
logger.error("Error reading in raw CIX!: %s", e)
logger.error("Error reading in raw CIX! for %s: %s", self.path, e)
raw_cix = bytes()
return raw_cix
@ -974,7 +1028,7 @@ class ComicArchive:
cix_string = ComicInfoXml().string_from_metadata(metadata, xml=raw_cix)
write_success = self.archiver.write_file(self.ci_xml_filename, cix_string.encode("utf-8"))
if write_success:
self.has__cix = True
self._has_cix = True
self.cix_md = metadata
self.reset_cache()
return write_success
@ -985,22 +1039,22 @@ class ComicArchive:
if self.has_cix():
write_success = self.archiver.remove_file(self.ci_xml_filename)
if write_success:
self.has__cix = False
self._has_cix = False
self.cix_md = None
self.reset_cache()
return write_success
return True
def has_cix(self) -> bool:
if self.has__cix is None:
if self._has_cix is None:
if not self.seems_to_be_a_comic_archive():
self.has__cix = False
self._has_cix = False
elif self.ci_xml_filename in self.archiver.get_filename_list():
self.has__cix = True
self._has_cix = True
else:
self.has__cix = False
return self.has__cix
self._has_cix = False
return self._has_cix
def read_comet(self) -> GenericMetadata:
if self.comet_md is None:
@ -1029,7 +1083,6 @@ class ComicArchive:
def read_raw_comet(self) -> str:
raw_comet = ""
if not self.has_comet():
logger.info("%s doesn't have CoMet data!", self.path)
raw_comet = ""
else:
try:
@ -1055,7 +1108,7 @@ class ComicArchive:
comet_string = CoMet().string_from_metadata(metadata)
write_success = self.archiver.write_file(cast(str, self.comet_filename), comet_string.encode("utf-8"))
if write_success:
self.has__comet = True
self._has_comet = True
self.comet_md = metadata
self.reset_cache()
return write_success
@ -1066,17 +1119,17 @@ class ComicArchive:
if self.has_comet():
write_success = self.archiver.remove_file(cast(str, self.comet_filename))
if write_success:
self.has__comet = False
self._has_comet = False
self.comet_md = None
self.reset_cache()
return write_success
return True
def has_comet(self) -> bool:
if self.has__comet is None:
self.has__comet = False
if self._has_comet is None:
self._has_comet = False
if not self.seems_to_be_a_comic_archive():
return self.has__comet
return self._has_comet
# look at all xml files in root, and search for CoMet data, get first
for n in self.archiver.get_filename_list():
@ -1088,20 +1141,20 @@ class ComicArchive:
if d:
data = d.decode("utf-8")
except Exception as e:
logger.warning("Error reading in Comet XML for validation!: %s", e)
logger.warning("Error reading in Comet XML for validation! from %s: %s", self.path, e)
if CoMet().validate_string(data):
# since we found it, save it!
self.comet_filename = n
self.has__comet = True
self._has_comet = True
break
return self.has__comet
return self._has_comet
def apply_archive_info_to_metadata(self, md: GenericMetadata, calc_page_sizes: bool = False) -> None:
md.page_count = self.get_number_of_pages()
if calc_page_sizes:
for p in md.pages:
for index, p in enumerate(md.pages):
idx = int(p["Image"])
if pil_available:
if "ImageSize" not in p or "ImageHeight" not in p or "ImageWidth" not in p:
@ -1118,7 +1171,7 @@ class ComicArchive:
p["ImageHeight"] = str(h)
p["ImageWidth"] = str(w)
except Exception as e:
logger.warning("decoding image failed: %s", e)
logger.warning("Error decoding image [%s] %s :: image %s", e, self.path, index)
p["ImageSize"] = str(len(data))
else:

View File

@ -115,7 +115,6 @@ class ComicBookInfo:
return metadata
def string_from_metadata(self, metadata: GenericMetadata) -> str:
cbi_container = self.create_json_dictionary(metadata)
return json.dumps(cbi_container)
@ -165,7 +164,6 @@ class ComicBookInfo:
return cbi_container
def write_to_external_file(self, filename: str, metadata: GenericMetadata) -> None:
cbi_container = self.create_json_dictionary(metadata)
with open(filename, "w", encoding="utf-8") as f:

View File

@ -268,11 +268,9 @@ class ComicInfoXml:
return md
def write_to_external_file(self, filename: str, metadata: GenericMetadata, xml: bytes = b"") -> None:
tree = self.convert_metadata_to_xml(self, metadata, xml)
tree.write(filename, encoding="utf-8", xml_declaration=True)
def read_from_external_file(self, filename: str) -> GenericMetadata:
tree = ET.parse(filename)
return self.convert_xml_to_metadata(tree)

View File

@ -20,6 +20,7 @@ import argparse
import json
import logging
import os
import shutil
import sys
from pprint import pprint
@ -507,7 +508,7 @@ def process_file_cli(
if not opts.dryrun:
# rename the file
os.makedirs(os.path.dirname(new_abs_path), 0o777, True)
os.rename(filename, new_abs_path)
shutil.move(filename, new_abs_path)
else:
suffix = " (dry-run, no change)"

View File

@ -17,6 +17,7 @@ from __future__ import annotations
import logging
import os
import shutil
from typing import TypedDict
from PyQt5 import QtCore, QtWidgets, uic
@ -199,7 +200,7 @@ class RenameWindow(QtWidgets.QDialog):
continue
os.makedirs(os.path.dirname(new_abs_path), 0o777, True)
os.rename(item["archive"].path, new_abs_path)
shutil.move(item["archive"].path, new_abs_path)
item["archive"].rename(new_abs_path)

View File

@ -1,20 +1,20 @@
from __future__ import annotations
import pathlib
import shutil
from os.path import dirname, join
import pytest
from comicapi.comicarchive import ComicArchive, rar_support
from comicapi.genericmetadata import GenericMetadata, md_test
import comicapi.comicarchive
import comicapi.genericmetadata
thisdir = dirname(__file__)
thisdir = pathlib.Path(__file__).parent
cbz_path = thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
@pytest.mark.xfail(not rar_support, reason="rar support")
@pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support")
def test_getPageNameList():
ComicArchive.logo_data = b""
c = ComicArchive(join(thisdir, "data", "fake_cbr.cbr"))
c = comicapi.comicarchive.ComicArchive(thisdir / "data" / "fake_cbr.cbr")
pageNameList = c.get_page_name_list()
assert pageNameList == [
@ -28,40 +28,37 @@ def test_getPageNameList():
def test_set_default_page_list(tmpdir):
md = GenericMetadata()
md.overlay(md_test)
md = comicapi.genericmetadata.GenericMetadata()
md.overlay(comicapi.genericmetadata.md_test)
md.pages = []
md.set_default_page_list(len(md_test.pages))
md.set_default_page_list(len(comicapi.genericmetadata.md_test.pages))
assert isinstance(md.pages[0]["Image"], int)
def test_page_type_read():
c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz")
c = ComicArchive(str(c_path))
c = comicapi.comicarchive.ComicArchive(cbz_path)
md = c.read_cix()
assert isinstance(md.pages[0]["Type"], str)
def test_metadat_read():
c = ComicArchive(
join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz")
def test_metadata_read():
c = comicapi.comicarchive.ComicArchive(
thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
)
md = c.read_cix()
md_dict = md.__dict__
md_test_dict = md_test.__dict__
md_test_dict = comicapi.genericmetadata.md_test.__dict__
assert md_dict == md_test_dict
def test_save_cix(tmpdir):
comic_path = tmpdir.mkdir("cbz").join(
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
)
c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz")
shutil.copy(c_path, comic_path)
comic_path = tmpdir.mkdir("cbz") / cbz_path.name
print(comic_path)
shutil.copy(cbz_path, comic_path)
c = ComicArchive(str(comic_path))
c = comicapi.comicarchive.ComicArchive(comic_path)
md = c.read_cix()
md.set_default_page_list(c.get_number_of_pages())
@ -71,13 +68,12 @@ def test_save_cix(tmpdir):
def test_page_type_save(tmpdir):
comic_path = tmpdir.mkdir("cbz").join(
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
)
c_path = join(thisdir, "data", "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz")
shutil.copy(c_path, comic_path)
comic_path = tmpdir.mkdir("cbz") / cbz_path.name
print(comic_path)
c = ComicArchive(str(comic_path))
shutil.copy(cbz_path, comic_path)
c = comicapi.comicarchive.ComicArchive(comic_path)
md = c.read_cix()
t = md.pages[0]
t["Type"] = ""
@ -85,3 +81,45 @@ def test_page_type_save(tmpdir):
assert c.write_cix(md)
md = c.read_cix()
archivers = [
comicapi.comicarchive.ZipArchiver,
comicapi.comicarchive.SevenZipArchiver,
comicapi.comicarchive.FolderArchiver,
pytest.param(
comicapi.comicarchive.RarArchiver,
marks=pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support"),
),
]
@pytest.mark.parametrize("archiver", archivers)
def test_copy_to_archive(archiver, tmpdir):
comic_path = tmpdir / cbz_path.with_suffix("").name
cbz = comicapi.comicarchive.ComicArchive(cbz_path)
archive = archiver(comic_path)
assert archive.copy_from_archive(cbz.archiver)
comic_archive = comicapi.comicarchive.ComicArchive(comic_path)
assert comic_archive.seems_to_be_a_comic_archive()
assert set(cbz.archiver.get_filename_list()) == set(comic_archive.archiver.get_filename_list())
md = comic_archive.read_cix()
md_dict = md.__dict__
md_test_dict = comicapi.genericmetadata.md_test.__dict__
assert md_dict == md_test_dict
md = comicapi.genericmetadata.GenericMetadata()
md.overlay(comicapi.genericmetadata.md_test)
md.series = "test"
assert comic_archive.write_cix(md)
test_md = comic_archive.read_cix()
md_dict = md.__dict__
test_md_dict = test_md.__dict__
assert md_dict == test_md_dict