使用watchdog.observers中的Observer()类在Python中实现实时文件夹监控
在Python中,我们可以使用watchdog库来实现实时文件夹监控。这个库提供了Observer()类,可以用来监控文件夹中的文件变化,并触发相应的事件。下面是一个使用Observer()类监控文件夹变化的示例。
首先,我们需要安装watchdog库。可以使用pip来安装:
pip install watchdog
接下来,我们可以创建一个python脚本,命名为file_monitor.py。在这个脚本中,我们需要导入Observer()类:
from watchdog.observers import Observer
然后,我们需要导入事件处理器类,该类定义了文件夹变化时的具体操作。在本例中,我们实现了一个文件变化事件处理器类FileEventHandler:
from watchdog.events import FileSystemEventHandler
class FileEventHandler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory: # 如果不是目录
print(f"文件被创建: {event.src_path}")
def on_deleted(self, event):
if not event.is_directory:
print(f"文件被删除: {event.src_path}")
def on_modified(self, event):
if not event.is_directory:
print(f"文件被修改: {event.src_path}")
def on_moved(self, event):
if not event.is_directory:
print(f"文件被移动: {event.src_path} -> {event.dest_path}")
在FileEventHandler类中,我们重写了一些文件变化事件的处理方法。每当有文件创建、删除、修改或移动时,相应的方法就会被调用,并打印出相关信息。
现在,我们可以在主函数中创建一个Observer()实例,并将其与FileEventHandler()实例进行关联:
if __name__ == "__main__":
event_handler = FileEventHandler() # 创建事件处理器实例
observer = Observer() # 创建观察者实例
observer.schedule(event_handler, path="/path/to/folder", recursive=True) # 监控文件夹
observer.start() # 启动观察者
try:
while True:
pass # 保持监控状态
except KeyboardInterrupt:
observer.stop() # 停止观察者
observer.join() # 等待观察者线程结束
在主函数中,我们首先创建了一个FileEventHandler实例和一个Observer实例。然后,我们使用observer.schedule()方法将FileEventHandler与要监控的文件夹路径进行关联,并设置recursive=True以递归地监控所有子文件夹。
接下来,我们通过调用observer.start()方法来启动观察者。然后,我们进入一个无限循环,以保持监控状态,直到用户按下键盘中断(KeyboardInterrupt)为止。
最后,我们调用observer.stop()方法停止观察者,并调用observer.join()方法等待观察者线程结束。
整个脚本的示例代码如下:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileEventHandler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
print(f"文件被创建: {event.src_path}")
def on_deleted(self, event):
if not event.is_directory:
print(f"文件被删除: {event.src_path}")
def on_modified(self, event):
if not event.is_directory:
print(f"文件被修改: {event.src_path}")
def on_moved(self, event):
if not event.is_directory:
print(f"文件被移动: {event.src_path} -> {event.dest_path}")
if __name__ == "__main__":
event_handler = FileEventHandler()
observer = Observer()
observer.schedule(event_handler, path="/path/to/folder", recursive=True)
observer.start()
try:
while True:
pass
except KeyboardInterrupt:
observer.stop()
observer.join()
使用这个脚本,我们可以实时监控指定文件夹中的文件变化,并进行相应的操作。你可以根据自己的需求修改事件处理器类FileEventHandler中的方法来执行你需要的操作。
