入门指南:Apache Pulsar 与 Spring Boot 集成
入门指南:Apache Pulsar 与 Spring Boot 集成
Apache Pulsar 是一个分布式发布-订阅消息系统。虽然 Apache Pulsar 提供的功能与 Apache Kafka 类似,但 Pulsar 旨在克服 Kafka 在高延迟、低吞吐量、扩展和地理复制等方面的限制,并且更多。当处理需要实时处理的大量数据时,Apache Pulsar 是一个很好的选择。
在本教程中,我们将看到如何将 Apache Pulsar 与我们的 Spring Boot 应用程序集成。我们将利用 Pulsar 的 Spring Boot Starter 配置的 PulsarTemplate 和 PulsarListener。我们还将看到如何根据我们的要求修改它们的默认配置。
2. Maven 依赖
我们首先按照 Apache Pulsar 简介中描述的运行一个独立的 Apache Pulsar 服务器。
接下来,让我们将 spring-pulsar-spring-boot-starter 库添加到我们的项目中:
`<dependency>`
`<groupId>`org.springframework.pulsar`</groupId>`
`<artifactId>`spring-pulsar-spring-boot-starter`</artifactId>`
`<version>`0.2.0`</version>`
`</dependency>`
3. PulsarClient
为了与 Pulsar 服务器交互,我们需要配置一个 PulsarClient。默认情况下,Spring 会自动配置一个连接到 localhost:6650 上 Pulsar 服务器的 PulsarClient:
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
我们可以更改此配置以在不同的地址上建立连接。
要连接到安全服务器,我们可以使用 pulsar+ssl 而不是 pulsar。 我们还可以通过向 application.yml 添加 spring.pulsar.client. 属性来配置诸如连接超时、认证和内存限制等属性。
4. 为自定义对象指定模式
我们将使用一个简单的 User 类作为我们的应用程序:
public class User {
private String email;
private String firstName;
// 标准构造函数,getter 和 setter
}
Spring-Pulsar 自动检测原始数据类型并生成相关的模式。但是,如果我们需要使用自定义的 JSON 对象,我们将不得不为 PulsarClient 配置其模式信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON
在这里,message-type 属性接受消息类的完全限定名,schema-type 提供要使用的模式类型的信息。对于复杂对象,schema-type 属性接受 AVRO 或 JSON 值。
尽管使用属性文件指定模式是首选方法,我们也可以通过 bean 提供此模式:
@Bean
public SchemaResolverCustomizer`<DefaultSchemaResolver>` schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
};
}
此配置应该同时添加到生产者和监听器应用程序中。
5. 发布者
要发布 Pulsar 主题上的消息,我们将使用 PulsarTemplate。PulsarTemplate 实现了 PulsarOperations 接口,并提供了同步和异步形式发布记录的方法。send 方法阻塞调用以提供同步操作能力,而 sendAsync 方法提供非阻塞的异步操作。
在本教程中,我们将使用同步操作发布记录。
5.1. 发布消息
Spring Boot 自动配置了一个现成的 PulsarTemplate,用于向指定主题发布记录。
让我们创建一个生产者,将 String 消息发布到队列:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate`<String>` stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}
现在,让我们尝试将一个 User 对象发送到一个新的队列:
@Autowired
private PulsarTemplate`<User>` template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
在上面的代码片段中,我们使用 PulsarTemplate 将 User 类的对象发送到名为 user-topic 的 Apache Pulsar 主题。
5.2. 生产者端自定义
PulsarTemplate 接受 TypedMessageBuilderCustomizer 来配置传出消息和 ProducerBuilderCustomizer 来自定义生产者的属性。
我们可以使用 TypedMessageBuilderCustomizer 来配置消息延迟、在特定时间发送、禁用复制并提供额外的属性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}
ProducerBuilderCustomizer 可以用来添加访问模式、自定义消息路由器和拦截器,并启用或禁用分块和批处理:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}
6. 消费者
发布消息到我们的主题后,我们现在将为同一主题建立一个监听器。要启用对主题的监听,我们需要用 @PulsarListener 注解装饰监听方法。
Spring Boot 为监听方法配置了所有必要的组件。
我们还需要使用 @EnablePulsar 来使用 PulsarListener。
6.1. 接收消息
我们将首先为前面部分创建的 string-topic 创建一个监听方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}
在这里,《PulsarListener》注解中,我们已经配置了这个方法将监听的 topicName 在 topicName 属性中,并在 subscriptionName 属性中给出了一个订阅名称。
现在,让我们为 User 类使用的 user-topic 创建一个监听方法:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
除了前面 Listener 方法中提供的属性外,我们还添加了一个 schemaType 属性,其值与生产者中的相同。
我们还将 @EnablePulsar 注解添加到我们的主类中:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}
6.2. 消费者端自定义
除了订阅名称和模式类型外,《PulsarListener》还可以用来配置诸如自动启动、批处理和确认模式等属性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在这里,我们将确认模式设置为 Record 并将确认超时设置为 60 秒。
7. 使用死信主题
如果消息确认超时或服务器收到 nack,Pulsar 会尝试重新交付消息一定次数。这些重试用尽后,这些未交付的消息可以发送到称为 死信队列 (DLQ) 的队列中。
此选项仅用于 Shared 订阅类型。为了为我们的 user-topic 队列配置 DLQ,我们首先将创建一个 DeadLetterPolicy bean,它将定义重新交付尝试的次数以及用作 DLQ 的队列名称:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
现在,我们将此策略添加到我们之前创建的 PulsarListener 中:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在这里,我们已经配置了 userTopicListener 使用我们之前创建的 deadLetterPolicy,并且我们已经配置了 60 秒的确认时间。
我们可以创建一个单独的 Listener 来处理 DQL 中的消息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
8. 结论
在本教程中,我们看到了如何使用 Apache Pulsar 与我们的 Spring Boot 应用程序以及一些改变默认配置的方法。
正如往常一样,示例实现可以在 GitHub 上找到。
OK