# spring-boot-demo-mq-kafka
**Repository Path**: mogubianda/kafka
## Basic Information
- **Project Name**: spring-boot-demo-mq-kafka
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2019-09-06
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# spring-boot-demo-mq-kafka
> 本 demo 主要演示了 Spring Boot 如何集成 kafka,实现消息的发送和接收。
## 环境准备
> 注意:本 demo 基于 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0
创建一个名为 `test` 的Topic
```bash
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```
## pom.xml
```xml
4.0.0
spring-boot-demo-mq-kafka
1.0.0-SNAPSHOT
jar
spring-boot-demo-mq-kafka
Demo project for Spring Boot
com.xkcoding
spring-boot-demo
1.0.0-SNAPSHOT
UTF-8
UTF-8
1.8
org.springframework.boot
spring-boot-starter
org.springframework.kafka
spring-kafka
org.springframework.boot
spring-boot-starter-test
test
org.projectlombok
lombok
true
cn.hutool
hutool-all
com.google.guava
guava
spring-boot-demo-mq-kafka
org.springframework.boot
spring-boot-maven-plugin
```
## application.yml
```yaml
server:
port: 8080
servlet:
context-path: /demo
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: spring-boot-demo
# 手动提交
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
listener:
log-container-config: false
concurrency: 5
# 手动提交
ack-mode: manual_immediate
```
## KafkaConfig.java
```java
/**
*
* kafka配置类
*
*
* @package: com.xkcoding.mq.kafka.config
* @description: kafka配置类
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:49
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
return factory;
}
}
```
## MessageHandler.java
```java
/**
*
* 消息处理器
*
*
* @package: com.xkcoding.mq.kafka.handler
* @description: 消息处理器
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:58
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Component
@Slf4j
public class MessageHandler {
@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 手动提交 offset
acknowledgment.acknowledge();
}
}
}
```
## SpringBootDemoMqKafkaApplicationTests.java
```java
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqKafkaApplicationTests {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 测试发送消息
*/
@Test
public void testSend() {
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
}
}
```
## 参考
1. Spring Boot 版本和 Spring-Kafka 的版本对应关系:https://spring.io/projects/spring-kafka
| Spring for Apache Kafka Version | Spring Integration for Apache Kafka Version | kafka-clients |
| ------------------------------- | ------------------------------------------- | ------------------- |
| 2.2.x | 3.1.x | 2.0.0, 2.1.0 |
| 2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
| 2.0.x | 3.0.x | 0.11.0.x, 1.0.x |
| 1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
| 1.2.x | 2.2.x | 0.10.2.x |
| 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x |
| 1.0.x | 2.0.x | 0.9.x.x |
| N/A* | 1.3.x | 0.8.2.2 |
> **IMPORTANT:** This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x **(and all spring boot 1.5.x users)** are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to [KIP-62](https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread). For a complete discussion about client/broker compatibility, see the Kafka [Compatibility Matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix)
>
> - Spring Integration Kafka versions prior to 2.0 pre-dated the Spring for Apache Kafka project and therefore were not based on it.
>
> These versions will be referenced transitively when using maven or gradle for version management. For the 1.1.x version, the 0.10.1.x is the default.
>
> 2.1.x uses the 1.1.x kafka-clients by default. When overriding the kafka-clients for 2.1.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.1.x/reference/html/deps-for-11x.html).
>
> 2.2.x uses the 2.0.x kafka-clients by default. When overriding the kafka-clients for 2.2.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.2.1.BUILD-SNAPSHOT/reference/html/deps-for-21x.html).
>
> - Spring Boot 1.5 users should use 1.3.x (Boot dependency management will use 1.1.x by default so this should be overridden).
> - Spring Boot 2.0 users should use 2.0.x (Boot dependency management will use the correct version).
> - Spring Boot 2.1 users should use 2.2.x (Boot dependency management will use the correct version).
2. Spring-Kafka 官方文档:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/