# flink实时写入达梦数据库 **Repository Path**: Carlosg_admin/flink-connector-jdbc-dm8 ## Basic Information - **Project Name**: flink实时写入达梦数据库 - **Description**: 扩展flink-connector-jdbc组件功能,使得实时可以实时写入达梦V8数据库。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 12 - **Forks**: 9 - **Created**: 2023-11-09 - **Last Updated**: 2025-08-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 技术背景 随着 Oracle、SAP、英特尔等科技公司宣布**企业不会继续在俄罗斯开展正常业务**,一定程度上也给了我们一个警示,提高我们整体科技水平至关重要,从芯片上就是一个很好的例子。**国产软件替代至关重要,大国发展要懂得居安思危**!达梦国产数据库经过40年的发展,已经成功应用于中国国内金融、能源、航空、通信等数十个领域。因此要基于Flink计算引擎,面向达梦数据库做实时数据的开发就是一个非常有价值的事情,也是一个迫切的需求。 ​ 经过本人近一年不懈的钻研与调测,终于在 **flink-connector-jdbc(3.1.0和3.1.1版本)**中开发出了支持写入DM8数据库的功能,本文将说明扩展后的flink-connector-jdbc如何高效写入国产数据库达梦(V8)。以下是达到的效果: ![image-20231109171959241](images/image-20231109171959241.png) ### 相关脚本 #### FlinkSQL建表语句 ```sql create database watermark_db; use watermark_db; -- 采集MySQL DROP TABLE IF EXISTS table_process_cdc; CREATE TABLE IF NOT EXISTS table_process_cdc ( id bigint COMMENT '自增主键id' , `name` string COMMENT '名称' , age bigint COMMENT '年龄' , gender bigint COMMENT '性别,0-男生,1-女生' , `address` string COMMENT '住址' , create_time timestamp(3) COMMENT '创建时间' , update_time timestamp(3) COMMENT '更新时间' -- 声明 update_time 是事件时间属性,并且用延迟5秒的策略来生成watermark ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND ,PRIMARY KEY(id) NOT ENFORCED ) COMMENT '配置表' WITH ( 'connector' = 'mysql-cdc' ,'hostname' = 'hadoop105' ,'port' = '3306' ,'username' = 'root' ,'password' = 'root' ,'server-time-zone' = 'UTC+8' ,'scan.incremental.snapshot.enabled' = 'true' ,'scan.startup.mode' = 'earliest-offset' ,'database-name' = 'testdb' ,'table-name' = 'table_process_o' ); -- kafka_dm映射表 DROP TABLE IF EXISTS ods_table_process_dm; CREATE TABLE IF NOT EXISTS ods_table_process_dm ( id bigint COMMENT '自增主键id' , `name` string COMMENT '名称' , age bigint COMMENT '年龄' , gender bigint COMMENT '性别,0-男生,1-女生' , `address` string COMMENT '住址' , create_time timestamp(3) COMMENT '创建时间' , update_time timestamp(3) COMMENT '更新时间' , kafka_time timestamp(3) COMMENT '进入Kafka的时间' -- 声明 update_time 是事件时间属性,并且用延迟5秒的策略来生成watermark ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND ,PRIMARY KEY(id) NOT ENFORCED ) COMMENT '配置表' WITH ( 'connector' = 'kafka' ,'topic' = 'ods_table_process_dm' ,'properties.bootstrap.servers' = 'hadoop105:9092' ,'properties.group.id' = 'my_group_id' ,'value.format' = 'debezium-json' ,'scan.startup.mode' = 'earliest-offset' ,'value.debezium-json.ignore-parse-errors' = 'true' ,'value.debezium-json.timestamp-format.standard' = 'ISO-8601' ); -- 写入JDBC(DM8数据库) DROP TABLE IF EXISTS table_process_sink; CREATE TABLE IF NOT EXISTS table_process_sink ( ID bigint , NAME string , AGE bigint , GENDER bigint , `ADDRESS` string , CREATE_TIME timestamp(3) , UPDATE_TIME timestamp(3) , KAFKA_TIME timestamp(3) COMMENT '进入kafka的时间' , ETL_TIME timestamp(3) COMMENT '进入jdbc的时间' -- 声明 UPDATE_TIME 是事件时间,并且用 延迟5秒 的策略来生成 watermark ,WATERMARK FOR UPDATE_TIME AS UPDATE_TIME - INTERVAL '5' SECOND ,PRIMARY KEY (ID) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:dm://192.168.137.103:5236', 'username' = 'DMHS', 'password' = 'Dmhs_1234', 'table-name' = 'TABLE_PROCESS', 'driver' = 'dm.jdbc.driver.DmDriver' ); ``` > 注意: 演示部分为FlinkCDC实时读取mysql数据库的binlog,写入kafka的主题,然后FlinkSQL基于flink-connector-jdbc 组件消费Kafka主题数据,写入达梦V8数据库,并支持实时同步数据库增,删,改操作。 #### FlinkJOB作业 ```sql set 'pipeline.name' = 'table_process_sink_达梦'; SET 'table.local-time-zone' = 'Asia/Shanghai'; set 'parallelism.default' = '1'; SET 'pipeline.operator-chaining' = 'false'; set 'execution.runtime-mode' = 'streaming'; SET 'table.exec.source.idle-timeout' = '10s'; SET 'execution.checkpointing.interval' = '5min'; SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.tolerable-failed-checkpoints' = '2'; set 'restart-strategy' = 'fixed-delay'; set 'restart-strategy.fixed-delay.attempts' = '4'; set 'restart-strategy.fixed-delay.delay' = '30s'; set 'execution.checkpointing.unaligned' = 'true'; SET 'execution.savepoint.ignore-unclaimed-state' = 'true'; SET 'table.exec.source.cdc-events-duplicate' = 'true'; -- 采集MySQL数据写入Kafka INSERT INTO ods_table_process_dm SELECT id, `name`, age, gender, `address`, create_time, update_time, PROCTIME() as kafka_time FROM table_process_cdc; -- 消费kafka数据写入达梦数据库 INSERT INTO table_process_sink SELECT id, `name`, age, gender, `address`, create_time, update_time, kafka_time, PROCTIME() as etl_time FROM ods_table_process_dm; ``` ## 开发详解 ### flink-connector-jdbc组件分析 source/sink 是 flink最核心的部分之一,通过对其实现原理的学习,再结合源码分析,有助于加深对框架处理过程的理解,以及架构设计上的提升。 ## 逻辑原理 如果我们对自己对接一个数据源,核心的话就是连接器 connector,比如关系型数据库就是 **JDBC**。 ### connector架构 flink-connector-jdbc 是 Apache Flink 框架提供的一个用于与关系型数据库进行连接和交互的连接器。它提供了使用 Flink 进行批处理和流处理的功能,可以方便地将关系型数据库中的数据引入 Flink 进行分析和处理,或者将 Flink 计算结果写入关系型数据库。 flink官方connector的架构如下 ![flink-connector-jdbc](images/flink-connector-jdbc.png) * MetaData 将 sql create source table 转化为实际的 CatalogTable,对应代码 RelNode; * Planning 创建 RelNode 的过程中使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,获取到 connector = kafka,然后从所有 source 工厂中过滤出名称为 kafka 并且 继承自 DynamicTableSourceFactory.class 的工厂类 KafkaDynamicTableFactory,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource; * Runtime KafkaDynamicSource 创建出 FlinkKafkaConsumer,负责flink程序实际运行。 flink-connector-jdbc可以实现以下核心功能: * 数据源连接:通过 flink-connector-jdbc连接到各种支持JDBC标准的关系型数据库,如MySQL、PostgreSQL、Oracle等。 * 数据写入:可以将Flink的计算结果写入关系型数据库中,实现数据的持久化。 * 数据读取:可以从关系型数据库中读取数据,并将其作为Flink计算的输入数据。 * 数据格式转换:可以将关系型数据库中的数据转换为适合Flink计算的数据格式。 * 并行处理:可以根据数据源的并行度将数据进行分区和并行处理,以加速数据处理的速度。 flink-connector-jdbc为Flink提供了与关系型数据库集成的能力,可以方便地进行数据的导入、导出和处理,为开发人员提供了更强大和灵活的数据处理能力。 以下是 flink-connector-jdbc 源码组成:**红色框中的代码就是我开发的哦** ![image-20231102164731459](images/image-20231102164731459.png) ### 开发思路 * 首先添加达梦数据库的pom.xml依赖 ```xml 8.1.2.192 com.dameng DmJdbcDriver18 ${dm.version} provided ``` 经官网推荐和Maven仓库克洗 * 首先根据SPI机制添加支持DM数据库的工厂类 ```sh # resource/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory org.apache.flink.connector.jdbc.databases.dm.dialect.DmDialectFactory ``` * 创建工程类DmDialectFactory ```java // 在database下面创建dm,再创建 /dm/dialect // 路径如下:flink-connector-jdbc-3.1.1/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/dm/dialect/DmDialectFactory.java package org.apache.flink.connector.jdbc.databases.dm.dialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; /** Factory for {@link DmDialect}. */ public class DmDialectFactory implements JdbcDialectFactory { @Override public boolean acceptsURL(String url) { return url.startsWith("jdbc:dm:"); } @Override public JdbcDialect create() { return new DmDialect(); } } ``` JdbcDialectFactory是一个工厂类,用于创建特定数据库的JdbcDialect实例。它的主要作用是根据用户提供的JDBC连接URL,确定要连接的数据库类型,并创建对应的JdbcDialect实例。通过JdbcDialect实例,flink-connector-jdbc可以为特定类型的数据库提供更高级的功能和最佳性能。例如,JdbcDialect 可以优化生成的SQL查询,使用特定的语法和函数。它还可以检测数据库支持的特性,以避免不支持的操作。 * 创建方言类DmDialect ```java package org.apache.flink.connector.jdbc.databases.dm.dialect; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; import org.apache.flink.connector.jdbc.dialect.AbstractDialect; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; import java.util.Arrays; import java.util.EnumSet; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; /** JDBC dialect for Dm8. */ @Internal public class DmDialect extends AbstractDialect { private static final long serialVersionUID = 1L; private static final int MAX_TIMESTAMP_PRECISION = 9; private static final int MIN_TIMESTAMP_PRECISION = 1; private static final int MAX_DECIMAL_PRECISION = 38; private static final int MIN_DECIMAL_PRECISION = 1; @Override public JdbcRowConverter getRowConverter(RowType rowType) { return new DmRowConverter(rowType); } @Override public String getLimitClause(long limit) { return "FETCH FIRST " + limit + " ROWS ONLY"; } @Override public Optional defaultDriverName() { return Optional.of("dm.jdbc.driver.DmDriver"); } @Override public String dialectName() { return "Dm"; } @Override public String quoteIdentifier(String identifier) { return identifier; } @Override public Optional getUpsertStatement( String tableName, String[] fieldNames, String[] uniqueKeyFields) { String sourceFields = Arrays.stream(fieldNames) .map(f -> ":" + f + " " + quoteIdentifier(f)) .collect(Collectors.joining(", ")); String onClause = Arrays.stream(uniqueKeyFields) .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f)) .collect(Collectors.joining(" and ")); final Set uniqueKeyFieldsSet = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet()); String updateClause = Arrays.stream(fieldNames) .filter(f -> !uniqueKeyFieldsSet.contains(f)) .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); String insertFields = Arrays.stream(fieldNames) .map(this::quoteIdentifier) .collect(Collectors.joining(", ")); String valuesClause = Arrays.stream(fieldNames) .map(f -> "s." + quoteIdentifier(f)) .collect(Collectors.joining(", ")); // if we can't divide schema and table-name is risky to call quoteIdentifier(tableName) // for example [tbo].[sometable] is ok but [tbo.sometable] is not String mergeQuery = " MERGE INTO " + tableName + " t " + " USING (SELECT " + sourceFields + " FROM DUAL) s " + " ON (" + onClause + ") " + " WHEN MATCHED THEN UPDATE SET " + updateClause + " WHEN NOT MATCHED THEN INSERT (" + insertFields + ")" + " VALUES (" + valuesClause + ")"; return Optional.of(mergeQuery); } @Override public Optional decimalPrecisionRange() { return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); } @Override public Optional timestampPrecisionRange() { return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); } @Override public Set supportedTypes() { return EnumSet.of( LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.BOOLEAN, LogicalTypeRoot.VARBINARY, LogicalTypeRoot.DECIMAL, LogicalTypeRoot.TINYINT, LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BIGINT, LogicalTypeRoot.FLOAT, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.DATE, LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.ARRAY); } } ``` JdbcDialect 是一个接口,用于定义与特定数据库相关的SQL语法和行为。每种不同类型的数据库可能有一些特定的SQL方言和行为,JdbcDialect提供了一种方式来处理这些差异,以确保在不同类型的数据库上执行的SQL操作正确执行,并且能够提供最佳的性能。 * 创建数据转换类DmRowConverter ```java /** * Runtime converter that responsible to convert between JDBC object and Flink internal object for * Dm8. */ @Internal public class DmRowConverter extends AbstractJdbcRowConverter { private static final long serialVersionUID = 1L; public DmRowConverter(RowType rowType) { super(rowType); } @Override public JdbcDeserializationConverter createInternalConverter(LogicalType type) { switch (type.getTypeRoot()) { case NULL: return val -> null; case BOOLEAN: return val -> val instanceof NUMBER ? ((NUMBER) val).booleanValue() : val; case FLOAT: return val -> val instanceof NUMBER ? ((NUMBER) val).floatValue() : val instanceof BINARY_FLOAT ? ((BINARY_FLOAT) val).floatValue() : val instanceof BigDecimal ? ((BigDecimal) val).floatValue() : val; case DOUBLE: return val -> val instanceof NUMBER ? ((NUMBER) val).doubleValue() : val instanceof BINARY_DOUBLE ? ((BINARY_DOUBLE) val).doubleValue() : val instanceof BigDecimal ? ((BigDecimal) val).doubleValue() : val; case TINYINT: return val -> val instanceof NUMBER ? ((NUMBER) val).byteValue() : val instanceof BigDecimal ? ((BigDecimal) val).byteValue() : val; case SMALLINT: return val -> val instanceof NUMBER ? ((NUMBER) val).shortValue() : val instanceof BigDecimal ? ((BigDecimal) val).shortValue() : val; case INTEGER: return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val instanceof BigDecimal ? ((BigDecimal) val).intValue() : val; case BIGINT: return val -> val instanceof NUMBER ? ((NUMBER) val).longValue() : val instanceof BigDecimal ? ((BigDecimal) val).longValue() : val; case DECIMAL: final int precision = ((DecimalType) type).getPrecision(); final int scale = ((DecimalType) type).getScale(); return val -> val instanceof BigInteger ? DecimalData.fromBigDecimal( new BigDecimal((BigInteger) val, 0), precision, scale) : DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); case CHAR: case VARCHAR: return val -> (val instanceof CHAR) ? StringData.fromString(((CHAR) val).getString()) : (val instanceof OracleClob) ? StringData.fromString(((OracleClob) val).stringValue()) : StringData.fromString((String) val); case BINARY: case VARBINARY: case RAW: return val -> val instanceof RAW ? ((RAW) val).getBytes() : val instanceof OracleBlob ? ((OracleBlob) val) .getBytes(1, (int) ((OracleBlob) val).length()) : val.toString().getBytes(); case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: return val -> val instanceof NUMBER ? ((NUMBER) val).intValue() : val; case DATE: return val -> val instanceof DATE ? (int) (((DATE) val).dateValue().toLocalDate().toEpochDay()) : val instanceof Timestamp ? (int) (((Timestamp) val) .toLocalDateTime() .toLocalDate() .toEpochDay()) : (int) (((Date) val).toLocalDate().toEpochDay()); case TIME_WITHOUT_TIME_ZONE: return val -> val instanceof DATE ? (int) (((DATE) val).timeValue().toLocalTime().toNanoOfDay() / 1_000_000L) : (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L); case TIMESTAMP_WITHOUT_TIME_ZONE: return val -> val instanceof TIMESTAMP ? TimestampData.fromTimestamp(((TIMESTAMP) val).timestampValue()) : TimestampData.fromTimestamp((Timestamp) val); case TIMESTAMP_WITH_TIME_ZONE: return val -> { if (val instanceof TIMESTAMPTZ) { final TIMESTAMPTZ ts = (TIMESTAMPTZ) val; final ZonedDateTime zdt = ZonedDateTime.ofInstant( ts.timestampValue().toInstant(), ts.getTimeZone().toZoneId()); return TimestampData.fromLocalDateTime(zdt.toLocalDateTime()); } else { return TimestampData.fromTimestamp((Timestamp) val); } }; case ARRAY: case ROW: case MAP: case MULTISET: default: return super.createInternalConverter(type); } } @Override public String converterName() { return "Dm"; } } ``` createInternalConverter 是一个方法,用于创建将 JDBC ResultSet中的数据转换为Flink的内部数据结构的转换器。这个方法通常在JDBCInputFormat中被调用。在 Flink中,使用JDBCInputFormat从关系型数据库中读取数据时,它会将JDBC的ResultSet对象作为输入,然后通过 createInternalConverter方法将 ResultSet中的每一行数据转换为Flink的内部数据结构(例如Tuple或Row),以便后续的处理和计算。 ### 源码打包 使用idea工具的maven打包并在项目文件夹打开。 `target/flink-connector-jdbc-3.1.1.jar`