"""
Stores to read/write data to from/to `MongoDB` using pymongo.
"""
import abc
import logging
from typing import Any, Dict, List, Tuple
import pymongo
from pymongo.collection import Collection
from cowbird.monitoring.monitor import Monitor, MonitorException
[docs]
LOGGER = logging.getLogger(__name__)
[docs]
class StoreInterface(object, metaclass=abc.ABCMeta):
# Store type being used as collection name in mongo and to retrieve a store
# Fields name used as index inside the mongo collection, with a length > 1, a compound index is created
[docs]
index_fields: List[str] = []
def __init__(self) -> None:
"""
Check that the store implementation defines its type and index_fields.
"""
if not self.type:
raise NotImplementedError("Store 'type' must be overridden in inheriting class.")
if not self.index_fields:
raise NotImplementedError("Store 'index_fields' must be overridden in inheriting class.")
[docs]
class MongodbStore:
"""
Base class extended by all concrete store implementations.
"""
def __init__(self, collection: Collection, *_: Any, **__: Any) -> None:
"""
Validate and hold the collection for all the implementation.
"""
if not isinstance(collection, pymongo.collection.Collection):
raise TypeError("Collection not of expected type.")
self.collection: Collection = collection
@classmethod
[docs]
def get_args_kwargs(cls, *args: Any, **kwargs: Any) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
"""
Filters :class:`MongodbStore`-specific arguments to safely pass them down its ``__init__``.
"""
if len(args):
collection = args[0]
else:
collection = kwargs.get("collection", None)
return tuple([collection]), {}
[docs]
class MonitoringStore(StoreInterface, MongodbStore):
"""
Registry for monitoring instances.
Uses `MongoDB` to store what is monitored and by whom.
"""
[docs]
index_fields = ["callback", "path"]
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""
Init the store used to save monitors.
"""
db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
StoreInterface.__init__(self)
MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs]
def save_monitor(self, monitor: Monitor) -> None:
"""
Stores Monitor in `MongoDB` storage.
"""
# check if the Monitor is already registered
if self.collection.count_documents(monitor.key) > 0:
self.collection.delete_one(monitor.key)
self.collection.insert_one(monitor.params())
[docs]
def delete_monitor(self, monitor: Monitor) -> None:
"""
Removes Monitor from `MongoDB` storage.
"""
self.collection.delete_one(monitor.key)
[docs]
def list_monitors(self) -> List[Monitor]:
"""
Lists all Monitor in `MongoDB` storage.
"""
monitors = []
for mon_params in self.collection.find().sort("callback", pymongo.ASCENDING):
try:
monitors.append(Monitor(**{key: val for key, val in mon_params.items() if key != "_id"}))
except MonitorException as exc:
LOGGER.warning("Failed to start monitoring the following path [%s] with this monitor [%s] "
"(Will be removed from database) : [%s]",
mon_params["path"],
mon_params["callback"],
exc)
self.collection.delete_one(mon_params)
return monitors
[docs]
def clear_services(self, drop: bool = True) -> None:
"""
Removes all Monitor from `MongoDB` storage.
"""
if drop:
self.collection.drop()
else:
self.collection.delete_many({})