"""Scan the media directories and publish newly found items via blinker signals.

Each ``get_*`` function looks for comics, movies, TV shows/episodes or games
that are not yet known to ``database``, gathers metadata for them (from the
archive itself, from api.themoviedb.org or from api-v3.igdb.com) and sends the
result through the matching signal defined below.
"""
import inspect
import json
import os
import pathlib
import re
from datetime import timedelta

import enzyme
import requests
from blinker import Namespace
from comicapi import comicarchive
from flask import current_app
from wand.image import Image

import database

rpi_signals = Namespace()
comic_loaded = rpi_signals.signal("comic-loaded")
movie_loaded = rpi_signals.signal("movie-loaded")
tv_show_loaded = rpi_signals.signal("tv_show_loaded")
tv_episodes_loaded = rpi_signals.signal("tv_episodes_loaded")
games_loaded = rpi_signals.signal("games_loaded")

publishers_to_ignore = ["***REMOVED***"]

API_KEY = "***REMOVED***"

# Directories
COMICS_DIRECTORY = pathlib.Path("/srv/comics/")
MOVIES_DIRECTORY = pathlib.Path("/srv/movies/")
TV_SHOWS_DIRECTORY = pathlib.Path("/srv/tv/")
GAMES_DIRECTORY = pathlib.Path("/srv/games/")

#############

# File-name conventions shared by the bulk and single-item loaders.
_COMIC_SUFFIXES = (".cbr", ".cbz")
_MOVIE_PATTERN = (
    r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?"
    r"(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
)
_TV_DIR_PATTERN = r"(?P<title>.+) \((?P<year>\d+)\)"
_TV_EPISODE_PATTERN = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>\.mkv)"

# Number of pending items after which a bulk loader flushes through its signal.
_COMIC_BATCH_SIZE = 2
_MOVIE_BATCH_SIZE = 20
_TV_EPISODE_BATCH_SIZE = 10
_GAME_BATCH_SIZE = 5

_THUMBNAIL_MAX_WIDTH = 256
_THUMBNAIL_MAX_HEIGHT = 256

_TMDB_MOVIE_SEARCH_URL = "https://api.themoviedb.org/3/search/movie"
_IGDB_COVERS_URL = "https://api-v3.igdb.com/covers"
_IGDB_GAMES_URL = "https://api-v3.igdb.com/games"
_IGDB_COVER_PREFIX = "https://images.igdb.com/igdb/image/upload/t_cover_big/"


def _log_exception(exc):
    """Log *exc* as an error, prefixed with the name of the calling function."""
    # Frame 0 is this helper; frame 1 is the function that caught the exception.
    caller = inspect.stack()[1][3]
    current_app.logger.error(caller + " " + str(type(exc)) + " " + str(exc))


def _platform_flags(info):
    """Return ``(windows, mac, linux)`` booleans: True when the key exists in *info*."""
    return "windows" in info, "mac" in info, "linux" in info


def get_comics():
    """Walk ``COMICS_DIRECTORY`` and publish every comic not yet in the database.

    New comics are sent through ``comic_loaded`` in batches as parallel
    ``meta`` / ``thumbnails`` lists; whatever is left over is sent at the end
    (possibly as empty lists). Directories whose path contains "temp" and
    comics from ``publishers_to_ignore`` are skipped.
    """
    total_comics = 0
    comics_in_db = 0
    comics_added = 0
    meta = []
    thumbnails = []
    for root, _dirs, files in os.walk(COMICS_DIRECTORY):
        if "temp" in root:
            continue
        for f in files:
            if not f.endswith(_COMIC_SUFFIXES):
                continue
            total_comics += 1
            path = pathlib.Path(root, f)
            if not database.comic_path_in_db(str(path)):
                try:
                    # Path objects have no .encode(); test the string form.
                    str(path).encode("utf8")
                except UnicodeError:
                    current_app.logger.error("encoding failed on: " + str(path))
                    continue
                archive = open_comic(path)
                md = archive.readCIX()
                if md.publisher in publishers_to_ignore:
                    continue
                current_app.logger.info(str(path))
                try:
                    pages = get_comic_thumbnails(archive)
                except Exception as e:
                    _log_exception(e)
                    continue
                # Append to both lists only after the thumbnails succeeded so
                # the two lists always stay index-aligned.
                meta.append((str(path), md))
                thumbnails.append(pages)
                comics_added += 1
                if len(meta) >= _COMIC_BATCH_SIZE:
                    comic_loaded.send("anonymous", meta=meta.copy(), thumbnails=thumbnails.copy())
                    meta.clear()
                    thumbnails.clear()
            # Counts comics that were already known plus those queued above.
            comics_in_db += 1
    current_app.logger.info("total number of comics: " + str(total_comics))
    current_app.logger.info("comics in database: " + str(comics_in_db))
    current_app.logger.info("number of comics added: " + str(comics_added))
    comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)


def get_comic(path: pathlib.Path):
    """Publish a single comic archive through ``comic_loaded``.

    Nothing is sent when *path* is not a ``.cbr``/``.cbz`` file, is already in
    the database, belongs to an ignored publisher, or its thumbnails cannot be
    generated.
    """
    if path.suffix not in _COMIC_SUFFIXES:
        return
    if database.comic_path_in_db(str(path)):
        return
    try:
        str(path).encode("utf8")
    except UnicodeError:
        current_app.logger.error(f"encoding failed on: {path}")
        return
    archive = open_comic(path)
    md = archive.readCIX()
    if md.publisher in publishers_to_ignore:
        return
    current_app.logger.info(path)
    try:
        pages = get_comic_thumbnails(archive)
    except Exception as e:
        _log_exception(e)
        return
    comic_loaded.send("anonymous", meta=[(str(path), md)], thumbnails=[pages])


def get_comic_thumbnails(comic):
    """Return one ``(image_bytes, mime_type)`` tuple per page of *comic*.

    Each page is scaled so that its longer side becomes 256 pixels while the
    aspect ratio is kept.
    """
    thumbnails = []
    for page in range(comic.getNumberOfPages()):
        with Image(blob=comic.getPage(page)) as image:
            orig_height = image.height
            orig_width = image.width
            if orig_height >= orig_width:
                # Portrait or square: pin the height, derive the width.
                height = _THUMBNAIL_MAX_HEIGHT
                width = int((orig_width / orig_height) * _THUMBNAIL_MAX_HEIGHT)
            else:
                # Landscape: pin the width, derive the height.
                width = _THUMBNAIL_MAX_WIDTH
                height = int((orig_height / orig_width) * _THUMBNAIL_MAX_WIDTH)
            image.thumbnail(width, height)
            thumbnails.append((image.make_blob(), "image/" + image.format))
    return thumbnails


def open_comic(path):
    """Open *path* as a ``comicarchive.ComicArchive`` and return it."""
    return comicarchive.ComicArchive(str(path), default_image_path="static/images/icon.png")


def _search_movie(title, year):
    """Return the first themoviedb.org search hit for *title*/*year*, or None.

    The search is tried with ``primary_release_year`` first and, if that gives
    no results, repeated with ``year``.
    """
    for year_field in ("primary_release_year", "year"):
        data = {
            "api_key": API_KEY,
            "query": title,
            year_field: year,
            "language": "en-US",
        }
        results = requests.get(_TMDB_MOVIE_SEARCH_URL, params=data).json()["results"]
        if results:
            return results[0]
    return None


def _load_movie_record(path):
    """Build the movie tuple sent through ``movie_loaded`` for *path*.

    Returns None (after logging why) when the file name does not follow
    ``_MOVIE_PATTERN`` or the online search has no result.
    """
    match = re.match(_MOVIE_PATTERN, path.name)
    if not match:
        current_app.logger.info(f"{path.name} did not match regex.")
        return None
    current_app.logger.info(f"movie path: {path}")
    title = match.group("title")
    current_app.logger.info("movie title: " + title)
    year = int(match.group("year"))
    # Optional groups are None when absent and a non-empty string when present.
    extended = match.group("extended") is not None
    directors_cut = match.group("directors_cut") is not None
    # A sibling file with "(4k)" right after the year marks a 4k version.
    res_4k = (path.parent / path.name.replace(f"({year})", f"({year})(4k)")).exists()
    info = _search_movie(title, year)
    if info is None:
        current_app.logger.info(f"no movie results for {title} - ({year})")
        return None
    return (
        str(path),
        info["id"],
        title,
        year,
        info["overview"],
        extended,
        directors_cut,
        info["poster_path"],
        info["backdrop_path"],
        res_4k,
    )


def get_movies():
    """Walk ``MOVIES_DIRECTORY`` and publish every ``.mkv`` not yet in the database.

    Movies are sent through ``movie_loaded`` in batches; the remainder
    (possibly empty) is sent once the walk is finished. Errors for a single
    file are logged and do not stop the scan.
    """
    current_app.logger.info("start loading movies")
    movies = []
    total_movies = 0
    movies_in_db = 0
    movies_added = 0
    for root, _dirs, files in os.walk(MOVIES_DIRECTORY):
        for f in files:
            if not f.endswith(".mkv"):
                continue
            total_movies += 1
            path = pathlib.Path(root, f)
            if not database.movie_path_in_db(str(path)):
                try:
                    record = _load_movie_record(path)
                    if record is None:
                        continue
                    movies_added += 1
                    movies.append(record)
                    if len(movies) >= _MOVIE_BATCH_SIZE:
                        movie_loaded.send("anonymous", movies=movies.copy())
                        movies.clear()
                except Exception as e:
                    _log_exception(e)
            movies_in_db += 1
    movie_loaded.send("anonymous", movies=movies)
    current_app.logger.info("finish loading movies")
    current_app.logger.info("total movies: " + str(total_movies))
    current_app.logger.info("movies in database: " + str(movies_in_db))
    current_app.logger.info("movies added: " + str(movies_added))


def get_movie(path: pathlib.Path):
    """Publish a single movie file through ``movie_loaded``.

    Nothing is sent when *path* is already in the database, its name does not
    follow ``_MOVIE_PATTERN``, or the online search has no result. Any
    exception is logged rather than raised.
    """
    if database.movie_path_in_db(str(path)):
        return
    try:
        record = _load_movie_record(path)
        if record is None:
            return
        movie_loaded.send("anonymous", movies=[record])
        current_app.logger.info("finish loading movie")
    except Exception as e:
        _log_exception(e)


def get_tv_shows():
    """Publish every TV show directory not yet in the database.

    Entries of ``TV_SHOWS_DIRECTORY`` named ``"Title (Year)"`` are looked up
    on themoviedb.org: by the ``tmdb_id`` stored in an ``info.json`` inside
    the directory when that file has content, otherwise by a title/year
    search. Each show is sent individually through ``tv_show_loaded``.
    """
    search_url = "https://api.themoviedb.org/3/search/tv"
    tv_url = "https://api.themoviedb.org/3/tv/"
    current_app.logger.info("start loading tv shows")
    for show_dir in sorted(TV_SHOWS_DIRECTORY.iterdir()):
        dir_match = re.match(_TV_DIR_PATTERN, show_dir.name)
        # iterdir() already yields paths prefixed with TV_SHOWS_DIRECTORY.
        if not dir_match or database.tv_show_path_in_db(str(show_dir)):
            continue
        json_info = {}
        info_file = show_dir / "info.json"
        if info_file.exists():
            with info_file.open() as f:
                json_info = json.load(f)
        series_name = dir_match.group("title")
        series_year = int(dir_match.group("year"))
        if not json_info:
            data = {
                "api_key": API_KEY,
                "query": series_name,
                "first_air_date_year": series_year,
                "language": "en-US",
            }
            results = requests.get(search_url, params=data).json()["results"]
            if not results:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue
            info = results[0]
        else:
            data = {"api_key": API_KEY, "language": "en-US"}
            info = requests.get(tv_url + str(json_info["tmdb_id"]), params=data).json()
            # A "status_code" key in the body is treated as "show not found".
            if "status_code" in info:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue
        tv_show_data = (
            info["id"],
            series_name,
            series_year,
            info["overview"],
            info["poster_path"],
            str(show_dir),
        )
        tv_show_loaded.send("anonymous", tv_show=tv_show_data)
    current_app.logger.info("finished loading tv shows.")


def _fetch_episode_record(tv_show, season, episode, episode_name, path):
    """Return the episode tuple sent through ``tv_episodes_loaded``, or None.

    None is returned (after logging) when the themoviedb.org response
    contains a "status_code" key.
    """
    url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"
    data = {"api_key": API_KEY, "language": "en-US"}
    info = requests.get(url, params=data).json()
    if "status_code" in info:
        current_app.logger.info(
            f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}"
        )
        return None
    return (
        info["id"],
        tv_show.tmdb_id,
        episode_name,
        season,
        episode,
        info["overview"],
        info["still_path"],
        str(path),
    )


def get_tv_episodes():
    """Publish every episode file of every known TV show not yet in the database.

    Files directly inside each show's directory that are named like
    ``"S01E02 - Title.mkv"`` are looked up on themoviedb.org and sent through
    ``tv_episodes_loaded`` in batches, with the remainder sent per show. An
    error aborts the current show only; it is logged and the next show is
    processed.
    """
    rows = database.get_all_tv_shows()
    current_app.logger.info("start loading tv episodes")
    for tv_show in rows:
        try:
            episodes = []
            tv_show_path = pathlib.Path(tv_show.path)
            for video in sorted(tv_show_path.iterdir()):
                # re.match anchors at the start, so match the bare file name
                # rather than the full path.
                video_match = re.match(_TV_EPISODE_PATTERN, video.name)
                if not video_match:
                    continue
                if database.tv_episode_path_in_db(str(video)):
                    continue
                season = int(video_match.group("season"))
                episode = int(video_match.group("episode"))
                episode_name = video_match.group("title")
                current_app.logger.info(f"S{season} E{episode} - {tv_show.title}: {episode_name}")
                record = _fetch_episode_record(tv_show, season, episode, episode_name, video)
                if record is None:
                    continue
                episodes.append(record)
                if len(episodes) >= _TV_EPISODE_BATCH_SIZE:
                    tv_episodes_loaded.send("anonymous", tv_episodes=episodes.copy())
                    episodes.clear()
            tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
        except Exception as e:
            _log_exception(e)
    current_app.logger.info("finished loading tv episodes")


def get_tv_episode(path: pathlib.Path):
    """Publish a single episode file through ``tv_episodes_loaded``.

    The episode is attributed to the known TV show whose path equals the
    file's parent directory. Nothing is sent when the file name does not
    follow the episode pattern, no show matches, the episode is already in
    the database, or the online lookup fails.
    """
    video_match = re.match(_TV_EPISODE_PATTERN, path.name)
    if not video_match:
        return
    for tv_show in database.get_all_tv_shows():
        # Normalise to Path so the comparison also works for string paths.
        if path.parent != pathlib.Path(tv_show.path):
            continue
        if database.tv_episode_path_in_db(str(path)):
            continue
        season = int(video_match.group("season"))
        episode = int(video_match.group("episode"))
        episode_name = video_match.group("title")
        record = _fetch_episode_record(tv_show, season, episode, episode_name, path)
        if record is None:
            continue
        tv_episodes_loaded.send("anonymous", tv_episodes=[record])
        current_app.logger.info("finished loading tv episode")


def get_chapters(path):
    """Return timing info for the specially named chapters of an MKV file.

    The result may contain ``"intro"`` (``start``/``end``), ``"credits"``
    (``start``) and ``"end-credit scene"`` (a list of dicts with ``start``
    and, when the chapter has an end, ``end``). An empty dict is returned
    when the file cannot be parsed.
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        _log_exception(e)
        return {}
    mkv_info = {}
    for chapter in mkv.chapters:
        # NOTE(review): chapter.end is divided by 1000 and used as
        # microseconds, so it is presumably in nanoseconds - confirm.
        if chapter.string == "Intro":
            mkv_info["intro"] = {
                "start": chapter.start.seconds,
                "end": timedelta(microseconds=chapter.end // 1000).seconds,
            }
        if chapter.string == "Credits":
            mkv_info["credits"] = {"start": chapter.start.seconds}
        if chapter.string == "end-credit scene":
            end_credit = {"start": chapter.start.seconds}
            if chapter.end:
                end_credit["end"] = timedelta(microseconds=chapter.end // 1000).seconds
            mkv_info.setdefault("end-credit scene", []).append(end_credit)
    return mkv_info


def _as_is(value):
    """Return *value* unchanged."""
    return value


def _split_keywords(value):
    """Split a comma separated keyword string into a list."""
    return value.split(",")


# Tag target value -> (result section, {simple tag name: (result key, converter)}).
_TAG_SECTIONS = {
    70: (
        "collection",
        {
            "TITLE": ("title", _as_is),
            "TOTAL_PARTS": ("episodes", int),
            "KEYWORDS": ("key_words", _split_keywords),
            "DATE_RELEASED": ("year", int),
            "SUMMARY": ("summary", _as_is),
        },
    ),
    60: (
        "season",
        {
            "TITLE": ("title", _as_is),
            "TOTAL_PARTS": ("episodes", int),
        },
    ),
    50: (
        "movie",
        {
            "TITLE": ("title", _as_is),
            "DATE_RELEASED": ("year", int),
            "PART_NUMBER": ("episode", int),
            "KEYWORDS": ("key_words", _split_keywords),
            "SUMMARY": ("summary", _as_is),
        },
    ),
}


def get_tags(path):
    """Return the collection / season / movie tags stored in an MKV file.

    The result maps ``"collection"``, ``"season"`` and ``"movie"`` (each only
    when a tag with target value 70, 60 or 50 respectively exists) to a dict
    of the recognised simple tags. An empty dict is returned when the file
    cannot be parsed.
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        _log_exception(e)
        # Without a parsed file there is nothing to read tags from.
        return {}
    mkv_info = {}
    for tag in mkv.tags:
        section = _TAG_SECTIONS.get(tag.targets.data[0].data)
        if section is None:
            continue
        section_name, fields = section
        values = {}
        mkv_info[section_name] = values
        for simple in tag.simpletags:
            if simple.name in fields:
                key, convert = fields[simple.name]
                values[key] = convert(simple.string)
    return mkv_info


def _fetch_game_details(game_id, headers):
    """Return ``(description, poster_path)`` for *game_id* from api-v3.igdb.com.

    ``description`` is "" when the game has no summary; ``poster_path`` is
    None when no cover image id is returned.
    """
    data = f"fields summary;limit 1;where id={game_id};"
    game = requests.get(_IGDB_GAMES_URL, headers=headers, data=data).json()[0]
    description = game["summary"] if "summary" in game else ""
    data = f"fields image_id;limit 1;where game={game_id};"
    covers = requests.get(_IGDB_COVERS_URL, headers=headers, data=data).json()
    poster_path = None
    if covers and "image_id" in covers[0]:
        poster_path = _IGDB_COVER_PREFIX + covers[0]["image_id"] + ".jpg"
    return description, poster_path


def get_games():
    """Publish every game directory not yet in the database.

    Each sub-directory of ``GAMES_DIRECTORY`` is expected to hold an
    ``info.json`` with at least ``id`` and ``name``. Games are sent through
    ``games_loaded`` in batches, with the remainder (possibly empty) sent at
    the end. Errors for a single directory are logged and skipped.
    """
    games = []
    # NOTE(review): this credential is hard-coded in the source; it should be
    # moved to configuration and rotated.
    headers = {
        "accept": "application/json",
        "user-key": "641f7f0e3af5273dcc1105ce851ea804",
    }
    current_app.logger.info("start loading games")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        root = folder.absolute()
        if not root.is_dir():
            continue
        try:
            with (root / "info.json").open() as f:
                info = json.load(f)
            game_id = info["id"]
            if database.game_in_db(game_id):
                continue
            current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
            description, poster_path = _fetch_game_details(game_id, headers)
            windows, mac, linux = _platform_flags(info)
            # NOTE(review): the absolute path is stored here, whereas get_game
            # stores a path relative to GAMES_DIRECTORY - confirm which one
            # consumers expect.
            games.append(
                (
                    info["name"],
                    game_id,
                    description,
                    poster_path,
                    str(root),
                    windows,
                    mac,
                    linux,
                    folder.name,
                )
            )
            if len(games) >= _GAME_BATCH_SIZE:
                games_loaded.send("anonymous", games=games.copy())
                games.clear()
        except Exception as e:
            _log_exception(e)
    games_loaded.send("anonymous", games=games)
    current_app.logger.info("finished loading games")


def get_game(path: pathlib.Path):
    """Publish the game described by the ``info.json`` at *path*.

    Paths not named ``info.json`` are ignored. A game that is already in the
    database only has its platform flags refreshed via ``update_game``. Any
    exception is logged rather than raised.
    """
    try:
        headers = {
            "accept": "application/json",
            "user-key": "***REMOVED***",
        }
        if not path.name == "info.json":
            return
        with path.open("r") as f:
            info = json.load(f)
        game_id = info["id"]
        if database.game_in_db(game_id):
            update_game(path)
            return
        game_dir = path.parent
        folder = path.parts[-2]
        current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")
        description, poster_path = _fetch_game_details(game_id, headers)
        windows, mac, linux = _platform_flags(info)
        game = (
            info["name"],
            game_id,
            description,
            poster_path,
            str(game_dir.relative_to(GAMES_DIRECTORY)),
            windows,
            mac,
            linux,
            folder,
        )
        games_loaded.send("anonymous", games=[game])
        current_app.logger.info("finished loading game")
    except Exception as e:
        _log_exception(e)


def update_games():
    """Refresh the platform flags of every game under ``GAMES_DIRECTORY``."""
    current_app.logger.info("start updating game data")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        update_game(folder / "info.json")
    current_app.logger.info("finished updating game data")


def update_game(path: pathlib.Path):
    """Store the platform flags from the ``info.json`` at *path* in the database.

    Does nothing when *path* is not an existing file named ``info.json``.
    Any exception is logged rather than raised.
    """
    try:
        if path.name == "info.json" and path.exists():
            with path.open("r") as f:
                info = json.load(f)
            game_id = info["id"]
            windows, mac, linux = _platform_flags(info)
            database.update_game((game_id, windows, mac, linux))
    except Exception as e:
        _log_exception(e)