# FlinkSourceToTarget **Repository Path**: Freshman_Qin/FlinkSourceToTarget ## Basic Information - **Project Name**: FlinkSourceToTarget - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-30 - **Last Updated**: 2026-01-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink PostgreSQL CDC 实时监听工具 ## 介绍 本项目是一个基于 Apache Flink 和 Flink CDC 的实时数据监听工具,用于捕获 PostgreSQL 数据库的变更事件。利用 CDC (Change Data Capture) 技术,可以实时监听 PostgreSQL 数据库中的增删改操作,并将数据变更实时输出到控制台。当前版本实现了从 PostgreSQL 到控制台的数据流,可作为构建更复杂数据同步管道的基础。 ## 软件架构 - **Apache Flink**: 分布式流处理框架 - **Flink CDC**: Change Data Capture 组件,用于捕获数据库变更 - **PostgreSQL CDC Connector**: 用于连接 PostgreSQL 数据库并捕获 WAL 日志中的变更 - **PostgreSQL**: 源数据库,作为数据变更的源头 ## 环境要求 - Java 11+ - Maven 3.6+ - PostgreSQL (支持 CDC 功能) - Flink 1.20+ ## 安装教程 1. 克隆项目代码到本地 ```bash git clone https://github.com/your-repo/flink-pg-cdc-listener.git cd flink-pg-cdc-listener ``` 2. 编译打包项目 ```bash mvn clean package -DskipTests ``` 3. 确保 PostgreSQL 服务正常运行 ## 配置说明 目前配置参数直接硬编码在 [Main](file:///media/USER538291/数据盘1/Github/FlinkSourceToTarget/src/main/java/com/flink/cdc/Main.java#L10-L70) 类中,主要包括: ### PostgreSQL 源端配置 - hostname: localhost - port: 15432 - username: postgres - password: qinxianbo - database-name: postgres - schema-name: public - table-name: players3 - slot.name: flink_slot_auto2 - decoding.plugin.name: pgoutput - scan.startup.mode: initial ## 使用说明 1. 根据实际环境修改 [Main](file:///media/USER538291/数据盘1/Github/FlinkSourceToTarget/src/main/java/com/flink/cdc/Main.java#L10-L70) 类中的数据库连接参数 2. 编译项目 ```bash mvn clean package ``` 3. 提交 Flink 作业 ```bash flink run -c com.flink.cdc.Main ./target/cdc-pg2clickhouse-1.0-SNAPSHOT.jar ``` 4. 观察日志输出,确认数据监听任务正常运行 ## 数据表结构 程序会监听 PostgreSQL 中的 `players3` 表,该表包含以下字段: - player_id (INT, 主键) - team_id (INT) - player_name (VARCHAR) - height (DOUBLE) ## 注意事项 1. 确保 PostgreSQL 已启用逻辑复制(logical replication),在 postgresql.conf 中设置 `wal_level = logical` 2. 需要为 PostgreSQL 创建复制槽(replication slot)并授予相应权限 3. 生产环境中应避免在代码中硬编码敏感信息,建议使用配置文件或环境变量 4. 本工具当前仅实现监听功能,若需写入其他目标系统(如ClickHouse),需在此基础上扩展 ## 项目特点 - 实时性高:基于 CDC 技术,实现准实时数据监听 - 可靠性强:具备容错和恢复能力 - 易于扩展:可轻松扩展支持更多表的监听或添加数据输出目标 - 支持全量+增量:初始模式支持全量读取已有数据,后续增量监听变更