Source code for cowbird.request_task

from abc import ABC
from typing import Tuple

from celery.app.task import Task
from requests.exceptions import RequestException


[docs]class AbortException(Exception): """ Exception raised when the chain must be interrupted. """
[docs]class RequestTask(Task, ABC): # type: ignore[type-arg] """ Celery base task that should be used to handle API requests. Using this class will set the following Task configuration : - auto-retry for every RequestException - backoff and jitter strategy There is also an abort_chain function to stop the chain of requests in case of an unrecoverable event To use this class simply decorate your asynchronous function like this: .. code-block:: python from celery import shared_task @shared_task(bind=True, base=RequestTask) def function_name(self, any, wanted, parameters): pass # function operations Parameter ``bind=True`` will provide the self argument to the function which is the celery Task (not required). Parameter ``base=RequestTask`` will instantiate a RequestTask rather than a base celery Task as the self object. """
[docs] autoretry_for: Tuple[Exception] = (RequestException,)
""" Exceptions that are accepted as valid raising cases to attempt request retry. """
[docs] retry_backoff = True
""" Enable backoff strategy during request retry upon known raised exception. """
[docs] retry_backoff_max = 600 # Max backoff to 10 min
""" Maximum backoff delay permitted using request retry. Retries are abandoned if this delay is reached. """
[docs] retry_jitter = True
""" Enable jitter strategy during request retry upon known raised exception. """
[docs] retry_kwargs = {"max_retries": 15}
""" Additional parameters to be passed down to requests for retry control. """
[docs] def abort_chain(self) -> None: """ Calling this function from a task will prevent any downstream tasks to be run after it. """ raise AbortException("Aborting chain")