7 Star 5 Fork 23

src-openEuler/kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
0008-Cast-SMT-allow-null.patch 3.00 KB
一键复制 编辑 原始数据 按行查看 历史
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index e872b336e8..7ffd0a90f3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -116,6 +116,10 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
@Override
public R apply(R record) {
+ if (operatingValue(record) == null) {
+ return record;
+ }
+
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index ae90c1956b..d25fd8cf2a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -88,6 +88,43 @@ public class CastTest {
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")));
}
+ @Test
+ public void castNullValueRecordWithSchema() {
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null);
+ SourceRecord transformed = xformValue.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullValueRecordSchemaless() {
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, "key", null, null);
+ SourceRecord transformed = xformValue.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullKeyRecordWithSchema() {
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value");
+ SourceRecord transformed = xformKey.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullKeyRecordSchemaless() {
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ null, null, Schema.STRING_SCHEMA, "value");
+ SourceRecord transformed = xformKey.apply(original);
+ assertEquals(original, transformed);
+ }
+
+
@Test
public void castWholeRecordKeyWithSchema() {
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/src-openeuler/kafka.git
git@gitee.com:src-openeuler/kafka.git
src-openeuler
kafka
kafka
master

搜索帮助