"""Scan the media directories and publish newly found items via blinker signals.

Each ``get_*`` function walks one of the configured directories, skips entries
the ``database`` module already knows about, collects metadata (from the file
itself or from a remote API) and sends it on the matching signal.
"""
import inspect
import json
import os
import pathlib
import re
from datetime import timedelta

import enzyme
import pathvalidate
import requests
from blinker import Namespace
from comicapi import comicarchive
from flask import current_app
from wand.image import Image

import database

rpi_signals = Namespace()
comic_loaded = rpi_signals.signal("comic-loaded")
movie_loaded = rpi_signals.signal("movie-loaded")
tv_show_loaded = rpi_signals.signal("tv_show_loaded")
tv_episodes_loaded = rpi_signals.signal("tv_episodes_loaded")
games_loaded = rpi_signals.signal("games_loaded")

publishers_to_ignore = ["***REMOVED***"]

API_KEY = "***REMOVED***"

# Directories
COMICS_DIRECTORY = pathlib.Path("/srv/comics/")
MOVIES_DIRECTORY = pathlib.Path("/srv/movies/")
TV_SHOWS_DIRECTORY = pathlib.Path("/srv/tv/")
GAMES_DIRECTORY = pathlib.Path("/srv/games/")

#############


def get_comics():
    """Walk COMICS_DIRECTORY, load every .cbr/.cbz file and log the totals.

    Files located under a directory whose path contains "temp" are skipped.
    """
    total_comics = 0
    comics_in_db = 0
    comics_added = 0
    for root, dirs, files in os.walk(COMICS_DIRECTORY):
        if "temp" in root:
            # Nothing below a "temp" path is counted or loaded.
            continue
        for f in files:
            if not f.endswith((".cbr", ".cbz")):
                continue
            total_comics += 1
            result = get_comic(pathlib.Path(root, f))
            comics_added += result["added"]
            comics_in_db += result["in_database"]

    current_app.logger.info(f"total number of comics: {total_comics}")
    current_app.logger.info(f"comics in database: {comics_in_db}")
    current_app.logger.info(f"number of comics added: {comics_added}")


def get_comic(absolute_path: pathlib.Path):
    """Load a single comic archive and announce it on ``comic_loaded``.

    Args:
        absolute_path: Path of the archive; must be located below
            COMICS_DIRECTORY (``relative_to`` raises ``ValueError`` otherwise).

    Returns:
        A dict ``{"in_database": 0|1, "added": 0|1}``. Both flags stay 0 when
        the file is not a .cbr/.cbz archive, its path cannot be encoded as
        UTF-8, its publisher is ignored, or thumbnail generation failed.
    """
    result = {"in_database": 0, "added": 0}
    relative_path = absolute_path.relative_to(COMICS_DIRECTORY)

    if relative_path.suffix not in (".cbr", ".cbz"):
        return result

    if database.comic_path_in_db(str(relative_path)):
        result["in_database"] = 1
        return result

    # The path is sent on as text; refuse paths that cannot be encoded
    # (e.g. surrogate escapes from undecodable file names).
    try:
        str(relative_path).encode("utf8")
    except UnicodeEncodeError:
        current_app.logger.error(f"encoding failed on: {absolute_path}")
        return result

    archive = open_comic(relative_path)
    md = archive.readCIX()
    if md.publisher in publishers_to_ignore:
        return result

    current_app.logger.info(absolute_path)
    meta = [(str(relative_path), md)]
    thumbnails = []
    try:
        thumbnails.append(get_comic_thumbnails(archive))
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return result

    comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails)
    result["added"] = 1
    return result


def get_comic_thumbnails(comic):
    """Render one thumbnail per page of ``comic``.

    Each page is scaled so that its longer side becomes 256 pixels while the
    aspect ratio is kept.

    Args:
        comic: Archive object providing ``getNumberOfPages()`` and
            ``getPage(index)`` (as returned by :func:`open_comic`).

    Returns:
        A list of ``(blob, mime_type)`` tuples, one per page, where
        ``mime_type`` is ``"image/" + image.format``.
    """
    thumbnails = []
    new_height = 256
    new_width = 256
    for page in range(comic.getNumberOfPages()):
        with Image(blob=comic.getPage(page)) as image:
            orig_height = image.height
            orig_width = image.width
            if orig_height >= orig_width:
                # Portrait or square: pin the height, derive the width.
                width = int((orig_width / orig_height) * new_height)
                height = new_height
            else:
                # Landscape: pin the width, derive the height.
                height = int((orig_height / orig_width) * new_width)
                width = new_width
            image.thumbnail(width, height)
            thumbnails.append((image.make_blob(), "image/" + image.format))
    return thumbnails


def open_comic(path):
    """Open the comic archive at ``path`` (relative to COMICS_DIRECTORY)."""
    archive = comicarchive.ComicArchive(str(COMICS_DIRECTORY / path), default_image_path="static/images/icon.png")
    return archive


def get_movies():
    """Walk MOVIES_DIRECTORY, load every .mkv file and log the totals."""
    current_app.logger.info("start loading movies")
    total_movies = 0
    movies_in_db = 0
    movies_added = 0
    for root, dirs, files in os.walk(MOVIES_DIRECTORY):
        for f in files:
            if not f.endswith(".mkv"):
                continue
            total_movies += 1
            result = get_movie(pathlib.Path(root, f))
            movies_in_db += result["in_database"]
            movies_added += result["added"]
    current_app.logger.info("finish loading movies")
    current_app.logger.info(f"total movies: {total_movies}")
    current_app.logger.info(f"movies in database: {movies_in_db}")
    current_app.logger.info(f"movies added: {movies_added}")


def get_movie(absolute_path: pathlib.Path):
    """Look up one movie file on TMDB and announce it on ``movie_loaded``.

    The file name must look like ``Title (Year).mkv``, optionally with
    ``(extended)`` and/or `` Director's Cut`` between the year and the
    extension.

    Args:
        absolute_path: Path of the movie file; must be located below
            MOVIES_DIRECTORY (``relative_to`` raises ``ValueError`` otherwise).

    Returns:
        A dict ``{"in_database": 0|1, "added": 0|1}``. Both flags stay 0 when
        the name does not match, TMDB returns no result, or an error occurred
        (errors are logged, not raised).
    """
    # The title group must be named: the code below reads match.group("title").
    pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)"
    url = "https://api.themoviedb.org/3/search/movie"
    relative_path = absolute_path.relative_to(MOVIES_DIRECTORY)
    movies = []
    result = {"in_database": 0, "added": 0}

    if database.movie_path_in_db(str(relative_path)):
        result["in_database"] = 1
        return result

    try:
        match = re.match(pattern, relative_path.name)
        if not match:
            current_app.logger.info(f"{absolute_path.name} did not match regex.")
            return result
        current_app.logger.info(f"movie path: {absolute_path}")
        title = match.group("title")
        current_app.logger.info("movie title: " + title)
        year = int(match.group("year"))
        # Optional groups yield None when absent and a str when present,
        # never the object True, so test for presence explicitly.
        extended = match.group("extended") is not None
        directors_cut = match.group("directors_cut") is not None
        # A sibling file named "Title (Year)(4k)..." marks a 4k version.
        res_4k = (absolute_path.parent / absolute_path.name.replace(f"({year})", f"({year})(4k)")).exists()

        data = {
            "api_key": API_KEY,
            "query": title,
            "primary_release_year": year,
            "language": "en-US",
        }
        results = requests.get(url, params=data).json()["results"]
        if not results:
            # Retry with the "year" filter instead of "primary_release_year".
            data = {
                "api_key": API_KEY,
                "query": title,
                "year": year,
                "language": "en-US",
            }
            results = requests.get(url, params=data).json()["results"]
        # Check for emptiness before indexing the first hit.
        if not results:
            current_app.logger.info(f"no movie results for {title} - ({year})")
            return result
        info = results[0]

        tmdb_id = info["id"]
        description = info["overview"]
        poster_path = info["poster_path"]
        backdrop_path = info["backdrop_path"]
        movies.append(
            [
                str(relative_path),
                tmdb_id,
                title,
                year,
                description,
                extended,
                directors_cut,
                poster_path,
                backdrop_path,
                res_4k,
            ]
        )
        movie_loaded.send("anonymous", movies=movies.copy())
        current_app.logger.info("finish loading movie")
        result["added"] = 1
        return result
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # A failed lookup is neither "added" nor "in database".
        return result


def get_tv_shows():
    """Scan TV_SHOWS_DIRECTORY for ``Title (Year)`` entries and announce new shows.

    When the entry contains an ``info.json`` with content, its ``tmdb_id`` is
    used to fetch the show directly; otherwise TMDB is searched by title and
    first-air-date year. Each new show is sent on ``tv_show_loaded``.
    """
    dir_pattern = r"(?P<title>.+) \((?P<year>\d+)\)"
    search_url = "https://api.themoviedb.org/3/search/tv"
    tv_url = "https://api.themoviedb.org/3/tv/"
    current_app.logger.info("start loading tv shows")
    for show_dir in sorted(TV_SHOWS_DIRECTORY.iterdir()):
        dir_match = re.match(dir_pattern, show_dir.name)
        if not dir_match:
            continue
        # iterdir() already yields paths prefixed with TV_SHOWS_DIRECTORY.
        absolute_path = show_dir
        relative_path = absolute_path.relative_to(TV_SHOWS_DIRECTORY)
        if database.tv_show_path_in_db(str(relative_path)):
            continue

        json_info = {}
        info_file = absolute_path / "info.json"
        if info_file.exists():
            with info_file.open() as f:
                json_info = json.load(f)
        series_name = dir_match.group("title")
        series_year = int(dir_match.group("year"))

        if not json_info:
            data = {
                "api_key": API_KEY,
                "query": series_name,
                "first_air_date_year": series_year,
                "language": "en-US",
            }
            r = requests.get(search_url, params=data)
            results = r.json()["results"]
            if len(results) == 0:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue
            info = results[0]
        else:
            data = {"api_key": API_KEY, "language": "en-US"}
            r = requests.get(tv_url + str(json_info["tmdb_id"]), params=data)
            info = r.json()
            # A "status_code" key in the body is treated as "not found".
            if "status_code" in info:
                current_app.logger.info(f"no tv show results for {series_name} - ({series_year})")
                continue

        tmdb_id = info["id"]
        description = info["overview"]
        poster_path = info["poster_path"]
        tv_show_data = [
            tmdb_id,
            series_name,
            series_year,
            description,
            poster_path,
            str(relative_path),
        ]
        tv_show_loaded.send("anonymous", tv_show=tv_show_data)
    current_app.logger.info("finished loading tv shows.")


def get_tv_episodes():
    """Load the episodes of every TV show known to the database.

    Every file named ``S<season>E<episode> - <title>.mkv`` inside a show's
    directory is handed to :func:`get_tv_episode`.
    """
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>\.mkv)"
    tv_shows = database.get_all_tv_shows()
    current_app.logger.info("start loading tv episodes")
    for tv_show in tv_shows:
        tv_show_path = pathlib.Path(tv_show.path)
        for video in sorted((TV_SHOWS_DIRECTORY / tv_show_path).iterdir()):
            # re.match anchors at the start of the string, so the pattern has
            # to be applied to the file name, not to the full path.
            if re.match(video_pattern, video.name):
                # ``video`` is already an absolute path below TV_SHOWS_DIRECTORY.
                get_tv_episode(video)
    current_app.logger.info("finished loading tv episodes")


def get_tv_episode(absolute_path: pathlib.Path):
    """Look up one episode file on TMDB and announce it on ``tv_episodes_loaded``.

    Args:
        absolute_path: Path of the episode file; must be located below
            TV_SHOWS_DIRECTORY (``relative_to`` raises ``ValueError``
            otherwise). Files whose name does not match
            ``S<season>E<episode> - <title>.mkv`` are ignored.

    Errors raised while querying the database or TMDB are logged, not raised.
    """
    video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>\.mkv)"
    video_match = re.match(video_pattern, absolute_path.name)
    relative_path = absolute_path.relative_to(TV_SHOWS_DIRECTORY)
    if not video_match:
        return

    try:
        for tv_show in database.get_all_tv_shows():
            # Compare as paths: a Path never equals a plain str.
            if relative_path.parent != pathlib.Path(tv_show.path):
                continue
            if database.tv_episode_path_in_db(str(relative_path)):
                continue

            episodes = []
            season = int(video_match.group("season"))
            episode = int(video_match.group("episode"))
            episode_name = video_match.group("title")
            current_app.logger.info(f"S{season} E{episode} - {tv_show.title}: {episode_name}")
            url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}"

            data = {"api_key": API_KEY, "language": "en-US"}
            info = requests.get(url, params=data).json()
            # A "status_code" key in the body is treated as "not found".
            if "status_code" in info:
                current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}")
                continue

            episode_tmdb_id = info["id"]
            episode_description = info["overview"]
            episode_still_path = info["still_path"]
            episodes.append(
                [
                    episode_tmdb_id,
                    tv_show.tmdb_id,
                    episode_name,
                    season,
                    episode,
                    episode_description,
                    episode_still_path,
                    str(relative_path),
                ]
            )
            tv_episodes_loaded.send("anonymous", tv_episodes=episodes)
            current_app.logger.info("finished loading tv episode")
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))


def get_chapters(path):
    """Extract intro / credits / end-credit chapter times from an MKV file.

    Args:
        path: Path of the MKV file.

    Returns:
        A dict that may contain the keys ``"intro"`` (``start`` and, when the
        chapter has an end, ``end``), ``"credits"`` (``start``) and
        ``"end-credit scene"`` (list of dicts with ``start`` and optional
        ``end``); all times are whole seconds. Returns ``{}`` when the file
        cannot be opened or parsed (the error is logged).
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        return {}

    mkv_info = {}
    for chapter in mkv.chapters:
        if chapter.string == "Intro":
            intro = {"start": chapter.start.seconds}
            # NOTE(review): chapter.end is divided by 1000 to get microseconds,
            # so it is presumably a raw nanosecond count - confirm with enzyme.
            # It can be unset, in which case "end" is left out.
            if chapter.end:
                intro["end"] = timedelta(microseconds=chapter.end // 1000).seconds
            mkv_info["intro"] = intro
        if chapter.string == "Credits":
            mkv_info["credits"] = {"start": chapter.start.seconds}
        if chapter.string == "end-credit scene":
            end_credit = {"start": chapter.start.seconds}
            if chapter.end:
                end_credit["end"] = timedelta(microseconds=chapter.end // 1000).seconds
            mkv_info.setdefault("end-credit scene", []).append(end_credit)
    return mkv_info


def get_tags(path):
    """Extract collection / season / movie tags from an MKV file.

    Args:
        path: Path of the MKV file.

    Returns:
        A dict that may contain the keys ``"collection"``, ``"season"`` and
        ``"movie"``, each mapping to a dict of the simple tags found for that
        target. Returns ``{}`` when the file cannot be opened or parsed (the
        error is logged).
    """
    try:
        with open(path, "rb") as f:
            mkv = enzyme.MKV(f)
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))
        # Without a parsed file there is nothing to read tags from.
        return {}

    mkv_info = {}
    for tag in mkv.tags:
        # NOTE(review): 70 / 60 / 50 look like Matroska target type values
        # (collection / season / movie) - confirm against the Matroska spec.
        target_type = tag.targets.data[0].data
        if target_type == 70:
            collection = mkv_info["collection"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    collection["title"] = simple.string
                elif simple.name == "TOTAL_PARTS":
                    collection["episodes"] = int(simple.string)
                elif simple.name == "KEYWORDS":
                    collection["key_words"] = simple.string.split(",")
                elif simple.name == "DATE_RELEASED":
                    collection["year"] = int(simple.string)
                elif simple.name == "SUMMARY":
                    collection["summary"] = simple.string
        elif target_type == 60:
            season = mkv_info["season"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    season["title"] = simple.string
                elif simple.name == "TOTAL_PARTS":
                    season["episodes"] = int(simple.string)
        elif target_type == 50:
            movie = mkv_info["movie"] = {}
            for simple in tag.simpletags:
                if simple.name == "TITLE":
                    movie["title"] = simple.string
                elif simple.name == "DATE_RELEASED":
                    movie["year"] = int(simple.string)
                elif simple.name == "PART_NUMBER":
                    movie["episode"] = int(simple.string)
                elif simple.name == "KEYWORDS":
                    movie["key_words"] = simple.string.split(",")
                elif simple.name == "SUMMARY":
                    movie["summary"] = simple.string
    return mkv_info


def get_games():
    """Load every game found in GAMES_DIRECTORY.

    Each entry of GAMES_DIRECTORY that contains an ``info.json`` file is
    handed to :func:`get_game`.
    """
    current_app.logger.info("start loading games")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        # get_game only acts on paths named "info.json", so pass the
        # descriptor file itself rather than the folder.
        info_file = folder.absolute() / "info.json"
        if info_file.is_file():
            get_game(info_file)
    current_app.logger.info("finished loading games")


def get_game(path: pathlib.Path):
    """Load one game from its ``info.json`` and announce it on ``games_loaded``.

    If the game id is already in the database only its platform flags are
    refreshed via :func:`update_game`. Otherwise the summary and cover are
    fetched from IGDB and the game is sent on ``games_loaded``.

    Args:
        path: Path of the game's ``info.json``; any other file name is
            ignored. The file must provide ``id`` and ``name``; the presence
            of the keys ``windows`` / ``mac`` / ``linux`` marks the
            supported platforms.

    Errors are logged, not raised.
    """
    try:
        games = []
        cover_url = "https://api-v3.igdb.com/covers"
        games_url = "https://api-v3.igdb.com/games"
        headers = {
            "accept": "application/json",
            "user-key": "***REMOVED***",
        }

        if path.name != "info.json":
            return

        with path.open("r") as f:
            info = json.load(f)
        game_id = info["id"]

        if database.game_in_db(game_id):
            update_game(path)
            return

        game_dir = path.parent
        current_app.logger.info(f"start loading game: {info['name']}:{info['id']}")

        # The query is sent as the request body.
        data = f"fields summary;limit 1;where id={game_id};"
        r = requests.get(games_url, headers=headers, data=data).json()[0]
        description = r["summary"] if "summary" in r else ""

        data = f"fields image_id;limit 1;where game={game_id};"
        r = requests.get(cover_url, headers=headers, data=data).json()
        poster_path = None
        if r and "image_id" in r[0]:
            poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + r[0]["image_id"] + ".jpg"

        # Only the presence of the key matters, not its value.
        windows = "windows" in info
        mac = "mac" in info
        linux = "linux" in info

        game = [
            info["name"],
            game_id,
            description,
            poster_path,
            str(game_dir.relative_to(GAMES_DIRECTORY)),
            windows,
            mac,
            linux,
            pathvalidate.sanitize_filename(info["name"]),
        ]
        games.append(game)
        games_loaded.send("anonymous", games=games)
        current_app.logger.info(f"finished loading game: {info['name']}:{info['id']}")
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))


def update_games():
    """Refresh the platform flags of every game in GAMES_DIRECTORY."""
    current_app.logger.info("start updating game data")
    for folder in sorted(GAMES_DIRECTORY.iterdir()):
        update_game(folder / "info.json")
    current_app.logger.info("finished updating game data")


def update_game(path: pathlib.Path):
    """Push the platform flags from one ``info.json`` to the database.

    Args:
        path: Path of the game's ``info.json``. Nothing happens when the file
            name differs or the file does not exist.

    Errors are logged, not raised.
    """
    try:
        if path.name != "info.json" or not path.exists():
            return
        with path.open("r") as f:
            info = json.load(f)
        game_id = info["id"]
        # Only the presence of the key matters, not its value.
        windows = "windows" in info
        mac = "mac" in info
        linux = "linux" in info
        database.update_game((game_id, windows, mac, linux))
    except Exception as e:
        current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e))