# kafka_demo
**Repository Path**: jonie/kafka_demo
## Basic Information
- **Project Name**: kafka_demo
- **Description**: SpringBoot整合Kafka消息队列
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2023-03-01
- **Last Updated**: 2023-03-01
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 一。环境准备
windows下安装kafka可以参考这一篇博客:[https://blog.csdn.net/w546097639/article/details/88578635](https://blog.csdn.net/w546097639/article/details/88578635)
## 二。项目搭建
1.在pom.xml文件中引入以下依赖:
```xml
org.springframework.kafka
spring-kafka
org.springframework.kafka
spring-kafka-test
test
```
项目完整pom.xml文件内容如下:
```xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.5.4
com.example
kafka_demo
0.0.1-SNAPSHOT
kafka_demo
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-web
RELEASE
compile
org.springframework.kafka
spring-kafka
org.springframework.kafka
spring-kafka-test
test
org.projectlombok
lombok
RELEASE
compile
com.fasterxml.jackson.core
jackson-databind
org.springframework.boot
spring-boot-maven-plugin
```
2.配置文件application.yml文件内容如下:
```yml
server:
port: 8001 #端口号
servlet:
context-path: /${spring.application.name} #访问前缀
spring:
application:
name: kafka-demo #服务名称
kafka:
bootstrap-servers: localhost:9092 #kafka地址
consumer:
group-id: order-consumer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #kafkaTemplate.send()方法参数类型
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
security:
protocol: PLAINTEXT
producer:
batch-size: 16384
buffer-memory: 33554432
retries: 0
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
linger.ms: 1
security:
protocol: PLAINTEXT
# kafka自定义消息发送配置
kafka:
topic:
kafka-demo-topic: order-event
kafka-demo-key: order-key
autoCreate: true
```
3.kafka配置类内容如下:
```java
package com.example.kafka_demo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.TopicBuilder;
/**
* @author Keson
* @version 1.0
* @description: TODO kafka配置类
* @date 2021/9/6 15:33
*/
@Configuration
@EnableKafka //开启Kafka监听器标注的端点
public class KafkaConfig {
@Value("${kafka.topic.kafka-demo-topic}")
private String orderTopic;
@Bean
//条件匹配,如果配置kafka.topic.autoCreate=true的话,该配置类生效,反之则无效
@ConditionalOnProperty(name = "kafka.topic.autoCreate", havingValue = "true")
public NewTopic orderTopic(){
return TopicBuilder.name(orderTopic).build();
}
}
```
4.创建kafka接受消息实体类:
```java
package com.example.kafka_demo.dto;
import lombok.*;
/**
* @author Keson
* @version 1.0
* @description: TODO 消息实体
* @date 2021/9/6 14:56
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ParamsDetail {
private String orderNumber;
private String orderSubNumber;
private String customerNumber;
private String subAccountNumber;
private String tradeType;
}
```
5.创建KafkaService接口,使用`KafkaTemplate`来发送消息至kafka中:
```java
package com.example.kafka_demo.service;
import com.example.kafka_demo.dto.ParamsDetail;
public interface KafkaService {
//发送消息给kafka
public void sendMessage(ParamsDetail paramsDetail);
}
```
```java
package com.example.kafka_demo.service.impl;
import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author Keson
* @version 1.0
* @description: TODO KafkaService实现类
* @date 2021/9/6 15:01
*/
@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${kafka.topic.kafka-demo-topic}")
private String topic;
@Value("${kafka.topic.kafka-demo-key}")
private String key;
@Override
public void sendMessage(ParamsDetail detail) {
try {
kafkaTemplate.send(topic, key, JsonUtil.toJson(detail));
} catch (JsonProcessingException e){
log.error("对象转换为Json出错!");
}
}
}
```
6.Json与对象相互转换工具类代码如下:
```java
package com.example.kafka_demo.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Keson
* @version 1.0
* @description: TODO Object与Json转换工具类
* @date 2021/9/6 15:08
*/
public class JsonUtil {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
//将json转换成对象
public static T fromJson(String json, Class clz) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(json, clz);
}
//将对象转换成json格式
public static String toJson(Object object) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(object);
}
}
```
7.创建消息的消费者,使用`@KafkaListener`实现:
```java
package com.example.kafka_demo.listener;
import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.util.JsonUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @author Keson
* @version 1.0
* @description: TODO kafka消费者
* @date 2021/9/6 15:23
*/
@Component
public class OrderListener {
//用于监听kafka,符合条件的消息能够即时消费
@KafkaListener(topics = {"${kafka.topic.kafka-demo-topic}"}, autoStartup = "${kafka.topic.autoCreate:true}")
public void processing (String message) throws JsonProcessingException {
ParamsDetail detail = JsonUtil.fromJson(message, ParamsDetail.class);
System.err.println(detail);
}
}
```
8.最后创建一个Api用于向kafka发送消息:
```java
package com.example.kafka_demo.api;
import com.example.kafka_demo.dto.ParamsDetail;
import com.example.kafka_demo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Keson
* @version 1.0
* @description: TODO kafka测试发送Api
* @date 2021/9/6 15:46
*/
@RestController
@RequestMapping("/order")
public class OrderApi {
@Autowired
private KafkaService kafkaService;
@GetMapping("/sendMessage")
public String sendMessage(@RequestBody ParamsDetail paramsDetail){
kafkaService.sendMessage(paramsDetail);
return "消息发送成功!";
}
}
```
全部创建完成后项目结构如下:

## 三。运行结果
1.启动项目,使用postman测试发送接口:

2.消息成功发送,打开程序查看消息是否成功被消费。控制台输出刚刚发送的消息,说明order-event这个topic下的消息被成功消费:

3.再次连续发几个不同的消息,发现消息即时存入即时消费:
