"""
Stores to read/write data to from/to `MongoDB` using pymongo.
"""
import abc
import logging
from typing import TYPE_CHECKING
import pymongo
from cowbird.monitoring.monitor import Monitor, MonitorException
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import Any, Dict, List, Optional, Tuple
from pymongo.collection import Collection
[docs]LOGGER = logging.getLogger(__name__)
[docs]class StoreInterface(object, metaclass=abc.ABCMeta):
# Store type being used as collection name in mongo and to retrieve a store
# Fields name used as index inside the mongo collection, with a length > 1, a compound index is created
def __init__(self):
"""
Check that the store implementation defines its type and index_fields.
"""
if not self.type:
raise NotImplementedError("Store 'type' must be overridden in inheriting class.")
if not self.index_fields:
raise NotImplementedError("Store 'index_fields' must be overridden in inheriting class.")
[docs]class MongodbStore:
"""
Base class extended by all concrete store implementations.
"""
def __init__(self, collection):
# type: (Collection, Optional[Dict[str, Any]]) -> None
"""
Validate and hold the collection for all the implementation.
"""
if not isinstance(collection, pymongo.collection.Collection):
raise TypeError("Collection not of expected type.")
self.collection = collection # type: Collection
@classmethod
[docs] def get_args_kwargs(cls, *args, **kwargs):
# type: (*Any, **Any) -> Tuple[Tuple, Dict]
"""
Filters :class:`MongodbStore`-specific arguments to safely pass them down its ``__init__``.
"""
if len(args):
collection = args[0]
else:
collection = kwargs.get("collection", None)
return tuple([collection]), {}
[docs]class MonitoringStore(StoreInterface, MongodbStore):
"""
Registry for monitoring instances.
Uses `MongoDB` to store what is monitored and by whom.
"""
[docs] index_fields = ["callback", "path"]
def __init__(self, *args, **kwargs):
"""
Init the store used to save monitors.
"""
db_args, db_kwargs = MongodbStore.get_args_kwargs(*args, **kwargs)
StoreInterface.__init__(self)
MongodbStore.__init__(self, *db_args, **db_kwargs)
[docs] def save_monitor(self, monitor):
# type: (Monitor) -> None
"""
Stores Monitor in `MongoDB` storage.
"""
# check if the Monitor is already registered
if self.collection.count_documents(monitor.key) > 0:
self.collection.delete_one(monitor.key)
self.collection.insert_one(monitor.params())
[docs] def delete_monitor(self, monitor):
# type: (Monitor) -> None
"""
Removes Monitor from `MongoDB` storage.
"""
self.collection.delete_one(monitor.key)
[docs] def list_monitors(self):
# type: () -> List[Monitor]
"""
Lists all Monitor in `MongoDB` storage.
"""
monitors = []
for mon_params in self.collection.find().sort("callback", pymongo.ASCENDING):
try:
monitors.append(Monitor(**{key: val for key, val in mon_params.items() if key != "_id"}))
except MonitorException as exc:
LOGGER.warning("Failed to start monitoring the following path [%s] with this monitor [%s] "
"(Will be removed from database) : [%s]",
mon_params["path"],
mon_params["callback"],
exc)
self.collection.delete_one(mon_params)
return monitors
[docs] def clear_services(self):
# type: () -> None
"""
Removes all Monitor from `MongoDB` storage.
"""
self.collection.drop()