2 changes: 2 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
@@ -103,6 +103,7 @@ To use this feature through `flink run`, run the following shell command.
[--primary_keys <primary-keys>] \
[--type_mapping to-string] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
@@ -215,6 +216,7 @@ To use this feature through `flink run`, run the following shell command.
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -94,6 +94,10 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument must be a field name of the table in the Kafka topic. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--eager_init</h5></td>
<td>Default is false. If true, the committers of all relevant tables will be initialized eagerly, which means those tables can be forced to create snapshots.</td>
6 changes: 5 additions & 1 deletion docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -70,6 +70,10 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument must be a field name of the table in the Kafka topic. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations.</td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic/topic-pattern`, `properties.group.id`, and `value.format` are required configurations, others are optional. See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">documentation</a> for a complete list of configurations.</td>
@@ -83,4 +87,4 @@
<td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
</tr>
</tbody>
</table>
</table>
CdcMetadataConverter.java
@@ -34,13 +34,25 @@
* A functional interface for converting CDC metadata.
*
* <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
* {@link JsonNode} source. Implementations of this interface can be used to process and transform
* metadata entries from CDC sources.
* {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this interface can be used
* to process and transform metadata entries from CDC sources.
*/
public interface CdcMetadataConverter extends Serializable {

String read(JsonNode payload);

/**
* Read metadata from a CDC source record. Default implementation throws
* UnsupportedOperationException to maintain backward compatibility.
*
* @param record the CDC source record
* @return the metadata value as a string
*/
default String read(CdcSourceRecord record) {
throw new UnsupportedOperationException(
"This metadata converter does not support reading from CdcSourceRecord");
}

DataType dataType();

String columnName();
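
For context on the new overload, here is a minimal sketch of a converter that reads Kafka metadata through read(CdcSourceRecord). It is illustrative only and not part of this PR, and the metadata key "topic" is an assumption. The converters actually registered for Kafka are the nested classes of KafkaMetadataConverter shown further down (TopicConverter, PartitionConverter, OffsetConverter, TimestampConverter, TimestampTypeConverter).

// Hypothetical example, not code from this PR: a converter exposing the Kafka topic
// through the new read(CdcSourceRecord) overload. The metadata key "topic" is assumed.
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

public class TopicMetadataConverterSketch implements CdcMetadataConverter {

    @Override
    public String read(JsonNode payload) {
        // Kafka transport metadata is not present in the JSON payload, so the
        // JsonNode overload stays unsupported for this converter.
        throw new UnsupportedOperationException("Topic metadata requires a CdcSourceRecord");
    }

    @Override
    public String read(CdcSourceRecord record) {
        Object topic = record.getMetadata("topic"); // assumed metadata key
        return topic == null ? null : topic.toString();
    }

    @Override
    public DataType dataType() {
        return DataTypes.STRING();
    }

    @Override
    public String columnName() {
        return "topic";
    }
}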
CdcMetadataProcessor.java
@@ -18,6 +18,8 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.util.Arrays;
@@ -49,7 +51,14 @@ public enum CdcMetadataProcessor {
new CdcMetadataConverter.DatabaseNameConverter(),
new CdcMetadataConverter.TableNameConverter(),
new CdcMetadataConverter.SchemaNameConverter(),
new CdcMetadataConverter.OpTsConverter());
new CdcMetadataConverter.OpTsConverter()),
KAFKA_METADATA_PROCESSOR(
SyncJobHandler.SourceType.KAFKA,
new KafkaMetadataConverter.TopicConverter(),
new KafkaMetadataConverter.PartitionConverter(),
new KafkaMetadataConverter.OffsetConverter(),
new KafkaMetadataConverter.TimestampConverter(),
new KafkaMetadataConverter.TimestampTypeConverter());

private final SyncJobHandler.SourceType sourceType;
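
How the names passed to --metadata_column get resolved to these converters is not part of this hunk; as a rough, hypothetical illustration, a lookup by columnName() could look like the following.

// Hypothetical helper, for illustration only; the real --metadata_column resolution
// in the sync job is not shown in this diff.
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class MetadataColumnSelector {
    static List<CdcMetadataConverter> select(
            CdcMetadataConverter[] registered, Set<String> requestedColumns) {
        List<CdcMetadataConverter> selected = new ArrayList<>();
        for (CdcMetadataConverter converter : registered) {
            // Keep only converters whose column name was requested on the command line.
            if (requestedColumns.contains(converter.columnName())) {
                selected.add(converter);
            }
        }
        return selected;
    }
}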

CdcSourceRecord.java
@@ -21,6 +21,9 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** A data change record from the CDC source. */
@@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable {
// TODO Use generics to support more scenarios.
private final Object value;

// Generic metadata map - any source can add metadata
private final Map<String, Object> metadata;

public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
this.topic = topic;
this.key = key;
this.value = value;
this(topic, key, value, null);
}

public CdcSourceRecord(Object value) {
this(null, null, value);
this(null, null, value, null);
}

public CdcSourceRecord(
@Nullable String topic,
@Nullable Object key,
Object value,
@Nullable Map<String, Object> metadata) {
this.topic = topic;
this.key = key;
this.value = value;
this.metadata =
metadata != null
? Collections.unmodifiableMap(new HashMap<>(metadata))
: Collections.emptyMap();
}

@Nullable
@@ -59,6 +77,15 @@ public Object getValue() {
return value;
}

public Map<String, Object> getMetadata() {
return metadata;
}

@Nullable
public Object getMetadata(String key) {
return metadata.get(key);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof CdcSourceRecord)) {
@@ -68,12 +95,13 @@ public boolean equals(Object o) {
CdcSourceRecord that = (CdcSourceRecord) o;
return Objects.equals(topic, that.topic)
&& Objects.equals(key, that.key)
&& Objects.equals(value, that.value);
&& Objects.equals(value, that.value)
&& Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(topic, key, value);
return Objects.hash(topic, key, value, metadata);
}
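
A small self-contained sketch of how a Kafka deserializer could use the new four-argument constructor; the metadata key names are assumptions, not taken from this PR.

// Illustration only: attaching Kafka transport metadata via the new 4-argument
// constructor. The key names ("topic", "partition", "offset", "timestamp") are assumed.
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;

import java.util.HashMap;
import java.util.Map;

public final class CdcSourceRecordMetadataExample {

    static CdcSourceRecord withKafkaMetadata(
            String topic, int partition, long offset, long timestamp, Object key, Object value) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("topic", topic);
        metadata.put("partition", partition);
        metadata.put("offset", offset);
        metadata.put("timestamp", timestamp);
        // The constructor copies the map into an unmodifiable view, so later changes to
        // this local map cannot leak into the record.
        return new CdcSourceRecord(topic, key, value, metadata);
    }

    public static void main(String[] args) {
        CdcSourceRecord record =
                withKafkaMetadata("orders", 0, 42L, 1_700_000_000_000L, null, "{\"id\":1}");
        System.out.println(record.getMetadata("offset")); // prints 42
    }
}

Note that equals() and hashCode() now include the metadata map, so two records that differ only in their metadata are no longer equal.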

@Override
AbstractDataFormat.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -49,6 +50,16 @@ public AbstractRecordParser createParser(
return parser().createParser(typeMapping, computedColumns);
}

@Override
public AbstractRecordParser createParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters) {
// Most parsers don't support metadata converters, so we default to the 2-parameter version
// Only specific parsers like DebeziumAvroRecordParser will override this
return createParser(typeMapping, computedColumns);
}

@Override
public KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig) {
AbstractRecordParser.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -55,10 +56,21 @@ public abstract class AbstractRecordParser
protected static final String FIELD_DATABASE = "database";
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
protected final CdcMetadataConverter[] metadataConverters;
protected CdcSourceRecord currentRecord; // Store current record for metadata access

public AbstractRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this(typeMapping, computedColumns, new CdcMetadataConverter[0]);
}

public AbstractRecordParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
this.metadataConverters =
metadataConverters != null ? metadataConverters : new CdcMetadataConverter[0];
}

@Nullable
@@ -88,7 +100,11 @@ public void flatMap(CdcSourceRecord value, Collector<RichCdcMultiplexRecord> out
}
}

protected abstract void setRoot(CdcSourceRecord record);
    protected void setRoot(CdcSourceRecord record) {
        this.currentRecord = record;
        // Subclasses that need format-specific state (e.g. DebeziumAvroRecordParser) should
        // override this method and call super.setRoot(record) so the current record stays
        // available to extractMetadata().
    }

protected abstract List<RichCdcMultiplexRecord> extractRecords();

@@ -111,6 +127,24 @@ protected void evalComputedColumns(
});
}

/** Extract metadata values using metadata converters. */
protected void extractMetadata(Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
for (CdcMetadataConverter metadataConverter : metadataConverters) {
try {
String value = metadataConverter.read(currentRecord);
if (value != null) {
rowData.put(metadataConverter.columnName(), value);
}
schemaBuilder.column(metadataConverter.columnName(), metadataConverter.dataType());
} catch (UnsupportedOperationException e) {
// This converter doesn't support CdcSourceRecord, skip it
LOG.debug(
"Metadata converter {} does not support CdcSourceRecord",
metadataConverter.getClass().getSimpleName());
}
}
}
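
The try/catch above is the backward-compatibility path: converters that only implement read(JsonNode) are skipped rather than failing the job. Below is a standalone sketch of that behavior, assuming the existing DatabaseNameConverter is accessible from the caller's package and does not override the new overload.

// Illustration only: a legacy converter that implements just read(JsonNode) falls into
// the UnsupportedOperationException branch and is skipped, as in extractMetadata().
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;

import java.util.HashMap;
import java.util.Map;

public final class MetadataSkipBehaviorSketch {
    public static void main(String[] args) {
        CdcSourceRecord record = new CdcSourceRecord("orders", null, "{\"id\":1}");
        CdcMetadataConverter[] converters = {new CdcMetadataConverter.DatabaseNameConverter()};

        Map<String, String> rowData = new HashMap<>();
        for (CdcMetadataConverter converter : converters) {
            try {
                String value = converter.read(record);
                if (value != null) {
                    rowData.put(converter.columnName(), value);
                }
            } catch (UnsupportedOperationException e) {
                // Skipped: the converter does not support CdcSourceRecord, the row still flows.
            }
        }
        System.out.println(rowData); // prints {} because no metadata column was contributed
    }
}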

/** Handle case sensitivity here. */
protected RichCdcMultiplexRecord createRecord(
RowKind rowKind, Map<String, String> data, CdcSchema.Builder schemaBuilder) {
DataFormat.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.format;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -38,11 +39,28 @@ public interface DataFormat {
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
* specified configurations.
*
* @param typeMapping Type mapping configuration
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link AbstractRecordParser}.
*/
default AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
return createParser(typeMapping, computedColumns, new CdcMetadataConverter[0]);
}

/**
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
* specified configurations including metadata converters.
*
* @param typeMapping Type mapping configuration
* @param computedColumns List of computed columns to be considered by the parser.
* @param metadataConverters Array of metadata converters for extracting CDC metadata
* @return A new instance of {@link AbstractRecordParser}.
*/
AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns);
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters);

KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig);
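
A hypothetical call-site sketch (not the PR's actual wiring) showing why the two overloads coexist: existing callers keep the two-argument form, which forwards with an empty converter array, while the Kafka sync job can pass converters resolved from --metadata_column.

// Hypothetical wiring, for illustration only.
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.action.cdc.format.DataFormat;

import java.util.List;

final class ParserWiringSketch {
    static AbstractRecordParser newParser(
            DataFormat format,
            TypeMapping typeMapping,
            List<ComputedColumn> computedColumns,
            CdcMetadataConverter[] metadataConverters) {
        return metadataConverters == null || metadataConverters.length == 0
                ? format.createParser(typeMapping, computedColumns) // legacy two-argument path
                : format.createParser(typeMapping, computedColumns, metadataConverters);
    }
}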
DebeziumAvroDataFormat.java
@@ -18,8 +18,12 @@

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractDataFormat;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.action.cdc.format.RecordParserFactory;
import org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarDebeziumAvroDeserializationSchema;
@@ -28,6 +32,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.List;
import java.util.function.Function;

/**
@@ -41,6 +46,14 @@ protected RecordParserFactory parser() {
return DebeziumAvroRecordParser::new;
}

@Override
public AbstractRecordParser createParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters) {
return new DebeziumAvroRecordParser(typeMapping, computedColumns, metadataConverters);
}

@Override
protected Function<Configuration, KafkaDeserializationSchema<CdcSourceRecord>>
kafkaDeserializer() {
DebeziumAvroRecordParser.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -78,8 +79,16 @@ public DebeziumAvroRecordParser(TypeMapping typeMapping, List<ComputedColumn> co
super(typeMapping, computedColumns);
}

public DebeziumAvroRecordParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
CdcMetadataConverter[] metadataConverters) {
super(typeMapping, computedColumns, metadataConverters);
}

@Override
protected void setRoot(CdcSourceRecord record) {
super.setRoot(record); // Store current record for metadata access
keyRecord = (GenericRecord) record.getKey();
valueRecord = (GenericRecord) record.getValue();
}
@@ -159,6 +168,7 @@ private Map<String, String> extractRowData(
}

evalComputedColumns(resultMap, schemaBuilder);
extractMetadata(resultMap, schemaBuilder);
return resultMap;
}
