Convert ComicIssue into GenericMetadata

I could not find a good reason for ComicIssue to exist other than that
 it had more attributes than GenericMetadata, so it has been replaced.
New attributes for GenericMetadata:
  series_id:        a string uniquely identifying the series to the tag_origin source
  series_aliases:   alternate series names that are not the canonical name
  title_aliases:    alternate issue titles that are not the canonical name
  alternate_images: a list of URLs to alternate cover images

Updated attributes for GenericMetadata:
  genre        -> genres:        str -> list[str]
  comments     -> description:   str -> str
  story_arc    -> story_arcs:    str -> list[str]
  series_group -> series_groups: str -> list[str]
  character    -> characters:    str -> list[str]
  team         -> teams:         str -> list[str]
  location     -> locations:     str -> list[str]
  tag_origin   -> tag_origin:    str -> TagOrigin (tuple[str, str])
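
Taken together, a tagged issue now looks roughly like this; a minimal
 sketch, with values borrowed from the md_test fixture updated in this
 commit:

    from comicapi.genericmetadata import GenericMetadata, TagOrigin

    md = GenericMetadata(
        is_empty=False,
        tag_origin=TagOrigin("comicvine", "Comic Vine"),  # was a plain str
        series_id="23437",                                # new: series id at tag_origin
        series="Cory Doctorow's Futuristic Tales of the Here and Now",
        issue="1",
        genres=["Sci-Fi"],                                # was genre: str
        story_arcs=["Here and Now"],                      # was story_arc: str
        characters=["Anda"],                              # was characters: str
        teams=["Fahrenheit"],                             # was teams: str
        description="For 12-year-old Anda, ...",          # was comments
    )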

ComicSeries has been relocated to the ComicAPI package; it currently
 has no usage within ComicAPI.
CreditMetadata has been renamed to Credit and has replaced Credit from
 ComicTalker.
fetch_series has been added to ComicTalker. It is currently only used
 in the GUI, when a selected series does not already contain the needed
 fields; this function should always be cached.
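
For illustration, a talker could satisfy the caching requirement like
 this; a sketch with a hypothetical ExampleTalker (real talkers
 presumably cache through ComicCacher rather than in process):

    import functools

    from comicapi.genericmetadata import ComicSeries

    class ExampleTalker:  # hypothetical, for illustration only
        @functools.lru_cache(maxsize=None)  # one cached result per series_id
        def fetch_series(self, series_id: str) -> ComicSeries:
            # A real talker would consult its cache first and only hit
            # the remote API on a miss.
            return ComicSeries(
                id=series_id, name="Example Series", aliases=[],
                count_of_issues=None, count_of_volumes=None, description="",
                image_url="", publisher="", start_year=None, genres=[],
                format=None,
            )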

A new split function has been added to ComicAPI; all uses of split on
 single characters have been updated to use it.
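
Unlike str.split, the new helper is None-safe, strips each item, and
 drops empties (per the utils.split implementation below):

    from comicapi import utils

    utils.split("Anda, Fahrenheit , ,lonely cottage ", ",")
    # -> ['Anda', 'Fahrenheit', 'lonely cottage']
    utils.split(None, ",")
    # -> []  (no AttributeError on missing values)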

cleanup_html and the corresponding setting are now only used in
 ComicTagger proper; for display we want the HTML directly from the
 upstream source. When applying the metadata, the description is then
 stripped of any HTML.
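
Concretely, the apply path now strips HTML at overlay time (excerpted
 from the CLI change below):

    md.overlay(
        ct_md.replace(
            notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger"),
            description=cleanup_html(ct_md.description, self.config.talker_remove_html_tables),
        )
    )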

A new conversion has been added to the MetadataFormatter:
  j: joins any list into a string with ', '. Note that this is a valid
     operation on strings as well; it will insert ', ' between every
     character.
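
A quick sketch of the conversion in use (import path assumed to be
 comictaggerlib.filerenamer, where MetadataFormatter is defined):

    from comictaggerlib.filerenamer import MetadataFormatter

    fmt = MetadataFormatter()
    fmt.vformat("{genres!j}", [], {"genres": ["Sci-Fi", "Horror"]})
    # -> 'Sci-Fi, Horror'
    fmt.vformat("{series!j}", [], {"series": "Anda"})
    # -> 'A, n, d, a'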

parse_settings now assigns the given ComicTaggerPaths object to the
 result, ensuring that the correct path is always used.
Timmy Welch 2023-08-02 09:00:04 -07:00
parent 1b6307f9c2
commit 2c3a2566cc
36 changed files with 770 additions and 918 deletions

View File

@ -64,7 +64,7 @@ class CoMet:
assign("series", md.series)
assign("issue", md.issue) # must be int??
assign("volume", md.volume)
assign("description", md.comments)
assign("description", md.description)
assign("publisher", md.publisher)
assign("pages", md.page_count)
assign("format", md.format)
@ -75,10 +75,10 @@ class CoMet:
assign("rights", md.rights)
assign("identifier", md.identifier)
assign("lastMark", md.last_mark)
assign("genre", md.genre) # TODO repeatable
assign("genre", ",".join(md.genres)) # TODO repeatable
if md.characters is not None:
char_list = [c.strip() for c in md.characters.split(",")]
char_list = [c.strip() for c in md.characters]
for c in char_list:
assign("character", c)
@ -142,7 +142,7 @@ class CoMet:
md.title = utils.xlate(get("title"))
md.issue = utils.xlate(get("issue"))
md.volume = utils.xlate_int(get("volume"))
md.comments = utils.xlate(get("description"))
md.description = utils.xlate(get("description"))
md.publisher = utils.xlate(get("publisher"))
md.language = utils.xlate(get("language"))
md.format = utils.xlate(get("format"))
@ -153,7 +153,6 @@ class CoMet:
md.rights = utils.xlate(get("rights"))
md.identifier = utils.xlate(get("identifier"))
md.last_mark = utils.xlate(get("lastMark"))
md.genre = utils.xlate(get("genre")) # TODO - repeatable field
_, md.month, md.year = utils.parse_date_str(utils.xlate(get("date")))
@ -163,12 +162,15 @@ class CoMet:
if reading_direction is not None and reading_direction == "rtl":
md.manga = "YesAndRightToLeft"
# loop for genre tags
for n in root:
if n.tag == "genre":
md.genres.append((n.text or "").strip())
# loop for character tags
char_list = []
for n in root:
if n.tag == "character":
char_list.append((n.text or "").strip())
md.characters = ", ".join(char_list)
md.characters.append((n.text or "").strip())
# Now extract the credit info
for n in root:

View File

@ -88,8 +88,8 @@ class ComicBookInfo:
metadata.month = utils.xlate_int(cbi["publicationMonth"])
metadata.year = utils.xlate_int(cbi["publicationYear"])
metadata.issue_count = utils.xlate_int(cbi["numberOfIssues"])
metadata.comments = utils.xlate(cbi["comments"])
metadata.genre = utils.xlate(cbi["genre"])
metadata.description = utils.xlate(cbi["comments"])
metadata.genres = utils.split(cbi["genre"], ",")
metadata.volume = utils.xlate_int(cbi["volume"])
metadata.volume_count = utils.xlate_int(cbi["numberOfVolumes"])
metadata.language = utils.xlate(cbi["language"])
@ -104,11 +104,7 @@ class ComicBookInfo:
)
for x in cbi["credits"]
]
metadata.tags = set(cbi["tags"]) if cbi["tags"] is not None else set()
# make sure credits and tags are at least empty lists and not None
if metadata.credits is None:
metadata.credits = []
metadata.tags.update(cbi["tags"] if cbi["tags"] is not None else set())
# need the language string to be ISO
if metadata.language:
@ -155,8 +151,8 @@ class ComicBookInfo:
assign("publicationMonth", utils.xlate_int(metadata.month))
assign("publicationYear", utils.xlate_int(metadata.year))
assign("numberOfIssues", utils.xlate_int(metadata.issue_count))
assign("comments", utils.xlate(metadata.comments))
assign("genre", utils.xlate(metadata.genre))
assign("comments", utils.xlate(metadata.description))
assign("genre", utils.xlate(",".join(metadata.genres)))
assign("volume", utils.xlate_int(metadata.volume))
assign("numberOfVolumes", utils.xlate_int(metadata.volume_count))
assign("language", utils.xlate(utils.get_language_from_iso(metadata.language)))

View File

@ -69,12 +69,19 @@ class ComicInfoXml:
# helper func
def assign(cix_entry: str, md_entry: Any) -> None:
if md_entry is not None and md_entry:
if md_entry:
text = ""
if isinstance(md_entry, str):
text = md_entry
elif isinstance(md_entry, list):
text = ",".join(md_entry)
else:
text = str(md_entry)
et_entry = root.find(cix_entry)
if et_entry is not None:
et_entry.text = str(md_entry)
et_entry.text = text
else:
ET.SubElement(root, cix_entry).text = str(md_entry)
ET.SubElement(root, cix_entry).text = text
else:
et_entry = root.find(cix_entry)
if et_entry is not None:
@ -87,10 +94,10 @@ class ComicInfoXml:
assign("Volume", md.volume)
assign("AlternateSeries", md.alternate_series)
assign("AlternateNumber", md.alternate_number)
assign("StoryArc", md.story_arc)
assign("SeriesGroup", md.series_group)
assign("StoryArc", md.story_arcs)
assign("SeriesGroup", md.series_groups)
assign("AlternateCount", md.alternate_count)
assign("Summary", md.comments)
assign("Summary", md.description)
assign("Notes", md.notes)
assign("Year", md.year)
assign("Month", md.month)
@ -141,7 +148,7 @@ class ComicInfoXml:
assign("Publisher", md.publisher)
assign("Imprint", md.imprint)
assign("Genre", md.genre)
assign("Genre", md.genres)
assign("Web", md.web_link)
assign("PageCount", md.page_count)
assign("LanguageISO", md.language)
@ -194,25 +201,25 @@ class ComicInfoXml:
md.alternate_series = utils.xlate(get("AlternateSeries"))
md.alternate_number = utils.xlate(get("AlternateNumber"))
md.alternate_count = utils.xlate_int(get("AlternateCount"))
md.comments = utils.xlate(get("Summary"))
md.description = utils.xlate(get("Summary"))
md.notes = utils.xlate(get("Notes"))
md.year = utils.xlate_int(get("Year"))
md.month = utils.xlate_int(get("Month"))
md.day = utils.xlate_int(get("Day"))
md.publisher = utils.xlate(get("Publisher"))
md.imprint = utils.xlate(get("Imprint"))
md.genre = utils.xlate(get("Genre"))
md.genres = utils.split(get("Genre"), ",")
md.web_link = utils.xlate(get("Web"))
md.language = utils.xlate(get("LanguageISO"))
md.format = utils.xlate(get("Format"))
md.manga = utils.xlate(get("Manga"))
md.characters = utils.xlate(get("Characters"))
md.teams = utils.xlate(get("Teams"))
md.locations = utils.xlate(get("Locations"))
md.characters = utils.split(get("Characters"), ",")
md.teams = utils.split(get("Teams"), ",")
md.locations = utils.split(get("Locations"), ",")
md.page_count = utils.xlate_int(get("PageCount"))
md.scan_info = utils.xlate(get("ScanInformation"))
md.story_arc = utils.xlate(get("StoryArc"))
md.series_group = utils.xlate(get("SeriesGroup"))
md.story_arcs = utils.split(get("StoryArc"), ",")
md.series_groups = utils.split(get("SeriesGroup"), ",")
md.maturity_rating = utils.xlate(get("AgeRating"))
md.critical_rating = utils.xlate_float(get("CommunityRating"))
@ -232,12 +239,12 @@ class ComicInfoXml:
]
):
if n.text is not None:
for name in n.text.split(","):
for name in utils.split(n.text, ","):
md.add_credit(name.strip(), n.tag)
if n.tag == "CoverArtist":
if n.text is not None:
for name in n.text.split(","):
for name in utils.split(n.text, ","):
md.add_credit(name.strip(), "Cover")
# parse page data now

View File

@ -253,9 +253,9 @@ class FileNameParser:
remainder = ""
if "--" in filename:
remainder = filename.split("--", 1)[1]
remainder = "--".join(filename.split("--", 1)[1:])
elif "__" in filename:
remainder = filename.split("__", 1)[1]
remainder = "__".join(filename.split("__", 1)[1:])
elif issue_end != 0:
remainder = filename[issue_end:]

View File

@ -25,6 +25,8 @@ import dataclasses
import logging
from typing import Any, TypedDict
from typing_extensions import NamedTuple
from comicapi import utils
logger = logging.getLogger(__name__)
@ -60,12 +62,35 @@ class ImageMetadata(TypedDict, total=False):
ImageWidth: str
class CreditMetadata(TypedDict):
class Credit(TypedDict):
person: str
role: str
primary: bool
@dataclasses.dataclass
class ComicSeries:
id: str
name: str
aliases: list[str]
count_of_issues: int | None
count_of_volumes: int | None
description: str
image_url: str
publisher: str
start_year: int | None
genres: list[str]
format: str | None
def copy(self) -> ComicSeries:
return copy.deepcopy(self)
class TagOrigin(NamedTuple):
id: str
name: str
@dataclasses.dataclass
class GenericMetadata:
writer_synonyms = ["writer", "plotter", "scripter"]
@ -77,21 +102,24 @@ class GenericMetadata:
editor_synonyms = ["editor"]
is_empty: bool = True
tag_origin: str | None = None
tag_origin: TagOrigin | None = None
issue_id: str | None = None
series_id: str | None = None
series: str | None = None
series_aliases: list[str] = dataclasses.field(default_factory=list)
issue: str | None = None
title: str | None = None
title_aliases: list[str] = dataclasses.field(default_factory=list)
publisher: str | None = None
month: int | None = None
year: int | None = None
day: int | None = None
issue_count: int | None = None
volume: int | None = None
genre: str | None = None
genres: list[str] = dataclasses.field(default_factory=list)
language: str | None = None # 2 letter iso code
comments: str | None = None # use same way as Summary in CIX
description: str | None = None # use same way as Summary in CIX
volume_count: int | None = None
critical_rating: float | None = None # rating in CBL; CommunityRating in CIX
@ -109,15 +137,16 @@ class GenericMetadata:
page_count: int | None = None
maturity_rating: str | None = None
story_arc: str | None = None
series_group: str | None = None
story_arcs: list[str] = dataclasses.field(default_factory=list)
series_groups: list[str] = dataclasses.field(default_factory=list)
scan_info: str | None = None
characters: str | None = None
teams: str | None = None
locations: str | None = None
characters: list[str] = dataclasses.field(default_factory=list)
teams: list[str] = dataclasses.field(default_factory=list)
locations: list[str] = dataclasses.field(default_factory=list)
credits: list[CreditMetadata] = dataclasses.field(default_factory=list)
alternate_images: list[str] = dataclasses.field(default_factory=list)
credits: list[Credit] = dataclasses.field(default_factory=list)
tags: set[str] = dataclasses.field(default_factory=set)
pages: list[ImageMetadata] = dataclasses.field(default_factory=list)
@ -127,7 +156,7 @@ class GenericMetadata:
rights: str | None = None
identifier: str | None = None
last_mark: str | None = None
cover_image: str | None = None
cover_image: str | None = None # url to cover image
def __post_init__(self) -> None:
for key, value in self.__dict__.items():
@ -154,6 +183,8 @@ class GenericMetadata:
if new is not None:
if isinstance(new, str) and len(new) == 0:
setattr(self, cur, None)
elif isinstance(new, list) and len(new) == 0:
pass
else:
setattr(self, cur, new)
@ -161,7 +192,9 @@ class GenericMetadata:
self.is_empty = False
assign("series", new_md.series)
assign("series_id", new_md.series_id)
assign("issue", new_md.issue)
assign("issue_id", new_md.issue_id)
assign("issue_count", new_md.issue_count)
assign("title", new_md.title)
assign("publisher", new_md.publisher)
@ -170,7 +203,6 @@ class GenericMetadata:
assign("year", new_md.year)
assign("volume", new_md.volume)
assign("volume_count", new_md.volume_count)
assign("genre", new_md.genre)
assign("language", new_md.language)
assign("country", new_md.country)
assign("critical_rating", new_md.critical_rating)
@ -183,13 +215,8 @@ class GenericMetadata:
assign("manga", new_md.manga)
assign("black_and_white", new_md.black_and_white)
assign("maturity_rating", new_md.maturity_rating)
assign("story_arc", new_md.story_arc)
assign("series_group", new_md.series_group)
assign("scan_info", new_md.scan_info)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
assign("comments", new_md.comments)
assign("description", new_md.description)
assign("notes", new_md.notes)
assign("price", new_md.price)
@ -206,13 +233,18 @@ class GenericMetadata:
# For now, go the easy route, where any overlay
# value wipes out the whole list
if len(new_md.tags) > 0:
assign("tags", new_md.tags)
assign("series_aliases", new_md.series_aliases)
assign("title_aliases", new_md.title_aliases)
assign("genres", new_md.genres)
assign("story_arcs", new_md.story_arcs)
assign("series_groups", new_md.series_groups)
assign("characters", new_md.characters)
assign("teams", new_md.teams)
assign("locations", new_md.locations)
assign("tags", new_md.tags)
assign("pages", new_md.pages)
if len(new_md.pages) > 0:
assign("pages", new_md.pages)
def overlay_credits(self, new_credits: list[CreditMetadata]) -> None:
def overlay_credits(self, new_credits: list[Credit]) -> None:
for c in new_credits:
primary = bool("primary" in c and c["primary"])
@ -253,7 +285,7 @@ class GenericMetadata:
return coverlist
def add_credit(self, person: str, role: str, primary: bool = False) -> None:
credit = CreditMetadata(person=person, role=role, primary=primary)
credit = Credit(person=person, role=role, primary=primary)
# look to see if it's not already there...
found = False
@ -373,9 +405,11 @@ class GenericMetadata:
md_test: GenericMetadata = GenericMetadata(
is_empty=False,
tag_origin=None,
tag_origin=TagOrigin("comicvine", "Comic Vine"),
series="Cory Doctorow's Futuristic Tales of the Here and Now",
series_id="23437",
issue="1",
issue_id="140529",
title="Anda's Game",
publisher="IDW Publishing",
month=10,
@ -383,9 +417,9 @@ md_test: GenericMetadata = GenericMetadata(
day=1,
issue_count=6,
volume=1,
genre="Sci-Fi",
genres=["Sci-Fi"],
language="en",
comments=(
description=(
"For 12-year-old Anda, getting paid real money to kill the characters of players who were cheating"
" in her favorite online computer game was a win-win situation. Until she found out who was paying her,"
" and what those characters meant to the livelihood of children around the world."
@ -404,19 +438,19 @@ md_test: GenericMetadata = GenericMetadata(
black_and_white=None,
page_count=24,
maturity_rating="Everyone 10+",
story_arc="Here and Now",
series_group="Futuristic Tales",
story_arcs=["Here and Now"],
series_groups=["Futuristic Tales"],
scan_info="(CC BY-NC-SA 3.0)",
characters="Anda",
teams="Fahrenheit",
locations="lonely cottage ",
characters=["Anda"],
teams=["Fahrenheit"],
locations=utils.split("lonely cottage ", ","),
credits=[
CreditMetadata(primary=False, person="Dara Naraghi", role="Writer"),
CreditMetadata(primary=False, person="Esteve Polls", role="Penciller"),
CreditMetadata(primary=False, person="Esteve Polls", role="Inker"),
CreditMetadata(primary=False, person="Neil Uyetake", role="Letterer"),
CreditMetadata(primary=False, person="Sam Kieth", role="Cover"),
CreditMetadata(primary=False, person="Ted Adams", role="Editor"),
Credit(primary=False, person="Dara Naraghi", role="Writer"),
Credit(primary=False, person="Esteve Polls", role="Penciller"),
Credit(primary=False, person="Esteve Polls", role="Inker"),
Credit(primary=False, person="Neil Uyetake", role="Letterer"),
Credit(primary=False, person="Sam Kieth", role="Cover"),
Credit(primary=False, person="Ted Adams", role="Editor"),
],
tags=set(),
pages=[

View File

@ -100,7 +100,7 @@ def get_recursive_filelist(pathlist: list[str]) -> list[str]:
def add_to_path(dirname: str) -> None:
if dirname:
dirname = os.path.abspath(dirname)
paths = [os.path.normpath(x) for x in os.environ["PATH"].split(os.pathsep)]
paths = [os.path.normpath(x) for x in split(os.environ["PATH"], os.pathsep)]
if dirname not in paths:
paths.insert(0, dirname)
@ -136,7 +136,14 @@ def xlate(data: Any) -> str | None:
if data is None or isinstance(data, str) and data.strip() == "":
return None
return str(data)
return str(data).strip()
def split(s: str | None, c: str) -> list[str]:
s = xlate(s)
if s:
return [x.strip() for x in s.strip().split(c) if x.strip()]
return []
def remove_articles(text: str) -> str:

View File

@ -119,8 +119,7 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
self.twList.setSortingEnabled(False)
row = 0
for match in self.current_match_set.matches:
for row, match in enumerate(self.current_match_set.matches):
self.twList.insertRow(row)
item_text = match["series"]
@ -160,8 +159,6 @@ class AutoTagMatchWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 3, item)
row += 1
self.twList.resizeColumnsToContents()
self.twList.setSortingEnabled(True)
self.twList.sortItems(2, QtCore.Qt.SortOrder.AscendingOrder)

View File

@ -17,7 +17,7 @@ from __future__ import annotations
import logging
from comicapi.genericmetadata import CreditMetadata, GenericMetadata
from comicapi.genericmetadata import Credit, GenericMetadata
from comictaggerlib.ctsettings import ct_ns
logger = logging.getLogger(__name__)
@ -29,21 +29,10 @@ class CBLTransformer:
self.config = config
def apply(self) -> GenericMetadata:
# helper funcs
def append_to_tags_if_unique(item: str) -> None:
if item.casefold() not in (tag.casefold() for tag in self.metadata.tags):
self.metadata.tags.add(item)
def add_string_list_to_tags(str_list: str | None) -> None:
if str_list:
items = [s.strip() for s in str_list.split(",")]
for item in items:
append_to_tags_if_unique(item)
if self.config.cbl_assume_lone_credit_is_primary:
# helper
def set_lone_primary(role_list: list[str]) -> tuple[CreditMetadata | None, int]:
lone_credit: CreditMetadata | None = None
def set_lone_primary(role_list: list[str]) -> tuple[Credit | None, int]:
lone_credit: Credit | None = None
count = 0
for c in self.metadata.credits:
if c["role"].casefold() in role_list:
@ -67,33 +56,33 @@ class CBLTransformer:
self.metadata.add_credit(c["person"], "Artist", True)
if self.config.cbl_copy_characters_to_tags:
add_string_list_to_tags(self.metadata.characters)
self.metadata.tags.update(x for x in self.metadata.characters)
if self.config.cbl_copy_teams_to_tags:
add_string_list_to_tags(self.metadata.teams)
self.metadata.tags.update(x for x in self.metadata.teams)
if self.config.cbl_copy_locations_to_tags:
add_string_list_to_tags(self.metadata.locations)
self.metadata.tags.update(x for x in self.metadata.locations)
if self.config.cbl_copy_storyarcs_to_tags:
add_string_list_to_tags(self.metadata.story_arc)
self.metadata.tags.update(x for x in self.metadata.story_arcs)
if self.config.cbl_copy_notes_to_comments:
if self.metadata.notes is not None:
if self.metadata.comments is None:
self.metadata.comments = ""
if self.metadata.description is None:
self.metadata.description = ""
else:
self.metadata.comments += "\n\n"
if self.metadata.notes not in self.metadata.comments:
self.metadata.comments += self.metadata.notes
self.metadata.description += "\n\n"
if self.metadata.notes not in self.metadata.description:
self.metadata.description += self.metadata.notes
if self.config.cbl_copy_weblink_to_comments:
if self.metadata.web_link is not None:
if self.metadata.comments is None:
self.metadata.comments = ""
if self.metadata.description is None:
self.metadata.description = ""
else:
self.metadata.comments += "\n\n"
if self.metadata.web_link not in self.metadata.comments:
self.metadata.comments += self.metadata.web_link
self.metadata.description += "\n\n"
if self.metadata.web_link not in self.metadata.description:
self.metadata.description += self.metadata.web_link
return self.metadata

View File

@ -34,6 +34,7 @@ from comictaggerlib.graphics import graphics_path
from comictaggerlib.issueidentifier import IssueIdentifier
from comictaggerlib.resulttypes import MultipleMatch, OnlineMatchResults
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.talker_utils import cleanup_html
logger = logging.getLogger(__name__)
@ -89,8 +90,7 @@ class CLI:
# sort match list by year
match_set.matches.sort(key=lambda k: k["year"] or 0)
for counter, m in enumerate(match_set.matches):
counter += 1
for counter, m in enumerate(match_set.matches, 1):
print(
" {}. {} #{} [{}] ({}/{}) - {}".format(
counter,
@ -435,7 +435,12 @@ class CLI:
f"Tagged with ComicTagger {ctversion.version} using info from {self.current_talker().name} on"
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
)
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
md.overlay(
ct_md.replace(
notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger"),
description=cleanup_html(ct_md.description, self.config.talker_remove_html_tables),
)
)
if self.config.identifier_auto_imprint:
md.fix_publisher()

View File

@ -124,6 +124,13 @@ def filename(parser: settngs.Manager) -> None:
def talker(parser: settngs.Manager) -> None:
# General settings for talkers
parser.add_setting("--source", default="comicvine", help="Use a specified source by source ID")
parser.add_setting(
"--remove-html-tables",
default=False,
action=argparse.BooleanOptionalAction,
display_name="Remove HTML tables",
help="Removes html tables instead of converting them to text",
)
def cbl(parser: settngs.Manager) -> None:

View File

@ -77,6 +77,7 @@ class settngs_namespace(settngs.TypedNS):
filename_remove_publisher: bool
talker_source: str
talker_remove_html_tables: bool
cbl_assume_lone_credit_is_primary: bool
cbl_copy_characters_to_tags: bool

View File

@ -5,6 +5,7 @@ import pathlib
from appdirs import AppDirs
from comicapi import utils
from comicapi.comicarchive import MetaDataStyle
from comicapi.genericmetadata import GenericMetadata
@ -67,7 +68,7 @@ def metadata_type_single(types: str) -> int:
def metadata_type(types: str) -> list[int]:
result = []
types = types.casefold()
for typ in types.split(","):
for typ in utils.split(types, ","):
typ = typ.strip()
if typ not in MetaDataStyle.short_name:
choices = ", ".join(MetaDataStyle.short_name)
@ -93,7 +94,7 @@ def parse_metadata_from_string(mdstr: str) -> GenericMetadata:
# First, replace escaped commas with a unique token (to be changed back later)
mdstr = mdstr.replace(escaped_comma, replacement_token)
tmp_list = mdstr.split(",")
tmp_list = utils.split(mdstr, ",")
md_list = []
for item in tmp_list:
item = item.replace(replacement_token, ",")
@ -104,11 +105,11 @@ def parse_metadata_from_string(mdstr: str) -> GenericMetadata:
for item in md_list:
# Make sure to fix any escaped equal signs
i = item.replace(escaped_equals, replacement_token)
key, value = i.split("=")
key, value = utils.split(i, "=")
value = value.replace(replacement_token, "=").strip()
key = key.strip()
if key.casefold() == "credit":
cred_attribs = value.split(":")
cred_attribs = utils.split(value, ":")
role = cred_attribs[0]
person = cred_attribs[1] if len(cred_attribs) > 1 else ""
primary = len(cred_attribs) > 2

View File

@ -67,6 +67,8 @@ class MetadataFormatter(string.Formatter):
return str(value).swapcase()
if conversion == "t":
return str(value).title()
if conversion == "j":
return ", ".join(list(value))
return cast(str, super().convert_field(value, conversion))
def handle_replacements(self, string: str, replacements: list[Replacement]) -> str:

View File

@ -471,8 +471,8 @@ class IssueIdentifier:
# now re-associate the issues and series
# is this really needed?
for issue in issue_list:
if issue.series.id in series_by_id:
shortlist.append((series_by_id[issue.series.id], issue))
if issue.series_id in series_by_id:
shortlist.append((series_by_id[issue.series_id], issue))
if keys["year"] is None:
self.log_msg(f"Found {len(shortlist)} series that have an issue #{keys['issue_number']}")
@ -494,9 +494,6 @@ class IssueIdentifier:
newline=False,
)
# parse out the cover date
_, month, year = utils.parse_date_str(issue.cover_date)
# Now check the cover match against the primary image
hash_list = [cover_hash]
if narrow_cover_hash is not None:
@ -508,8 +505,8 @@ class IssueIdentifier:
logger.info("Adding cropped cover to the hashlist")
try:
image_url = issue.image_url
alt_urls = issue.alt_image_urls
image_url = issue.cover_image or ""
alt_urls = issue.alternate_images
score_item = self.get_issue_cover_match_score(
image_url, alt_urls, hash_list, use_remote_alternates=False
@ -525,15 +522,15 @@ class IssueIdentifier:
"issue_number": keys["issue_number"],
"cv_issue_count": series.count_of_issues,
"url_image_hash": score_item["hash"],
"issue_title": issue.name,
"issue_id": issue.id,
"issue_title": issue.title or "",
"issue_id": issue.issue_id or "",
"series_id": series.id,
"month": month,
"year": year,
"month": issue.month,
"year": issue.year,
"publisher": None,
"image_url": image_url,
"alt_image_urls": alt_urls,
"description": issue.description,
"description": issue.description or "",
}
if series.publisher is not None:
match["publisher"] = series.publisher

View File

@ -19,13 +19,13 @@ import logging
from PyQt5 import QtCore, QtGui, QtWidgets, uic
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import new_web_view, reduce_widget_font_size
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.resulttypes import ComicIssue
logger = logging.getLogger(__name__)
@ -87,7 +87,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
self.config = config
self.talker = talker
self.url_fetch_thread = None
self.issue_list: list[ComicIssue] = []
self.issue_list: list[GenericMetadata] = []
# Display talker logo and set url
self.lblIssuesSourceName.setText(talker.attribution)
@ -143,7 +143,7 @@ class IssueSelectionWindow(QtWidgets.QDialog):
QtWidgets.QApplication.setOverrideCursor(QtGui.QCursor(QtCore.Qt.CursorShape.WaitCursor))
try:
self.issue_list = self.talker.fetch_issues_by_series(self.series_id)
self.issue_list = self.talker.fetch_issues_in_series(self.series_id)
except TalkerError as e:
QtWidgets.QApplication.restoreOverrideCursor()
QtWidgets.QMessageBox.critical(self, f"{e.source} {e.code_name} Error", f"{e}")
@ -153,46 +153,36 @@ class IssueSelectionWindow(QtWidgets.QDialog):
self.twList.setSortingEnabled(False)
row = 0
for record in self.issue_list:
for row, issue in enumerate(self.issue_list):
self.twList.insertRow(row)
item_text = record.issue_number
item_text = issue.issue or ""
item = IssueNumberTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, record.id)
item.setData(QtCore.Qt.ItemDataRole.UserRole, issue.issue_id)
item.setData(QtCore.Qt.ItemDataRole.DisplayRole, item_text)
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
item_text = record.cover_date
if item_text is None:
item_text = ""
# remove the day of "YYYY-MM-DD"
parts = item_text.split("-")
if len(parts) > 1:
item_text = parts[0] + "-" + parts[1]
item_text = ""
if issue.year is not None:
item_text = f"{issue.year:04}"
if issue.month is not None:
item_text = f"{issue.month:02}"
qtw_item = QtWidgets.QTableWidgetItem(item_text)
qtw_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
qtw_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 1, qtw_item)
item_text = record.name
if item_text is None:
item_text = ""
item_text = issue.title or ""
qtw_item = QtWidgets.QTableWidgetItem(item_text)
qtw_item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
qtw_item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, qtw_item)
if (
IssueString(record.issue_number).as_string().casefold()
== IssueString(self.issue_number).as_string().casefold()
):
self.initial_id = record.id
row += 1
if IssueString(issue.issue).as_string().casefold() == IssueString(self.issue_number).as_string().casefold():
self.initial_id = issue.issue_id or ""
self.twList.setSortingEnabled(True)
self.twList.sortItems(0, QtCore.Qt.SortOrder.AscendingOrder)
@ -218,13 +208,12 @@ class IssueSelectionWindow(QtWidgets.QDialog):
self.issue_id = self.twList.item(curr.row(), 0).data(QtCore.Qt.ItemDataRole.UserRole)
# list selection was changed, update the issue cover
for record in self.issue_list:
if record.id == self.issue_id:
self.issue_number = record.issue_number
self.coverWidget.set_issue_details(self.issue_id, [record.image_url, *record.alt_image_urls])
if record.description is None:
self.set_description(self.teDescription, "")
else:
self.set_description(self.teDescription, record.description)
break
issue = self.issue_list[curr.row()]
if not all((issue.issue, issue.year, issue.month, issue.cover_image)): # issue.title, issue.description
issue = self.talker.fetch_comic_data(issue_id=self.issue_id)
self.issue_number = issue.issue or ""
self.coverWidget.set_issue_details(self.issue_id, [issue.cover_image or "", *issue.alternate_images])
if issue.description is None:
self.set_description(self.teDescription, "")
else:
self.set_description(self.teDescription, issue.description)

View File

@ -141,6 +141,7 @@ class App:
config_paths.user_config_dir / "settings.json", list(args) or None
)
config = cast(settngs.Config[ct_ns], self.manager.get_namespace(cfg, file=True, cmdline=True))
config[0].runtime_config = config_paths
config = ctsettings.validate_commandline_settings(config, self.manager)
config = ctsettings.validate_file_settings(config)

View File

@ -89,8 +89,7 @@ class MatchSelectionWindow(QtWidgets.QDialog):
self.twList.setSortingEnabled(False)
row = 0
for match in self.matches:
for row, match in enumerate(self.matches):
self.twList.insertRow(row)
item_text = match["series"]
@ -130,8 +129,6 @@ class MatchSelectionWindow(QtWidgets.QDialog):
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 3, item)
row += 1
self.twList.resizeColumnsToContents()
self.twList.setSortingEnabled(True)
self.twList.sortItems(2, QtCore.Qt.SortOrder.AscendingOrder)

View File

@ -178,11 +178,11 @@ class RenameWindow(QtWidgets.QDialog):
QtCore.QCoreApplication.processEvents()
try:
for idx, comic in enumerate(zip(self.comic_archive_list, self.rename_list)):
for idx, comic in enumerate(zip(self.comic_archive_list, self.rename_list), 1):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
idx += 1
prog_dialog.setValue(idx)
prog_dialog.setLabelText(comic[1])
center_window_on_parent(prog_dialog)

View File

@ -19,12 +19,13 @@ import itertools
import logging
from collections import deque
import natsort
from PyQt5 import QtCore, QtGui, QtWidgets, uic
from PyQt5.QtCore import QUrl, pyqtSignal
from comicapi import utils
from comicapi.comicarchive import ComicArchive
from comicapi.genericmetadata import GenericMetadata
from comicapi.genericmetadata import ComicSeries, GenericMetadata
from comictaggerlib.coverimagewidget import CoverImageWidget
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.issueidentifier import IssueIdentifier
@ -34,7 +35,6 @@ from comictaggerlib.progresswindow import IDProgressWindow
from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import new_web_view, reduce_widget_font_size
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.resulttypes import ComicSeries
logger = logging.getLogger(__name__)
@ -153,7 +153,7 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.comic_archive = comic_archive
self.immediate_autoselect = autoselect
self.cover_index_list = cover_index_list
self.ct_search_results: list[ComicSeries] = []
self.series_list: list[ComicSeries] = []
self.literal = literal
self.ii: IssueIdentifier | None = None
self.iddialog: IDProgressWindow | None = None
@ -209,7 +209,7 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.twList.hideRow(r)
def update_buttons(self) -> None:
enabled = bool(self.ct_search_results)
enabled = bool(self.series_list)
self.btnRequery.setEnabled(enabled)
@ -332,11 +332,9 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
def show_issues(self) -> None:
selector = IssueSelectionWindow(self, self.config, self.talker, self.series_id, self.issue_number)
title = ""
for record in self.ct_search_results:
if record.id == self.series_id:
title = record.name
title += " (" + str(record.start_year) + ")"
title += " - "
for series in self.series_list:
if series.id == self.series_id:
title = f"{series.name} ({series.start_year:04}) - "
break
selector.setWindowTitle(title + "Select Issue")
@ -351,9 +349,8 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.imageWidget.update_content()
def select_by_id(self) -> None:
for r in range(0, self.twList.rowCount()):
series_id = self.twList.item(r, 0).data(QtCore.Qt.ItemDataRole.UserRole)
if series_id == self.series_id:
for r, series in enumerate(self.series_list):
if series.id == self.series_id:
self.twList.selectRow(r)
break
@ -407,16 +404,16 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
)
return
self.ct_search_results = self.search_thread.ct_search_results if self.search_thread is not None else []
self.series_list = self.search_thread.ct_search_results if self.search_thread is not None else []
# filter the publishers if enabled set
if self.use_filter:
try:
publisher_filter = {s.strip().casefold() for s in self.config.identifier_publisher_filter}
# use '' as publisher name if None
self.ct_search_results = list(
self.series_list = list(
filter(
lambda d: ("" if d.publisher is None else str(d.publisher).casefold()) not in publisher_filter,
self.ct_search_results,
self.series_list,
)
)
except Exception:
@ -428,8 +425,8 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
# sort by start_year if set
if self.config.identifier_sort_series_by_year:
try:
self.ct_search_results = sorted(
self.ct_search_results,
self.series_list = natsort.natsorted(
self.series_list,
key=lambda i: (str(i.start_year), str(i.count_of_issues)),
reverse=True,
)
@ -437,8 +434,8 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
logger.exception("bad data error sorting results by start_year,count_of_issues")
else:
try:
self.ct_search_results = sorted(
self.ct_search_results, key=lambda i: str(i.count_of_issues), reverse=True
self.series_list = natsort.natsorted(
self.series_list, key=lambda i: str(i.count_of_issues), reverse=True
)
except Exception:
logger.exception("bad data error sorting results by count_of_issues")
@ -461,10 +458,10 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
return 1
return 2
for comic in self.ct_search_results:
for comic in self.series_list:
deques[categorize(comic)].append(comic)
logger.info("Length: %d, %d, %d", len(deques[0]), len(deques[1]), len(deques[2]))
self.ct_search_results = list(itertools.chain.from_iterable(deques))
self.series_list = list(itertools.chain.from_iterable(deques))
except Exception:
logger.exception("bad data error filtering exact/near matches")
@ -474,42 +471,39 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.twList.setRowCount(0)
row = 0
for record in self.ct_search_results:
for row, series in enumerate(self.series_list):
self.twList.insertRow(row)
item_text = record.name
item_text = series.name
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.UserRole, record.id)
item.setData(QtCore.Qt.ItemDataRole.UserRole, series.id)
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 0, item)
if record.start_year is not None:
item_text = f"{record.start_year:04}"
if series.start_year is not None:
item_text = f"{series.start_year:04}"
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.DisplayRole, record.start_year)
item.setData(QtCore.Qt.ItemDataRole.DisplayRole, series.start_year)
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 1, item)
if record.count_of_issues is not None:
item_text = f"{record.count_of_issues:04}"
if series.count_of_issues is not None:
item_text = f"{series.count_of_issues:04}"
item = QtWidgets.QTableWidgetItem(item_text)
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item.setData(QtCore.Qt.ItemDataRole.DisplayRole, record.count_of_issues)
item.setData(QtCore.Qt.ItemDataRole.DisplayRole, series.count_of_issues)
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 2, item)
if record.publisher is not None:
item_text = record.publisher
if series.publisher is not None:
item_text = series.publisher
item.setData(QtCore.Qt.ItemDataRole.ToolTipRole, item_text)
item = QtWidgets.QTableWidgetItem(item_text)
item.setFlags(QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled)
self.twList.setItem(row, 3, item)
row += 1
self.twList.setSortingEnabled(True)
self.twList.selectRow(0)
self.twList.resizeColumnsToContents()
@ -526,7 +520,7 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
def showEvent(self, event: QtGui.QShowEvent) -> None:
self.perform_query()
if not self.ct_search_results:
if not self.series_list:
QtCore.QCoreApplication.processEvents()
QtWidgets.QMessageBox.information(self, "Search Result", "No matches found!")
QtCore.QTimer.singleShot(200, self.close_me)
@ -559,11 +553,20 @@ class SeriesSelectionWindow(QtWidgets.QDialog):
self.series_id = self.twList.item(curr.row(), 0).data(QtCore.Qt.ItemDataRole.UserRole)
# list selection was changed, update the info on the series
for record in self.ct_search_results:
if record.id == self.series_id:
if record.description is None:
self.set_description(self.teDetails, "")
else:
self.set_description(self.teDetails, record.description)
self.imageWidget.set_url(record.image_url)
break
series = self.series_list[curr.row()]
if not all(
(
series.name,
series.start_year,
series.count_of_issues,
series.publisher,
series.description,
series.image_url,
)
):
series = self.talker.fetch_series(self.series_id)
if series.description is None:
self.set_description(self.teDetails, "")
else:
self.set_description(self.teDetails, series.description)
self.imageWidget.set_url(series.image_url)

View File

@ -424,9 +424,7 @@ class SettingsWindow(QtWidgets.QDialog):
self.config[0].identifier_series_match_identify_thresh = self.sbNameMatchIdentifyThresh.value()
self.config[0].identifier_series_match_search_thresh = self.sbNameMatchSearchThresh.value()
self.config[0].identifier_publisher_filter = [
x.strip() for x in str(self.tePublisherFilter.toPlainText()).splitlines() if x.strip()
]
self.config[0].identifier_publisher_filter = utils.split(self.tePublisherFilter.toPlainText(), "\n")
self.config[0].filename_complicated_parser = self.cbxComplicatedParser.isChecked()
self.config[0].filename_remove_c2c = self.cbxRemoveC2C.isChecked()

View File

@ -25,7 +25,6 @@ import pprint
import re
import sys
import webbrowser
from collections.abc import Iterable
from datetime import datetime
from typing import Any, Callable
from urllib.parse import urlparse
@ -65,6 +64,7 @@ from comictaggerlib.ui import ui_path
from comictaggerlib.ui.qtutils import center_window_on_parent, reduce_widget_font_size
from comictaggerlib.versionchecker import VersionChecker
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.talker_utils import cleanup_html
logger = logging.getLogger(__name__)
@ -468,9 +468,12 @@ class TaggerWindow(QtWidgets.QMainWindow):
def repackage_archive(self) -> None:
ca_list = self.fileSelectionList.get_selected_archive_list()
non_zip_count = 0
zip_list = []
for ca in ca_list:
if not ca.is_zip():
non_zip_count += 1
else:
zip_list.append(ca)
if non_zip_count == 0:
QtWidgets.QMessageBox.information(
@ -507,7 +510,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
prog_dialog.setMinimumDuration(300)
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
prog_idx = 0
new_archives_to_add = []
archives_to_remove = []
@ -515,41 +517,39 @@ class TaggerWindow(QtWidgets.QMainWindow):
failed_list = []
success_count = 0
for ca in ca_list:
if not ca.is_zip():
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_idx += 1
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
for prog_idx, ca in enumerate(zip_list, 1):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
export_name = ca.path.with_suffix(".cbz")
export = True
export_name = ca.path.with_suffix(".cbz")
export = True
if export_name.exists():
if EW.fileConflictBehavior == ExportConflictOpts.dontCreate:
export = False
skipped_list.append(ca.path)
elif EW.fileConflictBehavior == ExportConflictOpts.createUnique:
export_name = utils.unique_file(export_name)
if export_name.exists():
if EW.fileConflictBehavior == ExportConflictOpts.dontCreate:
export = False
skipped_list.append(ca.path)
elif EW.fileConflictBehavior == ExportConflictOpts.createUnique:
export_name = utils.unique_file(export_name)
if export:
if ca.export_as_zip(export_name):
success_count += 1
if EW.addToList:
new_archives_to_add.append(str(export_name))
if EW.deleteOriginal:
archives_to_remove.append(ca)
ca.path.unlink(missing_ok=True)
if export:
if ca.export_as_zip(export_name):
success_count += 1
if EW.addToList:
new_archives_to_add.append(str(export_name))
if EW.deleteOriginal:
archives_to_remove.append(ca)
ca.path.unlink(missing_ok=True)
else:
# last export failed, so remove the zip, if it exists
failed_list.append(ca.path)
if export_name.exists():
export_name.unlink(missing_ok=True)
else:
# last export failed, so remove the zip, if it exists
failed_list.append(ca.path)
if export_name.exists():
export_name.unlink(missing_ok=True)
prog_dialog.hide()
QtCore.QCoreApplication.processEvents()
@ -797,20 +797,20 @@ class TaggerWindow(QtWidgets.QMainWindow):
assign_text(self.lePubMonth, md.month)
assign_text(self.lePubYear, md.year)
assign_text(self.lePubDay, md.day)
assign_text(self.leGenre, md.genre)
assign_text(self.leGenre, ",".join(md.genres))
assign_text(self.leImprint, md.imprint)
assign_text(self.teComments, md.comments)
assign_text(self.teComments, md.description)
assign_text(self.teNotes, md.notes)
assign_text(self.leStoryArc, md.story_arc)
assign_text(self.leStoryArc, ",".join(md.story_arcs))
assign_text(self.leScanInfo, md.scan_info)
assign_text(self.leSeriesGroup, md.series_group)
assign_text(self.leSeriesGroup, ",".join(md.series_groups))
assign_text(self.leAltSeries, md.alternate_series)
assign_text(self.leAltIssueNum, md.alternate_number)
assign_text(self.leAltIssueCount, md.alternate_count)
assign_text(self.leWebLink, md.web_link)
assign_text(self.teCharacters, md.characters)
assign_text(self.teTeams, md.teams)
assign_text(self.teLocations, md.locations)
assign_text(self.teCharacters, "\n".join(md.characters))
assign_text(self.teTeams, "\n".join(md.teams))
assign_text(self.teLocations, "\n".join(md.locations))
self.dsbCriticalRating.setValue(md.critical_rating or 0.0)
@ -860,8 +860,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
if md.credits is not None and len(md.credits) != 0:
self.twCredits.setSortingEnabled(False)
row = 0
for credit in md.credits:
for row, credit in enumerate(md.credits):
# if the role-person pair already exists, just skip adding it to the list
if self.is_dupe_credit(credit["role"].title(), credit["person"]):
continue
@ -870,8 +869,6 @@ class TaggerWindow(QtWidgets.QMainWindow):
row, credit["role"].title(), credit["person"], (credit["primary"] if "primary" in credit else False)
)
row += 1
self.twCredits.setSortingEnabled(True)
self.update_credit_colors()
@ -919,9 +916,9 @@ class TaggerWindow(QtWidgets.QMainWindow):
md.series = utils.xlate(self.leSeries.text())
md.title = utils.xlate(self.leTitle.text())
md.publisher = utils.xlate(self.lePublisher.text())
md.genre = utils.xlate(self.leGenre.text())
md.genres = utils.split(self.leGenre.text(), ",")
md.imprint = utils.xlate(self.leImprint.text())
md.comments = utils.xlate(self.teComments.toPlainText())
md.description = utils.xlate(self.teComments.toPlainText())
md.notes = utils.xlate(self.teNotes.toPlainText())
md.maturity_rating = self.cbMaturityRating.currentText()
@ -929,14 +926,14 @@ class TaggerWindow(QtWidgets.QMainWindow):
if md.critical_rating == 0.0:
md.critical_rating = None
md.story_arc = utils.xlate(self.leStoryArc.text())
md.story_arcs = utils.split(self.leStoryArc.text(), ",")
md.scan_info = utils.xlate(self.leScanInfo.text())
md.series_group = utils.xlate(self.leSeriesGroup.text())
md.alternate_series = utils.xlate(self.leAltSeries.text())
md.series_groups = utils.split(self.leSeriesGroup.text(), ",")
md.alternate_series = self.leAltSeries.text()
md.web_link = utils.xlate(self.leWebLink.text())
md.characters = utils.xlate(self.teCharacters.toPlainText())
md.teams = utils.xlate(self.teTeams.toPlainText())
md.locations = utils.xlate(self.teLocations.toPlainText())
md.characters = utils.split(self.teCharacters.toPlainText(), "\n")
md.teams = utils.split(self.teTeams.toPlainText(), "\n")
md.locations = utils.split(self.teLocations.toPlainText(), "\n")
md.format = utils.xlate(self.cbFormat.currentText())
md.country = utils.xlate(self.cbCountry.currentText())
@ -946,13 +943,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
md.manga = utils.xlate(self.cbManga.itemData(self.cbManga.currentIndex()))
# Make a list from the comma delimited tags string
tmp = self.teTags.toPlainText()
if tmp is not None:
def strip_list(i: Iterable[str]) -> set[str]:
return {x.strip() for x in i}
md.tags = strip_list(tmp.split(","))
md.tags = set(utils.split(self.teTags.toPlainText(), ","))
md.black_and_white = self.cbBW.isChecked()
@ -997,7 +988,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
if dialog.exec():
file_list = dialog.selectedFiles()
if file_list:
self.fileSelectionList.add_path_item(file_list[0])
self.fileSelectionList.twList.selectRow(self.fileSelectionList.add_path_item(file_list[0]))
def select_file(self, folder_mode: bool = False) -> None:
dialog = self.file_dialog(folder_mode=folder_mode)
@ -1100,7 +1091,10 @@ class TaggerWindow(QtWidgets.QMainWindow):
)
self.metadata.overlay(
new_metadata.replace(
notes=utils.combine_notes(self.metadata.notes, notes, "Tagged with ComicTagger")
notes=utils.combine_notes(self.metadata.notes, notes, "Tagged with ComicTagger"),
description=cleanup_html(
new_metadata.description, self.config[0].talker_remove_html_tables
),
)
)
# Now push the new combined data into the edit controls
@ -1548,27 +1542,23 @@ class TaggerWindow(QtWidgets.QMainWindow):
center_window_on_parent(progdialog)
QtCore.QCoreApplication.processEvents()
prog_idx = 0
failed_list = []
success_count = 0
for ca in ca_list:
for prog_idx, ca in enumerate(ca_list, 1):
QtCore.QCoreApplication.processEvents()
if progdialog.wasCanceled():
break
progdialog.setValue(prog_idx)
progdialog.setLabelText(str(ca.path))
center_window_on_parent(progdialog)
QtCore.QCoreApplication.processEvents()
if ca.has_metadata(style):
QtCore.QCoreApplication.processEvents()
if progdialog.wasCanceled():
break
prog_idx += 1
progdialog.setValue(prog_idx)
progdialog.setLabelText(str(ca.path))
center_window_on_parent(progdialog)
QtCore.QCoreApplication.processEvents()
if ca.has_metadata(style) and ca.is_writable():
if not ca.remove_metadata(style):
failed_list.append(ca.path)
else:
success_count += 1
ca.load_cache([MetaDataStyle.CBI, MetaDataStyle.CIX])
if ca.is_writable():
if not ca.remove_metadata(style):
failed_list.append(ca.path)
else:
success_count += 1
ca.load_cache([MetaDataStyle.CBI, MetaDataStyle.CIX])
progdialog.hide()
QtCore.QCoreApplication.processEvents()
@ -1630,20 +1620,18 @@ class TaggerWindow(QtWidgets.QMainWindow):
prog_dialog.setMinimumDuration(300)
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
prog_idx = 0
failed_list = []
success_count = 0
for ca in ca_list:
if ca.has_metadata(src_style):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_idx += 1
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
for prog_idx, ca in enumerate(ca_list, 1):
QtCore.QCoreApplication.processEvents()
if prog_dialog.wasCanceled():
break
prog_dialog.setValue(prog_idx)
prog_dialog.setLabelText(str(ca.path))
center_window_on_parent(prog_dialog)
QtCore.QCoreApplication.processEvents()
if ca.has_metadata(src_style) and ca.is_writable():
md = ca.read_metadata(src_style)
@ -1855,13 +1843,11 @@ class TaggerWindow(QtWidgets.QMainWindow):
self.auto_tag_log("==========================================================================\n")
self.auto_tag_log(f"Auto-Tagging Started for {len(ca_list)} items\n")
prog_idx = 0
match_results = OnlineMatchResults()
archives_to_remove = []
for ca in ca_list:
for prog_idx, ca in enumerate(ca_list):
self.auto_tag_log("==========================================================================\n")
self.auto_tag_log(f"Auto-Tagging {prog_idx + 1} of {len(ca_list)}\n")
self.auto_tag_log(f"Auto-Tagging {prog_idx} of {len(ca_list)}\n")
self.auto_tag_log(f"{ca.path}\n")
try:
cover_idx = ca.read_metadata(style).get_cover_page_index_list()[0]
@ -1876,7 +1862,7 @@ class TaggerWindow(QtWidgets.QMainWindow):
if self.atprogdialog.isdone:
break
self.atprogdialog.progressBar.setValue(prog_idx)
prog_idx += 1
self.atprogdialog.label.setText(str(ca.path))
center_window_on_parent(self.atprogdialog)
QtCore.QCoreApplication.processEvents()

View File

@ -10,15 +10,12 @@ else:
from importlib.metadata import entry_points
from comictalker.comictalker import ComicTalker, TalkerError
from comictalker.resulttypes import ComicIssue, ComicSeries
logger = logging.getLogger(__name__)
__all__ = [
"ComicTalker",
"TalkerError",
"ComicIssue",
"ComicSeries",
]

View File

@ -15,16 +15,16 @@
# limitations under the License.
from __future__ import annotations
import dataclasses
import datetime
import json
import logging
import os
import pathlib
import sqlite3
from typing import Any
from typing import Any, cast
from comictalker.resulttypes import ComicIssue, ComicSeries, Credit
from comicapi import utils
from comicapi.genericmetadata import ComicSeries, Credit, GenericMetadata, TagOrigin
logger = logging.getLogger(__name__)
@ -74,21 +74,22 @@ class ComicCacher:
# create tables
with con:
cur = con.cursor()
# source_name,name,id,start_year,publisher,image,description,count_of_issues
# source,name,id,start_year,publisher,image,description,count_of_issues
cur.execute(
"CREATE TABLE SeriesSearchCache("
+ "timestamp DATE DEFAULT (datetime('now','localtime')),"
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "search_term TEXT,"
+ "PRIMARY KEY (id, source_name, search_term))"
+ "PRIMARY KEY (id, source, search_term))"
)
cur.execute("CREATE TABLE Source(" + "id TEXT NOT NULL," + "name TEXT NOT NULL," + "PRIMARY KEY (id))")
cur.execute(
"CREATE TABLE Series("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "name TEXT,"
+ "publisher TEXT,"
+ "count_of_issues INT,"
@ -99,14 +100,14 @@ class ComicCacher:
+ "description TEXT,"
+ "genres TEXT," # Newline separated. For filtering etc.
+ "format TEXT,"
+ "PRIMARY KEY (id, source_name))"
+ "PRIMARY KEY (id, source))"
)
cur.execute(
"CREATE TABLE Issues("
+ "timestamp DATE DEFAULT (datetime('now','localtime')), "
+ "id TEXT NOT NULL,"
+ "source_name TEXT NOT NULL,"
+ "source TEXT NOT NULL,"
+ "series_id TEXT,"
+ "name TEXT,"
+ "issue_number TEXT,"
@ -131,33 +132,33 @@ class ComicCacher:
+ "country TEXT,"
+ "volume TEXT,"
+ "complete BOOL," # Is the data complete? Includes characters, locations, credits.
+ "PRIMARY KEY (id, source_name))"
+ "PRIMARY KEY (id, source))"
)
def add_search_results(self, source_name: str, search_term: str, ct_search_results: list[ComicSeries]) -> None:
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
def add_search_results(self, source: TagOrigin, search_term: str, series_list: list[ComicSeries]) -> None:
self.add_source(source)
with con:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
# remove all previous entries with this search term
cur.execute(
"DELETE FROM SeriesSearchCache WHERE search_term = ? AND source_name = ?",
[search_term.casefold(), source_name],
"DELETE FROM SeriesSearchCache WHERE search_term = ? AND source = ?",
[search_term.casefold(), source.id],
)
# now add in new results
for record in ct_search_results:
for record in series_list:
cur.execute(
"INSERT INTO SeriesSearchCache (source_name, search_term, id) VALUES(?, ?, ?)",
(source_name, search_term.casefold(), record.id),
"INSERT INTO SeriesSearchCache (source, search_term, id) VALUES(?, ?, ?)",
(source.id, search_term.casefold(), record.id),
)
data = {
"id": record.id,
"source_name": source_name,
"source": source.id,
"name": record.name,
"publisher": record.publisher,
"count_of_issues": record.count_of_issues,
@ -172,19 +173,101 @@ class ComicCacher:
}
self.upsert(cur, "series", data)
def get_search_results(self, source_name: str, search_term: str) -> list[ComicSeries]:
def add_series_info(self, source: TagOrigin, series: ComicSeries) -> None:
self.add_source(source)
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
timestamp = datetime.datetime.now()
data = {
"id": series.id,
"source": source.id,
"name": series.name,
"publisher": series.publisher,
"count_of_issues": series.count_of_issues,
"count_of_volumes": series.count_of_volumes,
"start_year": series.start_year,
"image_url": series.image_url,
"description": series.description,
"genres": "\n".join(series.genres),
"format": series.format,
"timestamp": timestamp,
"aliases": "\n".join(series.aliases),
}
self.upsert(cur, "series", data)
def add_series_issues_info(self, source: TagOrigin, issues: list[GenericMetadata], complete: bool) -> None:
self.add_source(source)
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
timestamp = datetime.datetime.now()
# add in issues
for issue in issues:
data = {
"id": issue.issue_id,
"series_id": issue.series_id,
"source": source.id,
"name": issue.title,
"issue_number": issue.issue,
"volume": issue.volume,
"site_detail_url": issue.web_link,
"cover_date": f"{issue.year}-{issue.month}-{issue.day}",
"image_url": issue.cover_image,
"description": issue.description,
"timestamp": timestamp,
"aliases": "\n".join(issue.title_aliases),
"alt_image_urls": "\n".join(issue.alternate_images),
"characters": "\n".join(issue.characters),
"locations": "\n".join(issue.locations),
"teams": "\n".join(issue.teams),
"story_arcs": "\n".join(issue.story_arcs),
"genres": "\n".join(issue.genres),
"tags": "\n".join(issue.tags),
"critical_rating": issue.critical_rating,
"manga": issue.manga,
"maturity_rating": issue.maturity_rating,
"language": issue.language,
"country": issue.country,
"credits": json.dumps(issue.credits),
"complete": complete,
}
self.upsert(cur, "issues", data)
def add_source(self, source: TagOrigin) -> None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
self.upsert(
cur,
"source",
{
"id": source.id,
"name": source.name,
},
)
def get_search_results(self, source: TagOrigin, search_term: str) -> list[ComicSeries]:
results = []
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
con.text_factory = str
cur = con.cursor()
cur.execute(
"SELECT * FROM SeriesSearchCache INNER JOIN Series on"
+ " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source_name=Series.source_name"
+ " WHERE search_term=? AND SeriesSearchCache.source_name=?",
[search_term.casefold(), source_name],
+ " SeriesSearchCache.id=Series.id AND SeriesSearchCache.source=Series.source"
+ " WHERE search_term=? AND SeriesSearchCache.source=?",
[search_term.casefold(), source.id],
)
rows = cur.fetchall()
@ -198,9 +281,9 @@ class ComicCacher:
count_of_volumes=record["count_of_volumes"],
start_year=record["start_year"],
image_url=record["image_url"],
aliases=record["aliases"].strip().splitlines(),
aliases=utils.split(record["aliases"], "\n"),
description=record["description"],
genres=record["genres"].strip().splitlines(),
genres=utils.split(record["genres"], "\n"),
format=record["format"],
)
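The `utils.split` calls above replace the old `.strip().splitlines()` pattern. The real implementation lives in comicapi.utils; a minimal sketch of the behaviour these call sites rely on (None or empty input yields an empty list, pieces are stripped, blanks are dropped) might look like:

from __future__ import annotations

def split(s: str | None, c: str) -> list[str]:
    # None (e.g. a NULL column) maps to [], so cache reads round-trip safely
    if not s:
        return []
    return [x.strip() for x in s.strip().split(c) if x.strip()]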
@ -208,90 +291,21 @@ class ComicCacher:
return results
def add_series_info(self, source_name: str, series_record: ComicSeries) -> None:
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
timestamp = datetime.datetime.now()
data = {
"id": series_record.id,
"source_name": source_name,
"name": series_record.name,
"publisher": series_record.publisher,
"count_of_issues": series_record.count_of_issues,
"count_of_volumes": series_record.count_of_volumes,
"start_year": series_record.start_year,
"image_url": series_record.image_url,
"description": series_record.description,
"genres": "\n".join(series_record.genres),
"format": series_record.format,
"timestamp": timestamp,
"aliases": "\n".join(series_record.aliases),
}
self.upsert(cur, "series", data)
def add_series_issues_info(self, source_name: str, series_issues: list[ComicIssue]) -> None:
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
cur = con.cursor()
timestamp = datetime.datetime.now()
# add in issues
for issue in series_issues:
data = {
"id": issue.id,
"series_id": issue.series.id,
"source_name": source_name,
"name": issue.name,
"issue_number": issue.issue_number,
"volume": issue.volume,
"site_detail_url": issue.site_detail_url,
"cover_date": issue.cover_date,
"image_url": issue.image_url,
"description": issue.description,
"timestamp": timestamp,
"aliases": "\n".join(issue.aliases),
"alt_image_urls": "\n".join(issue.alt_image_urls),
"characters": "\n".join(issue.characters),
"locations": "\n".join(issue.locations),
"teams": "\n".join(issue.teams),
"story_arcs": "\n".join(issue.story_arcs),
"genres": "\n".join(issue.genres),
"tags": "\n".join(issue.tags),
"critical_rating": issue.critical_rating,
"manga": issue.manga,
"maturity_rating": issue.maturity_rating,
"language": issue.language,
"country": issue.country,
"credits": json.dumps([dataclasses.asdict(x) for x in issue.credits]),
"complete": issue.complete,
}
self.upsert(cur, "issues", data)
def get_series_info(self, series_id: str, source_name: str, purge: bool = True) -> ComicSeries | None:
def get_series_info(self, series_id: str, source: TagOrigin, expire_stale: bool = True) -> ComicSeries | None:
result: ComicSeries | None = None
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
if purge:
if expire_stale:
# purge stale series info
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Series WHERE timestamp < ?", [str(a_week_ago)])
# fetch
cur.execute("SELECT * FROM Series WHERE id=? AND source_name=?", [series_id, source_name])
cur.execute("SELECT * FROM Series WHERE id=? AND source=?", [series_id, source.id])
row = cur.fetchone()
@ -307,17 +321,17 @@ class ComicCacher:
count_of_volumes=row["count_of_volumes"],
start_year=row["start_year"],
image_url=row["image_url"],
aliases=row["aliases"].strip().splitlines(),
aliases=utils.split(row["aliases"], "\n"),
description=row["description"],
genres=row["genres"].strip().splitlines(),
genres=utils.split(row["genres"], "\n"),
format=row["format"],
)
return result
def get_series_issues_info(self, series_id: str, source_name: str) -> list[ComicIssue]:
def get_series_issues_info(self, series_id: str, source: TagOrigin) -> list[tuple[GenericMetadata, bool]]:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(series_id, source_name, False) or ComicSeries(
series = self.get_series_info(series_id, source, False) or ComicSeries(
id=series_id,
name="",
description="",
@ -330,9 +344,9 @@ class ComicCacher:
count_of_volumes=None,
format=None,
)
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
@ -342,54 +356,22 @@ class ComicCacher:
cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
# fetch
results: list[ComicIssue] = []
results: list[tuple[GenericMetadata, bool]] = []
cur.execute("SELECT * FROM Issues WHERE series_id=? AND source_name=?", [series_id, source_name])
cur.execute("SELECT * FROM Issues WHERE series_id=? AND source=?", [series_id, source.id])
rows = cur.fetchall()
# now process the results
for row in rows:
credits = []
try:
for credit in json.loads(row["credits"]):
credits.append(Credit(**credit))
except Exception:
logger.exception("credits failed")
record = ComicIssue(
series=series,
credits=credits,
id=row["id"],
name=row["name"],
issue_number=row["issue_number"],
image_url=row["image_url"],
cover_date=row["cover_date"],
site_detail_url=row["site_detail_url"],
description=row["description"],
aliases=row["aliases"].strip().splitlines(),
alt_image_urls=row["alt_image_urls"].strip().splitlines(),
characters=row["characters"].strip().splitlines(),
locations=row["locations"].strip().splitlines(),
teams=row["teams"].strip().splitlines(),
story_arcs=row["story_arcs"].strip().splitlines(),
genres=row["genres"].strip().splitlines(),
tags=row["tags"].strip().splitlines(),
critical_rating=row["critical_rating"],
manga=row["manga"],
maturity_rating=row["maturity_rating"],
language=row["language"],
country=row["country"],
volume=row["volume"],
complete=bool(row["complete"]),
)
record = self.map_row_metadata(row, series, source)
results.append(record)
return results
def get_issue_info(self, issue_id: int, source_name: str) -> ComicIssue | None:
con = sqlite3.connect(self.db_file)
con.row_factory = sqlite3.Row
with con:
def get_issue_info(self, issue_id: int, source: TagOrigin) -> tuple[GenericMetadata, bool] | None:
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
@ -398,14 +380,14 @@ class ComicCacher:
a_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
cur.execute("DELETE FROM Issues WHERE timestamp < ?", [str(a_week_ago)])
cur.execute("SELECT * FROM Issues WHERE id=? AND source_name=?", [issue_id, source_name])
cur.execute("SELECT * FROM Issues WHERE id=? AND source=?", [issue_id, source.id])
row = cur.fetchone()
record = None
if row:
# get_series_info should only fail if someone is doing something weird
series = self.get_series_info(row["id"], source_name, False) or ComicSeries(
series = self.get_series_info(row["id"], source, False) or ComicSeries(
id=row["id"],
name="",
description="",
@ -419,42 +401,69 @@ class ComicCacher:
format=None,
)
# now process the results
credits = []
try:
for credit in json.loads(row["credits"]):
credits.append(Credit(**credit))
except Exception:
logger.exception("credits failed")
record = ComicIssue(
series=series,
credits=credits,
id=row["id"],
name=row["name"],
issue_number=row["issue_number"],
image_url=row["image_url"],
cover_date=row["cover_date"],
site_detail_url=row["site_detail_url"],
description=row["description"],
aliases=row["aliases"].strip().splitlines(),
alt_image_urls=row["alt_image_urls"].strip().splitlines(),
characters=row["characters"].strip().splitlines(),
locations=row["locations"].strip().splitlines(),
teams=row["teams"].strip().splitlines(),
story_arcs=row["story_arcs"].strip().splitlines(),
genres=row["genres"].strip().splitlines(),
tags=row["tags"].strip().splitlines(),
critical_rating=row["critical_rating"],
manga=row["manga"],
maturity_rating=row["maturity_rating"],
language=row["language"],
country=row["country"],
volume=row["volume"],
complete=bool(row["complete"]),
)
record = self.map_row_metadata(row, series, source)
return record
def get_source(self, source_id: str) -> TagOrigin:
con = sqlite3.connect(self.db_file)
with sqlite3.connect(self.db_file) as con:
con.row_factory = sqlite3.Row
cur = con.cursor()
con.text_factory = str
cur.execute("SELECT * FROM Source WHERE id=?", [source_id])
row = cur.fetchone()
return TagOrigin(row["id"], row["name"])
def map_row_metadata(
self, row: sqlite3.Row, series: ComicSeries, source: TagOrigin
) -> tuple[GenericMetadata, bool]:
day, month, year = utils.parse_date_str(row["cover_date"])
credits = []
try:
for credit in json.loads(row["credits"]):
credits.append(cast(Credit, credit))
except Exception:
logger.exception("credits failed")
return (
GenericMetadata(
tag_origin=source,
alternate_images=utils.split(row["alt_image_urls"], "\n"),
characters=utils.split(row["characters"], "\n"),
country=row["country"],
cover_image=row["image_url"],
credits=credits,
critical_rating=row["critical_rating"],
day=day,
description=row["description"],
genres=utils.split(row["genres"], "\n"),
issue=row["issue_number"],
issue_count=series.count_of_issues,
issue_id=row["id"],
language=row["language"],
locations=utils.split(row["locations"], "\n"),
manga=row["manga"],
maturity_rating=row["maturity_rating"],
month=month,
publisher=series.publisher,
series=series.name,
series_aliases=series.aliases,
series_id=series.id,
story_arcs=utils.split(row["story_arcs"], "\n"),
tags=set(utils.split(row["tags"], "\n")),
teams=utils.split(row["teams"], "\n"),
title=row["name"],
title_aliases=utils.split(row["aliases"], "\n"),
volume=row["volume"],
volume_count=series.count_of_volumes,
web_link=row["site_detail_url"],
year=year,
),
row["complete"],
)
def upsert(self, cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None:
"""This does an insert if the given PK doesn't exist, and an
update it if does
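A hedged sketch of what such an upsert can look like in SQLite; the dynamic SQL below is illustrative only, and note that `INSERT OR REPLACE` is only an approximation of a true upsert (it deletes the conflicting row before inserting):

import sqlite3
from typing import Any

def upsert_sketch(cur: sqlite3.Cursor, tablename: str, data: dict[str, Any]) -> None:
    keys = ", ".join(data)
    slots = ", ".join("?" * len(data))
    # REPLACE removes the conflicting row first, then inserts the new one
    cur.execute(f"INSERT OR REPLACE INTO {tablename}({keys}) VALUES({slots})", list(data.values()))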

View File

@ -19,8 +19,7 @@ from typing import Any, Callable
import settngs
from comicapi.genericmetadata import GenericMetadata
from comictalker.resulttypes import ComicIssue, ComicSeries
from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
from comictalker.talker_utils import fix_url
logger = logging.getLogger(__name__)
@ -108,6 +107,7 @@ class ComicTalker:
name: str = "Example"
id: str = "example"
origin: TagOrigin = TagOrigin(id, name)
website: str = "https://example.com"
logo_url: str = f"{website}/logo.png"
attribution: str = f"Metadata provided by <a href='{website}'>{name}</a>"
@ -153,6 +153,8 @@ class ComicTalker:
If the Talker does not use an API key it should validate that the URL works.
If the Talker does not use an API key or URL it should check that the source is available.
Caching MUST NOT be implemented on this function.
"""
raise NotImplementedError
@ -175,6 +177,8 @@ class ComicTalker:
A sensible amount of results should be returned.
Caching SHOULD be implemented on this function.
For example the `ComicVineTalker` stops requesting new pages after the results
become too different from the `series_name` by use of the `titles_match` function
provided by the `comicapi.utils` module, and only allows a maximum of 5 pages
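A hedged sketch of that early-stopping loop; `fetch_page` is a hypothetical stand-in for the network call, while `titles_match` is the real helper from `comicapi.utils`:

from comicapi import utils

def paged_search(series_name: str, fetch_page) -> list[dict]:
    results: list[dict] = []
    for page in range(1, 6):  # the five-page cap described above
        batch = fetch_page(page)  # hypothetical network call
        if not batch:
            break
        results.extend(batch)
        # stop once an entire page drifts too far from the requested name
        if not any(utils.titles_match(series_name, r["name"]) for r in batch):
            break
    return results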
@ -187,6 +191,9 @@ class ComicTalker:
"""
This function should return an instance of GenericMetadata for a single issue.
It is guaranteed that either `issue_id` is set or both `series_id` and `issue_number` are set.
Caching MUST be implemented on this function.
Below is an example of how this function might be implemented:
if issue_number and series_id:
@ -198,13 +205,20 @@ class ComicTalker:
"""
raise NotImplementedError
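The example in the docstring is cut off by the hunk above; a sketch of the full dispatch, using the private helper names that `ComicVineTalker` defines later in this diff:

def fetch_comic_data(
    self, issue_id: str | None = None, series_id: str | None = None, issue_number: str = ""
) -> GenericMetadata:
    if issue_number and series_id:
        return self._fetch_issue_data(int(series_id), issue_number)
    if issue_id:
        return self._fetch_issue_data_by_issue_id(issue_id)
    return GenericMetadata()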
def fetch_issues_by_series(self, series_id: str) -> list[ComicIssue]:
def fetch_series(self, series_id: str) -> ComicSeries:
"""
This function should return an instance of ComicSeries from the given series ID.
Caching MUST be implemented on this function.
"""
raise NotImplementedError
def fetch_issues_in_series(self, series_id: str) -> list[GenericMetadata]:
"""Expected to return a list of issues with a given series ID"""
raise NotImplementedError
def fetch_issues_by_series_issue_num_and_year(
self, series_id_list: list[str], issue_number: str, year: int | None
) -> list[ComicIssue]:
) -> list[GenericMetadata]:
"""
This function should return a single issue for each series ID in
`series_id_list`, and each returned issue should match `issue_number`.
@ -213,5 +227,7 @@ class ComicTalker:
If there is no year given (`year` == None) or the Talker does not have issue publication info
return the results unfiltered.
Caching SHOULD be implemented on this function.
"""
raise NotImplementedError
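One hedged reading of the year rule above, written as a filter over already-fetched results (`filter_by_year` is illustrative, not part of the API):

def filter_by_year(issues: list[GenericMetadata], year: int | None) -> list[GenericMetadata]:
    if year is None:
        return issues  # no year given: return unfiltered
    # keep issues whose publication year is unknown or matches
    return [md for md in issues if md.year is None or md.year == year]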

View File

@ -1,59 +0,0 @@
from __future__ import annotations
import copy
import dataclasses
@dataclasses.dataclass
class Credit:
name: str
role: str
@dataclasses.dataclass
class ComicSeries:
aliases: list[str]
count_of_issues: int | None
count_of_volumes: int | None
description: str
id: str
image_url: str
name: str
publisher: str
start_year: int | None
genres: list[str]
format: str | None
def copy(self) -> ComicSeries:
return copy.deepcopy(self)
@dataclasses.dataclass
class ComicIssue:
aliases: list[str]
cover_date: str
description: str
id: str
image_url: str
issue_number: str
volume: str | None
critical_rating: float
maturity_rating: str
manga: str
genres: list[str]
tags: list[str]
name: str
language: str
country: str
site_detail_url: str
series: ComicSeries
alt_image_urls: list[str]
characters: list[str]
locations: list[str]
credits: list[Credit]
teams: list[str]
story_arcs: list[str]
complete: bool  # Is this a complete ComicIssue, or is there more data to fetch?
def copy(self) -> ComicIssue:
return copy.deepcopy(self)

View File

@ -18,11 +18,6 @@ import posixpath
import re
from urllib.parse import urlsplit
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.issuestring import IssueString
from comictalker.resulttypes import ComicIssue
logger = logging.getLogger(__name__)
@ -38,100 +33,7 @@ def fix_url(url: str) -> str:
return tmp_url.geturl()
def map_comic_issue_to_metadata(
issue_results: ComicIssue, source: str, remove_html_tables: bool = False, use_year_volume: bool = False
) -> GenericMetadata:
"""Maps ComicIssue to generic metadata"""
metadata = GenericMetadata()
metadata.is_empty = False
metadata.series = utils.xlate(issue_results.series.name)
metadata.issue = utils.xlate(IssueString(issue_results.issue_number).as_string())
# Rely on comic talker to validate this number
metadata.issue_count = utils.xlate_int(issue_results.series.count_of_issues)
if issue_results.series.format:
metadata.format = issue_results.series.format
metadata.volume = utils.xlate_int(issue_results.volume)
metadata.volume_count = utils.xlate_int(issue_results.series.count_of_volumes)
if issue_results.name:
metadata.title = utils.xlate(issue_results.name)
if issue_results.image_url:
metadata.cover_image = issue_results.image_url
if issue_results.series.publisher:
metadata.publisher = utils.xlate(issue_results.series.publisher)
if issue_results.cover_date:
metadata.day, metadata.month, metadata.year = utils.parse_date_str(issue_results.cover_date)
elif issue_results.series.start_year:
metadata.year = utils.xlate_int(issue_results.series.start_year)
metadata.comments = cleanup_html(issue_results.description, remove_html_tables)
if use_year_volume:
metadata.volume = issue_results.series.start_year
metadata.tag_origin = source
metadata.issue_id = issue_results.id
metadata.web_link = issue_results.site_detail_url
for person in issue_results.credits:
if person.role:
roles = person.role.split(",")
for role in roles:
# can we determine 'primary' from CV??
metadata.add_credit(person.name, role.title().strip(), False)
if issue_results.characters:
metadata.characters = ", ".join(issue_results.characters)
if issue_results.teams:
metadata.teams = ", ".join(issue_results.teams)
if issue_results.locations:
metadata.locations = ", ".join(issue_results.locations)
if issue_results.story_arcs:
metadata.story_arc = ", ".join(issue_results.story_arcs)
if issue_results.genres:
metadata.genre = ", ".join(issue_results.genres)
if issue_results.tags:
metadata.tags = set(issue_results.tags)
if issue_results.manga:
metadata.manga = issue_results.manga
if issue_results.critical_rating:
metadata.critical_rating = utils.xlate_float(issue_results.critical_rating)
if issue_results.maturity_rating:
metadata.maturity_rating = issue_results.maturity_rating
if issue_results.language:
metadata.language = issue_results.language
if issue_results.country:
metadata.country = issue_results.country
return metadata
def parse_date_str(date_str: str) -> tuple[int | None, int | None, int | None]:
day = None
month = None
year = None
if date_str:
parts = date_str.split("-")
year = utils.xlate_int(parts[0])
if len(parts) > 1:
month = utils.xlate_int(parts[1])
if len(parts) > 2:
day = utils.xlate_int(parts[2])
return day, month, year
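For example, `parse_date_str("2007-08-01")` returns `(1, 8, 2007)`, while `parse_date_str("2007")` returns `(None, None, 2007)`.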
def cleanup_html(string: str, remove_html_tables: bool = False) -> str:
def cleanup_html(string: str | None, remove_html_tables: bool = False) -> str:
"""Cleans HTML code from any text. Will remove any HTML tables with remove_html_tables"""
if string is None:
return ""
@ -195,13 +97,13 @@ def cleanup_html(string: str, remove_html_tables: bool = False) -> str:
for row in table.findAll("tr"):
cols = []
col = row.findAll("td")
i = 0
for c in col:
for i, c in enumerate(col):
item = c.string.strip()
cols.append(item)
if len(item) > col_widths[i]:
col_widths[i] = len(item)
i += 1
if len(cols) != 0:
rows.append(cols)
# now we have the data, make it into text
@ -209,15 +111,14 @@ def cleanup_html(string: str, remove_html_tables: bool = False) -> str:
for w in col_widths:
fmtstr += f" {{:{w + 1}}}|"
table_text = ""
counter = 0
for row in rows:
for counter, row in enumerate(rows):
table_text += fmtstr.format(*row) + "\n"
if counter == 0 and len(hdrs) != 0:
table_text += "|"
for w in col_widths:
table_text += "-" * (w + 2) + "|"
table_text += "\n"
counter += 1
table_strings.append(table_text + "\n")

View File

@ -29,13 +29,12 @@ import settngs
from pyrate_limiter import Limiter, RequestRate
from typing_extensions import Required, TypedDict
import comictalker.talker_utils as talker_utils
from comicapi import utils
from comicapi.genericmetadata import GenericMetadata
from comicapi.genericmetadata import ComicSeries, GenericMetadata, TagOrigin
from comicapi.issuestring import IssueString
from comictalker import talker_utils
from comictalker.comiccacher import ComicCacher
from comictalker.comictalker import ComicTalker, TalkerDataError, TalkerNetworkError
from comictalker.resulttypes import ComicIssue, ComicSeries, Credit
logger = logging.getLogger(__name__)
@ -60,27 +59,27 @@ class CVImage(TypedDict, total=False):
class CVAltImage(TypedDict):
original_url: str
id: int
id: Required[int]
caption: str
image_tags: str
class CVPublisher(TypedDict, total=False):
api_detail_url: str
id: int
id: Required[int]
name: Required[str]
class CVCredit(TypedDict):
api_detail_url: str
id: int
id: Required[int]
name: str
site_detail_url: str
class CVPersonCredit(TypedDict):
api_detail_url: str
id: int
id: Required[int]
name: str
site_detail_url: str
role: str
@ -92,7 +91,7 @@ class CVSeries(TypedDict):
aliases: str
count_of_issues: int
description: str
id: int
id: Required[int]
image: CVImage
name: str
publisher: CVPublisher
@ -122,7 +121,7 @@ class CVIssue(TypedDict, total=False):
first_appearance_storyarcs: None
first_appearance_teams: None
has_staff_review: bool
id: int
id: Required[int]
image: CVImage
issue_number: str
location_credits: list[CVCredit]
@ -134,7 +133,7 @@ class CVIssue(TypedDict, total=False):
story_arc_credits: list[CVCredit]
team_credits: list[CVCredit]
team_disbanded_in: None
volume: CVSeries # CV uses volume to mean series
volume: Required[CVSeries] # CV uses volume to mean series
T = TypeVar("T", CVIssue, CVSeries, list[CVSeries], list[CVIssue])
@ -160,6 +159,7 @@ default_limiter = Limiter(RequestRate(1, 5))
class ComicVineTalker(ComicTalker):
name: str = "Comic Vine"
id: str = "comicvine"
origin: TagOrigin = TagOrigin(id, name)
website: str = "https://comicvine.gamespot.com"
logo_url: str = f"{website}/a/bundles/comicvinesite/images/logo.png"
attribution: str = f"Metadata provided by <a href='{website}'>{name}</a>"
@ -170,7 +170,6 @@ class ComicVineTalker(ComicTalker):
# Default settings
self.default_api_url = self.api_url = f"{self.website}/api/"
self.default_api_key = self.api_key = "27431e6787042105bd3e47e169a624521f89f3a4"
self.remove_html_tables: bool = False
self.use_series_start_as_volume: bool = False
def register_settings(self, parser: settngs.Manager) -> None:
@ -181,13 +180,6 @@ class ComicVineTalker(ComicTalker):
display_name="Use series start as volume",
help="Use the series start year as the volume number",
)
parser.add_setting(
"--cv-remove-html-tables",
default=False,
action=argparse.BooleanOptionalAction,
display_name="Remove HTML tables",
help="Removes html tables instead of converting them to text",
)
# The default needs to be unset or None.
# This allows this setting to be unset with the empty string, allowing the default to change
@ -206,7 +198,6 @@ class ComicVineTalker(ComicTalker):
settings = super().parse_settings(settings)
self.use_series_start_as_volume = settings["cv_use_series_start_as_volume"]
self.remove_html_tables = settings["cv_remove_html_tables"]
# Set a different limit if using the default API key
if self.api_key == self.default_api_key:
@ -253,7 +244,7 @@ class ComicVineTalker(ComicTalker):
# For literal searches always retrieve from online
cvc = ComicCacher(self.cache_folder, self.version)
if not refresh_cache and not literal:
cached_search_results = cvc.get_search_results(self.id, series_name)
cached_search_results = cvc.get_search_results(self.origin, series_name)
if len(cached_search_results) > 0:
return cached_search_results
@ -321,12 +312,12 @@ class ComicVineTalker(ComicTalker):
if callback is not None:
callback(current_result_count, total_result_count)
# Format result to ComicIssue
# Format result to ComicSeries
formatted_search_results = self._format_search_results(search_results)
# Cache these search results, even if it's literal we cache the results
# The most it will cause is extra processing time
cvc.add_search_results(self.id, series_name, formatted_search_results)
cvc.add_search_results(self.origin, series_name, formatted_search_results)
return formatted_search_results
@ -341,53 +332,15 @@ class ComicVineTalker(ComicTalker):
return comic_data
def fetch_issues_by_series(self, series_id: str) -> list[ComicIssue]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_series_issues_result = cvc.get_series_issues_info(series_id, self.id)
def fetch_series(self, series_id: str) -> ComicSeries:
return self._fetch_series_data(int(series_id))
series_data = self._fetch_series_data(int(series_id))
if len(cached_series_issues_result) == series_data.count_of_issues:
return cached_series_issues_result
params = { # CV uses volume to mean series
"api_key": self.api_key,
"filter": f"volume:{series_id}",
"format": "json",
"field_list": "id,volume,issue_number,name,image,cover_date,site_detail_url,description,aliases,associated_images",
"offset": 0,
}
cv_response: CVResult[list[CVIssue]] = self._get_cv_content(urljoin(self.api_url, "issues/"), params)
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
series_issues_result = cv_response["results"]
page = 1
offset = 0
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
params["offset"] = offset
cv_response = self._get_cv_content(urljoin(self.api_url, "issues/"), params)
series_issues_result.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
# Format to expected output
formatted_series_issues_result = self._format_issue_results(series_issues_result)
cvc.add_series_issues_info(self.id, formatted_series_issues_result)
return formatted_series_issues_result
def fetch_issues_in_series(self, series_id: str) -> list[GenericMetadata]:
return [x[0] for x in self._fetch_issues_in_series(series_id)]
def fetch_issues_by_series_issue_num_and_year(
self, series_id_list: list[str], issue_number: str, year: str | int | None
) -> list[ComicIssue]:
) -> list[GenericMetadata]:
series_filter = ""
for vid in series_id_list:
series_filter += str(vid) + "|"
@ -424,7 +377,10 @@ class ComicVineTalker(ComicTalker):
filtered_issues_result.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
formatted_filtered_issues_result = self._format_issue_results(filtered_issues_result)
formatted_filtered_issues_result = [
self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
for x in filtered_issues_result
]
return formatted_filtered_issues_result
@ -446,17 +402,17 @@ class ComicVineTalker(ComicTalker):
def _get_url_content(self, url: str, params: dict[str, Any]) -> Any:
# if there is a 500 error, try a few more times before giving up
limit_counter = 0
tries = 0
while tries < 4:
for tries in range(1, 5):
try:
resp = requests.get(url, params=params, headers={"user-agent": "comictagger/" + self.version})
if resp.status_code == 200:
return resp.json()
if resp.status_code == 500:
logger.debug(f"Try #{tries + 1}: ")
logger.debug(f"Try #{tries}: ")
time.sleep(1)
logger.debug(str(resp.status_code))
tries += 1
if resp.status_code == requests.status_codes.codes.TOO_MANY_REQUESTS:
logger.info(f"{self.name} rate limit encountered. Waiting for 10 seconds\n")
time.sleep(10)
@ -504,7 +460,7 @@ class ComicVineTalker(ComicTalker):
formatted_results.append(
ComicSeries(
aliases=aliases.splitlines(),
aliases=utils.split(aliases, "\n"),
count_of_issues=record.get("count_of_issues", 0),
count_of_volumes=None,
description=record.get("description", ""),
@ -520,81 +476,55 @@ class ComicVineTalker(ComicTalker):
return formatted_results
def _format_issue_results(self, issue_results: list[CVIssue], complete: bool = False) -> list[ComicIssue]:
formatted_results = []
for record in issue_results:
# Extract image super
if record.get("image") is None:
image_url = ""
else:
image_url = record["image"].get("super_url", "")
def _fetch_issues_in_series(self, series_id: str) -> list[tuple[GenericMetadata, bool]]:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_series_issues_result = cvc.get_series_issues_info(series_id, self.origin)
alt_images_list = []
for alt in record["associated_images"]:
alt_images_list.append(alt["original_url"])
series = self._fetch_series_data(int(series_id))
character_list = []
if record.get("character_credits"):
for char in record["character_credits"]:
character_list.append(char["name"])
if len(cached_series_issues_result) == series.count_of_issues:
# Cache is complete for this series; return the cached results
return cached_series_issues_result
location_list = []
if record.get("location_credits"):
for loc in record["location_credits"]:
location_list.append(loc["name"])
params = { # CV uses volume to mean series
"api_key": self.api_key,
"filter": f"volume:{series_id}",
"format": "json",
"offset": 0,
}
cv_response: CVResult[list[CVIssue]] = self._get_cv_content(urljoin(self.api_url, "issues/"), params)
teams_list = []
if record.get("team_credits"):
for loc in record["team_credits"]:
teams_list.append(loc["name"])
current_result_count = cv_response["number_of_page_results"]
total_result_count = cv_response["number_of_total_results"]
story_list = []
if record.get("story_arc_credits"):
for loc in record["story_arc_credits"]:
story_list.append(loc["name"])
series_issues_result = cv_response["results"]
page = 1
offset = 0
persons_list = []
if record.get("person_credits"):
for person in record["person_credits"]:
persons_list.append(Credit(name=person["name"], role=person["role"]))
# see if we need to keep asking for more pages...
while current_result_count < total_result_count:
page += 1
offset += cv_response["number_of_page_results"]
series = self._fetch_series_data(record["volume"]["id"])
params["offset"] = offset
cv_response = self._get_cv_content(urljoin(self.api_url, "issues/"), params)
formatted_results.append(
ComicIssue(
aliases=record["aliases"].split("\n") if record["aliases"] else [],
cover_date=record.get("cover_date", ""),
description=record.get("description", ""),
id=str(record["id"]),
image_url=image_url,
issue_number=record["issue_number"],
volume=None,
name=record["name"],
site_detail_url=record.get("site_detail_url", ""),
series=series, # CV uses volume to mean series
alt_image_urls=alt_images_list,
characters=character_list,
locations=location_list,
teams=teams_list,
story_arcs=story_list,
critical_rating=0,
maturity_rating="",
manga="",
language="",
country="",
genres=[],
tags=[],
credits=persons_list,
complete=complete,
)
)
series_issues_result.extend(cv_response["results"])
current_result_count += cv_response["number_of_page_results"]
# Format to expected output
formatted_series_issues_result = [
self.map_comic_issue_to_metadata(x, self._fetch_series_data(x["volume"]["id"]))
for x in series_issues_result
]
return formatted_results
cvc.add_series_issues_info(self.origin, formatted_series_issues_result, False)
return [(x, False) for x in formatted_series_issues_result]
def _fetch_series_data(self, series_id: int) -> ComicSeries:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_series_result = cvc.get_series_info(str(series_id), self.id)
cached_series_result = cvc.get_series_info(str(series_id), self.origin)
if cached_series_result is not None:
return cached_series_result
@ -611,47 +541,37 @@ class ComicVineTalker(ComicTalker):
formatted_series_results = self._format_search_results([series_results])
if series_results:
cvc.add_series_info(self.id, formatted_series_results[0])
cvc.add_series_info(self.origin, formatted_series_results[0])
return formatted_series_results[0]
def _fetch_issue_data(self, series_id: int, issue_number: str) -> GenericMetadata:
issues_list_results = self.fetch_issues_by_series(str(series_id))
issues_list_results = self._fetch_issues_in_series(str(series_id))
# Loop through issue list to find the required issue info
f_record = None
f_record = (GenericMetadata(), False)
for record in issues_list_results:
if not IssueString(issue_number).as_string():
issue_number = "1"
if (
IssueString(record.issue_number).as_string().casefold()
== IssueString(issue_number).as_string().casefold()
):
if IssueString(record[0].issue).as_string().casefold() == IssueString(issue_number).as_string().casefold():
f_record = record
break
if f_record and f_record.complete:
if not f_record[0].is_empty and f_record[1]:
# Cache had full record
return talker_utils.map_comic_issue_to_metadata(
f_record, self.name, self.remove_html_tables, self.use_series_start_as_volume
)
return f_record[0]
if f_record is not None:
return self._fetch_issue_data_by_issue_id(f_record.id)
if f_record[0].issue_id is not None:
return self._fetch_issue_data_by_issue_id(f_record[0].issue_id)
return GenericMetadata()
def _fetch_issue_data_by_issue_id(self, issue_id: str) -> GenericMetadata:
# before we search online, look in our cache, since we might already have this info
cvc = ComicCacher(self.cache_folder, self.version)
cached_issues_result = cvc.get_issue_info(int(issue_id), self.id)
cached_issues_result = cvc.get_issue_info(int(issue_id), self.origin)
if cached_issues_result and cached_issues_result.complete:
return talker_utils.map_comic_issue_to_metadata(
cached_issues_result,
self.name,
self.remove_html_tables,
self.use_series_start_as_volume,
)
if cached_issues_result and cached_issues_result[1]:
return cached_issues_result[0]
issue_url = urljoin(self.api_url, f"issue/{CVTypeID.Issue}-{issue_id}")
params = {"api_key": self.api_key, "format": "json"}
@ -660,17 +580,68 @@ class ComicVineTalker(ComicTalker):
issue_results = cv_response["results"]
# Format to expected output
cv_issues = self._format_issue_results([issue_results], True)
# Due to issue not returning publisher, fetch the series.
cv_issues[0].series = self._fetch_series_data(int(cv_issues[0].series.id))
cvc.add_series_issues_info(self.id, cv_issues)
# Now, map the ComicIssue data to generic metadata
return talker_utils.map_comic_issue_to_metadata(
cv_issues[0],
self.name,
self.remove_html_tables,
self.use_series_start_as_volume,
cv_issues = self.map_comic_issue_to_metadata(
issue_results, self._fetch_series_data(int(issue_results["volume"]["id"]))
)
cvc.add_series_issues_info(self.origin, [cv_issues], True)
# The data is already mapped to GenericMetadata, so return it directly
return cv_issues
def map_comic_issue_to_metadata(self, issue: CVIssue, series: ComicSeries) -> GenericMetadata:
md = GenericMetadata(
tag_origin=self.origin,
issue_id=utils.xlate(issue.get("id")),
series_id=series.id,
title_aliases=utils.split(issue.get("aliases"), "\n"),
publisher=utils.xlate(series.publisher),
description=issue.get("description"),
issue=utils.xlate(IssueString(issue.get("issue_number")).as_string()),
issue_count=utils.xlate_int(series.count_of_issues),
format=utils.xlate(series.format),
volume_count=utils.xlate_int(series.count_of_volumes),
title=utils.xlate(issue.get("name")),
web_link=utils.xlate(issue.get("site_detail_url")),
series=utils.xlate(series.name),
series_aliases=series.aliases,
)
if issue.get("image") is None:
md.cover_image = ""
else:
md.cover_image = issue.get("image", {}).get("super_url", "")
md.alternate_images = []
for alt in issue.get("associated_images", []):
md.alternate_images.append(alt["original_url"])
md.characters = []
for character in issue.get("character_credits", []):
md.characters.append(character["name"])
md.locations = []
for location in issue.get("location_credits", []):
md.locations.append(location["name"])
md.teams = []
for team in issue.get("team_credits", []):
md.teams.append(team["name"])
md.story_arcs = []
for arc in issue.get("story_arc_credits", []):
md.story_arcs.append(arc["name"])
for person in issue.get("person_credits", []):
md.add_credit(person["name"], person["role"].title().strip(), False)
md.volume = utils.xlate_int(issue.get("volume"))
if self.use_series_start_as_volume:
md.volume = series.start_year
series = self._fetch_series_data(issue["volume"]["id"])
if issue.get("cover_date"):
md.day, md.month, md.year = utils.parse_date_str(issue.get("cover_date"))
elif series.start_year:
md.year = utils.xlate_int(series.start_year)
return md

View File

@ -1,11 +1,10 @@
from __future__ import annotations
import comicapi.genericmetadata
import comictalker.resulttypes
from comicapi import utils
search_results = [
comictalker.resulttypes.ComicSeries(
comicapi.genericmetadata.ComicSeries(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",
@ -18,7 +17,7 @@ search_results = [
genres=[],
format=None,
),
comictalker.resulttypes.ComicSeries(
comicapi.genericmetadata.ComicSeries(
count_of_issues=1,
count_of_volumes=1,
description="this is a description",

View File

@ -4,8 +4,6 @@ from typing import Any
import comicapi.genericmetadata
from comicapi import utils
from comictalker.resulttypes import ComicIssue, ComicSeries
from comictalker.talker_utils import cleanup_html
def filter_field_list(cv_result, kwargs):
@ -158,62 +156,56 @@ cv_not_found = {
"status_code": 101,
"results": [],
}
comic_issue_result = ComicIssue(
aliases=cv_issue_result["results"]["aliases"] or [],
cover_date=cv_issue_result["results"]["cover_date"],
description=cv_issue_result["results"]["description"],
id=str(cv_issue_result["results"]["id"]),
image_url=cv_issue_result["results"]["image"]["super_url"],
issue_number=cv_issue_result["results"]["issue_number"],
volume=None,
name=cv_issue_result["results"]["name"],
site_detail_url=cv_issue_result["results"]["site_detail_url"],
series=ComicSeries(
id=str(cv_issue_result["results"]["volume"]["id"]),
name=cv_issue_result["results"]["volume"]["name"],
aliases=[],
count_of_issues=cv_volume_result["results"]["count_of_issues"],
count_of_volumes=None,
description=cv_volume_result["results"]["description"],
image_url=cv_volume_result["results"]["image"]["super_url"],
publisher=cv_volume_result["results"]["publisher"]["name"],
start_year=int(cv_volume_result["results"]["start_year"]),
genres=[],
format=None,
),
characters=[],
alt_image_urls=[],
complete=False,
credits=[],
locations=[],
story_arcs=[],
critical_rating=0,
maturity_rating="",
manga="",
language="",
country="",
comic_series_result = comicapi.genericmetadata.ComicSeries(
id=str(cv_issue_result["results"]["volume"]["id"]),
name=cv_issue_result["results"]["volume"]["name"],
aliases=[],
count_of_issues=cv_volume_result["results"]["count_of_issues"],
count_of_volumes=None,
description=cv_volume_result["results"]["description"],
image_url=cv_volume_result["results"]["image"]["super_url"],
publisher=cv_volume_result["results"]["publisher"]["name"],
start_year=int(cv_volume_result["results"]["start_year"]),
genres=[],
tags=[],
teams=[],
format=None,
)
date = utils.parse_date_str(cv_issue_result["results"]["cover_date"])
comic_issue_result = comicapi.genericmetadata.GenericMetadata(
tag_origin=comicapi.genericmetadata.TagOrigin("comicvine", "Comic Vine"),
title_aliases=cv_issue_result["results"]["aliases"] or [],
month=date[1],
year=date[2],
day=date[0],
description=cv_issue_result["results"]["description"],
publisher=cv_volume_result["results"]["publisher"]["name"],
issue_count=cv_volume_result["results"]["count_of_issues"],
issue_id=str(cv_issue_result["results"]["id"]),
series=cv_issue_result["results"]["volume"]["name"],
series_id=str(cv_issue_result["results"]["volume"]["id"]),
cover_image=cv_issue_result["results"]["image"]["super_url"],
issue=cv_issue_result["results"]["issue_number"],
volume=None,
title=cv_issue_result["results"]["name"],
web_link=cv_issue_result["results"]["site_detail_url"],
)
cv_md = comicapi.genericmetadata.GenericMetadata(
is_empty=False,
tag_origin="Comic Vine",
tag_origin=comicapi.genericmetadata.TagOrigin("comicvine", "Comic Vine"),
issue_id=str(cv_issue_result["results"]["id"]),
series=cv_issue_result["results"]["volume"]["name"],
series_id=str(cv_issue_result["results"]["volume"]["id"]),
issue=cv_issue_result["results"]["issue_number"],
title=cv_issue_result["results"]["name"],
publisher=cv_volume_result["results"]["publisher"]["name"],
month=date[1],
year=date[2],
day=date[0],
issue_count=6,
issue_count=cv_volume_result["results"]["count_of_issues"],
volume=None,
genre=None,
genres=[],
language=None,
comments=cleanup_html(cv_issue_result["results"]["description"], False),
description=cv_issue_result["results"]["description"],
volume_count=None,
critical_rating=None,
country=None,
@ -228,14 +220,14 @@ cv_md = comicapi.genericmetadata.GenericMetadata(
black_and_white=None,
page_count=None,
maturity_rating=None,
story_arc=None,
series_group=None,
story_arcs=[],
series_groups=[],
scan_info=None,
characters=None,
teams=None,
locations=None,
characters=[],
teams=[],
locations=[],
credits=[
comicapi.genericmetadata.CreditMetadata(person=x["name"], role=x["role"].title(), primary=False)
comicapi.genericmetadata.Credit(person=x["name"], role=x["role"].title(), primary=False)
for x in cv_issue_result["results"]["person_credits"]
],
tags=set(),

View File

@ -872,7 +872,7 @@ rnames = [
does_not_raise(),
),
(
"{series} #{issue} - {locations} ({year})",
"{series} #{issue} - {locations!j} ({year})",
False,
"universal",
"Cory Doctorow's Futuristic Tales of the Here and Now #001 - lonely cottage (2007).cbz",

View File

@ -36,9 +36,9 @@ def test_page_type_read(cbz):
assert isinstance(md.pages[0]["Type"], str)
def test_metadata_read(cbz):
def test_metadata_read(cbz, md_saved):
md = cbz.read_cix()
assert md == comicapi.genericmetadata.md_test
assert md == md_saved
def test_save_cix(tmp_comic):
@ -73,7 +73,7 @@ def test_save_cix_rar(tmp_path):
@pytest.mark.xfail(not (comicapi.archivers.rar.rar_support and shutil.which("rar")), reason="rar support")
def test_save_cbi_rar(tmp_path):
def test_save_cbi_rar(tmp_path, md_saved):
cbr_path = datadir / "fake_cbr.cbr"
shutil.copy(cbr_path, tmp_path)
@ -82,7 +82,7 @@ def test_save_cbi_rar(tmp_path):
assert tmp_comic.write_cbi(comicapi.genericmetadata.md_test)
md = tmp_comic.read_cbi()
assert md.replace(pages=[]) == comicapi.genericmetadata.md_test.replace(
assert md.replace(pages=[]) == md_saved.replace(
pages=[],
day=None,
alternate_series=None,
@ -136,7 +136,7 @@ for x in entry_points(group="comicapi.archiver"):
@pytest.mark.parametrize("archiver", archivers)
def test_copy_from_archive(archiver, tmp_path, cbz):
def test_copy_from_archive(archiver, tmp_path, cbz, md_saved):
comic_path = tmp_path / cbz.path.with_suffix("").name
archive = archiver.open(comic_path)
@ -149,7 +149,7 @@ def test_copy_from_archive(archiver, tmp_path, cbz):
assert set(cbz.archiver.get_filename_list()) == set(comic_archive.archiver.get_filename_list())
md = comic_archive.read_cix()
assert md == comicapi.genericmetadata.md_test
assert md == md_saved
def test_rename(tmp_comic, tmp_path):

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import pytest
import comictalker.comiccacher
from comicapi.genericmetadata import TagOrigin
from testing.comicdata import search_results
@ -13,13 +14,13 @@ def test_create_cache(config, mock_version):
def test_search_results(comic_cache):
comic_cache.add_search_results("test", "test search", search_results)
assert search_results == comic_cache.get_search_results("test", "test search")
comic_cache.add_search_results(TagOrigin("test", "test"), "test search", search_results)
assert search_results == comic_cache.get_search_results(TagOrigin("test", "test"), "test search")
@pytest.mark.parametrize("series_info", search_results)
def test_series_info(comic_cache, series_info):
comic_cache.add_series_info(series_record=series_info, source_name="test")
comic_cache.add_series_info(series=series_info, source=TagOrigin("test", "test"))
vi = series_info.copy()
cache_result = comic_cache.get_series_info(series_id=series_info.id, source_name="test")
cache_result = comic_cache.get_series_info(series_id=series_info.id, source=TagOrigin("test", "test"))
assert vi == cache_result

View File

@ -1,7 +1,5 @@
from __future__ import annotations
import dataclasses
import pytest
import comicapi.genericmetadata
@ -11,7 +9,7 @@ import testing.comicvine
def test_search_for_series(comicvine_api, comic_cache):
results = comicvine_api.search_for_series("cory doctorows futuristic tales of the here and now")
cache_issues = comic_cache.get_search_results(
comicvine_api.id, "cory doctorows futuristic tales of the here and now"
comicvine_api.origin, "cory doctorows futuristic tales of the here and now"
)
assert results == cache_issues
@ -20,16 +18,16 @@ def test_fetch_series_data(comicvine_api, comic_cache):
result = comicvine_api._fetch_series_data(23437)
# del result["description"]
# del result["image_url"]
cache_result = comic_cache.get_series_info(23437, comicvine_api.id)
cache_result = comic_cache.get_series_info(23437, comicvine_api.origin)
# del cache_result["description"]
# del cache_result["image_url"]
assert result == cache_result
def test_fetch_issues_by_series(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_by_series(23437)
cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.id)
assert dataclasses.asdict(results[0])["series"] == dataclasses.asdict(cache_issues[0])["series"]
def test_fetch_issues_in_series(comicvine_api, comic_cache):
results = comicvine_api.fetch_issues_in_series(23437)
cache_issues = comic_cache.get_series_issues_info(23437, comicvine_api.origin)
assert results[0] == cache_issues[0][0]
def test_fetch_issue_data_by_issue_id(comicvine_api):
@ -38,7 +36,7 @@ def test_fetch_issue_data_by_issue_id(comicvine_api):
assert result == testing.comicvine.cv_md
def test_fetch_issues_by_series_issue_num_and_year(comicvine_api):
def test_fetch_issues_in_series_issue_num_and_year(comicvine_api):
results = comicvine_api.fetch_issues_by_series_issue_num_and_year([23437], "1", None)
cv_expected = testing.comicvine.comic_issue_result.copy()

View File

@ -11,6 +11,7 @@ import pytest
import requests
import settngs
from PIL import Image
from pyrate_limiter import Limiter, RequestRate
import comicapi.comicarchive
import comicapi.genericmetadata
@ -111,6 +112,8 @@ def comicvine_api(monkeypatch, cbz, comic_cache, mock_version, config) -> comict
# apply the monkeypatch for requests.get to mock_get
monkeypatch.setattr(requests, "get", m_get)
monkeypatch.setattr(comictalker.talkers.comicvine, "custom_limiter", Limiter(RequestRate(100, 1)))
monkeypatch.setattr(comictalker.talkers.comicvine, "default_limiter", Limiter(RequestRate(100, 1)))
cv = comictalker.talkers.comicvine.ComicVineTalker(
version=mock_version[0],
@ -141,6 +144,11 @@ def md():
yield comicapi.genericmetadata.md_test.copy()
@pytest.fixture
def md_saved():
yield comicapi.genericmetadata.md_test.replace(tag_origin=None, issue_id=None, series_id=None)
# manually seeds publishers
@pytest.fixture
def seed_publishers(monkeypatch):

View File

@ -26,7 +26,7 @@ def test_add_credit():
md = comicapi.genericmetadata.GenericMetadata()
md.add_credit(person="test", role="writer", primary=False)
assert md.credits == [comicapi.genericmetadata.CreditMetadata(person="test", role="writer", primary=False)]
assert md.credits == [comicapi.genericmetadata.Credit(person="test", role="writer", primary=False)]
def test_add_credit_primary():
@ -34,7 +34,7 @@ def test_add_credit_primary():
md.add_credit(person="test", role="writer", primary=False)
md.add_credit(person="test", role="writer", primary=True)
assert md.credits == [comicapi.genericmetadata.CreditMetadata(person="test", role="writer", primary=True)]
assert md.credits == [comicapi.genericmetadata.Credit(person="test", role="writer", primary=True)]
@pytest.mark.parametrize("md, role, expected", credits)

View File

@ -5,27 +5,27 @@ import comicapi.comicinfoxml
import comicapi.genericmetadata
def test_cix():
def test_cix(md_saved):
CIX = comicapi.comicinfoxml.ComicInfoXml()
string = CIX.string_from_metadata(comicapi.genericmetadata.md_test)
md = CIX.metadata_from_string(string)
assert md == comicapi.genericmetadata.md_test
assert md == md_saved
def test_cbi():
def test_cbi(md_saved):
CBI = comicapi.comicbookinfo.ComicBookInfo()
string = CBI.string_from_metadata(comicapi.genericmetadata.md_test)
md = CBI.metadata_from_string(string)
md_test = comicapi.genericmetadata.md_test.replace(
md_test = md_saved.replace(
day=None,
page_count=None,
maturity_rating=None,
story_arc=None,
series_group=None,
story_arcs=[],
series_groups=[],
scan_info=None,
characters=None,
teams=None,
locations=None,
characters=[],
teams=[],
locations=[],
pages=[],
alternate_series=None,
alternate_number=None,
@ -39,17 +39,17 @@ def test_cbi():
assert md == md_test
def test_comet():
def test_comet(md_saved):
CBI = comicapi.comet.CoMet()
string = CBI.string_from_metadata(comicapi.genericmetadata.md_test)
md = CBI.metadata_from_string(string)
md_test = comicapi.genericmetadata.md_test.replace(
md_test = md_saved.replace(
day=None,
story_arc=None,
series_group=None,
story_arcs=[],
series_groups=[],
scan_info=None,
teams=None,
locations=None,
teams=[],
locations=[],
pages=[],
alternate_series=None,
alternate_number=None,