d2326eadb9
Persistent setting groups allow settings that are not declared to survive normalization and other operations that would normally remove any unknown keys in a group. Calling normalize or get_namespace with persistent=False will remove unknown keys even from persistent groups. Currently the only way to retrieve the defaults for a config and preserve unknown keys is to manually get the defaults and update each existing group with the default values.
644 lines
22 KiB
Python
644 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
from argparse import Namespace
|
|
from collections import defaultdict
|
|
from collections.abc import Sequence
|
|
from typing import Any
|
|
from typing import Callable
|
|
from typing import Dict
|
|
from typing import Generic
|
|
from typing import NoReturn
|
|
from typing import overload
|
|
from typing import TYPE_CHECKING
|
|
from typing import TypeVar
|
|
from typing import Union
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if sys.version_info < (3, 11): # pragma: no cover
|
|
from typing_extensions import NamedTuple
|
|
else: # pragma: no cover
|
|
from typing import NamedTuple
|
|
|
|
|
|
if sys.version_info < (3, 9): # pragma: no cover
|
|
def removeprefix(self: str, prefix: str, /) -> str:
|
|
if self.startswith(prefix):
|
|
return self[len(prefix):]
|
|
else:
|
|
return self[:]
|
|
|
|
class BooleanOptionalAction(argparse.Action):
|
|
def __init__(
|
|
self,
|
|
option_strings,
|
|
dest,
|
|
default=None,
|
|
type=None, # noqa: A002
|
|
choices=None,
|
|
required=False,
|
|
help=None, # noqa: A002
|
|
metavar=None,
|
|
):
|
|
|
|
_option_strings = []
|
|
for option_string in option_strings:
|
|
_option_strings.append(option_string)
|
|
|
|
if option_string.startswith('--'):
|
|
option_string = '--no-' + option_string[2:]
|
|
_option_strings.append(option_string)
|
|
|
|
if help is not None and default is not None and default is not argparse.SUPPRESS:
|
|
help += ' (default: %(default)s)'
|
|
|
|
super().__init__(
|
|
option_strings=_option_strings,
|
|
dest=dest,
|
|
nargs=0,
|
|
default=default,
|
|
type=type,
|
|
choices=choices,
|
|
required=required,
|
|
help=help,
|
|
metavar=metavar,
|
|
)
|
|
|
|
def __call__(self, parser, namespace, values, option_string=None): # dead: disable
|
|
if option_string in self.option_strings:
|
|
setattr(namespace, self.dest, not option_string.startswith('--no-'))
|
|
else: # pragma: no cover
|
|
from argparse import BooleanOptionalAction
|
|
removeprefix = str.removeprefix
|
|
|
|
|
|
class Setting:
|
|
def __init__(
|
|
self,
|
|
# From argparse
|
|
*names: str,
|
|
action: type[argparse.Action] | None = None,
|
|
nargs: str | int | None = None,
|
|
const: str | None = None,
|
|
default: str | None = None,
|
|
type: Callable[..., Any] | None = None, # noqa: A002
|
|
choices: Sequence[Any] | None = None,
|
|
required: bool | None = None,
|
|
help: str | None = None, # noqa: A002
|
|
metavar: str | None = None,
|
|
dest: str | None = None,
|
|
# ComicTagger
|
|
cmdline: bool = True,
|
|
file: bool = True,
|
|
group: str = '',
|
|
exclusive: bool = False,
|
|
):
|
|
if not names:
|
|
raise ValueError('names must be specified')
|
|
# We prefix the destination name used by argparse so that there are no conflicts
|
|
# Argument names will still cause an exception if there is a conflict e.g. if '-f' is defined twice
|
|
self.internal_name, dest, flag = self.get_dest(group, names, dest)
|
|
args: Sequence[str] = names
|
|
|
|
# We then also set the metavar so that '--config' in the group runtime shows as 'CONFIG' instead of 'RUNTIME_CONFIG'
|
|
if not metavar and action not in ('store_true', 'store_false', 'count'):
|
|
metavar = dest.upper()
|
|
|
|
# If we are not a flag, no '--' or '-' in front
|
|
# we prefix the first name with the group as argparse sets dest to args[0]
|
|
# I believe internal name may be able to be used here
|
|
if not flag:
|
|
args = tuple((f'{group}_{names[0]}'.lstrip('_'), *names[1:]))
|
|
|
|
self.action = action
|
|
self.nargs = nargs
|
|
self.const = const
|
|
self.default = default
|
|
self.type = type
|
|
self.choices = choices
|
|
self.required = required
|
|
self.help = help
|
|
self.metavar = metavar
|
|
self.dest = dest
|
|
self.cmdline = cmdline
|
|
self.file = file
|
|
self.argparse_args = args
|
|
self.group = group
|
|
self.exclusive = exclusive
|
|
|
|
self.argparse_kwargs = {
|
|
'action': action,
|
|
'nargs': nargs,
|
|
'const': const,
|
|
'default': default,
|
|
'type': type,
|
|
'choices': choices,
|
|
'required': required,
|
|
'help': help,
|
|
'metavar': metavar,
|
|
'dest': self.internal_name if flag else None,
|
|
}
|
|
|
|
def __str__(self) -> str: # pragma: no cover
|
|
return f'Setting({self.argparse_args}, type={self.type}, file={self.file}, cmdline={self.cmdline}, kwargs={self.argparse_kwargs})'
|
|
|
|
def __repr__(self) -> str: # pragma: no cover
|
|
return self.__str__()
|
|
|
|
def get_dest(self, prefix: str, names: Sequence[str], dest: str | None) -> tuple[str, str, bool]:
|
|
dest_name = None
|
|
flag = False
|
|
|
|
for n in names:
|
|
if n.startswith('--'):
|
|
flag = True
|
|
dest_name = sanitize_name(n)
|
|
break
|
|
if n.startswith('-'):
|
|
flag = True
|
|
|
|
if dest_name is None:
|
|
dest_name = names[0]
|
|
if dest:
|
|
dest_name = dest
|
|
if not dest_name.isidentifier():
|
|
raise Exception('Cannot use {dest_name} in a namespace')
|
|
|
|
internal_name = f'{prefix}_{dest_name}'.lstrip('_')
|
|
return internal_name, dest_name, flag
|
|
|
|
def filter_argparse_kwargs(self) -> dict[str, Any]:
|
|
return {k: v for k, v in self.argparse_kwargs.items() if v is not None}
|
|
|
|
def to_argparse(self) -> tuple[Sequence[str], dict[str, Any]]:
|
|
return self.argparse_args, self.filter_argparse_kwargs()
|
|
|
|
|
|
class Group(NamedTuple):
|
|
persistent: bool
|
|
v: dict[str, Setting]
|
|
|
|
|
|
Values = Dict[str, Dict[str, Any]]
|
|
Definitions = Dict[str, Group]
|
|
|
|
T = TypeVar('T', Values, Namespace)
|
|
|
|
|
|
class Config(NamedTuple, Generic[T]):
|
|
values: T
|
|
definitions: Definitions
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
ArgParser = Union[argparse._MutuallyExclusiveGroup, argparse._ArgumentGroup, argparse.ArgumentParser]
|
|
ns = Namespace | Config[T] | None
|
|
|
|
|
|
def sanitize_name(name: str) -> str:
|
|
return re.sub('[' + re.escape(' -_,.!@#$%^&*(){}[]\',."<>;:') + ']+', '-', name).strip('-')
|
|
|
|
|
|
def get_option(options: Values | Namespace, setting: Setting) -> tuple[Any, bool]:
|
|
"""
|
|
Helper function to retrieve the value for a setting and if the value is the default value
|
|
|
|
Args:
|
|
options: Dictionary or namespace of options
|
|
setting: The setting object describing the value to retrieve
|
|
"""
|
|
if isinstance(options, dict):
|
|
value = options.get(setting.group, {}).get(setting.dest, setting.default)
|
|
else:
|
|
value = getattr(options, setting.internal_name, setting.default)
|
|
return value, value == setting.default
|
|
|
|
|
|
def get_options(options: Config[T], group: str) -> dict[str, Any]:
|
|
"""
|
|
Helper function to retrieve all of the values for a group. Only to be used on persistent groups.
|
|
|
|
Args:
|
|
options: Dictionary or namespace of options
|
|
group: The name of the group to retrieve
|
|
"""
|
|
if isinstance(options[0], dict):
|
|
values = options[0].get(group, {}).copy()
|
|
else:
|
|
internal_names = {x.internal_name: x for x in options[1][group].v.values()}
|
|
values = {}
|
|
v = vars(options[0])
|
|
for name, value in v.items():
|
|
if name.startswith(f'{group}_'):
|
|
if name in internal_names:
|
|
values[internal_names[name].dest] = value
|
|
else:
|
|
values[removeprefix(name, f'{group}_')] = value
|
|
|
|
return values
|
|
|
|
|
|
def normalize_config(
|
|
config: Config[T],
|
|
file: bool = False,
|
|
cmdline: bool = False,
|
|
defaults: bool = True,
|
|
persistent: bool = True,
|
|
) -> Config[Values]:
|
|
"""
|
|
Creates an `OptionValues` dictionary with setting definitions taken from `self.definitions`
|
|
and values taken from `raw_options` and `raw_options_2' if defined.
|
|
Values are assigned so if the value is a dictionary mutating it will mutate the original.
|
|
|
|
Args:
|
|
raw_options: The dict or Namespace to normalize options from
|
|
definitions: The definition of the options
|
|
file: Include file options
|
|
cmdline: Include cmdline options
|
|
defaults: Include default values in the returned dict
|
|
persistent: Include unknown keys in persistent groups
|
|
"""
|
|
|
|
normalized: Values = {}
|
|
options, definitions = config
|
|
for group_name, group in definitions.items():
|
|
group_options = {}
|
|
if group.persistent and persistent:
|
|
group_options = get_options(config, group_name)
|
|
for setting_name, setting in group.v.items():
|
|
if (setting.cmdline and cmdline) or (setting.file and file):
|
|
# Ensures the option exists with the default if not already set
|
|
value, default = get_option(options, setting)
|
|
if not default or default and defaults:
|
|
group_options[setting_name] = value
|
|
elif setting_name in group_options:
|
|
del group_options[setting_name]
|
|
normalized[group_name] = group_options
|
|
return Config(normalized, definitions)
|
|
|
|
|
|
def parse_file(definitions: Definitions, filename: pathlib.Path) -> tuple[Config[Values], bool]:
|
|
"""
|
|
Helper function to read options from a json dictionary from a file
|
|
Args:
|
|
filename: A pathlib.Path object to read a json dictionary from
|
|
"""
|
|
options: Values = {}
|
|
success = True
|
|
if filename.exists():
|
|
try:
|
|
with filename.open() as file:
|
|
opts = json.load(file)
|
|
if isinstance(opts, dict):
|
|
options = opts
|
|
except Exception:
|
|
logger.exception('Failed to load config file: %s', filename)
|
|
success = False
|
|
else:
|
|
logger.info('No config file found')
|
|
success = True
|
|
|
|
return (normalize_config(Config(options, definitions), file=True), success)
|
|
|
|
|
|
def clean_config(
|
|
config: Config[T], file: bool = False, cmdline: bool = False,
|
|
) -> Values:
|
|
"""
|
|
Normalizes options and then cleans up empty groups
|
|
Args:
|
|
options:
|
|
file:
|
|
cmdline:
|
|
|
|
Returns:
|
|
|
|
"""
|
|
|
|
clean_options, definitions = normalize_config(config, file=file, cmdline=cmdline)
|
|
for group in list(clean_options.keys()):
|
|
if not clean_options[group]:
|
|
del clean_options[group]
|
|
return clean_options
|
|
|
|
|
|
def defaults(definitions: Definitions) -> Config[Values]:
|
|
return normalize_config(Config(Namespace(), definitions), file=True, cmdline=True)
|
|
|
|
|
|
def get_namespace(config: Config[T], defaults: bool = True, persistent: bool = True) -> Config[Namespace]:
|
|
"""
|
|
Returns an Namespace object with options in the form "{group_name}_{setting_name}"
|
|
`options` should already be normalized.
|
|
Throws an exception if the internal_name is duplicated
|
|
|
|
Args:
|
|
options: Normalized options to turn into a Namespace
|
|
defaults: Include default values in the returned dict
|
|
persistent: Include unknown keys in persistent groups
|
|
"""
|
|
|
|
if isinstance(config.values, Namespace):
|
|
options, definitions = normalize_config(config)
|
|
else:
|
|
options, definitions = config
|
|
namespace = Namespace()
|
|
for group_name, group in definitions.items():
|
|
if group.persistent and persistent:
|
|
group_options = get_options(config, group_name)
|
|
for name, value in group_options.items():
|
|
if name in group.v:
|
|
internal_name, default = group.v[name].internal_name, group.v[name].default == value
|
|
else:
|
|
internal_name, default = f'{group_name}_' + sanitize_name(name), None
|
|
|
|
if hasattr(namespace, internal_name):
|
|
raise Exception(f'Duplicate internal name: {internal_name}')
|
|
|
|
if not default or default and defaults:
|
|
setattr(namespace, internal_name, value)
|
|
|
|
else:
|
|
for setting_name, setting in group.v.items():
|
|
if hasattr(namespace, setting.internal_name):
|
|
raise Exception(f'Duplicate internal name: {setting.internal_name}')
|
|
value, default = get_option(options, setting)
|
|
|
|
if not default or default and defaults:
|
|
setattr(namespace, setting.internal_name, value)
|
|
return Config(namespace, definitions)
|
|
|
|
|
|
def save_file(
|
|
config: Config[T], filename: pathlib.Path,
|
|
) -> bool:
|
|
"""
|
|
Helper function to save options from a json dictionary to a file
|
|
Args:
|
|
options: The options to save to a json dictionary
|
|
filename: A pathlib.Path object to save the json dictionary to
|
|
"""
|
|
file_options = clean_config(config, file=True)
|
|
if not filename.exists():
|
|
filename.parent.mkdir(exist_ok=True, parents=True)
|
|
filename.touch()
|
|
|
|
try:
|
|
json_str = json.dumps(file_options, indent=2)
|
|
filename.write_text(json_str + '\n', encoding='utf-8')
|
|
except Exception:
|
|
logger.exception('Failed to save config file: %s', filename)
|
|
return False
|
|
return True
|
|
|
|
|
|
def create_argparser(definitions: Definitions, description: str, epilog: str) -> argparse.ArgumentParser:
|
|
"""Creates an :class:`argparse.ArgumentParser` from all cmdline settings"""
|
|
groups: dict[str, ArgParser] = {}
|
|
argparser = argparse.ArgumentParser(
|
|
description=description, epilog=epilog, formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
for group_name, group in definitions.items():
|
|
for setting_name, setting in group.v.items():
|
|
if setting.cmdline:
|
|
argparse_args, argparse_kwargs = setting.to_argparse()
|
|
current_group: ArgParser = argparser
|
|
if setting.group:
|
|
if setting.group not in groups:
|
|
if setting.exclusive:
|
|
groups[setting.group] = argparser.add_argument_group(
|
|
setting.group,
|
|
).add_mutually_exclusive_group()
|
|
else:
|
|
groups[setting.group] = argparser.add_argument_group(setting.group)
|
|
|
|
# hard coded exception for files
|
|
if not (setting.group == 'runtime' and setting.nargs == '*'):
|
|
current_group = groups[setting.group]
|
|
current_group.add_argument(*argparse_args, **argparse_kwargs)
|
|
return argparser
|
|
|
|
|
|
def parse_cmdline(
|
|
definitions: Definitions,
|
|
description: str,
|
|
epilog: str,
|
|
args: list[str] | None = None,
|
|
config: Namespace | Config[T] | None = None,
|
|
) -> Config[Values]:
|
|
"""
|
|
Creates an `argparse.ArgumentParser` from cmdline settings in `self.definitions`.
|
|
`args` and `namespace` are passed to `argparse.ArgumentParser.parse_args`
|
|
|
|
Args:
|
|
args: Passed to argparse.ArgumentParser.parse
|
|
namespace: Passed to argparse.ArgumentParser.parse
|
|
"""
|
|
namespace = None
|
|
if isinstance(config, Config):
|
|
if isinstance(config.values, Namespace):
|
|
namespace = config.values
|
|
else:
|
|
namespace = get_namespace(config, defaults=False)[0]
|
|
else:
|
|
namespace = config
|
|
argparser = create_argparser(definitions, description, epilog)
|
|
ns = argparser.parse_args(args, namespace=namespace)
|
|
|
|
return normalize_config(Config(ns, definitions), cmdline=True, file=True)
|
|
|
|
|
|
def parse_config(
|
|
definitions: Definitions,
|
|
description: str,
|
|
epilog: str,
|
|
config_path: pathlib.Path,
|
|
args: list[str] | None = None,
|
|
) -> tuple[Config[Values], bool]:
|
|
file_options, success = parse_file(definitions, config_path)
|
|
cmdline_options = parse_cmdline(
|
|
definitions, description, epilog, args, get_namespace(file_options, defaults=False),
|
|
)
|
|
|
|
final_options = normalize_config(cmdline_options, file=True, cmdline=True)
|
|
return (final_options, success)
|
|
|
|
|
|
class Manager:
|
|
"""docstring for Manager"""
|
|
|
|
def __init__(self, description: str = '', epilog: str = '', definitions: Definitions | Config[T] | None = None):
|
|
# This one is never used, it just makes MyPy happy
|
|
self.argparser = argparse.ArgumentParser(description=description, epilog=epilog)
|
|
self.description = description
|
|
self.epilog = epilog
|
|
|
|
if isinstance(definitions, Config):
|
|
self.definitions = definitions.definitions
|
|
else:
|
|
self.definitions = defaultdict(lambda: Group(False, {}), definitions or {})
|
|
|
|
self.exclusive_group = False
|
|
self.current_group_name = ''
|
|
|
|
def create_argparser(self) -> None:
|
|
self.argparser = create_argparser(self.definitions, self.description, self.epilog)
|
|
|
|
def add_setting(self, *args: Any, **kwargs: Any) -> None:
|
|
"""Takes passes all arguments through to `Setting`, `group` and `exclusive` are already set"""
|
|
setting = Setting(*args, **kwargs, group=self.current_group_name, exclusive=self.exclusive_group)
|
|
self.definitions[self.current_group_name].v[setting.dest] = setting
|
|
|
|
def add_group(self, name: str, group: Callable[[Manager], None], exclusive_group: bool = False) -> None:
|
|
"""
|
|
The primary way to add define options on this class
|
|
|
|
Args:
|
|
name: The name of the group to define
|
|
group: A function that registers individual options using :meth:`add_setting`
|
|
exclusive_group: If this group is an argparse exclusive group
|
|
"""
|
|
self.current_group_name = name
|
|
self.exclusive_group = exclusive_group
|
|
group(self)
|
|
self.current_group_name = ''
|
|
self.exclusive_group = False
|
|
|
|
def add_persistent_group(self, name: str, group: Callable[[Manager], None], exclusive_group: bool = False) -> None:
|
|
"""
|
|
The primary way to add define options on this class
|
|
|
|
Args:
|
|
name: The name of the group to define
|
|
group: A function that registers individual options using :meth:`add_setting`
|
|
exclusive_group: If this group is an argparse exclusive group
|
|
"""
|
|
self.current_group_name = name
|
|
self.exclusive_group = exclusive_group
|
|
self.definitions[self.current_group_name] = Group(True, {})
|
|
group(self)
|
|
self.current_group_name = ''
|
|
self.exclusive_group = False
|
|
|
|
def exit(self, *args: Any, **kwargs: Any) -> NoReturn:
|
|
"""See :class:`~argparse.ArgumentParser`"""
|
|
self.argparser.exit(*args, **kwargs)
|
|
raise SystemExit(99)
|
|
|
|
def defaults(self) -> Config[Values]:
|
|
return defaults(self.definitions)
|
|
|
|
def clean_config(
|
|
self, options: T | Config[T], file: bool = False, cmdline: bool = False,
|
|
) -> Values:
|
|
if isinstance(options, Config):
|
|
config = options
|
|
else:
|
|
config = Config(options, self.definitions)
|
|
return clean_config(config, file=file, cmdline=cmdline)
|
|
|
|
def normalize_config(
|
|
self,
|
|
options: T | Config[T],
|
|
file: bool = False,
|
|
cmdline: bool = False,
|
|
defaults: bool = True,
|
|
) -> Config[Values]:
|
|
if isinstance(options, Config):
|
|
config = options
|
|
else:
|
|
config = Config(options, self.definitions)
|
|
return normalize_config(
|
|
config=config,
|
|
file=file,
|
|
cmdline=cmdline,
|
|
defaults=defaults,
|
|
)
|
|
|
|
@overload
|
|
def get_namespace(self, options: Values, defaults: bool = True) -> Namespace:
|
|
...
|
|
|
|
@overload
|
|
def get_namespace(self, options: Config[Values], defaults: bool = True) -> Config[Namespace]:
|
|
...
|
|
|
|
def get_namespace(self, options: Values | Config[Values], defaults: bool = True) -> Config[Namespace] | Namespace:
|
|
if isinstance(options, Config):
|
|
self.definitions = options[1]
|
|
return get_namespace(options, defaults=defaults)
|
|
else:
|
|
return get_namespace(Config(options, self.definitions), defaults=defaults)
|
|
|
|
def parse_file(self, filename: pathlib.Path) -> tuple[Config[Values], bool]:
|
|
return parse_file(filename=filename, definitions=self.definitions)
|
|
|
|
def save_file(self, options: T | Config[T], filename: pathlib.Path) -> bool:
|
|
if isinstance(options, Config):
|
|
return save_file(options, filename=filename)
|
|
return save_file(Config(options, self.definitions), filename=filename)
|
|
|
|
def parse_cmdline(self, args: list[str] | None = None, namespace: ns[T] = None) -> Config[Values]:
|
|
return parse_cmdline(self.definitions, self.description, self.epilog, args, namespace)
|
|
|
|
def parse_config(self, config_path: pathlib.Path, args: list[str] | None = None) -> tuple[Config[Values], bool]:
|
|
return parse_config(self.definitions, self.description, self.epilog, config_path, args)
|
|
|
|
|
|
def example(manager: Manager) -> None:
|
|
manager.add_setting(
|
|
'--hello',
|
|
default='world',
|
|
)
|
|
manager.add_setting(
|
|
'--save', '-s',
|
|
default=False,
|
|
action='store_true',
|
|
file=False,
|
|
)
|
|
manager.add_setting(
|
|
'--verbose', '-v',
|
|
default=False,
|
|
action=BooleanOptionalAction, # Added in Python 3.9
|
|
)
|
|
|
|
|
|
def persistent(manager: Manager) -> None:
|
|
manager.add_setting(
|
|
'--test', '-t',
|
|
default=False,
|
|
action=BooleanOptionalAction, # Added in Python 3.9
|
|
)
|
|
|
|
|
|
def _main(args: list[str] | None = None) -> None:
|
|
settings_path = pathlib.Path('./settings.json')
|
|
manager = Manager(description='This is an example', epilog='goodbye!')
|
|
|
|
manager.add_group('example', example)
|
|
manager.add_persistent_group('persistent', persistent)
|
|
|
|
file_config, success = manager.parse_file(settings_path)
|
|
file_namespace = manager.get_namespace(file_config)
|
|
|
|
merged_config = manager.parse_cmdline(args=args, namespace=file_namespace)
|
|
merged_namespace = manager.get_namespace(merged_config)
|
|
|
|
print(f'Hello {merged_config.values["example"]["hello"]}') # noqa: T201
|
|
if merged_namespace.values.example_save:
|
|
if manager.save_file(merged_config, settings_path):
|
|
print(f'Successfully saved settings to {settings_path}') # noqa: T201
|
|
else:
|
|
print(f'Failed saving settings to a {settings_path}') # noqa: T201
|
|
if merged_namespace.values.example_verbose:
|
|
print(f'{merged_namespace.values.example_verbose=}') # noqa: T201
|
|
|
|
|
|
if __name__ == '__main__':
|
|
_main()
|