From 15d658177fcfde9e8c19912f1cae16b53630239e Mon Sep 17 00:00:00 2001 From: Max Falk Date: Mon, 29 Sep 2025 11:34:38 +0200 Subject: [PATCH 1/8] add kafka metadata support Signed-off-by: Max Falk --- docs/content/cdc-ingestion/kafka-cdc.md | 2 + .../generated/kafka_sync_database.html | 4 + .../generated/kafka_sync_table.html | 6 +- .../action/cdc/CdcMetadataConverter.java | 16 +- .../action/cdc/CdcMetadataProcessor.java | 11 +- .../flink/action/cdc/CdcSourceRecord.java | 40 +++- .../action/cdc/format/AbstractDataFormat.java | 11 + .../cdc/format/AbstractRecordParser.java | 36 +++- .../flink/action/cdc/format/DataFormat.java | 20 +- .../debezium/DebeziumAvroDataFormat.java | 13 ++ .../debezium/DebeziumAvroRecordParser.java | 10 + .../action/cdc/kafka/KafkaActionUtils.java | 18 +- ...afkaDebeziumAvroDeserializationSchema.java | 4 +- ...afkaDebeziumJsonDeserializationSchema.java | 5 +- .../cdc/kafka/KafkaMetadataConverter.java | 178 ++++++++++++++++ .../kafka/KafkaSyncDatabaseActionFactory.java | 1 + .../kafka/KafkaSyncTableActionFactory.java | 1 + .../cdc/kafka/KafkaMetadataConverterTest.java | 192 ++++++++++++++++++ 18 files changed, 550 insertions(+), 18 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index 7ca3b2728cf9..42ab4fddf8c7 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -103,6 +103,7 @@ To use this feature through `flink run`, run the following shell command. [--primary_keys ] \ [--type_mapping to-string] \ [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \ + [--metadata_column ] \ [--kafka_conf [--kafka_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ [--table_conf [--table_conf ...]] @@ -215,6 +216,7 @@ To use this feature through `flink run`, run the following shell command. [--partition_keys ] \ [--primary_keys ] \ [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \ + [--metadata_column ] \ [--kafka_conf [--kafka_conf ...]] \ [--catalog_conf [--catalog_conf ...]] \ [--table_conf [--table_conf ...]] diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index e8d5898c3401..41ace31c45e1 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -94,6 +94,10 @@
--computed_column
The definitions of computed columns. The argument field is from Kafka topic's table field name. See here for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table. + +
--metadata_column
+ --metadata_column is used to specify which Kafka metadata columns to include in the schema of the synchronized tables. Metadata columns provide additional information about the source record. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime'). The columns are added with a __kafka_ prefix, for example __kafka_topic.
+
+ 
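For illustration, a kafka_sync_database invocation that also captures Kafka metadata could look like the sketch below; the jar path, warehouse location, database name, topic and the other Kafka settings are placeholders, and the remaining arguments follow the synchronization command shown above.

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    kafka_sync_database \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --metadata_column topic,offset \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf properties.group.id=paimon-sync \
    --kafka_conf value.format=debezium-json \
    --table_conf bucket=4

Every synchronized table then carries the requested metadata columns in addition to the payload columns.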
--eager_init
It is default false. If true, all relevant tables commiter will be initialized eagerly, which means those tables could be forced to create snapshot. diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html index 10669f594f41..ed6fb823522f 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_table.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html @@ -70,6 +70,10 @@
--computed_column
The definitions of computed columns. The argument field is from Kafka topic's table field name. See here for a complete list of configurations. + +
--metadata_column
+ --metadata_column is used to specify which Kafka metadata columns to include in the schema of the synchronized table. Metadata columns provide additional information about the source record. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime'). The columns are added with a __kafka_ prefix, for example __kafka_topic.
+
+ 
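As a sketch (all paths, names and Kafka settings are placeholders), a kafka_sync_table run that materializes all five metadata columns might be invoked as:

<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --primary_keys id \
    --metadata_column topic,partition,offset,timestamp,timestamp_type \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf properties.group.id=paimon-sync \
    --kafka_conf value.format=debezium-json \
    --table_conf bucket=4

With the converters in this change, the resulting table gains __kafka_topic (STRING), __kafka_partition (INT), __kafka_offset (BIGINT), __kafka_timestamp (TIMESTAMP(3) with local time zone) and __kafka_timestamp_type (STRING) alongside the payload columns.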
--kafka_conf
The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic/topic-pattern`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its document for a complete list of configurations. @@ -83,4 +87,4 @@ The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. - \ No newline at end of file + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java index 3ffeaa3d788b..0ee292e251d4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java @@ -34,13 +34,25 @@ * A functional interface for converting CDC metadata. * *

This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given - * {@link JsonNode} source. Implementations of this interface can be used to process and transform - * metadata entries from CDC sources. + * {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this interface can be used + * to process and transform metadata entries from CDC sources. */ public interface CdcMetadataConverter extends Serializable { String read(JsonNode payload); + /** + * Read metadata from a CDC source record. Default implementation throws + * UnsupportedOperationException to maintain backward compatibility. + * + * @param record the CDC source record + * @return the metadata value as a string + */ + default String read(CdcSourceRecord record) { + throw new UnsupportedOperationException( + "This metadata converter does not support reading from CdcSourceRecord"); + } + DataType dataType(); String columnName(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java index 9fdd7a4377e7..ce1114b3fd86 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.action.cdc; +import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter; + import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import java.util.Arrays; @@ -49,7 +51,14 @@ public enum CdcMetadataProcessor { new CdcMetadataConverter.DatabaseNameConverter(), new CdcMetadataConverter.TableNameConverter(), new CdcMetadataConverter.SchemaNameConverter(), - new CdcMetadataConverter.OpTsConverter()); + new CdcMetadataConverter.OpTsConverter()), + KAFKA_METADATA_PROCESSOR( + SyncJobHandler.SourceType.KAFKA, + new KafkaMetadataConverter.TopicConverter(), + new KafkaMetadataConverter.PartitionConverter(), + new KafkaMetadataConverter.OffsetConverter(), + new KafkaMetadataConverter.TimestampConverter(), + new KafkaMetadataConverter.TimestampTypeConverter()); private final SyncJobHandler.SourceType sourceType; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java index 51a14534c4c9..d0309ff3c7b4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java @@ -21,6 +21,9 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; /** A data change record from the CDC source. */ @@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable { // TODO Use generics to support more scenarios. 
private final Object value; + // Generic metadata map - any source can add metadata + private final Map metadata; + public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) { - this.topic = topic; - this.key = key; - this.value = value; + this(topic, key, value, null); } public CdcSourceRecord(Object value) { - this(null, null, value); + this(null, null, value, null); + } + + public CdcSourceRecord( + @Nullable String topic, + @Nullable Object key, + Object value, + @Nullable Map metadata) { + this.topic = topic; + this.key = key; + this.value = value; + this.metadata = + metadata != null + ? Collections.unmodifiableMap(new HashMap<>(metadata)) + : Collections.emptyMap(); } @Nullable @@ -59,6 +77,15 @@ public Object getValue() { return value; } + public Map getMetadata() { + return metadata; + } + + @Nullable + public Object getMetadata(String key) { + return metadata.get(key); + } + @Override public boolean equals(Object o) { if (!(o instanceof CdcSourceRecord)) { @@ -68,12 +95,13 @@ public boolean equals(Object o) { CdcSourceRecord that = (CdcSourceRecord) o; return Objects.equals(topic, that.topic) && Objects.equals(key, that.key) - && Objects.equals(value, that.value); + && Objects.equals(value, that.value) + && Objects.equals(metadata, that.metadata); } @Override public int hashCode() { - return Objects.hash(topic, key, value); + return Objects.hash(topic, key, value, metadata); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java index 66deba9b80f1..36d3cb59c8e2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractDataFormat.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc.format; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; @@ -49,6 +50,16 @@ public AbstractRecordParser createParser( return parser().createParser(typeMapping, computedColumns); } + @Override + public AbstractRecordParser createParser( + TypeMapping typeMapping, + List computedColumns, + CdcMetadataConverter[] metadataConverters) { + // Most parsers don't support metadata converters, so we default to the 2-parameter version + // Only specific parsers like DebeziumAvroRecordParser will override this + return createParser(typeMapping, computedColumns); + } + @Override public KafkaDeserializationSchema createKafkaDeserializer( Configuration cdcSourceConfig) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java index 85442067b981..45aca7311f05 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc.format; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import 
org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; @@ -55,10 +56,21 @@ public abstract class AbstractRecordParser protected static final String FIELD_DATABASE = "database"; protected final TypeMapping typeMapping; protected final List computedColumns; + protected final CdcMetadataConverter[] metadataConverters; + protected CdcSourceRecord currentRecord; // Store current record for metadata access public AbstractRecordParser(TypeMapping typeMapping, List computedColumns) { + this(typeMapping, computedColumns, new CdcMetadataConverter[0]); + } + + public AbstractRecordParser( + TypeMapping typeMapping, + List computedColumns, + CdcMetadataConverter[] metadataConverters) { this.typeMapping = typeMapping; this.computedColumns = computedColumns; + this.metadataConverters = + metadataConverters != null ? metadataConverters : new CdcMetadataConverter[0]; } @Nullable @@ -88,7 +100,11 @@ public void flatMap(CdcSourceRecord value, Collector out } } - protected abstract void setRoot(CdcSourceRecord record); + protected void setRoot(CdcSourceRecord record) { + this.currentRecord = record; + // Call the original setRoot method for backward compatibility + // Subclasses can override this method as they used to + } protected abstract List extractRecords(); @@ -111,6 +127,24 @@ protected void evalComputedColumns( }); } + /** Extract metadata values using metadata converters. */ + protected void extractMetadata(Map rowData, CdcSchema.Builder schemaBuilder) { + for (CdcMetadataConverter metadataConverter : metadataConverters) { + try { + String value = metadataConverter.read(currentRecord); + if (value != null) { + rowData.put(metadataConverter.columnName(), value); + } + schemaBuilder.column(metadataConverter.columnName(), metadataConverter.dataType()); + } catch (UnsupportedOperationException e) { + // This converter doesn't support CdcSourceRecord, skip it + LOG.debug( + "Metadata converter {} does not support CdcSourceRecord", + metadataConverter.getClass().getSimpleName()); + } + } + } + /** Handle case sensitivity here. */ protected RichCdcMultiplexRecord createRecord( RowKind rowKind, Map data, CdcSchema.Builder schemaBuilder) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java index 711f596ac545..4044e6e0d8e7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc.format; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; @@ -38,11 +39,28 @@ public interface DataFormat { * Creates a new instance of {@link AbstractRecordParser} for this data format with the * specified configurations. * + * @param typeMapping Type mapping configuration * @param computedColumns List of computed columns to be considered by the parser. * @return A new instance of {@link AbstractRecordParser}. 
*/ + default AbstractRecordParser createParser( + TypeMapping typeMapping, List computedColumns) { + return createParser(typeMapping, computedColumns, new CdcMetadataConverter[0]); + } + + /** + * Creates a new instance of {@link AbstractRecordParser} for this data format with the + * specified configurations including metadata converters. + * + * @param typeMapping Type mapping configuration + * @param computedColumns List of computed columns to be considered by the parser. + * @param metadataConverters Array of metadata converters for extracting CDC metadata + * @return A new instance of {@link AbstractRecordParser}. + */ AbstractRecordParser createParser( - TypeMapping typeMapping, List computedColumns); + TypeMapping typeMapping, + List computedColumns, + CdcMetadataConverter[] metadataConverters); KafkaDeserializationSchema createKafkaDeserializer( Configuration cdcSourceConfig); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java index e75a49fb501e..654ae3e3caaf 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroDataFormat.java @@ -18,8 +18,12 @@ package org.apache.paimon.flink.action.cdc.format.debezium; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.ComputedColumn; +import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.format.AbstractDataFormat; +import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser; import org.apache.paimon.flink.action.cdc.format.RecordParserFactory; import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema; import org.apache.paimon.flink.action.cdc.pulsar.PulsarDebeziumAvroDeserializationSchema; @@ -28,6 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import java.util.List; import java.util.function.Function; /** @@ -41,6 +46,14 @@ protected RecordParserFactory parser() { return DebeziumAvroRecordParser::new; } + @Override + public AbstractRecordParser createParser( + TypeMapping typeMapping, + List computedColumns, + CdcMetadataConverter[] metadataConverters) { + return new DebeziumAvroRecordParser(typeMapping, computedColumns, metadataConverters); + } + @Override protected Function> kafkaDeserializer() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java index 7c3763a604c5..1f944df4a29f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc.format.debezium; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.CdcSourceRecord; import 
org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; @@ -78,8 +79,16 @@ public DebeziumAvroRecordParser(TypeMapping typeMapping, List co super(typeMapping, computedColumns); } + public DebeziumAvroRecordParser( + TypeMapping typeMapping, + List computedColumns, + CdcMetadataConverter[] metadataConverters) { + super(typeMapping, computedColumns, metadataConverters); + } + @Override protected void setRoot(CdcSourceRecord record) { + super.setRoot(record); // Store current record for metadata access keyRecord = (GenericRecord) record.getKey(); valueRecord = (GenericRecord) record.getValue(); } @@ -159,6 +168,7 @@ private Map extractRowData( } evalComputedColumns(resultMap, schemaBuilder); + extractMetadata(resultMap, schemaBuilder); return resultMap; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index b937ad2eda4c..5d2526030ee0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -36,13 +36,11 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.jetbrains.annotations.NotNull; import java.time.Duration; import java.util.Arrays; @@ -320,6 +318,18 @@ private static String findOneTopic(Configuration kafkaConfig, Properties propert } } + protected static @NotNull Map extractKafkaMetadata( + ConsumerRecord message) { + // Add the Kafka message metadata that can be used with --metadata_column + Map kafkaMetadata = new HashMap<>(); + kafkaMetadata.put("topic", message.topic()); + kafkaMetadata.put("partition", message.partition()); + kafkaMetadata.put("offset", message.offset()); + kafkaMetadata.put("timestamp", message.timestamp()); + kafkaMetadata.put("timestamp_type", message.timestampType().name); + return kafkaMetadata; + } + private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper { private final KafkaConsumer consumer; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java index eea364d460de..1f98c60e8dc8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java @@ -76,7 +76,9 @@ public CdcSourceRecord deserialize(ConsumerRecord message) throw key = (GenericRecord) keyContainerWithVersion.container(); } GenericRecord value = (GenericRecord) 
valueContainerWithVersion.container(); - return new CdcSourceRecord(topic, key, value); + + return new CdcSourceRecord( + topic, key, value, KafkaActionUtils.extractKafkaMetadata(message)); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java index 887af5f6060a..1bd7ed25a09e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Map; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; @@ -76,7 +77,9 @@ public CdcSourceRecord deserialize(ConsumerRecord message) throw } JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class); - return new CdcSourceRecord(null, keyNode, valueNode); + + Map kafkaMetadata = KafkaActionUtils.extractKafkaMetadata(message); + return new CdcSourceRecord(message.topic(), keyNode, valueNode, kafkaMetadata); } catch (Exception e) { LOG.error("Invalid Json:\n{}", new String(message.value())); throw e; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java new file mode 100644 index 000000000000..ccd8413e6cfb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.DateTimeUtils; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import java.util.TimeZone; + +/** + * Kafka-specific implementations of {@link CdcMetadataConverter} for extracting Kafka message + * metadata. + * + *

These converters read from the generic metadata map in {@link CdcSourceRecord} to extract + * Kafka-specific metadata like topic, partition, offset, timestamp, and timestamp type. + */ +public class KafkaMetadataConverter { + + /** Converter for Kafka topic name. */ + public static class TopicConverter implements CdcMetadataConverter { + private static final long serialVersionUID = 1L; + + @Override + public String read(JsonNode source) { + throw new UnsupportedOperationException( + "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode"); + } + + @Override + public String read(CdcSourceRecord record) { + return record.getTopic(); + } + + @Override + public DataType dataType() { + return DataTypes.STRING().notNull(); + } + + @Override + public String columnName() { + return "topic"; + } + } + + /** Converter for Kafka partition number. */ + public static class PartitionConverter implements CdcMetadataConverter { + private static final long serialVersionUID = 1L; + + @Override + public String read(JsonNode source) { + throw new UnsupportedOperationException( + "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode"); + } + + @Override + public String read(CdcSourceRecord record) { + Object partition = record.getMetadata("partition"); + return partition != null ? partition.toString() : null; + } + + @Override + public DataType dataType() { + return DataTypes.INT(); + } + + @Override + public String columnName() { + return "partition"; + } + } + + /** Converter for Kafka message offset. */ + public static class OffsetConverter implements CdcMetadataConverter { + private static final long serialVersionUID = 1L; + + @Override + public String read(JsonNode source) { + throw new UnsupportedOperationException( + "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode"); + } + + @Override + public String read(CdcSourceRecord record) { + Object offset = record.getMetadata("offset"); + return offset != null ? offset.toString() : null; + } + + @Override + public DataType dataType() { + return DataTypes.BIGINT(); + } + + @Override + public String columnName() { + return "offset"; + } + } + + /** Converter for Kafka message timestamp. */ + public static class TimestampConverter implements CdcMetadataConverter { + private static final long serialVersionUID = 1L; + + @Override + public String read(JsonNode source) { + throw new UnsupportedOperationException( + "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode"); + } + + @Override + public String read(CdcSourceRecord record) { + Object timestamp = record.getMetadata("timestamp"); + if (timestamp != null && timestamp instanceof Long) { + return DateTimeUtils.formatTimestamp( + Timestamp.fromEpochMillis((Long) timestamp), TimeZone.getDefault(), 3); + } + return null; + } + + @Override + public DataType dataType() { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3); + } + + @Override + public String columnName() { + return "timestamp"; + } + } + + /** Converter for Kafka timestamp type. */ + public static class TimestampTypeConverter implements CdcMetadataConverter { + private static final long serialVersionUID = 1L; + + @Override + public String read(JsonNode source) { + throw new UnsupportedOperationException( + "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode"); + } + + @Override + public String read(CdcSourceRecord record) { + Object timestampType = record.getMetadata("timestamp_type"); + return timestampType != null ? 
timestampType.toString() : null; + } + + @Override + public DataType dataType() { + return DataTypes.STRING(); + } + + @Override + public String columnName() { + return "timestamp_type"; + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java index eb3332c731fd..ab17d4408d14 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java @@ -62,6 +62,7 @@ public void printHelp() { + "[--including_tables ] \\\n" + "[--excluding_tables ] \\\n" + "[--type_mapping ] \\\n" + + "[--metadata_column ] \\\n" + "[--kafka_conf [--kafka_conf ...]] \\\n" + "[--catalog_conf [--catalog_conf ...]] \\\n" + "[--table_conf [--table_conf ...]]"); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java index 59976c9abbd0..770c85434ca3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java @@ -60,6 +60,7 @@ public void printHelp() { + "[--primary_keys ] \\\n" + "[--type_mapping ] \\\n" + "[--computed_column <'column_name=expr_name(args[, ...])'> [--computed_column ...]] \\\n" + + "[--metadata_column ] \\\n" + "[--kafka_conf [--kafka_conf ...]] \\\n" + "[--catalog_conf [--catalog_conf ...]] \\\n" + "[--table_conf [--table_conf ...]]"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java new file mode 100644 index 000000000000..81aa9f9c8055 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link KafkaMetadataConverter}. */ +public class KafkaMetadataConverterTest { + + @Test + public void testTopicConverter() { + KafkaMetadataConverter.TopicConverter converter = + new KafkaMetadataConverter.TopicConverter(); + + // Test data type and column name + assertThat(converter.dataType()).isEqualTo(DataTypes.STRING().notNull()); + assertThat(converter.columnName()).isEqualTo("topic"); + + // Test reading from CdcSourceRecord + CdcSourceRecord record = new CdcSourceRecord("test-topic", null, "value"); + assertThat(converter.read(record)).isEqualTo("test-topic"); + + // Test with null topic + CdcSourceRecord recordWithNullTopic = new CdcSourceRecord(null, null, "value"); + assertThat(converter.read(recordWithNullTopic)).isNull(); + + // Test JsonNode method throws exception + assertThatThrownBy( + () -> + converter.read( + (org.apache.paimon.shade.jackson2.com.fasterxml.jackson + .databind.JsonNode) + null)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "Kafka metadata converters should be used with CdcSourceRecord"); + } + + @Test + public void testPartitionConverter() { + KafkaMetadataConverter.PartitionConverter converter = + new KafkaMetadataConverter.PartitionConverter(); + + // Test data type and column name + assertThat(converter.dataType()).isEqualTo(DataTypes.INT()); + assertThat(converter.columnName()).isEqualTo("partition"); + + // Test reading from CdcSourceRecord with metadata + Map metadata = new HashMap<>(); + metadata.put("partition", 5); + CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata); + assertThat(converter.read(record)).isEqualTo("5"); + + // Test with missing partition metadata + CdcSourceRecord recordWithoutPartition = new CdcSourceRecord("topic", null, "value"); + assertThat(converter.read(recordWithoutPartition)).isNull(); + } + + @Test + public void testOffsetConverter() { + KafkaMetadataConverter.OffsetConverter converter = + new KafkaMetadataConverter.OffsetConverter(); + + // Test data type and column name + assertThat(converter.dataType()).isEqualTo(DataTypes.BIGINT()); + assertThat(converter.columnName()).isEqualTo("offset"); + + // Test reading from CdcSourceRecord with metadata + Map metadata = new HashMap<>(); + metadata.put("offset", 12345L); + CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata); + assertThat(converter.read(record)).isEqualTo("12345"); + + // Test with missing offset metadata + CdcSourceRecord recordWithoutOffset = new CdcSourceRecord("topic", null, "value"); + assertThat(converter.read(recordWithoutOffset)).isNull(); + } + + @Test + public void testTimestampConverter() { + KafkaMetadataConverter.TimestampConverter converter = + new KafkaMetadataConverter.TimestampConverter(); + + // Test data type and column name + assertThat(converter.dataType()).isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)); + assertThat(converter.columnName()).isEqualTo("timestamp"); + + // Test reading from CdcSourceRecord with metadata + Map metadata = new HashMap<>(); + metadata.put("timestamp", 1640995200000L); // 2022-01-01 00:00:00 UTC + CdcSourceRecord record = new 
CdcSourceRecord("topic", null, "value", metadata); + String result = converter.read(record); + assertThat(result).isNotNull(); + assertThat(result).contains("2022-01-01"); + + // Test with missing timestamp metadata + CdcSourceRecord recordWithoutTimestamp = new CdcSourceRecord("topic", null, "value"); + assertThat(converter.read(recordWithoutTimestamp)).isNull(); + + // Test with non-Long timestamp + Map invalidMetadata = new HashMap<>(); + invalidMetadata.put("timestamp", "not-a-long"); + CdcSourceRecord recordWithInvalidTimestamp = + new CdcSourceRecord("topic", null, "value", invalidMetadata); + assertThat(converter.read(recordWithInvalidTimestamp)).isNull(); + } + + @Test + public void testTimestampTypeConverter() { + KafkaMetadataConverter.TimestampTypeConverter converter = + new KafkaMetadataConverter.TimestampTypeConverter(); + + // Test data type and column name + assertThat(converter.dataType()).isEqualTo(DataTypes.STRING()); + assertThat(converter.columnName()).isEqualTo("timestamp_type"); + + // Test reading from CdcSourceRecord with metadata + Map metadata = new HashMap<>(); + metadata.put("timestamp_type", "CreateTime"); + CdcSourceRecord record = new CdcSourceRecord("topic", null, "value", metadata); + assertThat(converter.read(record)).isEqualTo("CreateTime"); + + // Test with LogAppendTime + metadata.put("timestamp_type", "LogAppendTime"); + CdcSourceRecord recordLogAppend = new CdcSourceRecord("topic", null, "value", metadata); + assertThat(converter.read(recordLogAppend)).isEqualTo("LogAppendTime"); + + // Test with NoTimestampType + metadata.put("timestamp_type", "NoTimestampType"); + CdcSourceRecord recordNoTimestamp = new CdcSourceRecord("topic", null, "value", metadata); + assertThat(converter.read(recordNoTimestamp)).isEqualTo("NoTimestampType"); + + // Test with missing timestamp_type metadata + CdcSourceRecord recordWithoutTimestampType = new CdcSourceRecord("topic", null, "value"); + assertThat(converter.read(recordWithoutTimestampType)).isNull(); + } + + @Test + public void testAllConvertersWithCompleteMetadata() { + // Create a CdcSourceRecord with all Kafka metadata + Map metadata = new HashMap<>(); + metadata.put("partition", 3); + metadata.put("offset", 9876L); + metadata.put("timestamp", 1640995200000L); + metadata.put("timestamp_type", "CreateTime"); + + CdcSourceRecord record = new CdcSourceRecord("my-topic", "key", "value", metadata); + + // Test all converters + KafkaMetadataConverter.TopicConverter topicConverter = + new KafkaMetadataConverter.TopicConverter(); + KafkaMetadataConverter.PartitionConverter partitionConverter = + new KafkaMetadataConverter.PartitionConverter(); + KafkaMetadataConverter.OffsetConverter offsetConverter = + new KafkaMetadataConverter.OffsetConverter(); + KafkaMetadataConverter.TimestampConverter timestampConverter = + new KafkaMetadataConverter.TimestampConverter(); + KafkaMetadataConverter.TimestampTypeConverter timestampTypeConverter = + new KafkaMetadataConverter.TimestampTypeConverter(); + + assertThat(topicConverter.read(record)).isEqualTo("my-topic"); + assertThat(partitionConverter.read(record)).isEqualTo("3"); + assertThat(offsetConverter.read(record)).isEqualTo("9876"); + assertThat(timestampConverter.read(record)).isNotNull(); + assertThat(timestampTypeConverter.read(record)).isEqualTo("CreateTime"); + } +} From ae9a58a4ee19341df353cae527a8a12a4aaabfa7 Mon Sep 17 00:00:00 2001 From: Max Falk Date: Mon, 29 Sep 2025 11:43:53 +0200 Subject: [PATCH 2/8] explicit imports Signed-off-by: Max Falk --- 
.../paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 5d2526030ee0..563733a1f3f1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -36,7 +36,11 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; From df113fb09a72713658afca8bdfb9d01512410012 Mon Sep 17 00:00:00 2001 From: Max Falk Date: Mon, 29 Sep 2025 12:08:27 +0200 Subject: [PATCH 3/8] use message metadata for topic Signed-off-by: Max Falk --- .../paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 3 +-- .../flink/action/cdc/kafka/KafkaMetadataConverter.java | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 563733a1f3f1..6391981d0f58 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -44,7 +44,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.jetbrains.annotations.NotNull; import java.time.Duration; import java.util.Arrays; @@ -322,7 +321,7 @@ private static String findOneTopic(Configuration kafkaConfig, Properties propert } } - protected static @NotNull Map extractKafkaMetadata( + protected static Map extractKafkaMetadata( ConsumerRecord message) { // Add the Kafka message metadata that can be used with --metadata_column Map kafkaMetadata = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java index ccd8413e6cfb..af88bd2deb75 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java @@ -50,12 +50,13 @@ public String read(JsonNode source) { @Override public String read(CdcSourceRecord record) { - return record.getTopic(); + Object partition = record.getMetadata("topic"); + return partition != null ? 
partition.toString() : null; } @Override public DataType dataType() { - return DataTypes.STRING().notNull(); + return DataTypes.STRING(); } @Override From 0740237f6abd7a852f210ada67aed30e245e2ee7 Mon Sep 17 00:00:00 2001 From: Max Falk Date: Mon, 29 Sep 2025 12:13:57 +0200 Subject: [PATCH 4/8] fix tests Signed-off-by: Max Falk --- .../flink/action/cdc/kafka/KafkaMetadataConverter.java | 4 ++-- .../flink/action/cdc/kafka/KafkaMetadataConverterTest.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java index af88bd2deb75..212dc3053466 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java @@ -50,8 +50,8 @@ public String read(JsonNode source) { @Override public String read(CdcSourceRecord record) { - Object partition = record.getMetadata("topic"); - return partition != null ? partition.toString() : null; + Object topic = record.getMetadata("topic"); + return topic != null ? topic.toString() : null; } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java index 81aa9f9c8055..7408768b04af 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverterTest.java @@ -38,12 +38,12 @@ public void testTopicConverter() { new KafkaMetadataConverter.TopicConverter(); // Test data type and column name - assertThat(converter.dataType()).isEqualTo(DataTypes.STRING().notNull()); + assertThat(converter.dataType()).isEqualTo(DataTypes.STRING()); assertThat(converter.columnName()).isEqualTo("topic"); // Test reading from CdcSourceRecord CdcSourceRecord record = new CdcSourceRecord("test-topic", null, "value"); - assertThat(converter.read(record)).isEqualTo("test-topic"); + assertThat(converter.read(record)).isEqualTo(null); // Test with null topic CdcSourceRecord recordWithNullTopic = new CdcSourceRecord(null, null, "value"); @@ -164,6 +164,7 @@ public void testTimestampTypeConverter() { public void testAllConvertersWithCompleteMetadata() { // Create a CdcSourceRecord with all Kafka metadata Map metadata = new HashMap<>(); + metadata.put("topic", "my-topic"); metadata.put("partition", 3); metadata.put("offset", 9876L); metadata.put("timestamp", 1640995200000L); From 42c01953213fd657b52325e2f78a2f8e12a8799e Mon Sep 17 00:00:00 2001 From: Max Falk Date: Mon, 29 Sep 2025 12:58:07 +0200 Subject: [PATCH 5/8] add itest Signed-off-by: Max Falk --- .../cdc/kafka/KafkaMetadataConverter.java | 12 +++-- .../KafkaDebeziumSyncTableActionITCase.java | 6 +++ .../cdc/kafka/KafkaSyncTableActionITCase.java | 53 +++++++++++++++++++ .../table/metadatacolumn/debezium-data-1.txt | 19 +++++++ 4 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java index 212dc3053466..a5bdbd7c5b4c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java @@ -38,6 +38,8 @@ */ public class KafkaMetadataConverter { + public static String KAFKA_METADATA_COLUMN_PREFIX = "__kafka_"; + /** Converter for Kafka topic name. */ public static class TopicConverter implements CdcMetadataConverter { private static final long serialVersionUID = 1L; @@ -61,7 +63,7 @@ public DataType dataType() { @Override public String columnName() { - return "topic"; + return KAFKA_METADATA_COLUMN_PREFIX + "topic"; } } @@ -88,7 +90,7 @@ public DataType dataType() { @Override public String columnName() { - return "partition"; + return KAFKA_METADATA_COLUMN_PREFIX + "partition"; } } @@ -115,7 +117,7 @@ public DataType dataType() { @Override public String columnName() { - return "offset"; + return KAFKA_METADATA_COLUMN_PREFIX + "offset"; } } @@ -146,7 +148,7 @@ public DataType dataType() { @Override public String columnName() { - return "timestamp"; + return KAFKA_METADATA_COLUMN_PREFIX + "timestamp"; } } @@ -173,7 +175,7 @@ public DataType dataType() { @Override public String columnName() { - return "timestamp_type"; + return KAFKA_METADATA_COLUMN_PREFIX + "timestamp_type"; } } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index 430598753629..2ba5bd82b117 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -92,6 +92,12 @@ public void testComputedColumn() throws Exception { testComputedColumn(DEBEZIUM); } + @Test + @Timeout(60) + public void testMetadataColumn() throws Exception { + testMetadataColumn(DEBEZIUM); + } + @Test @Timeout(60) public void testWaterMarkSyncTable() throws Exception { diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index f5b6bb5923e9..c17d8bfb043d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -535,6 +535,59 @@ public void testComputedColumn(String format) throws Exception { Arrays.asList("_id", "_year")); } + public void testMetadataColumn(String format) throws Exception { + String topic = "metadata_column"; + createTestTopic(topic, 1, 1); + writeRecordsToKafka(topic, "kafka/%s/table/metadatacolumn/%s-data-1.txt", format, format); + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put(VALUE_FORMAT.key(), format + "-json"); + kafkaConfig.put(TOPIC.key(), topic); + KafkaSyncTableAction action = + 
syncTableActionBuilder(kafkaConfig) + .withPartitionKeys("_year") + .withPrimaryKeys("_id", "_year") + .withMetadataColumns("topic,offset,partition,timestamp,timestamp_type") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + Configuration kafkaConfigObject = Configuration.fromMap(kafkaConfig); + + Schema kafkaSchema = + MessageQueueSchemaUtils.getSchema( + getKafkaEarliestConsumer( + kafkaConfigObject, new KafkaDebeziumJsonDeserializationSchema()), + getDataFormat(kafkaConfigObject), + TypeMapping.defaultMapping()); + List fields = new ArrayList<>(); + // {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14} + fields.add(new DataField(0, "_id", DataTypes.STRING())); + fields.add(new DataField(1, "_date", DataTypes.STRING())); + fields.add(new DataField(2, "_year", DataTypes.STRING())); + fields.add(new DataField(3, "__kafka_topic", DataTypes.STRING())); + fields.add(new DataField(4, "__kafka_partition", DataTypes.INT())); + fields.add(new DataField(5, "__kafka_offset", DataTypes.BIGINT())); + fields.add( + new DataField(6, "__kafka_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))); + fields.add(new DataField(6, "__kafka_timestamp_type", DataTypes.STRING())); + assertThat(kafkaSchema.fields()).isEqualTo(fields); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.INT().notNull() + }, + new String[] {"_id", "_date", "_year"}); + waitForResult( + Collections.singletonList("+I[101, 2023-03-23, 2023, metadata_column]"), + getFileStoreTable(tableName), + rowType, + Arrays.asList("_id", "_year")); + } + protected void testCDCOperations(String format) throws Exception { String topic = "event"; createTestTopic(topic, 1, 1); diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt new file mode 100644 index 000000000000..fce341e17d4d --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/metadatacolumn/debezium-data-1.txt @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+{"before": null, "after": {"_id":101,"_date":"2023-03-23"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}

From 8b523b5168dd26e4f5deb0b5cef010d0a95d3726 Mon Sep 17 00:00:00 2001
From: Max Falk
Date: Mon, 29 Sep 2025 13:56:38 +0200
Subject: [PATCH 6/8] add metadata_column evaluation to other kafka cdc formats

Signed-off-by: Max Falk
---
 .../flink/action/cdc/format/AbstractJsonRecordParser.java | 2 ++
 .../paimon/flink/action/cdc/format/AbstractRecordParser.java | 2 +-
 .../flink/action/cdc/format/aliyun/AliyunRecordParser.java | 2 ++
 .../paimon/flink/action/cdc/format/canal/CanalRecordParser.java | 1 +
 .../action/cdc/format/debezium/DebeziumAvroRecordParser.java | 2 +-
 .../action/cdc/format/debezium/DebeziumBsonRecordParser.java | 1 +
 .../action/cdc/format/debezium/DebeziumJsonRecordParser.java | 1 +
 7 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
index 76289aa355fb..d9e10df1ba54 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
@@ -104,6 +104,8 @@ protected Map extractRowData(JsonNode record, CdcSchema.Builder
                                             return Objects.toString(entry.getValue());
                                         }));
         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
+
         return rowData;
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 45aca7311f05..670efc2f1f80 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -128,7 +128,7 @@ protected void evalComputedColumns(
     }

     /** Extract metadata values using metadata converters. */
-    protected void extractMetadata(Map rowData, CdcSchema.Builder schemaBuilder) {
+    protected void evalMetadataColumns(Map rowData, CdcSchema.Builder schemaBuilder) {
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
             try {
                 String value = metadataConverter.read(currentRecord);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
index e14e4ab4b7e9..4f695d11a02e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -192,6 +192,8 @@ protected Map extractRowData(JsonNode record, CdcSchema.Builder
         }

         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
+
         return rowData;
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 5864396564fe..b196dec8db9a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -181,6 +181,7 @@ protected Map extractRowData(JsonNode record, CdcSchema.Builder
         }

         evalComputedColumns(rowData, schemaBuilder);
+        evalMetadataColumns(rowData, schemaBuilder);
         return rowData;
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
index 1f944df4a29f..e5135db551e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
@@ -168,7 +168,7 @@ private Map extractRowData(
         }

         evalComputedColumns(resultMap, schemaBuilder);
-        extractMetadata(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
         return resultMap;
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 5c1317063841..134ed8b3831c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -137,6 +137,7 @@ protected Map extractRowData(JsonNode record, CdcSchema.Builder
         }

         evalComputedColumns(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
         return resultMap;
     }

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 19156fb916c7..fcad38377071 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -212,6 +212,7 @@ protected Map extractRowData(JsonNode record, CdcSchema.Builder
         }

         evalComputedColumns(resultMap, schemaBuilder);
+        evalMetadataColumns(resultMap, schemaBuilder);
         return resultMap;
     }

From 2701a64665fdad8e928fbbc5b154cb3e24b16823 Mon Sep 17 00:00:00 2001
From: Max Falk
Date: Mon, 29 Sep 2025 13:57:01 +0200
Subject: [PATCH 7/8] spotless

Signed-off-by: Max Falk
---
 .../paimon/flink/action/cdc/format/AbstractRecordParser.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 670efc2f1f80..48c7fdce21d6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -128,7 +128,8 @@ protected void evalComputedColumns(
     }

     /** Extract metadata values using metadata converters. */
-    protected void evalMetadataColumns(Map rowData, CdcSchema.Builder schemaBuilder) {
+    protected void evalMetadataColumns(
+            Map rowData, CdcSchema.Builder schemaBuilder) {
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
             try {
                 String value = metadataConverter.read(currentRecord);

From 2a6715ea29e76d30e1138e3c02c2595d0051818d Mon Sep 17 00:00:00 2001
From: Max Falk
Date: Mon, 29 Sep 2025 14:56:05 +0200
Subject: [PATCH 8/8] dedup

Signed-off-by: Max Falk
---
 .../cdc/kafka/KafkaMetadataConverter.java | 145 ++++++------------
 1 file changed, 46 insertions(+), 99 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java
index a5bdbd7c5b4c..cb36a9206229 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMetadataConverter.java
@@ -36,99 +36,74 @@
  * These converters read from the generic metadata map in {@link CdcSourceRecord} to extract
  * Kafka-specific metadata like topic, partition, offset, timestamp, and timestamp type.
  */
-public class KafkaMetadataConverter {
+public class KafkaMetadataConverter implements CdcMetadataConverter {

-    public static String KAFKA_METADATA_COLUMN_PREFIX = "__kafka_";
+    protected static String KAFKA_METADATA_COLUMN_PREFIX = "__kafka_";
+    private static final long serialVersionUID = 1L;

-    /** Converter for Kafka topic name. */
-    public static class TopicConverter implements CdcMetadataConverter {
-        private static final long serialVersionUID = 1L;
+    private final String fieldName;
+    private final DataType dataType;

-        @Override
-        public String read(JsonNode source) {
-            throw new UnsupportedOperationException(
-                    "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
-        }
+    public KafkaMetadataConverter(String fieldName, DataType dataType) {
+        this.fieldName = fieldName;
+        this.dataType = dataType;
+    }

-        @Override
-        public String read(CdcSourceRecord record) {
-            Object topic = record.getMetadata("topic");
-            return topic != null ? topic.toString() : null;
-        }
+    @Override
+    public String read(JsonNode source) {
+        throw new UnsupportedOperationException(
+                "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
+    }

-        @Override
-        public DataType dataType() {
-            return DataTypes.STRING();
-        }
+    @Override
+    public String read(CdcSourceRecord record) {
+        Object metadata = record.getMetadata(this.fieldName);
+        return metadata != null ? metadata.toString() : null;
+    }

-        @Override
-        public String columnName() {
-            return KAFKA_METADATA_COLUMN_PREFIX + "topic";
-        }
+    @Override
+    public DataType dataType() {
+        return this.dataType;
     }

-    /** Converter for Kafka partition number. */
-    public static class PartitionConverter implements CdcMetadataConverter {
-        private static final long serialVersionUID = 1L;
+    @Override
+    public String columnName() {
+        return KAFKA_METADATA_COLUMN_PREFIX + this.fieldName;
+    }

-        @Override
-        public String read(JsonNode source) {
-            throw new UnsupportedOperationException(
-                    "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
-        }
+    /** Converter for Kafka topic name. */
+    public static class TopicConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;

-        @Override
-        public String read(CdcSourceRecord record) {
-            Object partition = record.getMetadata("partition");
-            return partition != null ? partition.toString() : null;
+        public TopicConverter() {
+            super("topic", DataTypes.STRING());
         }
+    }

-        @Override
-        public DataType dataType() {
-            return DataTypes.INT();
-        }
+    /** Converter for Kafka partition number. */
+    public static class PartitionConverter extends KafkaMetadataConverter {
+        private static final long serialVersionUID = 1L;

-        @Override
-        public String columnName() {
-            return KAFKA_METADATA_COLUMN_PREFIX + "partition";
+        public PartitionConverter() {
+            super("partition", DataTypes.INT());
         }
     }

     /** Converter for Kafka message offset. */
-    public static class OffsetConverter implements CdcMetadataConverter {
+    public static class OffsetConverter extends KafkaMetadataConverter {
         private static final long serialVersionUID = 1L;

-        @Override
-        public String read(JsonNode source) {
-            throw new UnsupportedOperationException(
-                    "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
-        }
-
-        @Override
-        public String read(CdcSourceRecord record) {
-            Object offset = record.getMetadata("offset");
-            return offset != null ? offset.toString() : null;
-        }
-
-        @Override
-        public DataType dataType() {
-            return DataTypes.BIGINT();
-        }
-
-        @Override
-        public String columnName() {
-            return KAFKA_METADATA_COLUMN_PREFIX + "offset";
+        public OffsetConverter() {
+            super("offset", DataTypes.BIGINT());
         }
     }

     /** Converter for Kafka message timestamp. */
-    public static class TimestampConverter implements CdcMetadataConverter {
+    public static class TimestampConverter extends KafkaMetadataConverter {
         private static final long serialVersionUID = 1L;

-        @Override
-        public String read(JsonNode source) {
-            throw new UnsupportedOperationException(
-                    "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
+        public TimestampConverter() {
+            super("timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
         }

         @Override
@@ -140,42 +115,14 @@ public String read(CdcSourceRecord record) {
             }
             return null;
         }
-
-        @Override
-        public DataType dataType() {
-            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
-        }
-
-        @Override
-        public String columnName() {
-            return KAFKA_METADATA_COLUMN_PREFIX + "timestamp";
-        }
     }

     /** Converter for Kafka timestamp type. */
-    public static class TimestampTypeConverter implements CdcMetadataConverter {
+    public static class TimestampTypeConverter extends KafkaMetadataConverter {
         private static final long serialVersionUID = 1L;

-        @Override
-        public String read(JsonNode source) {
-            throw new UnsupportedOperationException(
-                    "Kafka metadata converters should be used with CdcSourceRecord, not JsonNode");
-        }
-
-        @Override
-        public String read(CdcSourceRecord record) {
-            Object timestampType = record.getMetadata("timestamp_type");
-            return timestampType != null ? timestampType.toString() : null;
-        }
-
-        @Override
-        public DataType dataType() {
-            return DataTypes.STRING();
-        }
-
-        @Override
-        public String columnName() {
-            return KAFKA_METADATA_COLUMN_PREFIX + "timestamp_type";
+        public TimestampTypeConverter() {
+            super("timestamp_type", DataTypes.STRING());
         }
     }
 }
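
A minimal usage sketch of the deduplicated converters, assuming only the API visible in the
hunks above (the no-arg per-field constructors, columnName() and dataType()); the harness class
and main method below are illustrative and not part of the patch series. read(CdcSourceRecord)
is left out because building a CdcSourceRecord with a populated metadata map is beyond this
sketch.

    import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
    import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;

    public class KafkaMetadataConverterSketch {
        public static void main(String[] args) {
            // Each converter contributes one extra output column, prefixed with "__kafka_".
            CdcMetadataConverter[] converters = {
                new KafkaMetadataConverter.TopicConverter(),
                new KafkaMetadataConverter.PartitionConverter(),
                new KafkaMetadataConverter.OffsetConverter(),
                new KafkaMetadataConverter.TimestampConverter(),
                new KafkaMetadataConverter.TimestampTypeConverter()
            };
            for (CdcMetadataConverter converter : converters) {
                // e.g. "__kafka_topic" -> STRING, "__kafka_offset" -> BIGINT
                System.out.println(converter.columnName() + " -> " + converter.dataType());
            }
        }
    }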