使用JoinableQueue实现多进程任务的分发与收集
发布时间:2023-12-16 21:08:56
JoinableQueue是Queue的子类,它可以实现多进程任务的分发与收集。它提供了task_done()和join()方法来控制任务的完成和阻塞等待任务的完成。在分发任务时,可以使用put()方法将任务放入队列中,而在收集任务时,可以使用get()方法从队列中获取任务。
下面是一个使用JoinableQueue实现多进程任务分发与收集的例子:
import multiprocessing
def worker(queue):
while True:
task = queue.get() # 从队列中获取任务
if task is None:
break
result = task * task
queue.task_done() # 标记任务完成
print(result)
if __name__ == '__main__':
num_processes = multiprocessing.cpu_count() # 获取CPU核心数
tasks = [1, 2, 3, 4, 5] # 待处理的任务
queue = multiprocessing.JoinableQueue()
# 创建多个子进程并启动
for _ in range(num_processes):
process = multiprocessing.Process(target=worker, args=(queue,))
process.start()
# 分发任务
for task in tasks:
queue.put(task)
# 阻塞等待所有任务完成
queue.join()
# 结束子进程
for _ in range(num_processes):
queue.put(None)
在上面的例子中,首先获取CPU核心数以确定进程数量。然后创建JoinableQueue实例,实例化多个子进程并分别启动这些子进程。接下来,将待处理的任务放入队列中。最后,使用queue.join()方法等待所有任务完成。当所有任务完成后,可以使用queue.put(None)来结束子进程。
在每个子进程的worker函数中,通过循环不断从队列中获取任务,如果获取到的任务为None,则表示没有任务待处理,子进程退出循环。否则,子进程处理任务并将任务完成信号发送给JoinableQueue,然后输出任务的处理结果。
使用JoinableQueue实现多进程任务的分发与收集,可以充分利用计算机的多核处理能力,提高程序的运行效率。同时,JoinableQueue还提供了task_done()和join()方法来控制任务的完成和阻塞等待任务完成,使得任务的分发与收集更加灵活和方便。
