如何订阅Kafka消费者到多个主题 | Baeldung
如何订阅Kafka消费者到多个主题 | Baeldung
1. 概述
在本教程中,我们将学习如何订阅Kafka消费者到多个主题。当相同的业务逻辑用于不同的主题时,这是一个常见的需求。
2. 创建模型类
我们将考虑一个简单的支付系统,有两个Kafka主题,一个用于信用卡支付,另一个用于银行转账。让我们创建模型类:
public class PaymentData {
private String paymentReference;
private String type;
private BigDecimal amount;
private Currency currency;
// 标准getter和setter
}
我们将讨论的第一种方法使用Kafka消费者API。让我们添加所需的Maven依赖项:
```<dependency>```
```<groupId>```org.apache.kafka```</groupId>```
```<artifactId>```kafka-clients```</artifactId>```
```<version>```3.5.1```</version>```
```</dependency>```
让我们也配置Kafka消费者:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);
在消费消息之前,我们需要使用_subscribe()_方法订阅_kafkaConsumer_到两个主题:
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
我们现在准备好测试我们的配置。让我们在每个主题上发布一条消息:
void publishMessages() throws Exception {
ProducerRecord```<String, String>``` cardPayment = new ProducerRecord<>("card-payments",
"{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
kafkaProducer.send(cardPayment).get();
ProducerRecord```<String, String>``` bankTransfer = new ProducerRecord<>("bank-transfers",
"{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
kafkaProducer.send(bankTransfer).get();
}
最后,我们可以编写集成测试:
@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
publishMessages();
kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));
int eventsProcessed = 0;
for (ConsumerRecord```<String, String>``` record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
log.info("Event on topic={}, payload={}", record.topic(), record.value());
eventsProcessed++;
}
assertThat(eventsProcessed).isEqualTo(2);
}
4. 使用Spring Kafka订阅多个主题
我们将讨论的第二种方法使用Spring Kafka。
让我们将_spring-kafka_和_jackson-databind_依赖项添加到我们的_pom.xml_:
```<dependency>```
```<groupId>```org.springframework.kafka```</groupId>```
```<artifactId>```spring-kafka```</artifactId>```
```<version>```3.1.2```</version>```
```</dependency>```
```<dependency>```
```<groupId>```com.fasterxml.jackson.core```</groupId>```
```<artifactId>```jackson-databind```</artifactId>```
```<version>```2.15.2```</version>```
```</dependency>```
让我们还定义_ConsumerFactory_和_ConcurrentKafkaListenerContainerFactory_ beans:
@Bean
public ConsumerFactory```<String, PaymentData>``` consumerFactory() {
Map`<String, Object>` config = new HashMap<>();
config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory```<String, PaymentData>``` containerFactory() {
ConcurrentKafkaListenerContainerFactory```<String, PaymentData>``` factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我们需要使用_@KafkaListener_注解的_topics_属性订阅两个主题:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
最后,我们可以创建消费者。此外,我们还包括了Kafka头信息以识别接收到消息的主题:
@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("Event on topic={}, payload={}", topic, paymentData);
}
让我们验证我们的配置:
@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(2);
doAnswer(invocation -> {
countDownLatch.countDown();
return null;
}).when(paymentsConsumer)
.handlePaymentEvents(any(), any());
kafkaTemplate.send("card-payments", createCardPayment());
kafkaTemplate.send("bank-transfers", createBankTransfer());
assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}
5. 使用Kafka CLI订阅多个主题
我们将讨论的最后方法是Kafka CLI。
首先,让我们在每个主题上发送一条消息:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
现在,我们可以启动Kafka CLI消费者。_include_选项允许我们指定要包含的消息消费主题列表:
$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"
这是我们运行上一个命令时的输出:
{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}
6. 结论
在本文中,我们学习了三种不同的方法来订阅Kafka消费者到多个主题。这在为多个主题实现相同的功能时非常有用。
前两种方法基于Kafka消费者API和Spring Kafka,可以集成到现有应用程序中。最后一种使用Kafka CLI,可以用来快速验证多个主题。
如往常一样,完整的代码可以在GitHub上找到。