# mq学习笔记 **Repository Path**: A66626/mq-learning-notes ## Basic Information - **Project Name**: mq学习笔记 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-05 - **Last Updated**: 2026-02-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 工作区说明(ai-demo4) 本目录包含 3 个相互独立的 Demo: 1. `springboot-kafka-demo/`:Spring Boot 集成 Kafka(强调“尽量不丢 + 幂等/死信”) 2. `springboot-rocketmq-demo/`:Spring Boot 集成 RocketMQ 5.x(含“特殊消息”与“事务消息两种绑定方式”) 3. `rtspConvert/`:RTSP H.265/HEVC -> H.264 转码服务(FastAPI + FFmpeg + RTSP Server) 下面按项目分别说明:如何运行、有哪些接口、涉及哪些关键代码文件。 ## 1) Kafka Demo:`springboot-kafka-demo/` ### 1.1 目标与特性 - Producer 侧:`acks=all` + 幂等生产者 + 等待 broker ack(尽量避免“发送阶段丢消息”) - Consumer 侧:手动提交 offset(处理成功后再提交)+ 失败重试 + 失败进入 DLT(避免“消费阶段丢消息/毒丸阻塞”) - 业务幂等示例:用 `eventId` 做去重(demo 用内存 Set,生产建议 DB 唯一键/幂等表) ### 1.1.1 Kafka 的 ack/提交机制(面试必问) Kafka 里常说的“ack”分两类:**生产端 ack** 和 **消费端提交 offset**。 生产端(Producer)`acks`: - `acks=0`:不等 broker 确认,吞吐最高,但你几乎无法知道是否写入成功。 - `acks=1`:只要 Leader 写入成功就确认;Leader 切换时存在丢失风险。 - `acks=all`:等待 ISR 副本确认后再返回,可靠性最高(需要集群侧 `min.insync.replicas` 配合才更有意义)。 消费端(Consumer)“ack”本质是**提交 offset**: - 先处理后提交:At-least-once(尽量不丢,但可能重复)。 - 先提交后处理:At-most-once(可能丢,但不会重复)。 本 demo 对应: - Producer `acks=all`:`springboot-kafka-demo/src/main/resources/application.yml` - Producer 等待 broker 返回:`springboot-kafka-demo/src/main/java/com/example/kafkademo/web/ProduceController.java` - Consumer 手动提交 offset(处理成功后才 ack):`springboot-kafka-demo/src/main/java/com/example/kafkademo/consumer/OrderCreatedListener.java` - 发送 `orderId=FAIL-xxx` 可触发消费端抛异常,观察“未提交 offset -> 重试 -> DLT” ### 1.2 如何运行 启动 Kafka(本地单节点,KRaft): ```powershell cd springboot-kafka-demo docker compose up -d ``` 启动应用: ```powershell cd springboot-kafka-demo mvn -q spring-boot:run ``` 发送消息: ```powershell curl -X POST "http://localhost:8088/produce/order-created?orderId=1001" ``` ### 1.3 接口列表 - `POST /produce/order-created?orderId=...`:发送一条 `OrderCreatedEvent`(Producer 同步等待 ack) ### 1.4 关键代码文件(全部列出) 配置与入口: - `springboot-kafka-demo/pom.xml` - `springboot-kafka-demo/src/main/resources/application.yml` - `springboot-kafka-demo/src/main/java/com/example/kafkademo/KafkaDemoApplication.java` 生产端(等待 broker ack,避免“只入本机缓冲就当成功”): - `springboot-kafka-demo/src/main/java/com/example/kafkademo/web/ProduceController.java` 消费端(手动 ack:处理成功后提交 offset;并演示业务幂等): - `springboot-kafka-demo/src/main/java/com/example/kafkademo/consumer/OrderCreatedListener.java` 失败重试 + DLT(失败最终投递到 `topic.DLT`): - `springboot-kafka-demo/src/main/java/com/example/kafkademo/config/KafkaConfig.java` 事件模型: - `springboot-kafka-demo/src/main/java/com/example/kafkademo/model/OrderCreatedEvent.java` 运行说明: - `springboot-kafka-demo/docker-compose.yml` - `springboot-kafka-demo/README.md` - `springboot-kafka-demo/INTERVIEW_NOTES.md`(中文面试笔记 + 流程图) ## 2) RocketMQ Demo:`springboot-rocketmq-demo/` ### 2.1 目标与特性 - Producer 侧:同步发送拿到 `SendResult` + 重试(尽量避免“发送阶段不知成功失败”) - Consumer 侧:失败抛异常触发 RocketMQ 自动重试;超过次数进入 DLQ(死信队列) - “特殊消息”发送:Tag、延时/定时、事务、批量、OneWay、顺序、广播 - 事务消息提供两种“区分 listener”的实现方式: - 方式一:**同一个事务 listener 内按 `TYPE` 分发业务** - 方式二:**多 producerGroup + 多 RocketMQTemplate + 多事务 listener 隔离** ### 2.1.1 RocketMQ 的 ack 机制(面试必问) RocketMQ 的“ack”同样分两类:**生产端发送确认** 和 **消费端消费确认**。 生产端(Producer): - 同步发送(sync):方法返回 `SendResult`,`SendStatus.SEND_OK` 表示 broker 已接收并按策略写入。 - 异步发送(async):通过回调拿到成功/失败。 - OneWay:不等待结果,最快但不适合“不能丢”的业务。 消费端(Consumer): - 一般语义是 At-least-once。 - 消费成功:正常返回(不抛异常)即表示成功确认。 - 消费失败:抛异常/返回失败,RocketMQ 会按策略重试投递;超过次数进入 DLQ(`%DLQ%{consumerGroup}`)。 本 demo 对应: - Producer 同步发送并检查 `SendResult`:`springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/web/ProduceController.java` - 特殊消息(含 OneWay/事务):`springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/web/SpecialMessageController.java` - Consumer 失败重试与 DLQ:`springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/consumer/OrderCreatedConsumer.java` - 发送 `orderId=FAIL-xxx` 可触发消费端抛异常,观察“重试 -> 死信队列 `%DLQ%order-service`” ### 2.2 如何运行 启动 RocketMQ(NameServer + Broker + Console): ```powershell cd springboot-rocketmq-demo docker compose up -d ``` 控制台(可选): - http://localhost:8080 启动应用: ```powershell cd springboot-rocketmq-demo mvn -q spring-boot:run ``` ### 2.3 接口列表 基础消息: - `POST /produce/order-created?orderId=...`:基础发送(同步发送 + 检查结果) 特殊消息发送(重点): - `POST /produce/special/tag?orderId=...&tag=TagA|TagB`:Tag 消息(用于过滤消费) - `POST /produce/special/delay-seconds?orderId=...&delaySeconds=10`:延时消息(按秒延迟投递) - `POST /produce/special/deliver-at?orderId=...&deliverAfterSeconds=10`:定时消息(指定未来投递时间) - `POST /produce/special/transaction?orderId=...&txOk=true|false&type=ORDER|REFUND`:事务消息(方式一:listener 内分发) - `POST /produce/special/transaction-order?orderId=...&txOk=true|false`:事务消息(方式一:ORDER) - `POST /produce/special/transaction-refund?orderId=...&txOk=true|false`:事务消息(方式一:REFUND) - `POST /produce/special/batch?count=5&tag=TagA`:批量消息 - `POST /produce/special/oneway?orderId=...`:OneWay(单向,不等结果,不适合“不能丢”的业务) 事务消息方式二(多 template 多 listener): - `POST /produce/m2/tx-order?orderId=...&rollback=false|true`:订单事务(进入 `M2OrderTxListener`) - `POST /produce/m2/tx-refund?orderId=...&rollback=false|true`:退款事务(进入 `M2RefundTxListener`) ### 2.4 关键代码文件(全部列出) 配置与入口: - `springboot-rocketmq-demo/pom.xml` - `springboot-rocketmq-demo/src/main/resources/application.yml` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/RocketMqDemoApplication.java` 基础发送与基础消费(At-least-once + 幂等示例): - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/web/ProduceController.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/consumer/OrderCreatedConsumer.java` 特殊消息发送(Tag/延时/定时/事务/批量/OneWay): - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/web/SpecialMessageController.java` 消费模式示例(便于理解/面试): - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/consumer/TagAConsumer.java`(Tag 过滤) - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/consumer/OrderedConsumer.java`(顺序消费) - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/consumer/BroadcastConsumer.java`(广播消费) 事务消息:方式一(一个 listener 内按 TYPE 分发业务): - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/tx/OrderTxListener.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/tx/TxHandler.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/tx/OrderTxHandler.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/tx/RefundTxHandler.java` 事务消息:方式二(多 producerGroup + 多 template + 多 listener 隔离): - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/method2/Method2RocketMqConfig.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/method2/M2TxSendController.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/method2/M2OrderTxListener.java` - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/method2/M2RefundTxListener.java` - `springboot-rocketmq-demo/METHOD2_DOC.md`(方式二解释文档) 事件模型: - `springboot-rocketmq-demo/src/main/java/com/example/rocketmqdemo/model/OrderCreatedEvent.java` RocketMQ 运行与 broker 配置: - `springboot-rocketmq-demo/docker-compose.yml` - `springboot-rocketmq-demo/rocketmq/broker.conf` - `springboot-rocketmq-demo/README.md` - `springboot-rocketmq-demo/INTERVIEW_NOTES.md`(中文面试笔记 + 流程图) ## 3) RTSP 转码服务:`rtspConvert/` ### 3.1 目标与特性 - 提供 HTTP API 接收 RTSP 输入流(常见摄像头 H.265/HEVC) - 用 FFmpeg 转码为 H.264 并推送到 RTSP Server(默认推荐 MediaMTX) ### 3.2 如何运行 安装依赖: ```powershell cd rtspConvert python -m venv .venv . .venv/Scripts/activate pip install -r requirements.txt ``` 启动服务: ```powershell cd rtspConvert uvicorn app.main:app --host 0.0.0.0 --port 8000 ``` RTSP Server(MediaMTX)搭建文档: - `rtspConvert/docs/rtsp-server-setup.md` ### 3.3 接口列表 - `POST /start`:启动转码任务(输入 RTSP -> 输出 RTSP) - `POST /stop/{job_id}`:停止任务 - `GET /jobs`:任务列表 - `GET /health`:健康检查 ### 3.4 关键代码文件(全部列出) - `rtspConvert/README.md` - `rtspConvert/requirements.txt` - `rtspConvert/app/main.py`(FastAPI 路由) - `rtspConvert/app/process.py`(FFmpeg 进程管理与转码参数) - `rtspConvert/docs/rtsp-server-setup.md`(MediaMTX 搭建说明)