# pulsar-spring-boot-starter
**Repository Path**: minz_team/pulsar-spring-boot-starter
## Basic Information
- **Project Name**: pulsar-spring-boot-starter
- **Description**: pulsar-spring-boot-starter
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: master
- **Homepage**: https://gitee.com/minz_team/pulsar-spring-boot-starter
- **GVP Project**: No
## Statistics
- **Stars**: 11
- **Forks**: 4
- **Created**: 2021-07-22
- **Last Updated**: 2024-01-11
## Categories & Tags
**Categories**: Uncategorized
**Tags**: pulsar
## README
# pulsar-spring-boot-starter
#### 介绍
* [Pulsar](https://pulsar.apache.org/docs/zh-CN/2.7.1/standalone/) 实现多租户,高可靠,解耦, 适用于实时推荐,金融场景
* 参考(抄袭)RocketMQ的脚手架,实现pulsar的脚手架,仅仅作为学习,本公司使用.
* 腾讯云TDMQ作为pulsar的商业版本(目前免费), 免去搭建的烦恼.文档参考[TDMQ](https://cloud.tencent.com/document/product/1179)
#### pom依赖
```xml
cn.zhaopin.starter
pulsar-spring-boot-starter
1.0.0
```
#### 配置
``` yml
app.mq:
type: pulsar
pulsar:
serviceUrl: http://pulsar-7dodn8wqepzd.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080
authenticationToken: eyJrZXlJZCI6InB1bHNhci03ZG9kbjh3cWVwemQiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItN2RvZG44d3FlcHpkX3JvbGUtYXRzLWRldiJ9.kbILeNTOlmFq-D9Pbsc6xQsXjIfG3Rz5XI4ed3UNeDM
clusterId: pulsar-7dodn8wqepzd
namespace: ns-dev-01
```
#### 生产
##### 模板方法
```java
@Autowired
private PulsarTemplate pulsarTemplate;
```
##### 普通消息
```java
@Data
@ToString
public class Demo {
private Long id;
private String name;
private Date date;
private LocalDateTime dateTime;
private Boolean flag = Boolean.FALSE;
}
@GetMapping("/sendMq")
public void tm() {
Demo demo = new Demo();
demo.setDate(new Date());
demo.setDateTime(LocalDateTime.now());
demo.setId(1L);
demo.setName("test");
Message message = MessageBuilder.withPayload(demo).build();
MessageId dev_topic_01 = pulsarTemplate.sendSync("dev_topic_01", message);
System.out.println(dev_topic_01);
}
```
##### 定时消息
* 例如 定时在 2021-07-30 11:17:00(MQ服务器的时间) 通知消费者
```java
@GetMapping("/sendTimed")
public void tam(){
LocalDateTime timed = LocalDateTime.parse("2021-07-30 11:17:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Message deliver = MessageBuilder.withPayload("test_deliver").build();
pulsarTemplate.sendTimed("topic_02", deliver, timed);
}
```
##### 延迟消息
* 延迟30s后通知消费者
```java
@GetMapping("/sendDelay")
public void tdm(){
Message deliver = MessageBuilder.withPayload("test_deliver").build();
pulsarTemplate.sendDelay("topic_03", deliver, 30L, TimeUnit.SECONDS);
}
```
#### 消费
```java
@Slf4j
@Component
@PulsarMessageListener(topic = "dev_topic_01", subscriptionName = "sub_dev_01", subscriptionType = SubscriptionType.Shared)
public class DemoMessageListener implements PulsarListener {
@Override
public boolean onMessage(PulsarMessageExt message) throws Exception {
Demo body = message.getBody();
System.out.println(body.toString());
return true;
}
}
```
##### 确认机制
onMessage方法必须返回boolean类型的值, 确保消费者消费到消息
true : 确认消息触达
false : 确认消息未触达, 会导致重试, 消息堆积
#### 不支持
1. 不支持topic, subscriptionName使用占位符(${app.test.topicName})的方式
#### FAQ & 待办事项
1. 传输对象json序列化带class类型和不带如何选择
* ~~目前带有泛型 指定class转具体的对象比较方便~~
* ~~objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);~~
* 2021-08-01重大调整: 通过jackson2实现, 选择不带class类型的序列化, 保证原始json格式, 方便以后扩展消费, 例如: 其他语言也可以无差别消费
* json格式如下:
```json
{"payload":{"id":1,"name":"test","date":1627743462539,"dateTime":[2021,7,31,22,57,42,539790500],"flag":false},"headers":{"id":"ef52a865-3439-1fcd-9818-fe9fdaa93100","timestamp":1627743464057}}
```
2. 本地偶尔能消费到
* 解决方式: 无需解决, 负载均衡(轮询),因为存在多个消费者
3. consumer, producer, client 关闭顺序
* consumer在DefaultPulsarListenerContainer.destroy进行关闭
* producer在PulsarTemplate.destroy进行关闭
* client目前未处理,但会跟随应用关闭
4. PulsarMessageExt泛型处理
* ~~利用ObjectMapper转json带class特定 反序列化比较方便,同问题1~~
* 2021-08-01重大调整: 同问题1
5. 事务消息
* tdmq官方回复: 2.7.1集群不支持事务

6. 消费从最早或最近消费, 还需要调试
* Listener注解新增subscriptionInitialPosition属性, 默认最近开始消费
* SubscriptionInitialPosition.Latest 最近消费
* SubscriptionInitialPosition.Earliest 最早消费
7. 创建新的topic以及订阅 启动报错: Topic does not have schema to check
* 如果在TDMQ的控制台创建了订阅, 启动服务会报错, 如果自动生成订阅(不创建, 系统启动会自动创建)则不会报错,不会影响到生产消息, 仅仅会影响消费(消费不到数据).
生产一条消息之后, 此时重启服务,就不会报错了. 且消息正常消费.
* 暂时想不到解决思路.
8. 自定义重试级别
9. 整合RocketMQ
* 是否可以将starter进行再次包装, 通过指定type多种类型, 实现多个MQ的整合 类似于 (type:tdmq,rocketmq).这样tdmq和rocketmq都可以使用