Conversation

@Adamyuanyuan
Contributor

Purpose of this pull request

Motivation

  • Problem: In Kafka->Hive streaming, partitioning by CURRENT_DATE()/CURRENT_TIMESTAMP() misplaces records during replay, and parsing a create_date field is brittle because of dirty or mixed formats.

  • Goal: Reuse SeaTunnel's metadata mechanism to inject Kafka ConsumerRecord.timestamp as EventTime, then let users materialize it via the Metadata transform for SQL/partitioning.

Design

  • In KafkaRecordEmitter: capture ConsumerRecord.timestamp for each record; in OutputCollector.collect, if the record is a SeaTunnelRow and the timestamp is >= 0, call MetadataUtil.setEventTime(row, ts) (see the sketch after this list).

  • No schema change and no mandatory new options; injection is on by default. Users opt in to materializing the value via the Metadata transform (e.g., mapping EventTime to kafka_ts).
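A minimal sketch of the injection point under the assumptions above; apart from MetadataUtil.setEventTime, which this PR relies on, the method and parameter names here are illustrative, not the exact connector code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

// Illustrative injection point: invoked once per polled Kafka record.
// MetadataUtil is the metadata helper referenced in this PR.
void emitWithEventTime(ConsumerRecord<byte[], byte[]> consumerRecord,
                       Object deserializedRecord,
                       Collector<Object> output) {
    long ts = consumerRecord.timestamp();  // epoch millis from broker/producer
    if (deserializedRecord instanceof SeaTunnelRow && ts >= 0) {
        // Attach the Kafka timestamp as EventTime row metadata (no schema change).
        MetadataUtil.setEventTime((SeaTunnelRow) deserializedRecord, ts);
    }
    output.collect(deserializedRecord);
}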

Does this PR introduce any user-facing change?

Yes. Users can configure, for example:

transform {
  Metadata {
    source_table_name = "result_table"
    result_table_name = "result_with_meta"
    metadata_fields = { EventTime = "kafka_ts" }
  }
  Sql {
    source_table_name = "result_with_meta"
    result_table_name = "source_table"
    query = "select ..., FROM_UNIXTIME(kafka_ts/1000, 'yyyy-MM-dd', 'Asia/Shanghai') as pt from result_with_meta where kafka_ts >= 0"
  }
}

to drive partitioning (the pt column) and downstream transforms; kafka_ts is the Kafka record timestamp in epoch milliseconds, hence the division by 1000 before FROM_UNIXTIME.

How was this patch tested?

Yes, with unit tests (UT) and E2E tests.

Check list

@Adamyuanyuan Adamyuanyuan changed the title [Feature][Kafka source connector] Inject Kafka record timestamp as EventTime metadata [Feature][Kafka source] Inject Kafka record timestamp as EventTime metadata Oct 29, 2025
@Adamyuanyuan
Contributor Author

Adamyuanyuan commented Nov 3, 2025

This PR runs successfully on Flink, but the E2E run on Spark fails. The Spark engine path loses row options during the conversion between SeaTunnelRow and Spark Row, so the Metadata transform cannot retrieve EventTime; the output event_time_ms is therefore empty, and the assertion fails.

I need to confirm whether to fix this issue in this PR or open a separate PR to address metadata loss in the Spark translation layer.
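A hypothetical illustration of the failure mode, assuming stand-in converters toSparkRow/toSeaTunnelRow for the real Spark translation code (the actual converter classes are not shown in this PR):

// Hypothetical sketch: toSparkRow/toSeaTunnelRow stand in for the real
// Spark translation code; the point is only the loss of row options.
SeaTunnelRow in = new SeaTunnelRow(new Object[] {"payload"});
MetadataUtil.setEventTime(in, 1730592000000L);       // EventTime attached as row metadata

org.apache.spark.sql.Row sparkRow = toSparkRow(in);  // copies field values only...
SeaTunnelRow out = toSeaTunnelRow(sparkRow);         // ...so the options are gone

// out no longer carries EventTime, so the Metadata transform emits an
// empty event_time_ms and the Spark E2E assertion fails.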
