diff --git a/comicapi/comicarchive.py b/comicapi/comicarchive.py index 0a7822d..6fd92d9 100644 --- a/comicapi/comicarchive.py +++ b/comicapi/comicarchive.py @@ -16,12 +16,15 @@ from __future__ import annotations import io +import itertools import logging import os import pathlib import shutil import sys import traceback +from collections.abc import Sequence +from typing import TYPE_CHECKING from comicapi import utils from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver @@ -29,32 +32,37 @@ from comicapi.genericmetadata import GenericMetadata from comicapi.metadata import Metadata from comictaggerlib.ctversion import version +if TYPE_CHECKING: + from importlib.metadata import EntryPoint + logger = logging.getLogger(__name__) archivers: list[type[Archiver]] = [] metadata_styles: dict[str, Metadata] = {} -def load_archive_plugins() -> None: +def load_archive_plugins(local_plugins: Sequence[EntryPoint] = tuple()) -> None: if not archivers: if sys.version_info < (3, 10): from importlib_metadata import entry_points else: from importlib.metadata import entry_points builtin: list[type[Archiver]] = [] - for arch in entry_points(group="comicapi.archiver"): + # A list is used first matching plugin wins + for ep in itertools.chain(local_plugins, entry_points(group="comicapi.archiver")): + logger.warning(ep) try: - archiver: type[Archiver] = arch.load() - if arch.module.startswith("comicapi"): + archiver: type[Archiver] = ep.load() + if ep.module.startswith("comicapi"): builtin.append(archiver) else: archivers.append(archiver) except Exception: - logger.exception("Failed to load archive plugin: %s", arch.name) + logger.exception("Failed to load archive plugin: %s", ep.name) archivers.extend(builtin) -def load_metadata_plugins(version: str = f"ComicAPI/{version}") -> None: +def load_metadata_plugins(version: str = f"ComicAPI/{version}", local_plugins: Sequence[EntryPoint] = tuple()) -> None: if not metadata_styles: if sys.version_info < (3, 10): from importlib_metadata import entry_points @@ -62,22 +70,24 @@ def load_metadata_plugins(version: str = f"ComicAPI/{version}") -> None: from importlib.metadata import entry_points builtin: dict[str, Metadata] = {} styles: dict[str, Metadata] = {} - for arch in entry_points(group="comicapi.metadata"): + # A dict is used, last plugin wins + for ep in itertools.chain(entry_points(group="comicapi.metadata"), local_plugins): + logger.warning(ep) try: - style: type[Metadata] = arch.load() + style: type[Metadata] = ep.load() if style.enabled: - if arch.module.startswith("comicapi"): + if ep.module.startswith("comicapi"): builtin[style.short_name] = style(version) else: if style.short_name in styles: logger.warning( "Plugin %s is overriding the existing metadata plugin for %s tags", - arch.module, + ep.module, style.short_name, ) styles[style.short_name] = style(version) except Exception: - logger.exception("Failed to load metadata plugin: %s", arch.name) + logger.exception("Failed to load metadata plugin: %s", ep.name) for style_name in set(builtin.keys()).intersection(styles): logger.warning("Builtin metadata for %s tags are being overridden by a plugin", style_name) metadata_styles.clear() diff --git a/comictaggerlib/ctsettings/plugin_finder.py b/comictaggerlib/ctsettings/plugin_finder.py new file mode 100644 index 0000000..0c93920 --- /dev/null +++ b/comictaggerlib/ctsettings/plugin_finder.py @@ -0,0 +1,153 @@ +"""Functions related to finding and loading plugins.""" + +# Lifted from flake8 https://github.com/PyCQA/flake8/blob/main/src/flake8/plugins/finder.py#L127 + +from __future__ import annotations + +import configparser +import importlib.metadata +import logging +import pathlib +import re +from collections.abc import Generator +from typing import Any, NamedTuple + +LOG = logging.getLogger(__name__) + +NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+") +PLUGIN_GROUPS = frozenset(("comictagger.talker", "comicapi.archiver", "comicapi.metadata")) + + +class FailedToLoadPlugin(Exception): + """Exception raised when a plugin fails to load.""" + + FORMAT = 'ComicTagger failed to load local plugin "{name}" due to {exc}.' + + def __init__(self, plugin_name: str, exception: Exception) -> None: + """Initialize our FailedToLoadPlugin exception.""" + self.plugin_name = plugin_name + self.original_exception = exception + super().__init__(plugin_name, exception) + + def __str__(self) -> str: + """Format our exception message.""" + return self.FORMAT.format( + name=self.plugin_name, + exc=self.original_exception, + ) + + +def normalize_pypi_name(s: str) -> str: + """Normalize a distribution name according to PEP 503.""" + return NORMALIZE_PACKAGE_NAME_RE.sub("-", s).lower() + + +class Plugin(NamedTuple): + """A plugin before loading.""" + + package: str + version: str + entry_point: importlib.metadata.EntryPoint + path: pathlib.Path + + +class LoadedPlugin(NamedTuple): + """Represents a plugin after being imported.""" + + plugin: Plugin + obj: Any + + @property + def entry_name(self) -> str: + """Return the name given in the packaging metadata.""" + return self.plugin.entry_point.name + + @property + def display_name(self) -> str: + """Return the name for use in user-facing / error messages.""" + return f"{self.plugin.package}[{self.entry_name}]" + + +class Plugins(NamedTuple): + """Classified plugins.""" + + archivers: list[Plugin] + metadata: list[Plugin] + talkers: list[Plugin] + + def all_plugins(self) -> Generator[Plugin, None, None]: + """Return an iterator over all :class:`LoadedPlugin`s.""" + yield from self.archivers + yield from self.metadata + yield from self.talkers + + def versions_str(self) -> str: + """Return a user-displayed list of plugin versions.""" + return ", ".join(sorted({f"{plugin.package}: {plugin.version}" for plugin in self.all_plugins()})) + + +def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin, None, None]: + + cfg = configparser.ConfigParser(interpolation=None) + cfg.read(plugin_path / "setup.cfg") + + for group in PLUGIN_GROUPS: + for plugin_s in cfg.get("options.entry_points", group, fallback="").splitlines(): + if not plugin_s: + continue + + name, _, entry_str = plugin_s.partition("=") + name, entry_str = name.strip(), entry_str.strip() + ep = importlib.metadata.EntryPoint(name, entry_str, group) + yield Plugin(plugin_path.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, plugin_path) + + +def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) -> None: + plugin_names = {normalize_pypi_name(plugin.package) for plugin in plugins} + expected_names = {normalize_pypi_name(name) for name in expected} + missing_plugins = expected_names - plugin_names + + if missing_plugins: + raise Exception( + "required plugins were not installed!\n" + + f"- installed: {', '.join(sorted(plugin_names))}\n" + + f"- expected: {', '.join(sorted(expected_names))}\n" + + f"- missing: {', '.join(sorted(missing_plugins))}" + ) + + +def find_plugins(plugin_folder: pathlib.Path) -> Plugins: + """Discovers all plugins (but does not load them).""" + ret: list[Plugin] = [] + for plugin_path in plugin_folder.glob("*/setup.cfg"): + try: + ret.extend(_find_local_plugins(plugin_path.parent)) + except Exception as err: + FailedToLoadPlugin(plugin_path.parent.name, err) + + # for determinism, sort the list + ret.sort() + + return _classify_plugins(ret) + + +def _classify_plugins(plugins: list[Plugin]) -> Plugins: + archivers = [] + metadata = [] + talkers = [] + + for p in plugins: + if p.entry_point.group == "comictagger.talker": + talkers.append(p) + elif p.entry_point.group == "comicapi.metadata": + metadata.append(p) + elif p.entry_point.group == "comicapi.archiver": + archivers.append(p) + else: + LOG.warning(NotImplementedError(f"what plugin type? {p}")) + + return Plugins( + metadata=metadata, + archivers=archivers, + talkers=talkers, + ) diff --git a/comictaggerlib/ctsettings/types.py b/comictaggerlib/ctsettings/types.py index d8ac1c6..668248f 100644 --- a/comictaggerlib/ctsettings/types.py +++ b/comictaggerlib/ctsettings/types.py @@ -49,6 +49,13 @@ class ComicTaggerPaths(AppDirs): return path return pathlib.Path(super().user_log_dir) + @property + def user_plugin_dir(self) -> pathlib.Path: + if self.path: + path = self.path / "plugins" + return path + return pathlib.Path(super().user_config_dir) + @property def site_data_dir(self) -> pathlib.Path: return pathlib.Path(super().site_data_dir) diff --git a/comictaggerlib/main.py b/comictaggerlib/main.py index 3b749b0..d99365f 100644 --- a/comictaggerlib/main.py +++ b/comictaggerlib/main.py @@ -34,7 +34,7 @@ import comicapi.comicarchive import comicapi.utils import comictalker from comictaggerlib import cli, ctsettings -from comictaggerlib.ctsettings import ct_ns +from comictaggerlib.ctsettings import ct_ns, plugin_finder from comictaggerlib.ctversion import version from comictaggerlib.log import setup_logging from comictaggerlib.resulttypes import Action @@ -123,9 +123,20 @@ class App: self.main() def load_plugins(self, opts: argparse.Namespace) -> None: - comicapi.comicarchive.load_archive_plugins() - comicapi.comicarchive.load_metadata_plugins(version=version) - self.talkers = comictalker.get_talkers(version, opts.config.user_cache_dir) + local_plugins = plugin_finder.find_plugins(opts.config.user_plugin_dir) + self._extend_plugin_paths(local_plugins) + logger.warning(sys.path) + + comicapi.comicarchive.load_archive_plugins(local_plugins=[p.entry_point for p in local_plugins.archivers]) + comicapi.comicarchive.load_metadata_plugins( + version=version, local_plugins=[p.entry_point for p in local_plugins.metadata] + ) + self.talkers = comictalker.get_talkers( + version, opts.config.user_cache_dir, local_plugins=[p.entry_point for p in local_plugins.talkers] + ) + + def _extend_plugin_paths(self, plugins: plugin_finder.Plugins) -> None: + sys.path.extend(str(p.path.absolute()) for p in plugins.all_plugins()) def list_plugins( self, @@ -228,16 +239,14 @@ class App: return config def initialize_dirs(self, paths: ctsettings.ComicTaggerPaths) -> None: - paths.user_data_dir.mkdir(parents=True, exist_ok=True) paths.user_config_dir.mkdir(parents=True, exist_ok=True) paths.user_cache_dir.mkdir(parents=True, exist_ok=True) - paths.user_state_dir.mkdir(parents=True, exist_ok=True) paths.user_log_dir.mkdir(parents=True, exist_ok=True) - logger.debug("user_data_dir: %s", paths.user_data_dir) + paths.user_plugin_dir.mkdir(parents=True, exist_ok=True) logger.debug("user_config_dir: %s", paths.user_config_dir) logger.debug("user_cache_dir: %s", paths.user_cache_dir) - logger.debug("user_state_dir: %s", paths.user_state_dir) logger.debug("user_log_dir: %s", paths.user_log_dir) + logger.debug("user_plugin_dir: %s", paths.user_plugin_dir) def main(self) -> None: assert self.config is not None diff --git a/comictalker/__init__.py b/comictalker/__init__.py index 540d023..070e472 100644 --- a/comictalker/__init__.py +++ b/comictalker/__init__.py @@ -1,15 +1,17 @@ from __future__ import annotations +import itertools import logging import pathlib import sys +from collections.abc import Sequence from packaging.version import InvalidVersion, parse if sys.version_info < (3, 10): - from importlib_metadata import entry_points + from importlib_metadata import EntryPoint, entry_points else: - from importlib.metadata import entry_points + from importlib.metadata import entry_points, EntryPoint from comictalker.comictalker import ComicTalker, TalkerError @@ -21,12 +23,15 @@ __all__ = [ ] -def get_talkers(version: str, cache: pathlib.Path) -> dict[str, ComicTalker]: +def get_talkers( + version: str, cache: pathlib.Path, local_plugins: Sequence[EntryPoint] = tuple() +) -> dict[str, ComicTalker]: """Returns all comic talker instances""" talkers: dict[str, ComicTalker] = {} ct_version = parse(version) - for talker in entry_points(group="comictagger.talker"): + # A dict is used, last plugin wins + for talker in itertools.chain(entry_points(group="comictagger.talker"), local_plugins): try: talker_cls = talker.load() obj = talker_cls(version, cache) diff --git a/tests/conftest.py b/tests/conftest.py index d2a4a44..16e36d0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -189,11 +189,10 @@ def config(tmp_path): app.register_settings() defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "") - defaults[0].Runtime_Options__config.user_data_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_cache_dir.mkdir(parents=True, exist_ok=True) - defaults[0].Runtime_Options__config.user_state_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_log_dir.mkdir(parents=True, exist_ok=True) + defaults[0].Runtime_Options__config.user_plugin_dir.mkdir(parents=True, exist_ok=True) yield defaults @@ -207,11 +206,10 @@ def plugin_config(tmp_path): app.register_settings() defaults = app.parse_settings(ns.config, "") - defaults[0].Runtime_Options__config.user_data_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_cache_dir.mkdir(parents=True, exist_ok=True) - defaults[0].Runtime_Options__config.user_state_dir.mkdir(parents=True, exist_ok=True) defaults[0].Runtime_Options__config.user_log_dir.mkdir(parents=True, exist_ok=True) + defaults[0].Runtime_Options__config.user_plugin_dir.mkdir(parents=True, exist_ok=True) yield (defaults, app.talkers)