KafkaSimpleProducer与KafkaConsumer的配合使用方法
发布时间:2024-01-13 00:04:24
KafkaSimpleProducer和KafkaConsumer是Apache Kafka提供的两个重要的类,用于实现Kafka消息的生产和消费。它们可以通过以下步骤配合使用:
1. 引入依赖
首先需要在项目的pom.xml文件中引入kafka-client的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
2. 创建KafkaSimpleProducer
使用KafkaSimpleProducer可以将消息发送到Kafka集群中的指定topic。创建KafkaSimpleProducer的示例代码如下:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaSimpleProducerExample {
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String TOPIC_NAME = "test-topic";
public static void main(String[] args) {
// 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 10; i++) {
String key = "key" + i;
String value = "value" + i;
producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
}
// 关闭生产者
producer.close();
}
}
上述代码中,需要配置Kafka集群的地址和要发送的topic名称,然后创建Producer实例,使用send方法发送消息。
3. 创建KafkaConsumer
使用KafkaConsumer可以从Kafka集群中的指定topic消费消息。创建KafkaConsumer的示例代码如下:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String TOPIC_NAME = "test-topic";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
// 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅topic
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
System.out.println("Received message: " + key + " = " + value);
}
}
}
}
上述代码中,需要配置Kafka集群的地址、要消费的topic名称和消费者分组的ID,然后创建Consumer实例,使用subscribe方法订阅topic,最后使用poll方法从Kafka集群中拉取消息并进行处理。
使用KafkaSimpleProducer和KafkaConsumer配合可以实现消息的生产和消费。在示例代码中,首先使用KafkaSimpleProducer发送消息到名为"test-topic"的topic中,然后使用KafkaConsumer从同一topic中消费消息并打印。可以根据实际需求进行扩展和修改。
