欢迎访问宙启技术站
智能推送

使用Observer()实现Python中watchdog.observers的文件夹监控器

发布时间:2024-01-04 10:04:45

watchdog.observers模块提供了一个文件夹监控器,可以用来跟踪文件夹中的文件和子文件夹的变化。该模块中的Observer类用于启动、停止和管理监控任务。

下面是使用Observer类来实现文件夹监控器的示例代码:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class MyEventHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.is_directory:
            return
        print(f"File modified: {event.src_path}")

    def on_created(self, event):
        if event.is_directory:
            return
        print(f"File created: {event.src_path}")

    def on_deleted(self, event):
        if event.is_directory:
            return
        print(f"File deleted: {event.src_path}")


if __name__ == "__main__":
    # 创建一个观察者对象
    observer = Observer()

    # 创建一个事件处理对象并将其附加到观察者中
    event_handler = MyEventHandler()
    observer.schedule(event_handler, '/path/to/folder', recursive=True)

    # 启动观察者
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # 停止观察者
        observer.stop()

    # 等待观察者完全停止
    observer.join()

在上面的示例代码中,我们首先定义了一个自定义的FileSystemEventHandler类,该类继承自watchdog.events.FileSystemEventHandler。在这个类中,我们可以重写一些触发文件夹变化事件的方法,如on_modifiedon_createdon_deleted,并在这些方法中执行我们自己的逻辑。

然后,在__name__ == "__main__"条件语句中,我们首先创建了一个观察者对象observer,然后创建了一个事件处理对象event_handler,并将其附加到观察者中。通过调用observer.schedule(event_handler, '/path/to/folder', recursive=True)方法,我们可以指定要监控的文件夹路径以及是否递归地监控其子文件夹。

接下来,我们调用observer.start()方法来启动观察者,开始监控操作。然后,在一个无限循环中,我们使用time.sleep(1)来让程序休眠一秒钟,以允许观察者持续进行监测。当我们在终端中按下Ctrl+C时,会触发KeyboardInterrupt异常,我们在异常处理块中调用observer.stop()方法来停止观察者,并调用observer.join()方法等待观察者完全停止。

这样,我们就可以通过使用Observer类来实现一个文件夹监控器,并在文件夹中的文件或子文件夹发生变化时执行自定义的逻辑。