# flink-json-plus
**Repository Path**: tenmg/flink-json-plus
## Basic Information
- **Project Name**: flink-json-plus
- **Description**: flink-json的增强版,可使用Flink SQL解析Debezium采集数据的op属性
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 6
- **Forks**: 1
- **Created**: 2022-06-08
- **Last Updated**: 2025-08-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# flink-json-plus
## 介绍
flink-json的增强版,可使用Flink SQL解析Debezium采集数据的`op`属性,解决官方版本的flink-json无法获取op属性的问题。
## 安装教程
1. 如果使用Flink SQL客户端,则将JAR包上传至flink的lib目录下,重启flink即可使用
2. 如果使用Table API,则除将JAR包上传至flink的lib目录及重启flink外,还需要在项目中引入该JAR。以Maven项目为例(其中${flink-json-plus.version}为版本号,可定义属性或直接使用版本号替换):
```
cn.tenmg
flink-json-plus
${flink-json-plus.version}
```
## 使用说明
例如希望将Debezium采集的test表的数据的`op`属性解析出来,可以使用如下Flink SQL创建源表(Source Table):
```
CREATE TABLE test(
id STRING NOT NULL,
name STRING NOT NULL,
op STRING
)
WITH (
'properties.bootstrap.servers' = 'kafka1:9092,kafka2:9092,kafka3:9092',
'properties.group.id' = 'flink-jobs-data-sync.test',
'topic' = 'test.testdb.test', 'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'format'='debezium-json-plus'
)
```
## 支持版本
理论上,只要 Flink 关于 Kafka 连接器的实现未做大幅调整,则该 Flink 版本可支持。用户在测试时应以最小依赖集进行(依赖的 JAR 包越多,导致 JAR 冲突的可能性越高)。目前经过测试的 Flink 版本有:
Flink 版本|支持版本
---------|--------
1.13 | 1.0+
1.14 | 1.0+
1.15 | 1.0+
1.16 | 1.0+
1.17 | 1.0+
## 支持格式
### Debezium
[Debezium](https://debezium.io) 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
flink-json-plus 目前支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如
- 将增量数据从数据库同步到其他系统
- 日志审计
- 数据库的实时物化视图
- 关联维度数据库的变更历史,等等。
flink-json-plus 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 flink-json-plus 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,flink-json-plus 和 flink-json 一样,将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
_注意:_ 请参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。
## 问题解答
### SQL Client
**问题描述:** 在Flink SQL Client 中使用 [flink-json-plus](https://gitee.com/tenmg/flink-json-plus),加入 [flink-json-plus](https://gitee.com/tenmg/flink-json-plus) 依赖后。
1. 创建源表
```
DROP TABLE IF EXISTS table_process;
CREATE TABLE table_process (
id BIGINT,
name STRING,
create_time TIMESTAMP,
update_time TIMESTAMP
) WITH (
'properties.bootstrap.servers' = 'hadoop101:9092',
'topic' = 'input_kafka',
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset',
'format'='debezium-json-plus'
);
```
2. 执行查询
```
select * from table_process;
```
3. 发生异常
```
Could not initialize class cn.tenmg.flink.formats.json.debezium.DebeziumJsonPlusFormatOptions
```
**解决方案:**
[https://gitee.com/tenmg/flink-json-plus/issues/I6FGR7#note_16275275_link](https://gitee.com/tenmg/flink-json-plus/issues/I6FGR7#note_16275275_link)
[https://gitee.com/tenmg/flink-json-plus/issues/I6GIUF#note_16396699_link](https://gitee.com/tenmg/flink-json-plus/issues/I6GIUF#note_16396699_link)
## 参与贡献
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request