#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import logging
import os
import sys
import types
from configparser import ConfigParser
from enum import Enum
from inspect import isclass, isfunction
from typing import TYPE_CHECKING
from pyramid.config import Configurator
from pyramid.httpexceptions import HTTPClientError, HTTPException
from pyramid.registry import Registry
from pyramid.request import Request
from pyramid.response import Response
from pyramid.settings import truthy
from requests.structures import CaseInsensitiveDict
from webob.headers import EnvironHeaders, ResponseHeaders
from cowbird import __meta__
from cowbird.constants import get_constant, validate_required
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import _TC # noqa: E0611,F401,W0212 # pylint: disable=E0611
from typing import Any, List, NoReturn, Optional, Type, Union
from pyramid.events import NewRequest
from cowbird.typedefs import (
AnyHeadersType,
AnyKey,
AnyResponseType,
AnySettingsContainer,
SettingsType,
)
# Media-type literals used for content negotiation.
CONTENT_TYPE_ANY = "*/*"
CONTENT_TYPE_JSON = "application/json"
CONTENT_TYPE_FORM = "application/x-www-form-urlencoded"
CONTENT_TYPE_HTML = "text/html"
CONTENT_TYPE_PLAIN = "text/plain"
CONTENT_TYPE_APP_XML = "application/xml"
CONTENT_TYPE_TXT_XML = "text/xml"
# NOTE(review): a dangling "}" followed these constants in the extracted source, presumably the tail of a
#   mapping literal that was lost during extraction - confirm against the upstream module and restore it if needed.

# Media-types listed as supported 'Accept' values.
SUPPORTED_ACCEPT_TYPES = [
    CONTENT_TYPE_JSON, CONTENT_TYPE_HTML, CONTENT_TYPE_PLAIN, CONTENT_TYPE_APP_XML, CONTENT_TYPE_TXT_XML
]
# Supported 'Accept' media-types extended with the form-encoded and wildcard ones.
KNOWN_CONTENT_TYPES = SUPPORTED_ACCEPT_TYPES + [CONTENT_TYPE_FORM, CONTENT_TYPE_ANY]
def get_logger(name, level=None, force_stdout=None, message_format=None, datetime_format=None):
    # type: (str, Optional[int], bool, Optional[str], Optional[str]) -> logging.Logger
    """
    Retrieves the named logger, assigning it an explicit level right away when it has none.

    Setting the level immediately avoids duplicate log outputs from the `root logger` and `this logger` when
    `level` is ``logging.NOTSET``.

    :param name: name of the logger to retrieve.
    :param level: level to apply when the logger does not already define one.
    :param force_stdout: forwarded to :func:`set_logger_config` when any formatting option is provided.
    :param message_format: forwarded to :func:`set_logger_config` when any formatting option is provided.
    :param datetime_format: forwarded to :func:`set_logger_config` when any formatting option is provided.
    :returns: the retrieved logger.
    """
    named_logger = logging.getLogger(name)
    if named_logger.level == logging.NOTSET:
        if not level:
            # use package log level if it was specified via ini config with logger sections
            level = logging.getLogger(__meta__.__package__).getEffectiveLevel()
        if not level:
            # pylint: disable=C0415  # avoid circular import
            from cowbird.constants import COWBIRD_LOG_LEVEL
            level = COWBIRD_LOG_LEVEL
        named_logger.setLevel(level)
    if any((force_stdout, message_format, datetime_format)):
        set_logger_config(named_logger, force_stdout, message_format, datetime_format)
    return named_logger
# module-level logger used by the helpers of this module, created with the default options of 'get_logger'
LOGGER = get_logger(__name__)
def set_logger_config(logger, force_stdout=False, message_format=None, datetime_format=None):
    # type: (logging.Logger, bool, Optional[str], Optional[str]) -> logging.Logger
    """
    Applies the provided logging configuration settings to the logger.

    :param logger: logger to configure; a falsy value is returned untouched.
    :param force_stdout:
        when enabled and neither the root logger nor this logger has a stream handler,
        adds a handler writing to ``sys.stdout``.
    :param message_format: message format applied to the selected handler(s), if provided.
    :param datetime_format: date-time format applied to the selected handler(s), if provided.
    :returns: the same logger that was provided.
    """
    if not logger:
        return logger
    handlers = []
    if force_stdout:
        all_handlers = logging.root.handlers + logger.handlers
        if not any(isinstance(h, logging.StreamHandler) for h in all_handlers):
            stdout_handler = logging.StreamHandler(sys.stdout)
            logger.addHandler(stdout_handler)  # noqa: type
            handlers = [stdout_handler]
    if not handlers:
        if logger.handlers:
            # format every handler already attached to the logger
            # (the handler list itself has no 'setFormatter', each handler must be updated individually)
            handlers = list(logger.handlers)
        else:
            stdout_handler = logging.StreamHandler(sys.stdout)
            logger.addHandler(stdout_handler)
            handlers = [stdout_handler]
    if message_format or datetime_format:
        formatter = logging.Formatter(fmt=message_format, datefmt=datetime_format)
        for handler in handlers:
            handler.setFormatter(formatter)
    return logger
def print_log(msg, logger=None, level=logging.INFO, **kwargs):
    # type: (str, Optional[logging.Logger], int, Any) -> None
    """
    Emits the message on the logger, optionally forcing console output.

    Console output is enforced when the configuration value ``COWBIRD_LOG_PRINT`` is enabled, and a disabled logger
    is re-enabled before the message is logged.

    :param msg: message to log.
    :param logger: logger to employ, or the logger of this module when omitted.
    :param level: logging level of the message.
    :param kwargs: additional keyword arguments passed down to the logging call.
    """
    # pylint: disable=C0415  # cannot use 'get_constant', recursive call
    from cowbird.constants import COWBIRD_LOG_PRINT

    target = logger if logger else get_logger(__name__)
    if COWBIRD_LOG_PRINT:
        set_logger_config(target, force_stdout=True)
    if target.disabled:
        target.disabled = False
    target.log(level, msg, **kwargs)
def raise_log(msg, exception=Exception, logger=None, level=logging.ERROR):
    # type: (str, Type[Exception], Optional[logging.Logger], int) -> NoReturn
    """
    Logs the message, then systematically raises an exception carrying that same message.

    :param msg: message to log and to provide to the raised exception.
    :param exception: exception class to raise; anything that is not an :class:`Exception` subclass
        is replaced by :class:`Exception`.
    :param logger: logger to employ, or the logger of this module when omitted.
    :param level: logging level of the message.
    :raises exception: whichever exception provided is raised systematically after logging.
    """
    active_logger = logger or get_logger(__name__)
    active_logger.log(level, msg)
    is_exception_class = isclass(exception) and issubclass(exception, Exception)
    error_type = exception if is_exception_class else Exception
    raise error_type(msg)
def bool2str(value):
    # type: (Any) -> str
    """
    Maps :paramref:`value` to the explicit ``"true"`` or ``"false"`` :class:`str`.

    The lowercase string form of the value is looked up in the permissive ``truthy`` variants;
    anything not listed there yields ``"false"``.
    """
    normalized = str(value).lower()
    if normalized in truthy:
        return "true"
    return "false"
def islambda(func):
    # type: (Any) -> bool
    """
    Tells whether the argument is a callable :class:`lambda` expression.

    Regular functions share the same type as lambdas, so the name is also compared against the one of a lambda.
    """
    if not isinstance(func, types.LambdaType):
        return False
    anonymous_name = (lambda: None).__name__  # noqa
    return func.__name__ == anonymous_name
def get_app_config(container):
    # type: (AnySettingsContainer) -> Configurator
    """
    Generates application configuration with all required utilities and settings configured.

    Settings retrieved from the container are merged over the values loaded from the INI file, the resolved log
    level is applied to the module logger, and the settings are passed to ``validate_required`` before being
    registered in the configurator.

    :param container: object from which the settings are retrieved using :func:`get_settings`.
    :returns: configurator holding the merged settings, with ``pyramid_mako`` and ``cowbird`` included.
    """
    import cowbird.constants  # pylint: disable=C0415  # to override specific constants/variables

    # override INI config path if provided with --paste to gunicorn, otherwise use environment variable
    config_settings = get_settings(container)
    config_env = get_constant("COWBIRD_INI_FILE_PATH", config_settings, raise_missing=True)
    # NOTE(review): '.get' is called on the container itself, which presumably requires a dict-like container
    #   - confirm the behavior when a Configurator is provided.
    config_ini = (container or {}).get("__file__", config_env)
    if config_ini != config_env:
        # keep the module constant and the settings entry aligned with the INI path actually employed
        cowbird.constants.COWBIRD_INI_FILE_PATH = config_ini
        config_settings["cowbird.ini_file_path"] = config_ini
    settings = get_settings_from_config_ini(config_ini)
    # values from the container settings replace the ones loaded from the INI file
    settings.update(config_settings)

    print_log("Setting up loggers...", LOGGER)
    log_lvl = get_constant("COWBIRD_LOG_LEVEL", settings, "cowbird.log_level", default_value="INFO",
                           raise_missing=False, raise_not_set=False, print_missing=True)
    # apply proper value in case it was in ini AND env since up until then, only env was check
    # we want to prioritize the ini definition
    cowbird.constants.COWBIRD_LOG_LEVEL = log_lvl
    LOGGER.setLevel(log_lvl)

    print_log("Validate settings that require explicit definitions...", LOGGER)
    validate_required(settings)

    # avoid cornice conflicting with pyramid exception views
    settings["handle_exceptions"] = False

    # create configurator or use one defined as input to preserve previous setup/include/etc.
    config = Configurator() if not isinstance(container, Configurator) else container
    config.setup_registry(settings=settings)

    # don't use scan otherwise modules like 'cowbird.adapter' are
    # automatically found and cause import errors on missing packages
    print_log("Including Cowbird modules...", LOGGER)
    config.include("pyramid_mako")
    config.include("cowbird")
    # NOTE: don't call 'config.scan("cowbird")' to avoid parsing issues with colander/cornice,
    #       add them explicitly with 'config.include(<module>)', and then they can do 'config.scan()'
    return config
def get_settings_from_config_ini(config_ini_path, section=None):
    """
    Loads the settings of one section of a configuration INI file, with additional handling.

    Option names keep their original case, and a file that cannot be read is reported as an error
    instead of being silently ignored.

    :param config_ini_path: location of the INI file to load.
    :param section: section to extract; defaults to ``app:<package>_app`` using the package name.
    :returns: mapping of the option names and values of the section.
    :raises ValueError: when the INI file cannot be found.
    """
    def keep_case(option):
        # preserve case of config (default applies lowercase)
        return option

    ini_parser = ConfigParser()
    ini_parser.optionxform = keep_case
    loaded = ini_parser.read([config_ini_path])

    # raise silently ignored missing file
    file_found = len(loaded) == 1 and os.path.isfile(loaded[0])
    if not file_found:
        if not loaded:
            raise ValueError("Cannot find INI configuration file [{}]".format(config_ini_path))
        # in case not found, use expected location
        resolved = loaded[0] or os.path.abspath(str(config_ini_path))
        raise ValueError(
            "Cannot find INI configuration file [{}] resolved as [{}]".format(config_ini_path, resolved)
        )

    section_name = "app:{}_app".format(__meta__.__package__) if section is None else section
    return dict(ini_parser.items(section=section_name))
def get_json(response):
    """
    Retrieves the 'JSON' body of a response using the property/callable according to the response's implementation.

    The ``json`` attribute is read only once. When it is callable it is invoked and its result returned,
    otherwise the attribute value itself is returned. This also covers property-based implementations
    whose JSON content is not a :class:`dict` (e.g.: a JSON array).

    :param response: response object exposing ``json`` either as a property or as a method.
    :returns: parsed JSON content of the response.
    """
    body = response.json
    return body() if callable(body) else body
def convert_response(response):
    # type: (AnyResponseType) -> Response
    """
    Converts a :class:`requests.Response` object to an equivalent :class:`pyramid.response.Response` object.

    Content of the :paramref:`response` is expected to be JSON.

    :param response: response to be converted
    :returns: converted response
    """
    # already the expected type, nothing to convert
    if isinstance(response, Response):
        return response
    json_body = get_json(response)
    # NOTE(review): the parsed JSON object is passed as 'body' - confirm the Response class accepts it as is
    pyramid_response = Response(body=json_body, headers=response.headers)
    # transfer cookies of the source response, replacing any cookie already set under the same name
    if hasattr(response, "cookies"):
        for cookie in response.cookies:
            pyramid_response.set_cookie(name=cookie.name, value=cookie.value, overwrite=True)  # noqa
    # NOTE(review): if HTTPException derives from the pyramid Response class, the early return above
    #   would make this branch unreachable - confirm
    if isinstance(response, HTTPException):
        for header_name, header_value in response.headers._items:  # noqa # pylint: disable=W0212
            if header_name.lower() == "set-cookie":
                # NOTE(review): the cookie is named after the header itself ('Set-Cookie') - confirm this is intended
                pyramid_response.set_cookie(name=header_name, value=header_value, overwrite=True)
    return pyramid_response
def get_settings(container):
    # type: (AnySettingsContainer) -> SettingsType
    """
    Retrieves the settings from the supported container types.

    :param container: configurator, request, registry or dictionary of settings.
    :returns: settings of the registry for the first three types, or the dictionary itself.
    :raises TypeError: when the container is of any other type.
    """
    if isinstance(container, (Configurator, Request)):
        settings = container.registry.settings  # noqa
    elif isinstance(container, Registry):
        settings = container.settings
    elif isinstance(container, dict):
        settings = container
    else:
        raise TypeError("Could not retrieve settings from container object [{}]".format(type(container)))
    return settings
def fully_qualified_name(obj):
    # type: (Union[Any, Type[Any]]) -> str
    """
    Obtains the ``'<module>.<name>'`` full path definition of the object to allow finding and importing it.

    Classes and functions are described directly, while any other object is described by its type.
    Both the module and the name are read from that same class or function, since instances of builtin types
    do not expose ``__module__`` themselves.

    :param obj: class, function or instance to describe.
    :returns: dotted path made of the module and the name.
    """
    cls = obj if isclass(obj) or isfunction(obj) else type(obj)
    return ".".join([cls.__module__, cls.__name__])
def log_request(event):
    # type: (NewRequest) -> None
    """
    Subscriber event that logs basic details about the incoming requests.

    A summary line is always logged at ``INFO`` level. When ``DEBUG`` level is enabled, the URL, path, method,
    headers, parameters and body of the request are logged as well, using ``-`` for any empty section.

    :param event: event providing the incoming request.
    """
    request = event.request  # type: Request
    LOGGER.info("Request: [%s]", log_request_format(request))
    if not LOGGER.isEnabledFor(logging.DEBUG):
        return

    def items_str(items):
        return "\n ".join(["{!s}: {!s}".format(h, items[h]) for h in items]) if len(items) else "-"

    header_str = items_str(request.headers)
    params_str = items_str(request.params)
    raw_body = request.body
    # the string form of empty bytes is the non-empty text "b''",
    # the raw body itself must be tested to obtain the placeholder
    body_str = str(raw_body) if raw_body else "-"
    LOGGER.debug("Request details:\n"
                 "URL: %s\n"
                 "Path: %s\n"
                 "Method: %s\n"
                 "Headers:\n"
                 " %s\n"
                 "Parameters:\n"
                 " %s\n"
                 "Body:\n"
                 " %s",
                 request.url, request.path, request.method, header_str, params_str, body_str)
def log_exception_tween(handler, registry):  # noqa: F811
    """
    Tween factory that logs any exception before re-raising it.

    Application errors are marked as ``ERROR`` (with exception details), while non critical HTTP errors are
    marked as ``WARNING`` (without exception details).

    :param handler: next request handler to call.
    :param registry: application registry (unused).
    :returns: request handler wrapping the provided one.
    """
    def log_exc(request):
        try:
            response = handler(request)
        except Exception as err:
            is_client_error = isinstance(err, HTTPClientError)
            lvl = logging.WARNING if is_client_error else logging.ERROR
            exc = not is_client_error
            LOGGER.log(lvl, "Exception during request: [%s]", log_request_format(request), exc_info=exc)
            raise err
        return response

    return log_exc
def is_json_body(body):
    # type: (Any) -> bool
    """
    Indicates whether the body is a non-empty value that can be parsed as JSON.

    :param body: content to validate.
    :returns: ``True`` when the content is successfully parsed, ``False`` for empty or invalid content.
    """
    parsable = bool(body)
    if parsable:
        try:
            json.loads(body)
        except (ValueError, TypeError):
            parsable = False
    return parsable
# note: must not define any enum value here to allow inheritance by subclasses
class ExtendedEnum(Enum):
    """
    Base :class:`enum.Enum` providing listing and lookup utility methods.

    Create an extended enum with these utilities as follows::

        class CustomEnum(ExtendedEnum):
            ItemA = "A"
            ItemB = "B"
    """

    @classmethod
    def names(cls):
        # type: () -> List[str]
        """
        Lists the member names of the enum elements.
        """
        return [name for name in cls.__members__]

    @classmethod
    def values(cls):
        # type: () -> List[AnyKey]
        """
        Lists the literal values of the enum elements.
        """
        return [member.value for _, member in cls.__members__.items()]  # pylint: disable=E1101

    @classmethod
    def get(cls, key_or_value, default=None):
        # type: (AnyKey, Optional[Any]) -> Optional[_TC]
        """
        Looks up an enum entry by its name or by its value.

        An argument that is already an entry of the enum is returned directly,
        and :paramref:`default` is returned when nothing matches.
        """
        # Python 3.8 disallow direct check of 'str' in 'enum'
        if key_or_value in list(cls):
            return key_or_value
        matches = (
            member for name, member in cls.__members__.items()  # pylint: disable=E1101
            if key_or_value == name or key_or_value == member.value  # pylint: disable=R1714
        )
        return next(matches, default)
# taken from https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python
# works in Python 2 & 3
class NullType(object, metaclass=SingletonMeta):
    """
    Represents a null value to differentiate from None.

    Instances evaluate as ``False`` in boolean context.
    """

    def __repr__(self):
        return "<null>"

    def __bool__(self):
        # Python 3 truth value hook, without it the instance would always evaluate as 'True'
        return False

    @staticmethod
    def __nonzero__():
        # Python 2 truth value hook, preserved for backward compatibility
        return False


null = NullType()  # pylint: disable=C0103,invalid-name
def is_null(item):
    """
    Indicates whether the item is the ``null`` object or any instance of :class:`NullType`.
    """
    if item is null:
        return True
    return isinstance(item, NullType)