from collections import defaultdict
from typing import Dict, MutableMapping, Optional, Type, Union
from cowbird.database import get_db
from cowbird.database.stores import MonitoringStore
from cowbird.handlers import HandlerFactory
from cowbird.monitoring.fsmonitor import FSMonitor
from cowbird.monitoring.monitor import Monitor, MonitorException
from cowbird.typedefs import AnySettingsContainer
from cowbird.utils import SingletonMeta, get_logger
[docs]
LOGGER = get_logger(__name__)
[docs]
class MonitoringConfigurationException(Exception):
"""
Exception thrown when the monitoring instance cannot be initialized because of a bad configuration.
"""
[docs]
class Monitoring(metaclass=SingletonMeta):
"""
Class handling file system monitoring and registering listeners.
.. todo:: At some point we will need a consistency function that goes through all monitored folder and make sure
that monitoring handlers are up to date.
"""
def __init__(self, config: AnySettingsContainer = None) -> None:
"""
Initialize the monitoring instance from configured application settings.
:param config: AnySettingsContainer object from which the db can be retrieved.
The default value of None is only there to disable pylint E1120. The singleton instance
must be initialized with a proper config and after that the init function should not be hit.
"""
if not config: # pragma: no cover
raise MonitoringConfigurationException("Without proper application settings, the Monitoring class cannot "
"obtains a proper database store.")
self.monitors: MutableMapping[str, Dict[str, Monitor]] = defaultdict(lambda: {})
self.store = get_db(config).get_store(MonitoringStore)
[docs]
def start(self) -> None:
"""
Load existing monitors and start the monitoring.
"""
monitors = self.store.list_monitors()
for mon in monitors:
self.monitors[mon.path][mon.callback] = mon
mon.start()
# Initialize FileSystem handler which must monitor the WPS outputs folder on startup
filesystem_handler = HandlerFactory().get_handler("FileSystem")
if filesystem_handler:
filesystem_handler.start_wps_outputs_monitoring(self)
[docs]
def register(self,
path: str,
recursive: bool,
cb_monitor: Union[FSMonitor, Type[FSMonitor], str],
) -> Optional[Monitor]:
"""
Register a monitor for a specific path and start it. If a monitor already exists for the specific
path/cb_monitor combination it is directly returned. If this monitor was not recursively monitoring its path and
the `recursive` flag is now true, this one take precedence and the monitor is updated accordingly. If the
`recursive` flag was true, and now it is false it has no effect.
:param path: Path to monitor
:param recursive: Monitor subdirectory recursively?
:param cb_monitor: FSMonitor for which an instance is created and events are sent
Can be an object, a class type implementing FSMonitor or a string containing module and class
name.
:returns: The monitor registered or already existing for the specific path/cb_monitor combination. Note that
the monitor is not created/returned if a MonitorException occurs.
"""
callback = None
try:
callback = Monitor.get_qualified_class_name(Monitor.get_fsmonitor_instance(cb_monitor))
if path in self.monitors and callback in self.monitors[path]:
mon = self.monitors[path][callback]
# If the monitor already exists but is not recursive, make it recursive if required
# (recursive takes precedence)
if not mon.recursive and recursive:
mon.recursive = True
else:
# Doesn't already exist
mon = Monitor(path, recursive, cb_monitor)
self.monitors[mon.path][mon.callback] = mon
self.store.collection.update_one(
{"callback": mon.callback, "path": mon.path},
{"$set": {"callback": mon.callback, "path": mon.path, "recursive": mon.recursive}},
upsert=True)
if not mon.is_alive:
mon.start()
return mon
except MonitorException as exc:
LOGGER.warning("Failed to start monitoring the following path [%s] with this monitor [%s] : [%s]",
path, callback, exc)
return None
[docs]
def unregister(self, path: str, cb_monitor: Union[FSMonitor, Type[FSMonitor], str]) -> bool:
"""
Stop a monitor and unregister it.
:param path: Path used by the monitor
:param cb_monitor: FSMonitor object to remove
Can be an object, a class type implementing FSMonitor or a string containing module and class
name.
:returns: True if the monitor is found and successfully stopped, False otherwise
"""
mon_qualname = Monitor.get_qualified_class_name(Monitor.get_fsmonitor_instance(cb_monitor))
self.store.collection.delete_one({"callback": mon_qualname, "path": path})
if path in self.monitors:
try:
mon = self.monitors[path].pop(mon_qualname)
if len(self.monitors[path]) == 0:
self.monitors.pop(path)
mon.stop()
return True
except KeyError:
pass
return False
[docs]
def unregister_all(self) -> None:
"""
Stops and unregisters all monitors.
"""
self.monitors.clear()
self.store.clear_services(drop=False)