Refactor cli.py into a class
This commit is contained in:
parent
da21dc110d
commit
17d865b72f
@ -19,7 +19,6 @@ from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pprint import pprint
|
||||
@ -38,221 +37,182 @@ from comictalker.talkerbase import ComicTalker, TalkerError
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def actual_issue_data_fetch(issue_id: int, options: settings.OptionValues, talker_api: ComicTalker) -> GenericMetadata:
|
||||
# now get the particular issue data
|
||||
try:
|
||||
ct_md = talker_api.fetch_comic_data(issue_id)
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
|
||||
return GenericMetadata()
|
||||
class CLI:
|
||||
def __init__(self, options: settings.OptionValues, talker_api: ComicTalker):
|
||||
self.options = options
|
||||
self.talker_api = talker_api
|
||||
self.batch_mode = False
|
||||
|
||||
if options["cbl"]["apply_cbl_transform_on_cv_import"]:
|
||||
ct_md = CBLTransformer(ct_md, options).apply()
|
||||
def actual_issue_data_fetch(self, issue_id: int) -> GenericMetadata:
|
||||
# now get the particular issue data
|
||||
try:
|
||||
ct_md = self.talker_api.fetch_comic_data(issue_id)
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
|
||||
return GenericMetadata()
|
||||
|
||||
return ct_md
|
||||
if self.options["cbl"]["apply_cbl_transform_on_cv_import"]:
|
||||
ct_md = CBLTransformer(ct_md, self.options).apply()
|
||||
|
||||
return ct_md
|
||||
|
||||
def actual_metadata_save(ca: ComicArchive, options: settings.OptionValues, md: GenericMetadata) -> bool:
|
||||
if not options["runtime"]["dryrun"]:
|
||||
for metadata_style in options["runtime"]["type"]:
|
||||
# write out the new data
|
||||
if not ca.write_metadata(md, metadata_style):
|
||||
logger.error("The tag save seemed to fail for style: %s!", MetaDataStyle.name[metadata_style])
|
||||
return False
|
||||
def actual_metadata_save(self, ca: ComicArchive, md: GenericMetadata) -> bool:
|
||||
if not self.options["runtime"]["dryrun"]:
|
||||
for metadata_style in self.options["runtime"]["type"]:
|
||||
# write out the new data
|
||||
if not ca.write_metadata(md, metadata_style):
|
||||
logger.error("The tag save seemed to fail for style: %s!", MetaDataStyle.name[metadata_style])
|
||||
return False
|
||||
|
||||
print("Save complete.")
|
||||
logger.info("Save complete.")
|
||||
else:
|
||||
if options["runtime"]["terse"]:
|
||||
logger.info("dry-run option was set, so nothing was written")
|
||||
print("dry-run option was set, so nothing was written")
|
||||
print("Save complete.")
|
||||
logger.info("Save complete.")
|
||||
else:
|
||||
logger.info("dry-run option was set, so nothing was written, but here is the final set of tags:")
|
||||
print("dry-run option was set, so nothing was written, but here is the final set of tags:")
|
||||
print(f"{md}")
|
||||
return True
|
||||
|
||||
|
||||
def display_match_set_for_choice(
|
||||
label: str,
|
||||
match_set: MultipleMatch,
|
||||
options: settings.OptionValues,
|
||||
talker_api: ComicTalker,
|
||||
) -> None:
|
||||
print(f"{match_set.ca.path} -- {label}:")
|
||||
|
||||
# sort match list by year
|
||||
match_set.matches.sort(key=lambda k: k["year"] or 0)
|
||||
|
||||
for (counter, m) in enumerate(match_set.matches):
|
||||
counter += 1
|
||||
print(
|
||||
" {}. {} #{} [{}] ({}/{}) - {}".format(
|
||||
counter,
|
||||
m["series"],
|
||||
m["issue_number"],
|
||||
m["publisher"],
|
||||
m["month"],
|
||||
m["year"],
|
||||
m["issue_title"],
|
||||
)
|
||||
)
|
||||
if options["runtime"]["interactive"]:
|
||||
while True:
|
||||
i = input("Choose a match #, or 's' to skip: ")
|
||||
if (i.isdigit() and int(i) in range(1, len(match_set.matches) + 1)) or i == "s":
|
||||
break
|
||||
if i != "s":
|
||||
# save the data!
|
||||
# we know at this point, that the file is all good to go
|
||||
ca = match_set.ca
|
||||
md = create_local_metadata(ca, options)
|
||||
ct_md = actual_issue_data_fetch(match_set.matches[int(i) - 1]["issue_id"], options, talker_api)
|
||||
if options["comicvine"]["clear_metadata_on_import"]:
|
||||
md = ct_md
|
||||
if self.options["runtime"]["terse"]:
|
||||
logger.info("dry-run option was set, so nothing was written")
|
||||
print("dry-run option was set, so nothing was written")
|
||||
else:
|
||||
notes = (
|
||||
f"Tagged with ComicTagger {ctversion.version} using info from Comic Vine on"
|
||||
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
|
||||
logger.info("dry-run option was set, so nothing was written, but here is the final set of tags:")
|
||||
print("dry-run option was set, so nothing was written, but here is the final set of tags:")
|
||||
print(f"{md}")
|
||||
return True
|
||||
|
||||
def display_match_set_for_choice(self, label: str, match_set: MultipleMatch) -> None:
|
||||
print(f"{match_set.ca.path} -- {label}:")
|
||||
|
||||
# sort match list by year
|
||||
match_set.matches.sort(key=lambda k: k["year"] or 0)
|
||||
|
||||
for (counter, m) in enumerate(match_set.matches):
|
||||
counter += 1
|
||||
print(
|
||||
" {}. {} #{} [{}] ({}/{}) - {}".format(
|
||||
counter,
|
||||
m["series"],
|
||||
m["issue_number"],
|
||||
m["publisher"],
|
||||
m["month"],
|
||||
m["year"],
|
||||
m["issue_title"],
|
||||
)
|
||||
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
|
||||
)
|
||||
if self.options["runtime"]["interactive"]:
|
||||
while True:
|
||||
i = input("Choose a match #, or 's' to skip: ")
|
||||
if (i.isdigit() and int(i) in range(1, len(match_set.matches) + 1)) or i == "s":
|
||||
break
|
||||
if i != "s":
|
||||
# save the data!
|
||||
# we know at this point, that the file is all good to go
|
||||
ca = match_set.ca
|
||||
md = self.create_local_metadata(ca)
|
||||
ct_md = self.actual_issue_data_fetch(match_set.matches[int(i) - 1]["issue_id"])
|
||||
if self.options["comicvine"]["clear_metadata_on_import"]:
|
||||
md = ct_md
|
||||
else:
|
||||
notes = (
|
||||
f"Tagged with ComicTagger {ctversion.version} using info from Comic Vine on"
|
||||
f" {datetime.now():%Y-%m-%d %H:%M:%S}. [Issue ID {ct_md.issue_id}]"
|
||||
)
|
||||
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
|
||||
|
||||
if options["comicvine"]["auto_imprint"]:
|
||||
md.fix_publisher()
|
||||
if self.options["comicvine"]["auto_imprint"]:
|
||||
md.fix_publisher()
|
||||
|
||||
actual_metadata_save(ca, options, md)
|
||||
self.actual_metadata_save(ca, md)
|
||||
|
||||
def post_process_matches(self, match_results: OnlineMatchResults) -> None:
|
||||
# now go through the match results
|
||||
if self.options["runtime"]["show_save_summary"]:
|
||||
if len(match_results.good_matches) > 0:
|
||||
print("\nSuccessful matches:\n------------------")
|
||||
for f in match_results.good_matches:
|
||||
print(f)
|
||||
|
||||
def post_process_matches(
|
||||
match_results: OnlineMatchResults, options: settings.OptionValues, talker_api: ComicTalker
|
||||
) -> None:
|
||||
# now go through the match results
|
||||
if options["runtime"]["show_save_summary"]:
|
||||
if len(match_results.good_matches) > 0:
|
||||
print("\nSuccessful matches:\n------------------")
|
||||
for f in match_results.good_matches:
|
||||
print(f)
|
||||
if len(match_results.no_matches) > 0:
|
||||
print("\nNo matches:\n------------------")
|
||||
for f in match_results.no_matches:
|
||||
print(f)
|
||||
|
||||
if len(match_results.no_matches) > 0:
|
||||
print("\nNo matches:\n------------------")
|
||||
for f in match_results.no_matches:
|
||||
print(f)
|
||||
if len(match_results.write_failures) > 0:
|
||||
print("\nFile Write Failures:\n------------------")
|
||||
for f in match_results.write_failures:
|
||||
print(f)
|
||||
|
||||
if len(match_results.write_failures) > 0:
|
||||
print("\nFile Write Failures:\n------------------")
|
||||
for f in match_results.write_failures:
|
||||
print(f)
|
||||
if len(match_results.fetch_data_failures) > 0:
|
||||
print("\nNetwork Data Fetch Failures:\n------------------")
|
||||
for f in match_results.fetch_data_failures:
|
||||
print(f)
|
||||
|
||||
if len(match_results.fetch_data_failures) > 0:
|
||||
print("\nNetwork Data Fetch Failures:\n------------------")
|
||||
for f in match_results.fetch_data_failures:
|
||||
print(f)
|
||||
if not self.options["runtime"]["show_save_summary"] and not self.options["runtime"]["interactive"]:
|
||||
# just quit if we're not interactive or showing the summary
|
||||
return
|
||||
|
||||
if not options["runtime"]["show_save_summary"] and not options["runtime"]["interactive"]:
|
||||
# just quit if we're not interactive or showing the summary
|
||||
return
|
||||
if len(match_results.multiple_matches) > 0:
|
||||
print("\nArchives with multiple high-confidence matches:\n------------------")
|
||||
for match_set in match_results.multiple_matches:
|
||||
self.display_match_set_for_choice("Multiple high-confidence matches", match_set)
|
||||
|
||||
if len(match_results.multiple_matches) > 0:
|
||||
print("\nArchives with multiple high-confidence matches:\n------------------")
|
||||
for match_set in match_results.multiple_matches:
|
||||
display_match_set_for_choice("Multiple high-confidence matches", match_set, options, talker_api)
|
||||
if len(match_results.low_confidence_matches) > 0:
|
||||
print("\nArchives with low-confidence matches:\n------------------")
|
||||
for match_set in match_results.low_confidence_matches:
|
||||
if len(match_set.matches) == 1:
|
||||
label = "Single low-confidence match"
|
||||
else:
|
||||
label = "Multiple low-confidence matches"
|
||||
|
||||
if len(match_results.low_confidence_matches) > 0:
|
||||
print("\nArchives with low-confidence matches:\n------------------")
|
||||
for match_set in match_results.low_confidence_matches:
|
||||
if len(match_set.matches) == 1:
|
||||
label = "Single low-confidence match"
|
||||
else:
|
||||
label = "Multiple low-confidence matches"
|
||||
self.display_match_set_for_choice(label, match_set)
|
||||
|
||||
display_match_set_for_choice(label, match_set, options, talker_api)
|
||||
def run(self) -> None:
|
||||
if len(self.options["runtime"]["file_list"]) < 1:
|
||||
logger.error("You must specify at least one filename. Use the -h option for more info")
|
||||
return
|
||||
|
||||
match_results = OnlineMatchResults()
|
||||
self.batch_mode = len(self.options["runtime"]["file_list"]) > 1
|
||||
|
||||
def cli_mode(options: settings.OptionValues, talker_api: ComicTalker) -> None:
|
||||
if len(options["runtime"]["file_list"]) < 1:
|
||||
logger.error("You must specify at least one filename. Use the -h option for more info")
|
||||
return
|
||||
for f in self.options["runtime"]["file_list"]:
|
||||
self.process_file_cli(f, match_results)
|
||||
sys.stdout.flush()
|
||||
|
||||
match_results = OnlineMatchResults()
|
||||
self.post_process_matches(match_results)
|
||||
|
||||
for f in options["runtime"]["file_list"]:
|
||||
process_file_cli(f, options, talker_api, match_results)
|
||||
sys.stdout.flush()
|
||||
def create_local_metadata(self, ca: ComicArchive) -> GenericMetadata:
|
||||
md = GenericMetadata()
|
||||
md.set_default_page_list(ca.get_number_of_pages())
|
||||
|
||||
post_process_matches(match_results, options, talker_api)
|
||||
# now, overlay the parsed filename info
|
||||
if self.options["runtime"]["parse_filename"]:
|
||||
f_md = ca.metadata_from_filename(
|
||||
self.options["filename"]["complicated_parser"],
|
||||
self.options["filename"]["remove_c2c"],
|
||||
self.options["filename"]["remove_fcbd"],
|
||||
self.options["filename"]["remove_publisher"],
|
||||
self.options["runtime"]["split_words"],
|
||||
)
|
||||
|
||||
md.overlay(f_md)
|
||||
|
||||
def create_local_metadata(ca: ComicArchive, options: settings.OptionValues) -> GenericMetadata:
|
||||
md = GenericMetadata()
|
||||
md.set_default_page_list(ca.get_number_of_pages())
|
||||
for metadata_style in self.options["runtime"]["type"]:
|
||||
if ca.has_metadata(metadata_style):
|
||||
try:
|
||||
t_md = ca.read_metadata(metadata_style)
|
||||
md.overlay(t_md)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
# now, overlay the parsed filename info
|
||||
if options["runtime"]["parse_filename"]:
|
||||
f_md = ca.metadata_from_filename(
|
||||
options["filename"]["complicated_parser"],
|
||||
options["filename"]["remove_c2c"],
|
||||
options["filename"]["remove_fcbd"],
|
||||
options["filename"]["remove_publisher"],
|
||||
options["runtime"]["split_words"],
|
||||
)
|
||||
# finally, use explicit stuff
|
||||
md.overlay(self.options["runtime"]["metadata"])
|
||||
|
||||
md.overlay(f_md)
|
||||
return md
|
||||
|
||||
for metadata_style in options["runtime"]["type"]:
|
||||
if ca.has_metadata(metadata_style):
|
||||
try:
|
||||
t_md = ca.read_metadata(metadata_style)
|
||||
md.overlay(t_md)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
# finally, use explicit stuff
|
||||
md.overlay(options["runtime"]["metadata"])
|
||||
|
||||
return md
|
||||
|
||||
|
||||
def process_file_cli(
|
||||
filename: str, options: settings.OptionValues, talker_api: ComicTalker, match_results: OnlineMatchResults
|
||||
) -> None:
|
||||
batch_mode = len(options["runtime"]["file_list"]) > 1
|
||||
|
||||
ca = ComicArchive(filename, options["general"]["rar_exe_path"], str(graphics_path / "nocover.png"))
|
||||
|
||||
if not os.path.lexists(filename):
|
||||
logger.error("Cannot find %s", filename)
|
||||
return
|
||||
|
||||
if not ca.seems_to_be_a_comic_archive():
|
||||
logger.error("Sorry, but %s is not a comic archive!", filename)
|
||||
return
|
||||
|
||||
if not ca.is_writable() and (
|
||||
options["commands"]["delete"]
|
||||
or options["commands"]["copy"]
|
||||
or options["commands"]["save"]
|
||||
or options["commands"]["rename"]
|
||||
):
|
||||
logger.error("This archive is not writable")
|
||||
return
|
||||
|
||||
has = [False, False, False]
|
||||
if ca.has_cix():
|
||||
has[MetaDataStyle.CIX] = True
|
||||
if ca.has_cbi():
|
||||
has[MetaDataStyle.CBI] = True
|
||||
if ca.has_comet():
|
||||
has[MetaDataStyle.COMET] = True
|
||||
|
||||
if options["commands"]["print"]:
|
||||
|
||||
if not options["runtime"]["type"]:
|
||||
def print(self, ca: ComicArchive) -> None:
|
||||
if not self.options["runtime"]["type"]:
|
||||
page_count = ca.get_number_of_pages()
|
||||
|
||||
brief = ""
|
||||
|
||||
if batch_mode:
|
||||
if self.batch_mode:
|
||||
brief = f"{ca.path}: "
|
||||
|
||||
if ca.is_sevenzip():
|
||||
@ -267,146 +227,149 @@ def process_file_cli(
|
||||
brief += f"({page_count: >3} pages)"
|
||||
brief += " tags:[ "
|
||||
|
||||
if not (has[MetaDataStyle.CBI] or has[MetaDataStyle.CIX] or has[MetaDataStyle.COMET]):
|
||||
if not (
|
||||
ca.has_metadata(MetaDataStyle.CBI)
|
||||
or ca.has_metadata(MetaDataStyle.CIX)
|
||||
or ca.has_metadata(MetaDataStyle.COMET)
|
||||
):
|
||||
brief += "none "
|
||||
else:
|
||||
if has[MetaDataStyle.CBI]:
|
||||
if ca.has_metadata(MetaDataStyle.CBI):
|
||||
brief += "CBL "
|
||||
if has[MetaDataStyle.CIX]:
|
||||
if ca.has_metadata(MetaDataStyle.CIX):
|
||||
brief += "CR "
|
||||
if has[MetaDataStyle.COMET]:
|
||||
if ca.has_metadata(MetaDataStyle.COMET):
|
||||
brief += "CoMet "
|
||||
brief += "]"
|
||||
|
||||
print(brief)
|
||||
|
||||
if options["runtime"]["terse"]:
|
||||
if self.options["runtime"]["terse"]:
|
||||
return
|
||||
|
||||
print()
|
||||
|
||||
if not options["runtime"]["type"] or MetaDataStyle.CIX in options["runtime"]["type"]:
|
||||
if has[MetaDataStyle.CIX]:
|
||||
if not self.options["runtime"]["type"] or MetaDataStyle.CIX in self.options["runtime"]["type"]:
|
||||
if ca.has_metadata(MetaDataStyle.CIX):
|
||||
print("--------- ComicRack tags ---------")
|
||||
try:
|
||||
if options["runtime"]["raw"]:
|
||||
if self.options["runtime"]["raw"]:
|
||||
print(ca.read_raw_cix())
|
||||
else:
|
||||
print(ca.read_cix())
|
||||
except Exception as e:
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
if not options["runtime"]["type"] or MetaDataStyle.CBI in options["runtime"]["type"]:
|
||||
if has[MetaDataStyle.CBI]:
|
||||
if not self.options["runtime"]["type"] or MetaDataStyle.CBI in self.options["runtime"]["type"]:
|
||||
if ca.has_metadata(MetaDataStyle.CBI):
|
||||
print("------- ComicBookLover tags -------")
|
||||
try:
|
||||
if options["runtime"]["raw"]:
|
||||
if self.options["runtime"]["raw"]:
|
||||
pprint(json.loads(ca.read_raw_cbi()))
|
||||
else:
|
||||
print(ca.read_cbi())
|
||||
except Exception as e:
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
if not options["runtime"]["type"] or MetaDataStyle.COMET in options["runtime"]["type"]:
|
||||
if has[MetaDataStyle.COMET]:
|
||||
if not self.options["runtime"]["type"] or MetaDataStyle.COMET in self.options["runtime"]["type"]:
|
||||
if ca.has_metadata(MetaDataStyle.COMET):
|
||||
print("----------- CoMet tags -----------")
|
||||
try:
|
||||
if options["runtime"]["raw"]:
|
||||
if self.options["runtime"]["raw"]:
|
||||
print(ca.read_raw_comet())
|
||||
else:
|
||||
print(ca.read_comet())
|
||||
except Exception as e:
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
elif options["commands"]["delete"]:
|
||||
for metadata_style in options["runtime"]["type"]:
|
||||
def delete(self, ca: ComicArchive) -> None:
|
||||
for metadata_style in self.options["runtime"]["type"]:
|
||||
style_name = MetaDataStyle.name[metadata_style]
|
||||
if has[metadata_style]:
|
||||
if not options["runtime"]["dryrun"]:
|
||||
if ca.has_metadata(metadata_style):
|
||||
if not self.options["runtime"]["dryrun"]:
|
||||
if not ca.remove_metadata(metadata_style):
|
||||
print(f"{filename}: Tag removal seemed to fail!")
|
||||
print(f"{ca.path}: Tag removal seemed to fail!")
|
||||
else:
|
||||
print(f"{filename}: Removed {style_name} tags.")
|
||||
print(f"{ca.path}: Removed {style_name} tags.")
|
||||
else:
|
||||
print(f"{filename}: dry-run. {style_name} tags not removed")
|
||||
print(f"{ca.path}: dry-run. {style_name} tags not removed")
|
||||
else:
|
||||
print(f"{filename}: This archive doesn't have {style_name} tags to remove.")
|
||||
print(f"{ca.path}: This archive doesn't have {style_name} tags to remove.")
|
||||
|
||||
elif options["commands"]["copy"] is not None:
|
||||
for metadata_style in options["runtime"]["type"]:
|
||||
def copy(self, ca: ComicArchive) -> None:
|
||||
for metadata_style in self.options["runtime"]["type"]:
|
||||
dst_style_name = MetaDataStyle.name[metadata_style]
|
||||
if options["runtime"]["no_overwrite"] and has[metadata_style]:
|
||||
print(f"{filename}: Already has {dst_style_name} tags. Not overwriting.")
|
||||
if self.options["runtime"]["no_overwrite"] and ca.has_metadata(metadata_style):
|
||||
print(f"{ca.path}: Already has {dst_style_name} tags. Not overwriting.")
|
||||
return
|
||||
if options["commands"]["copy"] == metadata_style:
|
||||
print(f"{filename}: Destination and source are same: {dst_style_name}. Nothing to do.")
|
||||
if self.options["commands"]["copy"] == metadata_style:
|
||||
print(f"{ca.path}: Destination and source are same: {dst_style_name}. Nothing to do.")
|
||||
return
|
||||
|
||||
src_style_name = MetaDataStyle.name[options["commands"]["copy"]]
|
||||
if has[options["commands"]["copy"]]:
|
||||
if not options["runtime"]["dryrun"]:
|
||||
src_style_name = MetaDataStyle.name[self.options["commands"]["copy"]]
|
||||
if ca.has_metadata(self.options["commands"]["copy"]):
|
||||
if not self.options["runtime"]["dryrun"]:
|
||||
try:
|
||||
md = ca.read_metadata(options["commands"]["copy"])
|
||||
md = ca.read_metadata(self.options["commands"]["copy"])
|
||||
except Exception as e:
|
||||
md = GenericMetadata()
|
||||
logger.error("Failed to load metadata for %s: %s", ca.path, e)
|
||||
|
||||
if options["apply_cbl_transform_on_bulk_operation"] and metadata_style == MetaDataStyle.CBI:
|
||||
md = CBLTransformer(md, options).apply()
|
||||
if self.options["apply_cbl_transform_on_bulk_operation"] and metadata_style == MetaDataStyle.CBI:
|
||||
md = CBLTransformer(md, self.options).apply()
|
||||
|
||||
if not ca.write_metadata(md, metadata_style):
|
||||
print(f"{filename}: Tag copy seemed to fail!")
|
||||
print(f"{ca.path}: Tag copy seemed to fail!")
|
||||
else:
|
||||
print(f"{filename}: Copied {src_style_name} tags to {dst_style_name}.")
|
||||
print(f"{ca.path}: Copied {src_style_name} tags to {dst_style_name}.")
|
||||
else:
|
||||
print(f"{filename}: dry-run. {src_style_name} tags not copied")
|
||||
print(f"{ca.path}: dry-run. {src_style_name} tags not copied")
|
||||
else:
|
||||
print(f"{filename}: This archive doesn't have {src_style_name} tags to copy.")
|
||||
print(f"{ca.path}: This archive doesn't have {src_style_name} tags to copy.")
|
||||
|
||||
elif options["commands"]["save"]:
|
||||
|
||||
if options["runtime"]["no_overwrite"]:
|
||||
for metadata_style in options["runtime"]["type"]:
|
||||
if has[metadata_style]:
|
||||
print(f"{filename}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
|
||||
def save(self, ca: ComicArchive, match_results: OnlineMatchResults) -> None:
|
||||
if self.options["runtime"]["no_overwrite"]:
|
||||
for metadata_style in self.options["runtime"]["type"]:
|
||||
if ca.has_metadata(metadata_style):
|
||||
print(f"{ca.path}: Already has {MetaDataStyle.name[metadata_style]} tags. Not overwriting.")
|
||||
return
|
||||
|
||||
if batch_mode:
|
||||
if self.batch_mode:
|
||||
print(f"Processing {ca.path}...")
|
||||
|
||||
md = create_local_metadata(ca, options)
|
||||
md = self.create_local_metadata(ca)
|
||||
if md.issue is None or md.issue == "":
|
||||
if options["runtime"]["assume_issue_one"]:
|
||||
if self.options["runtime"]["assume_issue_one"]:
|
||||
md.issue = "1"
|
||||
|
||||
# now, search online
|
||||
if options["runtime"]["online"]:
|
||||
if options["runtime"]["issue_id"] is not None:
|
||||
if self.options["runtime"]["online"]:
|
||||
if self.options["runtime"]["issue_id"] is not None:
|
||||
# we were given the actual issue ID to search with
|
||||
try:
|
||||
ct_md = talker_api.fetch_comic_data(options["runtime"]["issue_id"])
|
||||
ct_md = self.talker_api.fetch_comic_data(self.options["runtime"]["issue_id"])
|
||||
except TalkerError as e:
|
||||
logger.exception(f"Error retrieving issue details. Save aborted.\n{e}")
|
||||
match_results.fetch_data_failures.append(str(ca.path.absolute()))
|
||||
return
|
||||
|
||||
if ct_md is None:
|
||||
logger.error("No match for ID %s was found.", options["runtime"]["issue_id"])
|
||||
logger.error("No match for ID %s was found.", self.options["runtime"]["issue_id"])
|
||||
match_results.no_matches.append(str(ca.path.absolute()))
|
||||
return
|
||||
|
||||
if options["cbl"]["apply_cbl_transform_on_cv_import"]:
|
||||
ct_md = CBLTransformer(ct_md, options).apply()
|
||||
if self.options["cbl"]["apply_cbl_transform_on_cv_import"]:
|
||||
ct_md = CBLTransformer(ct_md, self.options).apply()
|
||||
else:
|
||||
if md is None or md.is_empty:
|
||||
logger.error("No metadata given to search online with!")
|
||||
match_results.no_matches.append(str(ca.path.absolute()))
|
||||
return
|
||||
|
||||
ii = IssueIdentifier(ca, options, talker_api)
|
||||
ii = IssueIdentifier(ca, self.options, self.talker_api)
|
||||
|
||||
def myoutput(text: str) -> None:
|
||||
if options["runtime"]["verbose"]:
|
||||
if self.options["runtime"]["verbose"]:
|
||||
IssueIdentifier.default_write_output(text)
|
||||
|
||||
# use our overlaid MD struct to search
|
||||
@ -446,7 +409,7 @@ def process_file_cli(
|
||||
logger.error("Online search: Multiple good matches. Save aborted")
|
||||
match_results.multiple_matches.append(MultipleMatch(ca, matches))
|
||||
return
|
||||
if low_confidence and options["runtime"]["abort_on_low_confidence"]:
|
||||
if low_confidence and self.options["runtime"]["abort_on_low_confidence"]:
|
||||
logger.error("Online search: Low confidence match. Save aborted")
|
||||
match_results.low_confidence_matches.append(MultipleMatch(ca, matches))
|
||||
return
|
||||
@ -458,12 +421,12 @@ def process_file_cli(
|
||||
# we got here, so we have a single match
|
||||
|
||||
# now get the particular issue data
|
||||
ct_md = actual_issue_data_fetch(matches[0]["issue_id"], options, talker_api)
|
||||
ct_md = self.actual_issue_data_fetch(matches[0]["issue_id"])
|
||||
if ct_md.is_empty:
|
||||
match_results.fetch_data_failures.append(str(ca.path.absolute()))
|
||||
return
|
||||
|
||||
if options["comicvine"]["clear_metadata_on_import"]:
|
||||
if self.options["comicvine"]["clear_metadata_on_import"]:
|
||||
md = ct_md
|
||||
else:
|
||||
notes = (
|
||||
@ -472,29 +435,29 @@ def process_file_cli(
|
||||
)
|
||||
md.overlay(ct_md.replace(notes=utils.combine_notes(md.notes, notes, "Tagged with ComicTagger")))
|
||||
|
||||
if options["comicvine"]["auto_imprint"]:
|
||||
if self.options["comicvine"]["auto_imprint"]:
|
||||
md.fix_publisher()
|
||||
|
||||
# ok, done building our metadata. time to save
|
||||
if not actual_metadata_save(ca, options, md):
|
||||
if not self.actual_metadata_save(ca, md):
|
||||
match_results.write_failures.append(str(ca.path.absolute()))
|
||||
else:
|
||||
match_results.good_matches.append(str(ca.path.absolute()))
|
||||
|
||||
elif options["commands"]["rename"]:
|
||||
def rename(self, ca: ComicArchive) -> None:
|
||||
original_path = ca.path
|
||||
msg_hdr = ""
|
||||
if batch_mode:
|
||||
if self.batch_mode:
|
||||
msg_hdr = f"{ca.path}: "
|
||||
|
||||
md = create_local_metadata(ca, options)
|
||||
md = self.create_local_metadata(ca)
|
||||
|
||||
if md.series is None:
|
||||
logger.error(msg_hdr + "Can't rename without series name")
|
||||
return
|
||||
|
||||
new_ext = "" # default
|
||||
if options["filename"]["rename_set_extension_based_on_archive"]:
|
||||
if self.options["filename"]["rename_set_extension_based_on_archive"]:
|
||||
if ca.is_sevenzip():
|
||||
new_ext = ".cb7"
|
||||
elif ca.is_zip():
|
||||
@ -504,13 +467,13 @@ def process_file_cli(
|
||||
|
||||
renamer = FileRenamer(
|
||||
md,
|
||||
platform="universal" if options[filename]["rename_strict"] else "auto",
|
||||
replacements=options["rename"]["replacements"],
|
||||
platform="universal" if self.options["filename"]["rename_strict"] else "auto",
|
||||
replacements=self.options["rename"]["replacements"],
|
||||
)
|
||||
renamer.set_template(options[filename]["rename_template"])
|
||||
renamer.set_issue_zero_padding(options[filename]["rename_issue_number_padding"])
|
||||
renamer.set_smart_cleanup(options[filename]["rename_use_smart_string_cleanup"])
|
||||
renamer.move = options[filename]["rename_move_to_dir"]
|
||||
renamer.set_template(self.options["filename"]["rename_template"])
|
||||
renamer.set_issue_zero_padding(self.options["filename"]["rename_issue_number_padding"])
|
||||
renamer.set_smart_cleanup(self.options["filename"]["rename_use_smart_string_cleanup"])
|
||||
renamer.move = self.options["filename"]["rename_move_to_dir"]
|
||||
|
||||
try:
|
||||
new_name = renamer.determine_name(ext=new_ext)
|
||||
@ -522,16 +485,16 @@ def process_file_cli(
|
||||
"Please consult the template help in the settings "
|
||||
"and the documentation on the format at "
|
||||
"https://docs.python.org/3/library/string.html#format-string-syntax",
|
||||
options[filename]["rename_template"],
|
||||
self.options["filename"]["rename_template"],
|
||||
)
|
||||
return
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Formatter failure: %s metadata: %s", options[filename]["rename_template"], renamer.metadata
|
||||
"Formatter failure: %s metadata: %s", self.options["filename"]["rename_template"], renamer.metadata
|
||||
)
|
||||
|
||||
folder = get_rename_dir(
|
||||
ca, options[filename]["rename_dir"] if options[filename]["rename_move_to_dir"] else None
|
||||
ca, self.options["filename"]["rename_dir"] if self.options["filename"]["rename_move_to_dir"] else None
|
||||
)
|
||||
|
||||
full_path = folder / new_name
|
||||
@ -541,7 +504,7 @@ def process_file_cli(
|
||||
return
|
||||
|
||||
suffix = ""
|
||||
if not options["runtime"]["dryrun"]:
|
||||
if not self.options["runtime"]["dryrun"]:
|
||||
# rename the file
|
||||
try:
|
||||
ca.rename(utils.unique_file(full_path))
|
||||
@ -552,19 +515,19 @@ def process_file_cli(
|
||||
|
||||
print(f"renamed '{original_path.name}' -> '{new_name}' {suffix}")
|
||||
|
||||
elif options["commands"]["export_to_zip"]:
|
||||
def export(self, ca: ComicArchive) -> None:
|
||||
msg_hdr = ""
|
||||
if batch_mode:
|
||||
if self.batch_mode:
|
||||
msg_hdr = f"{ca.path}: "
|
||||
|
||||
if ca.is_zip():
|
||||
logger.error(msg_hdr + "Archive is already a zip file.")
|
||||
return
|
||||
|
||||
filename_path = pathlib.Path(filename).absolute()
|
||||
filename_path = ca.path
|
||||
new_file = filename_path.with_suffix(".cbz")
|
||||
|
||||
if options["runtime"]["abort_on_conflict"] and new_file.exists():
|
||||
if self.options["runtime"]["abort_on_conflict"] and new_file.exists():
|
||||
print(msg_hdr + f"{new_file.name} already exists in the that folder.")
|
||||
return
|
||||
|
||||
@ -572,10 +535,10 @@ def process_file_cli(
|
||||
|
||||
delete_success = False
|
||||
export_success = False
|
||||
if not options["runtime"]["dryrun"]:
|
||||
if not self.options["runtime"]["dryrun"]:
|
||||
if ca.export_as_zip(new_file):
|
||||
export_success = True
|
||||
if options["runtime"]["delete_after_zip_export"]:
|
||||
if self.options["runtime"]["delete_after_zip_export"]:
|
||||
try:
|
||||
filename_path.unlink(missing_ok=True)
|
||||
delete_success = True
|
||||
@ -587,7 +550,7 @@ def process_file_cli(
|
||||
new_file.unlink(missing_ok=True)
|
||||
else:
|
||||
msg = msg_hdr + f"Dry-run: Would try to create {os.path.split(new_file)[1]}"
|
||||
if options["runtime"]["delete_after_zip_export"]:
|
||||
if self.options["runtime"]["delete_after_zip_export"]:
|
||||
msg += " and delete original."
|
||||
print(msg)
|
||||
return
|
||||
@ -595,9 +558,47 @@ def process_file_cli(
|
||||
msg = msg_hdr
|
||||
if export_success:
|
||||
msg += f"Archive exported successfully to: {os.path.split(new_file)[1]}"
|
||||
if options["runtime"]["delete_after_zip_export"] and delete_success:
|
||||
if self.options["runtime"]["delete_after_zip_export"] and delete_success:
|
||||
msg += " (Original deleted) "
|
||||
else:
|
||||
msg += "Archive failed to export!"
|
||||
|
||||
print(msg)
|
||||
|
||||
def process_file_cli(self, filename: str, match_results: OnlineMatchResults) -> None:
|
||||
if not os.path.lexists(filename):
|
||||
logger.error("Cannot find %s", filename)
|
||||
return
|
||||
|
||||
ca = ComicArchive(filename, self.options["general"]["rar_exe_path"], str(graphics_path / "nocover.png"))
|
||||
|
||||
if not ca.seems_to_be_a_comic_archive():
|
||||
logger.error("Sorry, but %s is not a comic archive!", filename)
|
||||
return
|
||||
|
||||
if not ca.is_writable() and (
|
||||
self.options["commands"]["delete"]
|
||||
or self.options["commands"]["copy"]
|
||||
or self.options["commands"]["save"]
|
||||
or self.options["commands"]["rename"]
|
||||
):
|
||||
logger.error("This archive is not writable")
|
||||
return
|
||||
|
||||
if self.options["commands"]["print"]:
|
||||
self.print(ca)
|
||||
|
||||
elif self.options["commands"]["delete"]:
|
||||
self.delete(ca)
|
||||
|
||||
elif self.options["commands"]["copy"] is not None:
|
||||
self.copy(ca)
|
||||
|
||||
elif self.options["commands"]["save"]:
|
||||
self.save(ca, match_results)
|
||||
|
||||
elif self.options["commands"]["rename"]:
|
||||
self.rename(ca)
|
||||
|
||||
elif self.options["commands"]["export_to_zip"]:
|
||||
self.export(ca)
|
||||
|
@ -155,7 +155,7 @@ class App:
|
||||
|
||||
if self.options["runtime"]["no_gui"]:
|
||||
try:
|
||||
cli.cli_mode(self.options, talker_api)
|
||||
cli.CLI(self.options, talker_api).run()
|
||||
except Exception:
|
||||
logger.exception("CLI mode failed")
|
||||
else:
|
||||
|
Loading…
x
Reference in New Issue
Block a user