Source code for cowbird.config

import logging
import os
import re
from typing import Any, Dict, List, Literal, Tuple, Union, cast, overload

import yaml
from schema import And, Optional, Or, Regex, Schema

from cowbird.typedefs import (
    ConfigDict,
    ConfigResTokenInfo,
    ConfigSegment,
    HandlerConfig,
    SyncPermissionConfig,
    SyncPointConfig
)
from cowbird.utils import get_logger, print_log, raise_log

[docs]LOGGER = get_logger(__name__)
[docs]MULTI_TOKEN = "**" # nosec: B105
[docs]BIDIRECTIONAL_ARROW = "<->"
[docs]RIGHT_ARROW = "->"
[docs]LEFT_ARROW = "<-"
[docs]PERMISSION_REGEX = r"[\w-]+"
# Either a single word, or a list of words in array
[docs]PERMISSIONS_REGEX = rf"({PERMISSION_REGEX}|\[\s*{PERMISSION_REGEX}(?:\s*,\s*{PERMISSION_REGEX})*\s*\])"
[docs]DIRECTION_REGEX = rf"({BIDIRECTIONAL_ARROW}|{LEFT_ARROW}|{RIGHT_ARROW})"
# Mapping format # <res_key1> : <permission(s)> <direction> <res_key2> : <permission(s)>
[docs]MAPPING_REGEX = r"(\w+)\s*:\s*" + PERMISSIONS_REGEX + r"\s*" + DIRECTION_REGEX + r"\s*(\w+)\s*:\s*" + PERMISSIONS_REGEX
[docs]NAMED_TOKEN_REGEX = r"^\{\s*(\w+)\s*\}$" # nosec: B105
[docs]class ConfigError(RuntimeError): """ Generic error during configuration loading. """
[docs]class ConfigErrorInvalidTokens(ConfigError): """ Config error specific to invalid SINGLE_TOKEN or MULTI_TOKEN tokens. """
[docs]class ConfigErrorInvalidServiceKey(ConfigError): """ Config error for invalid service keys. """
[docs]class ConfigErrorInvalidResourceKey(ConfigError): """ Config error for invalid resource keys. """
[docs]def _load_config(path_or_dict: Union[str, ConfigDict], section: str, allow_missing: bool = False) -> ConfigDict: """ Loads a file path or dictionary as YAML/JSON configuration. """ try: if isinstance(path_or_dict, str): with open(path_or_dict, "r", encoding="utf-8") as f: cfg = yaml.safe_load(f) else: cfg = path_or_dict return _expand_all(cfg[section]) except KeyError: msg = f"Config file section [{section!s}] not found." if allow_missing: print_log(msg, level=logging.WARNING, logger=LOGGER) return {} raise_log(msg, exception=ConfigError, logger=LOGGER) except Exception as exc: raise_log(f"Invalid config file [{exc!r}]", exception=ConfigError, logger=LOGGER)
@overload
[docs]def get_all_configs( # type: ignore[misc,unused-ignore] path_or_dict: Union[str, ConfigDict], section: Literal["handlers"], allow_missing: bool = False, ) -> List[Dict[str, HandlerConfig]]: ...
@overload def get_all_configs( # type: ignore[misc,unused-ignore] path_or_dict: Union[str, ConfigDict], section: Literal["sync_permissions"], allow_missing: bool = False, ) -> List[SyncPointConfig]: ... def get_all_configs( # type: ignore[misc,unused-ignore] path_or_dict: Union[str, ConfigDict], section: str, allow_missing: bool = False, ) -> List[Union[ConfigDict, Dict[str, HandlerConfig], SyncPointConfig]]: """ Loads all configuration files specified by the path (if a directory), a single configuration (if a file) or directly returns the specified dictionary section (if a configuration dictionary). :returns: - list of configurations loaded if input was a directory path - list of single configuration if input was a file path - list of single configuration if input was a JSON dict - empty list if none of the other cases where matched .. note:: Order of file loading will be resolved by alphabetically sorted filename if specifying a directory path. """ if isinstance(path_or_dict, str): if os.path.isdir(path_or_dict): dir_path = os.path.abspath(path_or_dict) known_extensions = [".cfg", ".yml", ".yaml", ".json"] cfg_names = list(sorted({fn for fn in os.listdir(dir_path) if any(fn.endswith(ext) for ext in known_extensions)})) return [_load_config(os.path.join(dir_path, fn), section, allow_missing) for fn in cfg_names] if os.path.isfile(path_or_dict): return [_load_config(path_or_dict, section, allow_missing)] elif isinstance(path_or_dict, dict): return [_load_config(path_or_dict, section, allow_missing)] return []
[docs]def _expand_all(config: ConfigDict) -> ConfigDict: """ Applies environment variable expansion recursively to all applicable fields of a configuration definition. """ if isinstance(config, dict): for cfg in list(config): cfg_key = os.path.expandvars(cfg) if cfg_key != cfg: config[cfg_key] = config.pop(cfg) config[cfg_key] = _expand_all(cast(ConfigDict, config[cfg_key])) elif isinstance(config, (list, set)): for i, cfg in enumerate(config): config[i] = _expand_all(cfg) elif isinstance(config, str): config = os.path.expandvars(str(config)) elif isinstance(config, (int, bool, float, type(None))): pass else: raise NotImplementedError(f"unknown parsing of config of type: {type(config)}") return config
[docs]def validate_handlers_config_schema(handlers_cfg: Dict[str, HandlerConfig]) -> None: """ Validates the schema of the `handlers` section found in the config. """ str_not_empty_validator = And(str, lambda s: len(s) > 0) schema = Schema({ str: { # Handler name # parameters common to all handlers Optional("active"): bool, Optional("priority"): int, Optional("url"): str, Optional("workspace_dir"): str, # parameters for specific handlers Optional("jupyterhub_user_data_dir"): str_not_empty_validator, Optional("wps_outputs_dir"): str_not_empty_validator, Optional("secure_data_proxy_name"): str_not_empty_validator, Optional("wps_outputs_res_name"): str_not_empty_validator, Optional("notebooks_dir_name"): str_not_empty_validator, Optional("public_workspace_wps_outputs_subpath"): str_not_empty_validator, Optional("user_wps_outputs_dir_name"): str_not_empty_validator, } }, ignore_extra_keys=True) schema.validate(handlers_cfg)
[docs]def validate_sync_perm_config_schema(sync_cfg: SyncPointConfig) -> None: """ Validates the schema of the `sync_permissions` section found in the config. """ schema = Schema({ Optional(str): { "services": { str: { # Service type, must correspond to an actual Magpie service type str: [ # Resource key, used to identify the resource here and in the permissions_mapping {Or("name", "regex", only_one=True): str, "type": str, Optional("field"): str} ] } }, "permissions_mapping": [Regex(MAPPING_REGEX)] } }) schema.validate(sync_cfg)
[docs]def validate_and_get_resource_info(res_key: str, segments: List[ConfigSegment]) -> ConfigResTokenInfo: """ Validates a resource_key and its related info from the config and returns some resource info relevant to the config mapping validation. Returned info contains the following: - if the resource uses a MULTI_TOKEN in its resource_path - the list of named tokens found in the resource_path """ named_tokens = set() has_multi_token = False for seg in segments: if "name" in seg: if seg["name"] == MULTI_TOKEN: if has_multi_token: raise ConfigErrorInvalidTokens(f"Invalid config value for resource key {res_key}. Only one " f"`{MULTI_TOKEN}` token is permitted per resource.") has_multi_token = True else: matched_groups = re.match(NAMED_TOKEN_REGEX, seg["name"]) if matched_groups: # Save the first group as a named token, since there's only 1 matching group in the regex. if matched_groups.groups()[0] in named_tokens: raise ConfigErrorInvalidTokens( f"Invalid config value for resource key {res_key}. Named token " f"{matched_groups.groups()[0]} was found in multiple segments of " "the resource path. Each named token should only be used once in a " "resource path.") named_tokens.add(matched_groups.groups()[0]) return {"has_multi_token": has_multi_token, "named_tokens": named_tokens}
[docs]def validate_bidirectional_mapping(mapping: str, res_info: Dict[str, ConfigResTokenInfo], res_key1: str, res_key2: str) -> None: """ Validates if both resources of a bidirectional mapping respect validation rules. Both should either use MULTI_TOKEN or not use it and both should use exactly the same named tokens. """ if res_info[res_key1]["has_multi_token"] != res_info[res_key2]["has_multi_token"]: raise ConfigErrorInvalidTokens(f"Invalid permission mapping `{mapping}`. For a bidirectional mapping, " f"either all mapped resources should have `{MULTI_TOKEN}` " "or none should use them.") if res_info[res_key1]["named_tokens"] != res_info[res_key2]["named_tokens"]: raise ConfigErrorInvalidTokens(f"Invalid permission mapping `{mapping}`. For a bidirectional mapping, " "both resources should have exactly the same named_tokens. " f"({res_key1}: {res_info[res_key1]['named_tokens']}, " f"{res_key2}: {res_info[res_key2]['named_tokens']})")
[docs]def validate_unidirectional_mapping(mapping: str, src_info: ConfigResTokenInfo, tgt_info: ConfigResTokenInfo) -> None: """ Validates if both source and target resource of a unidirectional mapping respect validation rules. Source resource should use MULTI_TOKEN if target uses it, and source resource should include all named tokens found in the target resource. """ if not src_info["has_multi_token"] and tgt_info["has_multi_token"]: raise ConfigErrorInvalidTokens(f"Invalid permission mapping `{mapping}`. For a unidirectional mapping, " "the source resource should use a MULTI_TOKEN " "if the target is using one.") missing_named_tokens = tgt_info["named_tokens"] - src_info["named_tokens"] if missing_named_tokens: raise ConfigErrorInvalidTokens(f"Invalid permission mapping `{mapping}`. For a unidirectional mapping, " "all named tokens found in the target resource should also be found in " f"the source resource, but the tokens `{missing_named_tokens}` are " f"missing from the source.")
[docs]def get_mapping_info(mapping: str) -> Tuple[Union[str, Any], ...]: """ Obtain the different info found in a mapping string from the config. Returns the following matching groups : (res_key1, permission1, direction, res_key2, permission2) """ matched_groups = re.match(MAPPING_REGEX, mapping) if not matched_groups or len(matched_groups.groups()) != 5: raise ConfigError(f"Error parsing mapping `{mapping}`. " "Couldn't find all mapping info because of invalid format.") return matched_groups.groups()
[docs]def get_permissions_from_str(permissions: str) -> List[str]: """ Returns a tuple of all permissions found in a string. Used for permission strings found in the config, which can either be a single permission or a list of permissions. """ matched_groups = re.findall(PERMISSION_REGEX, permissions) if not matched_groups: raise ConfigError("Couldn't find permission, invalid format.") return matched_groups
[docs]def validate_sync_mapping_config(sync_cfg: SyncPermissionConfig, res_info: Dict[str, ConfigResTokenInfo]) -> None: """ Validates if mappings in the config have valid resource keys and use tokens properly. """ for mapping in sync_cfg["permissions_mapping"]: res_key1, _, direction, res_key2, _ = get_mapping_info(mapping) for res_key in [res_key1, res_key2]: if res_key not in res_info: raise ConfigErrorInvalidResourceKey(f"Invalid config mapping references resource {res_key} which is " "not defined in any service.") if direction == BIDIRECTIONAL_ARROW: validate_bidirectional_mapping(mapping, res_info, res_key1, res_key2) elif direction == RIGHT_ARROW: validate_unidirectional_mapping(mapping, src_info=res_info[res_key1], tgt_info=res_info[res_key2]) elif direction == LEFT_ARROW: validate_unidirectional_mapping(mapping, src_info=res_info[res_key2], tgt_info=res_info[res_key1]) else: raise ConfigError(f"Invalid direction `{direction}` found in the permissions_mapping.")
[docs]def validate_sync_config(sync_cfg: SyncPermissionConfig) -> None: # validate and get all resources info res_info = {} for resources in sync_cfg["services"].values(): for res_key in resources: if res_key in res_info: raise ConfigErrorInvalidResourceKey(f"Found duplicate resource key {res_key} in config. Config resource" " keys should be unique even between different services.") res_info[res_key] = validate_and_get_resource_info(res_key, resources[res_key]) validate_sync_mapping_config(sync_cfg, res_info)
[docs]def validate_sync_config_services(sync_cfg: SyncPermissionConfig, available_services: List[str]) -> None: """ Validates if all services used in the sync config are actual available services. All services should correspond to actual services available in Magpie. """ for svc in sync_cfg["services"]: if svc not in available_services: raise ConfigErrorInvalidServiceKey(f"Service `{svc}` used in sync config is not valid since it was not " f"found in Magpie services ({available_services}).")