Source code for cowbird.monitoring.monitor

import importlib
import os
from typing import TYPE_CHECKING

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from cowbird.monitoring.fsmonitor import FSMonitor
from cowbird.utils import get_logger

if TYPE_CHECKING:
    # pylint: disable=W0611,unused-import
    from typing import Dict, Type, Union

    from watchdog.events import (
        DirCreatedEvent,
        DirDeletedEvent,
        DirModifiedEvent,
        DirMovedEvent,
        FileCreatedEvent,
        FileDeletedEvent,
        FileModifiedEvent,
        FileMovedEvent
    )

[docs]LOGGER = get_logger(__name__)
[docs]class MonitorException(Exception):
""" Error indicating that a :class:`Monitor` cannot be started because of an invalid path or callback. """
[docs]class Monitor(FileSystemEventHandler): """ Implementation of the watchdog :class:`FileSystemEventHandler` class Allows to start/stop directory monitoring and send events to :class:`FSMonitor` callback. """ def __init__(self, path, recursive, callback): # type: (str, bool, Union[FSMonitor, Type[FSMonitor], str]) -> None """ Initialize the path monitoring and ready to be started. @param path: Path to monitor @param recursive: Monitor subdirectory recursively? @param callback: Events are sent to this FSMonitor. Can be an object, a class type implementing :class:`FSMonitor` or a string containing module and class name. The class type or string is used to instantiate an object using the class method :meth:`FSMonitor.get_instance()` """ if not os.path.exists(path): raise MonitorException("Cannot monitor the following file or directory [{}]: No such file or directory" .format(path)) self.__src_path = path self.__recursive = recursive self.__callback = self.get_fsmonitor_instance(callback) self.__event_observer = None @staticmethod
[docs] def get_fsmonitor_instance(callback): # type: (Union[FSMonitor, Type[FSMonitor], str]) -> FSMonitor """ Return a :class:`FSMonitor` instance from multiple possible forms including the :class:`FSMonitor` type, the :class:`FSMonitor` full qualified class name or a direct instance which is returned as is. """ if isinstance(callback, FSMonitor): return callback if isinstance(callback, type) and issubclass(callback, FSMonitor): return callback.get_instance() if isinstance(callback, str): try: module_name = ".".join(callback.split(".")[:-1]) class_name = callback.split(".")[-1] module = importlib.import_module(module_name) cls = getattr(module, class_name) return cls.get_instance() except (AttributeError, ValueError): raise MonitorException("Cannot instantiate the following FSMonitor callback : {}".format(callback)) raise TypeError("Unsupported callback type : [{}] ({})".format(callback, type(callback)))
@staticmethod
[docs] def get_qualified_class_name(monitor): # type: (FSMonitor) -> str """ Returns the full qualified class name of the :class:`FSMonitor` object (string of the form module.class_name) """ cls = type(monitor) return ".".join([cls.__module__, cls.__qualname__])
@property
[docs] def recursive(self): return self.__recursive
@recursive.setter def recursive(self, value): if self.__recursive != value: self.stop() self.__recursive = value self.start() @property
[docs] def path(self): return self.__src_path
@property
[docs] def callback(self): return self.get_qualified_class_name(self.__callback)
@property
[docs] def callback_instance(self): return self.__callback
@property
[docs] def key(self): # type: () -> Dict """ Return a dict that can be used as a unique key to identify this :class:`Monitor` in a BD. """ return dict(callback=self.callback, path=self.path)
[docs] def params(self): # type: () -> Dict """ Return a dict serializing this object from which a new :class:`Monitor` can be recreated using the init function. """ return dict(callback=self.callback, path=self.path, recursive=self.__recursive)
[docs] def start(self): """ Start the monitoring so that events can be fired. """ if self.__event_observer: msg = "This monitor [path={}, callback={}] is already started".format(self.path, self.callback) LOGGER.error(msg) raise MonitorException(msg) self.__event_observer = Observer() self.__event_observer.schedule(self, self.__src_path, recursive=self.__recursive) try: self.__event_observer.start() except OSError: LOGGER.warning("Cannot monitor the following file or directory [%s]: No such file or directory", self.__src_path)
[docs] def stop(self): """ Stop the monitoring so that events stop to be fired. """ self.__event_observer.stop() self.__event_observer.join() self.__event_observer = None
[docs] def on_moved(self, event): # type: (Union[DirMovedEvent, FileMovedEvent]) -> None """ Called when a file or a directory is moved or renamed. @param event: Event representing file/directory movement. """ self.__callback.on_deleted(event.src_path) # If moved outside of __src_path don't send a create event if event.dest_path.startswith(self.__src_path): # If move under subdirectory and recursive is False don't send a # create event neither if self.__recursive or \ os.path.dirname(event.dest_path) == \ os.path.dirname(self.__src_path): self.__callback.on_created(event.dest_path)
[docs] def on_created(self, event): # type: (Union[DirCreatedEvent, FileCreatedEvent]) -> None """ Called when a file or directory is created. @param event: Event representing file/directory creation. """ self.__callback.on_created(event.src_path)
[docs] def on_deleted(self, event): # type: (Union[DirDeletedEvent, FileDeletedEvent]) -> None """ Called when a file or directory is deleted. @param event: Event representing file/directory deletion. """ self.__callback.on_deleted(event.src_path)
[docs] def on_modified(self, event): # type: (Union[DirModifiedEvent, FileModifiedEvent]) -> None """ Called when a file or directory is modified. @param event: Event representing file/directory modification. """ self.__callback.on_modified(event.src_path)