欢迎访问宙启技术站
智能推送

KafkaSimpleProducer与KafkaConsumer的配合使用方法

发布时间:2024-01-13 00:04:24

KafkaSimpleProducer和KafkaConsumer是Apache Kafka提供的两个重要的类,用于实现Kafka消息的生产和消费。它们可以通过以下步骤配合使用:

1. 引入依赖

首先需要在项目的pom.xml文件中引入kafka-client的依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

2. 创建KafkaSimpleProducer

使用KafkaSimpleProducer可以将消息发送到Kafka集群中的指定topic。创建KafkaSimpleProducer的示例代码如下:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSimpleProducerExample {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC_NAME = "test-topic";

    public static void main(String[] args) {
        // 配置生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            producer.send(new ProducerRecord<>(TOPIC_NAME, key, value));
        }

        // 关闭生产者
        producer.close();
    }
}

上述代码中,需要配置Kafka集群的地址和要发送的topic名称,然后创建Producer实例,使用send方法发送消息。

3. 创建KafkaConsumer

使用KafkaConsumer可以从Kafka集群中的指定topic消费消息。创建KafkaConsumer的示例代码如下:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC_NAME = "test-topic";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        // 配置消费者
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅topic
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println("Received message: " + key + " = " + value);
            }
        }
    }
}

上述代码中,需要配置Kafka集群的地址、要消费的topic名称和消费者分组的ID,然后创建Consumer实例,使用subscribe方法订阅topic,最后使用poll方法从Kafka集群中拉取消息并进行处理。

使用KafkaSimpleProducer和KafkaConsumer配合可以实现消息的生产和消费。在示例代码中,首先使用KafkaSimpleProducer发送消息到名为"test-topic"的topic中,然后使用KafkaConsumer从同一topic中消费消息并打印。可以根据实际需求进行扩展和修改。