代码拉取完成,页面将自动刷新
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index e872b336e8..7ffd0a90f3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -116,6 +116,10 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
@Override
public R apply(R record) {
+ if (operatingValue(record) == null) {
+ return record;
+ }
+
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index ae90c1956b..d25fd8cf2a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -88,6 +88,43 @@ public class CastTest {
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")));
}
+ @Test
+ public void castNullValueRecordWithSchema() {
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null);
+ SourceRecord transformed = xformValue.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullValueRecordSchemaless() {
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, "key", null, null);
+ SourceRecord transformed = xformValue.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullKeyRecordWithSchema() {
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value");
+ SourceRecord transformed = xformKey.apply(original);
+ assertEquals(original, transformed);
+ }
+
+ @Test
+ public void castNullKeyRecordSchemaless() {
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
+ null, null, Schema.STRING_SCHEMA, "value");
+ SourceRecord transformed = xformKey.apply(original);
+ assertEquals(original, transformed);
+ }
+
+
@Test
public void castWholeRecordKeyWithSchema() {
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。