diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index 1e3811ba49a..66810cf33cb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -61,6 +61,10 @@ public void emitRecord(
         // todo there is an additional loss in this place for non-multi-table scenarios
         DeserializationSchema<SeaTunnelRow> deserializationSchema =
                 mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
+        if (deserializationSchema instanceof KafkaEventTimeDeserializationSchema) {
+            ((KafkaEventTimeDeserializationSchema) deserializationSchema)
+                    .setCurrentRecordTimestamp(consumerRecord.timestamp());
+        }
         try {
             if (deserializationSchema instanceof CompatibleKafkaConnectDeserializationSchema) {
                 ((CompatibleKafkaConnectDeserializationSchema) deserializationSchema)
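Note: KafkaEventTimeDeserializationSchema is referenced throughout this diff, but its source file is not part of this excerpt. Below is a minimal sketch of what such a wrapper could look like, inferred from the call site above and the MetadataUtil/CommonOptions usage later in the diff; it is an assumption, not the PR's actual implementation.

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;

/** Hypothetical sketch; the real class is outside this excerpt. */
public class KafkaEventTimeDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {

    private final DeserializationSchema<SeaTunnelRow> delegate;
    // Stamped by KafkaRecordEmitter before each deserialize call; -1 means no valid timestamp.
    private long currentRecordTimestamp = -1L;

    public KafkaEventTimeDeserializationSchema(DeserializationSchema<SeaTunnelRow> delegate) {
        this.delegate = delegate;
    }

    public void setCurrentRecordTimestamp(long timestamp) {
        this.currentRecordTimestamp = timestamp;
    }

    @Override
    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        SeaTunnelRow row = delegate.deserialize(message);
        // Attach the Kafka record timestamp as the EventTime row option,
        // skipping null rows and invalid (negative) timestamps.
        if (row != null && currentRecordTimestamp >= 0) {
            MetadataUtil.setEventTime(row, currentRecordTimestamp);
        }
        return row;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return delegate.getProducedType();
    }
}

Because the emitter stamps the timestamp before delegating, the wrapper can stay a plain byte[]-based DeserializationSchema while still seeing per-record Kafka metadata.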
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 97e5451ce5f..ce2c38cbb04 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -25,6 +25,8 @@
 import org.apache.seatunnel.api.options.table.TableSchemaOptions;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.MetadataColumn;
+import org.apache.seatunnel.api.table.catalog.MetadataSchema;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -266,20 +268,38 @@ private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
         }
         TablePath tablePath = getTablePathFromSchema(readonlyConfig, readonlyConfig.get(TOPIC));
-        return CatalogTable.of(
-                TableIdentifier.of("", tablePath),
-                tableSchema,
-                new HashMap<String, String>() {
-                    {
-                        Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
-                                .ifPresent(value -> put(PROTOBUF_MESSAGE_NAME.key(), value));
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("", tablePath),
+                        tableSchema,
+                        new HashMap<String, String>() {
+                            {
+                                Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
+                                        .ifPresent(
+                                                value -> put(PROTOBUF_MESSAGE_NAME.key(), value));
-                        Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
-                                .ifPresent(value -> put(PROTOBUF_SCHEMA.key(), value));
-                    }
-                },
-                Collections.emptyList(),
-                null);
+                                Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
+                                        .ifPresent(value -> put(PROTOBUF_SCHEMA.key(), value));
+                            }
+                        },
+                        Collections.emptyList(),
+                        null);
+
+        // Expose Kafka record timestamp as metadata 'EventTime' for Metadata transform
+        MetadataSchema metadataSchema =
+                MetadataSchema.builder()
+                        .column(
+                                MetadataColumn.of(
+                                        org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME
+                                                .getName(),
+                                        BasicType.LONG_TYPE,
+                                        0L,
+                                        true,
+                                        null,
+                                        null))
+                        .build();
+
+        return CatalogTable.withMetadata(catalogTable, metadataSchema);
     }
 
     private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, String topicName) {
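The metadata column declared above only advertises EventTime in the catalog; the value itself travels in the row's options map. A tiny self-contained check of that round-trip (the helper class is hypothetical, but it uses only the MetadataUtil and CommonOptions calls this diff already relies on):

import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

class EventTimeRoundTripSketch {
    public static void main(String[] args) {
        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"payload"});
        // Store the timestamp under the EventTime option key ...
        MetadataUtil.setEventTime(row, 1690000000000L);
        // ... and read it back the same way the unit tests below do.
        Object eventTime = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
        System.out.println(eventTime); // 1690000000000
    }
}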
@@ -299,85 +319,110 @@ private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
         SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         MessageFormat format = readonlyConfig.get(FORMAT);
 
+        DeserializationSchema<SeaTunnelRow> schema;
+
         if (format == MessageFormat.NATIVE) {
-            return new NativeKafkaConnectDeserializationSchema(
-                    catalogTable, false, false, false, false);
+            schema =
+                    new NativeKafkaConnectDeserializationSchema(
+                            catalogTable, false, false, false, false);
+        } else if (!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+            schema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(seaTunnelRowType)
+                            .delimiter(TextFormatConstant.PLACEHOLDER)
+                            .setCatalogTable(catalogTable)
+                            .build();
+        } else {
+            switch (format) {
+                case JSON:
+                    schema = new JsonDeserializationSchema(catalogTable, false, false);
+                    break;
+                case TEXT:
+                    String delimiter = readonlyConfig.get(FIELD_DELIMITER);
+                    schema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(seaTunnelRowType)
+                                    .delimiter(delimiter)
+                                    .build();
+                    break;
+                case CANAL_JSON:
+                    schema =
+                            CanalJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case OGG_JSON:
+                    schema =
+                            OggJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case MAXWELL_JSON:
+                    schema =
+                            MaxWellJsonDeserializationSchema.builder(catalogTable)
+                                    .setIgnoreParseErrors(true)
+                                    .build();
+                    break;
+                case COMPATIBLE_KAFKA_CONNECT_JSON:
+                    Boolean keySchemaEnable =
+                            readonlyConfig.get(
+                                    KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
+                    Boolean valueSchemaEnable =
+                            readonlyConfig.get(
+                                    KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
+                    schema =
+                            new CompatibleKafkaConnectDeserializationSchema(
+                                    catalogTable, keySchemaEnable, valueSchemaEnable, false, false);
+                    break;
+                case DEBEZIUM_JSON:
+                    boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
+                    TableIdentifierConfig tableFilter =
+                            readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
+                    if (tableFilter != null) {
+                        TablePath tablePath =
+                                TablePath.of(
+                                        StringUtils.isNotEmpty(tableFilter.getDatabaseName())
+                                                ? tableFilter.getDatabaseName()
+                                                : null,
+                                        StringUtils.isNotEmpty(tableFilter.getSchemaName())
+                                                ? tableFilter.getSchemaName()
+                                                : null,
+                                        StringUtils.isNotEmpty(tableFilter.getTableName())
+                                                ? tableFilter.getTableName()
+                                                : null);
+                        Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap =
+                                Collections.singletonMap(
+                                        tablePath,
+                                        new DebeziumJsonDeserializationSchema(
+                                                catalogTable, true, includeSchema));
+                        schema =
+                                new DebeziumJsonDeserializationSchemaDispatcher(
+                                        tableDeserializationMap, true, includeSchema);
+                    } else {
+                        schema =
+                                new DebeziumJsonDeserializationSchema(
+                                        catalogTable, true, includeSchema);
+                    }
+                    break;
+                case AVRO:
+                    schema = new AvroDeserializationSchema(catalogTable);
+                    break;
+                case PROTOBUF:
+                    schema = new ProtobufDeserializationSchema(catalogTable);
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported format: " + format);
+            }
         }
-        if (!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
-            return TextDeserializationSchema.builder()
-                    .seaTunnelRowType(seaTunnelRowType)
-                    .delimiter(TextFormatConstant.PLACEHOLDER)
-                    .setCatalogTable(catalogTable)
-                    .build();
+        if (schema instanceof NativeKafkaConnectDeserializationSchema
+                || schema instanceof CompatibleKafkaConnectDeserializationSchema) {
+            return schema;
         }
-        switch (format) {
-            case JSON:
-                return new JsonDeserializationSchema(catalogTable, false, false);
-            case TEXT:
-                String delimiter = readonlyConfig.get(FIELD_DELIMITER);
-                return TextDeserializationSchema.builder()
-                        .seaTunnelRowType(seaTunnelRowType)
-                        .delimiter(delimiter)
-                        .build();
-            case CANAL_JSON:
-                return CanalJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-            case OGG_JSON:
-                return OggJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-            case MAXWELL_JSON:
-                return MaxWellJsonDeserializationSchema.builder(catalogTable)
-                        .setIgnoreParseErrors(true)
-                        .build();
-
-            case COMPATIBLE_KAFKA_CONNECT_JSON:
-                Boolean keySchemaEnable =
-                        readonlyConfig.get(
-                                KafkaConnectJsonFormatOptions.KEY_CONVERTER_SCHEMA_ENABLED);
-                Boolean valueSchemaEnable =
-                        readonlyConfig.get(
-                                KafkaConnectJsonFormatOptions.VALUE_CONVERTER_SCHEMA_ENABLED);
-                return new CompatibleKafkaConnectDeserializationSchema(
-                        catalogTable, keySchemaEnable, valueSchemaEnable, false, false);
-            case DEBEZIUM_JSON:
-                boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA);
-                TableIdentifierConfig tableFilter =
-                        readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER);
-                if (tableFilter != null) {
-                    TablePath tablePath =
-                            TablePath.of(
-                                    StringUtils.isNotEmpty(tableFilter.getDatabaseName())
-                                            ? tableFilter.getDatabaseName()
-                                            : null,
-                                    StringUtils.isNotEmpty(tableFilter.getSchemaName())
-                                            ? tableFilter.getSchemaName()
-                                            : null,
-                                    StringUtils.isNotEmpty(tableFilter.getTableName())
-                                            ? tableFilter.getTableName()
-                                            : null);
-                    Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap =
-                            Collections.singletonMap(
-                                    tablePath,
-                                    new DebeziumJsonDeserializationSchema(
-                                            catalogTable, true, includeSchema));
-                    return new DebeziumJsonDeserializationSchemaDispatcher(
-                            tableDeserializationMap, true, includeSchema);
-                } else {
-                    return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema);
-                }
-            case AVRO:
-                return new AvroDeserializationSchema(catalogTable);
-            case PROTOBUF:
-                return new ProtobufDeserializationSchema(catalogTable);
-            default:
-                throw new SeaTunnelJsonFormatException(
-                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                        "Unsupported format: " + format);
-        }
+        return new KafkaEventTimeDeserializationSchema(schema);
     }
 
     private TableSchema nativeTableSchema() {
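Why the two Kafka-Connect-style schemas are returned unwrapped: they deserialize the whole ConsumerRecord themselves and stamp the timestamp internally (see the attachEventTime changes at the end of this diff), while every other schema only sees the value bytes and therefore needs the wrapper. A hypothetical illustration of the resulting call order, using only methods that appear elsewhere in this diff:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;

class EmitterCallSequenceSketch {
    // Names and wiring are illustrative; the real flow lives in KafkaRecordEmitter.
    static SeaTunnelRow deserializeWithEventTime(
            KafkaEventTimeDeserializationSchema schema, ConsumerRecord<byte[], byte[]> record)
            throws IOException {
        schema.setCurrentRecordTimestamp(record.timestamp()); // stamp first
        return schema.deserialize(record.value()); // row now carries the EventTime option
    }
}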
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
new file mode 100644
index 00000000000..14fd6303048
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitterTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class KafkaRecordEmitterTest {
+
+    @Test
+    void emitRecordShouldAttachKafkaTimestampAsEventTime() throws Exception {
+        long kafkaTimestamp = 1690000000000L;
+
+        // Prepare a simple deserialization schema that creates a single-field row from bytes
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"f0"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
+        DeserializationSchema<SeaTunnelRow> schema =
+                new KafkaEventTimeDeserializationSchema(new SimpleStringRowSchema(rowType));
+
+        // Build ConsumerMetadata map for the table
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setDeserializationSchema(schema);
+        Map<TablePath, ConsumerMetadata> map = new HashMap<>();
+        TablePath tablePath = TablePath.DEFAULT;
+        map.put(tablePath, metadata);
+
+        KafkaRecordEmitter emitter = new KafkaRecordEmitter(map, MessageFormatErrorHandleWay.FAIL);
+
+        // Mock ConsumerRecord
+        org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> record =
+                Mockito.mock(org.apache.kafka.clients.consumer.ConsumerRecord.class);
+        Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
+        Mockito.when(record.value()).thenReturn("hello".getBytes(StandardCharsets.UTF_8));
+        Mockito.when(record.offset()).thenReturn(100L);
+
+        // Prepare split state
+        KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new TopicPartition("t", 0));
+        KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);
+
+        // Capture outputs
+        List<SeaTunnelRow> out = new ArrayList<>();
+        Collector<SeaTunnelRow> collector = new TestCollector(out);
+
+        emitter.emitRecord(record, collector, splitState);
+
+        Assertions.assertEquals(1, out.size());
+        SeaTunnelRow row = out.get(0);
+        Object eventTime = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+        Assertions.assertEquals(kafkaTimestamp, eventTime);
+
+        // Also verify split state offset advanced
+        Assertions.assertEquals(101L, splitState.getCurrentOffset());
+    }
+
+    @Test
+    void emitRecordShouldNotAttachEventTimeWhenTimestampNegative() throws Exception {
+        long kafkaTimestamp = -1L; // invalid timestamp
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"f0"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
+        DeserializationSchema<SeaTunnelRow> schema =
+                new KafkaEventTimeDeserializationSchema(new SimpleStringRowSchema(rowType));
+
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setDeserializationSchema(schema);
+        Map<TablePath, ConsumerMetadata> map = new HashMap<>();
+        TablePath tablePath = TablePath.DEFAULT;
+        map.put(tablePath, metadata);
+
+        KafkaRecordEmitter emitter = new KafkaRecordEmitter(map, MessageFormatErrorHandleWay.FAIL);
+
+        org.apache.kafka.clients.consumer.ConsumerRecord<byte[], byte[]> record =
+                Mockito.mock(org.apache.kafka.clients.consumer.ConsumerRecord.class);
+        Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
+        Mockito.when(record.value()).thenReturn("world".getBytes(StandardCharsets.UTF_8));
+        Mockito.when(record.offset()).thenReturn(5L);
+
+        KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new TopicPartition("t2", 1));
+        KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);
+
+        List<SeaTunnelRow> out = new ArrayList<>();
+        Collector<SeaTunnelRow> collector = new TestCollector(out);
+
+        emitter.emitRecord(record, collector, splitState);
+
+        Assertions.assertEquals(1, out.size());
+        SeaTunnelRow row = out.get(0);
+        Assertions.assertFalse(row.getOptions().containsKey(CommonOptions.EVENT_TIME.getName()));
+        Assertions.assertEquals(6L, splitState.getCurrentOffset());
+    }
+
+    private static class SimpleStringRowSchema implements DeserializationSchema<SeaTunnelRow> {
+        private final SeaTunnelRowType producedType;
+
+        private SimpleStringRowSchema(SeaTunnelRowType producedType) {
+            this.producedType = producedType;
+        }
+
+        @Override
+        public SeaTunnelRow deserialize(byte[] message) throws IOException {
+            String v = new String(message, StandardCharsets.UTF_8);
+            return new SeaTunnelRow(new Object[] {v});
+        }
+
+        @Override
+        public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+            return producedType;
+        }
+    }
+
+    private static class TestCollector implements Collector<SeaTunnelRow> {
+        private final List<SeaTunnelRow> out;
+
+        private TestCollector(List<SeaTunnelRow> out) {
+            this.out = out;
+        }
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            out.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return this;
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 124ed7ff176..38c9184139f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -469,6 +469,36 @@ public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer c
         Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr());
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason =
+                    "The implementation of the Spark engine does not currently support metadata.")
+    public void testSourceKafkaTextEventTimeToAssert(TestContainer container)
+            throws IOException, InterruptedException {
+        long fixedTimestamp = 1738395840000L;
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test_topic_text_eventtime",
+                                null,
+                                fixedTimestamp,
+                                null,
+                                serializer.serialize(row)),
+                0,
+                10);
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/textFormatIT/kafka_source_text_with_event_time_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafka(TestContainer container) throws IOException, InterruptedException {
         testKafkaLatestToConsole(container);
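The e2e test pins the record timestamp via the five-argument ProducerRecord constructor (topic, partition, timestamp, key, value), which makes the EventTime the source attaches deterministic. A minimal standalone sketch of that producer call; the payload string is a placeholder:

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

class FixedTimestampRecordSketch {
    static ProducerRecord<byte[], byte[]> build(byte[] payload) {
        // null partition lets the partitioner decide; null key means round-robin;
        // 1738395840000L is stored by the broker as the record's CreateTime.
        return new ProducerRecord<>(
                "test_topic_text_eventtime", null, 1738395840000L, null, payload);
    }

    public static void main(String[] args) {
        ProducerRecord<byte[], byte[]> r = build("1,foo".getBytes(StandardCharsets.UTF_8));
        System.out.println(r.timestamp()); // 1738395840000
    }
}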
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
new file mode 100644
index 00000000000..6ccd055c1fe
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_with_event_time_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_text_eventtime"
+    plugin_output = "kafka_table"
+    start_mode = "earliest"
+    format_error_handle_way = fail
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    format = text
+    field_delimiter = ","
+  }
+}
+
+transform {
+  Metadata {
+    plugin_input = "kafka_table"
+    plugin_output = "kafka_table_with_meta"
+    metadata_fields = {
+      EventTime = event_time_ms
+    }
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table_with_meta"
+    rules = {
+      field_rules = [
+        {
+          field_name = event_time_ms
+          field_type = bigint
+          field_value = [
+            { rule_type = NOT_NULL }
+          ]
+        }
+      ]
+    }
+  }
+}
+
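The Metadata transform config above maps the EventTime row option onto a regular bigint column named event_time_ms, which the Assert sink can then inspect. Conceptually, the per-row work amounts to the following; this is a sketch, not the transform's actual implementation, and the field index is assumed to point at the appended column:

import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

class MetadataMappingSketch {
    // Copy the EventTime row option into the appended event_time_ms field slot.
    static void applyEventTimeMapping(SeaTunnelRow row, int eventTimeFieldIndex) {
        Object eventTime = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
        row.setField(eventTimeFieldIndex, eventTime);
    }
}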
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
index 1d0974668b7..0a9e8867eab 100644
--- a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -122,6 +124,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow>
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return seaTunnelRowType;
     }
 
+    private void attachEventTime(SeaTunnelRow row, long timestamp) {
+        if (row == null || timestamp < 0) {
+            return;
+        }
+        Object existing = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
+        if (existing == null) {
+            MetadataUtil.setEventTime(row, timestamp);
+        }
+    }
+
     private void tryInitConverter() {
         if (keyConverter == null) {
             synchronized (this) {
diff --git a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
index d4aca3ff4b9..e6fa90100e9 100644
--- a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
@@ -21,6 +21,8 @@
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.CommonOptions;
+import org.apache.seatunnel.api.table.type.MetadataUtil;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -112,6 +114,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> msg, Collector<SeaTunnelRow>