flink-json的增强版,可使用Flink SQL解析Debezium采集数据的op
属性,解决官方版本的flink-json无法获取op属性的问题。
<!-- https://mvnrepository.com/artifact/cn.tenmg/flink-json-plus -->
<dependency>
<groupId>cn.tenmg</groupId>
<artifactId>flink-json-plus</artifactId>
<version>${flink-json-plus.version}</version>
</dependency>
例如希望将Debezium采集的test表的数据的op
属性解析出来,可以使用如下Flink SQL创建源表(Source Table):
CREATE TABLE test(
id STRING NOT NULL,
name STRING NOT NULL,
op STRING
)
WITH (
'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092',
'properties.group.id' = 'flink-jobs-data-sync.test',
'topic' = 'test.testdb.test', 'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'format'='debezium-json-plus'
)
理论上,只要 Flink 关于 Kafka 连接器的实现未做大幅调整,则该 Flink 版本可支持。用户在测试时应以最小依赖集进行(依赖的 JAR 包越多,导致 JAR 冲突的可能性越高)。目前经过测试的 Flink 版本有:
Flink 版本 | 支持版本 |
---|---|
1.13 | 1.0+ |
1.14 | 1.0+ |
1.15 | 1.0+ |
1.16 | 1.0+ |
1.17 | 1.0+ |
Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
flink-json-plus 目前支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
flink-json-plus 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 flink-json-plus 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,flink-json-plus 和 flink-json 一样,将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
注意: 请参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。
问题描述: 在Flink SQL Client 中使用 flink-json-plus,加入 flink-json-plus 依赖后。
DROP TABLE IF EXISTS table_process;
CREATE TABLE table_process (
id BIGINT,
name STRING,
create_time TIMESTAMP,
update_time TIMESTAMP
) WITH (
'properties.bootstrap.servers' = 'hadoop101:9092',
'topic' = 'input_kafka',
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'format'='debezium-json-plus'
);
select * from table_process;
Could not initialize class cn.tenmg.flink.formats.json.debezium.DebeziumJsonPlusFormatOptions
解决方案:
https://gitee.com/tenmg/flink-json-plus/issues/I6FGR7#note_16275275_link
https://gitee.com/tenmg/flink-json-plus/issues/I6GIUF#note_16396699_link
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。