Some checks failed
CI / lint (ubuntu-latest, 3.9) (push) Has been cancelled
CI / build-and-test (macos-13, 3.13) (push) Has been cancelled
CI / build-and-test (macos-13, 3.9) (push) Has been cancelled
CI / build-and-test (macos-14, 3.13) (push) Has been cancelled
CI / build-and-test (macos-14, 3.9) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04, 3.13) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04, 3.9) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04-arm, 3.13) (push) Has been cancelled
CI / build-and-test (ubuntu-22.04-arm, 3.9) (push) Has been cancelled
CI / build-and-test (windows-latest, 3.13) (push) Has been cancelled
CI / build-and-test (windows-latest, 3.9) (push) Has been cancelled
Contributions / A job to automate contrib in readme (push) Has been cancelled
174 lines
5.5 KiB
Python
174 lines
5.5 KiB
Python
"""A class to manage fetching and caching of images by URL"""
|
|
|
|
#
|
|
# Copyright 2012-2014 ComicTagger Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import sqlite3 as lite
|
|
import tempfile
|
|
from typing import TYPE_CHECKING
|
|
|
|
try:
|
|
import niquests as requests
|
|
except ImportError:
|
|
import requests
|
|
|
|
from comictaggerlib import ctversion
|
|
|
|
if TYPE_CHECKING:
|
|
from PyQt6 import QtCore, QtNetwork
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ImageFetcherException(Exception):
    """Error raised when an image cannot be fetched."""
|
|
|
|
|
|
def fetch_complete(url: str, image_data: bytes | QtCore.QByteArray) -> None:
    """Placeholder completion callback: accept a URL and its image data and do nothing."""
|
|
|
|
|
|
class ImageFetcher:
    """Fetch images by URL, keeping downloaded copies in an on-disk cache.

    The cache is an SQLite database (``image_url_cache.db``) that maps each URL
    to a file stored in the ``image_cache`` folder; both live under the
    ``cache_folder`` passed to the constructor.

    Non-blocking fetches report their result by calling
    ``ImageFetcher.image_fetch_complete(url, image_data)``.
    """

    # Called with (url, image_data) when a non-blocking fetch has a result.
    # The default is a no-op placeholder.
    image_fetch_complete = fetch_complete
    # Whether to attempt using PyQt6 for background fetches. Each instance
    # overrides this with the outcome of its own PyQt6 import attempt.
    qt_available = True

    def __init__(self, cache_folder: pathlib.Path) -> None:
        """Set up the cache locations and, when PyQt6 is usable, the network manager.

        Args:
            cache_folder: Folder that holds the URL database and the
                ``image_cache`` sub-folder.
        """
        self.db_file = cache_folder / "image_url_cache.db"
        self.cache_folder = cache_folder / "image_cache"

        self.user_data = None
        self.fetched_url = ""

        if self.qt_available:
            try:
                from PyQt6 import QtNetwork

                self.qt_available = True
            except ImportError:
                self.qt_available = False
        if not os.path.exists(self.db_file):
            self.create_image_db()

        if self.qt_available:
            self.nam = QtNetwork.QNetworkAccessManager()
            # Connect exactly once. Connecting on every fetch() would stack
            # duplicate connections, so a reused instance would run
            # finish_request several times for each reply.
            self.nam.finished.connect(self.finish_request)

    def clear_cache(self) -> None:
        """Delete every cached image and leave an empty, usable cache behind."""
        try:
            os.unlink(self.db_file)
        except FileNotFoundError:
            # Nothing to delete; the cache is rebuilt below either way.
            pass
        # Recreate the database and the image folder so that this instance can
        # keep fetching without hitting a missing "Images" table.
        self.create_image_db()

    def fetch(self, url: str, blocking: bool = False) -> bytes:
        """Fetch the image at ``url``, using the cache when possible.

        If called with blocking=True (or when PyQt6 is unavailable), this
        blocks until the image is fetched and returns its bytes.

        If called with blocking=False, the fetch runs in the background and
        ``ImageFetcher.image_fetch_complete`` is called with the result; the
        return value is then always ``b""``.

        Raises:
            ImageFetcherException: if a blocking download, or storing its
                result in the cache, fails.
        """
        self.fetched_url = url

        # first look in the DB
        image_data = self.get_image_from_cache(url)

        if blocking or not self.qt_available:
            if not image_data:
                try:
                    image_data = requests.get(url, headers={"user-agent": "comictagger/" + ctversion.version}).content
                    # save the image to the cache
                    self.add_image_to_cache(self.fetched_url, image_data)
                except Exception as e:
                    logger.exception("Fetching url failed: %s", e)
                    raise ImageFetcherException("Network Error!") from e
            return image_data

        # Background path: only reachable when PyQt6 is available.
        from PyQt6 import QtCore, QtNetwork

        # if we found it, just report it asap
        if image_data:
            ImageFetcher.image_fetch_complete(url, QtCore.QByteArray(image_data))
            return b""

        # didn't find it. look online; finish_request gets called back when done
        self.nam.get(QtNetwork.QNetworkRequest(QtCore.QUrl(url)))
        return b""

    def finish_request(self, reply: QtNetwork.QNetworkReply) -> None:
        """Handle a finished background request: cache the data and report it."""
        logger.debug("request finished")
        try:
            # read in the image data
            image_data = reply.readAll()
            url = reply.request().url().toString()

            # save the image to the cache
            self.add_image_to_cache(url, image_data)

            ImageFetcher.image_fetch_complete(url, image_data)
        finally:
            # Replies are not released automatically; schedule deletion so
            # they do not pile up on the network manager.
            reply.deleteLater()

    def create_image_db(self) -> None:
        """Create an empty cache database and image folder, wiping existing ones."""
        # Make sure the folder holding the database exists before creating it.
        os.makedirs(os.path.dirname(self.db_file), exist_ok=True)

        # this will wipe out any existing version
        open(self.db_file, "wb").close()

        # wipe any existing image cache folder too
        if os.path.isdir(self.cache_folder):
            shutil.rmtree(self.cache_folder)
        os.makedirs(self.cache_folder, exist_ok=True)

        # create tables
        con = lite.connect(self.db_file)
        try:
            # "with con" only commits/rolls back; the connection is closed below.
            with con:
                con.execute("CREATE TABLE Images(url TEXT,filename TEXT,timestamp TEXT,PRIMARY KEY (url))")
        finally:
            con.close()

    def add_image_to_cache(self, url: str, image_data: bytes | QtCore.QByteArray) -> None:
        """Write ``image_data`` to a new cache file and record it for ``url``.

        An existing entry for the same URL is replaced and its old file removed.
        """
        # Stored as text in the same "YYYY-MM-DD HH:MM:SS.ffffff" form that
        # sqlite3's default datetime adapter (deprecated since Python 3.12) produced.
        timestamp = datetime.datetime.now().isoformat(" ")

        con = lite.connect(self.db_file)
        try:
            with con:
                cur = con.cursor()

                # Remember the file currently cached for this URL, if any.
                cur.execute("SELECT filename FROM Images WHERE url=?", (url,))
                previous = cur.fetchone()

                tmp_fd, filename = tempfile.mkstemp(dir=self.cache_folder, prefix="img")
                with os.fdopen(tmp_fd, "w+b") as f:
                    f.write(bytes(image_data))

                cur.execute("INSERT or REPLACE INTO Images VALUES(?, ?, ?)", (url, filename, timestamp))
        finally:
            con.close()

        # The replaced row's file is no longer referenced; remove it so the
        # cache folder does not accumulate orphans.
        if previous is not None and previous[0] != filename:
            try:
                os.unlink(previous[0])
            except OSError:
                logger.debug("Could not remove stale cache file %s", previous[0])

    def get_image_from_cache(self, url: str) -> bytes:
        """Return the cached bytes for ``url``, or ``b""`` when nothing usable is cached."""
        con = lite.connect(self.db_file)
        try:
            with con:
                row = con.execute("SELECT filename FROM Images WHERE url=?", (url,)).fetchone()
        finally:
            con.close()

        if row is None:
            return b""

        filename = row[0]
        try:
            with open(filename, "rb") as f:
                return f.read()
        except OSError:
            # Best effort: an unreadable or missing cache file counts as a miss.
            logger.debug("Could not read cached image %s", filename)
            return b""
|