欢迎访问宙启技术站
智能推送

KafkaProducer()在Python中的基本用法详解

发布时间:2023-12-28 03:57:50

KafkaProducer是Kafka提供的一个用于发送消息的客户端类,可以在Python中使用。下面详细介绍KafkaProducer的基本用法,并提供一个使用例子。

1. 导入依赖:首先需要导入kafka模块。

from kafka import KafkaProducer

2. 创建KafkaProducer实例:创建一个KafkaProducer实例,传入Kafka集群的地址列表。如果Kafka集群有多个地址,可以用逗号分隔。

producer = KafkaProducer(bootstrap_servers='kafka_host1:port1,kafka_host2:port2')

3. 发送消息:使用send方法发送消息。send方法接受两个参数, 个参数是要发送的主题(topic),第二个参数是消息的内容。

producer.send('my_topic', b'Hello Kafka!')

4. 发送批量消息:可以使用send方法发送多个消息,send方法接受一个Message对象的列表作为参数。可以使用KeyedMessage类来创建Message对象。

from kafka import KeyedMessage

message1 = KeyedMessage('my_topic', b'message 1', key='key1')
message2 = KeyedMessage('my_topic', b'message 2', key='key2')

producer.send([message1, message2])

5. 添加回调函数:可以为发送的消息添加一个回调函数,当发送完成或出错时会触发该函数。回调函数接受两个参数, 个参数是发送的结果,第二个参数是发送的消息。

def on_send_success(record_metadata):
    print('Message sent successfully:', record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(exception):
    print('Message sent failed:', exception)

producer.send('my_topic', b'Hello Kafka!').add_callback(on_send_success).add_errback(on_send_error)

6. 关闭KafkaProducer:在程序结束时,需要关闭KafkaProducer。可以使用close方法来关闭。

producer.close()

上面是KafkaProducer的基本用法,下面提供一个完整的使用例子:

from kafka import KafkaProducer

# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 发送消息
producer.send('my_topic', b'Hello Kafka!')

# 添加回调函数
def on_send_success(record_metadata):
    print('Message sent successfully:', record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(exception):
    print('Message sent failed:', exception)

producer.send('my_topic', b'Hello Kafka!').add_callback(on_send_success).add_errback(on_send_error)

# 关闭KafkaProducer
producer.close()

以上就是KafkaProducer在Python中的基本用法和一个使用例子。使用KafkaProducer可以很方便地向Kafka集群发送消息,并可以处理发送的结果和错误。