rpiwebapp-public/func.py

452 lines
18 KiB
Python
Raw Normal View History

2019-08-23 21:34:42 -07:00
import inspect
import json
import os
import pathlib
import re
import enzyme
import pathvalidate
import requests
from datetime import timedelta
from blinker import Namespace
from comicapi import comicarchive
from flask import current_app
from wand.image import Image
2019-07-06 23:00:00 -07:00
import database
2019-07-06 23:00:00 -07:00
rpi_signals = Namespace()
comic_loaded = rpi_signals.signal("comic-loaded")
movie_loaded = rpi_signals.signal("movie-loaded")
2019-07-30 15:19:03 -07:00
tv_show_loaded = rpi_signals.signal("tv_show_loaded")
tv_episodes_loaded = rpi_signals.signal("tv_episodes_loaded")
games_loaded = rpi_signals.signal("games_loaded")
2019-07-06 23:00:00 -07:00
publishers_to_ignore = ["***REMOVED***"]
API_KEY = "***REMOVED***"
2019-07-06 23:00:00 -07:00
# Directories
COMICS_DIRECTORY = pathlib.Path("/srv/comics/")
MOVIES_DIRECTORY = pathlib.Path("/srv/movies/")
TV_SHOWS_DIRECTORY = pathlib.Path("/srv/tv/")
GAMES_DIRECTORY = pathlib.Path("/srv/games/")
2019-07-06 23:00:00 -07:00
#############
def get_comics():
total_comics = 0
comics_in_db = 0
comics_added = 0
for root, dirs, files in os.walk(COMICS_DIRECTORY):
for f in files:
if "temp" in root:
continue
if f.endswith((".cbr", ".cbz")):
total_comics += 1
absolute_path = pathlib.Path(root, f)
result = get_comic(absolute_path)
comics_added += result["added"]
comics_in_db += result["in_database"]
current_app.logger.info("total number of comics: " + str(total_comics))
current_app.logger.info("comics in database: " + str(comics_in_db))
current_app.logger.info("number of comics added: " + str(comics_added))
def get_comic(absolute_path: pathlib.Path):
meta = []
thumbnails = []
result = {"in_database": 0, "added": 0}
relative_path = absolute_path.relative_to(COMICS_DIRECTORY)
if relative_path.suffix in [".cbr", ".cbz"]:
if not database.comic_path_in_db(str(relative_path)):
try:
test_path = str(relative_path).encode("utf8")
except Exception as e:
current_app.logger.error(f"encoding failed on: {absolute_path}")
return result
archive = open_comic(relative_path)
md = archive.readCIX()
if md.publisher in publishers_to_ignore:
return result
current_app.logger.info(absolute_path)
meta.append((str(relative_path), md))
try:
thumbnails.append(get_comic_thumbnails(archive))
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return result
comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)
result["added"] = 1
return result
result["in_database"] = 1
return result
def get_comic_thumbnails(comic):
thumbnails = []
size = "256x256"
new_height = 256
new_width = 256
for page in range(comic.getNumberOfPages()):
with Image(blob=comic.getPage(page)) as image:
orig_height = image.height
orig_width = image.width
if orig_height >= orig_width:
width = int((orig_width / orig_height) * new_height)
height = new_height
else:
height = int((orig_height / orig_width) * new_width)
width = new_width
image.thumbnail(width, height)
thumbnails.append((image.make_blob(), "image/" + image.format))
return thumbnails
def open_comic(path):
archive = comicarchive.ComicArchive(str(COMICS_DIRECTORY / path), default_image_path="static/images/icon.png")
return archive
2019-07-06 23:00:00 -07:00
def get_movies():
current_app.logger.info("start loading movies")
total_movies = 0
movies_in_db = 0
movies_added = 0
for root, dirs, files in os.walk(MOVIES_DIRECTORY):
for f in files:
if f.endswith(".mkv"):
total_movies += 1
absolute_path = pathlib.Path(root, f)
result = get_movie(absolute_path)
movies_in_db += result["in_database"]
movies_added += result["added"]
current_app.logger.info("finish loading movies")
current_app.logger.info("total movies: " + str(total_movies))
current_app.logger.info("movies in database: " + str(movies_in_db))
current_app.logger.info("movies added: " + str(movies_added))
def get_movie(absolute_path: pathlib.Path):
pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
url = "https://api.themoviedb.org/3/search/movie"
relative_path = absolute_path.relative_to(MOVIES_DIRECTORY)
movies = []
result = {"in_database": 0, "added": 0}
if not database.movie_path_in_db(str(relative_path)):
try:
match = re.match(pattern, relative_path.name)
if not match:
current_app.logger.info(f"{absolute_path.name} did not match regex.")
return result
current_app.logger.info(f"movie path: {absolute_path}")
title = match.group("title")
current_app.logger.info("movie title: " + title)
year = int(match.group("year"))
extended = match.group("extended") is True
directors_cut = match.group("directors_cut") is True
res_4k = (absolute_path.parent / absolute_path.name.replace(f"({year})", f"({year})(4k)")).exists()
data = {
"api_key": API_KEY,
"query": title,
"primary_release_year": year,
"language": "en-US",
}
r = requests.get(url, params=data)
if len(r.json()["results"]) == 0:
data = {
"api_key": API_KEY,
"query": title,
"year": year,
"language": "en-US",
}
r = requests.get(url, params=data)
info = r.json()["results"][0]
if len(r.json()["results"]) == 0:
current_app.logger.info(f"no movie results for {title} - ({year})")
return result
tmdb_id = info["id"]
description = info["overview"]
poster_path = info["poster_path"]
backdrop_path = info["backdrop_path"]
movies.append((str(relative_path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,))
movie_loaded.send("anonymous", movies=movies.copy())
current_app.logger.info("finish loading movie")
result["added"] = 1
return result
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
result["in_database"] = 1
return result
2019-07-30 15:19:03 -07:00
def get_tv_shows():
dir_pattern = r"(?P<title>.+) \((?P<year>\d+)\)"
search_url = "https://api.themoviedb.org/3/search/tv"
tv_url = "https://api.themoviedb.org/3/tv/"
current_app.logger.info("start loading tv shows")
for dir in sorted(TV_SHOWS_DIRECTORY.iterdir()):
2021-07-10 13:46:20 -07:00
dir_match = re.match(dir_pattern, dir.name)
if dir_match:
absolute_path = TV_SHOWS_DIRECTORY / dir
relative_path = absolute_path.relative_to(COMICS_DIRECTORY)
if not database.tv_show_path_in_db(str(relative_path)):
json_info = {}
if (absolute_path / "info.json").exists():
with (absolute_path / "info.json").open() as f:
json_info = json.load(f)
series_name = dir_match.group("title")
series_year = int(dir_match.group("year"))
if not json_info:
data = {
"api_key": API_KEY,
"query": series_name,
"first_air_date_year": series_year,
"language": "en-US",
}
r = requests.get(search_url, params=data)
if len(r.json()["results"]) == 0:
current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
continue
info = r.json()["results"][0]
else:
data = {"api_key": API_KEY, "language": "en-US"}
r = requests.get(tv_url + str(json_info["tmdb_id"]), params=data)
if "status_code" in r.json().keys():
current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
continue
info = r.json()
tmdb_id = info["id"]
description = info["overview"]
poster_path = info["poster_path"]
tv_show_data = (
tmdb_id,
series_name,
series_year,
description,
poster_path,
str(relative_path),
)
tv_show_loaded.send("anonymous", tv_show=tv_show_data)
current_app.logger.info("finished loading tv shows.")
2019-07-30 15:19:03 -07:00
def get_tv_episodes():
video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)"
tv_shows = database.get_all_tv_shows()
current_app.logger.info("start loading tv episodes")
for tv_show in tv_shows:
tv_show_path = pathlib.Path(tv_show.path)
for video in sorted(tv_show_path.iterdir()):
video_match = re.match(video_pattern, str(video))
if video_match:
absolute_path = tv_show_path / video
get_tv_episode(absolute_path)
current_app.logger.info("finished loading tv episodes")
def get_tv_episode(absolute_path: pathlib.Path):
video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)"
video_match = re.match(video_pattern, absolute_path.name)
relative_path = absolute_path.relative_to(TV_SHOWS_DIRECTORY)
if video_match:
try:
rows = database.get_all_tv_shows()
for tv_show in rows:
if relative_path.parent == tv_show.path:
if not database.tv_episode_path_in_db(str(relative_path)):
episodes = []
season = int(video_match.group("season"))
episode = int(video_match.group("episode"))
episode_name = video_match.group("title")
current_app.logger.info(f"S{season} E{episode} - {tv_show.title}: {episode_name}")
url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
data = {"api_key": API_KEY, "language": "en-US"}
r = requests.get(url, params=data)
if "status_code" in r.json().keys():
current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
continue
info = r.json()
episode_tmdb_id = info["id"]
episode_description = info["overview"]
episode_still_path = info["still_path"]
episodes.append(
(episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(relative_path),)
)
tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
current_app.logger.info("finished loading tv episode")
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def get_chapters(path):
try:
with open(path, "rb") as f:
mkv = enzyme.MKV(f)
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
return {}
mkv_info = {}
for chapter in mkv.chapters:
if chapter.string == "Intro":
mkv_info["intro"] = {
"start": chapter.start.seconds,
"end": timedelta(microseconds=chapter.end // 1000).seconds,
}
if chapter.string == "Credits":
mkv_info["credits"] = {"start": chapter.start.seconds}
if chapter.string == "end-credit scene":
if "end-credit scene" not in mkv_info.keys():
mkv_info["end-credit scene"] = []
end_credit = {"start": chapter.start.seconds}
if chapter.end:
end_credit["end"] = timedelta(microseconds=chapter.end // 1000).seconds
mkv_info["end-credit scene"].append(end_credit)
return mkv_info
def get_tags(path):
try:
with open(path, "rb") as f:
mkv = enzyme.MKV(f)
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
mkv_info = {}
for tag in mkv.tags:
if tag.targets.data[0].data == 70:
mkv_info["collection"] = {}
for simple in tag.simpletags:
if simple.name == "TITLE":
mkv_info["collection"]["title"] = simple.string
if simple.name == "TOTAL_PARTS":
mkv_info["collection"]["episodes"] = int(simple.string)
if simple.name == "KEYWORDS":
mkv_info["collection"]["key_words"] = simple.string.split(",")
if simple.name == "DATE_RELEASED":
mkv_info["collection"]["year"] = int(simple.string)
if simple.name == "SUMMARY":
mkv_info["collection"]["summary"] = simple.string
if tag.targets.data[0].data == 60:
mkv_info["season"] = {}
for simple in tag.simpletags:
if simple.name == "TITLE":
mkv_info["season"]["title"] = simple.string
if simple.name == "TOTAL_PARTS":
mkv_info["season"]["episodes"] = int(simple.string)
if tag.targets.data[0].data == 50:
mkv_info["movie"] = {}
for simple in tag.simpletags:
if simple.name == "TITLE":
mkv_info["movie"]["title"] = simple.string
if simple.name == "DATE_RELEASED":
mkv_info["movie"]["year"] = int(simple.string)
if simple.name == "PART_NUMBER":
mkv_info["movie"]["episode"] = int(simple.string)
if simple.name == "KEYWORDS":
mkv_info["movie"]["key_words"] = simple.string.split(",")
if simple.name == "SUMMARY":
mkv_info["movie"]["summary"] = simple.string
return mkv_info
def get_games():
current_app.logger.info("start loading games")
for folder in sorted(GAMES_DIRECTORY.iterdir()):
absolute_path = folder.absolute()
get_game(absolute_path)
current_app.logger.info("finished loading games")
def get_game(path: pathlib.Path):
try:
games = []
cover_url = "https://api-v3.igdb.com/covers"
games_url = "https://api-v3.igdb.com/games"
headers = {
"accept": "application/json",
"user-key": "***REMOVED***",
}
if not path.name == "info.json":
return
else:
with path.open("r") as f:
info = json.load(f)
game_id = info["id"]
if database.game_in_db(game_id):
update_game(path)
else:
dir = path.parent
current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
data = f"fields summary;limit 1;where id={game_id};"
r = requests.get(games_url, headers=headers, data=data).json()[0]
description = ""
if "summary" in r.keys():
description = r["summary"]
data = f"fields image_id;limit 1;where game={game_id};"
r = requests.get(cover_url, headers=headers, data=data).json()
poster_path = None
if r:
if "image_id" in r[0].keys():
poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + r[0]["image_id"] + ".jpg"
windows = False
mac = False
linux = False
if "windows" in info.keys():
windows = True
if "mac" in info.keys():
mac = True
if "linux" in info.keys():
linux = True
game = (
info["name"],
game_id,
description,
poster_path,
str(dir.relative_to(GAMES_DIRECTORY)),
windows,
mac,
linux,
pathvalidate.sanitize_filename(info["name"]),
)
games.append(game)
games_loaded.send("anonymous", games=games)
current_app.logger.info(f"finished loading game: {info['name']}:{info['id']}")
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
def update_games():
current_app.logger.info("start updating game data")
for folder in sorted(GAMES_DIRECTORY.iterdir()):
root = folder / "info.json"
update_game(root)
current_app.logger.info("finished updating game data")
def update_game(path: pathlib.Path):
try:
if path.name == "info.json" and path.exists():
with path.open("r") as f:
info = json.load(f)
game_id = info["id"]
windows = False
mac = False
linux = False
if "windows" in info.keys():
windows = True
if "mac" in info.keys():
mac = True
if "linux" in info.keys():
linux = True
database.update_game((game_id, windows, mac, linux))
except Exception as e:
current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))