# MultiKafkaConsumerStarter **Repository Path**: bysun1/MultiKafkaConsumerStarter ## Basic Information - **Project Name**: MultiKafkaConsumerStarter - **Description**: No description available - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-01-16 - **Last Updated**: 2024-01-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MultiKafkaConsumerStarter [V1.0] SpringBoot 零代码方式整合多个kafka数据源,支持任意kafka集群,已封装为一个小模块,集成所有kafka配置,让注意力重新回归业务本身。 [参考文档](http://t.csdnimg.cn/SHwBF) ## 一、功能特性 * SpringBoot无编程方式整合多个kafka数据源 ## 二、快速开始 1、引入最新依赖包,如果找不到依赖包,请到工程目录```mvn clean package install```执行一下命令。 ```xml io.github.vipjoey multi-kafka-consumer-starter 最新版本号 ``` 2、添加kafka地址等相关配置。 ```properties ## topic1的kafka配置 spring.kafka.one.enabled=true spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers} spring.kafka.one.topic=mmc-topic-one spring.kafka.one.group-id=group-consumer-one spring.kafka.one.processor=你的处理类bean名称(例如:oneProcessor) spring.kafka.one.consumer.auto-offset-reset=latest spring.kafka.one.consumer.max-poll-records=10 spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer ## topic2的kafka配置 spring.kafka.two.enabled=true spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers} spring.kafka.two.topic=mmc-topic-two spring.kafka.two.group-id=group-consumer-two spring.kafka.two.processor=你的处理类bean名称 spring.kafka.two.consumer.auto-offset-reset=latest spring.kafka.two.consumer.max-poll-records=10 spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer ``` 3、新建kafka消息对应的实体类,要求需要实现`MmcKafkaMsg`接口,例如 ```java @Data class DemoMsg implements MmcKafkaMsg { private String routekey; private String name; private Long timestamp; } ``` 4、新建kafka消息处理类,要求继承`MmcKafkaKafkaAbastrctProcessor`,然后就可以愉快地编写你的业务逻辑了。 ```java @Slf4j @Service("oneProcessor") // 你的处理类bean名称,如果没有定义bean名称,那么默认就是首字母缩写的类名称 public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor { @Override protected Class getEntityClass() { return DemoMsg.class; } @Override protected void dealMessage(List datas) { // 下面开始编写你的业务代码 } } ``` ## 三、变更记录 * 20231111 初始化 ## 四、特别说明 * 欢迎共建 * 佛系改bug