Source code for cowbird.permissions_synchronizer

import copy
from typing import TYPE_CHECKING

from cowbird.config import get_all_configs
from cowbird.services.service_factory import ServiceFactory
from cowbird.utils import get_config_path, get_logger

if TYPE_CHECKING:
    from typing import Callable, Dict, Generator, List, Tuple

    from cowbird.services.impl.magpie import Magpie

[docs] SyncPointServicesType = Dict[str, str]
SyncPointMappingType = List[Dict[str, List[str]]]
[docs]LOGGER = get_logger(__name__)
[docs]class Permission: """ Define every property required to set a permission in Magpie. """ def __init__(self, service_name, # type: str resource_id, # type: str resource_full_name, # type: str name, # type: str access, # type: str scope, # type: str user=None, # type: str group=None # type: str ): # type: (...) -> None self.service_name = service_name self.resource_id = resource_id self.resource_full_name = resource_full_name self.name = name self.access = access self.scope = scope self.user = user self.group = group
[docs] def __eq__(self, other): # type: (Permission) -> bool return (self.service_name == other.service_name and self.resource_id == other.resource_id and self.resource_full_name == other.resource_full_name and self.name == other.name and self.access == other.access and self.scope == other.scope and self.user == other.user and self.group == other.group)
[docs]class SyncPoint: """ A sync point contain services sharing resources via multiple API. It defines how the same resource is defined in each service and what are the mapping between permission accesses. """ def __init__(self, services, # type: SyncPointServicesType mapping # type: SyncPointMappingType ): # type: (...) -> None """ Init the sync point, holding services with their respective resources root and how access are mapped between them. @param services: Dict, where the service is the key and its resources root is the value @param mapping: List of dict where the service is the key and an access list is the value """ available_services = ServiceFactory().services_cfg.keys() # Make sure that only active services are used self.services = {svc: svc_cfg for svc, svc_cfg in services.items() if svc in available_services} self.mapping = [{svc: perms for svc, perms in mapping_pt.items() if svc in available_services} for mapping_pt in mapping]
[docs] def resource_match(self, permission): # type: (Permission) -> bool """ Define if the permission name is covered by this sync point. """ return permission.resource_full_name.startswith(self.services[permission.service_name])
[docs] def find_match(self, permission): # type: (Permission) -> Generator[Tuple[str, str], None, None] """ Search and yield for every match a (service, permission name) tuple that is mapped with this permission. """ # check if the permission name is covered by this sync point if not self.resource_match(permission): return # For each permission mapping for mapping in self.mapping: # Does the current service has some mapped permissions? if permission.service_name not in mapping: continue # Does the current permission access is covered? if permission.name not in mapping[permission.service_name]: continue # This permission is mapped : yields matches for svc, mapped_perm_name in mapping.items(): # Don't synch with itself if svc == permission.service_name: continue for perm_name in mapped_perm_name: yield svc, perm_name
[docs] def sync(self, perm_operation, permission): # type: (Callable[Permission], Permission) -> None """ Create or delete the same permission on each service sharing the same resource. @param perm_operation Magpie create_permission or delete_permission function @param permission Permission to synchronize with others services """ res_common_part_idx = len(self.services[permission.service_name]) for svc, perm_name in self.find_match(permission): new_permission = copy.copy(permission) new_permission.service_name = svc new_permission.resource_full_name = self.services[svc] + \ permission.resource_full_name[res_common_part_idx:] new_permission.resource_id = ServiceFactory().get_service(svc).get_resource_id( new_permission.resource_full_name) new_permission.name = perm_name perm_operation(new_permission)
[docs]class PermissionSynchronizer(object): """ Keep service-shared resources in sync when permissions are updated for one of them. .. todo:: At some point we will need a consistency function that goes through all permissions of all services and make sure that linked services have the same permissions. """ def __init__(self, magpie_inst): # type: (Magpie) -> None config_path = get_config_path() sync_perm_cfgs = get_all_configs(config_path, "sync_permissions", allow_missing=True) self.sync_point = [] self.magpie_inst = magpie_inst for sync_perm_config in sync_perm_cfgs: if not sync_perm_config: LOGGER.warning("Sync_permissions configuration is empty.") continue for sync_cfg in sync_perm_config.values(): self.sync_point.append(SyncPoint(services=sync_cfg["services"], mapping=sync_cfg["permissions_mapping"]))
[docs] def create_permission(self, permission): # type: (Permission) -> None """ Create the same permission on each service sharing the same resource. """ for point in self.sync_point: point.sync(self.magpie_inst.create_permission, permission)
[docs] def delete_permission(self, permission): # type: (Permission) -> None """ Delete the same permission on each service sharing the same resource. """ for point in self.sync_point: point.sync(self.magpie_inst.delete_permission, permission)