comictagger/comictaggerlib/issueidentifier.py

720 lines
26 KiB
Python
Raw Normal View History

"""A class to automatically identify a comic archive"""
# Copyright 2012-2014 Anthony Beville
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import io
try:
from PIL import Image
pil_available = True
except ImportError:
pil_available = False
from .genericmetadata import GenericMetadata
from .comicvinetalker import ComicVineTalker, ComicVineTalkerException
from .imagehasher import ImageHasher
from .imagefetcher import ImageFetcher, ImageFetcherException
from .issuestring import IssueString
from . import utils
#from settings import ComicTaggerSettings
#from comicvinecacher import ComicVineCacher
class IssueIdentifierNetworkError(Exception):
    """Raised when a network fetch needed for identification fails."""
class IssueIdentifierCancelled(Exception):
    """Raised when the identifier's ``cancel`` flag is set mid-operation."""
class IssueIdentifier:
    """Automatically identify a comic archive by matching its cover online."""

    # Outcome codes; ``search()`` stores one of these in ``search_result``.
    (
        ResultNoMatches,
        ResultFoundMatchButBadCoverScore,
        ResultFoundMatchButNotFirstPage,
        ResultMultipleMatchesWithBadImageScores,
        ResultOneGoodMatch,
        ResultMultipleGoodMatches,
    ) = range(6)
def __init__(self, comic_archive, settings):
    """Prepare an identifier for *comic_archive*.

    *settings* must expose ``id_length_delta_thresh`` and
    ``id_publisher_filter`` (a comma-separated string of publisher names).
    """
    self.comic_archive = comic_archive
    self.image_hasher = 1
    self.onlyUseAdditionalMetaData = False

    # Hamming-distance thresholds (lower score == closer images):
    #   min_score_thresh           - decent score, good enough to call a match
    #   min_alternate_score_thresh - stricter bar for alternate covers, since
    #                                those comparisons are more scattershot
    #   min_score_distance         - gap a score needs to separate itself
    #                                from its closest neighbor
    #   strong_score_thresh        - very strong score, almost certainly the
    #                                same image
    self.min_score_thresh = 16
    self.min_alternate_score_thresh = 12
    self.min_score_distance = 4
    self.strong_score_thresh = 8

    # Used to eliminate series names that are too long for the search string.
    self.length_delta_thresh = settings.id_length_delta_thresh

    # Used to eliminate unlikely publishers; stored stripped and lower-cased.
    raw_publishers = settings.id_publisher_filter.split(',')
    self.publisher_filter = [name.strip().lower() for name in raw_publishers]

    self.additional_metadata = GenericMetadata()
    self.output_function = IssueIdentifier.defaultWriteOutput

    # Optional hooks, unset by default.
    self.callback = None
    self.coverUrlCallback = None

    # Per-search state.
    self.search_result = self.ResultNoMatches
    self.cover_page_index = 0
    self.cancel = False
    self.waitAndRetryOnRateLimit = False
def setScoreMinThreshold(self, thresh):
    """Set the hamming score at or above which a cover match is weak."""
    self.min_score_thresh = thresh
def setScoreMinDistance(self, distance):
    """Set how far above the best score a match may be and still be kept."""
    self.min_score_distance = distance
def setAdditionalMetadata(self, md):
    """Supply extra metadata that takes priority when building search keys."""
    self.additional_metadata = md
def setNameLengthDeltaThreshold(self, delta):
    """Set how much longer than the search string a series name may be."""
    self.length_delta_thresh = delta
def setPublisherFilter(self, filter):
    """Replace the collection of publisher names excluded from results.

    The value is stored as given; ``search()`` compares lower-cased
    publisher names against it.
    """
    self.publisher_filter = filter
def setHasherAlgorithm(self, algo):
    """Select the image-hash algorithm used by ``calculateHash``.

    ``calculateHash`` compares this value against the strings ``'3'`` and
    ``'2'``; any other value selects the default hash.
    """
    self.image_hasher = algo
def setOutputFunction(self, func):
    """Install the callable that receives every log message as one string."""
    self.output_function = func
def calculateHash(self, image_data):
    """Hash *image_data* with the algorithm selected by ``image_hasher``.

    ``'3'`` selects the DCT average hash, ``'2'`` the second average-hash
    variant, and anything else the plain average hash.
    """
    algo = self.image_hasher
    hasher = ImageHasher(data=image_data)
    if algo == '3':
        return hasher.dct_average_hash()
    if algo == '2':
        return hasher.average_hash2()
    return hasher.average_hash()
def getAspectRatio(self, image_data):
    """Return height divided by width for the image in *image_data*.

    Falls back to 1.5 whenever the data cannot be decoded or measured, so
    callers never have to handle a failure here.
    """
    try:
        # Encoded image data is binary, so it needs a bytes buffer: wrapping
        # it in io.StringIO always failed and the fallback hid the error.
        im = Image.open(io.BytesIO(image_data))
        w, h = im.size
        return float(h) / float(w)
    except Exception:
        # Best effort by design, but no longer swallows KeyboardInterrupt
        # or SystemExit the way a bare ``except:`` did.
        return 1.5
def cropCover(self, image_data):
    """Return PNG data for the right-hand half of the image in *image_data*.

    Returns None (after printing the error) when the image cannot be
    opened, cropped or re-encoded.
    """
    try:
        # Encoded image data is binary: io.BytesIO, not io.StringIO, is the
        # buffer type needed for both reading and writing it.
        im = Image.open(io.BytesIO(image_data))
        w, h = im.size
        cropped_im = im.crop((int(w / 2), 0, w, h))
        with io.BytesIO() as output:
            cropped_im.save(output, format="PNG")
            return output.getvalue()
    except Exception as e:
        print("cropCover() error:", e)
        return None
def setProgressCallback(self, cb_func):
    """Register a progress hook, invoked by ``search()`` as ``cb(current, total)``."""
    self.callback = cb_func
def setCoverURLCallback(self, cb_func):
    """Register a hook that receives the data of each fetched remote cover."""
    self.coverUrlCallback = cb_func
def getSearchKeys(self):
    """Collect the values used to search for this comic.

    Returns:
        A dict with the keys ``'series'``, ``'issue_number'``, ``'month'``,
        ``'year'`` and ``'issue_count'``. A value is None when no source
        provides it; every value is None when there is no comic archive.
    """
    # (search key, metadata attribute that supplies it)
    field_map = (
        ('series', 'series'),
        ('issue_number', 'issue'),
        ('month', 'month'),
        ('year', 'year'),
        ('issue_count', 'issueCount'),
    )
    search_keys = {key: None for key, _ in field_map}

    ca = self.comic_archive
    if ca is None:
        # Previously a bare ``return`` handed callers None instead of the
        # (empty) key dict they index into.
        return search_keys

    additional = self.additional_metadata
    if self.onlyUseAdditionalMetaData:
        for key, attr in field_map:
            search_keys[key] = getattr(additional, attr)
        return search_keys

    # See if the archive has any useful metadata for searching with.
    # CIX is preferred; every other case reads CBI.
    if ca.hasCIX():
        internal_metadata = ca.readCIX()
    else:
        internal_metadata = ca.readCBI()

    # Try to get some metadata from the filename.
    md_from_filename = ca.metadataFromFilename()

    # Preference order:
    #   1. Additional metadata
    #   2. Internal metadata
    #   3. Filename metadata
    for key, attr in field_map:
        for source in (additional, internal_metadata):
            value = getattr(source, attr)
            if value is not None:
                break
        else:
            value = getattr(md_from_filename, attr)
        search_keys[key] = value

    return search_keys
@staticmethod
def defaultWriteOutput(text):
    """Default log sink: write *text* to stdout and flush immediately."""
    stream = sys.stdout
    stream.write(text)
    stream.flush()
def log_msg(self, msg, newline=True):
    """Send *msg* to the output function, appending a newline by default."""
    self.output_function(msg + "\n" if newline else msg)
2015-02-15 03:55:04 -08:00
def getIssueCoverMatchScore(
        self,
        comicVine,
        issue_id,
        primary_img_url,
        primary_thumb_url,
        page_url,
        localCoverHashList,
        useRemoteAlternates=False,
        useLog=True):
    """Score the local cover hashes against an issue's remote cover(s).

    Args:
        comicVine: talker used to look up alternate cover URLs.
        issue_id: issue whose covers are compared.
        primary_img_url: URL recorded for the primary cover in the result.
        primary_thumb_url: URL actually fetched and hashed for the primary
            cover.
        page_url: issue page URL, passed along when fetching alternates.
        localCoverHashList: pre-calculated hashes of local pages.
        useRemoteAlternates: also compare against alternate covers.
        useLog: emit progress text through ``log_msg``.

    Returns:
        The lowest-scoring dict with keys ``'score'``, ``'url'``, ``'hash'``.

    Raises:
        IssueIdentifierNetworkError: a cover image could not be fetched.
        IssueIdentifierCancelled: the ``cancel`` flag was set.
    """
    remote_cover_list = []

    def add_remote_cover(fetch_url, recorded_url, failure_text):
        # Fetch one cover, notify the GUI hook, then hash and record it.
        try:
            image_data = ImageFetcher().fetch(fetch_url, blocking=True)
        except ImageFetcherException:
            self.log_msg(failure_text)
            raise IssueIdentifierNetworkError

        if self.cancel:
            raise IssueIdentifierCancelled

        # alert the GUI, if needed
        if self.coverUrlCallback is not None:
            self.coverUrlCallback(image_data)

        remote_cover_list.append({
            'url': recorded_url,
            'hash': self.calculateHash(image_data),
        })

        if self.cancel:
            raise IssueIdentifierCancelled

    add_remote_cover(
        primary_thumb_url,
        primary_img_url,
        "Network issue while fetching cover image from Comic Vine. Aborting...")

    if useRemoteAlternates:
        for alt_url in comicVine.fetchAlternateCoverURLs(issue_id, page_url):
            add_remote_cover(
                alt_url,
                alt_url,
                "Network issue while fetching alt. cover image from Comic Vine. Aborting...")
        if useLog:
            self.log_msg(
                "[{0} alt. covers]".format(len(remote_cover_list) - 1), False)

    if useLog:
        self.log_msg("[ ", False)

    score_list = []
    strong_match_found = False
    for local_hash in localCoverHashList:
        for remote in remote_cover_list:
            distance = ImageHasher.hamming_distance(local_hash, remote['hash'])
            score_list.append({
                'score': distance,
                'url': remote['url'],
                'hash': remote['hash'],
            })
            if useLog:
                self.log_msg("{0}".format(distance), False)

            if distance <= self.strong_score_thresh:
                # such a good score that we surely have a winner; stop now
                strong_match_found = True
                break
        if strong_match_found:
            break

    if useLog:
        self.log_msg(" ]", False)

    return min(score_list, key=lambda entry: entry['score'])
# def validate(self, issue_id):
# create hash list
# score = self.getIssueMatchScore(issue_id, hash_list, useRemoteAlternates = True)
# if score < 20:
# return True
# else:
# return False
def search(self):
    """Try to identify the comic archive and return the candidate matches.

    The search runs in stages: hash the local cover, build search keys,
    query for series, filter them by name length / publisher / start year,
    fetch the matching issues, and score each issue's cover against the
    local cover. If every score is weak, a second round also compares a few
    more local pages against alternate remote covers.

    Returns:
        A list of match dicts sorted by ``'distance'`` (lower is better);
        empty when nothing was found or the search was aborted.
        ``self.search_result`` holds one of the ``Result*`` codes.
    """
    ca = self.comic_archive
    self.match_list = []
    self.cancel = False
    self.search_result = self.ResultNoMatches

    if not pil_available:
        self.log_msg(
            "Python Imaging Library (PIL) is not available and is needed for issue identification.")
        return self.match_list

    if not ca.seemsToBeAComicArchive():
        # The original referenced an undefined ``opts`` here, which raised
        # NameError. Name the archive via its ``path`` attribute when it has
        # one, else via the object itself.
        self.log_msg(
            "Sorry, but " + str(getattr(ca, "path", ca)) + " is not a comic archive!")
        return self.match_list

    cover_image_data = ca.getPage(self.cover_page_index)
    cover_hash = self.calculateHash(cover_image_data)

    # check the aspect ratio
    # if it's wider than it is high, it's probably a two page spread
    # if so, crop it and calculate a second hash
    narrow_cover_hash = None
    aspect_ratio = self.getAspectRatio(cover_image_data)
    if aspect_ratio < 1.0:
        right_side_image_data = self.cropCover(cover_image_data)
        if right_side_image_data is not None:
            narrow_cover_hash = self.calculateHash(right_side_image_data)

    # hashes of the local cover, used for the first round of matching
    cover_hashes = [cover_hash]
    if narrow_cover_hash is not None:
        cover_hashes.append(narrow_cover_hash)

    keys = self.getSearchKeys()
    # normalize the issue number
    keys['issue_number'] = IssueString(keys['issue_number']).asString()

    # we need, at minimum, a series and issue number
    if keys['series'] is None or keys['issue_number'] is None:
        self.log_msg("Not enough info for a search!")
        return []

    self.log_msg("Going to search for:")
    self.log_msg("\tSeries: " + keys['series'])
    self.log_msg("\tIssue: " + keys['issue_number'])
    if keys['issue_count'] is not None:
        self.log_msg("\tCount: " + str(keys['issue_count']))
    if keys['year'] is not None:
        self.log_msg("\tYear: " + str(keys['year']))
    if keys['month'] is not None:
        self.log_msg("\tMonth: " + str(keys['month']))

    comicVine = ComicVineTalker()
    comicVine.wait_for_rate_limit = self.waitAndRetryOnRateLimit
    comicVine.setLogFunc(self.output_function)

    self.log_msg("Searching for {0} #{1} ...".format(
        keys['series'], keys['issue_number']))
    try:
        cv_search_results = comicVine.searchForSeries(keys['series'])
    except ComicVineTalkerException:
        self.log_msg(
            "Network issue while searching for series. Aborting...")
        return []

    if self.cancel:
        return []

    if cv_search_results is None:
        return []

    # Loop-invariant values for the series filter below.
    search_year = None
    if keys['year'] is not None and str(keys['year']).isdigit():
        search_year = int(keys['year'])
    # sanitize both the search string and the result so that
    # we are comparing the same type of data
    shortened_key = utils.sanitize_title(keys['series'])
    max_name_length = len(shortened_key) + self.length_delta_thresh

    series_second_round_list = []
    for item in cv_search_results:
        # remove any series that starts after the issue year
        date_approved = True
        if search_year is not None:
            start_year = item['start_year']
            if start_year is not None and str(start_year).isdigit():
                if search_year < int(start_year):
                    date_approved = False

        # assume that our search name is close to the actual name, say
        # within, e.g., 5 chars
        shortened_item_name = utils.sanitize_title(item['name'])
        length_approved = len(shortened_item_name) < max_name_length

        # remove any series from publishers on the filter
        publisher_approved = True
        if item['publisher'] is not None:
            publisher = item['publisher']['name']
            if publisher is not None and publisher.lower() in self.publisher_filter:
                publisher_approved = False

        if length_approved and publisher_approved and date_approved:
            series_second_round_list.append(item)

    self.log_msg(
        "Searching in " + str(len(series_second_round_list)) + " series")

    if self.callback is not None:
        self.callback(0, len(series_second_round_list))

    # now sort the list by name length
    series_second_round_list.sort(key=lambda x: len(x['name']))

    # build a list of volume IDs
    volume_id_list = [series['id'] for series in series_second_round_list]

    try:
        issue_list = comicVine.fetchIssuesByVolumeIssueNumAndYear(
            volume_id_list,
            keys['issue_number'],
            keys['year'])
    except ComicVineTalkerException:
        self.log_msg(
            "Network issue while searching for series details. Aborting...")
        return []

    if issue_list is None:
        return []

    # now re-associate the issues and volumes; on duplicate ids the first
    # series in the sorted list wins
    series_by_id = {}
    for series in series_second_round_list:
        series_by_id.setdefault(series['id'], series)

    shortlist = []
    for issue in issue_list:
        series = series_by_id.get(issue['volume']['id'])
        if series is not None:
            shortlist.append((series, issue))

    if keys['year'] is None:
        self.log_msg("Found {0} series that have an issue #{1}".format(
            len(shortlist), keys['issue_number']))
    else:
        self.log_msg(
            "Found {0} series that have an issue #{1} from {2}".format(
                len(shortlist),
                keys['issue_number'],
                keys['year']))

    # now we have a shortlist of volumes with the desired issue number
    # Do first round of cover matching
    counter = len(shortlist)
    for series, issue in shortlist:
        if self.callback is not None:
            self.callback(counter, len(shortlist) * 3)
            counter += 1

        self.log_msg("Examining covers for ID: {0} {1} ({2}) ...".format(
            series['id'],
            series['name'],
            series['start_year']), newline=False)

        # parse out the cover date
        _day, month, year = comicVine.parseDateStr(issue['cover_date'])

        # Now check the cover match against the primary image
        try:
            image_url = issue['image']['super_url']
            thumb_url = issue['image']['thumb_url']
            page_url = issue['site_detail_url']

            score_item = self.getIssueCoverMatchScore(
                comicVine,
                issue['id'],
                image_url,
                thumb_url,
                page_url,
                cover_hashes,
                useRemoteAlternates=False)
        except Exception:
            # Any failure (network error, cancellation, malformed issue
            # record) aborts the search with no matches. KeyboardInterrupt
            # and SystemExit are deliberately left to propagate.
            self.match_list = []
            return self.match_list

        match = {
            'series': "{0} ({1})".format(series['name'], series['start_year']),
            'distance': score_item['score'],
            'issue_number': keys['issue_number'],
            'cv_issue_count': series['count_of_issues'],
            'url_image_hash': score_item['hash'],
            'issue_title': issue['name'],
            'issue_id': issue['id'],
            'volume_id': series['id'],
            'month': month,
            'year': year,
            'publisher': None,
        }
        if series['publisher'] is not None:
            match['publisher'] = series['publisher']['name']
        match['image_url'] = image_url
        match['thumb_url'] = thumb_url
        match['page_url'] = page_url
        match['description'] = issue['description']

        self.match_list.append(match)

        self.log_msg(" --> {0}".format(match['distance']), newline=False)
        self.log_msg("")

    if not self.match_list:
        self.log_msg(":-(no matches!")
        self.search_result = self.ResultNoMatches
        return self.match_list

    # sort list by image match scores
    self.match_list.sort(key=lambda k: k['distance'])

    distances = [m['distance'] for m in self.match_list]
    self.log_msg("Compared to covers in {0} issue(s):".format(
        len(self.match_list)), newline=False)
    self.log_msg(str(distances))

    divider = "--------------------------------------------------------------------------"

    def print_match(item):
        self.log_msg("-----> {0} #{1} {2} ({3}/{4}) -- score: {5}".format(
            item['series'],
            item['issue_number'],
            item['issue_title'],
            item['month'],
            item['year'],
            item['distance']))

    best_score = self.match_list[0]['distance']

    if best_score >= self.min_score_thresh:
        # we have 1 or more low-confidence matches (all bad cover scores)
        # look at a few more pages in the archive, and also alternate
        # covers online
        self.log_msg(
            "Very weak scores for the cover. Analyzing alternate pages and covers...")
        hash_list = list(cover_hashes)
        for i in range(1, min(3, ca.getNumberOfPages())):
            image_data = ca.getPage(i)
            page_hash = self.calculateHash(image_data)
            hash_list.append(page_hash)

        second_match_list = []
        counter = 2 * len(self.match_list)
        for m in self.match_list:
            if self.callback is not None:
                self.callback(counter, len(self.match_list) * 3)
                counter += 1

            self.log_msg(
                "Examining alternate covers for ID: {0} {1} ...".format(
                    m['volume_id'],
                    m['series']),
                newline=False)
            try:
                score_item = self.getIssueCoverMatchScore(
                    comicVine,
                    m['issue_id'],
                    m['image_url'],
                    m['thumb_url'],
                    m['page_url'],
                    hash_list,
                    useRemoteAlternates=True)
            except Exception:
                # Same abort-on-any-failure policy as the first round.
                self.match_list = []
                return self.match_list
            self.log_msg("--->{0}".format(score_item['score']))
            self.log_msg("")

            if score_item['score'] < self.min_alternate_score_thresh:
                second_match_list.append(m)
                m['distance'] = score_item['score']

        if not second_match_list:
            if len(self.match_list) == 1:
                self.log_msg("No matching pages in the issue.")
                self.log_msg(divider)
                print_match(self.match_list[0])
                self.log_msg(divider)
                self.search_result = self.ResultFoundMatchButBadCoverScore
            else:
                self.log_msg(divider)
                self.log_msg(
                    "Multiple bad cover matches! Need to use other info...")
                self.log_msg(divider)
                self.search_result = self.ResultMultipleMatchesWithBadImageScores
            return self.match_list

        # We did good, found something!
        self.log_msg("Success in secondary/alternate cover matching!")

        self.match_list = second_match_list
        # sort new list by image match scores
        self.match_list.sort(key=lambda k: k['distance'])
        best_score = self.match_list[0]['distance']
        self.log_msg(
            "[Second round cover matching: best score = {0}]".format(best_score))
        # now drop down into the rest of the processing

    if self.callback is not None:
        self.callback(99, 100)

    # now pare down list, remove any item more than specified distant from
    # the top scores (built as a new list rather than removing while
    # iterating)
    max_distance = best_score + self.min_score_distance
    self.match_list = [
        m for m in self.match_list if m['distance'] <= max_distance]

    # One more test for the case choosing limited series first issue vs a
    # trade with the same cover: if we have a given issue count > 1 and the
    # volume from CV has count==1, remove it from match list
    if (len(self.match_list) >= 2
            and keys['issue_count'] is not None
            and keys['issue_count'] != 1):
        new_list = []
        for match in self.match_list:
            if match['cv_issue_count'] != 1:
                new_list.append(match)
            else:
                self.log_msg(
                    "Removing volume {0} [{1}] from consideration (only 1 issue)".format(
                        match['series'],
                        match['volume_id']))

        # only apply the filter if it leaves at least one candidate
        if new_list:
            self.match_list = new_list

    if len(self.match_list) == 1:
        self.log_msg(divider)
        print_match(self.match_list[0])
        self.log_msg(divider)
        self.search_result = self.ResultOneGoodMatch

    elif not self.match_list:
        self.log_msg(divider)
        self.log_msg("No matches found :(")
        self.log_msg(divider)
        self.search_result = self.ResultNoMatches
    else:
        # we've got multiple good matches:
        self.log_msg("More than one likely candidate.")
        self.search_result = self.ResultMultipleGoodMatches
        self.log_msg(divider)
        for item in self.match_list:
            print_match(item)
        self.log_msg(divider)

    return self.match_list