# Source code for cowbird.monitoring.monitor

import importlib
import os
from typing import Optional, Type, TypedDict, Union

from watchdog.events import (
    DirCreatedEvent,
    DirDeletedEvent,
    DirModifiedEvent,
    DirMovedEvent,
    FileCreatedEvent,
    FileDeletedEvent,
    FileModifiedEvent,
    FileMovedEvent,
    FileSystemEventHandler
)
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver

from cowbird.monitoring.fsmonitor import FSMonitor
from cowbird.utils import get_logger

# Module-level logger shared by everything in this file; created by the
# project's ``get_logger`` helper from this module's dotted name.
LOGGER = get_logger(__name__)
class MonitorKey(TypedDict):
    """
    Pair of values identifying a monitor: the callback name and the monitored path.

    Both keys are required.
    """

    # Name of the callback the events are sent to.
    callback: str
    # Path being monitored.
    path: str
class MonitorParameters(TypedDict):
    """
    Serialized form of a monitor: callback name, monitored path and recursive flag.

    All keys are required.
    """

    # Name of the callback the events are sent to.
    callback: str
    # Path being monitored.
    path: str
    # Whether sub-directories are monitored as well.
    recursive: bool
class MonitorException(Exception):
    """
    Error indicating that a :class:`Monitor` cannot be started because of an invalid path or callback.
    """
class Monitor(FileSystemEventHandler):
    """
    Implementation of the watchdog :class:`FileSystemEventHandler` class.

    Allows to start/stop directory monitoring and send events to :class:`FSMonitor` callback.
    """

    def __init__(self, path: str, recursive: bool, callback: Union[FSMonitor, Type[FSMonitor], str]) -> None:
        """
        Initialize the path monitoring and ready to be started.

        :param path: Path to monitor
        :param recursive: Monitor subdirectory recursively?
        :param callback: Events are sent to this FSMonitor.
                         Can be an object, a class type implementing :class:`FSMonitor` or a string containing module
                         and class name. The class type or string is used to instantiate an object using the class
                         method :meth:`FSMonitor.get_instance()`
        :raises MonitorException: If `path` does not exist or if a string `callback` cannot be resolved.
        :raises TypeError: If `callback` is none of the supported forms.
        """
        if not os.path.exists(path):
            raise MonitorException(f"Cannot monitor the following file or directory [{path}]: "
                                   "No such file or directory")
        self.__src_path = path
        self.__recursive = recursive
        self.__callback = self.get_fsmonitor_instance(callback)
        # Created by `start()` and reset to None by `stop()`; None means "not monitoring".
        self.__event_observer: Optional[BaseObserver] = None

    @staticmethod
    def get_fsmonitor_instance(callback: Union[FSMonitor, Type[FSMonitor], str]) -> FSMonitor:
        """
        Return a :class:`FSMonitor` instance from multiple possible forms including the :class:`FSMonitor` type, the
        :class:`FSMonitor` full qualified class name or a direct instance which is returned as is.

        :param callback: An :class:`FSMonitor` object, an :class:`FSMonitor` subclass, or a string of the form
                         ``module.class_name``.
        :raises MonitorException: If a string `callback` cannot be imported or resolved to a class.
        :raises TypeError: If `callback` is none of the supported forms.
        """
        if isinstance(callback, FSMonitor):
            return callback
        if isinstance(callback, type) and issubclass(callback, FSMonitor):
            return callback.get_instance()
        if isinstance(callback, str):
            # Everything before the last dot is the module, the remainder is the class name.
            module_name, _, class_name = callback.rpartition(".")
            try:
                module = importlib.import_module(module_name)
                cls = getattr(module, class_name)
                return cls.get_instance()
            except (AttributeError, ImportError, ValueError) as exc:
                # ImportError also covers ModuleNotFoundError, so an unknown module is reported the same way as
                # an unknown class name (AttributeError) or an empty module name (ValueError).
                raise MonitorException(f"Cannot instantiate the following FSMonitor callback : {callback}") from exc
        raise TypeError(f"Unsupported callback type : [{callback}] ({type(callback)})")

    @staticmethod
    def get_qualified_class_name(monitor: FSMonitor) -> str:
        """
        Returns the full qualified class name of the :class:`FSMonitor` object (string of the form module.class_name)
        """
        cls = type(monitor)
        return ".".join([cls.__module__, cls.__qualname__])

    @property
    def recursive(self) -> bool:
        """
        Whether sub-directories of the monitored path are monitored too.
        """
        return self.__recursive

    @recursive.setter
    def recursive(self, value: bool) -> None:
        """
        Change the recursive flag; when the value actually changes, the monitoring is stopped and started again.
        """
        if self.__recursive != value:
            # The observer is scheduled with the flag at start time, hence the stop/start cycle.
            self.stop()
            self.__recursive = value
            self.start()

    @property
    def path(self) -> str:
        """
        Path being monitored.
        """
        return self.__src_path

    @property
    def callback(self) -> str:
        """
        Full qualified class name of the callback receiving the events.
        """
        return self.get_qualified_class_name(self.__callback)

    @property
    def callback_instance(self) -> FSMonitor:
        """
        The :class:`FSMonitor` object receiving the events.
        """
        return self.__callback

    @property
    def key(self) -> MonitorKey:
        """
        Return a dict that can be used as a unique key to identify this :class:`Monitor` in a DB.
        """
        return {"callback": self.callback, "path": self.path}

    @property
    def is_alive(self) -> bool:
        """
        Returns true if the monitor observer exists and is currently running.
        """
        return bool(self.__event_observer) and self.__event_observer.is_alive()

    def params(self) -> MonitorParameters:
        """
        Return a dict serializing this object from which a new :class:`Monitor` can be recreated using the init
        function.
        """
        return {"callback": self.callback, "path": self.path, "recursive": self.__recursive}

    def start(self) -> None:
        """
        Start the monitoring so that events can be fired.

        An :class:`OSError` raised while starting the observer is logged as a warning and the monitor is left in the
        stopped state.

        :raises MonitorException: If the monitor is already started.
        """
        if self.is_alive:
            msg = f"This monitor [path={self.path}, callback={self.callback}] is already started"
            LOGGER.error(msg)
            raise MonitorException(msg)
        self.__event_observer = Observer()
        self.__event_observer.schedule(self,  # type: ignore[no-untyped-call]
                                       self.__src_path,
                                       recursive=self.__recursive)
        try:
            self.__event_observer.start()  # type: ignore[no-untyped-call]
        except OSError:
            LOGGER.warning("Cannot monitor the following file or directory [%s]: No such file or directory",
                           self.__src_path)
            # Forget the observer that failed to start, so that a later `stop()` does not try to join it.
            self.__event_observer = None

    def stop(self) -> None:
        """
        Stop the monitoring so that events stop to be fired.

        Does nothing when the monitor is not started.
        """
        observer = self.__event_observer
        if observer is None:
            return
        observer.stop()  # type: ignore[no-untyped-call]
        # Only wait for an observer that is actually running.
        if observer.is_alive():
            observer.join()
        self.__event_observer = None

    def on_moved(self, event: Union[DirMovedEvent, FileMovedEvent]) -> None:  # type: ignore[override]
        """
        Called when a file or a directory is moved or renamed.

        The callback always receives a deleted event for the source path, and a created event for the destination
        path only when the checks below accept it.

        :param event: Event representing file/directory movement.
        """
        self.__callback.on_deleted(event.src_path)
        # If moved outside of __src_path don't send a create event
        # NOTE(review): this is a plain string-prefix test, so a sibling such as "<path>_other" also passes it.
        #               Confirm whether a path-component comparison is intended.
        if not event.dest_path.startswith(self.__src_path):
            return
        # If move under subdirectory and recursive is False don't send a create event neither
        # NOTE(review): the comparison uses the *parent* of the monitored path; verify that it matches direct
        #               children for the path forms actually passed in by callers.
        if self.__recursive or os.path.dirname(event.dest_path) == os.path.dirname(self.__src_path):
            self.__callback.on_created(event.dest_path)

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]) -> None:  # type: ignore[override]
        """
        Called when a file or directory is created.

        :param event: Event representing file/directory creation.
        """
        self.__callback.on_created(event.src_path)

    def on_deleted(self, event: Union[DirDeletedEvent, FileDeletedEvent]) -> None:  # type: ignore[override]
        """
        Called when a file or directory is deleted.

        :param event: Event representing file/directory deletion.
        """
        self.__callback.on_deleted(event.src_path)

    def on_modified(self, event: Union[DirModifiedEvent, FileModifiedEvent]) -> None:  # type: ignore[override]
        """
        Called when a file or directory is modified.

        :param event: Event representing file/directory modification.
        """
        self.__callback.on_modified(event.src_path)