Implement local plugins

This commit is contained in:
Timmy Welch 2024-02-10 21:00:24 -08:00
parent 69615c6c07
commit 8ec16528ab
6 changed files with 209 additions and 27 deletions

View File

@ -16,12 +16,15 @@
from __future__ import annotations
import io
import itertools
import logging
import os
import pathlib
import shutil
import sys
import traceback
from collections.abc import Sequence
from typing import TYPE_CHECKING
from comicapi import utils
from comicapi.archivers import Archiver, UnknownArchiver, ZipArchiver
@ -29,32 +32,37 @@ from comicapi.genericmetadata import GenericMetadata
from comicapi.metadata import Metadata
from comictaggerlib.ctversion import version
if TYPE_CHECKING:
from importlib.metadata import EntryPoint
logger = logging.getLogger(__name__)
archivers: list[type[Archiver]] = []
metadata_styles: dict[str, Metadata] = {}
def load_archive_plugins() -> None:
def load_archive_plugins(local_plugins: Sequence[EntryPoint] = tuple()) -> None:
if not archivers:
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
builtin: list[type[Archiver]] = []
for arch in entry_points(group="comicapi.archiver"):
# A list is used first matching plugin wins
for ep in itertools.chain(local_plugins, entry_points(group="comicapi.archiver")):
logger.warning(ep)
try:
archiver: type[Archiver] = arch.load()
if arch.module.startswith("comicapi"):
archiver: type[Archiver] = ep.load()
if ep.module.startswith("comicapi"):
builtin.append(archiver)
else:
archivers.append(archiver)
except Exception:
logger.exception("Failed to load archive plugin: %s", arch.name)
logger.exception("Failed to load archive plugin: %s", ep.name)
archivers.extend(builtin)
def load_metadata_plugins(version: str = f"ComicAPI/{version}") -> None:
def load_metadata_plugins(version: str = f"ComicAPI/{version}", local_plugins: Sequence[EntryPoint] = tuple()) -> None:
if not metadata_styles:
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
@ -62,22 +70,24 @@ def load_metadata_plugins(version: str = f"ComicAPI/{version}") -> None:
from importlib.metadata import entry_points
builtin: dict[str, Metadata] = {}
styles: dict[str, Metadata] = {}
for arch in entry_points(group="comicapi.metadata"):
# A dict is used, last plugin wins
for ep in itertools.chain(entry_points(group="comicapi.metadata"), local_plugins):
logger.warning(ep)
try:
style: type[Metadata] = arch.load()
style: type[Metadata] = ep.load()
if style.enabled:
if arch.module.startswith("comicapi"):
if ep.module.startswith("comicapi"):
builtin[style.short_name] = style(version)
else:
if style.short_name in styles:
logger.warning(
"Plugin %s is overriding the existing metadata plugin for %s tags",
arch.module,
ep.module,
style.short_name,
)
styles[style.short_name] = style(version)
except Exception:
logger.exception("Failed to load metadata plugin: %s", arch.name)
logger.exception("Failed to load metadata plugin: %s", ep.name)
for style_name in set(builtin.keys()).intersection(styles):
logger.warning("Builtin metadata for %s tags are being overridden by a plugin", style_name)
metadata_styles.clear()

View File

@ -0,0 +1,153 @@
"""Functions related to finding and loading plugins."""
# Lifted from flake8 https://github.com/PyCQA/flake8/blob/main/src/flake8/plugins/finder.py#L127
from __future__ import annotations
import configparser
import importlib.metadata
import logging
import pathlib
import re
from collections.abc import Generator
from typing import Any, NamedTuple
LOG = logging.getLogger(__name__)
NORMALIZE_PACKAGE_NAME_RE = re.compile(r"[-_.]+")
PLUGIN_GROUPS = frozenset(("comictagger.talker", "comicapi.archiver", "comicapi.metadata"))
class FailedToLoadPlugin(Exception):
"""Exception raised when a plugin fails to load."""
FORMAT = 'ComicTagger failed to load local plugin "{name}" due to {exc}.'
def __init__(self, plugin_name: str, exception: Exception) -> None:
"""Initialize our FailedToLoadPlugin exception."""
self.plugin_name = plugin_name
self.original_exception = exception
super().__init__(plugin_name, exception)
def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT.format(
name=self.plugin_name,
exc=self.original_exception,
)
def normalize_pypi_name(s: str) -> str:
"""Normalize a distribution name according to PEP 503."""
return NORMALIZE_PACKAGE_NAME_RE.sub("-", s).lower()
class Plugin(NamedTuple):
"""A plugin before loading."""
package: str
version: str
entry_point: importlib.metadata.EntryPoint
path: pathlib.Path
class LoadedPlugin(NamedTuple):
"""Represents a plugin after being imported."""
plugin: Plugin
obj: Any
@property
def entry_name(self) -> str:
"""Return the name given in the packaging metadata."""
return self.plugin.entry_point.name
@property
def display_name(self) -> str:
"""Return the name for use in user-facing / error messages."""
return f"{self.plugin.package}[{self.entry_name}]"
class Plugins(NamedTuple):
"""Classified plugins."""
archivers: list[Plugin]
metadata: list[Plugin]
talkers: list[Plugin]
def all_plugins(self) -> Generator[Plugin, None, None]:
"""Return an iterator over all :class:`LoadedPlugin`s."""
yield from self.archivers
yield from self.metadata
yield from self.talkers
def versions_str(self) -> str:
"""Return a user-displayed list of plugin versions."""
return ", ".join(sorted({f"{plugin.package}: {plugin.version}" for plugin in self.all_plugins()}))
def _find_local_plugins(plugin_path: pathlib.Path) -> Generator[Plugin, None, None]:
cfg = configparser.ConfigParser(interpolation=None)
cfg.read(plugin_path / "setup.cfg")
for group in PLUGIN_GROUPS:
for plugin_s in cfg.get("options.entry_points", group, fallback="").splitlines():
if not plugin_s:
continue
name, _, entry_str = plugin_s.partition("=")
name, entry_str = name.strip(), entry_str.strip()
ep = importlib.metadata.EntryPoint(name, entry_str, group)
yield Plugin(plugin_path.name, cfg.get("metadata", "version", fallback="0.0.1"), ep, plugin_path)
def _check_required_plugins(plugins: list[Plugin], expected: frozenset[str]) -> None:
plugin_names = {normalize_pypi_name(plugin.package) for plugin in plugins}
expected_names = {normalize_pypi_name(name) for name in expected}
missing_plugins = expected_names - plugin_names
if missing_plugins:
raise Exception(
"required plugins were not installed!\n"
+ f"- installed: {', '.join(sorted(plugin_names))}\n"
+ f"- expected: {', '.join(sorted(expected_names))}\n"
+ f"- missing: {', '.join(sorted(missing_plugins))}"
)
def find_plugins(plugin_folder: pathlib.Path) -> Plugins:
"""Discovers all plugins (but does not load them)."""
ret: list[Plugin] = []
for plugin_path in plugin_folder.glob("*/setup.cfg"):
try:
ret.extend(_find_local_plugins(plugin_path.parent))
except Exception as err:
FailedToLoadPlugin(plugin_path.parent.name, err)
# for determinism, sort the list
ret.sort()
return _classify_plugins(ret)
def _classify_plugins(plugins: list[Plugin]) -> Plugins:
archivers = []
metadata = []
talkers = []
for p in plugins:
if p.entry_point.group == "comictagger.talker":
talkers.append(p)
elif p.entry_point.group == "comicapi.metadata":
metadata.append(p)
elif p.entry_point.group == "comicapi.archiver":
archivers.append(p)
else:
LOG.warning(NotImplementedError(f"what plugin type? {p}"))
return Plugins(
metadata=metadata,
archivers=archivers,
talkers=talkers,
)

View File

@ -49,6 +49,13 @@ class ComicTaggerPaths(AppDirs):
return path
return pathlib.Path(super().user_log_dir)
@property
def user_plugin_dir(self) -> pathlib.Path:
if self.path:
path = self.path / "plugins"
return path
return pathlib.Path(super().user_config_dir)
@property
def site_data_dir(self) -> pathlib.Path:
return pathlib.Path(super().site_data_dir)

View File

@ -34,7 +34,7 @@ import comicapi.comicarchive
import comicapi.utils
import comictalker
from comictaggerlib import cli, ctsettings
from comictaggerlib.ctsettings import ct_ns
from comictaggerlib.ctsettings import ct_ns, plugin_finder
from comictaggerlib.ctversion import version
from comictaggerlib.log import setup_logging
from comictaggerlib.resulttypes import Action
@ -123,9 +123,20 @@ class App:
self.main()
def load_plugins(self, opts: argparse.Namespace) -> None:
comicapi.comicarchive.load_archive_plugins()
comicapi.comicarchive.load_metadata_plugins(version=version)
self.talkers = comictalker.get_talkers(version, opts.config.user_cache_dir)
local_plugins = plugin_finder.find_plugins(opts.config.user_plugin_dir)
self._extend_plugin_paths(local_plugins)
logger.warning(sys.path)
comicapi.comicarchive.load_archive_plugins(local_plugins=[p.entry_point for p in local_plugins.archivers])
comicapi.comicarchive.load_metadata_plugins(
version=version, local_plugins=[p.entry_point for p in local_plugins.metadata]
)
self.talkers = comictalker.get_talkers(
version, opts.config.user_cache_dir, local_plugins=[p.entry_point for p in local_plugins.talkers]
)
def _extend_plugin_paths(self, plugins: plugin_finder.Plugins) -> None:
sys.path.extend(str(p.path.absolute()) for p in plugins.all_plugins())
def list_plugins(
self,
@ -228,16 +239,14 @@ class App:
return config
def initialize_dirs(self, paths: ctsettings.ComicTaggerPaths) -> None:
paths.user_data_dir.mkdir(parents=True, exist_ok=True)
paths.user_config_dir.mkdir(parents=True, exist_ok=True)
paths.user_cache_dir.mkdir(parents=True, exist_ok=True)
paths.user_state_dir.mkdir(parents=True, exist_ok=True)
paths.user_log_dir.mkdir(parents=True, exist_ok=True)
logger.debug("user_data_dir: %s", paths.user_data_dir)
paths.user_plugin_dir.mkdir(parents=True, exist_ok=True)
logger.debug("user_config_dir: %s", paths.user_config_dir)
logger.debug("user_cache_dir: %s", paths.user_cache_dir)
logger.debug("user_state_dir: %s", paths.user_state_dir)
logger.debug("user_log_dir: %s", paths.user_log_dir)
logger.debug("user_plugin_dir: %s", paths.user_plugin_dir)
def main(self) -> None:
assert self.config is not None

View File

@ -1,15 +1,17 @@
from __future__ import annotations
import itertools
import logging
import pathlib
import sys
from collections.abc import Sequence
from packaging.version import InvalidVersion, parse
if sys.version_info < (3, 10):
from importlib_metadata import entry_points
from importlib_metadata import EntryPoint, entry_points
else:
from importlib.metadata import entry_points
from importlib.metadata import entry_points, EntryPoint
from comictalker.comictalker import ComicTalker, TalkerError
@ -21,12 +23,15 @@ __all__ = [
]
def get_talkers(version: str, cache: pathlib.Path) -> dict[str, ComicTalker]:
def get_talkers(
version: str, cache: pathlib.Path, local_plugins: Sequence[EntryPoint] = tuple()
) -> dict[str, ComicTalker]:
"""Returns all comic talker instances"""
talkers: dict[str, ComicTalker] = {}
ct_version = parse(version)
for talker in entry_points(group="comictagger.talker"):
# A dict is used, last plugin wins
for talker in itertools.chain(entry_points(group="comictagger.talker"), local_plugins):
try:
talker_cls = talker.load()
obj = talker_cls(version, cache)

View File

@ -189,11 +189,10 @@ def config(tmp_path):
app.register_settings()
defaults = app.parse_settings(comictaggerlib.ctsettings.ComicTaggerPaths(tmp_path / "config"), "")
defaults[0].Runtime_Options__config.user_data_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_cache_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_state_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_log_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_plugin_dir.mkdir(parents=True, exist_ok=True)
yield defaults
@ -207,11 +206,10 @@ def plugin_config(tmp_path):
app.register_settings()
defaults = app.parse_settings(ns.config, "")
defaults[0].Runtime_Options__config.user_data_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_config_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_cache_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_state_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_log_dir.mkdir(parents=True, exist_ok=True)
defaults[0].Runtime_Options__config.user_plugin_dir.mkdir(parents=True, exist_ok=True)
yield (defaults, app.talkers)