# FlinkCEP **Repository Path**: 36436022/flink-cep ## Basic Information - **Project Name**: FlinkCEP - **Description**: electronic电力数据运行 FLINK CEP检测部分,负责从kafka端获取两个数据按照时间戳对齐,进行检测 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-10-07 - **Last Updated**: 2025-10-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # FlinkCEP - 基于Flink的复杂事件处理系统 ## 项目简介 FlinkCEP是一个轻量级、结构清晰的基于Apache Flink的复杂事件处理(CEP)系统。该系统负责接收Kafka传来的事件数据,通过预定义的CEP模式进行实时分析,并将匹配的告警结果通过HTTP POST请求发送到后端系统。 ## 系统架构 ``` Kafka → Flink CEP → HTTP Backend ↓ ↓ ↓ 事件流 模式匹配 告警通知 ``` ## 功能特性 - **实时事件处理**: 从Kafka实时消费事件数据 - **复杂事件模式**: 支持多种预定义的CEP模式 - **告警生成**: 自动生成结构化的告警信息 - **HTTP通知**: 将告警通过POST请求发送到后端 - **容错机制**: 支持检查点和重启策略 - **配置化**: 支持外部配置文件 ## 支持的CEP模式 1. **高频错误模式**: 检测30秒内同一用户的3次以上ERROR事件 2. **异常登录模式**: 检测5分钟内同一用户的5次以上登录失败 3. **系统异常模式**: 检测CRITICAL事件后紧跟的多个ERROR事件 4. **数据异常模式**: 检测10秒内同一数据源的多次异常事件 5. **性能降级模式**: 检测性能警告后的系统错误 6. **安全威胁模式**: 检测30秒内的多次安全相关事件 ## 环境要求 - Java 8+ - Apache Flink 1.17.x - Apache Kafka 3.1.3 - Maven 3.6+ ## 快速开始 ### 1. 编译项目 ```bash mvn clean package ``` ### 2. 配置文件 编辑 `src/main/resources/application.properties` 文件,配置Kafka和HTTP后端信息: ```properties # Kafka配置 kafka.bootstrap.servers=localhost:9092 kafka.group.id=flink-cep-consumer kafka.topic=input-events # HTTP后端配置 http.backend.url=http://localhost:8080/api/events ``` ### 3. 启动应用 #### 本地运行 ```bash java -cp target/flink-cep-1.0.0.jar com.electric.flinkcep.FlinkCEPApplication ``` #### Flink集群运行 ```bash flink run target/flink-cep-1.0.0.jar ``` ## 事件数据格式 输入的Kafka消息应为JSON格式: ```json { "id": "event-001", "type": "LOGIN_FAILURE", "source": "AUTH_SERVICE", "timestamp": 1640995200000, "data": "用户登录失败", "severity": "ERROR", "userId": "user123" } ``` ### 字段说明 - `id`: 事件唯一标识符 - `type`: 事件类型 - `source`: 事件来源 - `timestamp`: 事件时间戳 - `data`: 事件详细数据 - `severity`: 严重级别 (INFO, WARNING, ERROR, CRITICAL) - `userId`: 用户ID ## 告警输出格式 生成的告警将以JSON格式发送到HTTP后端: ```json { "alertId": "high-frequency-error-1640995200000-abc12345", "patternName": "high-frequency-error", "alertType": "HIGH_FREQUENCY_ERROR", "severity": "HIGH", "description": "检测到高频错误事件,在30秒内发生了3次ERROR级别事件", "timestamp": 1640995200000, "userId": "user123", "matchedEvents": [...], "metadata": "{\"eventCount\": 3, \"timeSpan\": 25000}" } ``` ## 配置说明 ### Kafka配置 - `kafka.bootstrap.servers`: Kafka服务器地址 - `kafka.group.id`: 消费者组ID - `kafka.topic`: 消费的主题名称 - `kafka.auto.offset.reset`: 偏移量重置策略 ### HTTP配置 - `http.backend.url`: 后端API地址 - `http.timeout.connection`: 连接超时时间(毫秒) - `http.timeout.socket`: 读取超时时间(毫秒) ### Flink配置 - `flink.parallelism`: 并行度 - `flink.checkpoint.interval`: 检查点间隔(毫秒) - `flink.checkpoint.mode`: 检查点模式 ### CEP配置 - `cep.pattern.timeout`: 模式超时时间(毫秒) - `cep.pattern.name`: 模式名称 ## 监控和日志 ### 日志配置 日志配置文件位于 `src/main/resources/log4j2.xml`,支持: - 控制台输出 - 文件滚动输出 - 错误日志单独记录 ### 日志文件位置 - 应用日志: `logs/flink-cep.log` - 错误日志: `logs/flink-cep-error.log` ## 开发指南 ### 添加新的CEP模式 1. 在 `CEPPatternService` 类中添加新的模式定义方法 2. 在 `FlinkCEPApplication` 类中添加模式应用方法 3. 在 `AlertProcessFunction` 类中添加对应的告警类型处理 ### 自定义事件处理 可以通过继承 `PatternProcessFunction` 类来实现自定义的事件处理逻辑。 ## 故障排除 ### 常见问题 1. **Kafka连接失败** - 检查Kafka服务是否启动 - 验证bootstrap.servers配置 - 确认网络连通性 2. **HTTP请求失败** - 检查后端服务是否可用 - 验证URL配置 - 检查网络防火墙设置 3. **内存不足** - 调整JVM堆内存大小 - 优化并行度配置 - 检查检查点配置 ### 调试模式 启用DEBUG日志级别: ```properties logging.level=DEBUG ``` ## 性能优化 1. **并行度调优**: 根据集群资源调整并行度 2. **检查点优化**: 合理设置检查点间隔 3. **内存管理**: 配置合适的JVM参数 4. **网络优化**: 优化Kafka和HTTP连接参数 ## 版本历史 - v1.0.0: 初始版本,支持基本的CEP模式和HTTP通知 ## 许可证 本项目采用Apache 2.0许可证。 ## 联系方式 如有问题或建议,请联系开发团队。