Fix failing test

This commit is contained in:
Timmy Welch 2024-08-03 23:11:31 -07:00
parent b8728c5eed
commit 8709ef301d
4 changed files with 85 additions and 54 deletions

View File

@ -46,7 +46,7 @@ class ZipArchiver(Archiver):
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
logger.exception("Error reading zip archive [%s]: %s :: %s", e, self.path, archive_file)
raise
return data
@ -143,7 +143,16 @@ class ZipArchiver(Archiver):
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
return zipfile.is_zipfile(path)
if not zipfile.is_zipfile(path): # only checks central directory ot the end of the archive
return False
try:
# test all the files in the zip. adds about 0.1 to execution time per zip
with zipfile.ZipFile(path) as zf:
for zipinfo in zf.filelist:
zf.open(zipinfo).close()
return True
except Exception:
return False
def _patch_zipfile(zf): # type: ignore

View File

@ -23,7 +23,6 @@ import os
import pathlib
import shutil
import sys
import traceback
from collections.abc import Iterable
from typing import TYPE_CHECKING
@ -198,7 +197,11 @@ class ComicArchive:
return self.archiver.name() == "ZIP"
def seems_to_be_a_comic_archive(self) -> bool:
if not (isinstance(self.archiver, UnknownArchiver)) and self.get_number_of_pages() > 0:
if (
not (isinstance(self.archiver, UnknownArchiver))
and self.get_number_of_pages() > 0
and self.archiver.is_valid(self.path)
):
return True
return False
@ -252,11 +255,8 @@ class ComicArchive:
if filename:
try:
image_data = self.archiver.read_file(filename) or b""
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)
logger.error(
"%s:%s: Error reading in page %d. Substituting logo page.", tb[1].filename, tb[1].lineno, index
)
except Exception:
logger.exception("Error reading in page %d. Substituting logo page.", index)
image_data = ComicArchive.logo_data
return image_data
@ -337,37 +337,42 @@ class ComicArchive:
) -> None:
md.page_count = self.get_number_of_pages()
md.apply_default_page_list(self.get_page_name_list())
if calc_page_sizes:
for index, p in enumerate(md.pages):
idx = p.display_index
if self.pil_available:
try:
from PIL import Image
if not calc_page_sizes or not self.seems_to_be_a_comic_archive():
return
for p in md.pages:
self.pil_available = True
except ImportError:
self.pil_available = False
if p.byte_size is None or p.height is None or p.width is None or p.double_page is None:
data = self.get_page(idx)
p.byte_size = len(data)
if data:
try:
if isinstance(data, bytes):
im = Image.open(io.BytesIO(data))
else:
im = Image.open(io.StringIO(data))
w, h = im.size
if not self.pil_available:
if p.byte_size is not None:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
continue
try:
from PIL import Image
p.height = h
p.width = w
if detect_double_page:
p.double_page = p.is_double_page()
except Exception as e:
logger.warning("Error decoding image [%s] %s :: image %s", e, self.path, index)
else:
if p.byte_size is not None:
data = self.get_page(idx)
p.byte_size = len(data)
self.pil_available = True
except ImportError:
self.pil_available = False
if p.byte_size is not None:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
continue
if p.byte_size is None or p.height is None or p.width is None or p.double_page is None:
try:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
if not data:
continue
im = Image.open(io.BytesIO(data))
w, h = im.size
p.height = h
p.width = w
if detect_double_page:
p.double_page = p.is_double_page()
except Exception as e:
logger.exception("Error decoding image [%s] %s :: image %s", e, self.path, p.archive_index)
def metadata_from_filename(
self,

View File

@ -87,33 +87,47 @@ class ComicRack(Tag):
return archive.supports_files()
def has_tags(self, archive: Archiver) -> bool:
return (
self.supports_tags(archive)
and self.file in archive.get_filename_list()
and self._validate_bytes(archive.read_file(self.file))
)
try: # read_file can cause an exception
return (
self.supports_tags(archive)
and self.file in archive.get_filename_list()
and self._validate_bytes(archive.read_file(self.file))
)
except Exception:
return False
def remove_tags(self, archive: Archiver) -> bool:
return self.has_tags(archive) and archive.remove_file(self.file)
def read_tags(self, archive: Archiver) -> GenericMetadata:
if self.has_tags(archive):
metadata = archive.read_file(self.file) or b""
if self._validate_bytes(metadata):
return self._metadata_from_bytes(metadata)
try: # read_file can cause an exception
metadata = archive.read_file(self.file) or b""
if self._validate_bytes(metadata):
return self._metadata_from_bytes(metadata)
except Exception:
...
return GenericMetadata()
def read_raw_tags(self, archive: Archiver) -> str:
if self.has_tags(archive):
return ET.tostring(ET.fromstring(archive.read_file(self.file)), encoding="unicode", xml_declaration=True)
try: # read_file can cause an exception
if self.has_tags(archive):
b = archive.read_file(self.file)
# ET.fromstring is used as xml can declare the encoding
return ET.tostring(ET.fromstring(b), encoding="unicode", xml_declaration=True)
except Exception:
...
return ""
def write_tags(self, metadata: GenericMetadata, archive: Archiver) -> bool:
if self.supports_tags(archive):
xml = b""
if self.has_tags(archive):
xml = archive.read_file(self.file)
return archive.write_file(self.file, self._bytes_from_metadata(metadata, xml))
try: # read_file can cause an exception
if self.has_tags(archive):
xml = archive.read_file(self.file)
return archive.write_file(self.file, self._bytes_from_metadata(metadata, xml))
except Exception:
...
else:
logger.warning(f"Archive ({archive.name()}) does not support {self.name()} metadata")
return False

View File

@ -8,6 +8,7 @@ import pytest
from importlib_metadata import entry_points
import comicapi.archivers.rar
import comicapi.archivers.zip
import comicapi.comicarchive
import comicapi.genericmetadata
from testing.filenames import datadir
@ -101,12 +102,14 @@ def test_page_type_write(tmp_comic):
md = tmp_comic.read_tags("cr")
def test_invalid_zip(tmp_comic):
def test_invalid_zip(tmp_comic: comicapi.comicarchive.ComicArchive):
with open(tmp_comic.path, mode="b+r") as f:
# This only corrupts the first file. If it is never read then no exception will be caused
f.write(b"PK\000\000")
result = tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cr")
assert not result
result = tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cr") # This is not the first file
assert result
assert not tmp_comic.seems_to_be_a_comic_archive() # Calls archiver.is_valid
archivers = []