import inspect
from typing import Callable
import requests
from pyramid.httpexceptions import HTTPBadRequest, HTTPInternalServerError, HTTPOk
from pyramid.request import Request
from pyramid.view import view_config
from cowbird.api import exception as ax
from cowbird.api import requests as ar
from cowbird.api import schemas as s
from cowbird.api.schemas import ValidOperations
from cowbird.handlers import Handler, get_handlers
from cowbird.permissions_synchronizer import Permission
from cowbird.typedefs import AnyResponseType
from cowbird.utils import CONTENT_TYPE_JSON, get_logger, get_ssl_verify, get_timeout
[docs]
LOGGER = get_logger(__name__)
[docs]
class WebhookDispatchException(Exception):
"""
Error indicating that an exception occurred during a webhook dispatch.
"""
[docs]
def dispatch(handler_fct: Callable[[Handler], None]) -> None:
exceptions = []
event_name = inspect.getsource(handler_fct).split(":")[1].strip()
handlers = get_handlers()
for handler in handlers:
# Allow every handler to be notified even if one of them throw an error
try:
LOGGER.info("Dispatching event [%s] for handler [%s].", event_name, handler.name)
handler_fct(handler)
except Exception as exception: # noqa
exceptions.append(exception)
LOGGER.error("Exception raised while handling event [%s] for handler [%s] : [%r].",
event_name, handler.name, exception, exc_info=True)
if not handlers:
LOGGER.warning("No handlers matched for dispatch of event [%s].", event_name)
if exceptions:
raise WebhookDispatchException(exceptions)
@s.UserWebhookAPI.post(schema=s.UserWebhook_POST_RequestSchema, tags=[s.WebhooksTag],
response_schemas=s.UserWebhook_POST_responses)
@view_config(route_name=s.UserWebhookAPI.name, request_method="POST")
[docs]
def post_user_webhook_view(request: Request) -> AnyResponseType:
"""
User webhook used for created or removed user events.
"""
event = ar.get_multiformat_body(request, "event")
ax.verify_param(event, param_name="event",
param_compare=ValidOperations.values(),
is_in=True,
http_error=HTTPBadRequest,
msg_on_fail=s.UserWebhook_POST_BadRequestResponseSchema.description)
user_name = ar.get_multiformat_body(request, "user_name")
LOGGER.debug("Received user webhook event [%s] for user [%s].", event, user_name)
if event == ValidOperations.CreateOperation.value:
# FIXME: Tried with ax.URL_REGEX, but cannot match what seems valid urls...
callback_url = ar.get_multiformat_body(request, "callback_url", pattern=None)
def handler_fct(handler: Handler) -> None:
handler.user_created(user_name=user_name)
else:
callback_url = None
def handler_fct(handler: Handler) -> None:
handler.user_deleted(user_name=user_name)
try:
dispatch(handler_fct)
except Exception as dispatch_exc: # noqa
if callback_url:
# If something bad happens, set the status as erroneous in Magpie
LOGGER.warning("Exception occurred while dispatching event [%s], "
"calling Magpie callback url : [%s]", event, callback_url, exc_info=dispatch_exc)
try:
timeout = get_timeout(request) # false positive security warning when passed directly
requests.head(callback_url, verify=get_ssl_verify(request), timeout=timeout)
except requests.exceptions.RequestException as exc:
LOGGER.warning("Cannot complete the Magpie callback url request to [%s] : [%s]", callback_url, exc)
else:
LOGGER.warning("Exception occurred while dispatching event [%s].", event, exc_info=dispatch_exc)
ax.raise_http(HTTPInternalServerError,
detail=s.UserWebhook_POST_InternalServerErrorResponseSchema.description,
content_type=CONTENT_TYPE_JSON,
content={
"webhook": request.json_body,
"exception": repr(dispatch_exc)
})
return ax.valid_http(HTTPOk, detail=s.UserWebhook_POST_OkResponseSchema.description)
@s.PermissionWebhookAPI.post(schema=s.PermissionWebhook_POST_RequestSchema, tags=[s.WebhooksTag],
response_schemas=s.PermissionWebhook_POST_responses)
@view_config(route_name=s.PermissionWebhookAPI.name, request_method="POST")
[docs]
def post_permission_webhook_view(request: Request) -> AnyResponseType:
"""
Permission webhook used for created or removed permission events.
"""
event = ar.get_multiformat_body(request, "event")
ax.verify_param(event, param_name="event",
param_compare=ValidOperations.values(),
is_in=True,
http_error=HTTPBadRequest,
msg_on_fail=s.PermissionWebhook_POST_BadRequestResponseSchema.description)
# Use raw value for service name, to avoid errors with `None` values
# when the permission is not applied to a `service` type resource.
service_name = ar.get_multiformat_body(request, "service_name", check_type=(str, type(None)))
service_type = ar.get_multiformat_body(request, "service_type")
resource_id = ar.get_multiformat_body(request, "resource_id", check_type=int)
param_regex_with_slashes = r"^/?[A-Za-z0-9]+(?:[\s_\-\./:][A-Za-z0-9]+)*$"
resource_full_name = ar.get_multiformat_body(request, "resource_full_name",
pattern=param_regex_with_slashes)
resource_display_name = ar.get_multiformat_body(request, "resource_display_name", check_type=(str, type(None)))
name = ar.get_multiformat_body(request, "name")
access = ar.get_multiformat_body(request, "access")
scope = ar.get_multiformat_body(request, "scope")
user = ar.get_multiformat_body(request, "user", check_type=(str, type(None)))
group = ar.get_multiformat_body(request, "group", check_type=(str, type(None)))
ax.verify_param(bool(user or group), is_true=True, http_error=HTTPBadRequest,
msg_on_fail=s.PermissionWebhook_POST_BadRequestResponseSchema.description)
permission = Permission(
service_name=service_name,
service_type=service_type,
resource_id=resource_id,
resource_full_name=resource_full_name,
resource_display_name=resource_display_name,
name=name,
access=access,
scope=scope,
user=user,
group=group
)
LOGGER.debug("Received permission webhook event [%s] for [%s].", event, permission)
if event == ValidOperations.CreateOperation.value:
dispatch(lambda handler: handler.permission_created(permission=permission))
else:
dispatch(lambda handler: handler.permission_deleted(permission=permission))
return ax.valid_http(HTTPOk, detail=s.PermissionWebhook_POST_OkResponseSchema.description)