lordwelch 2022-05-31 15:46:12 -07:00
parent 61d0db6257
commit 8a9935e1a9
20 changed files with 1338 additions and 221 deletions


@@ -38,11 +38,16 @@ repos:
     rev: 22.3.0
     hooks:
     -   id: black
-        args: [--line-length=120]
+-   repo: https://github.com/PyCQA/autoflake
+    rev: v1.4
+    hooks:
+    -   id: autoflake
+        args: [-i]
 -   repo: https://github.com/PyCQA/flake8
     rev: 4.0.1
     hooks:
     -   id: flake8
+        additional_dependencies: [flake8-encodings, flake8-warnings, flake8-builtins, flake8-eradicate]
 -   repo: https://github.com/pre-commit/mirrors-mypy
     rev: v0.950
     hooks:


@@ -206,8 +206,8 @@ class CoMet:
             tree = ET.ElementTree(ET.fromstring(string))
             root = tree.getroot()
             if root.tag != "comet":
-                raise Exception
-        except:
+                return False
+        except ET.ParseError:
             return False
         return True
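The validator above now treats a wrong root element as an ordinary negative result and only catches ET.ParseError for malformed XML, instead of raising and swallowing a bare Exception. A minimal standalone sketch of the same flow (the function name here is illustrative, not part of comicapi):

import xml.etree.ElementTree as ET


def looks_like_comet(xml_text: str) -> bool:
    try:
        root = ET.ElementTree(ET.fromstring(xml_text)).getroot()
        if root.tag != "comet":
            return False  # wrong root element is a normal "no" answer
    except ET.ParseError:  # malformed XML no longer leaks out as a bare Exception
        return False
    return True


print(looks_like_comet("<comet></comet>"))  # True
print(looks_like_comet("<ComicInfo/>"))     # False
print(looks_like_comet("not xml at all"))   # False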


@@ -21,15 +21,12 @@ import pathlib
 import platform
 import struct
 import subprocess
-import sys
 import tarfile
 import tempfile
 import time
 import zipfile
 from typing import cast
-from typing import List
-from typing import Optional
-from typing import Union
+from typing import IO
 
 import natsort
 import py7zr
@@ -48,7 +45,7 @@ try:
     from unrar.cffi import rarfile
 
     rar_support = True
-except:
+except ImportError:
     rar_support = False
 
 try:
@@ -61,9 +58,7 @@ except ImportError:
 logger = logging.getLogger(__name__)
 
 if not pil_available:
-    logger.exception("PIL unavalable")
-
-sys.path.insert(0, os.path.abspath("."))
+    logger.error("PIL unavalable")
 
 
 class MetaDataStyle:
@@ -86,8 +81,8 @@ class UnknownArchiver:
     def set_comment(self, comment: str) -> bool:
         return False
 
-    def read_file(self, archive_file: str) -> bytes | None:
-        return None
+    def read_file(self, archive_file: str) -> bytes:
+        raise NotImplementedError
 
     def write_file(self, archive_file: str, data: bytes) -> bool:
         return False
@@ -98,6 +93,12 @@ class UnknownArchiver:
     def get_filename_list(self) -> list[str]:
         return []
 
+    def rebuild(self, exclude_list: list[str]) -> bool:
+        return False
+
+    def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
+        return False
+
 
 class ZipArchiver(UnknownArchiver):
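With rebuild() and copy_from_archive() added to the base class, every archiver exposes the same surface: get_filename_list(), read_file(), write_file(), remove_file(), rebuild() and copy_from_archive(). A toy sketch of how copy_from_archive() can be written purely against that surface (the in-memory class is illustrative only, not part of this commit):

from __future__ import annotations


class InMemoryArchiver:
    """Illustrative archiver that keeps members in a dict instead of on disk."""

    def __init__(self) -> None:
        self.files: dict[str, bytes] = {}

    def get_filename_list(self) -> list[str]:
        return list(self.files)

    def read_file(self, archive_file: str) -> bytes:
        return self.files[archive_file]

    def write_file(self, archive_file: str, data: bytes) -> bool:
        self.files[archive_file] = data
        return True

    def copy_from_archive(self, other_archive: InMemoryArchiver) -> bool:
        # only the shared archiver surface is used, so any archiver could be the source
        for name in other_archive.get_filename_list():
            self.write_file(name, other_archive.read_file(name))
        return True


src = InMemoryArchiver()
src.write_file("ComicInfo.xml", b"<ComicInfo/>")
dst = InMemoryArchiver()
dst.copy_from_archive(src)
print(dst.get_filename_list())  # ['ComicInfo.xml']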
@@ -112,30 +113,21 @@ class ZipArchiver(UnknownArchiver):
         return comment
 
     def set_comment(self, comment: str) -> bool:
-        with zipfile.ZipFile(self.path, "a") as zf:
+        with zipfile.ZipFile(self.path, mode="a") as zf:
             zf.comment = bytes(comment, "utf-8")
         return True
 
     def read_file(self, archive_file: str) -> bytes:
-        with zipfile.ZipFile(self.path, "r") as zf:
+        with zipfile.ZipFile(self.path, mode="r") as zf:
             try:
                 data = zf.read(archive_file)
-            except zipfile.BadZipfile as e:
-                logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file)
-                raise OSError from e
             except Exception as e:
                 logger.error("bad zipfile [%s]: %s :: %s", e, self.path, archive_file)
-                raise OSError from e
+                raise
         return data
 
     def remove_file(self, archive_file: str) -> bool:
-        try:
-            self.rebuild_zip_file([archive_file])
-        except:
-            logger.exception("Failed to remove %s from zip archive", archive_file)
-            return False
-        else:
-            return True
+        return self.rebuild([archive_file])
 
     def write_file(self, archive_file: str, data: bytes) -> bool:
         # At the moment, no other option but to rebuild the whole
@@ -144,7 +136,7 @@ class ZipArchiver(UnknownArchiver):
         try:
             files = self.get_filename_list()
             if archive_file in files:
-                self.rebuild_zip_file([archive_file])
+                self.rebuild([archive_file])
 
             # now just add the archive file as a new one
             with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
@@ -156,24 +148,23 @@ class ZipArchiver(UnknownArchiver):
     def get_filename_list(self) -> list[str]:
         try:
-            with zipfile.ZipFile(self.path, "r") as zf:
+            with zipfile.ZipFile(self.path, mode="r") as zf:
                 namelist = zf.namelist()
             return namelist
         except Exception as e:
             logger.error("Unable to get zipfile list [%s]: %s", e, self.path)
             return []
 
-    def rebuild_zip_file(self, exclude_list: list[str]) -> None:
+    def rebuild(self, exclude_list: list[str]) -> bool:
         """Zip helper func
 
         This recompresses the zip archive, without the files in the exclude_list
         """
-        tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path))
-        os.close(tmp_fd)
-
         try:
-            with zipfile.ZipFile(self.path, "r") as zin:
-                with zipfile.ZipFile(tmp_name, "w", allowZip64=True) as zout:
+            with zipfile.ZipFile(
+                tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True
+            ) as zout:
+                with zipfile.ZipFile(self.path, mode="r") as zin:
                     for item in zin.infolist():
                         buffer = zin.read(item.filename)
                         if item.filename not in exclude_list:
@@ -181,12 +172,14 @@ class ZipArchiver(UnknownArchiver):
                     # preserve the old comment
                     zout.comment = zin.comment
-        except Exception:
-            logger.exception("Error rebuilding zip file: %s", self.path)
 
-        # replace with the new file
-        os.remove(self.path)
-        os.rename(tmp_name, self.path)
+                # replace with the new file
+                os.remove(self.path)
+                os.rename(cast(str, zout.filename), self.path)
+        except (OSError, zipfile.BadZipfile):
+            logger.exception("Error rebuilding zip file")
+            return False
+        return True
 
     def write_zip_comment(self, filename: pathlib.Path | str, comment: str) -> bool:
         """
@@ -203,7 +196,7 @@ class ZipArchiver(UnknownArchiver):
         file_length = statinfo.st_size
 
         try:
-            with open(filename, "r+b") as fo:
+            with open(filename, mode="r+b") as fo:
 
                 # the starting position, relative to EOF
                 pos = -4
@@ -252,7 +245,7 @@ class ZipArchiver(UnknownArchiver):
     def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
         """Replace the current zip with one copied from another archive"""
         try:
-            with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout:
+            with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
                 for fname in other_archive.get_filename_list():
                     data = other_archive.read_file(fname)
                     if data is not None:
@@ -264,7 +257,7 @@ class ZipArchiver(UnknownArchiver):
                 if not self.write_zip_comment(self.path, comment):
                     return False
         except Exception:
-            logger.exception("Error while copying to %s", self.path)
+            logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
            return False
         else:
             return True
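The rebuild above replaces mkstemp() with a NamedTemporaryFile created next to the archive, writes the surviving members into it, then swaps it over the original and reports success as a bool. A condensed standalone sketch of that pattern (file names are placeholders; this is not the library's exact code):

import os
import tempfile
import zipfile


def rebuild_zip(path: str, exclude: set[str]) -> bool:
    try:
        # the temp file lives in the same directory so the final rename stays on one filesystem
        tmp = tempfile.NamedTemporaryFile(dir=os.path.dirname(os.path.abspath(path)), delete=False)
        with zipfile.ZipFile(tmp, mode="w", allowZip64=True) as zout:
            with zipfile.ZipFile(path, mode="r") as zin:
                for info in zin.infolist():
                    if info.filename not in exclude:
                        zout.writestr(info, zin.read(info.filename))
                zout.comment = zin.comment  # keep the archive comment
        tmp.close()
        os.remove(path)
        os.rename(tmp.name, path)
    except (OSError, zipfile.BadZipfile):
        return False
    return True


# e.g. rebuild_zip("example.cbz", {"ComicInfo.xml"}) drops the metadata entry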
@@ -272,103 +265,110 @@ class ZipArchiver(UnknownArchiver):
 
 class TarArchiver(UnknownArchiver):
     def __init__(self, path: pathlib.Path | str) -> None:
-        self.path = path
+        self.path = pathlib.Path(path)
+        # tar archives take filenames literally `./ComicInfo.xml` is not `ComicInfo.xml`
+        self.path_norm: dict[str, str] = {}
 
     def get_comment(self) -> str:
-        return comment
+        return ""
 
     def set_comment(self, comment: str) -> bool:
-        return self.writeTarComment(self.path, comment)
+        return False
 
-    def read_file(self, archive_file: str) -> bytes | None:
-        tf = tarfile.TarFile(self.path, "r")
-        try:
-            data = tf.extractfile(archive_file).read()
-        except tarfile.TarError as e:
-            errMsg = f"bad tarfile [{e}]: {self.path} :: {archive_file}"
-            logger.info(errMsg)
-            tf.close()
-            raise OSError
-        except Exception as e:
-            tf.close()
-            errMsg = f"bad tarfile [{e}]: {self.path} :: {archive_file}"
-            logger.info(errMsg)
-            raise OSError
-        finally:
-            tf.close()
+    def read_file(self, archive_file: str) -> bytes:
+        with tarfile.TarFile(self.path, mode="r", encoding="utf-8") as tf:
+            archive_file = str(pathlib.PurePosixPath(archive_file))
+            if not self.path_norm:
+                self.get_filename_list()
+            if archive_file in self.path_norm:
+                archive_file = self.path_norm[archive_file]
+            try:
+                with cast(IO[bytes], tf.extractfile(archive_file)) as file:
+                    data = file.read()
+            except (tarfile.TarError, OSError) as e:
+                logger.info(f"bad tarfile [{e}]: {self.path} :: {archive_file}")
+                raise
 
         return data
 
     def remove_file(self, archive_file: str) -> bool:
-        try:
-            self.rebuild_tar_file([archive_file])
-        except:
-            return False
-        else:
-            return True
+        return self.rebuild([archive_file])
 
     def write_file(self, archive_file: str, data: bytes) -> bool:
         # At the moment, no other option but to rebuild the whole
         # zip archive w/o the indicated file. Very sucky, but maybe
         # another solution can be found
         try:
-            self.rebuild_tar_file([archive_file])
+            self.rebuild([archive_file])
+            archive_file = str(pathlib.PurePosixPath(archive_file))
 
             # now just add the archive file as a new one
-            tf = tarfile.Tarfile(self.path, mode="a")
-            tf.writestr(archive_file, data)
-            tf.close()
+            with tarfile.TarFile(self.path, mode="a") as tf:
+                tfi = tarfile.TarInfo(archive_file)
+                tfi.size = len(data)
+                tfi.mtime = int(time.time())
+                tf.addfile(tfi, io.BytesIO(data))
             return True
-        except:
+        except Exception:
             return False
 
     def get_filename_list(self) -> list[str]:
         try:
-            tf = tarfile.TarFile(self.path, "r")
-            namelist = tf.getnames()
+            tf = tarfile.TarFile(self.path, mode="r", encoding="utf-8")
+            namelist = []
+            for name in tf.getnames():
+                new_name = pathlib.PurePosixPath(name)
+                namelist.append(str(new_name))
+                self.path_norm[str(new_name)] = name
             tf.close()
             return namelist
-        except Exception as e:
-            errMsg = f"Unable to get tarfile list [{e}]: {self.path}"
-            logger.info(errMsg)
+        except (OSError, tarfile.TarError) as e:
+            logger.info(f"Unable to get tarfile list [{e}]: {self.path}")
            return []
 
-    # zip helper func
-    def rebuild_tar_file(self, exclude_list: list[str]) -> None:
+    def rebuild(self, exclude_list: list[str]) -> bool:
         """Tar helper func
 
         This re-creates the tar archive without the files in the exclude list
         """
-        # generate temp file
-        tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path))
-        os.close(tmp_fd)
-
         try:
-            with tarfile.TarFile(self.path, "r") as tin:
-                with tarfile.TarFile(tmp_name, "w") as tout:
+            with tarfile.TarFile(
+                fileobj=tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False),
+                mode="w",
+                encoding="utf-8",
+            ) as tout:
+                with tarfile.TarFile(self.path, mode="r", encoding="utf-8") as tin:
                     for item in tin.getmembers():
                         buffer = tin.extractfile(item)
-                        if item.name not in exclude_list:
+                        if str(pathlib.PurePosixPath(item.name)) not in exclude_list:
                             tout.addfile(item, buffer)
-        except Exception:
-            logger.exception("Error rebuilding tar file: %s", self.path)
 
-        # replace with the new file
-        os.remove(self.path)
-        os.rename(tmp_name, self.path)
+                # replace with the new file
+                os.remove(self.path)
+                os.rename(str(tout.name), self.path)
+        except (OSError, tarfile.TarError):
+            logger.exception("Error rebuilding tar file")
+            return False
+        return True
 
     def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
         # Replace the current tar with one copied from another archive
         try:
-            with zipfile.ZipFile(self.path, "w", allowZip64=True) as zout:
+            with tarfile.TarFile(self.path, mode="w", encoding="utf-8") as tout:
                 for fname in other_archive.get_filename_list():
                     data = other_archive.read_file(fname)
                     if data is not None:
-                        tout.addfile(fname, data)
+                        # All archives should use `/` as the directory separator
+                        tfi = tarfile.TarInfo(str(pathlib.PurePosixPath(fname)))
+                        tfi.size = len(data)
+                        tfi.mtime = int(time.time())
+                        tout.addfile(tfi, io.BytesIO(data))
 
-        except Exception as e:
-            logger.exception("Error while copying to %s", self.path)
+        except Exception:
+            logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
             return False
         else:
             return True
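Two details in the new TarArchiver are worth calling out: member names are normalised through pathlib.PurePosixPath (so "./ComicInfo.xml" and "ComicInfo.xml" compare equal), and in-memory payloads are written by filling a tarfile.TarInfo and handing it to addfile() with a BytesIO. A small self-contained sketch of both (archive name and payload are placeholders):

import io
import pathlib
import tarfile
import time

data = b"<ComicInfo/>"
member_name = str(pathlib.PurePosixPath("./ComicInfo.xml"))  # -> "ComicInfo.xml"

with tarfile.TarFile("example.tar", mode="w", encoding="utf-8") as tf:
    info = tarfile.TarInfo(member_name)
    info.size = len(data)            # size must be set before addfile()
    info.mtime = int(time.time())
    tf.addfile(info, io.BytesIO(data))

with tarfile.TarFile("example.tar", mode="r", encoding="utf-8") as tf:
    print(tf.getnames())  # ['ComicInfo.xml']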
@@ -403,28 +403,22 @@ class SevenZipArchiver(UnknownArchiver):
         return data
 
     def remove_file(self, archive_file: str) -> bool:
-        try:
-            self.rebuild_zip_file([archive_file])
-        except:
-            logger.exception("Failed to remove %s from 7zip archive", archive_file)
-            return False
-        else:
-            return True
+        return self.rebuild([archive_file])
 
     def write_file(self, archive_file: str, data: bytes) -> bool:
         # At the moment, no other option but to rebuild the whole
         # zip archive w/o the indicated file. Very sucky, but maybe
         # another solution can be found
-        try:
-            files = self.get_filename_list()
-            if archive_file in files:
-                self.rebuild_zip_file([archive_file])
+        files = self.get_filename_list()
+        if archive_file in files:
+            self.rebuild([archive_file])
 
+        try:
             # now just add the archive file as a new one
             with py7zr.SevenZipFile(self.path, "a") as zf:
                 zf.writestr(data, archive_file)
             return True
-        except:
+        except (py7zr.Bad7zFile, OSError):
             logger.exception("Writing zip file failed")
             return False
 
@@ -434,42 +428,41 @@ class SevenZipArchiver(UnknownArchiver):
                 namelist: list[str] = zf.getnames()
                 return namelist
-        except Exception as e:
+        except (py7zr.Bad7zFile, OSError) as e:
             logger.error("Unable to get 7zip file list [%s]: %s", e, self.path)
             return []
 
-    def rebuild_zip_file(self, exclude_list: list[str]) -> None:
+    def rebuild(self, exclude_list: list[str]) -> bool:
         """Zip helper func
 
         This recompresses the zip archive, without the files in the exclude_list
         """
-        tmp_fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(self.path))
-        os.close(tmp_fd)
-
         try:
-            with py7zr.SevenZipFile(self.path, "r") as zin:
+            with py7zr.SevenZipFile(self.path, mode="r") as zin:
                 targets = [f for f in zin.getnames() if f not in exclude_list]
-            with py7zr.SevenZipFile(self.path, "r") as zin:
-                with py7zr.SevenZipFile(tmp_name, "w") as zout:
+            with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False) as tmp_file:
+                with py7zr.SevenZipFile(tmp_file.file, mode="w") as zout:
+                    with py7zr.SevenZipFile(self.path, mode="r") as zin:
                         for fname, bio in zin.read(targets).items():
                             zout.writef(bio, fname)
-        except Exception:
-            logger.exception("Error rebuilding 7zip file: %s", self.path)
 
-        # replace with the new file
-        os.remove(self.path)
-        os.rename(tmp_name, self.path)
+                os.remove(self.path)
+                os.rename(tmp_file.name, self.path)
+        except (py7zr.Bad7zFile, OSError):
+            logger.exception("Error rebuilding 7zip file: %s", self.path)
+            return False
+        return True
 
-    def copy_from_archive(self, otherArchive: UnknownArchiver) -> bool:
+    def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
         """Replace the current zip with one copied from another archive"""
         try:
             with py7zr.SevenZipFile(self.path, "w") as zout:
-                for fname in otherArchive.get_filename_list():
-                    data = otherArchive.read_file(fname)
+                for fname in other_archive.get_filename_list():
+                    data = other_archive.read_file(fname)
                     if data is not None:
                         zout.writestr(data, fname)
-        except Exception as e:
-            logger.exception("Error while copying to %s: %s", self.path, e)
+        except Exception:
+            logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
            return False
         else:
             return True
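The 7-Zip rebuild follows the same shape as the zip one: collect the surviving member names, copy them with read()/writef() into a SevenZipFile opened over a sibling temporary file, then rename it over the original. A hedged standalone sketch, assuming the py7zr package is installed and using placeholder paths:

import os
import tempfile

import py7zr


def rebuild_7z(path: str, exclude: set[str]) -> bool:
    try:
        with py7zr.SevenZipFile(path, mode="r") as zin:
            targets = [f for f in zin.getnames() if f not in exclude]
        with tempfile.NamedTemporaryFile(dir=os.path.dirname(os.path.abspath(path)), delete=False) as tmp:
            with py7zr.SevenZipFile(tmp.file, mode="w") as zout:
                # the source archive is reopened before read(), mirroring the change above
                with py7zr.SevenZipFile(path, mode="r") as zin:
                    for name, bio in zin.read(targets).items():
                        zout.writef(bio, name)
            os.remove(path)
            os.rename(tmp.name, path)
    except (py7zr.Bad7zFile, OSError):
        return False
    return True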
@@ -480,8 +473,8 @@ class RarArchiver(UnknownArchiver):
     devnull = None
 
-    def __init__(self, path: pathlib.Path | str, rar_exe_path: str) -> None:
-        self.path = pathlib.Path(path)
+    def __init__(self, path: pathlib.Path | str, rar_exe_path: str = "rar") -> None:
+        self.path: pathlib.Path = pathlib.Path(path)
         self.rar_exe_path = rar_exe_path
 
         if RarArchiver.devnull is None:
@@ -502,14 +495,20 @@ class RarArchiver(UnknownArchiver):
         if self.rar_exe_path:
             try:
                 # write comment to temp file
-                tmp_fd, tmp_name = tempfile.mkstemp()
-                with os.fdopen(tmp_fd, "wb") as f:
-                    f.write(comment.encode("utf-8"))
+                with tempfile.NamedTemporaryFile() as tmp_file:
+                    tmp_file.write(comment.encode("utf-8"))
 
                     working_dir = os.path.dirname(os.path.abspath(self.path))
 
                     # use external program to write comment to Rar archive
-                    proc_args = [self.rar_exe_path, "c", "-w" + working_dir, "-c-", "-z" + tmp_name, self.path]
+                    proc_args = [
+                        self.rar_exe_path,
+                        "c",
+                        f"-w{working_dir}",
+                        "-c-",
+                        f"-z{tmp_file.name}",
+                        str(self.path),
+                    ]
                     subprocess.call(
                         proc_args,
                         startupinfo=self.startupinfo,
@@ -520,7 +519,6 @@ class RarArchiver(UnknownArchiver):
                     if platform.system() == "Darwin":
                         time.sleep(1)
-                os.remove(tmp_name)
             except Exception:
                 logger.exception("Failed to set a comment")
                 return False
@@ -601,8 +599,7 @@ class RarArchiver(UnknownArchiver):
                     time.sleep(1)
                 os.remove(tmp_file)
                 os.rmdir(tmp_folder)
-            except Exception as e:
-                logger.info(str(e))
+            except Exception:
                 logger.exception("Failed write %s to rar archive", archive_file)
                 return False
             else:
@@ -612,22 +609,20 @@ class RarArchiver(UnknownArchiver):
     def remove_file(self, archive_file: str) -> bool:
         if self.rar_exe_path:
-            try:
             # use external program to remove file from Rar archive
-            subprocess.call(
+            result = subprocess.run(
                 [self.rar_exe_path, "d", "-c-", self.path, archive_file],
                 startupinfo=self.startupinfo,
-                stdout=RarArchiver.devnull,
-                stdin=RarArchiver.devnull,
-                stderr=RarArchiver.devnull,
+                stdout=subprocess.DEVNULL,
+                stdin=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
             )
             if platform.system() == "Darwin":
                 time.sleep(1)
-            except:
+            if result.returncode != 0:
                 logger.exception("Failed to remove %s from rar archive", archive_file)
                 return False
-            else:
-                return True
+            return True
         else:
             return False
@@ -659,6 +654,33 @@ class RarArchiver(UnknownArchiver):
 
         return None
 
+    def copy_from_archive(self, other_archive: UnknownArchiver) -> bool:
+        """Replace the current zip with one copied from another archive"""
+        try:
+            with tempfile.TemporaryDirectory() as tmp_dir:
+                tmp_path = pathlib.Path(tmp_dir)
+                rar_cwd = tmp_path / "rar"
+                rar_cwd.mkdir(exist_ok=True)
+                rar_path = (tmp_path / self.path.name).with_suffix(".rar")
+
+                for fname in other_archive.get_filename_list():
+                    (rar_cwd / fname).parent.mkdir(exist_ok=True)
+                    with open(rar_cwd / fname, mode="w+b") as tmp_file:
+                        tmp_file.write(other_archive.read_file(fname))
+
+                subprocess.run(
+                    [self.rar_exe_path, "a", f"-w{rar_cwd}", "-c-", rar_path, "."],
+                    startupinfo=self.startupinfo,
+                    stdout=subprocess.DEVNULL,
+                    stdin=subprocess.DEVNULL,
+                    stderr=subprocess.DEVNULL,
+                    check=True,
+                )
+                os.rename(rar_path, self.path)
+        except Exception:
+            logger.exception("Error while copying archive from %s to %s", other_archive.path, self.path)
+            return False
+        else:
+            return True
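remove_file() now inspects subprocess.run()'s returncode instead of wrapping subprocess.call() in a bare except, and the new copy_from_archive() relies on check=True to turn a failed "rar a" invocation into an exception. A condensed sketch of the two patterns; it assumes a rar binary on PATH, and cwd= is added here only so the trailing "." argument resolves against work_dir in a standalone script:

import subprocess


def delete_from_rar(rar_exe: str, archive: str, member: str) -> bool:
    result = subprocess.run(
        [rar_exe, "d", "-c-", archive, member],
        stdout=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0  # explicit exit-code check, no bare except


def add_dir_to_rar(rar_exe: str, archive: str, work_dir: str) -> bool:
    try:
        subprocess.run(
            [rar_exe, "a", "-c-", archive, "."],
            cwd=work_dir,  # sketch-only assumption, see lead-in
            check=True,    # raises subprocess.CalledProcessError on a non-zero exit
            stdout=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.CalledProcessError, OSError):
        return False
    return True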
 
 class FolderArchiver(UnknownArchiver):
@@ -679,10 +701,11 @@ class FolderArchiver(UnknownArchiver):
         data = bytes()
         fname = os.path.join(self.path, archive_file)
         try:
-            with open(fname, "rb") as f:
+            with open(fname, mode="rb") as f:
                 data = f.read()
         except OSError:
             logger.exception("Failed to read: %s", fname)
+            raise
 
         return data
@@ -690,9 +713,9 @@ class FolderArchiver(UnknownArchiver):
         fname = os.path.join(self.path, archive_file)
         try:
-            with open(fname, "wb") as f:
+            with open(fname, mode="wb") as f:
                 f.write(data)
-        except:
+        except OSError:
             logger.exception("Failed to write: %s", fname)
             return False
         else:
@@ -703,7 +726,7 @@ class FolderArchiver(UnknownArchiver):
         fname = os.path.join(self.path, archive_file)
         try:
             os.remove(fname)
-        except:
+        except OSError:
             logger.exception("Failed to remove: %s", fname)
             return False
         else:
@@ -728,7 +751,7 @@ class ComicArchive:
     logo_data = bytes()
 
     class ArchiveType:
-        Zip, Rar, SevenZip, Tar, Folder, Pdf, Unknown = list(range(6))
+        Zip, Rar, SevenZip, Tar, Folder, Pdf, Unknown = list(range(7))
 
     def __init__(
         self,
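The ArchiveType fix is easy to miss: seven names were being unpacked from six values, which fails as soon as the class body executes. A two-line demonstration:

try:
    Zip, Rar, SevenZip, Tar, Folder, Pdf, Unknown = list(range(6))
except ValueError as e:
    print(e)  # not enough values to unpack (expected 7, got 6)

Zip, Rar, SevenZip, Tar, Folder, Pdf, Unknown = list(range(7))
print(Unknown)  # 6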
@@ -785,7 +808,7 @@ class ComicArchive:
             self.archiver = RarArchiver(self.path, rar_exe_path=self.rar_exe_path)
 
         if not ComicArchive.logo_data and self.default_image_path:
-            with open(self.default_image_path, "rb") as fd:
+            with open(self.default_image_path, mode="rb") as fd:
                 ComicArchive.logo_data = fd.read()
 
     def reset_cache(self) -> None:
@@ -815,13 +838,12 @@ class ComicArchive:
     def zip_test(self) -> bool:
         return zipfile.is_zipfile(self.path)
 
-    def tar_test(self):
+    def tar_test(self) -> bool:
         return tarfile.is_tarfile(self.path)
 
     def rar_test(self) -> bool:
-        try:
-            return bool(rarfile.is_rarfile(str(self.path)))
-        except:
-            return False
+        if rar_support:
+            return rarfile.is_rarfile(str(self.path))
+        return False
 
     def is_sevenzip(self) -> bool:
@@ -830,7 +852,7 @@ class ComicArchive:
     def is_zip(self) -> bool:
         return self.archive_type == self.ArchiveType.Zip
 
-    def is_tar(self):
+    def is_tar(self) -> bool:
         return self.archive_type == self.ArchiveType.Tar
 
     def is_rar(self) -> bool:
@@ -957,7 +979,7 @@ class ComicArchive:
                 length_buckets[length] = 1
 
         # sort by most common
-        sorted_buckets = sorted(length_buckets.items(), key=lambda k, v: (v, k), reverse=True)
+        sorted_buckets = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True)
 
         # statistical mode occurrence is first
         mode_length = sorted_buckets[0][0]
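The sort-key fix addresses a Python 3 incompatibility: tuple parameters in lambdas were removed, so sorted() now hands the whole (length, count) pair to a one-argument key function. A tiny demonstration with a hypothetical bucket dict:

length_buckets = {12: 3, 11: 1, 13: 1}

try:
    sorted(length_buckets.items(), key=lambda k, v: (v, k), reverse=True)
except TypeError as e:
    print(e)  # <lambda>() missing 1 required positional argument: 'v'

most_common = sorted(length_buckets.items(), key=lambda tup: (tup[1], tup[0]), reverse=True)
print(most_common[0][0])  # 12, the modal page-name length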
@@ -1086,7 +1108,7 @@ class ComicArchive:
         try:
             raw_cix = self.archiver.read_file(self.ci_xml_filename) or b""
         except OSError as e:
-            logger.error("Error reading in raw CIX!: %s", e)
+            logger.error("Error reading in raw CIX! for %s: %s", self.path, e)
             raw_cix = bytes()
         return raw_cix
@@ -1155,12 +1177,9 @@ class ComicArchive:
             logger.info("%s doesn't have CoMet data!", self.path)
             raw_comet = ""
 
-        try:
-            raw_bytes = self.archiver.read_file(cast(str, self.comet_filename))
-            if raw_bytes:
-                raw_comet = raw_bytes.decode("utf-8")
-        except:
-            logger.exception("Error reading in raw CoMet!")
+        raw_bytes = self.archiver.read_file(cast(str, self.comet_filename))
+        if raw_bytes:
+            raw_comet = raw_bytes.decode("utf-8", "replace")
         return raw_comet
 
     def write_comet(self, metadata: GenericMetadata) -> bool:
@@ -1211,7 +1230,7 @@ class ComicArchive:
                     if d:
                         data = d.decode("utf-8")
                 except Exception as e:
-                    logger.warning("Error reading in Comet XML for validation!: %s", e)
+                    logger.warning("Error reading in Comet XML for validation! from %s: %s", self.path, e)
                 if CoMet().validate_string(data):
                     # since we found it, save it!
                     self.comet_filename = n
@@ -1241,7 +1260,7 @@ class ComicArchive:
                     p["ImageHeight"] = str(h)
                     p["ImageWidth"] = str(w)
                 except Exception as e:
-                    logger.warning("decoding image failed: %s", e)
+                    logger.warning("decoding image failed for %s: %s", self.path, e)
                 p["ImageSize"] = str(len(data))
 
             else:


@@ -21,7 +21,6 @@ from datetime import datetime
 from typing import Any
 from typing import Literal
 from typing import TypedDict
-from typing import Union
 
 from comicapi import utils
 from comicapi.genericmetadata import GenericMetadata
@@ -126,7 +125,7 @@ class ComicBookInfo:
         try:
             cbi_container = json.loads(string)
-        except:
+        except json.JSONDecodeError:
             return False
 
         return "ComicBookInfo/1.0" in cbi_container


@@ -19,8 +19,6 @@ import xml.etree.ElementTree as ET
 from collections import OrderedDict
 from typing import Any
 from typing import cast
-from typing import List
-from typing import Optional
 from xml.etree.ElementTree import ElementTree
 
 from comicapi import utils


@@ -7,8 +7,6 @@ from enum import auto
 from enum import Enum
 from typing import Any
 from typing import Callable
-from typing import Optional
-from typing import Set
 
 
 class ItemType(Enum):


@@ -26,7 +26,6 @@ import re
 from operator import itemgetter
 from typing import Callable
 from typing import Match
-from typing import Optional
 from typing import TypedDict
 from urllib.parse import unquote
@@ -194,9 +193,10 @@ class FileNameParser:
         volume = ""
 
         # save the last word
-        try:
-            last_word = series.split()[-1]
-        except:
+        split = series.split()
+        if split:
+            last_word = split[-1]
+        else:
             last_word = ""
 
         # remove any parenthetical phrases
@@ -225,12 +225,11 @@ class FileNameParser:
         # be removed to help search online
         if issue_start == 0:
             one_shot_words = ["tpb", "os", "one-shot", "ogn", "gn"]
-            try:
-                last_word = series.split()[-1]
-                if last_word.lower() in one_shot_words:
-                    series = series.rsplit(" ", 1)[0]
-            except:
-                pass
+            split = series.split()
+            if split:
+                last_word = split[-1]
+                if last_word.casefold() in one_shot_words:
+                    series, _, _ = series.rpartition(" ")
 
         if volume:
             series = re.sub(r"\s+v(|ol|olume)$", "", series)
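Both hunks above replace a bare try/except around series.split()[-1] with an explicit emptiness check, and the one-shot handling switches to casefold() and rpartition(), neither of which can raise on odd input. A minimal sketch of the resulting flow (the helper name is illustrative):

one_shot_words = ["tpb", "os", "one-shot", "ogn", "gn"]


def strip_one_shot(series: str) -> str:
    split = series.split()
    if split and split[-1].casefold() in one_shot_words:
        # rpartition never raises, even when there is no space left to split on
        series, _, _ = series.rpartition(" ")
    return series


print(strip_one_shot("Monster Island TPB"))  # "Monster Island"
print(strip_one_shot(""))                    # "" (previously an IndexError was silently swallowed)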
@@ -432,7 +431,8 @@ def parse(p: Parser) -> Callable[[Parser], Callable | None] | None:
         if len(item.val.lstrip("0")) < 4:
             # An issue number starting with # Was not found and no previous number was found
             if p.issue_number_at is None:
-                # Series has already been started/parsed, filters out leading alternate numbers leading alternate number
+                # Series has already been started/parsed,
+                # filters out leading alternate numbers leading alternate number
                 if len(p.series_parts) > 0:
                     # Unset first item
                     if p.firstItem:
@@ -528,7 +528,8 @@ def parse(p: Parser) -> Callable[[Parser], Callable | None] | None:
         if p.peek_back().typ == filenamelexer.ItemType.Dot:
             p.used_items.append(p.peek_back())
 
-    # Allows removing DC from 'Wonder Woman 49 DC Sep-Oct 1951' dependent on publisher being in a static list in the lexer
+    # Allows removing DC from 'Wonder Woman 49 DC Sep-Oct 1951'
+    # dependent on publisher being in a static list in the lexer
     elif item.typ == filenamelexer.ItemType.Publisher:
         p.filename_info["publisher"] = item.val
         p.used_items.append(item)
@@ -867,14 +868,16 @@ def parse_finish(p: Parser) -> Callable[[Parser], Callable | None] | None:
         if issue_num in [x[1] for x in p.year_candidates]:
             p.series_parts.append(issue_num)
         else:
-            # If this number was rejected because of an operator and the operator is still there add it back e.g. 'IG-88'
+            # If this number was rejected because of an operator and the operator is still there add it back
+            # e.g. 'IG-88'
             if (
                 issue_num in p.operator_rejected
                 and p.series_parts
                 and p.series_parts[-1].typ == filenamelexer.ItemType.Operator
             ):
                 p.series_parts.append(issue_num)
-            # We have no reason to not use this number as the issue number. Specifically happens when parsing 'X-Men-V1-067.cbr'
+            # We have no reason to not use this number as the issue number.
+            # Specifically happens when parsing 'X-Men-V1-067.cbr'
             else:
                 p.filename_info["issue"] = issue_num.val
                 p.used_items.append(issue_num)
@@ -1035,7 +1038,8 @@ def parse_info_specifier(p: Parser) -> Callable[[Parser], Callable | None] | None:
                 p.used_items.append(item)
                 p.used_items.append(number)
 
-            # This is not for the issue number it is not in either the issue or the title, assume it is the volume number and count
+            # This is not for the issue number it is not in either the issue or the title,
+            # assume it is the volume number and count
             elif p.issue_number_at != i.pos and i not in p.series_parts and i not in p.title_parts:
                 p.filename_info["volume"] = i.val
                 p.filename_info["volume_count"] = str(int(t2do.convert(number.val)))
@@ -1060,7 +1064,8 @@ def get_number(p: Parser, index: int) -> filenamelexer.Item | None:
     rev = p.input[:index]
     rev.reverse()
     for i in rev:
-        # We don't care about these types, we are looking to see if there is a number that is possibly different from the issue number for this count
+        # We don't care about these types, we are looking to see if there is a number that is possibly different from
+        # the issue number for this count
         if i.typ in [
             filenamelexer.ItemType.LeftParen,
             filenamelexer.ItemType.LeftBrace,


@@ -22,8 +22,6 @@ from __future__ import annotations
 import logging
 from typing import Any
-from typing import List
-from typing import Optional
 from typing import TypedDict
 
 from comicapi import utils
@@ -376,9 +374,9 @@ md_test.volume = 1
 md_test.genre = "Sci-Fi"
 md_test.language = "en"
 md_test.comments = (
-    "For 12-year-old Anda, getting paid real money to kill the characters of players who were cheating in her favorite online "
-    "computer game was a win-win situation. Until she found out who was paying her, and what those characters meant to the "
-    "livelihood of children around the world."
+    "For 12-year-old Anda, getting paid real money to kill the characters of players who were cheating"
+    " in her favorite online computer game was a win-win situation. Until she found out who was paying her,"
+    " and what those characters meant to the livelihood of children around the world."
 )
 md_test.volume_count = None
 md_test.critical_rating = None


@@ -21,7 +21,6 @@ from __future__ import annotations
 import logging
 import unicodedata
-from typing import Optional
 
 logger = logging.getLogger(__name__)


@@ -22,9 +22,7 @@ import re
 import unicodedata
 from collections import defaultdict
 from typing import Any
-from typing import List
-from typing import Optional
-from typing import Union
+from typing import Mapping
 
 import pycountry
@@ -202,7 +200,7 @@ def get_language(string: str | None) -> str | None:
     if lang is None:
         try:
             return str(pycountry.languages.lookup(string).name)
-        except:
+        except LookupError:
             return None
     return lang
@@ -220,7 +218,7 @@ def get_publisher(publisher: str) -> tuple[str, str]:
     return (imprint, publisher)
 
-def update_publishers(new_publishers: dict[str, dict[str, str]]) -> None:
+def update_publishers(new_publishers: Mapping[str, Mapping[str, str]]) -> None:
     for publisher in new_publishers:
         if publisher in publishers:
             publishers[publisher].update(new_publishers[publisher])
@@ -236,7 +234,7 @@ class ImprintDict(dict):
     if the key does not exist the key is returned as the publisher unchanged
     """
 
-    def __init__(self, publisher, mapping=(), **kwargs):
+    def __init__(self, publisher: str, mapping=(), **kwargs) -> None:
         super().__init__(mapping, **kwargs)
         self.publisher = publisher
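update_publishers() now accepts any Mapping, and ImprintDict carries its publisher so a lookup resolves to an (imprint, publisher) pair. A short usage sketch; the seed values mirror tests/autoimprint_test.py added later in this commit, and it assumes comicapi is importable:

from comicapi import utils

# seed the global publisher table, as the new tests do
utils.update_publishers({"Marvel": utils.ImprintDict("Marvel", {"marvel comics": "", "aircel": "Aircel Comics"})})

print(utils.get_publisher("aircel"))         # ('Aircel Comics', 'Marvel')
print(utils.get_publisher("marvel comics"))  # ('', 'Marvel')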

pyproject.toml (new file, +2 lines)

@@ -0,0 +1,2 @@
[tool.black]
line-length = 120


@@ -23,9 +23,20 @@ classifiers =
 [options]
 packages = comicapi
 install_requires =
-    natsort>=3.5.2
+    natsort>=8.1.0
+    pillow>=4.3.0
+    py7zr
+    pycountry
+    text2digits
+    typing-extensions
+    wordninja
 python_requires = >=3.6
 
 [options.extras_require]
 cbr =
-    rarfile==2.7
+    unrar-cffi>=0.2.2
+
+[flake8]
+max-line-length = 120
+extend-ignore =
+    E203

testing/filenames.py (new file, +789 lines)

@@ -0,0 +1,789 @@
"""
format is
(
"filename",
"reason or unique case",
{
"expected": "Dictionary of properties extracted from filename",
},
bool(xfail: expected failure on the old parser)
)
"""
from __future__ import annotations
fnames = [
(
"batman 3 title (DC).cbz",
"honorific and publisher in series",
{
"issue": "3",
"series": "batman",
"title": "title",
"publisher": "DC",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"alternate": "",
},
True,
),
(
"batman 3 title DC.cbz",
"honorific and publisher in series",
{
"issue": "3",
"series": "batman",
"title": "title DC",
"publisher": "DC",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"alternate": "",
},
True,
),
(
"ms. Marvel 3.cbz",
"honorific and publisher in series",
{
"issue": "3",
"series": "ms. Marvel",
"title": "",
"publisher": "Marvel",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"alternate": "",
},
False,
),
(
"january jones 2.cbz",
"month in series",
{
"issue": "2",
"series": "january jones",
"title": "",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"alternate": "",
},
False,
),
(
"52.cbz",
"issue number only",
{
"issue": "52",
"series": "",
"title": "",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"alternate": "",
},
True,
),
(
"52 Monster_Island_v1_2__repaired__c2c.cbz",
"leading alternate",
{
"issue": "2",
"series": "Monster Island",
"title": "",
"volume": "1",
"year": "",
"remainder": "repaired",
"issue_count": "",
"alternate": "52",
"c2c": True,
},
True,
),
(
"Monster_Island_v1_2__repaired__c2c.cbz",
"Example from userguide",
{
"issue": "2",
"series": "Monster Island",
"title": "",
"volume": "1",
"year": "",
"remainder": "repaired",
"issue_count": "",
"c2c": True,
},
False,
),
(
"Monster Island v1 3 (1957) -- The Revenge Of King Klong (noads).cbz",
"Example from userguide",
{
"issue": "3",
"series": "Monster Island",
"title": "",
"volume": "1",
"year": "1957",
"remainder": "The Revenge Of King Klong (noads)",
"issue_count": "",
},
False,
),
(
"Foobar-Man Annual 121 - The Wrath of Foobar-Man, Part 1 of 2.cbz",
"Example from userguide",
{
"issue": "121",
"series": "Foobar-Man Annual",
"title": "The Wrath of Foobar-Man, Part 1 of 2",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
"annual": True,
},
True,
),
(
"Plastic Man v1 002 (1942).cbz",
"Example from userguide",
{
"issue": "2",
"series": "Plastic Man",
"title": "",
"volume": "1",
"year": "1942",
"remainder": "",
"issue_count": "",
},
False,
),
(
"Blue Beetle 02.cbr",
"Example from userguide",
{
"issue": "2",
"series": "Blue Beetle",
"title": "",
"volume": "",
"year": "",
"remainder": "",
"issue_count": "",
},
False,
),
(
"Monster Island vol. 2 #2.cbz",
"Example from userguide",
{
"issue": "2",
"series": "Monster Island",
"title": "",
"volume": "2",
"year": "",
"remainder": "",
"issue_count": "",
},
False,
),
(
"Crazy Weird Comics 2 (of 2) (1969).rar",
"Example from userguide",
{
"issue": "2",
"series": "Crazy Weird Comics",
"title": "",
"volume": "",
"year": "1969",
"remainder": "",
"issue_count": "2",
},
False,
),
(
"Super Strange Yarns (1957) #92 (1969).cbz",
"Example from userguide",
{
"issue": "92",
"series": "Super Strange Yarns",
"title": "",
"volume": "1957",
"year": "1969",
"remainder": "",
"issue_count": "",
},
False,
),
(
"Action Spy Tales v1965 #3.cbr",
"Example from userguide",
{
"issue": "3",
"series": "Action Spy Tales",
"title": "",
"volume": "1965",
"year": "",
"remainder": "",
"issue_count": "",
},
False,
),
(
" X-Men-V1-067.cbr",
"hyphen separated with hyphen in series", # only parses corretly because v1 designates the volume
{
"issue": "67",
"series": "X-Men",
"title": "",
"volume": "1",
"year": "",
"remainder": "",
"issue_count": "",
},
True,
),
(
"Amazing Spider-Man 078.BEY (2022) (Digital) (Zone-Empire).cbr",
"number issue with extra",
{
"issue": "78.BEY",
"series": "Amazing Spider-Man",
"title": "",
"volume": "",
"year": "2022",
"remainder": "(Digital) (Zone-Empire)",
"issue_count": "",
},
False,
),
(
"Angel Wings 02 - Black Widow (2015) (Scanlation) (phillywilly).cbr",
"title after issue",
{
"issue": "2",
"series": "Angel Wings",
"title": "Black Widow",
"volume": "",
"year": "2015",
"remainder": "(Scanlation) (phillywilly)",
"issue_count": "",
},
True,
),
(
"Angel Wings #02 - Black Widow (2015) (Scanlation) (phillywilly).cbr",
"title after #issue",
{
"issue": "2",
"series": "Angel Wings",
"title": "Black Widow",
"volume": "",
"year": "2015",
"remainder": "(Scanlation) (phillywilly)",
"issue_count": "",
},
False,
),
(
"Aquaman - Green Arrow - Deep Target 01 (of 07) (2021) (digital) (Son of Ultron-Empire).cbr",
"issue count",
{
"issue": "1",
"series": "Aquaman - Green Arrow - Deep Target",
"title": "",
"volume": "",
"year": "2021",
"issue_count": "7",
"remainder": "(digital) (Son of Ultron-Empire)",
},
False,
),
(
"Aquaman 80th Anniversary 100-Page Super Spectacular (2021) 001 (2021) (Digital) (BlackManta-Empire).cbz",
"numbers in series",
{
"issue": "1",
"series": "Aquaman 80th Anniversary 100-Page Super Spectacular",
"title": "",
"volume": "2021",
"year": "2021",
"remainder": "(Digital) (BlackManta-Empire)",
"issue_count": "",
},
False,
),
(
"Avatar - The Last Airbender - The Legend of Korra (FCBD 2021) (Digital) (mv-DCP).cbr",
"FCBD date",
{
"issue": "",
"series": "Avatar - The Last Airbender - The Legend of Korra",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(Digital) (mv-DCP)",
"issue_count": "",
"fcbd": True,
},
True,
),
(
"Avengers By Brian Michael Bendis volume 03 (2013) (Digital) (F2) (Kileko-Empire).cbz",
"volume without issue",
{
"issue": "3",
"series": "Avengers By Brian Michael Bendis",
"title": "",
"volume": "3",
"year": "2013",
"remainder": "(Digital) (F2) (Kileko-Empire)",
"issue_count": "",
},
False,
),
(
"Avengers By Brian Michael Bendis v03 (2013) (Digital) (F2) (Kileko-Empire).cbz",
"volume without issue",
{
"issue": "3",
"series": "Avengers By Brian Michael Bendis",
"title": "",
"volume": "3",
"year": "2013",
"remainder": "(Digital) (F2) (Kileko-Empire)",
"issue_count": "",
},
False,
),
(
"Batman '89 (2021) (Webrip) (The Last Kryptonian-DCP).cbr",
"year in title without issue",
{
"issue": "",
"series": "Batman '89",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(Webrip) (The Last Kryptonian-DCP)",
"issue_count": "",
},
False,
),
(
"Batman_-_Superman_020_(2021)_(digital)_(NeverAngel-Empire).cbr",
"underscores",
{
"issue": "20",
"series": "Batman - Superman",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(digital) (NeverAngel-Empire)",
"issue_count": "",
},
False,
),
(
"Black Widow 009 (2021) (Digital) (Zone-Empire).cbr",
"standard",
{
"issue": "9",
"series": "Black Widow",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(Digital) (Zone-Empire)",
"issue_count": "",
},
False,
),
(
"Blade Runner 2029 006 (2021) (3 covers) (digital) (Son of Ultron-Empire).cbr",
"year before issue",
{
"issue": "6",
"series": "Blade Runner 2029",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(3 covers) (digital) (Son of Ultron-Empire)",
"issue_count": "",
},
False,
),
(
"Blade Runner Free Comic Book Day 2021 (2021) (digital-Empire).cbr",
"FCBD year and (year)",
{
"issue": "",
"series": "Blade Runner Free Comic Book Day 2021",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(digital-Empire)",
"issue_count": "",
"fcbd": True,
},
True,
),
(
"Bloodshot Book 03 (2020) (digital) (Son of Ultron-Empire).cbr",
"book",
{
"issue": "3",
"series": "Bloodshot",
"title": "Book 03",
"volume": "3",
"year": "2020",
"remainder": "(digital) (Son of Ultron-Empire)",
"issue_count": "",
},
True,
),
(
"book of eli (2020) (digital) (Son of Ultron-Empire).cbr",
"book",
{
"issue": "",
"series": "book of eli",
"title": "",
"volume": "",
"year": "2020",
"remainder": "(digital) (Son of Ultron-Empire)",
"issue_count": "",
},
False,
),
(
"Cyberpunk 2077 - You Have My Word 02 (2021) (digital) (Son of Ultron-Empire).cbr",
"title",
{
"issue": "2",
"series": "Cyberpunk 2077",
"title": "You Have My Word",
"volume": "",
"year": "2021",
"issue_count": "",
"remainder": "(digital) (Son of Ultron-Empire)",
},
True,
),
(
"Elephantmen 2259 008 - Simple Truth 03 (of 06) (2021) (digital) (Son of Ultron-Empire).cbr",
"volume count",
{
"issue": "8",
"series": "Elephantmen 2259",
"title": "Simple Truth",
"volume": "3",
"year": "2021",
"volume_count": "6",
"remainder": "(digital) (Son of Ultron-Empire)",
"issue_count": "",
},
True,
),
(
"Elephantmen 2259 #008 - Simple Truth 03 (of 06) (2021) (digital) (Son of Ultron-Empire).cbr",
"volume count",
{
"issue": "8",
"series": "Elephantmen 2259",
"title": "Simple Truth",
"volume": "3",
"year": "2021",
"volume_count": "6",
"remainder": "(digital) (Son of Ultron-Empire)",
"issue_count": "",
},
True,
),
(
"Free Comic Book Day - Avengers.Hulk (2021) (2048px) (db).cbz",
"'.' in name",
{
"issue": "",
"series": "Free Comic Book Day - Avengers Hulk",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(2048px) (db)",
"issue_count": "",
"fcbd": True,
},
True,
),
(
"Goblin (2021) (digital) (Son of Ultron-Empire).cbr",
"no-issue",
{
"issue": "",
"series": "Goblin",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(digital) (Son of Ultron-Empire)",
"issue_count": "",
},
False,
),
(
"Marvel Previews 002 (January 2022) (Digital-Empire).cbr",
"(month year)",
{
"issue": "2",
"series": "Marvel Previews",
"title": "",
"publisher": "Marvel",
"volume": "",
"year": "2022",
"remainder": "(Digital-Empire)",
"issue_count": "",
},
True,
),
(
"Marvel Two In One V1 090 c2c (Comixbear-DCP).cbr",
"volume issue ctc",
{
"issue": "90",
"series": "Marvel Two In One",
"title": "",
"publisher": "Marvel",
"volume": "1",
"year": "",
"remainder": "(Comixbear-DCP)",
"issue_count": "",
"c2c": True,
},
True,
),
(
"Marvel Two In One V1 #090 c2c (Comixbear-DCP).cbr",
"volume then issue",
{
"issue": "90",
"series": "Marvel Two In One",
"title": "",
"publisher": "Marvel",
"volume": "1",
"year": "",
"remainder": "(Comixbear-DCP)",
"issue_count": "",
"c2c": True,
},
False,
),
(
"Star Wars - War of the Bounty Hunters - IG-88 (2021) (Digital) (Kileko-Empire).cbz",
"number ends series, no-issue",
{
"issue": "",
"series": "Star Wars - War of the Bounty Hunters - IG-88",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(Digital) (Kileko-Empire)",
"issue_count": "",
},
True,
),
(
"Star Wars - War of the Bounty Hunters - IG-88 #1 (2021) (Digital) (Kileko-Empire).cbz",
"number ends series",
{
"issue": "1",
"series": "Star Wars - War of the Bounty Hunters - IG-88",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(Digital) (Kileko-Empire)",
"issue_count": "",
},
False,
),
(
"The Defenders v1 058 (1978) (digital).cbz",
"",
{
"issue": "58",
"series": "The Defenders",
"title": "",
"volume": "1",
"year": "1978",
"remainder": "(digital)",
"issue_count": "",
},
False,
),
(
"The Defenders v1 Annual 01 (1976) (Digital) (Minutemen-Slayer).cbr",
" v in series",
{
"issue": "1",
"series": "The Defenders Annual",
"title": "",
"volume": "1",
"year": "1976",
"remainder": "(Digital) (Minutemen-Slayer)",
"issue_count": "",
"annual": True,
},
True,
),
(
"The Magic Order 2 06 (2022) (Digital) (Zone-Empire)[__913302__].cbz",
"ending id",
{
"issue": "6",
"series": "The Magic Order 2",
"title": "",
"volume": "",
"year": "2022",
"remainder": "(Digital) (Zone-Empire)[913302]", # Don't really care about double underscores
"issue_count": "",
},
False,
),
(
"Wonder Woman 001 Wonder Woman Day Special Edition (2021) (digital-Empire).cbr",
"issue separates title",
{
"issue": "1",
"series": "Wonder Woman",
"title": "Wonder Woman Day Special Edition",
"volume": "",
"year": "2021",
"remainder": "(digital-Empire)",
"issue_count": "",
},
True,
),
(
"Wonder Woman #001 Wonder Woman Day Special Edition (2021) (digital-Empire).cbr",
"issue separates title",
{
"issue": "1",
"series": "Wonder Woman",
"title": "Wonder Woman Day Special Edition",
"volume": "",
"year": "2021",
"remainder": "(digital-Empire)",
"issue_count": "",
},
False,
),
(
"Wonder Woman 49 DC Sep-Oct 1951 digital [downsized, lightened, 4 missing story pages restored] (Shadowcat-Empire).cbz",
"date-range, no paren, braces",
{
"issue": "49",
"series": "Wonder Woman",
"title": "digital", # Don't have a way to get rid of this
"publisher": "DC",
"volume": "",
"year": "1951",
"remainder": "[downsized, lightened, 4 missing story pages restored] (Shadowcat-Empire)",
"issue_count": "",
},
True,
),
(
"Wonder Woman #49 DC Sep-Oct 1951 digital [downsized, lightened, 4 missing story pages restored] (Shadowcat-Empire).cbz",
"date-range, no paren, braces",
{
"issue": "49",
"series": "Wonder Woman",
"title": "digital", # Don't have a way to get rid of this
"publisher": "DC",
"volume": "",
"year": "1951",
"remainder": "[downsized, lightened, 4 missing story pages restored] (Shadowcat-Empire)",
"issue_count": "",
},
True,
),
(
"X-Men, 2021-08-04 (#02) (digital) (Glorith-HD).cbz",
"full-date, issue in parenthesis",
{
"issue": "2",
"series": "X-Men",
"title": "",
"volume": "",
"year": "2021",
"remainder": "(digital) (Glorith-HD)",
"issue_count": "",
},
True,
),
]
rnames = [
(
"{series} #{issue} - {title} ({year}) ({price})", # price should be none
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz",
),
(
"{series} #{issue} - {title} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz",
),
(
"{series}: {title} #{issue} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now - Anda's Game #001 (2007).cbz",
),
(
"{series}: {title} #{issue} ({year})",
False,
"Linux",
"Cory Doctorow's Futuristic Tales of the Here and Now: Anda's Game #001 (2007).cbz",
),
(
"{publisher}/ {series} #{issue} - {title} ({year})",
True,
"universal",
"IDW Publishing/Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz",
),
(
"{publisher}/ {series} #{issue} - {title} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz",
),
(
r"{publisher}\ {series} #{issue} - {title} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz",
),
(
"{series} # {issue} - {title} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now # 001 - Anda's Game (2007).cbz",
),
(
"{series} # {issue} - {locations} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now # 001 - lonely cottage (2007).cbz",
),
(
"{series} #{issue} - {title} - {WriteR}, {EDITOR} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game - Dara Naraghi, Ted Adams (2007).cbz",
),
]

tests/autoimprint_test.py (new file, +95 lines)

@@ -0,0 +1,95 @@
from __future__ import annotations
from typing import Mapping
import pytest
from comicapi import utils
imprints = [
("marvel", ("", "Marvel")),
("marvel comics", ("", "Marvel")),
("aircel", ("Aircel Comics", "Marvel")),
]
additional_imprints = [
("test", ("Test", "Marvel")),
("temp", ("Temp", "DC Comics")),
]
all_imprints = imprints + additional_imprints
seed: Mapping[str, utils.ImprintDict] = {
"Marvel": utils.ImprintDict(
"Marvel",
{
"marvel comics": "",
"aircel": "Aircel Comics",
},
)
}
additional_seed: Mapping[str, utils.ImprintDict] = {
"Marvel": utils.ImprintDict("Marvel", {"test": "Test"}),
"DC Comics": utils.ImprintDict("DC Comics", {"temp": "Temp"}),
}
all_seed: Mapping[str, utils.ImprintDict] = {
"Marvel": seed["Marvel"].copy(),
"DC Comics": additional_seed["DC Comics"].copy(),
}
all_seed["Marvel"].update(additional_seed["Marvel"])
conflicting_seed = {"Marvel": {"test": "Never"}}
# manually seeds publishers
@pytest.fixture
def seed_publishers(monkeypatch):
publisher_seed = {}
for publisher, imprint in seed.items():
publisher_seed[publisher] = imprint
monkeypatch.setattr(utils, "publishers", publisher_seed)
@pytest.fixture
def seed_all_publishers(monkeypatch):
publisher_seed = {}
for publisher, imprint in all_seed.items():
publisher_seed[publisher] = imprint
monkeypatch.setattr(utils, "publishers", publisher_seed)
# test that an empty publisher table returns the input unchanged
@pytest.mark.parametrize("publisher, expected", imprints)
def test_get_publisher_empty(publisher: str, expected: tuple[str, str]):
assert ("", publisher) == utils.get_publisher(publisher)
# initial test
@pytest.mark.parametrize("publisher, expected", imprints)
def test_get_publisher(publisher: str, expected: tuple[str, str], seed_publishers):
assert expected == utils.get_publisher(publisher)
# tests that update_publishers will initially set values
@pytest.mark.parametrize("publisher, expected", imprints)
def test_set_publisher(publisher: str, expected: tuple[str, str]):
utils.update_publishers(seed)
assert expected == utils.get_publisher(publisher)
# tests that update_publishers will add to existing values
@pytest.mark.parametrize("publisher, expected", all_imprints)
def test_update_publisher(publisher: str, expected: tuple[str, str], seed_publishers):
utils.update_publishers(additional_seed)
assert expected == utils.get_publisher(publisher)
# tests that update_publishers will overwrite conflicting existing values
def test_conflict_publisher(seed_all_publishers):
assert ("Test", "Marvel") == utils.get_publisher("test")
utils.update_publishers(conflicting_seed)
assert ("Never", "Marvel") == utils.get_publisher("test")

tests/comicarchive_test.py (new file, +122 lines)

@@ -0,0 +1,122 @@
from __future__ import annotations
import pathlib
import shutil
import pytest
import comicapi.comicarchive
import comicapi.genericmetadata
thisdir = pathlib.Path(__file__).parent
cbz_path = thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
@pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support")
def test_getPageNameList():
c = comicapi.comicarchive.ComicArchive(thisdir / "data" / "fake_cbr.cbr")
pageNameList = c.get_page_name_list()
assert pageNameList == [
"page0.jpg",
"Page1.jpeg",
"Page2.png",
"Page3.gif",
"page4.webp",
"page10.jpg",
]
def test_set_default_page_list(tmpdir):
md = comicapi.genericmetadata.GenericMetadata()
md.overlay(comicapi.genericmetadata.md_test)
md.pages = []
md.set_default_page_list(len(comicapi.genericmetadata.md_test.pages))
assert isinstance(md.pages[0]["Image"], int)
def test_page_type_read():
c = comicapi.comicarchive.ComicArchive(cbz_path)
md = c.read_cix()
assert isinstance(md.pages[0]["Type"], str)
def test_metadata_read():
c = comicapi.comicarchive.ComicArchive(
thisdir / "data" / "Cory Doctorow's Futuristic Tales of the Here and Now #001 - Anda's Game (2007).cbz"
)
md = c.read_cix()
md_dict = md.__dict__
md_test_dict = comicapi.genericmetadata.md_test.__dict__
assert md_dict == md_test_dict
def test_save_cix(tmpdir):
comic_path = tmpdir.mkdir("cbz") / cbz_path.name
print(comic_path)
shutil.copy(cbz_path, comic_path)
c = comicapi.comicarchive.ComicArchive(comic_path)
md = c.read_cix()
md.set_default_page_list(c.get_number_of_pages())
assert c.write_cix(md)
md = c.read_cix()
def test_page_type_save(tmpdir):
comic_path = tmpdir.mkdir("cbz") / cbz_path.name
print(comic_path)
shutil.copy(cbz_path, comic_path)
c = comicapi.comicarchive.ComicArchive(comic_path)
md = c.read_cix()
t = md.pages[0]
t["Type"] = ""
assert c.write_cix(md)
md = c.read_cix()
archivers = [
comicapi.comicarchive.ZipArchiver,
comicapi.comicarchive.TarArchiver,
comicapi.comicarchive.SevenZipArchiver,
pytest.param(
comicapi.comicarchive.RarArchiver,
marks=pytest.mark.xfail(not comicapi.comicarchive.rar_support, reason="rar support"),
),
]
@pytest.mark.parametrize("archiver", archivers)
def test_copy_to_archive(archiver, tmpdir):
comic_path = tmpdir / cbz_path.with_suffix("").name
cbz = comicapi.comicarchive.ZipArchiver(cbz_path)
archive = archiver(comic_path)
assert archive.copy_from_archive(cbz)
comic_archive = comicapi.comicarchive.ComicArchive(comic_path)
md = comic_archive.read_cix()
md_dict = md.__dict__
md_test_dict = comicapi.genericmetadata.md_test.__dict__
assert md_dict == md_test_dict
md = comicapi.genericmetadata.GenericMetadata()
md.overlay(comicapi.genericmetadata.md_test)
md.series = "test"
assert comic_archive.write_cix(md)
test_md = comic_archive.read_cix()
md_dict = md.__dict__
test_md_dict = test_md.__dict__
assert md_dict == test_md_dict

tests/data/fake_cbr.cbr (new binary file, not shown)


@@ -0,0 +1,49 @@
from __future__ import annotations
import pytest
import comicapi.filenamelexer
import comicapi.filenameparser
from testing.filenames import fnames
@pytest.mark.parametrize("filename, reason, expected, xfail", fnames)
def test_file_name_parser_new(filename, reason, expected, xfail):
p = comicapi.filenameparser.Parse(
comicapi.filenamelexer.Lex(filename).items,
first_is_alt=True,
remove_c2c=True,
remove_fcbd=True,
remove_publisher=True,
)
fp = p.filename_info
for s in ["archive"]:
if s in fp:
del fp[s]
for s in ["alternate", "publisher", "volume_count"]:
if s not in expected:
expected[s] = ""
for s in ["fcbd", "c2c", "annual"]:
if s not in expected:
expected[s] = False
assert fp == expected
@pytest.mark.parametrize("filename, reason, expected, xfail", fnames)
def test_file_name_parser(filename, reason, expected, xfail):
p = comicapi.filenameparser.FileNameParser()
p.parse_filename(filename)
fp = p.__dict__
# These are currently not tracked in this parser
for s in ["title", "alternate", "publisher", "fcbd", "c2c", "annual", "volume_count", "remainder"]:
if s in expected:
del expected[s]
# The remainder is not considered compatible between parsers
if "remainder" in fp:
del fp["remainder"]
if xfail and fp != expected:
pytest.xfail("old parser")
assert fp == expected

tests/issuestring_test.py (new file, +30 lines)

@@ -0,0 +1,30 @@
from __future__ import annotations
import pytest
import comicapi.issuestring
issues = [
("¼", 0.25, "¼"),
("", 1.5, "001½"),
("0.5", 0.5, "000.5"),
("0", 0.0, "000"),
("1", 1.0, "001"),
("22.BEY", 22.0, "022.BEY"),
("22A", 22.0, "022A"),
("22-A", 22.0, "022-A"),
]
@pytest.mark.parametrize("issue, expected_float, expected_pad_str", issues)
def test_issue_string_as_float(issue, expected_float, expected_pad_str):
issue_float = comicapi.issuestring.IssueString(issue).as_float()
assert issue_float == expected_float
@pytest.mark.parametrize("issue, expected_float, expected_pad_str", issues)
def test_issue_string_as_string(issue, expected_float, expected_pad_str):
issue_str = comicapi.issuestring.IssueString(issue).as_string()
issue_str_pad = comicapi.issuestring.IssueString(issue).as_string(3)
assert issue_str == issue
assert issue_str_pad == expected_pad_str