# flink-java **Repository Path**: jiang-linwen/flink-java ## Basic Information - **Project Name**: flink-java - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-20 - **Last Updated**: 2025-08-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ETC Java 版本数据处理系统 现在 Java 版本已经完全仿照 Python 版本添加了数据生成功能!**已修复 Metaspace 内存溢出问题**,使用 Flink 命令行提交方式与 Python 版本完全一致!以下是完整的运行指令: ## 📊 数据流架构 (完全仿照 Python 版本) **完整数据流向(含预测路径机制):** ``` [ETC交易原始数据] ↓ [Kafka输入: test_trade_data1] ↓ [Flink Source: KafkaSource消费] ↓ [预处理算子: SplitProcessFunction] ├─ 主流 → [交易对比算子: CompareFunction] │ │ │ ├─ 预测路径生成机制: │ │ ├─ Redis历史路径缓存 (优先) │ │ ├─ 拓扑图随机预测 (降级) │ │ └─ ML模型预测 (可选) │ │ │ ├─ 路径验证对比: │ │ ├─ 空间维度: 实际门架 vs 预测门架 │ │ ├─ 时间维度: 到达时间 vs 预测时间 (±90秒) │ │ └─ 路径合法性: 是否在拓扑图中存在 │ │ │ ├─ 主流 → [数据修复算子: RepairFunction] │ │ │ ├─ BFS路径搜索算法 (异常数据修复) │ │ │ ├─ 距离/速度/耗时计算 │ │ │ └─ 22种错误类型处理 │ │ ↓ │ │ [Kafka输出: test_history_data1] ← 核心处理结果 │ │ ↓ │ │ [ClickHouse: kafka_history_data1 → mv_history_data1 → history_data1] │ │ │ └─ 侧流 → [延迟定时器算子: TimeOutFunction] │ │ ├─ 动态超时计算 (基础时间+20%缓冲) │ │ ├─ 无出口检测 (最大7小时) │ │ └─ 分级延迟处理 (level 1-10) │ ↓ │ [Kafka输出: test_history_data1] ← 延迟处理结果 │ ├─ 侧流1 → [Kafka输出: test_origin_data1] ← 原始数据备份 │ ↓ │ [ClickHouse: kafka_origin_data1 → mv_origin_data1 → origin_data1] │ └─ 侧流2 → [Kafka输出: test_origin_err_data1] ← 错误数据记录 ↓ [ClickHouse: kafka_origin_err_data1 → mv_origin_err_data1 → origin_err_data1] ``` **核心算子功能:** 1. **SplitProcessFunction (预处理)**:JSON 解析、车牌验证、数据分流 2. **CompareFunction (交易对比)**:路径预测、错误类型识别、状态管理 3. **RepairFunction (数据修复)**:距离计算、速度分析、路径补全 4. **TimeOutFunction (延迟处理)**:超时检测、无出口判断、定时器管理 **错误类型分类 (与 Python 版本完全一致):** - `NORMAL(0)`: 正常交易 - `ERRPREDICT(1)`: 路径预测错误 - `TIMEOUT(2)`: 时间偏差过大 - `NEWNOTENTRY(10)`: 新行程无入口 - `ERRTRADE(11)`: 异常交易 - `REPEAT(30)`: 重复交易 - `VIRTUAL(32)`: 虚拟门架 - `JAMREPEAT(33)`: 拥堵重复交易 - 等 22 种错误类型... **Redis 缓存架构:** - **历史路径缓存**:Key={carid}, Field={flagid}, Value=路径 JSON 数组 - **预测优先级**:Redis 历史数据 > 拓扑图随机 > ML 模型预测 - **数据结构**:`{carid: {flagid: [{"flagid","distance","speed","spendtime"}]}}` - **服务器**:`10.10.119.205:6379` **ClickHouse 存储架构原理:** ``` 为什么不直接 Flink → ClickHouse? 传统方式:[Flink] → [ClickHouse JDBC连接] 缺点:- 需要处理连接池、事务、重试 - Flink故障时可能丢失数据 - 直接耦合,扩展性差 当前架构:[Flink] → [Kafka] → [ClickHouse Kafka引擎表] → [物化视图] → [MergeTree表] 优点:- 解耦:Flink只负责计算,ClickHouse独立消费 - 可靠性:Kafka作为消息缓冲,避免数据丢失 - 扩展性:可以多个消费者并行处理 - 容错性:ClickHouse故障不影响Flink,数据在Kafka中保留 ``` **数据库**:`test_data`,服务器:`10.10.119.208:8123` ## 📦 1. 打包项目 ```bash cd /root/pyflink-etc-java chmod +x build.sh ./build.sh ``` ## 🗄️ 2. 清空 Kafka 主题(重新运行前清理数据) # 方式二:通过 Kafka UI 手动删除(简单方式) ## 🚀 3. 生成测试数据到 Kafka(完全仿照 Python 版本) echo -e "2021-05-01 10:00:00\n2021-05-07 11:30:00" | ./generate-data.sh direct # 使用 flink 命令行提交作业到集群(完全仿照 Python 版本提交方式) ./run.sh 当前数据生成命令: # 生成单天数据 (May 1st) ./generate-data.sh direct "2021-05-01 00:00:00" "2021-05-01 23:59:59" # 生成多天数据 (May 1-7) ./generate-data.sh direct "2021-05-01 00:00:00" "2021-05-07 23:59:59" # 交互式模式 ./generate-data.sh direct