# pyflink-etc-java **Repository Path**: jiang-linwen/pyflink-etc-java ## Basic Information - **Project Name**: pyflink-etc-java - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-08 - **Last Updated**: 2025-08-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ETC Java 实时数据处理系统 基于Apache Flink的ETC(Electronic Toll Collection)实时数据处理系统,从PyFlink转换为Java版本。 ## 项目概述 该项目是一个实时流处理系统,用于处理ETC交易数据,包括: - 车辆轨迹预测 - 异常检测(重复交易、误交易、预测错误等) - 数据修复和补全 - 实时数据流处理和存储 ## 系统架构 ``` Kafka Source → 数据预处理 → 交易对比 → 数据修复 → Kafka Sink ↓ ↓ ↓ ↓ 原始数据 错误数据流 超时处理 历史数据 ``` ## 核心组件 ### 数据模型 - `CarInfo`: 车辆交易信息 - `CarPredictInfo`: 车辆预测信息 - `EtcState`: ETC状态数据 - `PredictPath`: 预测路径信息 ### 处理函数 - `SplitProcessFunction`: 数据分流和预处理 - `CompareFunction`: 交易对比和异常检测 - `RepairFunction`: 数据修复(待实现) - `TimeOutFunction`: 超时处理(待实现) ### 配置管理 - `Settings`: 统一配置管理 - `application.properties`: 配置文件 ## 快速开始 ### 环境要求 - Java 8 或更高版本 - Apache Maven 3.6+ - Apache Flink 1.18.1+ - Apache Kafka 2.8+ - Redis 6.0+ ### 构建项目 ```bash # 克隆项目(如果需要) cd /workdir/jbgs/pyflink-etc-java # 构建项目 ./build.sh # 运行测试(可选) ./build.sh --with-tests ``` ### 运行项目 #### 集群模式(推荐) ```bash # 使用默认配置运行 ./run.sh # 指定Flink集群地址 ./run.sh --host 10.10.119.208:9081 # 指定并行度 ./run.sh --parallelism 2 ``` #### 本地模式 ```bash ./run.sh --mode local ``` ### 配置文件 主要配置文件位于 `src/main/resources/application.properties`: ```properties # Kafka配置 kafka.bootstrap.servers=10.10.119.208:9094 kafka.group.id=test_test_group # Kafka主题配置 kafka.topics.etc.trade=etcTradeData kafka.topics.origin.data=originData kafka.topics.origin.err.data=originErrData kafka.topics.history.data=historyData # Redis配置 redis.primary.host=10.10.119.205 redis.primary.port=6379 redis.primary.database=0 # ClickHouse配置 clickhouse.primary.host=localhost clickhouse.primary.port=9000 clickhouse.primary.user=default clickhouse.primary.database=default # Flink配置 flink.parallelism=1 flink.checkpoint.interval=300000 ``` ## 命令行使用 ### 构建脚本选项 ```bash ./build.sh [--with-tests] ``` ### 运行脚本选项 ```bash ./run.sh [选项] 选项: -h, --host HOST Flink集群地址 (默认: 10.10.119.208:9081) -j, --jar JAR_FILE JAR文件路径 (默认: 自动查找target目录) -m, --mode MODE 运行模式: cluster/local (默认: cluster) -p, --parallelism NUM 并行度 (默认: 1) --help 显示帮助信息 ``` ## 数据流处理 ### 处理流程 1. **数据接入**: 从Kafka主题读取原始ETC交易数据 2. **数据预处理**: 验证数据格式,分离正常和异常数据 3. **轨迹比较**: 基于历史数据和预测路径进行轨迹比较 4. **异常检测**: 识别重复交易、误交易、预测错误等异常情况 5. **数据输出**: 将处理结果写入不同的Kafka主题 ### 异常类型 - `NORMAL`: 正常交易 - `REPEAT`: 重复交易 - `JAMREPEAT`: 拥堵重复交易 - `PROVINCEBACKUP`: 省界备份交易 - `REPEATMISTAKE`: 重复误交易 - `POSTMISTAKE`: 后续误交易 - `ERRPREDICT`: 预测错误 - `ERRTRADE`: 异常交易 - `TIMEOUT`: 超时 - `VIRTUAL`: 虚拟门架 ## 监控和日志 ### 日志配置 系统使用SLF4J + Logback进行日志管理,日志级别可通过配置文件调整: ```properties logging.level.root=INFO logging.level.com.jbgs.etc=DEBUG ``` ### Flink作业监控 访问Flink Web UI监控作业状态: ``` http://10.10.119.208:9081 ``` ## 开发指南 ### 项目结构 ``` src/main/java/com/jbgs/etc/ ├── MainApplication.java # 主应用程序 ├── config/ # 配置管理 │ └── Settings.java ├── models/ # 数据模型 │ ├── CarInfo.java │ ├── CarPredictInfo.java │ ├── EtcState.java │ └── ... ├── sources/ # 数据源 │ └── KafkaSourceFactory.java ├── sinks/ # 数据接收器 │ └── KafkaSinkFactory.java ├── transforms/ # 转换函数 │ ├── SplitProcessFunction.java │ ├── CompareFunction.java │ └── ... └── utils/ # 工具类 ├── DateUtils.java └── ConstValues.java ``` ### 扩展开发 1. 添加新的数据转换函数到 `transforms` 包 2. 扩展数据模型到 `models` 包 3. 添加新的配置项到 `application.properties` 4. 更新 `Settings.java` 以支持新的配置 ## 故障排查 ### 常见问题 1. **无法连接Flink集群** - 检查Flink集群地址和端口 - 确认Flink集群正在运行 2. **Kafka连接失败** - 验证Kafka集群地址和主题配置 - 检查网络连通性 3. **Redis连接异常** - 确认Redis服务状态 - 检查Redis配置和权限 4. **内存不足** - 调整Flink TaskManager内存配置 - 减少并行度或优化代码 ### 日志查看 ```bash # 查看Flink作业日志 flink list -m 10.10.120.29:8081 # 查看特定作业的日志 flink run-application --help ``` ## 从PyFlink迁移 该Java版本保持了与原PyFlink版本相同的处理逻辑和数据流结构: - 保留了所有核心业务逻辑 - 兼容相同的Kafka主题和数据格式 - 支持相同的配置参数 - 提供相同的功能特性 ## 许可证 该项目遵循内部开源许可证。 ## 贡献 请遵循公司代码规范和提交流程进行开发和提交。