欢迎访问宙启技术站
智能推送

使用watchdog.observers中的Observer()类在Python中实现实时文件夹监控

发布时间:2024-01-04 10:07:39

在Python中,我们可以使用watchdog库来实现实时文件夹监控。这个库提供了Observer()类,可以用来监控文件夹中的文件变化,并触发相应的事件。下面是一个使用Observer()类监控文件夹变化的示例。

首先,我们需要安装watchdog库。可以使用pip来安装:

pip install watchdog

接下来,我们可以创建一个python脚本,命名为file_monitor.py。在这个脚本中,我们需要导入Observer()类:

from watchdog.observers import Observer

然后,我们需要导入事件处理器类,该类定义了文件夹变化时的具体操作。在本例中,我们实现了一个文件变化事件处理器类FileEventHandler:

from watchdog.events import FileSystemEventHandler

class FileEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:  # 如果不是目录
            print(f"文件被创建: {event.src_path}")

    def on_deleted(self, event):
        if not event.is_directory:
            print(f"文件被删除: {event.src_path}")

    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

    def on_moved(self, event):
        if not event.is_directory:
            print(f"文件被移动: {event.src_path} -> {event.dest_path}")

在FileEventHandler类中,我们重写了一些文件变化事件的处理方法。每当有文件创建、删除、修改或移动时,相应的方法就会被调用,并打印出相关信息。

现在,我们可以在主函数中创建一个Observer()实例,并将其与FileEventHandler()实例进行关联:

if __name__ == "__main__":
    event_handler = FileEventHandler()  # 创建事件处理器实例
    observer = Observer()  # 创建观察者实例
    observer.schedule(event_handler, path="/path/to/folder", recursive=True)  # 监控文件夹
    observer.start()  # 启动观察者

    try:
        while True:
            pass  # 保持监控状态
    except KeyboardInterrupt:
        observer.stop()  # 停止观察者

    observer.join()  # 等待观察者线程结束

在主函数中,我们首先创建了一个FileEventHandler实例和一个Observer实例。然后,我们使用observer.schedule()方法将FileEventHandler与要监控的文件夹路径进行关联,并设置recursive=True以递归地监控所有子文件夹。

接下来,我们通过调用observer.start()方法来启动观察者。然后,我们进入一个无限循环,以保持监控状态,直到用户按下键盘中断(KeyboardInterrupt)为止。

最后,我们调用observer.stop()方法停止观察者,并调用observer.join()方法等待观察者线程结束。

整个脚本的示例代码如下:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            print(f"文件被创建: {event.src_path}")

    def on_deleted(self, event):
        if not event.is_directory:
            print(f"文件被删除: {event.src_path}")

    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

    def on_moved(self, event):
        if not event.is_directory:
            print(f"文件被移动: {event.src_path} -> {event.dest_path}")

if __name__ == "__main__":
    event_handler = FileEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path="/path/to/folder", recursive=True)
    observer.start()

    try:
        while True:
            pass
    except KeyboardInterrupt:
        observer.stop()

    observer.join()

使用这个脚本,我们可以实时监控指定文件夹中的文件变化,并进行相应的操作。你可以根据自己的需求修改事件处理器类FileEventHandler中的方法来执行你需要的操作。