KafkaRecordEmitter.java
@@ -21,6 +21,8 @@
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
@@ -58,6 +60,8 @@ public void emitRecord(
KafkaSourceSplitState splitState)
throws Exception {
outputCollector.output = collector;
// Propagate Kafka record timestamp as metadata EventTime so downstream can materialize it
outputCollector.setCurrentRecordTimestamp(consumerRecord.timestamp());
Comment on lines +63 to +64

Member:
Why not do it in DeserializationSchema?

Contributor (Author):
Thanks for the reminder. I'll check it later.
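For context, a minimal, hypothetical sketch of the alternative being discussed: a wrapping schema that stamps the event time during deserialization instead of in the collector. EventTimeStampingSchema and setCurrentEventTime are illustrative names, not code from this PR; the emitter would still have to push each record's timestamp into the wrapper, since deserialize(byte[]) only sees the value bytes.

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;

class EventTimeStampingSchema implements DeserializationSchema<SeaTunnelRow> {
    private final DeserializationSchema<SeaTunnelRow> inner;
    private long currentEventTime = -1L; // refreshed by the emitter before each deserialize call

    EventTimeStampingSchema(DeserializationSchema<SeaTunnelRow> inner) {
        this.inner = inner;
    }

    void setCurrentEventTime(long ts) {
        this.currentEventTime = ts;
    }

    @Override
    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        SeaTunnelRow row = inner.deserialize(message);
        // Same guards as the emitter: valid timestamp, no pre-existing EventTime.
        if (row != null
                && currentEventTime >= 0
                && row.getOptions().get(CommonOptions.EVENT_TIME.getName()) == null) {
            MetadataUtil.setEventTime(row, currentEventTime);
        }
        return row;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return inner.getProducedType();
    }
}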

// TODO: the per-record map lookup here adds extra overhead for non-multi-table scenarios
DeserializationSchema<SeaTunnelRow> deserializationSchema =
mapMetadata.get(splitState.getTablePath()).getDeserializationSchema();
@@ -87,9 +91,25 @@ public void emitRecord(

private static class OutputCollector<T> implements Collector<T> {
private Collector<T> output;
private Long currentRecordTimestamp;

void setCurrentRecordTimestamp(Long ts) {
this.currentRecordTimestamp = ts;
}

@Override
public void collect(T record) {
// Attach Kafka record timestamp into metadata only if the row doesn't already carry
// an EventTime (e.g., CDC formats provide their own 'ts').
if (record instanceof SeaTunnelRow
&& currentRecordTimestamp != null
&& currentRecordTimestamp >= 0) {
SeaTunnelRow row = (SeaTunnelRow) record;
Object existing = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
if (null == existing) {
MetadataUtil.setEventTime(row, currentRecordTimestamp);
}
}
output.collect(record);
}

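Downstream of the emitter, the attached timestamp travels in the row's options map under the CommonOptions.EVENT_TIME key. A minimal sketch of reading it back (the unit test later in this diff does the same); EventTimeReader is an illustrative helper, not part of this PR:

import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

final class EventTimeReader {
    // Returns the Kafka-derived event time attached by the emitter, or null when
    // the record carried no usable timestamp (Kafka reports -1 in that case).
    static Long eventTimeOf(SeaTunnelRow row) {
        return (Long) row.getOptions().get(CommonOptions.EVENT_TIME.getName());
    }
}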
KafkaSourceConfig.java
@@ -25,6 +25,8 @@
import org.apache.seatunnel.api.options.table.TableSchemaOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.MetadataColumn;
import org.apache.seatunnel.api.table.catalog.MetadataSchema;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -266,20 +268,38 @@ private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
}
TablePath tablePath = getTablePathFromSchema(readonlyConfig, readonlyConfig.get(TOPIC));

-        return CatalogTable.of(
-                TableIdentifier.of("", tablePath),
-                tableSchema,
-                new HashMap<String, String>() {
-                    {
-                        Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
-                                .ifPresent(value -> put(PROTOBUF_MESSAGE_NAME.key(), value));
-
-                        Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
-                                .ifPresent(value -> put(PROTOBUF_SCHEMA.key(), value));
-                    }
-                },
-                Collections.emptyList(),
-                null);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("", tablePath),
+                        tableSchema,
+                        new HashMap<String, String>() {
+                            {
+                                Optional.ofNullable(readonlyConfig.get(PROTOBUF_MESSAGE_NAME))
+                                        .ifPresent(
+                                                value -> put(PROTOBUF_MESSAGE_NAME.key(), value));
+
+                                Optional.ofNullable(readonlyConfig.get(PROTOBUF_SCHEMA))
+                                        .ifPresent(value -> put(PROTOBUF_SCHEMA.key(), value));
+                            }
+                        },
+                        Collections.emptyList(),
+                        null);
+
+        // Expose Kafka record timestamp as metadata 'EventTime' for Metadata transform
+        MetadataSchema metadataSchema =
+                MetadataSchema.builder()
+                        .column(
+                                MetadataColumn.of(
+                                        org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME
+                                                .getName(),
+                                        BasicType.LONG_TYPE,
+                                        0L,
+                                        true,
+                                        null,
+                                        null))
+                        .build();
+
+        return CatalogTable.withMetadata(catalogTable, metadataSchema);
}
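Note: declaring EventTime as a MetadataColumn here advertises the new metadata on the CatalogTable so the Metadata transform can map it to a physical column; the e2e config at the end of this diff maps it to event_time_ms.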

private TablePath getTablePathFromSchema(ReadonlyConfig readonlyConfig, String topicName) {
KafkaRecordEmitterTest.java (new file)
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KafkaRecordEmitterTest {

@Test
void emitRecordShouldAttachKafkaTimestampAsEventTime() throws Exception {
long kafkaTimestamp = 1690000000000L;

// Prepare a simple deserialization schema that creates a single-field row from bytes
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"f0"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
DeserializationSchema<SeaTunnelRow> schema = new SimpleStringRowSchema(rowType);

// Build ConsumerMetadata map for the table
ConsumerMetadata metadata = new ConsumerMetadata();
metadata.setDeserializationSchema(schema);
Map<TablePath, ConsumerMetadata> map = new HashMap<>();
TablePath tablePath = TablePath.DEFAULT;
map.put(tablePath, metadata);

KafkaRecordEmitter emitter = new KafkaRecordEmitter(map, MessageFormatErrorHandleWay.FAIL);

// Mock a ConsumerRecord<byte[], byte[]> carrying the fixed timestamp
ConsumerRecord<byte[], byte[]> record = Mockito.mock(ConsumerRecord.class);
Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
Mockito.when(record.value()).thenReturn("hello".getBytes(StandardCharsets.UTF_8));
Mockito.when(record.offset()).thenReturn(100L);

// Prepare split state
KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new TopicPartition("t", 0));
KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);

// Capture outputs
List<SeaTunnelRow> out = new ArrayList<>();
Collector<SeaTunnelRow> collector = new TestCollector(out);

emitter.emitRecord(record, collector, splitState);

Assertions.assertEquals(1, out.size());
SeaTunnelRow row = out.get(0);
Object eventTime = row.getOptions().get(CommonOptions.EVENT_TIME.getName());
Assertions.assertEquals(kafkaTimestamp, eventTime);

// Also verify split state offset advanced
Assertions.assertEquals(101L, splitState.getCurrentOffset());
}

@Test
void emitRecordShouldNotAttachEventTimeWhenTimestampNegative() throws Exception {
long kafkaTimestamp = -1L; // invalid timestamp

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"f0"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
DeserializationSchema<SeaTunnelRow> schema = new SimpleStringRowSchema(rowType);

ConsumerMetadata metadata = new ConsumerMetadata();
metadata.setDeserializationSchema(schema);
Map<TablePath, ConsumerMetadata> map = new HashMap<>();
TablePath tablePath = TablePath.DEFAULT;
map.put(tablePath, metadata);

KafkaRecordEmitter emitter = new KafkaRecordEmitter(map, MessageFormatErrorHandleWay.FAIL);

ConsumerRecord<byte[], byte[]> record = Mockito.mock(ConsumerRecord.class);
Mockito.when(record.timestamp()).thenReturn(kafkaTimestamp);
Mockito.when(record.value()).thenReturn("world".getBytes(StandardCharsets.UTF_8));
Mockito.when(record.offset()).thenReturn(5L);

KafkaSourceSplit split = new KafkaSourceSplit(tablePath, new TopicPartition("t2", 1));
KafkaSourceSplitState splitState = new KafkaSourceSplitState(split);

List<SeaTunnelRow> out = new ArrayList<>();
Collector<SeaTunnelRow> collector = new TestCollector(out);

emitter.emitRecord(record, collector, splitState);

Assertions.assertEquals(1, out.size());
SeaTunnelRow row = out.get(0);
Assertions.assertFalse(row.getOptions().containsKey(CommonOptions.EVENT_TIME.getName()));
Assertions.assertEquals(6L, splitState.getCurrentOffset());
}

private static class SimpleStringRowSchema implements DeserializationSchema<SeaTunnelRow> {
private final SeaTunnelRowType producedType;

private SimpleStringRowSchema(SeaTunnelRowType producedType) {
this.producedType = producedType;
}

@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
String v = new String(message, StandardCharsets.UTF_8);
return new SeaTunnelRow(new Object[] {v});
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return producedType;
}
}

private static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> out;

private TestCollector(List<SeaTunnelRow> out) {
this.out = out;
}

@Override
public void collect(SeaTunnelRow record) {
out.add(record);
}

@Override
public Object getCheckpointLock() {
return this;
}
}
}
KafkaIT.java
@@ -469,6 +469,36 @@ public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer c
Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr());
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
disabledReason =
"The implementation of the Spark engine does not currently support metadata.")
public void testSourceKafkaTextEventTimeToAssert(TestContainer container)
throws IOException, InterruptedException {
long fixedTimestamp = 1738395840000L;
TextSerializationSchema serializer =
TextSerializationSchema.builder()
.seaTunnelRowType(SEATUNNEL_ROW_TYPE)
.delimiter(",")
.build();
generateTestData(
row ->
new ProducerRecord<>(
"test_topic_text_eventtime",
null,
fixedTimestamp,
null,
serializer.serialize(row)),
0,
10);
Container.ExecResult execResult =
container.executeJob(
"/textFormatIT/kafka_source_text_with_event_time_to_assert.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
}
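The job config this test submits is the new resource file at the end of this diff: it reads the topic, runs the Metadata transform to materialize EventTime as event_time_ms, and asserts the column is non-null.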

@TestTemplate
public void testSourceKafka(TestContainer container) throws IOException, InterruptedException {
testKafkaLatestToConsole(container);
kafka_source_text_with_event_time_to_assert.conf (new file)
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_text_eventtime"
plugin_output = "kafka_table"
start_mode = "earliest"
format_error_handle_way = fail
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
format = text
field_delimiter = ","
}
}

transform {
Metadata {
plugin_input = "kafka_table"
plugin_output = "kafka_table_with_meta"
metadata_fields = {
EventTime = event_time_ms
}
}
}

sink {
Assert {
plugin_input = "kafka_table_with_meta"
rules = {
field_rules = [
{
field_name = event_time_ms
field_type = bigint
field_value = [
{ rule_type = NOT_NULL }
]
}
]
}
}
}
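With metadata_fields = { EventTime = event_time_ms }, the Metadata transform copies the row option attached by the emitter into a bigint column named event_time_ms, and the Assert sink verifies it is non-null for each of the 10 generated records, so the job fails if the Kafka record timestamp was never propagated.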