# flink-clickhouse-sink **Repository Path**: zwjdcoding/flink-clickhouse-sink ## Basic Information - **Project Name**: flink-clickhouse-sink - **Description**: Flink ClickHouse Sink:生产级高可用写入方案 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-24 - **Last Updated**: 2026-02-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink ClickHouse Sink(参考:得物技术生产级方案) 该项目是一个 **Flink Sink(Sink2 API, Flink 1.15.x)**,将 `RowData` 写入 ClickHouse,包含: - 本地表写入(local table)+ 应用侧负载均衡/故障剔除 - 攒批:**按字节大小** + **按时间** 双触发 - 有界队列 + 线程池并发控制 - Future 超时 + 指数退避重试 - Checkpoint 时 `flushAll()` 等待所有写入完成(保证语义) - 两种模式: - `ExceptionsThrowableSink`:抛异常(At-Least-Once) - `UnexceptionableSink`:吞异常(At-Most-Once) ## 快速开始(Docker ClickHouse) 你可以用两个节点模拟: ```bash docker run -d --name ck1 --ulimit nofile=262144:262144 --restart always \ -p 0.0.0.0:8123:8123 -p 0.0.0.0:9000:9000 \ -e CLICKHOUSE_PASSWORD=123456 clickhouse/clickhouse-server:latest docker run -d --name ck2 --ulimit nofile=262144:262144 --restart always \ -p 0.0.0.0:8124:8123 -p 0.0.0.0:9001:9000 \ -e CLICKHOUSE_PASSWORD=123456 clickhouse/clickhouse-server:latest ``` 初始化(HTTP 方式): ```bash ./check_and_fix.sh ``` ## 运行 Example IDE 里直接运行: - `com.dewu.flink.clickhouse.sink.example.ClickHouseSinkExample` - `com.dewu.flink.clickhouse.sink.example.ClickHouseDistributedTableExample`(分布式表写入示例,需要 ClickHouse 侧已配置 cluster) ## 自动建表 / 分布式表支持 从 `1.0.0` 起(本仓库改造后)支持在写入前按需执行 DDL: - `autoCreateDatabase=true`:自动 `CREATE DATABASE IF NOT EXISTS ...` - `autoCreateTable=true`:自动 `CREATE TABLE IF NOT EXISTS ...`(列定义来自 `config.columns`,未配置则沿用示例三列) - `autoCreateDistributedTable=true`:自动创建 `ENGINE = Distributed(...)` 的分布式表 - `writeToDistributedTable=true`:写入分布式表(ClickHouse 负责转发到本地表) 注意: - 若要创建/写入分布式表,必须配置 `cluster`(对应 ClickHouse `system.clusters` 的 cluster 名称)。 - 建议 `targetTable` 仍表示本地表名,`distributedTable` 单独配置分布式表名。 ## 分表(按 application 动态表名) 对齐文章的“按应用维度分表”做法:通过 `ShardingStrategy` 决定写入目标表名,并且在 `autoCreateTable=true` 时会自动创建对应表。 - 示例策略:`ApplicationTableShardingStrategy`(将 `application` 规范化为 `tb_logs_{application}`) - `targetTable` 可以设置为模板(包含 `%s`),例如:`tb_logs_%s` ## 重要说明:HTTP(8123) vs Native(9000) - ClickHouse 的 **HTTP** 端口是 `8123`(ck2 映射为 `8124`)。 - ClickHouse 的 **Native/TCP** 端口是 `9000`(ck2 映射为 `9001`)。 本项目默认示例使用 **HTTP 端口**(与 PDF 里示例配置一致)。 如果你在某些 clickhouse-jdbc 版本上遇到 `Magic is not correct`,那是 **Native 握手打到了 HTTP 端口** 的典型症状。 > 对应的技术文章见:[mp.weixin.qq.com](https://mp.weixin.qq.com/s/DLFf7Z4DYPHV0PSXaA023g)