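# kafka_demo

**Repository Path**: jonie/kafka_demo

## Basic Information

- **Project Name**: kafka_demo
- **Description**: Integrating the Kafka message queue with Spring Boot
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2023-03-01
- **Last Updated**: 2023-03-01

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

## I. Environment Setup

To install Kafka on Windows, see this blog post: [https://blog.csdn.net/w546097639/article/details/88578635](https://blog.csdn.net/w546097639/article/details/88578635)

## II. Project Setup

1. Add the following dependencies to `pom.xml`:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
```

The complete `pom.xml` is as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- The original declared version RELEASE here; the version is left to
             spring-boot-starter-parent so it stays compatible with Boot 2.5.4 and Java 1.8 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Also declared with version RELEASE originally; managed by the parent here -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```

2. The `application.yml` configuration file is as follows:

```yml
server:
  port: 8001 # port
  servlet:
    context-path: /${spring.application.name} # context path prefix
spring:
  application:
    name: kafka-demo # service name
  kafka:
    bootstrap-servers: localhost:9092 # Kafka broker address
    consumer:
      group-id: order-consumer
      # Must match the key/value types passed to kafkaTemplate.send()
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      security:
        protocol: PLAINTEXT
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      retries: 0
      # A producer needs serializers, not deserializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer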
      properties:
        linger.ms: 1
      security:
        protocol: PLAINTEXT
# Custom settings for sending Kafka messages
kafka:
  topic:
    kafka-demo-topic: order-event
    kafka-demo-key: order-key
    autoCreate: true
```

3. The Kafka configuration class is as follows:

```java
package com.example.kafka_demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;

/**
 * @author Keson
 * @version 1.0
 * @description: Kafka configuration class
 * @date 2021/9/6 15:33
 */
@Configuration
@EnableKafka // Enables detection of @KafkaListener-annotated endpoints
public class KafkaConfig {

    @Value("${kafka.topic.kafka-demo-topic}")
    private String orderTopic;

    // Conditional bean: the topic is created only when kafka.topic.autoCreate=true
    @Bean
    @ConditionalOnProperty(name = "kafka.topic.autoCreate", havingValue = "true")
    public NewTopic orderTopic() {
        return TopicBuilder.name(orderTopic).build();
    }
}
```

4. Create the entity class for messages exchanged through Kafka:

```java
package com.example.kafka_demo.dto;

import lombok.*;

/**
 * @author Keson
 * @version 1.0
 * @description: Message entity
 * @date 2021/9/6 14:56
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ParamsDetail {
    private String orderNumber;
    private String orderSubNumber;
    private String customerNumber;
    private String subAccountNumber;
    private String tradeType;
}
```

5. Create the `KafkaService` interface and use `KafkaTemplate` to send messages to Kafka:

```java
package com.example.kafka_demo.service;

import com.example.kafka_demo.dto.ParamsDetail;

public interface KafkaService {
    // Sends a message to Kafka
    void sendMessage(ParamsDetail paramsDetail);
}
```

```java
package com.example.kafka_demo.service.impl;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * @author Keson
 * @version 1.0
 * @description: KafkaService implementation
 * @date 2021/9/6 15:01
 */
@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {

    // Keys and values are both strings, matching the String (de)serializers in application.yml
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.kafka-demo-topic}")
    private String topic;

    @Value("${kafka.topic.kafka-demo-key}")
    private String key;

    @Override
    public void sendMessage(ParamsDetail detail) {
        try {
            // Serialize the DTO to JSON and publish it under the configured topic and key
            kafkaTemplate.send(topic, key, JsonUtil.toJson(detail));
        } catch (JsonProcessingException e) {
            log.error("Failed to convert object to JSON!", e);
        }
    }
}
```

6. The utility class for converting between JSON and objects is as follows:

```java
package com.example.kafka_demo.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @author Keson
 * @version 1.0
 * @description: Utility class for converting between objects and JSON
 * @date 2021/9/6 15:08
 */
public class JsonUtil {

    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // Converts a JSON string into an object of the given class
    public static <T> T fromJson(String json, Class<T> clz) throws JsonProcessingException {
        return OBJECT_MAPPER.readValue(json, clz);
    }

    // Converts an object into a JSON string
    public static String toJson(Object object) throws JsonProcessingException {
        return OBJECT_MAPPER.writeValueAsString(object);
    }
}
```

7. Create the message consumer using `@KafkaListener`:

```java
package com.example.kafka_demo.listener;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author Keson
 * @version 1.0
 * @description: Kafka consumer
 * @date 2021/9/6 15:23
 */
@Component
public class OrderListener {
    // Listens on the configured topic; matching messages are consumed as they arrive
    @KafkaListener(topics = {"${kafka.topic.kafka-demo-topic}"}, autoStartup = "${kafka.topic.autoCreate:true}")
    public void processing(String message) throws JsonProcessingException {
        ParamsDetail detail = JsonUtil.fromJson(message, ParamsDetail.class);
        System.err.println(detail);
    }
}
```

8. Finally, create an API for sending messages to Kafka:

```java
package com.example.kafka_demo.api;

import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Keson
 * @version 1.0
 * @description: API for test-sending messages to Kafka
 * @date 2021/9/6 15:46
 */
@RestController
@RequestMapping("/order")
public class OrderApi {

    @Autowired
    private KafkaService kafkaService;

    // The JSON request body is bound to ParamsDetail and forwarded to Kafka
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestBody ParamsDetail paramsDetail) {
        kafkaService.sendMessage(paramsDetail);
        return "Message sent successfully!";
    }
}
```

Once everything has been created, the project structure looks like this:

![Project structure](https://img-blog.csdnimg.cn/f32b30422ff74ae8b4afae2f29a8c6e0.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAS2Vzb24gWg==,size_20,color_FFFFFF,t_70,g_se,x_16)

## III. Results

1. Start the project and call the send endpoint from Postman:

![Postman request to the send endpoint](https://img-blog.csdnimg.cn/ffae1cf8c7544e0aa074e4cd6d7c2053.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAS2Vzb24gWg==,size_20,color_FFFFFF,t_70,g_se,x_16)

2. The message is sent successfully. Check the application console to confirm that it was consumed. The console prints the message that was just sent, which shows that the message on the `order-event` topic was consumed:

![Console output showing the consumed message](https://img-blog.csdnimg.cn/af5a74375d054454a0b4f31409e34b43.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAS2Vzb24gWg==,size_20,color_FFFFFF,t_70,g_se,x_16)

3. Send several more distinct messages in a row; each one is consumed as soon as it is stored:

![Console output showing several consumed messages](https://img-blog.csdnimg.cn/84d7c1ea4a3145be8feb7844d33120b5.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAS2Vzb24gWg==,size_20,color_FFFFFF,t_70,g_se,x_16)
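
As a rough aid for the Postman test in step 1 above, the sketch below derives the request URL from `application.yml` (port `8001`, context path `/kafka-demo`) plus the `/order/sendMessage` mapping in `OrderApi`, and builds a JSON body using the field names of `ParamsDetail`. The class `SendMessageRequest`, its helper methods, and all sample field values are hypothetical and not part of the project. The code only prints the URL and body so they can be pasted into Postman; it does not send anything.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the project: prints the URL and JSON body for the send endpoint.
class SendMessageRequest {

    // Combines server.port, the context path (/${spring.application.name}),
    // and the /order/sendMessage mapping declared in OrderApi.
    static String endpointUrl(String host, int port, String applicationName) {
        return "http://" + host + ":" + port + "/" + applicationName + "/order/sendMessage";
    }

    // Serializes a flat map of string fields into a JSON object.
    // Only backslashes and double quotes are escaped, which is enough for this sketch.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(entry.getKey())).append("\":\"")
              .append(escape(entry.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Field names mirror ParamsDetail; the values are made-up samples.
        Map<String, String> body = new LinkedHashMap<>();
        body.put("orderNumber", "ORD-001");
        body.put("orderSubNumber", "ORD-001-01");
        body.put("customerNumber", "CUST-42");
        body.put("subAccountNumber", "SUB-7");
        body.put("tradeType", "BUY");

        System.out.println(endpointUrl("localhost", 8001, "kafka-demo"));
        System.out.println(toJson(body));
    }
}
```

With the settings from `application.yml`, the first printed line is `http://localhost:8001/kafka-demo/order/sendMessage`. In Postman, the body should be sent as raw JSON with the `Content-Type: application/json` header so that `@RequestBody` can bind it to `ParamsDetail`.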