# rpiwebapp-public/func.py

import inspect
import json
import os
import pathlib
import re
from datetime import timedelta
import enzyme
import requests
from blinker import Namespace
from comicapi import comicarchive
from flask import current_app
from wand.image import Image
import database
rpi_signals = Namespace()
comic_loaded = rpi_signals.signal("comic-loaded")
movie_loaded = rpi_signals.signal("movie-loaded")
tv_show_loaded = rpi_signals.signal("tv_show_loaded")
tv_episodes_loaded = rpi_signals.signal("tv_episodes_loaded")
games_loaded = rpi_signals.signal("games_loaded")
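# Receivers are expected to subscribe to these signals elsewhere (e.g. in the
# database module). A minimal sketch of how a subscriber would hook in with
# blinker, assuming a handler name of our own choosing:
#
#     @comic_loaded.connect
#     def on_comic_loaded(sender, meta, thumbnails):
#         ...  # persist the (path, metadata) tuples and their thumbnails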
publishers_to_ignore = ["***REMOVED***"]
API_KEY = "***REMOVED***"
# Directories
COMICS_DIRECTORY = pathlib.Path("/srv/comics/")
MOVIES_DIRECTORY = pathlib.Path("/srv/movies/")
TV_SHOWS_DIRECTORY = pathlib.Path("/srv/tv/")
GAMES_DIRECTORY = pathlib.Path("/srv/games/")
#############
def get_comics():
    total_comics = 0
    comics_in_db = 0
    comics_added = 0
    meta = []
    thumbnails = []
    i = 0
    for root, dirs, files in os.walk(COMICS_DIRECTORY):
        for f in files:
            if "temp" in root:
                continue
            if f.endswith((".cbr", ".cbz")):
                total_comics += 1
                path = pathlib.Path(root, f)
                if not database.comic_path_in_db(str(path)):
                    try:
                        test_path = str(path).encode("utf8")
                    except Exception as e:
                        current_app.logger.error("encoding failed on: " + str(path))
                        continue
                    archive = open_comic(path)
                    md = archive.readCIX()
                    if md.publisher in publishers_to_ignore:
                        continue
                    current_app.logger.info(str(path))
                    try:
                        meta.append((str(path), md))
                        thumbnails.append(get_comic_thumbnails(archive))
                        comics_added += 1
                        i += 1
                    except Exception as e:
                        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
                        continue
                    if i >= 2:
                        comic_loaded.send("anonymous", meta=meta.copy(), thumbnails=thumbnails.copy())
                        meta.clear()
                        thumbnails.clear()
                        i = 0
                comics_in_db += 1
    current_app.logger.info("total number of comics: " + str(total_comics))
    current_app.logger.info("comics in database: " + str(comics_in_db))
    current_app.logger.info("number of comics added: " + str(comics_added))
    comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)
def get_comic(path: pathlib.Path):
    meta = []
    thumbnails = []
    if path.suffix == ".cbr":
        if not database.comic_path_in_db(str(path)):
            try:
                test_path = str(path).encode("utf8")
            except Exception as e:
                current_app.logger.error(f"encoding failed on: {path}")
                return
            archive = open_comic(path)
            md = archive.readCIX()
            if md.publisher in publishers_to_ignore:
                return
            current_app.logger.info(path)
            meta.append((str(path), md))
            try:
                thumbnails.append(get_comic_thumbnails(archive))
            except Exception as e:
                current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
                return
            comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)
def get_comic_thumbnails(comic):
    thumbnails = []
    size = "256x256"
    new_height = 256
    new_width = 256
    for page in range(comic.getNumberOfPages()):
        with Image(blob=comic.getPage(page)) as image:
            orig_height = image.height
            orig_width = image.width
            if orig_height >= orig_width:
                width = int((orig_width / orig_height) * new_height)
                height = new_height
            else:
                height = int((orig_height / orig_width) * new_width)
                width = new_width
            image.thumbnail(width, height)
            thumbnails.append((image.make_blob(), "image/" + image.format))
    return thumbnails
def open_comic(path):
    archive = comicarchive.ComicArchive(str(path), default_image_path="static/images/icon.png")
    return archive
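# Usage sketch for the two helpers above (the path is illustrative, not part of
# the app's own flow):
#
#     archive = open_comic(pathlib.Path("/srv/comics/Example #1.cbz"))
#     metadata = archive.readCIX()            # ComicInfo.xml metadata
#     pages = get_comic_thumbnails(archive)   # list of (blob, "image/FORMAT") tuples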
def get_movies():
    current_app.logger.info("start loading movies")
    pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
    url = "https://api.themoviedb.org/3/search/movie"
    movies = []
    total_movies = 0
    movies_in_db = 0
    movies_added = 0
    for root, dirs, files in os.walk(MOVIES_DIRECTORY):
        for f in files:
            if f.endswith(".mkv"):
                total_movies += 1
                path = pathlib.Path(root, f)
                if not database.movie_path_in_db(str(path)):
                    try:
                        match = re.match(pattern, f)
                        if not match:
                            current_app.logger.info(f + " did not match regex.")
                            continue
                        current_app.logger.info("movie path: " + str(path))
                        title = match.group("title")
                        current_app.logger.info("movie title: " + title)
                        year = int(match.group("year"))
                        extended = True if match.group("extended") else False
                        directors_cut = True if match.group("directors_cut") else False
                        res_4k_path = (path.parent / path.name.replace(f"({year})", f"({year})(4k)"))
                        res_4k = res_4k_path.exists()
                        data = {
                            "api_key": API_KEY,
                            "query": title,
                            "primary_release_year": year,
                            "language": "en-US",
                        }
                        r = requests.get(url, params=data)
                        if len(r.json()["results"]) == 0:
                            data = {
                                "api_key": API_KEY,
                                "query": title,
                                "year": year,
                                "language": "en-US",
                            }
                            r = requests.get(url, params=data)
                            if len(r.json()["results"]) == 0:
                                current_app.logger.info(f"no movie results for {title} - ({year})")
                                continue
                        info = r.json()["results"][0]
                        tmdb_id = info["id"]
                        description = info["overview"]
                        poster_path = info["poster_path"]
                        backdrop_path = info["backdrop_path"]
                        movies_added += 1
                        movies.append((str(path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,))
                        if len(movies) >= 20:
                            movie_loaded.send("anonymous", movies=movies.copy())
                            movies.clear()
                    except Exception as e:
                        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
                movies_in_db += 1
    movie_loaded.send("anonymous", movies=movies)
    current_app.logger.info("finish loading movies")
    current_app.logger.info("total movies: " + str(total_movies))
    current_app.logger.info("movies in database: " + str(movies_in_db))
    current_app.logger.info("movies added: " + str(movies_added))
def get_movie(path: pathlib.Path):
    pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
    url = "https://api.themoviedb.org/3/search/movie"
    movies = []
    if not database.movie_path_in_db(str(path)):
        try:
            match = re.match(pattern, path.name)
            if not match:
                current_app.logger.info(f"{path.name} did not match regex.")
                return
            current_app.logger.info(f"movie path: {path}")
            title = match.group("title")
            current_app.logger.info("movie title: " + title)
            year = int(match.group("year"))
            extended = True if match.group("extended") else False
            directors_cut = True if match.group("directors_cut") else False
            res_4k = (path.parent / path.name.replace(f"({year})", f"({year})(4k)")).exists()
            data = {
                "api_key": API_KEY,
                "query": title,
                "primary_release_year": year,
                "language": "en-US",
            }
            r = requests.get(url, params=data)
            if len(r.json()["results"]) == 0:
                data = {
                    "api_key": API_KEY,
                    "query": title,
                    "year": year,
                    "language": "en-US",
                }
                r = requests.get(url, params=data)
                if len(r.json()["results"]) == 0:
                    current_app.logger.info(f"no movie results for {title} - ({year})")
                    return
            info = r.json()["results"][0]
            tmdb_id = info["id"]
            description = info["overview"]
            poster_path = info["poster_path"]
            backdrop_path = info["backdrop_path"]
            movies.append((str(path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,))
            movie_loaded.send("anonymous", movies=movies.copy())
            movies.clear()
            current_app.logger.info("finish loading movie")
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
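# Illustrative filenames the movie pattern above accepts:
#
#     "Inception (2010).mkv"
#     "Some Movie (2001)(extended).mkv"
#     "Another Movie (1982) Director's Cut.mkv"
#
# A sibling file such as "Some Movie (2001)(4k).mkv" sets the res_4k flag.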
def get_tv_shows():
    dir_pattern = r"(?P<title>.+) \((?P<year>\d+)\)"
    search_url = "https://api.themoviedb.org/3/search/tv"
    tv_url = "https://api.themoviedb.org/3/tv/"
    current_app.logger.info("start loading tv shows")
    for dir in sorted(TV_SHOWS_DIRECTORY.iterdir()):
        dir_match = re.match(dir_pattern, dir.name)
        if dir_match:
            path = TV_SHOWS_DIRECTORY / dir
            if not database.tv_show_path_in_db(str(path)):
                json_info = {}
                if (path / "info.json").exists():
                    with (path / "info.json").open() as f:
                        json_info = json.load(f)
                series_name = dir_match.group("title")
                series_year = int(dir_match.group("year"))
                if not json_info:
                    data = {
                        "api_key": API_KEY,
                        "query": series_name,
                        "first_air_date_year": series_year,
                        "language": "en-US",
                    }
                    r = requests.get(search_url, params=data)
                    if len(r.json()["results"]) == 0:
                        current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                        continue
                    info = r.json()["results"][0]
                else:
                    data = {"api_key": API_KEY, "language": "en-US"}
                    r = requests.get(tv_url + str(json_info["tmdb_id"]), params=data)
                    if "status_code" in r.json().keys():
                        current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                        continue
                    info = r.json()
                tmdb_id = info["id"]
                description = info["overview"]
                poster_path = info["poster_path"]
                tv_show_data = (
                    tmdb_id,
                    series_name,
                    series_year,
                    description,
                    poster_path,
                    str(path),
                )
                tv_show_loaded.send("anonymous", tv_show=tv_show_data)
    current_app.logger.info("finished loading tv shows.")
def get_tv_episodes():
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>\.mkv)"
    rows = database.get_all_tv_shows()
    current_app.logger.info("start loading tv episodes")
    for tv_show in rows:
        try:
            episodes = []
            tv_show_path = pathlib.Path(tv_show.path)
            for video in sorted(tv_show_path.iterdir()):
                video_match = re.match(video_pattern, video.name)
                if video_match:
                    path = tv_show_path / video
                    if not database.tv_episode_path_in_db(str(path)):
                        season = int(video_match.group("season"))
                        episode = int(video_match.group("episode"))
                        episode_name = video_match.group("title")
                        current_app.logger.info(f"S{season} E{episode} - {tv_show.title}: {episode_name}")
                        url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
                        data = {"api_key": API_KEY, "language": "en-US"}
                        r = requests.get(url, params=data)
                        if "status_code" in r.json().keys():
                            current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
                            continue
                        info = r.json()
                        episode_tmdb_id = info["id"]
                        episode_description = info["overview"]
                        episode_still_path = info["still_path"]
                        episodes.append(
                            (episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(path),)
                        )
                        if len(episodes) >= 10:
                            tv_episodes_loaded.send("anonymous", tv_episodes=episodes.copy())
                            episodes.clear()
            tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
        except Exception as e:
            current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    current_app.logger.info("finished loading tv episodes")
def get_tv_episode(path: pathlib.Path):
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>\.mkv)"
    video_match = re.match(video_pattern, path.name)
    if video_match:
        rows = database.get_all_tv_shows()
        for tv_show in rows:
            if str(path.parent) == tv_show.path:
                if not database.tv_episode_path_in_db(str(path)):
                    episodes = []
                    season = int(video_match.group("season"))
                    episode = int(video_match.group("episode"))
                    episode_name = video_match.group("title")
                    url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
                    data = {"api_key": API_KEY, "language": "en-US"}
                    r = requests.get(url, params=data)
                    if "status_code" in r.json().keys():
                        current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
                        continue
                    info = r.json()
                    episode_tmdb_id = info["id"]
                    episode_description = info["overview"]
                    episode_still_path = info["still_path"]
                    episodes.append(
                        (episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(path),)
                    )
                    tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
    current_app.logger.info("finished loading tv episode")
def get_chapters(path):
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return {}
    mkv_info = {}
    for chapter in mkv.chapters:
        if chapter.string == "Intro":
            mkv_info["intro"] = {
                "start": chapter.start.seconds,
                "end": timedelta(microseconds=chapter.end // 1000).seconds,
            }
        if chapter.string == "Credits":
            mkv_info["credits"] = {"start": chapter.start.seconds}
        if chapter.string == "end-credit scene":
            if "end-credit scene" not in mkv_info.keys():
                mkv_info["end-credit scene"] = []
            end_credit = {"start": chapter.start.seconds}
            if chapter.end:
                end_credit["end"] = timedelta(microseconds=chapter.end // 1000).seconds
            mkv_info["end-credit scene"].append(end_credit)
    return mkv_info
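# Sketch of the dict get_chapters() returns for a fully chaptered file (times
# in whole seconds; each key appears only when the matching chapter exists):
#
#     {"intro": {"start": 30, "end": 90},
#      "credits": {"start": 5400},
#      "end-credit scene": [{"start": 5460, "end": 5520}]}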
def get_tags(path):
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return {}
    mkv_info = {}
    for tag in mkv.tags:
        if tag.targets.data[0].data == 70:
            mkv_info["collection"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    mkv_info["collection"]["title"] = simple.string
                if simple.name == "TOTAL_PARTS":
                    mkv_info["collection"]["episodes"] = int(simple.string)
                if simple.name == "KEYWORDS":
                    mkv_info["collection"]["key_words"] = simple.string.split(",")
                if simple.name == "DATE_RELEASED":
                    mkv_info["collection"]["year"] = int(simple.string)
                if simple.name == "SUMMARY":
                    mkv_info["collection"]["summary"] = simple.string
        if tag.targets.data[0].data == 60:
            mkv_info["season"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    mkv_info["season"]["title"] = simple.string
                if simple.name == "TOTAL_PARTS":
                    mkv_info["season"]["episodes"] = int(simple.string)
        if tag.targets.data[0].data == 50:
            mkv_info["movie"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    mkv_info["movie"]["title"] = simple.string
                if simple.name == "DATE_RELEASED":
                    mkv_info["movie"]["year"] = int(simple.string)
                if simple.name == "PART_NUMBER":
                    mkv_info["movie"]["episode"] = int(simple.string)
                if simple.name == "KEYWORDS":
                    mkv_info["movie"]["key_words"] = simple.string.split(",")
                if simple.name == "SUMMARY":
                    mkv_info["movie"]["summary"] = simple.string
    return mkv_info
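# The target type values checked above follow the Matroska tagging convention:
# 70 = COLLECTION, 60 = SEASON/SEQUEL, 50 = MOVIE/EPISODE. A trimmed example of
# the returned dict for a movie-level tag (placeholder values):
#
#     {"movie": {"title": "Some Movie", "year": 2001, "key_words": ["action"]}}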
def get_games():
    games = []
    cover_url = "https://api-v3.igdb.com/covers"
    games_url = "https://api-v3.igdb.com/games"
    headers = {
        "accept": "application/json",
        "user-key": "641f7f0e3af5273dcc1105ce851ea804",
    }
    i = 0
    current_app.logger.info("start loading games")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        root = folder.absolute()
        if root.is_dir():
            try:
                path = root / "info.json"
                with path.open() as f:
                    info = json.load(f)
                game_id = info["id"]
                if not database.game_in_db(game_id):
                    current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
                    data = f"fields summary;limit 1;where id={game_id};"
                    r = requests.get(games_url, headers=headers, data=data).json()[0]
                    description = ""
                    if "summary" in r.keys():
                        description = r["summary"]
                    data = f"fields image_id;limit 1;where game={game_id};"
                    r = requests.get(cover_url, headers=headers, data=data).json()
                    poster_path = None
                    if r:
                        if "image_id" in r[0].keys():
                            poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + r[0]["image_id"] + ".jpg"
                    windows = False
                    mac = False
                    linux = False
                    if "windows" in info.keys():
                        windows = True
                    if "mac" in info.keys():
                        mac = True
                    if "linux" in info.keys():
                        linux = True
                    game = (
                        info["name"],
                        game_id,
                        description,
                        poster_path,
                        str(root),
                        windows,
                        mac,
                        linux,
                        folder.name,
                    )
                    games.append(game)
                    i += 1
                    if i >= 5:
                        games_loaded.send("anonymous", games=games.copy())
                        games.clear()
                        i = 0
            except Exception as e:
                current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
    games_loaded.send("anonymous", games=games)
    current_app.logger.info("finished loading games")
def get_game(path: pathlib.Path):
    try:
        games = []
        cover_url = "https://api-v3.igdb.com/covers"
        games_url = "https://api-v3.igdb.com/games"
        headers = {
            "accept": "application/json",
            "user-key": "***REMOVED***",
        }
        if not path.name == "info.json":
            return
        else:
            with path.open("r") as f:
                info = json.load(f)
            game_id = info["id"]
            if database.game_in_db(game_id):
                update_game(path)
            else:
                dir = path.parent
                folder = path.parts[-2]
                current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
                data = f"fields summary;limit 1;where id={game_id};"
                r = requests.get(games_url, headers=headers, data=data).json()[0]
                description = ""
                if "summary" in r.keys():
                    description = r["summary"]
                data = f"fields image_id;limit 1;where game={game_id};"
                r = requests.get(cover_url, headers=headers, data=data).json()
                poster_path = None
                if r:
                    if "image_id" in r[0].keys():
                        poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + r[0]["image_id"] + ".jpg"
                windows = False
                mac = False
                linux = False
                if "windows" in info.keys():
                    windows = True
                if "mac" in info.keys():
                    mac = True
                if "linux" in info.keys():
                    linux = True
                game = (
                    info["name"],
                    game_id,
                    description,
                    poster_path,
                    str(dir.relative_to(GAMES_DIRECTORY)),
                    windows,
                    mac,
                    linux,
                    folder,
                )
                games.append(game)
                games_loaded.send("anonymous", games=games)
                current_app.logger.info("finished loading game")
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_games():
    current_app.logger.info("start updating game data")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        root = folder / "info.json"
        update_game(root)
    current_app.logger.info("finished updating game data")
def update_game(path: pathlib.Path):
    try:
        if path.name == "info.json" and path.exists():
            with path.open("r") as f:
                info = json.load(f)
            game_id = info["id"]
            windows = False
            mac = False
            linux = False
            if "windows" in info.keys():
                windows = True
            if "mac" in info.keys():
                mac = True
            if "linux" in info.keys():
                linux = True
            database.update_game((game_id, windows, mac, linux))
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))