# 流计算 **Repository Path**: tan_yu_hung/flow-calculation ## Basic Information - **Project Name**: 流计算 - **Description**: 青训营项目-大数据 - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2025-02-26 - **Last Updated**: 2025-03-21 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 流计算 - 青训营大数据项目 ## 项目介绍 **简易流式计算框架**,实现基于事件时间的单词实时计数(WordCounter)功能,支持乱序数据处理与窗口聚合计算。 --- ## 软件架构 ``` flow-calculation/ ├── sources/ # 数据源模块 │ ├── generators.py # Kafka数据生成器 │ ├── kafka.py # Kafka数据消费者 │ └── __init__.py ├── framework/ # 流计算框架核心 │ ├── dag.py # DAG引擎 │ ├── operator.py # 算子库 │ ├── watermark.py # 水印生成器 │ ├── windows.py # 时间窗口管理 │ ├── manager.py # 自动故障恢复 │ └── __init__.py ├── producer_main.py # 生产者入口 ├── streaming_app.py # 流处理主程序 ├── config.py # 配置文件 └── data.txt # 计算结果输出文件 ``` ## 主要功能介绍 ### framework **1.DAG 引擎 (dag.py)** ​功能: - 通过 Node 类定义数据处理节点(如 Source、Map、KeyBy、Reduce、Sink) - 支持构建拓扑结构,通过 add_edge 连接节点形成处理流水线 - 集成水印生成器与事件时间滚动窗口,处理乱序数据 ​关键逻辑: - 递归执行节点,根据节点类型调用对应操作(如映射、分组、聚合) - 自动生成唯一事件ID,避免重复处理 - 窗口触发时统计词频并输出结果 **2.算子库 (operator.py)** ​内置算子: - ​MapOperator:数据映射(如提取字段) - ​KeyByOperator:按键分组(默认以 word 字段分组) - ​ReduceOperator:聚合计算(默认累加计数) - ​FileSink:结果写入文件(支持JSON格式) - ​RetractOperator:系统能够正确处理撤销数据 - ExactlyOnceSink:系统内实现 exactly once 计算语义 ​扩展性:可通过自定义函数灵活配置算子逻辑 **3.水印生成器 (watermark.py)** 功能: - 跟踪事件时间进度,计算水印值(当前最大时间 - 最大延迟) - 用于确定窗口计算的触发时机 ​关键方法: - update() 更新最大事件时间 - value 属性获取当前水印值 **4.事件时间窗口 (windows.py)** 功能: - 实现滚动窗口​(Tumbling Window),按事件时间划分窗口 - 缓冲数据直至水印超过窗口结束时间,触发聚合计算 ​核心逻辑: - assign_window() 分配记录到对应窗口 - check_trigger() 检查水印触发条件,输出窗口统计结果 **_主要特性_** - **1.​乱序事件处理​** 通过水印机制和允许延迟(allowed_lateness),容忍迟到数据,确保计算准确性。 - **2.​窗口化聚合​** 支持按事件时间划分窗口,统计窗口内数据(如单词计数),适用于实时流式分析。 - **​3.可扩展流水线​** 用户可通过组合 Map、KeyBy、Reduce 等算子,自定义数据处理逻辑(如词频统计、会话分析)。 **5.检查点恢复(manager.py)** CheckpointManager 功能: - 负责保存和恢复每个算子的状态。将其扩展为支持持久化存储(如文件或数据库)。 ​核心逻辑: - _load_checkpoints():从文件加载已保存的检查点数据(使用pickle反序列化)。 - save_checkpoint(node_name, state):将某个节点的状态保存到文件。 - restore_checkpoint(node_name):读取某个节点的历史状态。 数据存储方式: -检查点数据以字典形式保存,键为节点名(node_name),值为节点状态(state),最终通过pickle序列化到文件。 RecoveryManager 功能: - 系统重启时恢复每个算子的状态。。 ​核心逻辑: - recover():遍历DAG中的所有节点,从CheckpointManager中读取检查点状态并注入到节点中(假设节点有func.state属性)。 ### sources **1.Kafka 数据生成器 (generators.py)** 功能: - 模拟实时数据流,生成带乱序时间戳的测试数据(如随机单词事件)。 - 自动将数据发送至 Kafka 指定主题,用于开发和调试流处理逻辑。 ​关键实现: - ​时间偏移机制:生成数据时,时间戳在 [-20, 20] 秒范围内随机偏移,模拟现实场景中的网络延迟或乱序事件。 - ​生产者配置:通过 confluent_kafka.Producer 实现高效异步消息发送,支持回调函数处理发送状态。 - ​持续运行:按指定间隔(interval)循环生成数据,支持 KeyboardInterrupt 安全停止。 示例数据格式 ``` { 'word': 'apple', # 随机选择的单词 'count': 1, # 固定计数 'timestamp': '2024-05-21T10:00:05.123' # 带偏移的事件时间(ISO 格式) } ``` **2.Kafka 数据源 (kafka.py)** ​功能: - 作为 DAG 的 ​Source 算子,从 Kafka 主题持续消费数据,为流水线提供输入流 - 反序列化数据并转换时间戳类型,与框架的事件时间处理逻辑无缝集成 ​关键实现: - ​消费者配置:支持消费者组管理(group.id)、从最早偏移量消费(auto.offset.reset=earliest)。 - ​迭代器模式:通过 __iter__ 方法实现生成器,逐条返回解析后的数据记录。 - ​异常处理:捕获 KafkaException 并优雅关闭消费者,确保资源释放。 ### 配置文件 (config.py) 功能: - 集中管理 Kafka 连接参数(服务器地址、主题、消费者组,提升代码可维护性。 ### 生产者入口 (producer_main.py) 功能: - 启动 Kafka 生产者,生成模拟数据流用于测试。 ### 流处理主程序 (streaming_app.py) ​功能: - 构建 DAG 流水线,从 Kafka 消费数据并执行实时处理。 ## 项目启动方式 1. 先启动producer_main.py。`python producer_main.py` 2. 再启动streaming_app.py。`python streaming_app.py` 3. 项目根目录下生成的data.txt为sink算子输出的计算结果。