欢迎访问宙启技术站
智能推送

了解Python中的ZeroMQ多进程通信技术

发布时间:2023-12-19 02:26:49

ZeroMQ是一个高性能的消息传递库,可以用于构建分布式应用程序。它提供了多种通信模式,包括进程间通信。在Python中,可以使用zmq模块来实现ZeroMQ的多进程通信。

在ZeroMQ中,有两种常见的进程间通信模式:发布-订阅模式(PUB-SUB)和请求-应答模式(REQ-REP)。

首先,我们来看一个使用发布-订阅模式的例子。在这个例子中,我们将创建一个发布者和两个订阅者,发布者会向所有订阅者发送消息。

import zmq
import time
import random
from multiprocessing import Process

# 发布者函数
def publisher():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:5555")
    
    while True:
        topic = random.choice(["A", "B", "C"])
        message = f"Message from publisher, topic: {topic}"
        socket.send_string(f"{topic} {message}")
        time.sleep(1)

# 订阅者函数
def subscriber(name):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://127.0.0.1:5555")
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    
    while True:
        message = socket.recv_string()
        print(f"{name} received: {message}")

if __name__ == '__main__':
    # 创建发布者进程
    publisher_process = Process(target=publisher)
    # 创建两个订阅者进程
    subscriber_process1 = Process(target=subscriber, args=("Subscriber 1",))
    subscriber_process2 = Process(target=subscriber, args=("Subscriber 2",))
    
    # 启动进程
    publisher_process.start()
    subscriber_process1.start()
    subscriber_process2.start()
    
    # 等待进程结束
    publisher_process.join()
    subscriber_process1.join()
    subscriber_process2.join()

在这个例子中,首先创建了一个发布者进程和两个订阅者进程。发布者使用zmq.PUB创建一个PUB socket,并绑定到tcp://127.0.0.1:5555地址上。然后,在一个无限循环中,每隔一秒钟发送一个随机主题(A、B或C)的消息。

订阅者使用zmq.SUB创建一个SUB socket,并连接到发布者的地址。然后,使用zmq.SUBSCRIBE设置socket的订阅过滤器为空字符串,以便接收发布者发送的所有消息。在一个无限循环中,订阅者接收到消息后,打印出消息内容。

在主程序中,创建了发布者进程和两个订阅者进程,并使用start方法启动它们。然后,使用join方法等待进程结束。

接下来,我们来看一个使用请求-应答模式的例子。在这个例子中,我们将创建一个服务器进程和两个客户端进程,客户端会向服务器发送请求,并接收服务器的回复。

import zmq
import time
from multiprocessing import Process

# 服务器函数
def server():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://127.0.0.1:5555")
    
    while True:
        message = socket.recv_string()
        print(f"Server received: {message}")
        time.sleep(1)
        socket.send_string(f"Reply from server to {message}")

# 客户端函数
def client(name):
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:5555")
    
    for i in range(3):
        message = f"Message from client {name}, index: {i}"
        socket.send_string(message)
        reply = socket.recv_string()
        print(f"{name} received: {reply}")
        time.sleep(1)

if __name__ == '__main__':
    # 创建服务器进程
    server_process = Process(target=server)
    # 创建两个客户端进程
    client_process1 = Process(target=client, args=("Client 1",))
    client_process2 = Process(target=client, args=("Client 2",))
    
    # 启动进程
    server_process.start()
    client_process1.start()
    client_process2.start()
    
    # 等待进程结束
    server_process.join()
    client_process1.join()
    client_process2.join()

在这个例子中,首先创建了一个服务器进程和两个客户端进程。服务器使用zmq.REP创建一个REP socket,并绑定到tcp://127.0.0.1:5555地址上。然后,在一个无限循环中,服务器接收到请求后,打印出请求内容,并发送一个回复。

客户端使用zmq.REQ创建一个REQ socket,并连接到服务器的地址。然后,在一个循环中,客户端发送一个带有索引的消息给服务器,并接收服务器的回复。在每次发送消息后,客户端会休眠一秒钟。

在主程序中,创建了服务器进程和两个客户端进程,并使用start方法启动它们。然后,使用join方法等待进程结束。

这些例子只是ZeroMQ多进程通信的一些简单示例,实际上,ZeroMQ提供了丰富的API和通信模式,可以用于构建更复杂的分布式应用程序。