Compare commits

...

50 Commits

Author SHA1 Message Date
Timmy Welch
12f1d11ee8 Merge branch 'mizaki/issue_hash_cover' into develop 2025-05-05 00:20:57 -07:00
Timmy Welch
3d47e6b3b6 Make perception hash more efficient 2025-05-04 17:28:52 -07:00
Timmy Welch
0f1239f603 Remove probably unnecessary waits in rar code for macOS 2025-05-04 17:28:03 -07:00
Timmy Welch
66cc901027 Fix python 3.12 deprecation 2025-05-04 15:49:48 -07:00
Timmy Welch
ca969e12a7 Update quick tag for new api 2025-05-04 15:40:34 -07:00
Timmy Welch
039fd4598d Remove unnecessary log output 2025-05-04 15:32:45 -07:00
Timmy Welch
f1b729129e Fix mypy types 2025-05-04 15:32:26 -07:00
Timmy Welch
0a7bb4d93d Fix ratelimit on direct series/issue lookups 2025-05-04 15:32:00 -07:00
Mizaki
3c062a1cd3 Alter invalid hash test from hash value to kind value 2025-05-04 22:32:09 +01:00
Mizaki
bcc677ab12 Use empty string Kind instead of Hash != 0 for hash checking. Remove redundent or for HashImage.URL value 2025-05-03 22:07:28 +01:00
Timmy Welch
77ddbf5baa pre-sort filenames fixes #705
Provides consistent ordering for numbers in names
2025-05-02 20:02:24 -07:00
github-actions[bot]
71b32f6702 Update AUTHORS 2025-05-03 02:55:58 +00:00
github-actions[bot]
32dd3a253f docs(contributor): contrib-readme-action has updated readme 2025-05-03 02:55:58 +00:00
Timmy Welch
dfaa2cc11d Reduce number of requests for quick_tag 2025-05-02 14:33:00 -07:00
Timmy Welch
2106883c67 Improve ComicCacher performance 2025-05-02 14:12:25 -07:00
Timmy Welch
3ebc11d95e Merge branch 'emmanuel-ferdman/develop' into develop 2025-05-02 13:50:32 -07:00
Timmy Welch
c9e368bf3f Speedup ComicArchive access fixes #728
Fix invalid zip test
Removing the check on each file inside of the zip, invalid zip files may still be opened but don't really matter in this case
Cache reading the filename list
Add a list of supported extensions to check first for an archiver
Remove unnecessary calls to rar executable
Fix limiter on integration test
Remove excess processEvents calls
Fix unnecessary calls when inserting into the FileSelectionList
2025-05-02 13:42:01 -07:00
Timmy Welch
2f64154cd2 Update to latest version of settngs 2025-05-01 18:18:50 -07:00
Timmy Welch
165388ce1b Show more options to the user if there are multiple bad matches
Fix some error cases in the comicvine talker
Remove leftover pprint statement
2025-04-30 17:32:05 -07:00
Timmy Welch
fb629891ba Sort files before processing 2025-04-30 17:27:25 -07:00
Timmy Welch
f0c644f5ec Fix flake8 error 2025-04-30 17:26:56 -07:00
Timmy Welch
5ee31f45a8 Fix performance when removing tags from cbz files 2025-04-30 17:26:36 -07:00
Timmy Welch
bfd9fe89dc Update quick-tag for new api 2025-04-25 13:45:28 -07:00
Emmanuel Ferdman
d65ce48882 Resolve bs4 deprecation warnings
Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
2025-04-23 23:03:46 -07:00
Timmy Welch
75bba1814c Remove rapidfuzz and use stdlib difflib
Results are on-par (90% the same) and this removes a dependency
2025-04-23 18:57:28 -07:00
Timmy Welch
146f160802 Fix tag selection 2025-04-22 21:08:10 -07:00
Timmy Welch
ad26ee7818 Fix deprecation warning 2025-04-22 21:08:10 -07:00
Timmy Welch
b5eba8d715 Fix difference_hash 2025-04-22 21:08:10 -07:00
Timmy Welch
d4bdefa9c1 Simplify zip 2025-04-22 21:04:02 -07:00
Mizaki
506fac03c7 Use ImageHash solely 2025-04-17 23:48:53 +01:00
Timmy Welch
343be3b973 Upgrade pre-commit 2025-04-13 13:48:42 -07:00
Timmy Welch
3c6321faa0 Fix assertion about image pixels 2025-04-12 21:15:49 -07:00
Timmy Welch
161f2ae985 Add all pillow extensions to recognized image extensions Fixes #752 2025-04-12 14:05:07 -07:00
github-actions[bot]
2a8a3ab0c8 Update AUTHORS 2025-04-05 19:10:05 +00:00
github-actions[bot]
65ae288018 docs(contributor): contrib-readme-action has updated readme 2025-04-05 19:10:02 +00:00
Timmy Welch
1641182ec0 Merge branch 'N-Hertstein/develop' into develop 2025-04-05 12:09:11 -07:00
HSN
2fafd1b064 Fallback to C only and add Logging
Skip falling back to en_US and go straight to C as it is always available.
Add error logging.
2025-04-04 20:03:18 +02:00
HSN
827b7a2173 Remove .UTF-8 from fallback language options
Modify fallback languages from en_US.UTF-8 & C.UTF-8 to en_US & C to avoid errors when UTF-8 is not available.
2025-03-30 12:06:33 +02:00
pre-commit-ci[bot]
8aa422fd66 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-03-29 07:07:43 +00:00
HSN
7e3824c769 Change ' to " because test error... 2025-03-29 08:05:15 +01:00
pre-commit-ci[bot]
4f8d4803e1 [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
2025-03-29 07:00:08 +00:00
HSN
b482b88c37 Add error catching to locale.getlocale()
Add error handling and fallbacks to en_US or C as language locale in case of misconfigured or minimal system.
2025-03-29 07:54:44 +01:00
Timmy Welch
bd6afb60ba Revert "Add Linux aarch64 runner"
This reverts commit 95c85e906da88ce1ceeb19736a3ffc834a6fabca.
2025-03-22 20:28:02 -07:00
Timmy Welch
a87368bd09 Fix #741 2025-03-22 20:21:07 -07:00
Timmy Welch
95c85e906d Add Linux aarch64 runner 2025-03-22 20:15:09 -07:00
Timmy Welch
3965bfe082 Merge branch 'mizaki/qtutils_image_exception' into develop 2025-03-22 20:02:43 -07:00
Mizaki
ba2d823993 Exit early if 0 bytes image data 2025-03-04 22:32:29 +00:00
Mizaki
cf3009ca02 Report image_data size in exception message 2025-02-28 17:31:20 +00:00
Mizaki
a0be90bbf5 Add URL to ImageHash and use in issue window 2025-02-28 16:55:56 +00:00
Mizaki
14213dd245 Change failed image loading from logger exception to warning 2025-02-28 14:10:01 +00:00
38 changed files with 1220 additions and 770 deletions

View File

@ -10,7 +10,7 @@ repos:
- id: name-tests-test
- id: requirements-txt-fixer
- repo: https://github.com/asottile/setup-cfg-fmt
rev: v2.7.0
rev: v2.8.0
hooks:
- id: setup-cfg-fmt
- repo: https://github.com/asottile/pyupgrade
@ -29,11 +29,11 @@ repos:
- id: isort
args: [--af,--add-import, 'from __future__ import annotations']
- repo: https://github.com/psf/black
rev: 24.4.2
rev: 25.1.0
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 7.1.2
rev: 7.2.0
hooks:
- id: flake8
additional_dependencies: [flake8-encodings, flake8-builtins, flake8-print, flake8-no-nested-comprehensions]

View File

@ -19,3 +19,5 @@ pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
kcgthb <kcgthb@users.noreply.github.com>
Kilian Cavalotti <kcgthb@users.noreply.github.com>
David Bugl <david.bugl@gmx.at>
HSN <64664577+N-Hertstein@users.noreply.github.com>
Emmanuel Ferdman <emmanuelferdman@gmail.com>

View File

@ -131,6 +131,13 @@ winget install ComicTagger.ComicTagger
<sub><b>abuchanan920</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/N-Hertstein">
<img src="https://avatars.githubusercontent.com/u/64664577?v=4" width="100;" alt="N-Hertstein"/>
<br />
<sub><b>N-Hertstein</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/kcgthb">
<img src="https://avatars.githubusercontent.com/u/186807?v=4" width="100;" alt="kcgthb"/>
@ -158,6 +165,14 @@ winget install ComicTagger.ComicTagger
<br />
<sub><b>Sn1cket</b></sub>
</a>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/emmanuel-ferdman">
<img src="https://avatars.githubusercontent.com/u/35470921?v=4" width="100;" alt="emmanuel-ferdman"/>
<br />
<sub><b>emmanuel-ferdman</b></sub>
</a>
</td>
<td align="center">
<a href="https://github.com/jpcranford">
@ -165,8 +180,7 @@ winget install ComicTagger.ComicTagger
<br />
<sub><b>jpcranford</b></sub>
</a>
</td></tr>
<tr>
</td>
<td align="center">
<a href="https://github.com/PawlakMarek">
<img src="https://avatars.githubusercontent.com/u/26022173?v=4" width="100;" alt="PawlakMarek"/>
@ -194,7 +208,8 @@ winget install ComicTagger.ComicTagger
<br />
<sub><b>thFrgttn</b></sub>
</a>
</td>
</td></tr>
<tr>
<td align="center">
<a href="https://github.com/tlc">
<img src="https://avatars.githubusercontent.com/u/19436?v=4" width="100;" alt="tlc"/>

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import pathlib
from collections.abc import Collection
from typing import Protocol, runtime_checkable
@ -30,6 +31,8 @@ class Archiver(Protocol):
"""
hashable: bool = True
supported_extensions: Collection[str] = set()
def __init__(self) -> None:
self.path = pathlib.Path()
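The new supported_extensions attribute lets callers try a cheap file-suffix match before falling back to content sniffing, which is how the comicarchive.py changes further down use it. A minimal sketch of that idea; FakeArchiver and pick_archiver are illustrative names, not project code:

from __future__ import annotations

import pathlib

class FakeArchiver:
    # Stand-in for a real Archiver subclass.
    supported_extensions = frozenset({".cbz", ".zip"})

    @classmethod
    def is_valid(cls, path: pathlib.Path) -> bool:
        return True  # a real archiver would sniff the file header here

def pick_archiver(path: pathlib.Path, archivers: list[type[FakeArchiver]]) -> type[FakeArchiver] | None:
    # Cheap pass: trust the suffix and only validate archivers that claim it.
    for archiver in archivers:
        if path.suffix.casefold() in archiver.supported_extensions and archiver.is_valid(path):
            return archiver
    # Fallback: sniff the file with every archiver regardless of suffix.
    for archiver in archivers:
        if archiver.is_valid(path):
            return archiver
    return None

print(pick_archiver(pathlib.Path("example.cbz"), [FakeArchiver]))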

View File

@ -17,6 +17,7 @@ class FolderArchiver(Archiver):
def __init__(self) -> None:
super().__init__()
self.comment_file_name = "ComicTaggerFolderComment.txt"
self._filename_list: list[str] = []
def get_comment(self) -> str:
try:
@ -25,8 +26,10 @@ class FolderArchiver(Archiver):
return ""
def set_comment(self, comment: str) -> bool:
if (self.path / self.comment_file_name).exists() or comment:
self._filename_list = []
if comment:
return self.write_file(self.comment_file_name, comment.encode("utf-8"))
(self.path / self.comment_file_name).unlink(missing_ok=True)
return True
def supports_comment(self) -> bool:
@ -42,6 +45,7 @@ class FolderArchiver(Archiver):
return data
def remove_file(self, archive_file: str) -> bool:
self._filename_list = []
try:
(self.path / archive_file).unlink(missing_ok=True)
except OSError as e:
@ -51,6 +55,7 @@ class FolderArchiver(Archiver):
return True
def write_file(self, archive_file: str, data: bytes) -> bool:
self._filename_list = []
try:
file_path = self.path / archive_file
file_path.parent.mkdir(exist_ok=True, parents=True)
@ -63,11 +68,14 @@ class FolderArchiver(Archiver):
return True
def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
filenames = []
try:
for root, _dirs, files in os.walk(self.path):
for f in files:
filenames.append(os.path.relpath(os.path.join(root, f), self.path).replace(os.path.sep, "/"))
self._filename_list = filenames
return filenames
except OSError as e:
logger.error("Error listing files in folder archive [%s]: %s", e, self.path)
@ -78,6 +86,7 @@ class FolderArchiver(Archiver):
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
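The _filename_list additions above all follow the same cache-and-invalidate pattern: walk the folder once, reuse the listing, and clear it whenever the archive is mutated. A standalone sketch of the pattern (illustrative, not the project's code):

class CachedArchive:
    def __init__(self, names: list[str]) -> None:
        self._contents = list(names)          # stands in for the real archive
        self._filename_list: list[str] = []   # cached listing

    def get_filename_list(self) -> list[str]:
        if self._filename_list:               # cheap path: reuse the cache
            return self._filename_list
        self._filename_list = sorted(self._contents)  # expensive scan happens once
        return self._filename_list

    def write_file(self, name: str) -> None:
        self._filename_list = []              # any mutation invalidates the cache
        self._contents.append(name)

archive = CachedArchive(["b.jpg", "a.jpg"])
print(archive.get_filename_list())  # scans once: ['a.jpg', 'b.jpg']
archive.write_file("c.jpg")
print(archive.get_filename_list())  # rescans after the cache was cleared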

View File

@ -8,7 +8,6 @@ import platform
import shutil
import subprocess
import tempfile
import time
from comicapi.archivers import Archiver
@ -24,6 +23,11 @@ logger = logging.getLogger(__name__)
if not rar_support:
logger.error("rar unavailable")
# windows only, keeps the cmd.exe from popping up
STARTUPINFO = None
if platform.system() == "Windows":
STARTUPINFO = subprocess.STARTUPINFO() # type: ignore
STARTUPINFO.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
class RarArchiver(Archiver):
@ -31,22 +35,22 @@ class RarArchiver(Archiver):
enabled = rar_support
exe = "rar"
supported_extensions = frozenset({".cbr", ".rar"})
_rar: rarfile.RarFile | None = None
_rar_setup: rarfile.ToolSetup | None = None
_writeable: bool | None = None
def __init__(self) -> None:
super().__init__()
# windows only, keeps the cmd.exe from popping up
if platform.system() == "Windows":
self.startupinfo = subprocess.STARTUPINFO() # type: ignore
self.startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
self.startupinfo = None
self._filename_list: list[str] = []
def get_comment(self) -> str:
rarc = self.get_rar_obj()
return (rarc.comment if rarc else "") or ""
def set_comment(self, comment: str) -> bool:
self._reset()
if rar_support and self.exe:
try:
# write comment to temp file
@ -67,7 +71,7 @@ class RarArchiver(Archiver):
]
result = subprocess.run(
proc_args,
startupinfo=self.startupinfo,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
@ -81,16 +85,11 @@ class RarArchiver(Archiver):
result.stderr,
)
return False
if platform.system() == "Darwin":
time.sleep(1)
except OSError as e:
logger.exception("Error writing comment to rar archive [%s]: %s", e, self.path)
return False
else:
return True
else:
return False
return True
return False
def supports_comment(self) -> bool:
return True
@ -120,7 +119,6 @@ class RarArchiver(Archiver):
except OSError as e:
logger.error("Error reading rar archive [%s]: %s :: %s :: tries #%d", e, self.path, archive_file, tries)
time.sleep(1)
except Exception as e:
logger.error(
"Unexpected exception reading rar archive [%s]: %s :: %s :: tries #%d",
@ -141,20 +139,19 @@ class RarArchiver(Archiver):
raise OSError
def remove_file(self, archive_file: str) -> bool:
self._reset()
if self.exe:
working_dir = os.path.dirname(os.path.abspath(self.path))
# use external program to remove file from Rar archive
result = subprocess.run(
[self.exe, "d", f"-w{working_dir}", "-c-", self.path, archive_file],
startupinfo=self.startupinfo,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
cwd=self.path.absolute().parent,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error removing file from rar archive [exitcode: %d]: %s :: %s",
@ -164,10 +161,10 @@ class RarArchiver(Archiver):
)
return False
return True
else:
return False
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
self._reset()
if self.exe:
archive_path = pathlib.PurePosixPath(archive_file)
archive_name = archive_path.name
@ -187,13 +184,11 @@ class RarArchiver(Archiver):
self.path,
],
input=data,
startupinfo=self.startupinfo,
startupinfo=STARTUPINFO,
capture_output=True,
cwd=self.path.absolute().parent,
)
if platform.system() == "Darwin":
time.sleep(1)
if result.returncode != 0:
logger.error(
"Error writing rar archive [exitcode: %d]: %s :: %s :: %s",
@ -203,12 +198,12 @@ class RarArchiver(Archiver):
result.stderr,
)
return False
else:
return True
else:
return False
return True
return False
def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
rarc = self.get_rar_obj()
tries = 0
if rar_support and rarc:
@ -222,9 +217,9 @@ class RarArchiver(Archiver):
except OSError as e:
logger.error("Error listing files in rar archive [%s]: %s :: attempt #%d", e, self.path, tries)
time.sleep(1)
else:
self._filename_list = namelist
return namelist
return []
@ -233,6 +228,7 @@ class RarArchiver(Archiver):
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current archive with one copied from another archive"""
self._reset()
try:
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = pathlib.Path(tmp_dir)
@ -250,7 +246,7 @@ class RarArchiver(Archiver):
result = subprocess.run(
[self.exe, "a", f"-w{working_dir}", "-r", "-c-", str(rar_path.absolute()), "."],
cwd=rar_cwd.absolute(),
startupinfo=self.startupinfo,
startupinfo=STARTUPINFO,
stdin=subprocess.DEVNULL,
capture_output=True,
encoding="utf-8",
@ -278,24 +274,7 @@ class RarArchiver(Archiver):
logger.warning("Unable to find a useable copy of %r, will not be able to write rar files", str)
def is_writable(self) -> bool:
writeable = False
try:
if bool(self.exe and (os.path.exists(self.exe) or shutil.which(self.exe))):
writeable = (
subprocess.run(
(self.exe,),
startupinfo=self.startupinfo,
capture_output=True,
cwd=self.path.absolute().parent,
)
.stdout.strip()
.startswith(b"RAR")
)
except OSError:
...
if not writeable:
self._log_not_writeable(self.exe or "rar")
return False
return bool(self._writeable and bool(self.exe and (os.path.exists(self.exe) or shutil.which(self.exe))))
def extension(self) -> str:
return ".cbr"
@ -304,27 +283,62 @@ class RarArchiver(Archiver):
return "RAR"
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
if rar_support:
# Try using exe
def _setup_rar(cls) -> None:
if cls._rar_setup is None:
assert rarfile
orig = rarfile.UNRAR_TOOL
rarfile.UNRAR_TOOL = cls.exe
try:
return rarfile.is_rarfile(str(path)) and rarfile.tool_setup(sevenzip=False, sevenzip2=False, force=True)
cls._rar_setup = rarfile.tool_setup(sevenzip=False, sevenzip2=False, force=True)
except rarfile.RarCannotExec:
rarfile.UNRAR_TOOL = orig
try:
cls._rar_setup = rarfile.tool_setup(force=True)
except rarfile.RarCannotExec as e:
logger.info(e)
if cls._writeable is None:
try:
cls._writeable = (
subprocess.run(
(cls.exe,),
startupinfo=STARTUPINFO,
capture_output=True,
# cwd=cls.path.absolute().parent,
)
.stdout.strip()
.startswith(b"RAR")
)
except OSError:
cls._writeable = False
if not cls._writeable:
cls._log_not_writeable(cls.exe or "rar")
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
if rar_support:
assert rarfile
cls._setup_rar()
# Fallback to standard
try:
return rarfile.is_rarfile(str(path)) and rarfile.tool_setup(force=True)
return rarfile.is_rarfile(str(path))
except rarfile.RarCannotExec as e:
logger.info(e)
return False
def _reset(self) -> None:
self._rar = None
self._filename_list = []
def get_rar_obj(self) -> rarfile.RarFile | None:
if self._rar is not None:
return self._rar
if rar_support:
try:
rarc = rarfile.RarFile(str(self.path))
self._rar = rarc
except (OSError, rarfile.RarFileError) as e:
logger.error("Unable to get rar object [%s]: %s", e, self.path)
else:

View File

@ -22,9 +22,11 @@ class SevenZipArchiver(Archiver):
"""7Z implementation"""
enabled = z7_support
supported_extensions = frozenset({".7z", ".cb7"})
def __init__(self) -> None:
super().__init__()
self._filename_list: list[str] = []
# @todo: Implement Comment?
def get_comment(self) -> str:
@ -45,6 +47,7 @@ class SevenZipArchiver(Archiver):
return data
def remove_file(self, archive_file: str) -> bool:
self._filename_list = []
return self.rebuild([archive_file])
def write_file(self, archive_file: str, data: bytes) -> bool:
@ -52,6 +55,7 @@ class SevenZipArchiver(Archiver):
# archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
self._filename_list = []
if archive_file in files:
if not self.rebuild([archive_file]):
return False
@ -66,10 +70,13 @@ class SevenZipArchiver(Archiver):
return False
def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
try:
with py7zr.SevenZipFile(self.path, "r") as zf:
namelist: list[str] = [file.filename for file in zf.list() if not file.is_directory]
self._filename_list = namelist
return namelist
except (py7zr.Bad7zFile, OSError) as e:
logger.error("Error listing files in 7zip archive [%s]: %s", e, self.path)
@ -84,6 +91,7 @@ class SevenZipArchiver(Archiver):
This recompresses the zip archive, without the files in the exclude_list
"""
self._filename_list = []
try:
# py7zr treats all archives as if they used solid compression
# so we need to get the filename list first to read all the files at once
@ -106,6 +114,7 @@ class SevenZipArchiver(Archiver):
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
with py7zr.SevenZipFile(self.path, "w") as zout:
for filename in other_archive.get_filename_list():

View File

@ -15,17 +15,110 @@ from comicapi.archivers import Archiver
logger = logging.getLogger(__name__)
class ZipFile(zipfile.ZipFile):
def remove(self, zinfo_or_arcname): # type: ignore
"""Remove a member from the archive."""
if self.mode not in ("w", "x", "a"):
raise ValueError("remove() requires mode 'w', 'x', or 'a'")
if not self.fp:
raise ValueError("Attempt to write to ZIP archive that was already closed")
if self._writing: # type: ignore[attr-defined]
raise ValueError("Can't write to ZIP archive while an open writing handle exists")
# Make sure we have an existing info object
if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zinfo_or_arcname
# make sure zinfo exists
if zinfo not in self.filelist:
raise KeyError("There is no item %r in the archive" % zinfo_or_arcname)
else:
# get the info object
zinfo = self.getinfo(zinfo_or_arcname)
return self._remove_members({zinfo})
def _remove_members(self, members, *, remove_physical=True, chunk_size=2**20): # type: ignore
"""Remove members in a zip file.
All members (as zinfo) should exist in the zip; otherwise the zip file
will erroneously end in an inconsistent state.
"""
fp = self.fp
assert fp
entry_offset = 0
member_seen = False
# get a sorted filelist by header offset, in case the dir order
# doesn't match the actual entry order
filelist = sorted(self.filelist, key=lambda x: x.header_offset)
for i in range(len(filelist)):
info = filelist[i]
is_member = info in members
if not (member_seen or is_member):
continue
# get the total size of the entry
try:
offset = filelist[i + 1].header_offset
except IndexError:
offset = self.start_dir
entry_size = offset - info.header_offset
if is_member:
member_seen = True
entry_offset += entry_size
# update caches
self.filelist.remove(info)
try:
del self.NameToInfo[info.filename]
except KeyError:
pass
continue
# update the header and move entry data to the new position
if remove_physical:
old_header_offset = info.header_offset
info.header_offset -= entry_offset
read_size = 0
while read_size < entry_size:
fp.seek(old_header_offset + read_size)
data = fp.read(min(entry_size - read_size, chunk_size))
fp.seek(info.header_offset + read_size)
fp.write(data)
fp.flush()
read_size += len(data)
# Avoid missing entry if entries have a duplicated name.
# Reverse the order as NameToInfo normally stores the last added one.
for info in reversed(self.filelist):
self.NameToInfo.setdefault(info.filename, info)
# update state
if remove_physical:
self.start_dir -= entry_offset
self._didModify = True
# seek to the start of the central dir
fp.seek(self.start_dir)
class ZipArchiver(Archiver):
"""ZIP implementation"""
supported_extensions = frozenset((".cbz", ".zip"))
def __init__(self) -> None:
super().__init__()
self._filename_list: list[str] = []
def supports_comment(self) -> bool:
return True
def get_comment(self) -> str:
with zipfile.ZipFile(self.path, "r") as zf:
with ZipFile(self.path, "r") as zf:
encoding = chardet.detect(zf.comment, True)
if encoding["confidence"] > 60:
try:
@ -37,12 +130,12 @@ class ZipArchiver(Archiver):
return comment
def set_comment(self, comment: str) -> bool:
with zipfile.ZipFile(self.path, mode="a") as zf:
with ZipFile(self.path, mode="a") as zf:
zf.comment = bytes(comment, "utf-8")
return True
def read_file(self, archive_file: str) -> bytes:
with zipfile.ZipFile(self.path, mode="r") as zf:
with ZipFile(self.path, mode="r") as zf:
try:
data = zf.read(archive_file)
except (zipfile.BadZipfile, OSError) as e:
@ -51,20 +144,26 @@ class ZipArchiver(Archiver):
return data
def remove_file(self, archive_file: str) -> bool:
return self.rebuild([archive_file])
files = self.get_filename_list()
self._filename_list = []
try:
with ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
if archive_file in files:
zf.remove(archive_file)
return True
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error writing zip archive [%s]: %s :: %s", e, self.path, archive_file)
return False
def write_file(self, archive_file: str, data: bytes) -> bool:
# At the moment, no other option but to rebuild the whole
# zip archive w/o the indicated file. Very sucky, but maybe
# another solution can be found
files = self.get_filename_list()
self._filename_list = []
try:
# now just add the archive file as a new one
with zipfile.ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
_patch_zipfile(zf)
with ZipFile(self.path, mode="a", allowZip64=True, compression=zipfile.ZIP_DEFLATED) as zf:
if archive_file in files:
zf.remove(archive_file) # type: ignore
zf.remove(archive_file)
zf.writestr(archive_file, data)
return True
except (zipfile.BadZipfile, OSError) as e:
@ -72,10 +171,12 @@ class ZipArchiver(Archiver):
return False
def get_filename_list(self) -> list[str]:
if self._filename_list:
return self._filename_list
try:
with zipfile.ZipFile(self.path, mode="r") as zf:
namelist = [file.filename for file in zf.infolist() if not file.is_dir()]
return namelist
with ZipFile(self.path, mode="r") as zf:
self._filename_list = [file.filename for file in zf.infolist() if not file.is_dir()]
return self._filename_list
except (zipfile.BadZipfile, OSError) as e:
logger.error("Error listing files in zip archive [%s]: %s", e, self.path)
return []
@ -88,11 +189,12 @@ class ZipArchiver(Archiver):
This recompresses the zip archive, without the files in the exclude_list
"""
self._filename_list = []
try:
with zipfile.ZipFile(
with ZipFile(
tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), delete=False), "w", allowZip64=True
) as zout:
with zipfile.ZipFile(self.path, mode="r") as zin:
with ZipFile(self.path, mode="r") as zin:
for item in zin.infolist():
buffer = zin.read(item.filename)
if item.filename not in exclude_list:
@ -114,8 +216,9 @@ class ZipArchiver(Archiver):
def copy_from_archive(self, other_archive: Archiver) -> bool:
"""Replace the current zip with one copied from another archive"""
self._filename_list = []
try:
with zipfile.ZipFile(self.path, mode="w", allowZip64=True) as zout:
with ZipFile(self.path, mode="w", allowZip64=True) as zout:
for filename in other_archive.get_filename_list():
data = other_archive.read_file(filename)
if data is not None:
@ -143,106 +246,4 @@ class ZipArchiver(Archiver):
@classmethod
def is_valid(cls, path: pathlib.Path) -> bool:
if not zipfile.is_zipfile(path): # only checks central directory ot the end of the archive
return False
try:
# test all the files in the zip. adds about 0.1 to execution time per zip
with zipfile.ZipFile(path) as zf:
for zipinfo in zf.filelist:
zf.open(zipinfo).close()
return True
except Exception:
return False
def _patch_zipfile(zf): # type: ignore
zf.remove = _zip_remove.__get__(zf, zipfile.ZipFile)
zf._remove_members = _zip_remove_members.__get__(zf, zipfile.ZipFile)
def _zip_remove(self, zinfo_or_arcname): # type: ignore
"""Remove a member from the archive."""
if self.mode not in ("w", "x", "a"):
raise ValueError("remove() requires mode 'w', 'x', or 'a'")
if not self.fp:
raise ValueError("Attempt to write to ZIP archive that was already closed")
if self._writing:
raise ValueError("Can't write to ZIP archive while an open writing handle exists")
# Make sure we have an existing info object
if isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zinfo_or_arcname
# make sure zinfo exists
if zinfo not in self.filelist:
raise KeyError("There is no item %r in the archive" % zinfo_or_arcname)
else:
# get the info object
zinfo = self.getinfo(zinfo_or_arcname)
return self._remove_members({zinfo})
def _zip_remove_members(self, members, *, remove_physical=True, chunk_size=2**20): # type: ignore
"""Remove members in a zip file.
All members (as zinfo) should exist in the zip; otherwise the zip file
will erroneously end in an inconsistent state.
"""
fp = self.fp
entry_offset = 0
member_seen = False
# get a sorted filelist by header offset, in case the dir order
# doesn't match the actual entry order
filelist = sorted(self.filelist, key=lambda x: x.header_offset)
for i in range(len(filelist)):
info = filelist[i]
is_member = info in members
if not (member_seen or is_member):
continue
# get the total size of the entry
try:
offset = filelist[i + 1].header_offset
except IndexError:
offset = self.start_dir
entry_size = offset - info.header_offset
if is_member:
member_seen = True
entry_offset += entry_size
# update caches
self.filelist.remove(info)
try:
del self.NameToInfo[info.filename]
except KeyError:
pass
continue
# update the header and move entry data to the new position
if remove_physical:
old_header_offset = info.header_offset
info.header_offset -= entry_offset
read_size = 0
while read_size < entry_size:
fp.seek(old_header_offset + read_size)
data = fp.read(min(entry_size - read_size, chunk_size))
fp.seek(info.header_offset + read_size)
fp.write(data)
fp.flush()
read_size += len(data)
# Avoid missing entry if entries have a duplicated name.
# Reverse the order as NameToInfo normally stores the last added one.
for info in reversed(self.filelist):
self.NameToInfo.setdefault(info.filename, info)
# update state
if remove_physical:
self.start_dir -= entry_offset
self._didModify = True
# seek to the start of the central dir
fp.seek(self.start_dir)
return zipfile.is_zipfile(path) # only checks central directory ot the end of the archive
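A hedged usage sketch of the remove() method backported above, assuming the ZipFile subclass from this file is in scope; the buffer contents and member names are made up:

import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:        # build a throwaway archive in memory
    zf.writestr("page01.jpg", b"not really an image")
    zf.writestr("ComicInfo.xml", "<ComicInfo/>")

with ZipFile(buf, "a") as zf:                # the subclass with the remove() backport
    zf.remove("page01.jpg")                  # shifts later entries down in place
    print(zf.namelist())                     # ['ComicInfo.xml']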

View File

@ -123,7 +123,7 @@ def load_tag_plugins(version: str = f"ComicAPI/{version}", local_plugins: Iterab
class ComicArchive:
logo_data = b""
pil_available = True
pil_available: bool | None = None
def __init__(
self,
@ -146,12 +146,20 @@ class ComicArchive:
self.path = pathlib.Path(path).absolute()
self.archiver = UnknownArchiver.open(self.path)
load_archive_plugins()
load_tag_plugins()
for archiver in archivers:
if archiver.enabled and archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
break
load_archive_plugins()
load_tag_plugins()
archiver_missing = True
for archiver in archivers:
if self.path.suffix in archiver.supported_extensions and archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
archiver_missing = False
break
if archiver_missing:
for archiver in archivers:
if archiver.enabled and archiver.is_valid(self.path):
self.archiver = archiver.open(self.path)
break
if not ComicArchive.logo_data and self.default_image_path:
with open(self.default_image_path, mode="rb") as fd:
@ -330,6 +338,7 @@ class ComicArchive:
def get_page_name_list(self) -> list[str]:
if not self.page_list:
self.__import_pil__() # Import pillow for list of supported extensions
self.page_list = utils.get_page_name_list(self.archiver.get_filename_list())
return self.page_list
@ -339,6 +348,22 @@ class ComicArchive:
self.page_count = len(self.get_page_name_list())
return self.page_count
def __import_pil__(self) -> bool:
if self.pil_available is not None:
return self.pil_available
try:
from PIL import Image
Image.init()
utils.KNOWN_IMAGE_EXTENSIONS.update([ext for ext, typ in Image.EXTENSION.items() if typ in Image.OPEN])
self.pil_available = True
except Exception:
self.pil_available = False
logger.exception("Failed to load Pillow")
return False
return True
def apply_archive_info_to_metadata(
self,
md: GenericMetadata,
@ -370,30 +395,15 @@ class ComicArchive:
if not calc_page_sizes:
return
for p in md.pages:
if not self.pil_available:
if p.byte_size is not None:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
continue
try:
from PIL import Image
self.pil_available = True
except ImportError:
self.pil_available = False
if p.byte_size is not None:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
continue
if p.byte_size is None or p.height is None or p.width is None or p.double_page is None:
try:
data = self.get_page(p.archive_index)
p.byte_size = len(data)
if not data:
if not data or not self.__import_pil__():
continue
from PIL import Image
im = Image.open(io.BytesIO(data))
w, h = im.size
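The __import_pil__ change above is an import-once pattern: the result of the first Pillow import attempt is remembered so later pages never repeat it. A standalone sketch under that assumption (function and variable names are illustrative):

from __future__ import annotations

_pil_available: bool | None = None

def pil_available() -> bool:
    global _pil_available
    if _pil_available is None:               # only the first call pays for the import
        try:
            from PIL import Image  # noqa: F401
            _pil_available = True
        except ImportError:
            _pil_available = False
    return _pil_available

print(pil_available())  # True if Pillow is installed, False otherwise; cached either way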

View File

@ -138,11 +138,16 @@ class MetadataOrigin(NamedTuple):
class ImageHash(NamedTuple):
Hash: int
Kind: str # ahash, phash
"""
A valid ImageHash requires, at a minimum, a Hash and Kind or a URL.
If only a URL is given, it is used for cover matching; otherwise the Hash is used.
The URL is also required for the GUI to display covers.
Available Kinds are "ahash" and "phash".
"""
def __str__(self) -> str:
return str(self.Hash) + ": " + self.Kind
Hash: int
Kind: str
URL: str
class FileHash(NamedTuple):
@ -230,8 +235,8 @@ class GenericMetadata:
last_mark: str | None = None
# urls to cover image, not generally part of the metadata
_cover_image: str | ImageHash | None = None
_alternate_images: list[str | ImageHash] = dataclasses.field(default_factory=list)
_cover_image: ImageHash | None = None
_alternate_images: list[ImageHash] = dataclasses.field(default_factory=list)
def __post_init__(self) -> None:
for key, value in self.__dict__.items():
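For clarity, two illustrative ImageHash values that satisfy the docstring above (all values are made up):

# A pre-computed hash that can be matched directly, and a URL-only entry that the
# GUI can still download and display.
hashed_cover = ImageHash(Hash=0x8F3A5C71D2E4B690, Kind="ahash", URL="")
url_only_cover = ImageHash(Hash=0, Kind="", URL="https://example.com/covers/issue-1.jpg")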

View File

@ -15,6 +15,7 @@
# limitations under the License.
from __future__ import annotations
import difflib
import hashlib
import json
import logging
@ -184,13 +185,16 @@ def _custom_key(tup: Any) -> Any:
T = TypeVar("T")
def os_sorted(lst: Iterable[T]) -> Iterable[T]:
def os_sorted(lst: Iterable[T]) -> list[T]:
import natsort
key = _custom_key
if icu_available or platform.system() == "Windows":
key = natsort.os_sort_keygen()
return sorted(lst, key=key)
return sorted(sorted(lst), key=key) # type: ignore[type-var]
KNOWN_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".avif"}
def parse_filename(
@ -358,10 +362,7 @@ def get_page_name_list(files: list[str]) -> list[str]:
# make a sub-list of image files
page_list = []
for name in files:
if (
os.path.splitext(name)[1].casefold() in [".jpg", ".jpeg", ".png", ".gif", ".webp", ".avif"]
and os.path.basename(name)[0] != "."
):
if os.path.splitext(name)[1].casefold() in KNOWN_IMAGE_EXTENSIONS and os.path.basename(name)[0] != ".":
page_list.append(name)
return page_list
@ -517,19 +518,30 @@ def sanitize_title(text: str, basic: bool = False) -> str:
def titles_match(search_title: str, record_title: str, threshold: int = 90) -> bool:
import rapidfuzz.fuzz
log_msg = "search title: %s ; record title: %s ; ratio: %d ; match threshold: %d"
thresh = threshold / 100
sanitized_search = sanitize_title(search_title)
sanitized_record = sanitize_title(record_title)
ratio = int(rapidfuzz.fuzz.ratio(sanitized_search, sanitized_record))
logger.debug(
"search title: %s ; record title: %s ; ratio: %d ; match threshold: %d",
search_title,
record_title,
ratio,
threshold,
)
return ratio >= threshold
s = difflib.SequenceMatcher(None, sanitized_search, sanitized_record)
ratio = s.real_quick_ratio()
if ratio < thresh:
logger.debug(log_msg, search_title, record_title, ratio * 100, threshold)
return False
ratio = s.quick_ratio()
if ratio < thresh:
logger.debug(log_msg, search_title, record_title, ratio * 100, threshold)
return False
ratio = s.ratio()
if ratio < thresh:
logger.debug(log_msg, search_title, record_title, ratio * 100, threshold)
return False
logger.debug(log_msg, search_title, record_title, ratio * 100, threshold)
return True
def unique_file(file_name: pathlib.Path) -> pathlib.Path:
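The staged checks in the new titles_match() rely on real_quick_ratio() and quick_ratio() being cheap upper bounds on ratio(), so a failing stage can bail out before the expensive comparison runs. A standalone illustration (threshold and titles are made up):

import difflib

def similar(a: str, b: str, threshold: float = 0.9) -> bool:
    s = difflib.SequenceMatcher(None, a, b)
    # Each stage is an upper bound on the next, so a miss lets us stop early.
    if s.real_quick_ratio() < threshold:
        return False
    if s.quick_ratio() < threshold:
        return False
    return s.ratio() >= threshold

print(similar("batman the long halloween", "batman: the long halloween"))  # True
print(similar("batman", "superman"))                                       # False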

View File

@ -65,7 +65,6 @@ class AutoTagProgressWindow(QtWidgets.QDialog):
def set_cover_image(self, img_data: bytes, widget: CoverImageWidget) -> None:
widget.set_image_data(img_data)
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
def reject(self) -> None:
QtWidgets.QDialog.reject(self)

View File

@ -82,6 +82,8 @@ class CLI:
if not args:
log_args: tuple[Any, ...] = ("",)
elif isinstance(args[0], str):
if args[0] == "":
already_logged = True
log_args = (args[0].strip("\n"), *args[1:])
else:
log_args = args
@ -112,6 +114,7 @@ class CLI:
for f in self.config.Runtime_Options__files:
res, match_results = self.process_file_cli(self.config.Commands__command, f, match_results)
results.append(res)
self.output("")
if results[-1].status != Status.success:
return_code = 3
if self.config.Runtime_Options__json:
@ -438,7 +441,6 @@ class CLI:
ct_md = qt.id_comic(
ca,
md,
self.config.Quick_Tag__simple,
set(self.config.Quick_Tag__hash),
self.config.Quick_Tag__exact_only,
self.config.Runtime_Options__interactive,

View File

@ -52,15 +52,17 @@ def validate_types(config: settngs.Config[settngs.Values]) -> settngs.Config[set
for setting in group.v.values():
# Get the value and if it is the default
value, default = settngs.get_option(config.values, setting)
if not default:
if setting.type is not None:
# If it is not the default and the type attribute is not None
# use it to convert the loaded string into the expected value
if (
isinstance(value, str)
or isinstance(default, Enum)
or (isinstance(setting.type, type) and issubclass(setting.type, Enum))
):
if not default and setting.type is not None:
# If it is not the default and the type attribute is not None
# use it to convert the loaded string into the expected value
if (
isinstance(value, str)
or isinstance(default, Enum)
or (isinstance(setting.type, type) and issubclass(setting.type, Enum))
):
if isinstance(setting.type, type) and issubclass(setting.type, Enum) and isinstance(value, list):
config.values[setting.group][setting.dest] = [setting.type(x) for x in value]
else:
config.values[setting.group][setting.dest] = setting.type(value)
return config

View File

@ -351,7 +351,9 @@ def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs
parser.exit(message="Please specify the tags to copy to with --tags-write\n", status=1)
if config[0].Runtime_Options__recursive:
config[0].Runtime_Options__files = utils.get_recursive_filelist(config[0].Runtime_Options__files)
config[0].Runtime_Options__files = utils.os_sorted(
set(utils.get_recursive_filelist(config[0].Runtime_Options__files))
)
if not config[0].Runtime_Options__enable_embedding_hashes:
config[0].Runtime_Options__preferred_hash = ""
@ -360,7 +362,7 @@ def validate_commandline_settings(config: settngs.Config[ct_ns], parser: settngs
if not utils.which("rar"):
if platform.system() == "Windows":
letters = ["C"]
letters.extend({f"{d}" for d in "ABCDEFGHIJKLMNOPQRSTUVWXYZ" if os.path.exists(f"{d}:\\")} - {"C"})
letters.extend({f"{d}" for d in "ABDEFGHIJKLMNOPQRSTUVWXYZ" if os.path.exists(f"{d}:\\")})
for letter in letters:
# look in some likely places for Windows machines
utils.add_to_path(rf"{letter}:\Program Files\WinRAR")

View File

@ -43,7 +43,6 @@ class SettngsNS(settngs.TypedNS):
Quick_Tag__url: urllib3.util.url.Url
Quick_Tag__max: int
Quick_Tag__simple: bool
Quick_Tag__aggressive_filtering: bool
Quick_Tag__hash: list[comictaggerlib.quick_tag.HashType]
Quick_Tag__exact_only: bool
@ -170,7 +169,6 @@ class Runtime_Options(typing.TypedDict):
class Quick_Tag(typing.TypedDict):
url: urllib3.util.url.Url
max: int
simple: bool
aggressive_filtering: bool
hash: list[comictaggerlib.quick_tag.HashType]
exact_only: bool

View File

@ -18,6 +18,7 @@ from __future__ import annotations
import logging
import os
import pathlib
import platform
from typing import Callable, cast
@ -78,6 +79,8 @@ class FileSelectionList(QtWidgets.QWidget):
self.addAction(remove_action)
self.addAction(self.separator)
self.loaded_paths: set[pathlib.Path] = set()
self.dirty_flag_verification = dirty_flag_verification
self.rar_ro_shown = False
@ -115,6 +118,7 @@ class FileSelectionList(QtWidgets.QWidget):
if row == self.twList.currentRow():
current_removed = True
self.twList.removeRow(row)
self.loaded_paths -= {ca.path}
break
self.twList.setSortingEnabled(True)
@ -158,6 +162,7 @@ class FileSelectionList(QtWidgets.QWidget):
self.twList.setSortingEnabled(False)
for i in row_list:
self.loaded_paths -= {self.get_archive_by_row(i).path} # type: ignore[union-attr]
self.twList.removeRow(i)
self.twList.setSortingEnabled(True)
@ -188,21 +193,20 @@ class FileSelectionList(QtWidgets.QWidget):
progdialog.show()
center_window_on_parent(progdialog)
QtCore.QCoreApplication.processEvents()
first_added = None
rar_added_ro = False
self.twList.setSortingEnabled(False)
for idx, f in enumerate(filelist):
QtCore.QCoreApplication.processEvents()
if idx % 10 == 0:
QtCore.QCoreApplication.processEvents()
if progdialog is not None:
if progdialog.wasCanceled():
break
progdialog.setValue(idx + 1)
progdialog.setLabelText(f)
QtCore.QCoreApplication.processEvents()
row = self.add_path_item(f)
row, ca = self.add_path_item(f)
if row is not None:
ca = self.get_archive_by_row(row)
rar_added_ro = bool(ca and ca.archiver.name() == "RAR" and not ca.archiver.is_writable())
if first_added is None and row != -1:
first_added = row
@ -256,29 +260,32 @@ class FileSelectionList(QtWidgets.QWidget):
)
self.rar_ro_shown = True
def is_list_dupe(self, path: str) -> bool:
return self.get_current_list_row(path) >= 0
def get_current_list_row(self, path: str) -> tuple[int, ComicArchive]:
pl = pathlib.Path(path)
if pl not in self.loaded_paths:
return -1, None # type: ignore[return-value]
def get_current_list_row(self, path: str) -> int:
for r in range(self.twList.rowCount()):
ca = cast(ComicArchive, self.get_archive_by_row(r))
if str(ca.path) == path:
return r
if ca.path == pl:
return r, ca
return -1
return -1, None # type: ignore[return-value]
def add_path_item(self, path: str) -> int:
def add_path_item(self, path: str) -> tuple[int, ComicArchive]:
path = str(path)
path = os.path.abspath(path)
if self.is_list_dupe(path):
return self.get_current_list_row(path)
current_row, ca = self.get_current_list_row(path)
if current_row >= 0:
return current_row, ca
ca = ComicArchive(
path, str(graphics_path / "nocover.png"), hash_archive=self.config.Runtime_Options__preferred_hash
)
if ca.seems_to_be_a_comic_archive():
self.loaded_paths.add(ca.path)
row: int = self.twList.rowCount()
self.twList.insertRow(row)
@ -288,28 +295,44 @@ class FileSelectionList(QtWidgets.QWidget):
readonly_item = QtWidgets.QTableWidgetItem()
type_item = QtWidgets.QTableWidgetItem()
item_text = os.path.split(ca.path)[1]
filename_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
filename_item.setData(QtCore.Qt.ItemDataRole.UserRole, ca)
filename_item.setText(item_text)
filename_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
self.twList.setItem(row, FileSelectionList.fileColNum, filename_item)
item_text = os.path.split(ca.path)[0]
folder_item.setText(item_text)
folder_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
folder_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, FileSelectionList.folderColNum, folder_item)
type_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, FileSelectionList.typeColNum, type_item)
md_item.setText(", ".join(x for x in ca.get_supported_tags() if ca.has_tags(x)))
md_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
md_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignHCenter)
self.twList.setItem(row, FileSelectionList.MDFlagColNum, md_item)
if not ca.is_writable():
readonly_item.setCheckState(QtCore.Qt.CheckState.Checked)
readonly_item.setData(QtCore.Qt.ItemDataRole.UserRole, True)
readonly_item.setText(" ")
else:
readonly_item.setData(QtCore.Qt.ItemDataRole.UserRole, False)
readonly_item.setCheckState(QtCore.Qt.CheckState.Unchecked)
# This is a nbsp it sorts after a space ' '
readonly_item.setText("\xa0")
readonly_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
readonly_item.setTextAlignment(QtCore.Qt.AlignmentFlag.AlignHCenter)
self.twList.setItem(row, FileSelectionList.readonlyColNum, readonly_item)
self.update_row(row)
return row
return -1
return row, ca
return -1, None # type: ignore[return-value]
def update_row(self, row: int) -> None:
if row >= 0:
@ -321,14 +344,14 @@ class FileSelectionList(QtWidgets.QWidget):
type_item = self.twList.item(row, FileSelectionList.typeColNum)
readonly_item = self.twList.item(row, FileSelectionList.readonlyColNum)
item_text = os.path.split(ca.path)[0]
folder_item.setText(item_text)
folder_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item_text = os.path.split(ca.path)[1]
filename_item.setText(item_text)
filename_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item_text = os.path.split(ca.path)[0]
folder_item.setText(item_text)
folder_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item_text = ca.archiver.name()
type_item.setText(item_text)
type_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
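The loaded_paths set added above turns the duplicate check into a single set lookup instead of a scan over every table row. A minimal, Qt-free illustration of the idea (file names are made up):

import pathlib

loaded_paths: set[pathlib.Path] = set()

def add_path(path: str) -> bool:
    p = pathlib.Path(path).absolute()
    if p in loaded_paths:        # O(1) membership test instead of iterating table rows
        return False             # already loaded, skip
    loaded_paths.add(p)
    return True

print(add_path("example.cbz"))   # True  - first time seen
print(add_path("example.cbz"))   # False - duplicate detected without a table scan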

View File

@ -20,6 +20,7 @@ import io
import itertools
import logging
import math
import statistics
from collections.abc import Sequence
from statistics import median
from typing import TypeVar
@ -70,13 +71,14 @@ class ImageHasher:
return 0
pixels = list(image.getdata())
avg = sum(pixels) / len(pixels)
avg = statistics.mean(pixels)
diff = "".join(str(int(p > avg)) for p in pixels)
h = 0
for i, p in enumerate(pixels):
if p > avg:
h |= 1 << len(pixels) - 1 - i
result = int(diff, 2)
return result
return h
def difference_hash(self) -> int:
try:
@ -86,24 +88,25 @@ class ImageHasher:
return 0
pixels = list(image.getdata())
diff = ""
h = 0
z = (self.width * self.height) - 1
for y in range(self.height):
for x in range(self.width):
idx = x + (self.width + 1 * y)
diff += str(int(pixels[idx] < pixels[idx + 1]))
idx = x + ((self.width + 1) * y)
if pixels[idx] < pixels[idx + 1]:
h |= 1 << z
z -= 1
result = int(diff, 2)
return h
return result
def p_hash(self) -> int:
def perception_hash(self) -> int:
"""
Pure python version of Perceptual Hash computation of https://github.com/JohannesBuchner/imagehash/tree/master
Implementation follows http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
"""
def generate_dct2(block: Sequence[Sequence[float]], axis: int = 0) -> list[list[float]]:
def dct1(block: Sequence[float]) -> list[float]:
def generate_dct2(block: Sequence[Sequence[float | int]], axis: int = 0) -> list[list[float | int]]:
def dct1(block: Sequence[float | int]) -> list[float | int]:
"""Perform 1D Discrete Cosine Transform (DCT) on a given block."""
N = len(block)
dct_block = [0.0] * N
@ -120,7 +123,7 @@ class ImageHasher:
"""Perform 2D Discrete Cosine Transform (DCT) on a given block along the specified axis."""
rows = len(block)
cols = len(block[0])
dct_block = [[0.0] * cols for _ in range(rows)]
dct_block: list[list[float | int]] = [[0.0] * cols for _ in range(rows)]
if axis == 0:
# Apply 1D DCT on each row
@ -138,18 +141,12 @@ class ImageHasher:
return dct_block
def convert_image_to_ndarray(image: Image.Image) -> Sequence[Sequence[float]]:
width, height = image.size
def convert_to_array(data: list[float | int]) -> list[list[float | int]]:
pixels2 = []
for y in range(height):
row = []
for x in range(width):
pixel = image.getpixel((x, y))
assert isinstance(pixel, float)
row.append(pixel)
pixels2.append(row)
for row in range(32):
x = row * 32
pixels2.append(data[x : x + 32])
return pixels2
highfreq_factor = 4
@ -161,16 +158,18 @@ class ImageHasher:
logger.exception("p_hash error converting to greyscale and resizing")
return 0
pixels = convert_image_to_ndarray(image)
pixels = convert_to_array(list(image.getdata()))
dct = generate_dct2(generate_dct2(pixels, axis=0), axis=1)
dctlowfreq = list(itertools.chain.from_iterable(row[:8] for row in dct[:8]))
med = median(dctlowfreq)
# Convert to a bit string
diff = "".join(str(int(item > med)) for item in dctlowfreq)
result = int(diff, 2)
h = 0
for i, p in enumerate(dctlowfreq):
if p > med:
h |= 1 << len(dctlowfreq) - 1 - i
return result
return h
# accepts 2 hashes (longs or hex strings) and returns the hamming distance
@ -191,5 +190,4 @@ class ImageHasher:
# xor the two numbers
n = n1 ^ n2
# count up the 1's in the binary string
return sum(b == "1" for b in bin(n)[2:])
return bin(n).count("1")
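The rewritten helper above is equivalent to counting the set bits in the XOR of the two hashes; a tiny worked example with made-up hash values:

def hamming_distance(h1: int, h2: int) -> int:
    # Bits that differ survive the XOR; counting the 1s gives the distance.
    return bin(h1 ^ h2).count("1")

a = 0b10110110
b = 0b10010011
print(bin(a ^ b))              # 0b100101 -> three differing bit positions
print(hamming_distance(a, b))  # 3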

View File

@ -16,6 +16,7 @@
# limitations under the License.
from __future__ import annotations
import copy
import io
import logging
from operator import attrgetter
@ -134,7 +135,7 @@ class IssueIdentifier:
def calculate_hash(self, image_data: bytes = b"", image: Image.Image | None = None) -> int:
if self.image_hasher == 3:
return ImageHasher(data=image_data, image=image).p_hash()
return ImageHasher(data=image_data, image=image).perception_hash()
if self.image_hasher == 2:
return -1 # ImageHasher(data=image_data, image=image).average_hash2()
@ -185,7 +186,7 @@ class IssueIdentifier:
self.log_msg(f"Found {len(issues)} series that have an issue #{terms['issue_number']}")
final_cover_matching = self._cover_matching(terms, images, extra_images, issues)
final_cover_matching, full = self._cover_matching(terms, images, extra_images, issues)
# One more test for the case choosing limited series first issue vs a trade with the same cover:
# if we have a given issue count > 1 and the series from CV has count==1, remove it from match list
@ -197,10 +198,9 @@ class IssueIdentifier:
)
final_cover_matching.remove(match)
best_score = 0
if final_cover_matching:
best_score = final_cover_matching[0].distance
else:
best_score = 0
if best_score >= self.min_score_thresh:
if len(final_cover_matching) == 1:
self.log_msg("No matching pages in the issue.")
@ -220,7 +220,7 @@ class IssueIdentifier:
self.log_msg("--------------------------------------------------------------------------")
search_result = self.result_one_good_match
elif len(self.match_list) == 0:
elif len(final_cover_matching) == 0:
self.log_msg("--------------------------------------------------------------------------")
self.log_msg("No matches found :(")
self.log_msg("--------------------------------------------------------------------------")
@ -229,6 +229,7 @@ class IssueIdentifier:
# we've got multiple good matches:
self.log_msg("More than one likely candidate.")
search_result = self.result_multiple_good_matches
final_cover_matching = full # display more options for the user to pick
self.log_msg("--------------------------------------------------------------------------")
for match_item in final_cover_matching:
self._print_match(match_item)
@ -306,35 +307,42 @@ class IssueIdentifier:
def _get_issue_cover_match_score(
self,
primary_img_url: str | ImageHash,
alt_urls: list[str | ImageHash],
primary_img_url: ImageHash | None,
alt_urls: list[ImageHash],
local_hashes: list[tuple[str, int]],
use_alt_urls: bool = False,
) -> Score:
# local_hashes is a list of pre-calculated hashes.
# use_alt_urls - indicates to use alternate covers from CV
# use_alt_urls - indicates to use alternate covers
# If there is no URL return 100
if not primary_img_url:
# If there is no ImageHash or no URL and Kind, return 100 for a bad match
if primary_img_url is None or (not primary_img_url.Kind and not primary_img_url.URL and not use_alt_urls):
return Score(score=100, url="", remote_hash=0, local_hash=0, local_hash_name="0")
self._user_canceled()
remote_hashes = []
# If the cover is ImageHash and the alternate covers are URLs, the alts will not be hashed/checked currently
if isinstance(primary_img_url, ImageHash):
# ImageHash doesn't have a url so we just give it an empty string
remote_hashes.append(("", primary_img_url.Hash))
if use_alt_urls and alt_urls:
remote_hashes.extend(("", alt_hash.Hash) for alt_hash in alt_urls if isinstance(alt_hash, ImageHash))
else:
urls = [primary_img_url]
if use_alt_urls:
only_urls = [url for url in alt_urls if isinstance(url, str)]
urls.extend(only_urls)
self.log_msg(f"[{len(only_urls)} alt. covers]")
remote_hashes = self._get_remote_hashes(urls)
if primary_img_url.Kind:
remote_hashes.append((primary_img_url.URL, primary_img_url.Hash))
self.log_msg(
f"Using provided hash for cover matching. Hash: {primary_img_url.Hash}, Kind: {primary_img_url.Kind}"
)
elif primary_img_url.URL:
remote_hashes = self._get_remote_hashes([primary_img_url.URL])
self.log_msg(f"Downloading image for cover matching: {primary_img_url.URL}")
if use_alt_urls and alt_urls:
only_urls = []
for alt_url in alt_urls:
if alt_url.Kind:
remote_hashes.append((alt_url.URL, alt_url.Hash))
elif alt_url.URL:
only_urls.append(alt_url.URL)
if only_urls:
remote_hashes.extend(self._get_remote_hashes(only_urls))
self.log_msg(f"[{len(remote_hashes) - 1} alt. covers]")
score_list = []
done = False
@ -525,13 +533,12 @@ class IssueIdentifier:
)
try:
image_url = issue._cover_image if isinstance(issue._cover_image, str) else ""
# We only include urls in the IssueResult so we don't have to deal with it down the line
# TODO: display the hash to the user so they know a direct hash was used instead of downloading an image
alt_urls: list[str] = [url for url in issue._alternate_images if isinstance(url, str)]
alt_urls: list[str] = [img.URL for img in issue._alternate_images]
score_item = self._get_issue_cover_match_score(
image_url, issue._alternate_images, hashes, use_alt_urls=use_alternates
issue._cover_image, issue._alternate_images, hashes, use_alt_urls=use_alternates
)
except Exception:
logger.exception(f"Scoring series{alternate} covers failed")
@ -549,7 +556,7 @@ class IssueIdentifier:
month=issue.month,
year=issue.year,
publisher=None,
image_url=image_url,
image_url=issue._cover_image.URL if issue._cover_image else "",
alt_image_urls=alt_urls,
description=issue.description or "",
)
@ -632,7 +639,7 @@ class IssueIdentifier:
images: list[tuple[str, Image.Image]],
extra_images: list[tuple[str, Image.Image]],
issues: list[tuple[ComicSeries, GenericMetadata]],
) -> list[IssueResult]:
) -> tuple[list[IssueResult], list[IssueResult]]:
# Set hashing kind, will presume all hashes are of the same kind
for series, issue in issues:
if isinstance(issue._cover_image, ImageHash):
@ -647,7 +654,7 @@ class IssueIdentifier:
if len(cover_matching_1) == 0:
self.log_msg(":-( no matches!")
return cover_matching_1
return cover_matching_1, cover_matching_1
# sort list by image match scores
cover_matching_1.sort(key=attrgetter("distance"))
@ -681,8 +688,14 @@ class IssueIdentifier:
# now drop down into the rest of the processing
best_score = final_cover_matching[0].distance
full = copy.copy(final_cover_matching)
# now pare down list, remove any item more than specified distant from the top scores
for match_item in reversed(final_cover_matching):
if match_item.distance > (best_score + self.min_score_distance):
final_cover_matching.remove(match_item)
return final_cover_matching
# If we have 5 or less results we don't trim as the user can pick
if len(final_cover_matching) > 5:
full = final_cover_matching
return final_cover_matching, full

View File

@ -223,8 +223,9 @@ class IssueSelectionWindow(QtWidgets.QDialog):
self.issue_number = issue.issue or ""
# We don't currently have a way to display hashes to the user
# TODO: display the hash to the user so they know it will be used for cover matching
alt_images = [url for url in issue._alternate_images if isinstance(url, str)]
self.coverWidget.set_issue_details(self.issue_id, [str(issue._cover_image) or "", *alt_images])
alt_images = [url.URL for url in issue._alternate_images]
cover = issue._cover_image.URL if issue._cover_image else ""
self.coverWidget.set_issue_details(self.issue_id, [cover, *alt_images])
if issue.description is None:
self.set_description(self.teDescription, "")
else:

View File

@ -46,7 +46,8 @@ def setup_logging(verbose: int, log_dir: pathlib.Path) -> None:
logging.basicConfig(
handlers=[stream_handler, file_handler],
level=logging.WARNING,
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
style="{",
format="{asctime} | {name:<30} | {levelname:<7} | {message}",
datefmt="%Y-%m-%dT%H:%M:%S",
)
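For reference, a standalone snippet showing what the switch to { -style formatting with fixed field widths produces; the logger name, message, and sample output line are invented:

import logging

logging.basicConfig(
    level=logging.INFO,
    style="{",
    format="{asctime} | {name:<30} | {levelname:<7} | {message}",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.getLogger("comicapi.comicarchive").info("archive opened")
# e.g. 2025-05-05T00:20:57 | comicapi.comicarchive          | INFO    | archive opened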

View File

@ -88,7 +88,12 @@ def configure_locale() -> None:
if code != "":
os.environ["LANG"] = f"{code}.utf-8"
locale.setlocale(locale.LC_ALL, "")
# Get locale settings from OS, fall back to en_US or C in case of error for minimalist or misconfigured systems
try:
locale.setlocale(locale.LC_ALL, "")
except locale.Error:
locale.setlocale(locale.LC_ALL, "C")
logger.error("Couldn't set the locale: unsupported locale setting; falling back to 'C' locale")
sys.stdout.reconfigure(encoding=sys.getdefaultencoding()) # type: ignore[union-attr]
sys.stderr.reconfigure(encoding=sys.getdefaultencoding()) # type: ignore[union-attr]
sys.stdin.reconfigure(encoding=sys.getdefaultencoding()) # type: ignore[union-attr]

View File

@ -1,11 +1,18 @@
from __future__ import annotations
import argparse
import contextlib
import itertools
import logging
import pathlib
import sqlite3
import statistics
import threading
from collections.abc import Iterable
from enum import auto
from functools import cached_property
from io import BytesIO
from typing import Callable, TypedDict, cast
from typing import TYPE_CHECKING, Callable, NamedTuple, TypedDict, overload
from urllib.parse import urljoin
import requests
@ -18,47 +25,217 @@ from comictaggerlib.ctsettings.settngs_namespace import SettngsNS
from comictaggerlib.imagehasher import ImageHasher
from comictalker import ComicTalker
if TYPE_CHECKING:
from _typeshed import SupportsRichComparison
logger = logging.getLogger(__name__)
__version__ = "0.1"
class HashType(utils.StrEnum):
AHASH = auto()
DHASH = auto()
# Unknown = 'Unknown'
PHASH = auto()
DHASH = auto()
AHASH = auto()
class SimpleResult(TypedDict):
Distance: int
# Mapping of domains (eg comicvine.gamespot.com) to IDs
IDList: dict[str, list[str]]
def __repr__(self) -> str:
return str(self)
class Hash(TypedDict):
Hash: int
Kind: str
Kind: HashType
class ID_dict(TypedDict):
Domain: str
ID: str
class ID(NamedTuple):
Domain: str
ID: str
class Result(TypedDict):
# Mapping of domains (eg comicvine.gamespot.com) to IDs
IDs: dict[str, list[str]]
Distance: int
Hash: Hash
ID: ID_dict
Distance: int
EquivalentIDs: list[ID_dict]
def ihash(types: str) -> list[HashType]:
result: list[HashType] = []
types = types.casefold()
choices = ", ".join(HashType)
for typ in utils.split(types, ","):
if typ not in list(HashType):
raise argparse.ArgumentTypeError(f"invalid choice: {typ} (choose from {choices.upper()})")
result.append(HashType[typ.upper()])
class ResultList(NamedTuple):
distance: int
results: list[Result]
if not result:
raise argparse.ArgumentTypeError(f"invalid choice: {types} (choose from {choices.upper()})")
return result
class Distance(NamedTuple):
hash: HashType
distance: int
def __repr__(self) -> str:
return f"{self.hash}={self.distance}"
class Hashes:
hashes: tuple[Result, ...]
id: ID
def __init__(
self,
*,
hashes: Iterable[Result],
id: ID | None = None, # noqa: A002
) -> None:
self.hashes = tuple(
sorted(hashes, key=lambda x: list(HashType.__members__.values()).index(HashType(x["Hash"]["Kind"])))
)
self.count = len(self.hashes)
if id is None:
self.id = ID(**self.hash()["ID"])
else:
self.id = id
@overload
def hash(self) -> Result: ...
@overload
def hash(self, hash_type: HashType) -> Result | None: ...
def hash(self, hash_type: HashType | None = None) -> Result | None:
if hash_type:
for _hash in self.hashes:
if _hash["Hash"]["Kind"] == hash_type:
return _hash
return None
return self.hashes[0]
@cached_property
def distance(self) -> int:
return int(statistics.mean(x["Distance"] for x in self.hashes))
@cached_property
def score(self) -> int:
# Get the distances as a value between 0 and 1. Lowest value is 55/64 ~ 0.85
hashes: list[float] = [(64 - x["Distance"]) / 64 for x in self.hashes]
hashes.extend((64 - 9) // 64 for x in range(len(HashType) - len(hashes)))
mod = {
3: 64 / 64,
2: 60 / 64,
1: 58 / 64,
}[len(self.hashes)]
# Add an extra mod value to bring the score up if there are more hashes
hashes.append(mod)
return int(statistics.mean(int(x * 100) for x in hashes))
@cached_property
def kinds(self) -> set[HashType]:
return {HashType(x["Hash"]["Kind"]) for x in self.hashes}
@cached_property
def distances(self) -> tuple[Distance, ...]:
return tuple(Distance(HashType(x["Hash"]["Kind"]), x["Distance"]) for x in self.hashes)
@cached_property
def exact(self) -> bool:
return self.score >= 95 and len(self.hashes) > 1
@cached_property
def key(self) -> tuple[SupportsRichComparison, ...]:
return (-self.count, tuple(x["Distance"] for x in self.hashes))
def should_break(self, previous: Hashes) -> bool:
group_limit = 3
if (previous.count - self.count) == 1:
group_limit = 2
if (previous.count - self.count) == 2:
group_limit = 0
if (self.distance - previous.distance) > group_limit:
return True
if len(self.hashes) == 1 and self.hashes[0]["Hash"]["Kind"] == HashType.AHASH:
if previous.count > 1:
return True
return False
def __repr__(self) -> str:
return f"Hashes(id={self.id!r}, count={self.count!r}, distance={self.distance!r}, score={self.score!r}, 'exact'={self.exact!r})"
class NameMatches(NamedTuple):
confident_match: tuple[tuple[Hashes, GenericMetadata], ...]
probable_match: tuple[tuple[Hashes, GenericMetadata], ...]
other_match: tuple[tuple[Hashes, GenericMetadata], ...]
class IDCache:
def __init__(self, cache_folder: pathlib.Path, version: str) -> None:
self.cache_folder = cache_folder
self.db_file = cache_folder / "bad_ids.db"
self.version = version
self.local: threading.Thread | None = None
self.db: sqlite3.Connection | None = None
self.create_cache_db()
def clear_cache(self) -> None:
try:
self.close()
except Exception:
pass
try:
self.db_file.unlink(missing_ok=True)
except Exception:
pass
def connect(self) -> sqlite3.Connection:
if self.local != threading.current_thread():
self.db = None
if self.db is None:
self.local = threading.current_thread()
self.db = sqlite3.connect(self.db_file)
self.db.row_factory = sqlite3.Row
self.db.text_factory = str
return self.db
def close(self) -> None:
if self.db is not None:
self.db.close()
self.db = None
def create_cache_db(self) -> None:
# create tables
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
cur.execute(
"""CREATE TABLE IF NOT EXISTS bad_ids(
domain TEXT NOT NULL,
id TEXT NOT NULL,
PRIMARY KEY (id, domain))"""
)
def add_ids(self, bad_ids: set[ID]) -> None:
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
for bad_id in bad_ids:
cur.execute(
"""INSERT into bad_ids (domain, ID) VALUES (?, ?) ON CONFLICT DO NOTHING""",
(bad_id.Domain, bad_id.ID),
)
def get_ids(self) -> dict[str, set[ID]]:
# purge stale series info
ids: dict[str, set[ID]] = utils.DefaultDict(default=lambda x: set())
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
cur.execute(
"""SELECT * FROM bad_ids""",
)
for record in cur.fetchall():
ids[record["domain"]] |= {ID(Domain=record["domain"], ID=record["id"])}
return ids
def settings(manager: settngs.Manager) -> None:
@ -67,7 +244,7 @@ def settings(manager: settngs.Manager) -> None:
"-u",
default="https://comic-hasher.narnian.us",
type=utils.parse_url,
help="Website to use for searching cover hashes",
help="Server to use for searching cover hashes",
)
manager.add_setting(
"--max",
@ -75,47 +252,70 @@ def settings(manager: settngs.Manager) -> None:
type=int,
help="Maximum score to allow. Lower score means more accurate",
)
manager.add_setting(
"--simple",
default=False,
action=argparse.BooleanOptionalAction,
help="Whether to retrieve simple results or full results",
)
manager.add_setting(
"--aggressive-filtering",
default=False,
action=argparse.BooleanOptionalAction,
help="Will filter out worse matches if better matches are found",
help="Will filter out matches more aggressively",
)
manager.add_setting(
"--hash",
default="ahash, dhash, phash",
type=ihash,
default=list(HashType),
type=HashType,
nargs="+",
help="Pick what hashes you want to use to search (default: %(default)s)",
)
manager.add_setting(
"--exact-only",
default=True,
action=argparse.BooleanOptionalAction,
help="Skip non-exact matches if we have exact matches",
help="Skip non-exact matches if exact matches are found",
)
KNOWN_BAD_IDS: dict[str, set[ID]] = utils.DefaultDict(
{
"comicvine.gamespot.com": {
ID("comicvine.gamespot.com", "737049"),
ID("comicvine.gamespot.com", "753078"),
ID("comicvine.gamespot.com", "390219"),
}
},
default=lambda x: set(),
)
def limit(results: Iterable[Hashes], limit: int) -> list[list[Hashes]]:
hashes: list[list[Hashes]] = []
r = list(results)
for _, result_list in itertools.groupby(r, key=lambda r: r.count):
result_l = list(result_list)
hashes.append(sorted(result_l[:limit], key=lambda r: r.key))
limit -= len(result_l)
if limit <= 0:
break
return hashes
class QuickTag:
def __init__(
self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[[str], None]
self, url: utils.Url, domain: str, talker: ComicTalker, config: SettngsNS, output: Callable[..., None]
):
self.output = output
self.url = url
self.talker = talker
self.domain = domain
self.config = config
self.bad_ids = IDCache(config.Runtime_Options__config.user_cache_dir, __version__)
self.known_bad_ids = self.bad_ids.get_ids()
for domain, bad_ids in KNOWN_BAD_IDS.items():
self.known_bad_ids[domain] |= bad_ids
def id_comic(
self,
ca: comicarchive.ComicArchive,
tags: GenericMetadata,
simple: bool,
hashes: set[HashType],
exact_only: bool,
interactive: bool,
@ -128,6 +328,10 @@ class QuickTag:
cover_index = tags.get_cover_page_index_list()[0]
cover_image = Image.open(BytesIO(ca.get_page(cover_index)))
cover_image.load()
self.limit = 30
if aggressive_filtering:
self.limit = 15
self.output(f"Tagging: {ca.path}")
@ -139,35 +343,47 @@ class QuickTag:
if HashType.DHASH in hashes:
dhash = hex(hasher.difference_hash())[2:]
if HashType.PHASH in hashes:
phash = hex(hasher.p_hash())[2:]
logger.info(f"Searching with {ahash=}, {dhash=}, {phash=}")
phash = hex(hasher.perception_hash())[2:]
self.output("Searching hashes")
results = self.SearchHashes(simple, max_hamming_distance, ahash, dhash, phash, exact_only)
logger.debug(f"{results=}")
logger.info(
"Searching with ahash=%s, dhash=%s, phash=%s",
ahash,
dhash,
phash,
)
results = self.SearchHashes(max_hamming_distance, ahash, dhash, phash, exact_only)
logger.debug("results=%s", results)
if not results:
self.output("No results found for QuickTag")
return None
if simple:
filtered_simple_results = self.filter_simple_results(
cast(list[SimpleResult], results), interactive, aggressive_filtering
IDs = [
Hashes(hashes=(g[1] for g in group), id=i)
for i, group in itertools.groupby(
sorted(((ID(**r["ID"]), (r)) for r in results), key=lambda r: (r[0], r[1]["Hash"]["Kind"])),
key=lambda r: r[0],
)
metadata_simple_results = self.get_simple_results(filtered_simple_results)
chosen_result = self.display_simple_results(metadata_simple_results, tags, interactive)
else:
filtered_results = self.filter_results(cast(list[Result], results), interactive, aggressive_filtering)
metadata_results = self.get_results(filtered_results)
chosen_result = self.display_results(metadata_results, tags, interactive)
]
IDs = sorted(IDs, key=lambda r: r.key)
self.output(f"Total number of IDs found: {len(IDs)}")
logger.debug("IDs=%s", IDs)
return self.talker.fetch_comic_data(issue_id=chosen_result.issue_id)
aggressive_results, display_results = self.match_results(IDs, aggressive_filtering)
chosen_result = self.display_results(
aggressive_results, display_results, ca, tags, interactive, aggressive_filtering
)
if chosen_result:
return self.talker.fetch_comic_data(issue_id=chosen_result.ID)
return None
def SearchHashes(
self, simple: bool, max_hamming_distance: int, ahash: str, dhash: str, phash: str, exact_only: bool
) -> list[SimpleResult] | list[Result]:
self, max_hamming_distance: int, ahash: str, dhash: str, phash: str, exact_only: bool
) -> list[Result]:
resp = requests.get(
urljoin(self.url.url, "/match_cover_hash"),
params={
"simple": str(simple),
"max": str(max_hamming_distance),
"ahash": ahash,
"dhash": dhash,
@ -186,206 +402,205 @@ class QuickTag:
raise Exception(f"Failed to retrieve results from the server: {text}")
return resp.json()["results"]
def get_mds(self, results: list[SimpleResult] | list[Result]) -> list[GenericMetadata]:
def get_mds(self, ids: Iterable[ID]) -> list[GenericMetadata]:
md_results: list[GenericMetadata] = []
results.sort(key=lambda r: r["Distance"])
all_ids = set()
for res in results:
all_ids.update(res.get("IDList", res.get("IDs", {})).get(self.domain, [])) # type: ignore[attr-defined]
ids = {md_id for md_id in ids if md_id.Domain == self.domain}
all_ids = {md_id.ID for md_id in ids if md_id.Domain == self.domain}
self.output(f"Retrieving basic {self.talker.name} data")
# Try to do a bulk fetch of basic issue data
if hasattr(self.talker, "fetch_comics"):
# Try to do a bulk fetch of basic issue data, if we have more than 1 id
if hasattr(self.talker, "fetch_comics") and len(all_ids) > 1:
md_results = self.talker.fetch_comics(issue_ids=list(all_ids))
else:
for md_id in all_ids:
md_results.append(self.talker.fetch_comic_data(issue_id=md_id))
retrieved_ids = {ID(self.domain, md.issue_id) for md in md_results} # type: ignore[arg-type]
bad_ids = ids - retrieved_ids
if bad_ids:
logger.debug("Adding bad IDs to known list: %s", bad_ids)
self.known_bad_ids[self.domain] |= bad_ids
self.bad_ids.add_ids(bad_ids)
return md_results
def get_simple_results(self, results: list[SimpleResult]) -> list[tuple[int, GenericMetadata]]:
md_results = []
mds = self.get_mds(results)
def _filter_hash_results(self, results: Iterable[Hashes]) -> list[Hashes]:
groups: list[Hashes] = []
previous: dict[HashType, None | int] = dict.fromkeys(HashType)
skipped: list[Hashes] = []
for hash_group in sorted(results, key=lambda r: r.key):
b = []
if skipped:
skipped.append(hash_group)
for _hash in hash_group.hashes:
prev = previous[_hash["Hash"]["Kind"]]
b.append(prev is not None and (_hash["Distance"] - prev) > 3)
previous[_hash["Hash"]["Kind"]] = _hash["Distance"]
if b and all(b):
skipped.append(hash_group)
# Re-associate the md to the distance
for res in results:
for md in mds:
if md.issue_id in res["IDList"].get(self.domain, []):
md_results.append((res["Distance"], md))
return md_results
def get_results(self, results: list[Result]) -> list[tuple[int, Hash, GenericMetadata]]:
md_results = []
mds = self.get_mds(results)
# Re-associate the md to the distance
for res in results:
for md in mds:
if md.issue_id in res["IDs"].get(self.domain, []):
md_results.append((res["Distance"], res["Hash"], md))
return md_results
def filter_simple_results(
self, results: list[SimpleResult], interactive: bool, aggressive_filtering: bool
) -> list[SimpleResult]:
# If there is a single exact match return it
exact = [r for r in results if r["Distance"] == 0]
if len(exact) == 1:
logger.info("Exact result found. Ignoring any others")
return exact
# If there are more than 4 results and any are better than 6 return the first group of results
if len(results) > 4:
dist: list[tuple[int, list[SimpleResult]]] = []
filtered_results: list[SimpleResult] = []
for distance, group in itertools.groupby(results, key=lambda r: r["Distance"]):
dist.append((distance, list(group)))
if aggressive_filtering and dist[0][0] < 6:
logger.info(f"Aggressive filtering is enabled. Dropping matches above {dist[0]}")
for _, res in dist[:1]:
filtered_results.extend(res)
logger.debug(f"{filtered_results=}")
return filtered_results
return results
def filter_results(self, results: list[Result], interactive: bool, aggressive_filtering: bool) -> list[Result]:
ahash_results = sorted([r for r in results if r["Hash"]["Kind"] == "ahash"], key=lambda r: r["Distance"])
dhash_results = sorted([r for r in results if r["Hash"]["Kind"] == "dhash"], key=lambda r: r["Distance"])
phash_results = sorted([r for r in results if r["Hash"]["Kind"] == "phash"], key=lambda r: r["Distance"])
hash_results = [phash_results, dhash_results, ahash_results]
# If any of the hash types have a single exact match return it. Prefer phash for no particular reason
for hashed_result in hash_results:
exact = [r for r in hashed_result if r["Distance"] == 0]
if len(exact) == 1:
logger.info(f"Exact {exact[0]['Hash']['Kind']} result found. Ignoring any others")
return exact
results_filtered = False
# If any of the hash types have more than 4 results and they have results better than 6 return the first group of results for each hash type
for i, hashed_results in enumerate(hash_results):
filtered_results: list[Result] = []
if len(hashed_results) > 4:
dist: list[tuple[int, list[Result]]] = []
for distance, group in itertools.groupby(hashed_results, key=lambda r: r["Distance"]):
dist.append((distance, list(group)))
if aggressive_filtering and dist[0][0] < 6:
logger.info(
f"Aggressive filtering is enabled. Dropping {dist[0][1][0]['Hash']['Kind']} matches above {dist[0][0]}"
)
for _, res in dist[:1]:
filtered_results.extend(res)
if filtered_results:
hash_results[i] = filtered_results
results_filtered = True
if results_filtered:
logger.debug(f"filtered_results={list(itertools.chain(*hash_results))}")
return list(itertools.chain(*hash_results))
def display_simple_results(
self, md_results: list[tuple[int, GenericMetadata]], tags: GenericMetadata, interactive: bool
) -> GenericMetadata:
if len(md_results) < 1:
return GenericMetadata()
if len(md_results) == 1 and md_results[0][0] <= 4:
self.output("Found a single match <=4. Assuming it's correct")
return md_results[0][1]
series_match: list[GenericMetadata] = []
for score, md in md_results:
if (
score < 10
and tags.series
and md.series
and utils.titles_match(tags.series, md.series)
and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
):
series_match.append(md)
if len(series_match) == 1:
self.output(f"Found match with series name {series_match[0].series!r}")
return series_match[0]
if not interactive:
return GenericMetadata()
md_results.sort(key=lambda r: (r[0], len(r[1].publisher or "")))
for counter, r in enumerate(md_results, 1):
self.output(
" {:2}. score: {} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
counter,
r[0],
r[1].publisher,
r[1].month or 0,
r[1].year or 0,
r[1].series,
r[1].issue,
r[1].title,
),
groups.append(hash_group)
if skipped:
logger.debug(
"Filtering bottom %d of %s results as they seem to all be substantially worse",
len(skipped),
len(skipped) + len(groups),
)
while True:
i = input(
f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
).casefold()
if i.isdigit() and int(i) in range(1, len(md_results) + 1):
break
if i == "q":
logger.warning("User quit without saving metadata")
return GenericMetadata()
return groups
return md_results[int(i) - 1][1]
def _filter_hashes(self, hashes: Iterable[Hashes], aggressive_filtering: bool) -> tuple[list[Hashes], list[Hashes]]:
hashes = list(hashes)
if not hashes:
return [], []
aggressive_skip = False
skipped: list[Hashes] = []
hashes = sorted(hashes, key=lambda r: r.key)
groups: list[Hashes] = [hashes[0]]
aggressive_groups = [hashes[0]]
previous = hashes[0]
for group in hashes[1:]:
group_limit = 3
if (group.distance - previous.distance) > group_limit or skipped:
skipped.append(group)
elif aggressive_filtering:
if group.should_break(previous):
aggressive_skip = True
if not aggressive_skip:
aggressive_groups.append(group)
groups.append(group)
previous = group
if skipped or len(groups) - len(aggressive_groups) > 0:
logger.debug("skipping (%d|%d)/%d results", len(skipped), len(groups) - len(aggressive_groups), len(hashes))
return aggressive_groups, groups
def match_results(self, results: list[Hashes], aggressive_filtering: bool) -> tuple[list[Hashes], list[Hashes]]:
exact = [r for r in results if r.exact]
limited = limit(results, self.limit)
logger.debug("Only looking at the top %d out of %d hash scores", min(len(results), self.limit), len(results))
# Filter out results if there is a gap > 3 in distance
for i, hashed_results in enumerate(limited):
limited[i] = self._filter_hash_results(hashed_results)
aggressive, normal = self._filter_hashes(itertools.chain.from_iterable(limited), aggressive_filtering)
if exact:
self.output(f"{len(exact)} exact result found. Ignoring any others: {exact}")
aggressive = exact # I've never seen more than 2 "exact" matches
return aggressive, normal
def match_names(self, tags: GenericMetadata, results: list[tuple[Hashes, GenericMetadata]]) -> NameMatches:
confident_match: list[tuple[Hashes, GenericMetadata]] = []
probable_match: list[tuple[Hashes, GenericMetadata]] = []
other_match: list[tuple[Hashes, GenericMetadata]] = []
for result, md in results:
assert md.issue_id
assert md.series
assert md.issue
titles_match = tags.series and utils.titles_match(tags.series, md.series, threshold=70)
issues_match = tags.issue and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
if titles_match and issues_match:
confident_match.append((result, md))
elif (titles_match or issues_match) and result.distance < 6:
probable_match.append((result, md))
else:
other_match.append((result, md))
return NameMatches(tuple(confident_match), tuple(probable_match), tuple(other_match))
def display_results(
self,
md_results: list[tuple[int, Hash, GenericMetadata]],
results: list[Hashes],
display_results: list[Hashes],
ca: comicarchive.ComicArchive,
tags: GenericMetadata,
interactive: bool,
) -> GenericMetadata:
if len(md_results) < 1:
return GenericMetadata()
if len(md_results) == 1 and md_results[0][0] <= 4:
self.output("Found a single match <=4. Assuming it's correct")
return md_results[0][2]
series_match: dict[str, tuple[int, Hash, GenericMetadata]] = {}
for score, cover_hash, md in md_results:
if (
score < 10
and tags.series
and md.series
and utils.titles_match(tags.series, md.series)
and IssueString(tags.issue).as_string() == IssueString(md.issue).as_string()
):
assert md.issue_id
series_match[md.issue_id] = (score, cover_hash, md)
aggressive_filtering: bool,
) -> ID | None:
if len(results) < 1:
return None
# we only return early if we don't have a series name or issue as get_mds will pull the full info if there is only one result
if (
not (tags.series or tags.issue)
and not interactive
and aggressive_filtering
and len(results) == 1
and (results[0].distance < 4 or results[0].score >= 95)
):
self.output("Found a single match < 4. Assuming it's correct")
return results[0].id
limited = limit((r for r in results if r.id not in KNOWN_BAD_IDS.get(self.domain, set())), self.limit)
ids = {r.id: r for r in itertools.chain.from_iterable(limited)}
mds = [(ids[ID(self.domain, md.issue_id)], md) for md in self.get_mds(ids)] # type: ignore[arg-type]
matches = self.match_names(tags, mds)
if len(matches.confident_match) == 1:
result, md = matches.confident_match[0]
self.output(f"Found confident {result.distances} match with series name {md.series!r}")
return result.id
elif len(matches.probable_match) == 1:
result, md = matches.probable_match[0]
self.output(f"Found probable {result.distances} match with series name {md.series!r}")
return result.id
elif len(matches.other_match) == 1 and matches.other_match[0][0].distance < 4:
result, md = matches.other_match[0]
self.output(f"Found a {result.distances} match with series name {md.series!r}")
return result.id
if len(series_match) == 1:
score, cover_hash, md = list(series_match.values())[0]
self.output(f"Found {cover_hash['Kind']} {score=} match with series name {md.series!r}")
return md
if not interactive:
return GenericMetadata()
md_results.sort(key=lambda r: (r[0], len(r[2].publisher or ""), r[1]["Kind"]))
for counter, r in enumerate(md_results, 1):
return None
limited_interactive = limit(
(r for r in display_results if r.id not in KNOWN_BAD_IDS.get(self.domain, set())), self.limit
)
ids_interactive = {r.id: r for r in itertools.chain.from_iterable(limited_interactive)}
mds_interactive = [(ids_interactive[ID(self.domain, md.issue_id)], md) for md in self.get_mds(ids_interactive)] # type: ignore[arg-type]
interactive_only_ids = set(ids_interactive).difference(ids)
items = sorted(mds_interactive, key=lambda r: r[0].key)
self.output(
f"\nSelect result for {ca.path.name}, page count: {ca.get_number_of_pages()} :\n", force_output=True
)
for counter, r in enumerate(items, 1):
hashes, md = r
self.output(
" {:2}. score: {} {}: {:064b} [{:15}] ({:02}/{:04}) - {} #{} - {}".format(
"{}{:2}. {:6} {!s} distance: {}({}) - {} #{} [{}] ({}/{}) - {}".format(
" " if hashes.id in interactive_only_ids else "*",
counter,
r[0],
r[1]["Kind"],
r[1]["Hash"],
r[2].publisher or "",
r[2].month or 0,
r[2].year or 0,
r[2].series or "",
r[2].issue or "",
r[2].title or "",
hashes.id.ID,
hashes.distances,
hashes.distance,
hashes.score,
md.series or "",
md.issue or "",
md.publisher or "",
md.month or "",
md.year or "",
md.title or "",
),
force_output=True,
)
while True:
i = input(
f'Please select a result to tag the comic with or "q" to quit: [1-{len(md_results)}] ',
f'Please select a result to tag the comic with or "q" to quit: [1-{len(results)}] ',
).casefold()
if i.isdigit() and int(i) in range(1, len(md_results) + 1):
if i.isdigit() and int(i) in range(1, len(results) + 1):
break
if i == "q":
if i.startswith("q"):
self.output("User quit without saving metadata")
return GenericMetadata()
return None
self.output("")
return md_results[int(i) - 1][2]
return items[int(i) - 1][0].id
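
The Hashes.score property above averages per-hash similarity values plus a bonus term that depends on how many hash kinds agreed. A worked, standalone sketch of that arithmetic for the three-hash case (the distances are hypothetical; the padding branch for missing hash kinds is omitted):

import statistics

distances = [2, 3, 5]  # hypothetical ahash/dhash/phash distances out of 64 bits
values = [(64 - d) / 64 for d in distances]  # ~0.969, 0.953, 0.922
mod = {3: 64 / 64, 2: 60 / 64, 1: 58 / 64}[len(distances)]  # bonus grows with the number of hash kinds
values.append(mod)
score = int(statistics.mean(int(v * 100) for v in values))
print(score)  # 95 -- with more than one hash kind contributing, this would also count as "exact"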

View File

@ -191,13 +191,13 @@ class RenameWindow(QtWidgets.QDialog):
try:
for idx, comic in enumerate(zip(self.comic_archive_list, self.rename_list), 1):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_dialog.setValue(idx)
prog_dialog.setLabelText(comic[1])
QtCore.QCoreApplication.processEvents()
if idx % 5 == 0:
QtCore.QCoreApplication.processEvents()
folder = get_rename_dir(
comic[0],

View File

@ -254,8 +254,6 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.iddialog.textEdit.append(text.rstrip())
self.iddialog.textEdit.ensureCursorVisible()
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
def identify_progress(self, cur: int, total: int) -> None:
if self.iddialog is not None:
@ -489,14 +487,13 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
def showEvent(self, event: QtGui.QShowEvent) -> None:
self.perform_query()
QtCore.QCoreApplication.processEvents()
if not self.series_list:
QtCore.QCoreApplication.processEvents()
QtWidgets.QMessageBox.information(self, "Search Result", "No matches found!")
QtCore.QTimer.singleShot(200, self.close_me)
elif self.immediate_autoselect:
# defer the immediate autoselect so this dialog has time to pop up
QtCore.QCoreApplication.processEvents()
QtCore.QTimer.singleShot(10, self.do_immediate_autoselect)
def do_immediate_autoselect(self) -> None:

View File

@ -234,8 +234,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
if self.config[0].Runtime_Options__preferred_hash:
self.config[0].internal__embedded_hash_type = self.config[0].Runtime_Options__preferred_hash
self.selected_write_tags: list[str] = config[0].internal__write_tags
self.selected_read_tags: list[str] = config[0].internal__read_tags
self.selected_write_tags: list[str] = config[0].internal__write_tags or list(self.enabled_tags())
self.selected_read_tags: list[str] = config[0].internal__read_tags or list(self.enabled_tags())
self.setAcceptDrops(True)
self.view_tag_actions, self.remove_tag_actions = self.tag_actions()
@ -574,13 +574,13 @@ class TaggerWindow(QtWidgets.QMainWindow):
for prog_idx, ca in enumerate(to_zip, 1):
logger.debug("Exporting comic %d: %s", prog_idx, ca.path)
QtCore.QCoreApplication.processEvents()
if prog_idx % 10 == 0:
QtCore.QCoreApplication.processEvents()
if prog_dialog is not None:
if prog_dialog.wasCanceled():
break
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
QtCore.QCoreApplication.processEvents()
export_name = ca.path.with_suffix(".cbz")
export = True
@ -610,7 +610,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
if prog_dialog is not None:
prog_dialog.hide()
QtCore.QCoreApplication.processEvents()
self.fileSelectionList.remove_archive_list(archives_to_remove)
summary = f"Successfully created {success_count} Zip archive(s)."
@ -1060,7 +1059,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
if dialog.exec():
file_list = dialog.selectedFiles()
if file_list:
self.fileSelectionList.twList.selectRow(self.fileSelectionList.add_path_item(file_list[0]))
self.fileSelectionList.twList.selectRow(self.fileSelectionList.add_path_item(file_list[0])[0])
def select_file(self, folder_mode: bool = False) -> None:
dialog = self.file_dialog(folder_mode=folder_mode)
@ -1595,17 +1594,16 @@ class TaggerWindow(QtWidgets.QMainWindow):
progdialog.setWindowModality(QtCore.Qt.WindowModality.WindowModal)
progdialog.setMinimumDuration(300)
center_window_on_parent(progdialog)
QtCore.QCoreApplication.processEvents()
failed_list = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
QtCore.QCoreApplication.processEvents()
if prog_idx % 10 == 0:
QtCore.QCoreApplication.processEvents()
if progdialog.wasCanceled():
break
progdialog.setValue(prog_idx)
progdialog.setLabelText(str(ca.path))
QtCore.QCoreApplication.processEvents()
for tag_id in tag_ids:
if ca.has_tags(tag_id) and ca.is_writable():
if ca.remove_tags(tag_id):
@ -1694,6 +1692,8 @@ class TaggerWindow(QtWidgets.QMainWindow):
failed_list = []
success_count = 0
for prog_idx, ca in enumerate(ca_list, 1):
if prog_idx % 10 == 0:
QtCore.QCoreApplication.processEvents()
ca_saved = False
md, error = self.read_selected_tags(src_tag_ids, ca)
if error is not None:
@ -1704,14 +1704,12 @@ class TaggerWindow(QtWidgets.QMainWindow):
for tag_id in dest_tag_ids:
if ca.has_tags(tag_id):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
if tag_id == "cbi" and self.config[0].Metadata_Options__apply_transform_on_bulk_operation:
md = CBLTransformer(md, self.config[0]).apply()
@ -1748,8 +1746,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
self.atprogdialog.textEdit.append(text.rstrip())
self.atprogdialog.textEdit.ensureCursorVisible()
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
QtCore.QCoreApplication.processEvents()
def identify_and_tag_single_archive(
self, ca: ComicArchive, match_results: OnlineMatchResults, dlg: AutoTagStartWindow
@ -1981,6 +1977,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
self.auto_tag_log("==========================================================================\n")
self.auto_tag_log(f"Auto-Tagging {prog_idx} of {len(ca_list)}\n")
self.auto_tag_log(f"{ca.path}\n")
QtCore.QCoreApplication.processEvents()
try:
cover_idx = ca.read_tags(self.selected_read_tags[0]).get_cover_page_index_list()[0]
except Exception as e:
@ -1990,13 +1987,11 @@ class TaggerWindow(QtWidgets.QMainWindow):
self.atprogdialog.set_archive_image(image_data)
self.atprogdialog.set_test_image(b"")
QtCore.QCoreApplication.processEvents()
if self.atprogdialog.isdone:
break
self.atprogdialog.progressBar.setValue(prog_idx)
self.atprogdialog.label.setText(str(ca.path))
QtCore.QCoreApplication.processEvents()
if ca.is_writable():
success, match_results = self.identify_and_tag_single_archive(ca, match_results, atstartdlg)
@ -2307,7 +2302,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
self.setWindowFlags(
flags | QtCore.Qt.WindowType.WindowStaysOnTopHint | QtCore.Qt.WindowType.X11BypassWindowManagerHint
)
QtCore.QCoreApplication.processEvents()
self.setWindowFlags(flags)
self.show()

View File

@ -9,6 +9,7 @@ import webbrowser
from collections.abc import Collection, Sequence
from PyQt5.QtCore import QUrl
from PyQt5.QtGui import QPalette
from PyQt5.QtWidgets import QWidget
logger = logging.getLogger(__name__)
@ -28,7 +29,7 @@ if qt_available:
pil_available = True
except ImportError:
pil_available = False
active_palette: QPalette | None = None
try:
from PyQt5.QtWebEngineWidgets import QWebEnginePage, QWebEngineView
@ -124,6 +125,12 @@ if qt_available:
def get_qimage_from_data(image_data: bytes) -> QtGui.QImage:
img = QtGui.QImage()
if len(image_data) == 0:
logger.warning("Empty image data.")
img.load(":/graphics/nocover.png")
return img
success = img.loadFromData(image_data)
if not success:
try:
@ -133,7 +140,7 @@ if qt_available:
Image.open(io.BytesIO(image_data)).save(buffer, format="ppm")
success = img.loadFromData(buffer.getvalue())
except Exception:
logger.exception("Failed to load the image")
logger.exception("Failed to load the image.")
# if still nothing, go with default image
if not success:
img.load(":/graphics/nocover.png")
@ -146,8 +153,6 @@ if qt_available:
QtWidgets.QMessageBox.critical(QtWidgets.QMainWindow(), "Error", msg + trace)
active_palette = None
def enable_widget(widget: QtWidgets.QWidget | Collection[QtWidgets.QWidget], enable: bool) -> None:
if isinstance(widget, Sequence):
for w in widget:
@ -156,8 +161,7 @@ if qt_available:
_enable_widget(widget, enable)
def _enable_widget(widget: QtWidgets.QWidget, enable: bool) -> None:
global active_palette
if not (widget is not None and active_palette is not None):
if widget is None or active_palette is None:
return
active_color = active_palette.color(QtGui.QPalette.ColorRole.Base)

View File

@ -16,11 +16,13 @@
# limitations under the License.
from __future__ import annotations
import contextlib
import datetime
import logging
import os
import pathlib
import sqlite3
import threading
from typing import Any, Generic, TypeVar
from typing_extensions import NamedTuple
@ -53,6 +55,8 @@ class ComicCacher:
self.db_file = cache_folder / "comic_cache.db"
self.version_file = cache_folder / "cache_version.txt"
self.version = version
self.local: threading.Thread | None = None
self.db: sqlite3.Connection | None = None
# verify that cache is from same version as this one
data = ""
@ -65,10 +69,13 @@ class ComicCacher:
if data != version:
self.clear_cache()
if not os.path.exists(self.db_file):
self.create_cache_db()
self.create_cache_db()
def clear_cache(self) -> None:
try:
self.close()
except Exception:
pass
try:
os.unlink(self.db_file)
except Exception:
@ -78,32 +85,40 @@ class ComicCacher:
except Exception:
pass
def connect(self) -> sqlite3.Connection:
if self.local != threading.current_thread():
self.db = None
if self.db is None:
self.local = threading.current_thread()
self.db = sqlite3.connect(self.db_file)
self.db.row_factory = sqlite3.Row
self.db.text_factory = str
return self.db
def close(self) -> None:
if self.db is not None:
self.db.close()
self.db = None
def create_cache_db(self) -> None:
# create the version file
with open(self.version_file, "w", encoding="utf-8") as f:
f.write(self.version)
# this will wipe out any existing version
open(self.db_file, "wb").close()
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
# create tables
with con:
cur = con.cursor()
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
cur.execute(
"""CREATE TABLE SeriesSearchCache(
"""CREATE TABLE IF NOT EXISTS SeriesSearchCache(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
search_term TEXT,
PRIMARY KEY (id, source, search_term))"""
)
cur.execute("CREATE TABLE Source(id TEXT NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id))")
cur.execute("CREATE TABLE IF NOT EXISTS Source(id TEXT NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id))")
cur.execute(
"""CREATE TABLE Series(
"""CREATE TABLE IF NOT EXISTS Series(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
@ -113,7 +128,7 @@ class ComicCacher:
)
cur.execute(
"""CREATE TABLE Issues(
"""CREATE TABLE IF NOT EXISTS Issues(
timestamp DATE DEFAULT (datetime('now','localtime')),
id TEXT NOT NULL,
source TEXT NOT NULL,
@ -129,10 +144,7 @@ class ComicCacher:
cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
def add_search_results(self, source: str, search_term: str, series_list: list[Series], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
# remove all previous entries with this search term
cur.execute(
@ -155,9 +167,7 @@ class ComicCacher:
self.upsert(cur, "series", data)
def add_series_info(self, source: str, series: Series, complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
data = {
"id": series.id,
@ -168,9 +178,7 @@ class ComicCacher:
self.upsert(cur, "series", data)
def add_issues_info(self, source: str, issues: list[Issue], complete: bool) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
for issue in issues:
data = {
@ -184,10 +192,7 @@ class ComicCacher:
def get_search_results(self, source: str, search_term: str, expire_stale: bool = True) -> list[CacheResult[Series]]:
results = []
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
if expire_stale:
self.expire_stale_records(cur, "SeriesSearchCache")
@ -210,10 +215,7 @@ class ComicCacher:
return results
def get_series_info(self, series_id: str, source: str, expire_stale: bool = True) -> CacheResult[Series] | None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
if expire_stale:
self.expire_stale_records(cur, "Series")
@ -233,10 +235,7 @@ class ComicCacher:
def get_series_issues_info(
self, series_id: str, source: str, expire_stale: bool = True
) -> list[CacheResult[Issue]]:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
if expire_stale:
self.expire_stale_records(cur, "Issues")
@ -256,10 +255,7 @@ class ComicCacher:
return results
def get_issue_info(self, issue_id: str, source: str, expire_stale: bool = True) -> CacheResult[Issue] | None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
with self.connect() as con, contextlib.closing(con.cursor()) as cur:
if expire_stale:
self.expire_stale_records(cur, "Issues")
@ -309,3 +305,17 @@ class ComicCacher:
vals.append(True) # If the cache is complete and this isn't complete we don't update it
cur.execute(sql_ins, vals)
def adapt_datetime_iso(val: datetime.datetime) -> str:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()
def convert_datetime(val: bytes) -> datetime.datetime:
"""Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.datetime.fromisoformat(val.decode())
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
sqlite3.register_converter("datetime", convert_datetime)
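
The module-level register_adapter/register_converter calls above replace the implicit datetime handling that newer Python releases deprecate. A minimal standalone sketch of the same pattern (in-memory database; passing detect_types=sqlite3.PARSE_DECLTYPES is an assumption made here so the converter actually fires):

import datetime
import sqlite3

sqlite3.register_adapter(datetime.datetime, lambda val: val.isoformat())
sqlite3.register_converter("datetime", lambda raw: datetime.datetime.fromisoformat(raw.decode()))

con = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
con.execute("CREATE TABLE t(ts datetime)")
con.execute("INSERT INTO t VALUES (?)", (datetime.datetime(2025, 5, 4, 15, 49, 48),))
print(con.execute("SELECT ts FROM t").fetchone()[0])  # comes back as a datetime.datetime object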

View File

@ -43,7 +43,7 @@ def cleanup_html(string: str | None, remove_html_tables: bool = False) -> str:
# find any tables
soup = BeautifulSoup(string, "html.parser")
tables = soup.findAll("table")
tables = soup.find_all("table")
# put in our own
string = re.sub(r"<br>|</li>", "\n", string, flags=re.IGNORECASE)
@ -78,15 +78,15 @@ def cleanup_html(string: str | None, remove_html_tables: bool = False) -> str:
rows = []
hdrs = []
col_widths = []
for hdr in table.findAll("th"):
for hdr in table.find_all("th"):
item = hdr.string.strip()
hdrs.append(item)
col_widths.append(len(item))
rows.append(hdrs)
for row in table.findAll("tr"):
for row in table.find_all("tr"):
cols = []
col = row.findAll("td")
col = row.find_all("td")
for i, c in enumerate(col):
item = c.string.strip()
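
The findAll/find_all changes above are a straight rename to BeautifulSoup's snake_case API; the camelCase aliases still work but trigger deprecation warnings in recent bs4 releases. A small illustrative sketch (the HTML is made up):

from bs4 import BeautifulSoup

soup = BeautifulSoup("<table><tr><th>h</th><td>v</td></tr></table>", "html.parser")
tables = soup.find_all("table")  # preferred spelling; soup.findAll("table") returns the same list
print(len(tables))  # 1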

View File

@ -22,20 +22,21 @@ import json
import logging
import pathlib
import time
from functools import cache
from typing import Any, Callable, Generic, TypeVar, cast
from urllib.parse import parse_qsl, urljoin
from urllib.parse import parse_qsl, urlencode, urljoin
import settngs
from pyrate_limiter import Limiter, RequestRate
from typing_extensions import Required, TypedDict
from comicapi import utils
from comicapi.genericmetadata import ComicSeries, GenericMetadata, MetadataOrigin
from comicapi.genericmetadata import ComicSeries, GenericMetadata, ImageHash, MetadataOrigin
from comicapi.issuestring import IssueString
from comicapi.utils import LocationParseError, parse_url
from comicapi.utils import LocationParseError, StrEnum, parse_url
from comictalker import talker_utils
from comictalker.comiccacher import ComicCacher, Issue, Series
from comictalker.comictalker import ComicTalker, TalkerDataError, TalkerNetworkError
from comictalker.comictalker import ComicTalker, TalkerDataError, TalkerError, TalkerNetworkError
try:
import niquests as requests
@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
TWITTER_TOO_MANY_REQUESTS = 420
class CVTypeID:
class CVTypeID(StrEnum):
Volume = "4050" # CV uses volume to mean series
Issue = "4000"
@ -262,6 +263,10 @@ class ComicVineTalker(ComicTalker):
self._log_total_requests()
return "Failed to connect to the URL!", False
@cache
def cacher(self) -> ComicCacher:
return ComicCacher(self.cache_folder, self.version)
def search_for_series(
self,
series_name: str,
@ -281,7 +286,7 @@ class ComicVineTalker(ComicTalker):
# Before we search online, look in our cache, since we might have done this same search recently
# For literal searches always retrieve from online
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
if not refresh_cache and not literal:
cached_search_results = cvc.get_search_results(self.id, series_name)
@ -389,7 +394,7 @@ class ComicVineTalker(ComicTalker):
) -> list[GenericMetadata]:
logger.debug("Fetching comics by series ids: %s and number: %s", series_id_list, issue_number)
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
cached_results: list[GenericMetadata] = []
needed_volumes: set[int] = set()
for series_id in series_id_list:
@ -476,136 +481,137 @@ class ComicVineTalker(ComicTalker):
return formatted_filtered_issues_result
def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]:
logger.debug("Fetching comic IDs: %s", issue_ids)
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_results: list[GenericMetadata] = []
needed_issues: list[int] = []
for issue_id in issue_ids:
cached_issue = cvc.get_issue_info(issue_id, self.id)
def _get_id_list(self, needed_issues: list[str]) -> tuple[str, set[str]]:
used_issues = set(needed_issues[: min(len(needed_issues), 100)])
flt = "id:" + "|".join(used_issues)
return flt, used_issues
if cached_issue is not None:
cached_results.append(
self._map_comic_issue_to_metadata(
json.loads(cached_issue[0].data),
self._fetch_series([int(cached_issue[0].series_id)])[0][0],
),
)
else:
needed_issues.append(int(issue_id)) # CV uses integers for it's IDs
def fetch_comics(self, *, issue_ids: list[str]) -> list[GenericMetadata]:
# before we search online, look in our cache, since we might already have this info
cvc = self.cacher()
cached_results: list[GenericMetadata] = []
needed_issues: set[str] = set(issue_ids)
cached_issues = [x for x in (cvc.get_issue_info(issue_id, self.id) for issue_id in issue_ids) if x is not None]
needed_issues -= {i.data.id for i in cached_issues}
for cached_issue in cached_issues:
issue: CVIssue = json.loads(cached_issue.data.data)
series: CVSeries = issue["volume"]
cached_series = cvc.get_series_info(cached_issue.data.series_id, self.id, expire_stale=False)
if cached_series is not None and cached_series.complete:
series = json.loads(cached_series.data.data)
cached_results.append(
self._map_comic_issue_to_metadata(
issue,
self._format_series(series),
),
)
logger.debug("Found %d issues cached need %d issues", len(cached_results), len(needed_issues))
if not needed_issues:
return cached_results
issue_filter = ""
for iid in needed_issues:
issue_filter += str(iid) + "|"
flt = "id:" + issue_filter.rstrip("|")
issue_url = urljoin(self.api_url, "issues/")
params: dict[str, Any] = {
"api_key": self.api_key,
"format": "json",
"filter": flt,
}
cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params)
issue_results = cv_response["results"]
page = 1
offset = 0
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
issue_results: list[CVIssue] = []
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
while needed_issues:
flt, used_issues = self._get_id_list(list(needed_issues))
params["filter"] = flt
params["offset"] = offset
cv_response = self._get_cv_content(issue_url, params)
cv_response: CVResult[list[CVIssue]] = self._get_cv_content(issue_url, params)
issue_results.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
series_info = {s[0].id: s[0] for s in self._fetch_series([int(i["volume"]["id"]) for i in issue_results])}
retrieved_issues = {str(x["id"]) for x in cv_response["results"]}
used_issues.difference_update(retrieved_issues)
if used_issues:
logger.debug("%s issue ids %r do not exist anymore", self.name, used_issues)
cache_issue: list[Issue] = []
for issue in issue_results:
cache_issue.append(
Issue(
id=str(issue["id"]),
series_id=str(issue["volume"]["id"]),
data=json.dumps(issue).encode("utf-8"),
needed_issues = needed_issues.difference(retrieved_issues, used_issues)
cache_issue: list[Issue] = []
for issue in issue_results:
cache_issue.append(
Issue(
id=str(issue["id"]),
series_id=str(issue["volume"]["id"]),
data=json.dumps(issue).encode("utf-8"),
)
)
cvc.add_issues_info(
self.id,
cache_issue,
False, # The /issues/ endpoint never provides credits
)
cached_results.append(
self._map_comic_issue_to_metadata(issue, series_info[str(issue["volume"]["id"])]),
cvc.add_series_info(
self.id,
Series(id=str(issue["volume"]["id"]), data=json.dumps(issue["volume"]).encode("utf-8")),
False,
)
from pprint import pp
pp(cache_issue, indent=2)
cvc.add_issues_info(
self.id,
cache_issue,
False, # The /issues/ endpoint never provides credits
)
for issue in issue_results:
series = issue["volume"]
cached_series = cvc.get_series_info(str(series["id"]), self.id, expire_stale=False)
if cached_series is not None and cached_series.complete:
series = json.loads(cached_series.data.data)
cached_results.append(
self._map_comic_issue_to_metadata(issue, self._format_series(series)),
)
return cached_results
def _fetch_series(self, series_ids: list[int]) -> list[tuple[ComicSeries, bool]]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
cached_results: list[tuple[ComicSeries, bool]] = []
needed_series: list[int] = []
needed_series: set[str] = set()
for series_id in series_ids:
cached_series = cvc.get_series_info(str(series_id), self.id)
if cached_series is not None:
if cached_series is not None and cached_series.complete:
cached_results.append((self._format_series(json.loads(cached_series[0].data)), cached_series[1]))
else:
needed_series.append(series_id)
needed_series.add(str(series_id))
if needed_series == []:
if not needed_series:
return cached_results
series_filter = ""
for vid in needed_series:
series_filter += str(vid) + "|"
flt = "id:" + series_filter.rstrip("|") # CV uses volume to mean series
logger.debug("Found %d series cached need %d series", len(cached_results), len(needed_series))
series_url = urljoin(self.api_url, "volumes/") # CV uses volume to mean series
params: dict[str, Any] = {
"api_key": self.api_key,
"format": "json",
"filter": flt,
}
cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params)
series_results: list[CVSeries] = []
series_results = cv_response["results"]
page = 1
offset = 0
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
while needed_series:
flt, used_series = self._get_id_list(list(needed_series))
params["filter"] = flt
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self._get_cv_content(series_url, params)
cv_response: CVResult[list[CVSeries]] = self._get_cv_content(series_url, params)
series_results.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
if series_results:
retrieved_series = {str(x["id"]) for x in series_results}
used_series.difference_update(retrieved_series)
if used_series:
logger.debug("%s series ids %r do not exist anymore", self.name, used_series)
needed_series = needed_series.difference(retrieved_series, used_series)
for series in series_results:
cvc.add_series_info(
self.id,
Series(id=str(series["id"]), data=json.dumps(series).encode("utf-8")),
True,
)
if series_results:
for series in series_results:
cached_results.append((self._format_series(series), True))
return cached_results
@ -614,19 +620,15 @@ class ComicVineTalker(ComicTalker):
"""
Get the content from the CV server.
"""
ratelimit_key = url
if self.api_key == self.default_api_key:
ratelimit_key = "cv"
with self.limiter.ratelimit(ratelimit_key, delay=True):
cv_response: CVResult[T] = self._get_url_content(url, params)
if cv_response["status_code"] != 1:
logger.debug(
f"{self.name} query failed with error #{cv_response['status_code']}: [{cv_response['error']}]."
)
raise TalkerNetworkError(self.name, 0, f"{cv_response['status_code']}: {cv_response['error']}")
cv_response: CVResult[T] = self._get_url_content(url, params)
if cv_response["status_code"] != 1:
logger.debug(
f"{self.name} query failed with error #{cv_response['status_code']}: [{cv_response['error']}]."
)
raise TalkerNetworkError(self.name, 0, f"{cv_response['status_code']}: {cv_response['error']}")
return cv_response
return cv_response
def _get_url_content(self, url: str, params: dict[str, Any]) -> Any:
# if there is a 500 error, try a few more times before giving up
@ -636,47 +638,65 @@ class ComicVineTalker(ComicTalker):
for tries in range(1, 5):
try:
self.total_requests_made[url.removeprefix(self.api_url)] += 1
resp = requests.get(
url, params=final_params, headers={"user-agent": "comictagger/" + self.version}, timeout=10
)
ratelimit_key = self._get_ratelimit_key(url)
with self.limiter.ratelimit(ratelimit_key, delay=True):
logger.debug("Requesting: %s?%s", url, urlencode(final_params))
self.total_requests_made[ratelimit_key] += 1
resp = requests.get(
url, params=final_params, headers={"user-agent": "comictagger/" + self.version}, timeout=60
)
if resp.status_code == 200:
return resp.json()
elif resp.status_code == 500:
logger.debug(f"Try #{tries}: ")
time.sleep(1)
logger.debug(str(resp.status_code))
elif resp.status_code in (
requests.codes.SERVER_ERROR,
requests.codes.BAD_GATEWAY,
requests.codes.UNAVAILABLE,
):
logger.debug("Try #%d: %d", tries, resp.status_code)
elif resp.status_code in (requests.status_codes.codes.TOO_MANY_REQUESTS, TWITTER_TOO_MANY_REQUESTS):
logger.info(f"{self.name} rate limit encountered. Waiting for 10 seconds\n")
elif resp.status_code in (requests.codes.TOO_MANY_REQUESTS, TWITTER_TOO_MANY_REQUESTS):
logger.info("%s rate limit encountered. Waiting for 10 seconds", self.name)
self._log_total_requests()
time.sleep(10)
limit_counter += 1
if limit_counter > 3:
# Tried 3 times, inform user to check CV website.
logger.error(f"{self.name} rate limit error. Exceeded 3 retires.")
logger.error("%s rate limit error. Exceeded 3 retires.", self.name)
raise TalkerNetworkError(
self.name,
3,
"Rate Limit Error: Check your current API usage limit at https://comicvine.gamespot.com/api/",
)
else:
logger.error("Unknown status code: %d, %s", resp.status_code, resp.content)
break
except requests.exceptions.Timeout:
logger.debug(f"Connection to {self.name} timed out.")
raise TalkerNetworkError(self.name, 4)
if tries > 3:
raise TalkerNetworkError(self.name, 4)
except requests.exceptions.RequestException as e:
logger.debug(f"Request exception: {e}")
raise TalkerNetworkError(self.name, 0, str(e)) from e
except json.JSONDecodeError as e:
logger.debug(f"JSON decode error: {e}")
raise TalkerDataError(self.name, 2, "ComicVine did not provide json")
except TalkerError as e:
raise e
except Exception as e:
raise TalkerNetworkError(self.name, 5, str(e))
raise TalkerNetworkError(self.name, 5, "Unknown error occurred")
def _get_ratelimit_key(self, url: str) -> str:
if self.api_key == self.default_api_key:
return "cv"
ratelimit_key = url.removeprefix(self.api_url)
for x in CVTypeID:
ratelimit_key = ratelimit_key.partition(f"/{x}-")[0]
return ratelimit_key
def _format_search_results(self, search_results: list[CVSeries]) -> list[ComicSeries]:
formatted_results = []
for record in search_results:
@ -716,7 +736,7 @@ class ComicVineTalker(ComicTalker):
def _fetch_issues_in_series(self, series_id: str) -> list[tuple[GenericMetadata, bool]]:
logger.debug("Fetching all issues in series: %s", series_id)
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
cached_results = cvc.get_series_issues_info(series_id, self.id)
series = self._fetch_series_data(int(series_id))[0]
@ -773,11 +793,11 @@ class ComicVineTalker(ComicTalker):
def _fetch_series_data(self, series_id: int) -> tuple[ComicSeries, bool]:
logger.debug("Fetching series info: %s", series_id)
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
cached_series = cvc.get_series_info(str(series_id), self.id)
logger.debug("Series cached: %s", bool(cached_series))
if cached_series is not None:
if cached_series is not None and cached_series.complete:
return (self._format_series(json.loads(cached_series[0].data)), cached_series[1])
series_url = urljoin(self.api_url, f"volume/{CVTypeID.Volume}-{series_id}") # CV uses volume to mean series
@ -821,7 +841,7 @@ class ComicVineTalker(ComicTalker):
def _fetch_issue_data_by_issue_id(self, issue_id: str) -> GenericMetadata:
logger.debug("Fetching issue by issue ID: %s", issue_id)
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cvc = self.cacher()
cached_issue = cvc.get_issue_info(issue_id, self.id)
logger.debug("Issue cached: %s", bool(cached_issue and cached_issue[1]))
@ -875,13 +895,11 @@ class ComicVineTalker(ComicTalker):
md.web_links = [parse_url(url)]
except LocationParseError:
...
if issue.get("image") is None:
md._cover_image = ""
else:
md._cover_image = issue.get("image", {}).get("super_url", "")
if issue.get("image") is not None:
md._cover_image = ImageHash(URL=issue.get("image", {}).get("super_url", ""), Hash=0, Kind="")
for alt in issue.get("associated_images", []):
md._alternate_images.append(alt["original_url"])
md._alternate_images.append(ImageHash(URL=alt["original_url"], Hash=0, Kind=""))
for character in issue.get("character_credits", set()):
md.characters.add(character["name"])
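
The new _get_ratelimit_key above collapses per-item URLs into one bucket per endpoint, so repeated direct issue or volume lookups no longer each get their own rate-limit key. A standalone sketch of the same string manipulation (the URLs and the hard-coded type IDs below are assumptions for illustration):

api_url = "https://comicvine.gamespot.com/api/"
url = "https://comicvine.gamespot.com/api/issue/4000-140529/"

key = url.removeprefix(api_url)  # "issue/4000-140529/"
for type_id in ("4050", "4000"):  # CVTypeID.Volume, CVTypeID.Issue
    key = key.partition(f"/{type_id}-")[0]
print(key)  # "issue" -- every direct issue lookup now shares one rate-limit bucket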

View File

@ -15,7 +15,6 @@ classifiers =
Environment :: Win32 (MS Windows)
Environment :: X11 Applications :: Qt
Intended Audience :: End Users/Desktop
License :: OSI Approved :: Apache Software License
Natural Language :: English
Operating System :: OS Independent
Programming Language :: Python :: 3
@ -46,9 +45,8 @@ install_requires =
pillow>=9.1.0
pyrate-limiter>=2.6,<3
pyyaml
rapidfuzz>=2.12.0
requests==2.*
settngs==0.10.4
settngs==0.11.0
text2digits
typing-extensions>=4.3.0
wordninja
@ -329,6 +327,7 @@ per-file-ignores =
[mypy]
exclude = comictaggerlib/graphics/resources.py
check_untyped_defs = true
local_partial_types = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_untyped_defs = true

View File

@ -289,34 +289,78 @@ metadata_prepared = (
),
)
issueidentifier_score = (
issueidentifier_score = ( # type: ignore[var-annotated]
(
(
comicapi.genericmetadata.ImageHash(
Hash=0, # Force using the alternate, since the alternate is a url it will be ignored
Kind="ahash",
),
["https://comicvine.gamespot.com/cory-doctorows-futuristic-tales-of-the-here-and-no/4000-140529/"],
True,
None,
[],
False,
),
{
"remote_hash": 0,
"score": 31,
"score": 100,
"url": "",
"local_hash": 0,
"local_hash_name": "0",
},
),
(
(
# Test invalid ImageHash Kind value
comicapi.genericmetadata.ImageHash(
Hash=0,
Kind="",
URL="",
),
[],
False,
),
{
"remote_hash": 0,
"score": 100,
"url": "",
"local_hash": 0,
"local_hash_name": "0",
},
),
(
(
# Test URL alternative
comicapi.genericmetadata.ImageHash(
Hash=0,
Kind="ahash",
URL="",
),
[
comicapi.genericmetadata.ImageHash(
URL="https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
Hash=0,
Kind="",
)
],
True,
),
{
"remote_hash": 212201432349720,
"score": 0,
"url": "https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
"local_hash": 212201432349720,
"local_hash_name": "Cover 1",
},
),
(
(
# Test hash alternative
comicapi.genericmetadata.ImageHash(
Hash=0,
Kind="ahash",
URL="",
),
[
comicapi.genericmetadata.ImageHash(
Hash=212201432349720,
Kind="ahash",
URL="",
),
],
True,
@ -334,8 +378,9 @@ issueidentifier_score = (
comicapi.genericmetadata.ImageHash(
Hash=212201432349720,
Kind="ahash",
URL="",
),
["https://comicvine.gamespot.com/cory-doctorows-futuristic-tales-of-the-here-and-no/4000-140529/"],
[],
False,
),
{
@ -348,8 +393,12 @@ issueidentifier_score = (
),
(
(
"https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
["https://comicvine.gamespot.com/cory-doctorows-futuristic-tales-of-the-here-and-no/4000-140529/"],
comicapi.genericmetadata.ImageHash(
Hash=0,
Kind="",
URL="https://comicvine.gamespot.com/a/uploads/scale_large/0/574/585444-109004_20080707014047_large.jpg",
),
[],
False,
),
{

View File

@ -181,7 +181,9 @@ comic_issue_result = comicapi.genericmetadata.GenericMetadata(
issue_id=str(cv_issue_result["results"]["id"]),
series=cv_issue_result["results"]["volume"]["name"],
series_id=str(cv_issue_result["results"]["volume"]["id"]),
_cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=comicapi.genericmetadata.ImageHash(
URL=cv_issue_result["results"]["image"]["super_url"], Hash=0, Kind=""
),
issue=cv_issue_result["results"]["issue_number"],
volume=None,
title=cv_issue_result["results"]["name"],
@ -240,7 +242,9 @@ cv_md = comicapi.genericmetadata.GenericMetadata(
rights=None,
identifier=None,
last_mark=None,
_cover_image=cv_issue_result["results"]["image"]["super_url"],
_cover_image=comicapi.genericmetadata.ImageHash(
URL=cv_issue_result["results"]["image"]["super_url"], Hash=0, Kind=""
),
)

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import os
import pathlib
import platform
import shutil
@ -81,8 +82,9 @@ def test_page_type_write(tmp_comic):
def test_invalid_zip(tmp_comic: comicapi.comicarchive.ComicArchive):
with open(tmp_comic.path, mode="b+r") as f:
# This only corrupts the first file. If it is never read then no exception will be caused
f.write(b"PK\000\000")
# Corrupting the first file only breaks the first file. If it is never read then no exception will be raised
f.seek(-10, os.SEEK_END) # seek to a probably bad place in the Central Directory and write some bytes
f.write(b"PK\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000")
result = tmp_comic.write_tags(comicapi.genericmetadata.md_test, "cr") # This is not the first file
assert result

View File

@ -215,7 +215,7 @@ def config(tmp_path):
@pytest.fixture
def plugin_config(tmp_path):
def plugin_config(tmp_path, comicvine_api):
from comictaggerlib.main import App
ns = Namespace(config=comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"))

tests/imagehasher_test.py (new file, 37 lines)
View File

@ -0,0 +1,37 @@
from __future__ import annotations
from comicapi.comicarchive import ComicArchive
from comictaggerlib.imagehasher import ImageHasher
def test_ahash(cbz: ComicArchive):
md = cbz.read_tags("cr")
covers = md.get_cover_page_index_list()
assert covers
cover = cbz.get_page(covers[0])
assert cover
ih = ImageHasher(data=cover)
assert bin(212201432349720) == bin(ih.average_hash())
def test_dhash(cbz: ComicArchive):
md = cbz.read_tags("cr")
covers = md.get_cover_page_index_list()
assert covers
cover = cbz.get_page(covers[0])
assert cover
ih = ImageHasher(data=cover)
assert bin(11278294082955047009) == bin(ih.difference_hash())
def test_phash(cbz: ComicArchive):
md = cbz.read_tags("cr")
covers = md.get_cover_page_index_list()
assert covers
cover = cbz.get_page(covers[0])
assert cover
ih = ImageHasher(data=cover)
assert bin(15307782992485167995) == bin(ih.perception_hash())

View File

@ -13,7 +13,6 @@ from comictalker.comictalker import ComicTalker
def test_save(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
comicvine_api,
md_saved,
mock_now,
) -> None:
@ -70,7 +69,6 @@ def test_save(
def test_delete(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
comicvine_api,
md_saved,
mock_now,
) -> None:
@ -109,7 +107,6 @@ def test_delete(
def test_rename(
plugin_config: tuple[settngs.Config[ctsettings.ct_ns], dict[str, ComicTalker]],
tmp_comic,
comicvine_api,
md_saved,
mock_now,
) -> None:

View File

@ -42,7 +42,7 @@ def test_get_issue_cover_match_score(
cbz,
config,
comicvine_api,
data: tuple[str | ImageHash, list[str | ImageHash], bool],
data: tuple[ImageHash, list[ImageHash], bool],
expected: comictaggerlib.issueidentifier.Score,
):
config, definitions = config