使用selectors模块实现基于事件驱动的服务器和客户端程序
selectors模块是Python标准库中提供的一个高级I/O多路复用模块,可以用于编写基于事件驱动的服务器和客户端程序。它提供了一个 Selector 类,可以注册和管理多个I/O事件,并在事件发生时触发相应的回调函数。下面我们将分别介绍如何使用selectors模块编写服务器和客户端程序,并给出相应的示例代码。
## 服务器端程序
下面我们以一个简单的回声服务器为例,介绍如何使用selectors模块编写基于事件驱动的服务器端程序。
import selectors
import socket
def accept(sock, mask):
conn, addr = sock.accept()
print('Connected by', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, echo)
def echo(conn, mask):
data = conn.recv(1024)
if data:
conn.send(data)
else:
print('Disconnected', conn.getpeername())
sel.unregister(conn)
conn.close()
sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', 8888))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
在上述代码中,首先我们导入了selectors和socket模块。然后,我们定义了两个回调函数accept和echo,分别用于处理新建连接和处理已建立的连接的读写事件。
在accept函数中,我们首先通过sock.accept()方法接受一个新的连接,并将其设置为非阻塞模式。然后,我们使用sel.register()方法将该连接注册到选择器中,同时指定感兴趣的事件为EVENT_READ,并关联回调函数echo。
在echo函数中,我们首先通过conn.recv()方法接收客户端发送的数据,并对数据进行处理。如果接收到的数据不为空,则使用conn.send()方法将数据回传给客户端。如果接收到的数据为空,则说明客户端断开连接,我们将其从选择器中注销,并关闭连接。
最后,我们创建一个默认选择器对象sel,并使用sock.bind()方法绑定IP地址和端口,并通过sock.setblocking(False)方法将套接字设置为非阻塞模式。然后,我们使用sel.register()方法将套接字注册到选择器中,同时指定感兴趣的事件为EVENT_READ,并关联回调函数accept。
在无限循环中,我们使用sel.select()方法等待事件发生,并返回就绪事件列表。然后,我们遍历就绪事件列表,获取每个事件的回调函数,并调用回调函数处理相应的事件。
## 客户端程序
下面我们以一个简单的回声客户端为例,介绍如何使用selectors模块编写基于事件驱动的客户端程序。
import selectors
import socket
import sys
def start_connection(host, port, num_conns):
server_addr = (host, port)
for i in range(num_conns):
conn = socket.socket()
conn.setblocking(False)
conn.connect_ex(server_addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(
conn=conn,
msg_total=sum(len(m) for m in messages),
recv_total=0,
messages=list(messages),
outb=b'',
)
sel.register(conn, events, data=data)
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024)
if recv_data:
data.recv_total += len(recv_data)
if data.recv_total == data.msg_total:
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
sent = sock.send(data.outb)
data.outb = data.outb[sent:]
sel = selectors.DefaultSelector()
messages = [b'Message 1 from client.', b'Message 2 from client.']
start_connection('localhost', 8888, 2)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key, mask)
在上述代码中,首先我们导入了selectors、socket和sys模块。然后,我们定义了两个函数start_connection和service_connection,分别用于启动连接和服务连接。
在start_connection函数中,我们首先通过socket.socket()方法创建一个套接字,并将其设置为非阻塞模式。然后,我们使用socket.connect_ex()方法发起连接,并将其注册到选择器中,并指定感兴趣的事件为可读和可写,并关联自定义的数据对象。
在service_connection函数中,我们首先获取文件描述符和关联的数据对象。然后,我们根据事件类型进行相应的操作。如果事件为可读事件,我们使用conn.recv()方法接收服务器发送的数据,并更新接收数据的总量。如果接收总量等于消息总量,则说明已接收完所有消息,我们将连接从选择器中注销,并关闭连接。如果事件为可写事件,我们首先判断输出缓冲区是否为空,如果不为空,则调用conn.send()方法发送数据,并更新输出缓冲区。
最后,我们创建一个默认选择器对象sel。然后,我们定义了一个列表messages,用于存储客户端要发送的消息。我们调用start_connection函数启动连接,指定需要连接的主机、端口和连接数。
在无限循环中,我们使用sel.select()方法等待事件发生,并返回就绪事件列表。然后,我们遍历就绪事件列表,获取每个事件的回调函数,并调用回调函数处理相应的事件。
总结:
通过上述示例,我们可以看到,使用selectors模块可以方便地编写基于事件驱动的服务器和客户端程序。我们可以通过注册和管理套接字相关的事件来实现非阻塞的并发处理,从而提供更好的性能和可扩展性。
