#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import logging
import os
import sys
import types
from configparser import ConfigParser
from enum import Enum
from inspect import isclass, isfunction
from typing import TYPE_CHECKING
from pyramid.config import Configurator
from pyramid.httpexceptions import HTTPClientError, HTTPException
from pyramid.registry import Registry
from pyramid.request import Request
from pyramid.response import Response
from pyramid.settings import truthy
from pyramid.threadlocal import get_current_registry
from requests.structures import CaseInsensitiveDict
from webob.headers import EnvironHeaders, ResponseHeaders
from cowbird import __meta__
from cowbird.constants import get_constant, validate_required
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import _TC # noqa: E0611,F401,W0212 # pylint: disable=E0611
from typing import Any, List, NoReturn, Optional, Type, Union
from pyramid.events import NewRequest
from cowbird.typedefs import AnyHeadersType, AnyKey, AnyResponseType, AnySettingsContainer, SettingsType
# Media-type (MIME) string constants referenced by name throughout this module.
# "*/*" is the wildcard media type that matches any other type.
[docs]CONTENT_TYPE_ANY = "*/*"
[docs]CONTENT_TYPE_JSON = "application/json"
[docs]CONTENT_TYPE_FORM = "application/x-www-form-urlencoded"
[docs]CONTENT_TYPE_HTML = "text/html"
[docs]CONTENT_TYPE_PLAIN = "text/plain"
# XML has two media-type spellings; both are declared.
[docs]CONTENT_TYPE_APP_XML = "application/xml"
[docs]CONTENT_TYPE_TXT_XML = "text/xml"
}
# Media types grouped as "supported accept types": JSON, HTML, plain text and both XML spellings.
[docs]SUPPORTED_ACCEPT_TYPES = [
CONTENT_TYPE_JSON, CONTENT_TYPE_HTML, CONTENT_TYPE_PLAIN, CONTENT_TYPE_APP_XML, CONTENT_TYPE_TXT_XML
]
# Superset of the list above, extended with the form-encoded and wildcard media types.
[docs]KNOWN_CONTENT_TYPES = SUPPORTED_ACCEPT_TYPES + [CONTENT_TYPE_FORM, CONTENT_TYPE_ANY]
def get_logger(name, level=None, force_stdout=None, message_format=None, datetime_format=None):
    # type: (str, Optional[int], bool, Optional[str], Optional[str]) -> logging.Logger
    """
    Retrieves the logger registered under :paramref:`name`, giving it an explicit level if it has none.

    A logger left at ``logging.NOTSET`` gets a level applied immediately to avoid duplicate log outputs from
    the `root logger` and `this logger`.

    :param name: name of the logger to retrieve.
    :param level: level to apply when the logger has none; when falsy, the package logger's effective level is
        used, then ``COWBIRD_LOG_LEVEL`` as last resort.
    :param force_stdout: forwarded to :func:`set_logger_config` when any configuration option is truthy.
    :param message_format: forwarded to :func:`set_logger_config` when any configuration option is truthy.
    :param datetime_format: forwarded to :func:`set_logger_config` when any configuration option is truthy.
    :returns: the retrieved logger.
    """
    named_logger = logging.getLogger(name)
    if named_logger.level == logging.NOTSET:
        resolved_level = level
        if not resolved_level:
            # use package log level if it was specified via ini config with logger sections
            resolved_level = logging.getLogger(__meta__.__package__).getEffectiveLevel()
        if not resolved_level:
            # pylint: disable=C0415     # avoid circular import
            from cowbird.constants import COWBIRD_LOG_LEVEL
            resolved_level = COWBIRD_LOG_LEVEL
        named_logger.setLevel(resolved_level)
    if any((force_stdout, message_format, datetime_format)):
        set_logger_config(named_logger, force_stdout, message_format, datetime_format)
    return named_logger
# Module-level logger, created at import time and named after this module.
[docs]LOGGER = get_logger(__name__)
def set_logger_config(logger, force_stdout=False, message_format=None, datetime_format=None):
    # type: (logging.Logger, bool, Optional[str], Optional[str]) -> logging.Logger
    """
    Applies the provided logging configuration settings to the logger.

    :param logger: logger to configure; a falsy value is returned untouched.
    :param force_stdout: when set, a handler writing to ``sys.stdout`` is added unless a stream handler is
        already registered on the root logger or on :paramref:`logger`.
    :param message_format: message format applied to the selected handler(s), if provided.
    :param datetime_format: date/time format applied to the selected handler(s), if provided.
    :returns: the same logger that was provided.
    """
    if not logger:
        return logger
    targets = []  # handlers that receive the formatter, if any format was requested
    if force_stdout:
        all_handlers = logging.root.handlers + logger.handlers
        if not any(isinstance(hdlr, logging.StreamHandler) for hdlr in all_handlers):
            stdout_handler = logging.StreamHandler(sys.stdout)
            logger.addHandler(stdout_handler)
            targets = [stdout_handler]
    if not targets:
        if not logger.handlers:
            # logger has nothing to write to yet: give it a stdout handler
            logger.addHandler(logging.StreamHandler(sys.stdout))
        # 'logger.handlers' is a list: every handler gets formatted rather than calling
        # 'setFormatter' on the list object itself (which raised 'AttributeError')
        targets = list(logger.handlers)
    if message_format or datetime_format:
        formatter = logging.Formatter(fmt=message_format, datefmt=datetime_format)
        for hdlr in targets:
            hdlr.setFormatter(formatter)
    return logger
def print_log(msg, logger=None, level=logging.INFO, **kwargs):
    # type: (str, Optional[logging.Logger], int, Any) -> None
    """
    Emits the message through the logger, optionally enforcing console output.

    Console output is enforced according to the configuration value ``COWBIRD_LOG_PRINT``. A disabled logger
    is re-enabled before the message is logged.

    :param msg: message to log.
    :param logger: logger to use; defaults to this module's logger when omitted.
    :param level: logging level of the message.
    :param kwargs: additional keyword arguments passed down to :meth:`logging.Logger.log`.
    """
    # pylint: disable=C0415     # cannot use 'get_constant', recursive call
    from cowbird.constants import COWBIRD_LOG_PRINT

    target = logger if logger else get_logger(__name__)
    if COWBIRD_LOG_PRINT:
        set_logger_config(target, force_stdout=True)
    if target.disabled:
        target.disabled = False
    target.log(level, msg, **kwargs)
def raise_log(msg, exception=Exception, logger=None, level=logging.ERROR):
    # type: (str, Type[Exception], Optional[logging.Logger], int) -> NoReturn
    """
    Writes the message to the logger, then raises the requested exception with that same message.

    :param msg: message to log and to use as exception message.
    :param exception: exception class to raise; anything that is not a subclass of :class:`Exception` is
        replaced by :class:`Exception`.
    :param logger: logger to use; defaults to this module's logger when omitted.
    :param level: logging level of the message.
    :raises exception: whichever exception provided is raised systematically after logging.
    """
    active_logger = logger or get_logger(__name__)
    active_logger.log(level, msg)
    is_exception_class = isclass(exception) and issubclass(exception, Exception)
    error_type = exception if is_exception_class else Exception
    raise error_type(msg)
def bool2str(value):
    # type: (Any) -> str
    """
    Maps :paramref:`value` to the explicit string ``"true"`` or ``"false"``.

    The lowercase string form of the value is compared against the ``truthy`` collection imported from
    :mod:`pyramid.settings`; anything not found in it yields ``"false"``.
    """
    normalized = str(value).lower()
    if normalized in truthy:
        return "true"
    return "false"
def islambda(func):
    # type: (Any) -> bool
    """
    Tells whether the argument is a callable :class:`lambda` expression.

    The object must be of the function type and carry the name Python assigns to lambda expressions.
    """
    if not isinstance(func, types.LambdaType):
        return False
    lambda_name = (lambda: None).__name__  # noqa
    return func.__name__ == lambda_name
def get_app_config(container, celery=True):
    # type: (AnySettingsContainer, bool) -> Configurator
    """
    Builds the application configuration with all required utilities and settings configured.

    Settings read from the INI file are overridden by the settings found in :paramref:`container`, the log
    level is resolved and applied, required settings are validated, and the Cowbird modules are included.

    :param container: settings container; reused as configurator when it already is a :class:`Configurator`.
    :param celery: whether ``pyramid_celery`` is included and configured from the INI file.
    :returns: the configured application configurator.
    """
    import cowbird.constants  # pylint: disable=C0415  # to override specific constants/variables

    # override INI config path if provided with --paste to gunicorn, otherwise use environment variable
    container_settings = get_settings(container)
    env_ini_path = get_constant("COWBIRD_INI_FILE_PATH", container_settings, raise_missing=True)
    # NOTE(review): relies on 'container' exposing a dict-like 'get' - confirm for non-dict containers
    ini_path = (container or {}).get("__file__", env_ini_path)
    if ini_path != env_ini_path:
        cowbird.constants.COWBIRD_INI_FILE_PATH = ini_path
        container_settings["cowbird.ini_file_path"] = ini_path
    settings = get_settings_from_config_ini(ini_path)
    settings.update(container_settings)

    print_log("Setting up loggers...", LOGGER)
    log_level = get_constant(
        "COWBIRD_LOG_LEVEL",
        settings,
        "cowbird.log_level",
        default_value="INFO",
        raise_missing=False,
        raise_not_set=False,
        print_missing=True,
    )
    # apply proper value in case it was in ini AND env since up until then, only env was checked
    # we want to prioritize the ini definition
    cowbird.constants.COWBIRD_LOG_LEVEL = log_level
    LOGGER.setLevel(log_level)

    print_log("Validate settings that require explicit definitions...", LOGGER)
    validate_required(settings)

    # avoid cornice conflicting with pyramid exception views
    settings["handle_exceptions"] = False

    # create configurator or use one defined as input to preserve previous setup/include/etc.
    if isinstance(container, Configurator):
        config = container
    else:
        config = Configurator()
    config.setup_registry(settings=settings)

    # don't use scan otherwise modules like 'cowbird.adapter' are
    # automatically found and cause import errors on missing packages
    print_log("Including Cowbird modules...", LOGGER)
    for module_name in ("pyramid_mako", "cowbird"):
        config.include(module_name)
    # NOTE: don't call 'config.scan("cowbird")' to avoid parsing issues with colander/cornice,
    #       add them explicitly with 'config.include(<module>)', and then they can do 'config.scan()'

    if celery:
        config.include("pyramid_celery")
        config.configure_celery(ini_path)
    return config
def get_settings_from_config_ini(config_ini_path, section=None):
    """
    Reads one section of an INI configuration file into a dictionary.

    Option names keep their original case, and a file that cannot be read is reported instead of being
    silently ignored.

    :param config_ini_path: location of the INI file to load.
    :param section: name of the section to extract; defaults to ``app:<package>_app``.
    :returns: mapping of option names to values for the requested section.
    :raises ValueError: when the INI file cannot be found.
    """
    ini_parser = ConfigParser()
    ini_parser.optionxform = lambda option: option  # preserve case of config (default applies lowercase)
    loaded_files = ini_parser.read([config_ini_path])

    # raise silently ignored missing file
    file_found = len(loaded_files) == 1 and os.path.isfile(loaded_files[0])
    if not file_found:
        if not loaded_files:
            raise ValueError("Cannot find INI configuration file [{}]".format(config_ini_path))
        # in case not found, use expected location
        resolved = loaded_files[0] or os.path.abspath(str(config_ini_path))
        raise ValueError(
            "Cannot find INI configuration file [{}] resolved as [{}]".format(config_ini_path, resolved)
        )

    section_name = "app:{}_app".format(__meta__.__package__) if section is None else section
    return dict(ini_parser.items(section=section_name))
def get_json(response):
    """
    Obtains the 'JSON' body of a response, whether it is exposed as a property or as a callable.

    :param response: object with a ``json`` attribute that is either a :class:`dict` or a callable.
    :returns: the ``json`` attribute itself when it is a :class:`dict`, otherwise the result of calling it.
    """
    json_attr = response.json
    return json_attr if isinstance(json_attr, dict) else json_attr()
def convert_response(response):
    # type: (AnyResponseType) -> Response
    """
    Produces a :class:`pyramid.response.Response` equivalent to the provided response object.

    Content of the :paramref:`response` is expected to be JSON. A response that already is a
    :class:`pyramid.response.Response` is returned as is.

    :param response: response to be converted
    :returns: converted response
    """
    if isinstance(response, Response):
        return response

    # NOTE(review): the JSON content is handed over through 'body' as returned by 'get_json' -
    #   confirm that the response class accepts that type for 'body'
    converted = Response(body=get_json(response), headers=response.headers)

    # copy cookies when the source response exposes them
    for cookie in getattr(response, "cookies", ()):
        converted.set_cookie(name=cookie.name, value=cookie.value, overwrite=True)  # noqa

    # NOTE(review): if 'HTTPException' derives from 'Response', the early return above
    #   makes this branch unreachable - confirm
    if isinstance(response, HTTPException):
        cookie_headers = [
            (name, value)
            for name, value in response.headers._items  # noqa # pylint: disable=W0212
            if name.lower() == "set-cookie"
        ]
        for name, value in cookie_headers:
            converted.set_cookie(name=name, value=value, overwrite=True)
    return converted
def get_settings(container, app=False):
    # type: (Optional[AnySettingsContainer], bool) -> SettingsType
    """
    Extracts the application settings from any supported container.

    :param container: supported container with an handle to application settings.
    :param app: allow retrieving from current thread registry if no container was defined.
    :return: found application settings dictionary.
    :raise TypeError: when no application settings could be found or unsupported container.
    """
    # the 'Registry' check intentionally stays ahead of the plain 'dict' check, as in the original order
    if isinstance(container, (Configurator, Request)):
        found = container.registry.settings  # noqa
    elif isinstance(container, Registry):
        found = container.settings
    elif isinstance(container, dict):
        found = container
    elif container is None and app:
        print_log("Using settings from local thread.", level=logging.DEBUG)
        found = get_current_registry().settings
    else:
        raise TypeError("Could not retrieve settings from container object [{}]".format(type(container)))
    return found
def fully_qualified_name(obj):
    # type: (Union[Any, Type[Any]]) -> str
    """
    Obtains the ``'<module>.<name>'`` full path definition of the object to allow finding and importing it.

    :param obj: class, function, or instance for which to resolve the path; for an instance, the path of its
        class is returned.
    :returns: module and name of the resolved class or function, joined by a dot.
    """
    cls = obj if isclass(obj) or isfunction(obj) else type(obj)
    # read the module from the resolved class/function rather than from 'obj' itself:
    # instances of builtin types (e.g. ``1`` or ``"text"``) do not expose '__module__'
    return ".".join([cls.__module__, cls.__name__])
def log_request(event):
    # type: (NewRequest) -> None
    """
    Subscriber event that logs basic details about the incoming requests.

    A one-line summary is logged at ``INFO`` level. When ``DEBUG`` is enabled for the module logger, the URL,
    path, method, headers, parameters and body of the request are logged as well.

    :param event: event from which the incoming request is obtained.
    """
    request = event.request  # type: Request
    LOGGER.info("Request: [%s]", log_request_format(request))
    if not LOGGER.isEnabledFor(logging.DEBUG):
        return

    def items_str(items):
        # one "<key>: <value>" entry per line, or a dash placeholder when there is nothing to list
        if not len(items):
            return "-"
        return "\n ".join(["{!s}: {!s}".format(h, items[h]) for h in items])

    header_str = items_str(request.headers)
    params_str = items_str(request.params)
    # test the body itself for emptiness: 'str(b"")' is the non-empty text "b''",
    # so testing the converted string never produced the "-" placeholder
    raw_body = request.body
    body_str = str(raw_body) if raw_body else "-"
    LOGGER.debug("Request details:\n"
                 "URL: %s\n"
                 "Path: %s\n"
                 "Method: %s\n"
                 "Headers:\n"
                 " %s\n"
                 "Parameters:\n"
                 " %s\n"
                 "Body:\n"
                 " %s",
                 request.url, request.path, request.method, header_str, params_str, body_str)
def log_exception_tween(handler, registry):  # noqa: F811
    """
    Tween factory that logs any exception before re-raising it.

    Application errors are marked as ``ERROR`` while non critical HTTP errors are marked as ``WARNING``.
    Exception details are attached to the log entry only for the ``ERROR`` case.

    :param handler: next request handler, invoked with the request.
    :param registry: application registry (not used by this tween).
    :returns: the tween callable wrapping :paramref:`handler`.
    """
    def log_exc(request):
        try:
            return handler(request)
        except Exception as err:
            is_client_error = isinstance(err, HTTPClientError)
            log_level = logging.WARNING if is_client_error else logging.ERROR
            LOGGER.log(
                log_level,
                "Exception during request: [%s]",
                log_request_format(request),
                exc_info=not is_client_error,
            )
            raise err

    return log_exc
def is_json_body(body):
    # type: (Any) -> bool
    """
    Tells whether the provided body can be parsed as JSON.

    :param body: content to verify; any falsy value is reported as not being JSON.
    :returns: ``True`` when :func:`json.loads` accepts the body, ``False`` otherwise.
    """
    if body:
        try:
            json.loads(body)
        except (ValueError, TypeError):
            pass
        else:
            return True
    return False
# note: must not define any enum value here to allow inheritance by subclasses
class ExtendedEnum(Enum):
    """
    Base :class:`enum.Enum` offering lookup and listing helpers.

    Define an enum that benefits from these helpers as follows::

        class CustomEnum(ExtendedEnum):
            ItemA = "A"
            ItemB = "B"
    """

    @classmethod
    def names(cls):
        # type: () -> List[str]
        """
        Lists the member names of the enum, in the order they are held by ``__members__``.
        """
        return [name for name in cls.__members__]

    @classmethod
    def values(cls):
        # type: () -> List[AnyKey]
        """
        Lists the literal values of the enum members, in the order they are held by ``__members__``.
        """
        return [member.value for member in cls.__members__.values()]  # pylint: disable=E1101

    @classmethod
    def get(cls, key_or_value, default=None):
        # type: (AnyKey, Optional[Any]) -> Optional[_TC]
        """
        Looks up an enum entry by its member name or by its value.

        :param key_or_value: member name, member value, or an entry of this enum (returned directly).
        :param default: value returned when nothing matches.
        :returns: the matched enum entry, or :paramref:`default`.
        """
        # Python 3.8 disallow direct check of 'str' in 'enum'
        if key_or_value in list(cls):
            return key_or_value
        matches = (
            member
            for name, member in cls.__members__.items()  # pylint: disable=E1101
            if key_or_value == name or key_or_value == member.value  # pylint: disable=R1714
        )
        return next(matches, default)
# taken from https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python
# works in Python 2 & 3
class NullType(object, metaclass=SingletonMeta):
    """
    Represents a null value to differentiate from None.

    Instances evaluate as ``False`` in boolean context.
    """

    def __repr__(self):
        return "<null>"

    @staticmethod
    def __nonzero__():
        # Python 2 truth-value hook, preserved for callers that invoke it explicitly
        return False

    def __bool__(self):
        # Python 3 truth-value hook: without it, instances would always evaluate as True
        return False
# Module-level 'NullType' instance; 'is_null' compares items against it by identity.
[docs]null = NullType() # pylint: disable=C0103,invalid-name
def is_null(item):
    # type: (Any) -> bool
    """
    Tells whether the item is a null value.

    :param item: object to verify.
    :returns: ``True`` when the item is a :class:`NullType` instance or the module-level ``null`` object.
    """
    if isinstance(item, NullType):
        return True
    return item is null
def get_config_path():
    """
    Looks up the ``COWBIRD_CONFIG_PATH`` constant using the settings of the current thread registry.

    :returns: the value resolved by ``get_constant``, with ``None`` provided as default value; the lookup
        is requested not to raise when the constant is missing or not set.
    """
    thread_settings = get_settings(None, app=True)
    lookup_options = {
        "default_value": None,
        "raise_missing": False,
        "raise_not_set": False,
        "print_missing": True,
    }
    return get_constant("COWBIRD_CONFIG_PATH", thread_settings, **lookup_options)