欢迎访问宙启技术站
智能推送

通过Observer()实现实时监控文件夹的变动

发布时间:2024-01-04 09:59:26

Observer()是Python的一个观察者模式的实现库,它能够实时监控文件夹的变动,并在文件夹内容发生变化时进行相应的处理。下面将详细介绍如何使用Observer()来实现实时监控文件夹的变动,并给出一个使用例子。

首先,需要安装Observer()库。可以使用以下命令来安装:

pip install watchdog

### 使用Observer()监控文件夹的变动

1. 首先,创建一个继承自Observer类的子类,在子类的构造函数中初始化需要监控的文件夹路径和事件处理逻辑。

import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyFileHandler(FileSystemEventHandler):
    def __init__(self, folder_path):
        self.folder_path = folder_path

    def on_any_event(self, event):
        # 处理需要的文件夹变动事件,如文件创建、删除、修改等
        if event.is_directory:
            return None

        elif event.event_type == 'created':
            # 文件创建事件
            print(f"File {os.path.join(self.folder_path, event.src_path)} was created.")

        elif event.event_type == 'deleted':
            # 文件删除事件
            print(f"File {os.path.join(self.folder_path, event.src_path)} was deleted.")

        elif event.event_type == 'modified':
            # 文件修改事件
            print(f"File {os.path.join(self.folder_path, event.src_path)} was modified.")

        # 递归访问文件夹下的所有文件,并监控其变动
        for filename in os.listdir(self.folder_path):
            file_path = os.path.join(self.folder_path, filename)
            if os.path.isdir(file_path):
                self.monitor_folder(file_path)

2. 在子类中重写on_any_event()函数,该函数会在文件夹内容发生变动时被调用。在这个函数中,可以根据具体的事件类型进行相应的处理。

3. 创建一个监控文件夹的函数monitor_folder(),在该函数中使用Observer类创建一个观察者对象,并将需要监控的文件夹添加到观察者中。

class MyFileHandler(FileSystemEventHandler):
    ...

    def monitor_folder(self, folder_path):
        observer = Observer()
        observer.schedule(MyFileHandler(folder_path), path=folder_path, recursive=True)
        observer.start()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()

        observer.join()

4. 在主程序中调用monitor_folder()函数来监控指定的文件夹。

if __name__ == "__main__":
    folder_path = "/path/to/folder"
    handler = MyFileHandler(folder_path)
    handler.monitor_folder(folder_path)

### 使用例子

下面是一个使用Observer()监控文件夹变动的例子,监控文件夹下的图片文件,并在新图片文件创建时打印出文件路径。

import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyFileHandler(FileSystemEventHandler):
    def __init__(self, folder_path):
        self.folder_path = folder_path

    def on_any_event(self, event):
        if event.is_directory:
            return None

        elif event.event_type == 'created':
            # 文件创建事件
            file_ext = os.path.splitext(event.src_path)[1]
            if file_ext.lower() in ['.jpg', '.jpeg', '.png']:
                print(f"New image file created: {os.path.join(self.folder_path, event.src_path)}")

    def monitor_folder(self, folder_path):
        observer = Observer()
        observer.schedule(MyFileHandler(folder_path), path=folder_path, recursive=True)
        observer.start()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()

        observer.join()

if __name__ == "__main__":
    folder_path = "/path/to/folder"
    handler = MyFileHandler(folder_path)
    handler.monitor_folder(folder_path)

在上面的例子中,为了简化逻辑,只监控文件创建事件,并且只处理图片文件。代码会打印出新图片文件的路径。可以根据需要修改on_any_event()函数的实现来适应更复杂的监控和处理逻辑。