通过Observer()实现实时监控文件夹的变动
发布时间:2024-01-04 09:59:26
Observer()是Python的一个观察者模式的实现库,它能够实时监控文件夹的变动,并在文件夹内容发生变化时进行相应的处理。下面将详细介绍如何使用Observer()来实现实时监控文件夹的变动,并给出一个使用例子。
首先,需要安装Observer()库。可以使用以下命令来安装:
pip install watchdog
### 使用Observer()监控文件夹的变动
1. 首先,创建一个继承自Observer类的子类,在子类的构造函数中初始化需要监控的文件夹路径和事件处理逻辑。
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyFileHandler(FileSystemEventHandler):
def __init__(self, folder_path):
self.folder_path = folder_path
def on_any_event(self, event):
# 处理需要的文件夹变动事件,如文件创建、删除、修改等
if event.is_directory:
return None
elif event.event_type == 'created':
# 文件创建事件
print(f"File {os.path.join(self.folder_path, event.src_path)} was created.")
elif event.event_type == 'deleted':
# 文件删除事件
print(f"File {os.path.join(self.folder_path, event.src_path)} was deleted.")
elif event.event_type == 'modified':
# 文件修改事件
print(f"File {os.path.join(self.folder_path, event.src_path)} was modified.")
# 递归访问文件夹下的所有文件,并监控其变动
for filename in os.listdir(self.folder_path):
file_path = os.path.join(self.folder_path, filename)
if os.path.isdir(file_path):
self.monitor_folder(file_path)
2. 在子类中重写on_any_event()函数,该函数会在文件夹内容发生变动时被调用。在这个函数中,可以根据具体的事件类型进行相应的处理。
3. 创建一个监控文件夹的函数monitor_folder(),在该函数中使用Observer类创建一个观察者对象,并将需要监控的文件夹添加到观察者中。
class MyFileHandler(FileSystemEventHandler):
...
def monitor_folder(self, folder_path):
observer = Observer()
observer.schedule(MyFileHandler(folder_path), path=folder_path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
4. 在主程序中调用monitor_folder()函数来监控指定的文件夹。
if __name__ == "__main__":
folder_path = "/path/to/folder"
handler = MyFileHandler(folder_path)
handler.monitor_folder(folder_path)
### 使用例子
下面是一个使用Observer()监控文件夹变动的例子,监控文件夹下的图片文件,并在新图片文件创建时打印出文件路径。
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyFileHandler(FileSystemEventHandler):
def __init__(self, folder_path):
self.folder_path = folder_path
def on_any_event(self, event):
if event.is_directory:
return None
elif event.event_type == 'created':
# 文件创建事件
file_ext = os.path.splitext(event.src_path)[1]
if file_ext.lower() in ['.jpg', '.jpeg', '.png']:
print(f"New image file created: {os.path.join(self.folder_path, event.src_path)}")
def monitor_folder(self, folder_path):
observer = Observer()
observer.schedule(MyFileHandler(folder_path), path=folder_path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
folder_path = "/path/to/folder"
handler = MyFileHandler(folder_path)
handler.monitor_folder(folder_path)
在上面的例子中,为了简化逻辑,只监控文件创建事件,并且只处理图片文件。代码会打印出新图片文件的路径。可以根据需要修改on_any_event()函数的实现来适应更复杂的监控和处理逻辑。
