# Aquaduct **Repository Path**: magic-flute_1/Aquaduct ## Basic Information - **Project Name**: Aquaduct - **Description**: 一个简单的用来替代flume完成mq->dfs的小工具 - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-11-14 - **Last Updated**: 2025-12-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Aquaduct 这是一个很久很久之前写的小工具,当时只是满足自己使用场景.很多地方其实写的有问题(但我其实懒得改了),如果确实存在重大问题,欢迎批评指导. Kafka → Storm(LocalCluster) → 本地文件缓存 → HDFS 批量上传。可插拔“消息处理”和“HDFS 目录策略”,日志与告警开箱即用。 - 核心能力: - 从多个 Kafka 主题按不同消费组消费(KafkaSpout) - 可插拔的消息处理策略(默认:ETL JSON -> 前置两列 + 原始JSON) - 先写本地大文件,按策略周期性上传至 HDFS,降低小文件压力 - 运行指标通过 Dropwizard Metrics 输出日志;异常推送钉钉 - 运行模式:本地 Storm 集群(LocalCluster),无需独立 Storm 集群 --- ## 架构与数据流 ``` Kafka (topics, groups) └─> Storm KafkaSpout × N (按 jobs 配置) └─fieldsGrouping("group","topic") └─> KafkaMessageProcessor (Bolt) ├─ 消息处理策略(MessageProcessStrategy) ├─ 写入本地: ${local.log.dir.root.path}/${group}/${topic}/${topic}.log ├─ tick/批量阈值触发 flush └─ 周期批量上传到 HDFS: HdfsUploadDirStrategy 决定目录 ``` - ACK 语义:写本地成功即 ACK,不等待 HDFS 上传完成(追求吞吐;可能出现“已写本地但未及时上传”窗口)。 - 小文件控制:WriteIo 默认 256MB 或文件最近修改超过 1 小时触发归档上传。 --- ## 目录结构 ``` src/main/java/team/magic/flute/flume ├── TopologyMain.java # 程序入口 ├── bolt/KafkaMessageProcessor.java # 核心 Bolt ├── global/ # 扩展点接口 │ ├── MessageProcessStrategy.java │ └── HdfsUploadDirStrategy.java ├── impl/ # 默认策略实现 │ ├── EtlJsonMessageProcessStrategy.java │ ├── DefaultHdfsUploadDirStrategy.java │ └── SpecificPathHdfsUploadDirStrategy.java ├── commons/HdfsUtils.java # HDFS 工具 ├── model/{KafkaMessage,WriteIo}.java # 模型与本地IO └── utils/* # 配置/序列化/告警/指标 src/main/resources ├── config.json # 运行配置(发布时会复制到 target/conf) ├── log4j2.xml # 日志配置 ├── start.sh / stop.sh # 启停脚本(Linux) ``` --- ## 快速开始 - 前置条件: - JDK 8+(建议与本地 Maven/Storm 版本保持一致) - 可访问的 Kafka 集群;HDFS 客户端配置(core-site.xml/hdfs-site.xml)在运行机可见 - 钉钉机器人 webhook(可为空,生产环境请务必配置) - 构建(你已验证通过): - `mvn -DskipTests package` - 产物布局:`target/original-data.jar`、依赖在 `target/lib`、配置在 `target/conf` - 运行 - Linux(推荐使用脚本,需把 jar/lib/conf 放在同一目录) ```bash cd target bash ../src/main/resources/start.sh # 停止 bash ../src/main/resources/stop.sh ``` - 直接 java 命令 ```bash cd target java -cp .:original-data.jar:lib/* team.magic.flute.flume.TopologyMain -c conf/config.json ``` - Windows(分号分隔 classpath) ```bat cd target java -cp .;original-data.jar;lib/* team.magic.flute.flume.TopologyMain -c conf/config.json ``` --- ## 配置说明(config.json) 最小示例: ```json { "thread.num": 2, "ding.talk.url": "https://oapi.dingtalk.com/robot/send?access_token=xxxxxx", "local.log.dir.root.path": "/data/original-data-back", "msg.flush.batch.size": 1000, "flush.interval.seconds": 10, "flush.hdfs.interval.between.times": 1, "topology.max.spout.pending": 5000, "topology.worker.childopts": "-Xmx2048m", "topology.message.timeout.secs": 604800, "acker.num": 1, "file.sink.parallelism": 0, "storm.local.dir": "/data/storm-local", "jobs": [ { "bootstrap.server": "kafka1:9092,kafka2:9092", "topics": "trade,order,refund", "consumer.group": "original-prod" } ], "message.process.strategy.config": [ { "consumer.group": "original-prod", "topics": ["trade", "order"], "message.process.strategy.class": "team.magic.flute.flume.impl.EtlJsonMessageProcessStrategy", "init.param": "" } ], "hdfs.upload.dir.strategy.config": [ { "consumer.group": "original-prod", "topics": ["trade"], "hdfs.dir.location.strategy.class": "team.magic.flute.flume.impl.SpecificPathHdfsUploadDirStrategy", "init.param": "/user/hive/source_data/original-data-back" } ] } ``` 关键项解释: - 基本参数 - thread.num:KafkaSpout 并行度基数 - msg.flush.batch.size:Bolt 达到批量后写本地 - flush.interval.seconds:tick 周期,触发本地 flush - flush.hdfs.interval.between.times:累计 N 次 flush 后触发 HDFS 批量上传 - file.sink.parallelism:文件写入并行度;≤0 时默认 = jobs.size + thread.num - acker.num:Storm acker 数 - storm.local.dir:Storm 本地工作目录 - local.log.dir.root.path:本地缓存根目录 - ding.talk.url:钉钉机器人 webhook(强烈建议配置) - jobs(必填):每个元素对应一个 KafkaSpout - bootstrap.server:Kafka bootstrap servers - topics:逗号分隔主题(重复会自动去重) - consumer.group:消费组(与策略匹配键的一部分) - message.process.strategy.config(可选):消息处理策略 - 匹配键:`topic@consumer.group`,否则回退到 `default` - 默认实现:`team.magic.flute.flume.impl.EtlJsonMessageProcessStrategy` - 默认输出格式:`shopTag\u0001platform\u0001<原始JSON去换行>`(shopTag 从 headers.Shop-Tag/Tenant 取,platform 从 headers.Platform 取) - hdfs.upload.dir.strategy.config(可选):HDFS 目录策略 - 默认实现:`team.magic.flute.flume.impl.DefaultHdfsUploadDirStrategy` → `/user/hive/source_data/original-data-back/{topic}/dt=YYYY-MM-DD` - 可指定:`SpecificPathHdfsUploadDirStrategy`,init.param 传入 HDFS 根路径,实际目录为 `{root}/{topic}/dt=YYYY-MM-DD` --- ## 扩展指南(最少改动) - 自定义消息处理 ```java public class MyStrategy implements MessageProcessStrategy { public String process(Object input) throws Exception { /* 返回一行文本或 null */ } } ``` 配置加入:`message.process.strategy.config`,绑定 topics 与 consumer.group。 - 自定义 HDFS 目录 ```java public class MyDirStrategy implements HdfsUploadDirStrategy { public String getAbsoluteHdfsPath(Map params){ /* 返回绝对目录 */ } } ``` 配置加入:`hdfs.upload.dir.strategy.config`,必要时通过 `init.param` 传构造参数。 --- ## 运行时行为与运维 - 本地缓存路径:`{local.log.dir.root.path}/{group}/{topic}/{topic}.log` - 归档与上传:文件 ≥256MB 或最近修改超过 1 小时进入上传队列;上传成功后目录内所有文件会尝试再上传一次,尽量清空积压 - HDFS 连接:`HdfsUtils` 使用 Hadoop 缺省配置与用户 `hive` 连接 - 请确保运行环境能读取到 `core-site.xml / hdfs-site.xml`(可通过 HADOOP_CONF_DIR 或放到 classpath) - 指标:日志里每 30 秒输出一次 `msg-process-speed`(速率/计数) - 日志:log4j2 默认输出到控制台与 `logs/original-info.log`,按天滚动 + 100MB 分片 - 告警:初始化失败、本地写失败、HDFS 上传失败都会推送钉钉(需配置 `ding.talk.url`) --- ## 常见问题(FAQ) - 启动报“无法读取配置文件” - 使用 `-c /path/to/config.json` 指定绝对路径;或把 config.json 放在 `target/conf` 并按上面命令启动 - HDFS 找不到 NameNode - 确认运行机能加载 Hadoop 配置;必要时设置 `HADOOP_CONF_DIR` 指向包含 `core-site.xml/hdfs-site.xml` 的目录 - 小文件过多 - 适当调大 `msg.flush.batch.size` 和 `flush.hdfs.interval.between.times`,并确保本地磁盘容量充足 - Windows 路径 - JSON 里使用双反斜杠,例如:`"E:\\var\\test-original-data-back"` ---