# kafkareliable **Repository Path**: xcOschina/kafkareliable ## Basic Information - **Project Name**: kafkareliable - **Description**: 这是一个生产级的高吞吐可靠Kafka消费者模板,支持多种序列化格式(JSON、Avro、Protobuf),具备死信队列、监控指标、优雅启停等完整功能。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-12-24 - **Last Updated**: 2026-01-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 高吞吐可靠Kafka消费者模板 ## 项目概述 这是一个生产级的高吞吐可靠Kafka消费者模板,支持多种序列化格式(JSON、Avro、Protobuf),具备死信队列、监控指标、优雅启停等完整功能。 ### 核心特性 - **多格式支持**:支持 JSON、Avro、Protobuf 三种序列化格式 - **死信队列(DLQ)**:处理失败的消息自动发送到死信队列 - **完整监控**:提供详细的处理指标和性能统计 - **自动注册**:消费者自动注册,无需手动配置 - **泛型支持**:支持强类型的泛型消费者 - **灵活配置**:支持多种配置选项和扩展点 ## 项目结构 ``` src/main/java/com/badu/kafka/ ├── config/ # 配置类 │ ├── KafkaConsumerConfig.java │ ├── KafkaConsumerAutoConfiguration.java │ ├── KafkaConsumerProperties.java │ └── CustomKafkaListenerContainerFactory.java ├── consumer/ # 消费者核心组件 │ ├── TypedKafkaConsumer.java # 泛型消费者基类 │ ├── MultiFormatDeserializer.java # 多格式反序列化器 │ ├── KafkaTopic.java # 消费者注解 │ ├── AutoKafkaConsumerRegistrar.java # 自动注册器 │ └── ConsumerLifecycleManager.java # 生命周期管理 ├── consumer/impl/ # 消费者实现示例 │ ├── NotificationJsonConsumer.java # JSON消费者示例 │ └── OrderConsumer.java # Avro消费者示例 ├── dlq/ # 死信队列组件 │ ├── DeadLetterQueueProducer.java │ └── DlqMessage.java ├── monitoring/ # 监控组件 │ └── ConsumerMetrics.java ├── producer/ # 生产者组件 │ ├── SimpleMessageSender.java │ └── GenericKafkaProducer.java ├── exception/ # 异常类 │ └── ConsumerProcessingException.java └── model/ # 数据模型 └── OrderEventProto.java # Protobuf 消息定义 ``` ## 快速开始 ### 1. JSON 消费者实现 创建 JSON 格式的消费者,继承 `TypedKafkaConsumer>`: ```java @Slf4j @Component @KafkaTopic("json-topic") public class JsonConsumer extends TypedKafkaConsumer> { @Override protected void handleMessage(Map message, ConsumerRecord record) { log.info("Processing JSON message: {}", message); // 实现消息处理逻辑 } } ``` ### 2. Avro 消费者实现 创建 Avro 格式的消费者,处理 Avro 特定记录: ```java @Slf4j @Component @KafkaTopic("avro-topic") public class AvroOrderConsumer extends TypedKafkaConsumer { @Override protected void handleMessage(Order order, ConsumerRecord record) { log.info("Processing Avro order: {}", order.getOrderId()); // 实现 Avro 消息处理逻辑 } } ``` ### 3. Protobuf 消费者实现 创建 Protobuf 格式的消费者,处理 Protobuf 消息: ```java @Slf4j @Component @KafkaTopic("protobuf-topic") public class ProtobufOrderEventConsumer extends TypedKafkaConsumer { @Override protected void handleMessage(OrderEventProto.OrderEvent orderEvent, ConsumerRecord record) { log.info("Processing Protobuf order event: {}", orderEvent.getEventType()); // 实现 Protobuf 消息处理逻辑 } } ``` ### 4. 配置文件 在 `application.yml` 中配置相关参数: ```yaml spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: reliable-consumer-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer properties: max.poll.records: 1000 fetch.min.bytes: 10240 fetch.max.wait.ms: 500 enable.auto.commit: false isolation.level: read_committed # Schema Registry (用于 Avro) schema: registry: url: http://localhost:8081 # DLQ 配置 kafka: dlq: topic: dead-letter-topic # 消费者配置 kafka-consumer: concurrency: 3 max-attempts: 3 backoff: 1000 ``` ## 核心功能 ### 1. 多格式自动识别 - 通过主题后缀自动识别格式:`-json`、`-avro`、`-pb` - 通过消息头 `format` 字段指定格式 - 支持动态注册 Protobuf 解析器 ### 2. 死信队列(DLQ) - 处理失败的消息自动发送到 DLQ 主题 - 包含原始消息、错误信息和堆栈跟踪 - 可配置的重试次数和退避策略 ### 3. 监控指标 - 消息处理成功率/失败率 - 处理时间统计 - 消息吞吐量监控 - 支持 Micrometer 指标收集 ### 4. 自动注册机制 - 基于注解自动发现和注册消费者 - 根据泛型类型自动配置反序列化器 - 支持 JSON、Avro、Protobuf 类型自动注册 ### 5. 泛型类型安全 - 使用泛型确保类型安全 - 自动反序列化为正确的类型 - 编译时类型检查 ## 部署说明 1. 确保 Kafka 集群正常运行 2. 如使用 Avro,确保 Schema Registry 正常运行 3. 创建所需的 topics 4. 根据环境配置 `application.yml` 5. 运行应用:`mvn spring-boot:run` ## 监控和运维 - 通过 `/actuator/prometheus` 端点获取指标 - 检查 DLQ 主题以了解失败的消息 - 监控应用日志以跟踪处理状态 - 使用 Kafka 管理工具监控消费者组状态 ## 技术栈 - Spring Boot 3.x - Spring Kafka - Apache Avro - Google Protobuf - Jackson - Micrometer - Lombok ## 扩展点 项目设计了多个扩展点: - 自定义反序列化器 - 自定义 DLQ 处理逻辑 - 自定义监控指标 - 自定义消费者生命周期管理