# pulsar-spring-boot-starter **Repository Path**: minz_team/pulsar-spring-boot-starter ## Basic Information - **Project Name**: pulsar-spring-boot-starter - **Description**: pulsar-spring-boot-starter - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: https://gitee.com/minz_team/pulsar-spring-boot-starter - **GVP Project**: No ## Statistics - **Stars**: 11 - **Forks**: 4 - **Created**: 2021-07-22 - **Last Updated**: 2024-01-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: pulsar ## README # pulsar-spring-boot-starter #### 介绍 * [Pulsar](https://pulsar.apache.org/docs/zh-CN/2.7.1/standalone/) 实现多租户,高可靠,解耦, 适用于实时推荐,金融场景 * 参考(抄袭)RocketMQ的脚手架,实现pulsar的脚手架,仅仅作为学习,本公司使用. * 腾讯云TDMQ作为pulsar的商业版本(目前免费), 免去搭建的烦恼.文档参考[TDMQ](https://cloud.tencent.com/document/product/1179) #### pom依赖 ```xml cn.zhaopin.starter pulsar-spring-boot-starter 1.0.0 ``` #### 配置 ``` yml app.mq: type: pulsar pulsar: serviceUrl: http://pulsar-7dodn8wqepzd.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080 authenticationToken: eyJrZXlJZCI6InB1bHNhci03ZG9kbjh3cWVwemQiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItN2RvZG44d3FlcHpkX3JvbGUtYXRzLWRldiJ9.kbILeNTOlmFq-D9Pbsc6xQsXjIfG3Rz5XI4ed3UNeDM clusterId: pulsar-7dodn8wqepzd namespace: ns-dev-01 ``` #### 生产 ##### 模板方法 ```java @Autowired private PulsarTemplate pulsarTemplate; ``` ##### 普通消息 ```java @Data @ToString public class Demo { private Long id; private String name; private Date date; private LocalDateTime dateTime; private Boolean flag = Boolean.FALSE; } @GetMapping("/sendMq") public void tm() { Demo demo = new Demo(); demo.setDate(new Date()); demo.setDateTime(LocalDateTime.now()); demo.setId(1L); demo.setName("test"); Message message = MessageBuilder.withPayload(demo).build(); MessageId dev_topic_01 = pulsarTemplate.sendSync("dev_topic_01", message); System.out.println(dev_topic_01); } ``` ##### 定时消息 * 例如 定时在 2021-07-30 11:17:00(MQ服务器的时间) 通知消费者 ```java @GetMapping("/sendTimed") public void tam(){ LocalDateTime timed = LocalDateTime.parse("2021-07-30 11:17:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Message deliver = MessageBuilder.withPayload("test_deliver").build(); pulsarTemplate.sendTimed("topic_02", deliver, timed); } ``` ##### 延迟消息 * 延迟30s后通知消费者 ```java @GetMapping("/sendDelay") public void tdm(){ Message deliver = MessageBuilder.withPayload("test_deliver").build(); pulsarTemplate.sendDelay("topic_03", deliver, 30L, TimeUnit.SECONDS); } ``` #### 消费 ```java @Slf4j @Component @PulsarMessageListener(topic = "dev_topic_01", subscriptionName = "sub_dev_01", subscriptionType = SubscriptionType.Shared) public class DemoMessageListener implements PulsarListener { @Override public boolean onMessage(PulsarMessageExt message) throws Exception { Demo body = message.getBody(); System.out.println(body.toString()); return true; } } ``` ##### 确认机制 onMessage方法必须返回boolean类型的值, 确保消费者消费到消息 true : 确认消息触达 false : 确认消息未触达, 会导致重试, 消息堆积 #### 不支持 1. 不支持topic, subscriptionName使用占位符(${app.test.topicName})的方式 #### FAQ & 待办事项 1. 传输对象json序列化带class类型和不带如何选择 * ~~目前带有泛型 指定class转具体的对象比较方便~~ * ~~objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);~~ * 2021-08-01重大调整: 通过jackson2实现, 选择不带class类型的序列化, 保证原始json格式, 方便以后扩展消费, 例如: 其他语言也可以无差别消费 * json格式如下: ```json {"payload":{"id":1,"name":"test","date":1627743462539,"dateTime":[2021,7,31,22,57,42,539790500],"flag":false},"headers":{"id":"ef52a865-3439-1fcd-9818-fe9fdaa93100","timestamp":1627743464057}} ``` 2. 本地偶尔能消费到 * 解决方式: 无需解决, 负载均衡(轮询),因为存在多个消费者 3. consumer, producer, client 关闭顺序 * consumer在DefaultPulsarListenerContainer.destroy进行关闭 * producer在PulsarTemplate.destroy进行关闭 * client目前未处理,但会跟随应用关闭 4. PulsarMessageExt泛型处理 * ~~利用ObjectMapper转json带class特定 反序列化比较方便,同问题1~~ * 2021-08-01重大调整: 同问题1 5. 事务消息 * tdmq官方回复: 2.7.1集群不支持事务 ![img.png](img.png) 6. 消费从最早或最近消费, 还需要调试 * Listener注解新增subscriptionInitialPosition属性, 默认最近开始消费 * SubscriptionInitialPosition.Latest 最近消费 * SubscriptionInitialPosition.Earliest 最早消费 7. 创建新的topic以及订阅 启动报错: Topic does not have schema to check * 如果在TDMQ的控制台创建了订阅, 启动服务会报错, 如果自动生成订阅(不创建, 系统启动会自动创建)则不会报错,不会影响到生产消息, 仅仅会影响消费(消费不到数据). 生产一条消息之后, 此时重启服务,就不会报错了. 且消息正常消费. * 暂时想不到解决思路. 8. 自定义重试级别 9. 整合RocketMQ * 是否可以将starter进行再次包装, 通过指定type多种类型, 实现多个MQ的整合 类似于 (type:tdmq,rocketmq).这样tdmq和rocketmq都可以使用