From 4665a17a3f2e1c8fcc184395173025e3f9740dfd Mon Sep 17 00:00:00 2001 From: Matthew Welch Date: Sat, 22 Jan 2022 20:37:52 -0800 Subject: [PATCH] Refactor func to remove duplicate code Change database to use relative instead of absolute paths --- func.py | 295 +++++++++++------------------------------ requirements.txt | 4 +- tv_movies/tv_movies.py | 8 +- 3 files changed, 82 insertions(+), 225 deletions(-) diff --git a/func.py b/func.py index 2e91a48..c4191d2 100644 --- a/func.py +++ b/func.py @@ -3,10 +3,10 @@ import json import os import pathlib import re -from datetime import timedelta - import enzyme +import pathvalidate import requests +from datetime import timedelta from blinker import Namespace from comicapi import comicarchive from flask import current_app @@ -39,69 +39,49 @@ def get_comics(): total_comics = 0 comics_in_db = 0 comics_added = 0 - meta = [] - thumbnails = [] - i = 0 for root, dirs, files in os.walk(COMICS_DIRECTORY): for f in files: if "temp" in root: continue if f.endswith((".cbr", ".cbz")): total_comics += 1 - path = pathlib.Path(root, f) - if not database.comic_path_in_db(str(path)): - try: - test_path = path.encode("utf8") - except Exception as e: - current_app.logger.error("encoding failed on: " + str(path)) - continue - archive = open_comic(path) - md = archive.readCIX() - if md.publisher in publishers_to_ignore: - continue - current_app.logger.info(str(path)) - try: - meta.append((str(path), md)) - thumbnails.append(get_comic_thumbnails(archive)) - comics_added += 1 - i += 1 - except Exception as e: - current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) - continue - if i >= 2: - comic_loaded.send("anonymous", meta=meta.copy(), thumbnails=thumbnails.copy()) - meta.clear() - thumbnails.clear() - i = 0 - comics_in_db += 1 + absolute_path = pathlib.Path(root, f) + result = get_comic(absolute_path) + comics_added += result["added"] + comics_in_db += result["in_database"] current_app.logger.info("total number of comics: " + str(total_comics)) current_app.logger.info("comics in database: " + str(comics_in_db)) current_app.logger.info("number of comics added: " + str(comics_added)) - comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails) -def get_comic(path: pathlib.Path): +def get_comic(absolute_path: pathlib.Path): meta = [] thumbnails = [] - if path.suffix == ".cbr": - if not database.comic_path_in_db(str(path)): + result = {"in_database": 0, "added": 0} + relative_path = absolute_path.relative_to(COMICS_DIRECTORY) + if relative_path.suffix in [".cbr", ".cbz"]: + if not database.comic_path_in_db(str(relative_path)): try: - test_path = str(path).encode("utf8") + test_path = str(relative_path).encode("utf8") except Exception as e: - current_app.logger.error(f"encoding failed on: {path}") - return - archive = open_comic(path) + current_app.logger.error(f"encoding failed on: {absolute_path}") + return result + archive = open_comic(relative_path) md = archive.readCIX() if md.publisher in publishers_to_ignore: - return - current_app.logger.info(path) - meta.append((str(path), md)) + return result + current_app.logger.info(absolute_path) + meta.append((str(relative_path), md)) try: thumbnails.append(get_comic_thumbnails(archive)) except Exception as e: current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) - return + return result comic_loaded.send("anonymous", meta=meta, thumbnails=thumbnails) + result["added"] = 1 + return result + result["in_database"] = 1 + return result def get_comic_thumbnails(comic): @@ -125,15 +105,12 @@ def get_comic_thumbnails(comic): def open_comic(path): - archive = comicarchive.ComicArchive(str(path), default_image_path="static/images/icon.png") + archive = comicarchive.ComicArchive(str(COMICS_DIRECTORY / path), default_image_path="static/images/icon.png") return archive def get_movies(): current_app.logger.info("start loading movies") - pattern = r"(?P.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)" - url = "https://api.themoviedb.org/3/search/movie" - movies = [] total_movies = 0 movies_in_db = 0 movies_added = 0 @@ -141,79 +118,35 @@ def get_movies(): for f in files: if f.endswith(".mkv"): total_movies += 1 - path = pathlib.Path(root, f) - if not database.movie_path_in_db(str(path)): - try: - match = re.match(pattern, f) - if not match: - current_app.logger.info(f + " did not match regex.") - continue - current_app.logger.info("movie path: " + str(path)) - title = match.group("title") - current_app.logger.info("movie title: " + title) - year = int(match.group("year")) - extended = True if match.group("extended") else False - directors_cut = True if match.group("directors_cut") else False - res_4k_path = (path.parent / path.name.replace(f"({year})", f"({year})(4k)")) - res_4k = res_4k_path.exists() - - data = { - "api_key": API_KEY, - "query": title, - "primary_release_year": year, - "language": "en-US", - } - r = requests.get(url, params=data) - if len(r.json()["results"]) == 0: - data = { - "api_key": API_KEY, - "query": title, - "year": year, - "language": "en-US", - } - r = requests.get(url, params=data) - if len(r.json()["results"]) == 0: - current_app.logger.info(f"no movie results for {title} - ({year})") - continue - info = r.json()["results"][0] - - tmdb_id = info["id"] - description = info["overview"] - poster_path = info["poster_path"] - backdrop_path = info["backdrop_path"] - movies_added += 1 - - movies.append((str(path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,)) - if len(movies) >= 20: - movie_loaded.send("anonymous", movies=movies.copy()) - movies.clear() - except Exception as e: - current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) - movies_in_db += 1 - movie_loaded.send("anonymous", movies=movies) + absolute_path = pathlib.Path(root, f) + result = get_movie(absolute_path) + movies_in_db += result["in_database"] + movies_added += result["added"] current_app.logger.info("finish loading movies") current_app.logger.info("total movies: " + str(total_movies)) current_app.logger.info("movies in database: " + str(movies_in_db)) current_app.logger.info("movies added: " + str(movies_added)) -def get_movie(path: pathlib.Path): +def get_movie(absolute_path: pathlib.Path): pattern = r"(?P<title>.+) \((?P<year>\d+)\)(?P<extended>\(extended\))?(?P<directors_cut> Director's Cut)?(?P<extension>\.mkv)" url = "https://api.themoviedb.org/3/search/movie" + relative_path = absolute_path.relative_to(MOVIES_DIRECTORY) movies = [] - if not database.movie_path_in_db(str(path)): + result = {"in_database": 0, "added": 0} + if not database.movie_path_in_db(str(relative_path)): try: - match = re.match(pattern, path.name) + match = re.match(pattern, relative_path.name) if not match: - current_app.logger.info(f"{path.name} did not match regex.") - return - current_app.logger.info(f"movie path: {path}") + current_app.logger.info(f"{absolute_path.name} did not match regex.") + return result + current_app.logger.info(f"movie path: {absolute_path}") title = match.group("title") current_app.logger.info("movie title: " + title) year = int(match.group("year")) extended = match.group("extended") is True directors_cut = match.group("directors_cut") is True - res_4k = (path.parent / path.name.replace(f"({year})", f"({year})(4k)")).exists() + res_4k = (absolute_path.parent / absolute_path.name.replace(f"({year})", f"({year})(4k)")).exists() data = { "api_key": API_KEY, @@ -233,19 +166,22 @@ def get_movie(path: pathlib.Path): info = r.json()["results"][0] if len(r.json()["results"]) == 0: current_app.logger.info(f"no movie results for {title} - ({year})") - return + return result tmdb_id = info["id"] description = info["overview"] poster_path = info["poster_path"] backdrop_path = info["backdrop_path"] - movies.append((str(path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,)) + movies.append((str(relative_path), tmdb_id, title, year, description, extended, directors_cut, poster_path, backdrop_path, res_4k,)) movie_loaded.send("anonymous", movies=movies.copy()) - movies.clear() current_app.logger.info("finish loading movie") + result["added"] = 1 + return result except Exception as e: current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) + result["in_database"] = 1 + return result def get_tv_shows(): @@ -256,11 +192,12 @@ def get_tv_shows(): for dir in sorted(TV_SHOWS_DIRECTORY.iterdir()): dir_match = re.match(dir_pattern, dir.name) if dir_match: - path = TV_SHOWS_DIRECTORY / dir - if not database.tv_show_path_in_db(str(path)): + absolute_path = TV_SHOWS_DIRECTORY / dir + relative_path = absolute_path.relative_to(COMICS_DIRECTORY) + if not database.tv_show_path_in_db(str(relative_path)): json_info = {} - if (path / "info.json").exists(): - with (path / "info.json").open() as f: + if (absolute_path / "info.json").exists(): + with (absolute_path / "info.json").open() as f: json_info = json.load(f) series_name = dir_match.group("title") series_year = int(dir_match.group("year")) @@ -294,7 +231,7 @@ def get_tv_shows(): series_year, description, poster_path, - str(path), + str(relative_path), ) tv_show_loaded.send("anonymous", tv_show=tv_show_data) current_app.logger.info("finished loading tv shows.") @@ -302,17 +239,29 @@ def get_tv_shows(): def get_tv_episodes(): video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)" - rows = database.get_all_tv_shows() + tv_shows = database.get_all_tv_shows() current_app.logger.info("start loading tv episodes") - for tv_show in rows: + for tv_show in tv_shows: + tv_show_path = pathlib.Path(tv_show.path) + for video in sorted(tv_show_path.iterdir()): + video_match = re.match(video_pattern, str(video)) + if video_match: + absolute_path = tv_show_path / video + get_tv_episode(absolute_path) + current_app.logger.info("finished loading tv episodes") + + +def get_tv_episode(absolute_path: pathlib.Path): + video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)" + video_match = re.match(video_pattern, absolute_path.name) + relative_path = absolute_path.relative_to(TV_SHOWS_DIRECTORY) + if video_match: try: - episodes = [] - tv_show_path = pathlib.Path(tv_show.path) - for video in sorted(tv_show_path.iterdir()): - video_match = re.match(video_pattern, str(video)) - if video_match: - path = tv_show_path / video - if not database.tv_episode_path_in_db(str(path)): + rows = database.get_all_tv_shows() + for tv_show in rows: + if relative_path.parent == tv_show.path: + if not database.tv_episode_path_in_db(str(relative_path)): + episodes = [] season = int(video_match.group("season")) episode = int(video_match.group("episode")) episode_name = video_match.group("title") @@ -330,46 +279,12 @@ def get_tv_episodes(): episode_description = info["overview"] episode_still_path = info["still_path"] episodes.append( - (episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(path),) + (episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(relative_path),) ) - if len(episodes) >= 10: - tv_episodes_loaded.send("anonymous", tv_episodes=episodes.copy()) - episodes.clear() - tv_episodes_loaded.send("anonymous", tv_episodes=episodes) + tv_episodes_loaded.send("anonymous", tv_episodes=episodes) + current_app.logger.info("finished loading tv episode") except Exception as e: current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) - current_app.logger.info("finished loading tv episodes") - - -def get_tv_episode(path: pathlib.Path): - video_pattern = r"S(?P<season>\d+)E(?P<episode>\d+) - (?P<title>.+)(?P<extension>.mkv)" - video_match = re.match(video_pattern, path.name) - if video_match: - rows = database.get_all_tv_shows() - for tv_show in rows: - if path.parent == tv_show.path: - if not database.tv_episode_path_in_db(str(path)): - episodes = [] - season = int(video_match.group("season")) - episode = int(video_match.group("episode")) - episode_name = video_match.group("title") - url = f"https://api.themoviedb.org/3/tv/{tv_show.tmdb_id}/season/{season}/episode/{episode}" - - data = {"api_key": API_KEY, "language": "en-US"} - r = requests.get(url, params=data) - if "status_code" in r.json().keys(): - current_app.logger.info(f"no tv episode results for S{season} E{episode} - {tv_show.title}: {episode_name}") - continue - info = r.json() - - episode_tmdb_id = info["id"] - episode_description = info["overview"] - episode_still_path = info["still_path"] - episodes.append( - (episode_tmdb_id, tv_show.tmdb_id, episode_name, season, episode, episode_description, episode_still_path, str(path),) - ) - tv_episodes_loaded.send("anonymous", tv_episodes=episodes) - current_app.logger.info("finished loading tv episode") def get_chapters(path): @@ -443,65 +358,10 @@ def get_tags(path): def get_games(): - games = [] - cover_url = "https://api-v3.igdb.com/covers" - games_url = "https://api-v3.igdb.com/games" - headers = { - "accept": "application/json", - "user-key": "641f7f0e3af5273dcc1105ce851ea804", - } - i = 0 current_app.logger.info("start loading games") for folder in sorted(GAMES_DIRECTORY.iterdir()): - root = folder.absolute() - if root.is_dir(): - try: - path = root / "info.json" - with path.open() as f: - info = json.load(f) - game_id = info["id"] - if not database.game_in_db(game_id): - current_app.logger.info(f"start loading game: {info['name']}:{info['id']}") - data = f"fields summary;limit 1;where id={game_id};" - r = requests.get(games_url, headers=headers, data=data).json()[0] - description = "" - if "summary" in r.keys(): - description = r["summary"] - data = f"fields image_id;limit 1;where game={game_id};" - r = requests.get(cover_url, headers=headers, data=data).json() - poster_path = None - if r: - if "image_id" in r[0].keys(): - poster_path = "https://images.igdb.com/igdb/image/upload/t_cover_big/" + r[0]["image_id"] + ".jpg" - windows = False - mac = False - linux = False - if "windows" in info.keys(): - windows = True - if "mac" in info.keys(): - mac = True - if "linux" in info.keys(): - linux = True - game = ( - info["name"], - game_id, - description, - poster_path, - str(root), - windows, - mac, - linux, - folder.name, - ) - games.append(game) - i += 1 - if i >= 5: - games_loaded.send("anonymous", games=games.copy()) - games.clear() - i = 0 - except Exception as e: - current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) - games_loaded.send("anonymous", games=games) + absolute_path = folder.absolute() + get_game(absolute_path) current_app.logger.info("finished loading games") @@ -524,7 +384,6 @@ def get_game(path: pathlib.Path): update_game(path) else: dir = path.parent - folder = path.parts[-2] current_app.logger.info(f"start loading game: {info['name']}:{info['id']}") data = f"fields summary;limit 1;where id={game_id};" r = requests.get(games_url, headers=headers, data=data).json()[0] @@ -555,11 +414,11 @@ def get_game(path: pathlib.Path): windows, mac, linux, - folder, + pathvalidate.sanitize_filename(info["name"]), ) games.append(game) games_loaded.send("anonymous", games=games) - current_app.logger.info("finished loading game") + current_app.logger.info(f"finished loading game: {info['name']}:{info['id']}") except Exception as e: current_app.logger.error(inspect.stack()[0][3] + " " + str(type(e)) + " " + str(e)) diff --git a/requirements.txt b/requirements.txt index 3f3e421..2b6b34f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,6 @@ Wand~=0.6.5 SQLAlchemy~=1.3.22 inotify~=0.2.10 oauthlib~=3.1.0 -git+https://github.com/lordwelch/comictagger#egg=comictagger \ No newline at end of file +git+https://github.com/lordwelch/comictagger#egg=comictagger +pathvalidate~=2.4.1 +future~=0.18.2 \ No newline at end of file diff --git a/tv_movies/tv_movies.py b/tv_movies/tv_movies.py index e11f4b5..531cf09 100644 --- a/tv_movies/tv_movies.py +++ b/tv_movies/tv_movies.py @@ -168,9 +168,7 @@ def get_movie(tmdb_id): directors_cut = request.args.get("directors_cut", default=False, type=bool) res_4k = request.args.get("res_4k", default=False, type=bool) movie_data = database.db_get_movie_by_tmdb_id(tmdb_id, extended=extended, directors_cut=directors_cut) - filename: str = movie_data.path.replace(str(func.MOVIES_DIRECTORY), "") - if filename.startswith("/"): - filename = filename[1:] + filename: str = movie_data.path if res_4k: filename = filename.replace(f"({movie_data.year})", f"({movie_data.year})(4K)") response = make_response(send_from_directory(func.MOVIES_DIRECTORY, filename)) @@ -182,9 +180,7 @@ def get_movie(tmdb_id): @login_required def get_episode(tmdb_id): episode_data = database.db_get_episode_by_tmdb_id(tmdb_id) - filename: str = episode_data.path.replace(str(func.TV_SHOWS_DIRECTORY), "") - if filename.startswith("/"): - filename = filename[1:] + filename: str = episode_data.path response = make_response(send_from_directory(func.TV_SHOWS_DIRECTORY, filename)) response.headers["content-type"] = "video/webm" return response