rpiwebapp-public/scripts/func.py

604 lines
25 KiB
Python
Raw Normal View History

2019-08-23 21:34:42 -07:00
import inspect
import json
import os
import pathlib
import re
from datetime import timedelta
from io import BytesIO
import enzyme
import requests
from blinker import Namespace
from comicapi import comicarchive
from flask import current_app
from wand.image import Image
2019-07-06 23:00:00 -07:00
from scripts import database
# Blinker namespace that owns every signal emitted by the loaders in this module.
rpi_signals = Namespace()
# Emitted with `meta` and `thumbnails` keyword arguments by the comic loaders.
comic_loaded = rpi_signals.signal("comic-loaded")
# Emitted with a `movies` keyword argument by the movie loaders.
movie_loaded = rpi_signals.signal("movie-loaded")
2019-07-30 15:19:03 -07:00
# Emitted once per TV show with a `tv_show` keyword argument.
tv_show_loaded = rpi_signals.signal("tv_show_loaded")
# Emitted with a `tv_episodes` keyword argument (a list of episode tuples).
tv_episodes_loaded = rpi_signals.signal("tv_episodes_loaded")
# Emitted with a `games` keyword argument (a list of game tuples).
games_loaded = rpi_signals.signal("games_loaded")
2019-07-06 23:00:00 -07:00
# Comics whose metadata publisher appears in this list are skipped by the comic loaders.
publishers_to_ignore = ["***REMOVED***"]
# Sent as the `api_key` query parameter on every api.themoviedb.org request in this module.
API_KEY = "***REMOVED***"
2019-07-06 23:00:00 -07:00
# Directories
# Preferred media locations; each RPI_* path is used when it exists on this machine.
RPI_COMICS_DIRECTORY = "/usb/storage/media/Comics/"
RPI_MOVIES_DIRECTORY = "/usb/storage/media/Videos/Movies/"
RPI_TV_SHOWS_DIRECTORY = "/usb/storage/media/Videos/TV/"
RPI_VIDEOS_DIRECTORY = "/usb/storage/media/Videos/Videos/"
RPI_GAMES_DIRECTORY = "/usb/storage/Games/"
RPI_MUSIC_DIRECTORY = "/usb/storage/media/Music/"
# Fallback locations used when the matching RPI_* directory is missing.
MC_COMICS_DIRECTORY = "/mnt/c/Users/Matthew/Documents/Comics/"
MC_MOVIES_DIRECTORY = "/mnt/c/Users/Matthew/Documents/Movies/"
MC_TV_SHOWS_DIRECTORY = "/mnt/c/Users/Matthew/Documents/TV/"
MC_GAMES_DIRECTORY = "/mnt/g/Humble Bundle/rpi/"
# Effective directories, resolved once at import time. All values keep their trailing
# slash; get_tv_shows() relies on that when it concatenates a folder name directly.
COMICS_DIRECTORY = RPI_COMICS_DIRECTORY if os.path.exists(RPI_COMICS_DIRECTORY) else MC_COMICS_DIRECTORY
MOVIES_DIRECTORY = RPI_MOVIES_DIRECTORY if os.path.exists(RPI_MOVIES_DIRECTORY) else MC_MOVIES_DIRECTORY
TV_SHOWS_DIRECTORY = RPI_TV_SHOWS_DIRECTORY if os.path.exists(RPI_TV_SHOWS_DIRECTORY) else MC_TV_SHOWS_DIRECTORY
GAMES_DIRECTORY = RPI_GAMES_DIRECTORY if os.path.exists(RPI_GAMES_DIRECTORY) else MC_GAMES_DIRECTORY
2019-07-06 23:00:00 -07:00
#############
def get_comics():
    """Walk COMICS_DIRECTORY and emit every new ``.cbr`` comic via ``comic_loaded``.

    Directories whose path contains ``"temp"`` are skipped, as are comics
    already known to the database and comics whose publisher is listed in
    ``publishers_to_ignore``. New comics are sent in batches of two as
    parallel ``meta`` (``(path, metadata)`` tuples) and ``thumbnails`` lists;
    whatever is left over is sent once the walk finishes.
    """
    total_comics = 0
    comics_in_db = 0
    comics_added = 0
    meta = []
    thumbnails = []
    for root, dirs, files in os.walk(COMICS_DIRECTORY):
        # The check only depends on the directory, so do it once per directory.
        if "temp" in root:
            continue
        for f in files:
            if not f.endswith(".cbr"):
                continue
            total_comics += 1
            path = os.path.join(root, f)
            if not database.comic_path_in_db(path):
                # Paths that cannot be encoded as UTF-8 are skipped.
                try:
                    path.encode("utf8")
                except UnicodeEncodeError:
                    current_app.logger.info("encoding failed on: " + path)
                    continue
                archive = open_comic(path)
                md = archive.readCIX()
                if md.publisher in publishers_to_ignore:
                    continue
                current_app.logger.info(path)
                # Build the thumbnails BEFORE touching either list: appending
                # the metadata first would leave `meta` one entry ahead of
                # `thumbnails` whenever thumbnail generation fails.
                try:
                    comic_thumbnails = get_comic_thumbnails(archive)
                except Exception as e:
                    current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
                    continue
                meta.append((path, md))
                thumbnails.append(comic_thumbnails)
                comics_added += 1
                if len(meta) >= 2:
                    # Send copies so receivers are unaffected by the clear() below.
                    comic_loaded.send("anonymous", meta=meta.copy(), thumbnails=thumbnails.copy())
                    meta.clear()
                    thumbnails.clear()
            # Counts every comic that reaches this point: already stored or just queued.
            comics_in_db += 1
    current_app.logger.info("total number of comics: " + str(total_comics))
    current_app.logger.info("comics in database: " + str(comics_in_db))
    current_app.logger.info("number of comics added: " + str(comics_added))
    # Flush the final, possibly partial or empty, batch.
    comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)
def get_comic(path: pathlib.Path):
    """Load a single ``.cbr`` comic and emit it via ``comic_loaded``.

    Nothing is emitted when the file is not a ``.cbr``, is already in the
    database, has a path that cannot be UTF-8 encoded, belongs to an ignored
    publisher, or when thumbnail generation fails.
    """
    if path.suffix != ".cbr":
        return
    comic_path = str(path)
    if database.comic_path_in_db(comic_path):
        return
    try:
        comic_path.encode("utf8")
    except Exception:
        current_app.logger.info(f"encoding failed on: {path}")
        return
    archive = open_comic(comic_path)
    metadata = archive.readCIX()
    if metadata.publisher in publishers_to_ignore:
        return
    current_app.logger.info(path)
    try:
        page_thumbnails = get_comic_thumbnails(archive)
    except Exception as error:
        current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")
        return
    # Same payload shape as get_comics(): parallel one-element lists.
    comic_loaded.send("anonymous", meta=[(comic_path, metadata)], thumbnails=[page_thumbnails])
def get_comic_thumbnails(comic):
    """Return a thumbnail for every page of *comic*.

    *comic* must provide ``getNumberOfPages()`` and ``getPage(index)``, where
    ``getPage`` returns the raw image bytes of a page. Each page is scaled to
    fit inside a 256x256 box while keeping its aspect ratio.

    Returns a list of ``(blob, mimetype)`` tuples in page order, where
    ``mimetype`` is ``"image/"`` followed by the image's format name.
    """
    max_height = 256
    max_width = 256
    thumbnails = []
    for page in range(comic.getNumberOfPages()):
        # Use the image as a context manager so its resources are released
        # after every page instead of accumulating for the whole comic.
        with Image(file=BytesIO(comic.getPage(page))) as image:
            orig_height = image.height
            orig_width = image.width
            # Pin the longer side to the box and scale the other side to match.
            if orig_height >= orig_width:
                width = int((orig_width / orig_height) * max_height)
                height = max_height
            else:
                height = int((orig_height / orig_width) * max_width)
                width = max_width
            image.thumbnail(width, height)
            thumbnails.append((image.make_blob(), "image/" + image.format))
    return thumbnails
def open_comic(path):
    """Return a ``ComicArchive`` for *path*, with the site icon as its default image."""
    return comicarchive.ComicArchive(path, default_image_path="static/images/icon.png")
2019-07-06 23:00:00 -07:00
def get_movies():
    """Walk MOVIES_DIRECTORY and emit every new ``.mkv`` movie via ``movie_loaded``.

    File names must look like ``Title (Year)[(extended)][ Director's Cut].mkv``.
    Each new movie is looked up on themoviedb.org, first by primary release
    year and then by plain year. Movies are sent in batches of 20, with the
    remainder sent after the walk. Errors for a single file are logged and
    do not stop the scan.
    """
    current_app.logger.info("start loading movies")
    pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
    url = "https://api.themoviedb.org/3/search/movie"
    pending = []
    seen = 0
    counted_in_db = 0
    added = 0
    for root, dirs, files in os.walk(MOVIES_DIRECTORY):
        for filename in files:
            if not filename.endswith(".mkv"):
                continue
            seen += 1
            path = os.path.join(root, filename)
            if not database.movie_path_in_db(path):
                try:
                    match = re.match(pattern, filename)
                    if not match:
                        current_app.logger.info(f"{filename} did not match regex.")
                        continue
                    current_app.logger.info(f"movie path: {path}")
                    title = match.group("title")
                    current_app.logger.info(f"movie title: {title}")
                    year = int(match.group("year"))
                    extended = bool(match.group("extended"))
                    directors_cut = bool(match.group("directors_cut"))
                    # Try the stricter year filter first, then the looser one.
                    results = []
                    for year_field in ("primary_release_year", "year"):
                        params = {
                            "api_key": API_KEY,
                            "query": title,
                            year_field: year,
                            "language": "en-US",
                        }
                        results = requests.get(url, params=params).json()["results"]
                        if len(results) != 0:
                            break
                    if len(results) == 0:
                        current_app.logger.info(f"no movie results for {title} - ({year})")
                        continue
                    best = results[0]
                    record = (
                        path,
                        best["id"],
                        title,
                        year,
                        best["overview"],
                        extended,
                        directors_cut,
                        best["poster_path"],
                        best["backdrop_path"],
                    )
                    added += 1
                    pending.append(record)
                    if len(pending) >= 20:
                        # Send a copy so receivers are unaffected by the clear() below.
                        movie_loaded.send("anonymous", movies=pending.copy())
                        pending.clear()
                except Exception as error:
                    current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")
            # Not reached for files skipped with `continue` above.
            counted_in_db += 1
    movie_loaded.send("anonymous", movies=pending)
    current_app.logger.info("finish loading movies")
    current_app.logger.info(f"total movies: {seen}")
    current_app.logger.info(f"movies in database: {counted_in_db}")
    current_app.logger.info(f"movies added: {added}")
def get_movie(path: pathlib.Path):
    """Look up a single movie file on themoviedb.org and emit it via ``movie_loaded``.

    The file name must look like
    ``Title (Year)[(extended)][ Director's Cut].mkv``. Nothing is emitted when
    the path is already in the database, the name does not match, or the
    search returns no results. Any error is logged rather than raised.
    """
    pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
    url = "https://api.themoviedb.org/3/search/movie"
    if database.movie_path_in_db(str(path)):
        return
    try:
        match = re.match(pattern, path.name)
        if not match:
            current_app.logger.info(f"{path.name} did not match regex.")
            return
        current_app.logger.info(f"movie path: {path}")
        title = match.group("title")
        current_app.logger.info("movie title: " + title)
        year = int(match.group("year"))
        # group() returns the matched text or None, never True, so test
        # truthiness instead of identity with True.
        extended = bool(match.group("extended"))
        directors_cut = bool(match.group("directors_cut"))
        data = {
            "api_key": API_KEY,
            "query": title,
            "primary_release_year": year,
            "language": "en-US",
        }
        results = requests.get(url, params=data).json()["results"]
        if not results:
            # Retry with the looser `year` filter.
            data = {
                "api_key": API_KEY,
                "query": title,
                "year": year,
                "language": "en-US",
            }
            results = requests.get(url, params=data).json()["results"]
        # Check for an empty result BEFORE indexing into it.
        if not results:
            current_app.logger.info(f"no movie results for {title} - ({year})")
            return
        info = results[0]
        movie = (
            str(path),
            info["id"],
            title,
            year,
            info["overview"],
            extended,
            directors_cut,
            info["poster_path"],
            info["backdrop_path"],
        )
        movie_loaded.send("anonymous", movies=[movie])
        current_app.logger.info("finish loading movie")
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
2019-07-30 15:19:03 -07:00
def get_tv_shows():
    """Emit every new show folder in TV_SHOWS_DIRECTORY via ``tv_show_loaded``.

    Only folders named ``Title (Year)`` are considered. If the folder holds a
    non-empty ``info.json``, the show is fetched from themoviedb.org by its
    ``tmdb_id``; otherwise it is searched by title and first-air-date year.
    One signal is sent per show with a
    ``(tmdb_id, title, year, description, poster_path, path)`` tuple.
    """
    dir_pattern = r"(?P<title>.+) \((?P<year>\d+)\)"
    search_url = "https://api.themoviedb.org/3/search/tv"
    tv_url = "https://api.themoviedb.org/3/tv/"
    current_app.logger.info("start loading tv shows")
    for entry in sorted(os.listdir(TV_SHOWS_DIRECTORY)):
        dir_match = re.match(dir_pattern, entry)
        if not dir_match:
            continue
        # TV_SHOWS_DIRECTORY ends with a slash, so plain concatenation is enough.
        path = TV_SHOWS_DIRECTORY + entry
        if database.tv_show_path_in_db(path):
            continue
        json_info = {}
        info_file = path + "/info.json"
        if os.path.exists(info_file):
            with open(info_file) as f:
                json_info = json.load(f)
        series_name = dir_match.group("title")
        series_year = int(dir_match.group("year"))
        if json_info:
            # A local info.json pins the exact TMDB entry.
            params = {"api_key": API_KEY, "language": "en-US"}
            info = requests.get(tv_url + str(json_info["tmdb_id"]), params=params).json()
            # A "status_code" key in the body is treated as "show not found".
            if "status_code" in info:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue
        else:
            params = {
                "api_key": API_KEY,
                "query": series_name,
                "first_air_date_year": series_year,
                "language": "en-US",
            }
            results = requests.get(search_url, params=params).json()["results"]
            if len(results) == 0:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue
            info = results[0]
        show = (
            info["id"],
            series_name,
            series_year,
            info["overview"],
            info["poster_path"],
            path,
        )
        tv_show_loaded.send("anonymous", tv_show=show)
    current_app.logger.info("finished loading tv shows.")
2019-07-30 15:19:03 -07:00
def get_tv_episodes():
    """Emit new episodes of every show in the database via ``tv_episodes_loaded``.

    Files named ``S<season>E<episode> - <title>.mkv`` in each show's folder
    are looked up on themoviedb.org. Episodes are sent in batches of 10 per
    show, followed by the show's remainder. An error while processing one
    show is logged and the scan moves on to the next show.
    """
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)"
    shows = database.get_all_tv_shows()
    current_app.logger.info("start loading tv episodes")
    for tv_show in shows:
        try:
            batch = []
            for filename in sorted(os.listdir(tv_show.path)):
                video_match = re.match(video_pattern, filename)
                if not video_match:
                    continue
                path = os.path.join(tv_show.path, filename)
                if database.tv_episode_path_in_db(path):
                    continue
                season = int(video_match.group("season"))
                episode = int(video_match.group("episode"))
                episode_name = video_match.group("title")
                current_app.logger.info(f"S{season} E{episode} - {tv_show.title}: {episode_name}")
                url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
                params = {"api_key": API_KEY, "language": "en-US"}
                info = requests.get(url, params=params).json()
                # A "status_code" key in the body is treated as "episode not found".
                if "status_code" in info:
                    current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
                    continue
                batch.append(
                    (
                        info["id"],
                        tv_show.tmdb_id,
                        episode_name,
                        season,
                        episode,
                        info["overview"],
                        info["still_path"],
                        path,
                    )
                )
                if len(batch) >= 10:
                    # Send a copy so receivers are unaffected by the clear() below.
                    tv_episodes_loaded.send("anonymous", tv_episodes=batch.copy())
                    batch.clear()
            tv_episodes_loaded.send("anonymous", tv_episodes=batch)
        except Exception as error:
            current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")
    current_app.logger.info("finished loading tv episodes")
def get_tv_episode(path: pathlib.Path):
    """Look up a single episode file and emit it via ``tv_episodes_loaded``.

    The file name must look like ``S<season>E<episode> - <title>.mkv`` and the
    file's parent folder must be the stored path of a show in the database.
    Nothing is emitted when the name does not match, no show owns the folder,
    the episode is already in the database, or the lookup reports an error.
    """
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)"
    video_match = re.match(video_pattern, path.name)
    if not video_match:
        return
    for tv_show in database.get_all_tv_shows():
        # A pathlib.Path never compares equal to a plain string, so normalise
        # the stored show path before comparing. This works whether
        # tv_show.path is a str or already a Path.
        if path.parent != pathlib.Path(tv_show.path):
            continue
        if database.tv_episode_path_in_db(str(path)):
            continue
        season = int(video_match.group("season"))
        episode = int(video_match.group("episode"))
        episode_name = video_match.group("title")
        url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
        data = {"api_key": API_KEY, "language": "en-US"}
        info = requests.get(url, params=data).json()
        # A "status_code" key in the body is treated as "episode not found".
        if "status_code" in info:
            current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
            continue
        episodes = [
            (
                info["id"],
                tv_show.tmdb_id,
                episode_name,
                season,
                episode,
                info["overview"],
                info["still_path"],
                str(path),
            )
        ]
        tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
        current_app.logger.info("finished loading tv episode")
def get_chapters(path):
    """Return intro, credits and end-credit chapter times (whole seconds) for an MKV.

    Only chapters titled exactly ``"Intro"``, ``"Credits"`` or
    ``"end-credit scene"`` are used. The result may contain:

    * ``"intro"``: ``{"start": int, "end": int}``, recorded only when the
      chapter has an end time, so both keys are always present together.
    * ``"credits"``: ``{"start": int}``.
    * ``"end-credit scene"``: a list of ``{"start": int}`` dicts, each with an
      ``"end"`` key when the chapter has an end time.

    An empty dict is returned when the file cannot be opened or parsed.
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return {}
    mkv_info = {}
    for chapter in mkv.chapters:
        if chapter.string == "Intro":
            # The end time is optional (the end-credit branch below already
            # guards it); floor-dividing a missing end would raise TypeError.
            if chapter.end is not None:
                mkv_info["intro"] = {
                    "start": chapter.start.seconds,
                    # chapter.end is divided by 1000 to get microseconds
                    # (presumably it is stored in nanoseconds).
                    "end": timedelta(microseconds=chapter.end // 1000).seconds,
                }
        elif chapter.string == "Credits":
            mkv_info["credits"] = {"start": chapter.start.seconds}
        elif chapter.string == "end-credit scene":
            end_credit = {"start": chapter.start.seconds}
            if chapter.end:
                end_credit["end"] = timedelta(microseconds=chapter.end // 1000).seconds
            # A file may contain several end-credit scenes; keep them all.
            mkv_info.setdefault("end-credit scene", []).append(end_credit)
    return mkv_info
def get_tags(path):
    """Return collection, season and movie tag data read from an MKV file.

    Tags are grouped by the target type value of their first target:
    70 is stored under ``"collection"``, 60 under ``"season"`` and 50 under
    ``"movie"``. Only the simple tags handled below are copied; numeric ones
    are converted with ``int`` and ``KEYWORDS`` is split on commas.

    An empty dict is returned when the file cannot be opened or parsed.
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.info(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # Without this return `mkv` is unbound and the loop below raises
        # NameError; get_chapters() bails out the same way.
        return {}
    mkv_info = {}
    for tag in mkv.tags:
        target_type = tag.targets.data[0].data
        if target_type == 70:
            collection = mkv_info["collection"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    collection["title"] = simple.string
                elif simple.name == "TOTAL_PARTS":
                    collection["episodes"] = int(simple.string)
                elif simple.name == "KEYWORDS":
                    collection["key_words"] = simple.string.split(",")
                elif simple.name == "DATE_RELEASED":
                    collection["year"] = int(simple.string)
                elif simple.name == "SUMMARY":
                    collection["summary"] = simple.string
        elif target_type == 60:
            season = mkv_info["season"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    season["title"] = simple.string
                elif simple.name == "TOTAL_PARTS":
                    season["episodes"] = int(simple.string)
        elif target_type == 50:
            movie = mkv_info["movie"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    movie["title"] = simple.string
                elif simple.name == "DATE_RELEASED":
                    movie["year"] = int(simple.string)
                elif simple.name == "PART_NUMBER":
                    movie["episode"] = int(simple.string)
                elif simple.name == "KEYWORDS":
                    movie["key_words"] = simple.string.split(",")
                elif simple.name == "SUMMARY":
                    movie["summary"] = simple.string
    return mkv_info
def get_games():
    """Emit every new game folder in GAMES_DIRECTORY via ``games_loaded``.

    Each sub-folder must contain an ``info.json`` with at least ``id`` and
    ``name``. Games not yet in the database get their summary and cover
    fetched from IGDB. Games are sent in batches of five, with the remainder
    sent after the scan. An error in one folder is logged and the scan
    continues with the next folder.
    """
    pending = []
    cover_url = "https://api-v3.igdb.com/covers"
    games_url = "https://api-v3.igdb.com/games"
    # NOTE(review): this credential is hard-coded in source; consider moving it to configuration.
    headers = {
        "accept": "application/json",
        "user-key": "641f7f0e3af5273dcc1105ce851ea804",
    }
    current_app.logger.info("start loading games")
    for folder in sorted(os.listdir(GAMES_DIRECTORY), key=str.casefold):
        root = os.path.join(GAMES_DIRECTORY, folder)
        if not os.path.isdir(root):
            continue
        try:
            with open(os.path.join(root, "info.json"), "r") as f:
                info = json.load(f)
            game_id = info["id"]
            if database.game_in_db(game_id):
                continue
            current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
            query = f"fields summary;limit 1;where id={game_id};"
            details = requests.get(games_url, headers=headers, data=query).json()[0]
            description = details["summary"] if "summary" in details else ""
            query = f"fields image_id;limit 1;where game={game_id};"
            covers = requests.get(cover_url, headers=headers, data=query).json()
            poster_path = None
            if covers and "image_id" in covers[0]:
                poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + covers[0]["image_id"] + ".jpg"
            # Platform support is signalled by the mere presence of the key.
            pending.append(
                (
                    info["name"],
                    game_id,
                    description,
                    poster_path,
                    root,
                    "windows" in info,
                    "mac" in info,
                    "linux" in info,
                    folder,
                )
            )
            if len(pending) >= 5:
                # Send a copy so receivers are unaffected by the clear() below.
                games_loaded.send("anonymous", games=pending.copy())
                pending.clear()
        except Exception as error:
            current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")
    games_loaded.send("anonymous", games=pending)
    current_app.logger.info("finished loading games")
def get_game(path: pathlib.Path):
    """Load one game from its ``info.json`` and emit it via ``games_loaded``.

    Paths not named ``info.json`` are ignored. A game already in the database
    is handed to ``update_game`` instead. Otherwise its summary and cover are
    fetched from IGDB and a one-element list is sent. Any error is logged
    rather than raised.
    """
    try:
        cover_url = "https://api-v3.igdb.com/covers"
        games_url = "https://api-v3.igdb.com/games"
        headers = {
            "accept": "application/json",
            "user-key": "***REMOVED***",
        }
        if path.name != "info.json":
            return
        with path.open("r") as f:
            info = json.load(f)
        game_id = info["id"]
        if database.game_in_db(game_id):
            update_game(path)
            return
        game_dir = path.parent
        folder = path.parts[-2]
        current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
        query = f"fields summary;limit 1;where id={game_id};"
        details = requests.get(games_url, headers=headers, data=query).json()[0]
        description = details["summary"] if "summary" in details else ""
        query = f"fields image_id;limit 1;where game={game_id};"
        covers = requests.get(cover_url, headers=headers, data=query).json()
        poster_path = None
        if covers and "image_id" in covers[0]:
            poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + covers[0]["image_id"] + ".jpg"
        # Platform support is signalled by the mere presence of the key.
        game = (
            info["name"],
            game_id,
            description,
            poster_path,
            str(game_dir),
            "windows" in info,
            "mac" in info,
            "linux" in info,
            folder,
        )
        games_loaded.send("anonymous", games=[game])
        current_app.logger.info("finished loading game")
    except Exception as error:
        current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")
def update_games():
    """Run ``update_game`` on the ``info.json`` of every entry in GAMES_DIRECTORY.

    Entries are visited in case-insensitive name order. Entries without an
    ``info.json`` are passed along too; ``update_game`` ignores missing files.
    """
    current_app.logger.info("start updating game data")
    entries = sorted(os.listdir(GAMES_DIRECTORY), key=str.casefold)
    for entry in entries:
        update_game(pathlib.Path(GAMES_DIRECTORY, entry, "info.json"))
    current_app.logger.info("finished updating game data")
def update_game(path: pathlib.Path):
    """Refresh a game's platform flags in the database from its ``info.json``.

    Does nothing unless *path* is an existing file named ``info.json``. The
    presence of the ``windows``, ``mac`` and ``linux`` keys decides the three
    flags stored with the game's ``id``. Any error is logged rather than raised.
    """
    try:
        if path.name != "info.json" or not path.exists():
            return
        with path.open("r") as f:
            info = json.load(f)
        # Tuple order: (game id, windows, mac, linux).
        record = (info["id"], "windows" in info, "mac" in info, "linux" in info)
        database.update_game(record)
    except Exception as error:
        current_app.logger.info(f"{inspect.stack()[0][3]} {type(error)} {error}")