diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index 6529e2c83674..1e9afe41ab43 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -86,6 +86,7 @@ To use this feature through `flink run`, run the following shell command. [--primary-keys ] \ [--type-mapping ] \ [--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \ + [--metadata-column ] \ [--mysql-conf [--mysql-conf ...]] \ [--catalog-conf [--catalog-conf ...]] \ [--table-conf [--table-conf ...]] @@ -169,6 +170,7 @@ To use this feature through `flink run`, run the following shell command. [--including-tables ] \ [--excluding-tables ] \ [--mode ] \ + [--metadata-column ] \ [--type-mapping ] \ [--mysql-conf [--mysql-conf ...]] \ [--catalog-conf [--catalog-conf ...]] \ diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index 0838850fd9f7..0fc65d96a231 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -61,6 +61,10 @@
--mode
It is used to specify synchronization mode.
Possible values:
  • "divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.
  • "combined": start a single combined sink for all tables, the new table will be automatically synchronized.
+ +
--metadata-column
+ --metadata-column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its document for a complete list of available metadata. +
--type-mapping
It is used to specify how to map MySQL data type to Paimon type.
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html index 124802b1760d..471153cdd787 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_table.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html @@ -63,6 +63,10 @@
--computed-column
The definitions of computed columns. The argument field is from MySQL table field name. See here for a complete list of configurations. + +
--metadata-column
+ --metadata-column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its document for a complete list of available metadata. +
--mysql-conf
The configuration for Flink CDC MySQL sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its document for a complete list of configurations. diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 8e0398795df7..c463effe4eb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -204,5 +204,20 @@ public static T asSpecificNodeType(String json, Class cl return clazz.cast(resultNode); } + /** + * Converts the given Java object into its corresponding {@link JsonNode} representation. + * + *

This method utilizes the Jackson {@link ObjectMapper}'s valueToTree functionality to + * transform any Java object into a JsonNode, which can be useful for various JSON tree + * manipulations without serializing the object into a string format first. + * + * @param The type of the input object. + * @param value The Java object to be converted. + * @return The JsonNode representation of the given object. + */ + public static JsonNode toTree(T value) { + return OBJECT_MAPPER_INSTANCE.valueToTree(value); + } + private JsonSerdeUtil() {} } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 1561d9ede939..20d16460fb00 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -150,7 +150,8 @@ public static Schema buildPaimonSchema( List specifiedPrimaryKeys, List computedColumns, Map tableConfig, - Schema sourceSchema) { + Schema sourceSchema, + CdcMetadataConverter[] metadataConverters) { Schema.Builder builder = Schema.newBuilder(); // options @@ -171,6 +172,10 @@ public static Schema buildPaimonSchema( builder.column(computedColumn.columnName(), computedColumn.columnType()); } + for (CdcMetadataConverter metadataConverter : metadataConverters) { + builder.column(metadataConverter.getColumnName(), metadataConverter.getDataType()); + } + // primary keys if (!specifiedPrimaryKeys.isEmpty()) { Map sourceColumns = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java new file mode 100644 index 000000000000..68fe4f27703a --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import java.io.Serializable; +import java.util.Map; + +/** + * A functional interface for converting CDC metadata. + * + *

This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given + * {@link JsonNode} source. Implementations of this interface can be used to process and transform + * metadata entries from CDC sources. + */ +public interface CdcMetadataConverter extends Serializable { + + Map read(JsonNode payload); + + DataType getDataType(); + + String getColumnName(); +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index b00fa38b050d..bab36f3254b2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -24,6 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat; @@ -157,7 +158,12 @@ public void build(StreamExecutionEnvironment env) throws Exception { buildComputedColumns(computedColumnArgs, kafkaSchema); Schema fromKafka = buildPaimonSchema( - partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema); + partitionKeys, + primaryKeys, + computedColumns, + tableConfig, + kafkaSchema, + new CdcMetadataConverter[] {}); try { table = (FileStoreTable) catalog.getTable(identifier); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index ddadcc3054e5..9a4dc8b342cf 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.ActionBase; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder; import org.apache.paimon.flink.sink.cdc.EventParser; @@ -141,7 +142,8 @@ public void build(StreamExecutionEnvironment env) throws Exception { Collections.emptyList(), computedColumns, tableConfig, - mongodbSchema); + mongodbSchema, + new CdcMetadataConverter[] {}); // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { table = (FileStoreTable) catalog.getTable(identifier); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index e18cc1931d93..181fbbe5d8db 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -24,6 +24,7 @@ package org.apache.paimon.flink.action.cdc.mysql; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TableNameConverter; import org.apache.paimon.flink.action.cdc.TypeMapping; @@ -35,6 +36,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; @@ -102,12 +104,14 @@ public class MySqlDebeziumJsonEventParser implements EventParser { // NOTE: current table name is not converted by tableNameConverter private String currentTable; private boolean shouldSynchronizeCurrentTable; + private final CdcMetadataConverter[] metadataConverters; public MySqlDebeziumJsonEventParser( ZoneId serverTimeZone, boolean caseSensitive, List computedColumns, - TypeMapping typeMapping) { + TypeMapping typeMapping, + CdcMetadataConverter[] metadataConverters) { this( serverTimeZone, caseSensitive, @@ -116,7 +120,8 @@ public MySqlDebeziumJsonEventParser( new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive, typeMapping), null, null, - typeMapping); + typeMapping, + metadataConverters); } public MySqlDebeziumJsonEventParser( @@ -126,7 +131,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - TypeMapping typeMapping) { + TypeMapping typeMapping, + CdcMetadataConverter[] metadataConverters) { this( serverTimeZone, caseSensitive, @@ -135,7 +141,8 @@ public MySqlDebeziumJsonEventParser( schemaBuilder, includingPattern, excludingPattern, - typeMapping); + typeMapping, + metadataConverters); } public MySqlDebeziumJsonEventParser( @@ -146,7 +153,8 @@ public MySqlDebeziumJsonEventParser( NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, - TypeMapping typeMapping) { + TypeMapping typeMapping, + CdcMetadataConverter[] metadataConverters) { this.serverTimeZone = serverTimeZone; this.caseSensitive = caseSensitive; this.computedColumns = computedColumns; @@ -155,6 +163,7 @@ public MySqlDebeziumJsonEventParser( this.includingPattern = includingPattern; this.excludingPattern = excludingPattern; this.typeMapping = typeMapping; + this.metadataConverters = metadataConverters; } @Override @@ -416,6 +425,10 @@ else if (Date.SCHEMA_NAME.equals(className)) { computedColumn.eval(resultMap.get(computedColumn.fieldReference()))); } + for (CdcMetadataConverter metadataConverter : metadataConverters) { + resultMap.putAll(metadataConverter.read(JsonSerdeUtil.toTree(root.payload()))); + } + return resultMap; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlMetadataProcessor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlMetadataProcessor.java new file mode 100644 index 000000000000..1efe1ed8eba3 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlMetadataProcessor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mysql; + +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import io.debezium.connector.AbstractSourceInfo; + +import java.util.Collections; +import java.util.Map; + +/** + * Enumerates the metadata processing behaviors for MySQL CDC related data. + * + *

This enumeration provides definitions for various MySQL CDC metadata keys along with their + * associated data types and converters. Each enum entry represents a specific type of metadata + * related to MySQL CDC and provides a mechanism to read and process this metadata from a given + * {@link JsonNode} source. + * + *

The provided converters, which are of type {@link CdcMetadataConverter}, define how the raw + * metadata is transformed or processed for each specific metadata key. + */ +public enum MySqlMetadataProcessor { + /** Name of the table that contain the row. */ + TABLE_NAME( + "table_name", + DataTypes.STRING().notNull(), + new CdcMetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Map read(JsonNode record) { + String table = + record.get("source").get(AbstractSourceInfo.TABLE_NAME_KEY).asText(); + return Collections.singletonMap("table_name", table); + } + + @Override + public DataType getDataType() { + return DataTypes.STRING().notNull(); + } + + @Override + public String getColumnName() { + return "table_name"; + } + }), + + /** Name of the database that contain the row. */ + DATABASE_NAME( + "database_name", + DataTypes.STRING().notNull(), + new CdcMetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Map read(JsonNode record) { + String database = + record.get("source").get(AbstractSourceInfo.DATABASE_NAME_KEY).asText(); + return Collections.singletonMap("database_name", database); + } + + @Override + public DataType getDataType() { + return DataTypes.STRING().notNull(); + } + + @Override + public String getColumnName() { + return "database_name"; + } + }), + + /** + * It indicates the time that the change was made in the database. If the record is read from + * snapshot of the table instead of the binlog, the value is always 0. + */ + OP_TS( + "op_ts", + DataTypes.TIMESTAMP(3).notNull(), + new CdcMetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Map read(JsonNode record) { + String timestamp = + record.get("source").get(AbstractSourceInfo.TIMESTAMP_KEY).asText(); + return Collections.singletonMap("op_ts", timestamp); + } + + @Override + public DataType getDataType() { + return DataTypes.TIMESTAMP(3).notNull(); + } + + @Override + public String getColumnName() { + return "op_ts"; + } + }); + + private final String key; + + private final DataType dataType; + + private final CdcMetadataConverter converter; + + MySqlMetadataProcessor(String key, DataType dataType, CdcMetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + + public String getKey() { + return key; + } + + public DataType getDataType() { + return dataType; + } + + public CdcMetadataConverter getConverter() { + return converter; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 034bd5cfa1e1..4827b04808c9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -25,6 +25,7 @@ import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.TableNameConverter; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo; @@ -55,6 +56,7 @@ import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.schemaCompatible; @@ -117,6 +119,7 @@ public class MySqlSyncDatabaseAction extends ActionBase { // for test purpose private final List monitoredTables = new ArrayList<>(); private final List excludedTables = new ArrayList<>(); + private List metadataColumn = new ArrayList<>(); public MySqlSyncDatabaseAction( String warehouse, @@ -179,6 +182,11 @@ public MySqlSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) { return this; } + public MySqlSyncDatabaseAction withMetadataKeys(List metadataKeys) { + this.metadataColumn = metadataKeys; + return this; + } + @Override public void build(StreamExecutionEnvironment env) throws Exception { checkArgument( @@ -218,6 +226,17 @@ public void build(StreamExecutionEnvironment env) throws Exception { TableNameConverter tableNameConverter = new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix); + CdcMetadataConverter[] metadataConverters = + metadataColumn.stream() + .map( + key -> + Stream.of(MySqlMetadataProcessor.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(MySqlMetadataProcessor::getConverter) + .toArray(CdcMetadataConverter[]::new); + List fileStoreTables = new ArrayList<>(); for (MySqlTableInfo tableInfo : mySqlTableInfos) { Identifier identifier = @@ -230,7 +249,8 @@ public void build(StreamExecutionEnvironment env) throws Exception { Collections.emptyList(), Collections.emptyList(), tableConfig, - tableInfo.schema()); + tableInfo.schema(), + metadataConverters); try { table = (FileStoreTable) catalog.getTable(identifier); Supplier errMsg = @@ -279,7 +299,8 @@ public void build(StreamExecutionEnvironment env) throws Exception { schemaBuilder, includingPattern, excludingPattern, - typeMapping); + typeMapping, + metadataConverters); String database = this.database; MultiTablesSinkMode mode = this.mode; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java index ae68d7d4c46d..93ed27def163 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.utils.MultipleParameterTool; +import java.util.Arrays; import java.util.Optional; /** Factory to create {@link MySqlSyncDatabaseAction}. */ @@ -58,6 +59,9 @@ public Optional create(MultipleParameterTool params) { .includingTables(params.get("including-tables")) .excludingTables(params.get("excluding-tables")) .withMode(MultiTablesSinkMode.fromString(params.get("mode"))); + if (params.has("metadata-column")) { + action.withMetadataKeys(Arrays.asList(params.get("metadata-column").split(","))); + } if (params.has("type-mapping")) { String[] options = params.get("type-mapping").split(","); @@ -87,6 +91,7 @@ public void printHelp() { + "[--including-tables ] " + "[--excluding-tables ] " + "[--mode ] " + + "[--metadata-column ] " + "[--type-mapping ] " + "[--mysql-conf [--mysql-conf ...]] " + "[--catalog-conf [--catalog-conf ...]] " @@ -131,6 +136,10 @@ public void printHelp() { " 2. 'combined': start a single combined sink for all tables, the new table will be automatically synchronized."); System.out.println(); + System.out.println( + "--metadata-column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + System.out.println(); + System.out.println( "--type-mapping is used to specify how to map MySQL type to Paimon type. Please see the doc for usage."); System.out.println(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java index 487bead64349..c1a559b7fe1d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java @@ -25,6 +25,7 @@ import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils; +import org.apache.paimon.flink.action.cdc.CdcMetadataConverter; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo; @@ -50,6 +51,7 @@ import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -96,6 +98,7 @@ public class MySqlSyncTableAction extends ActionBase { private Map tableConfig = new HashMap<>(); private List computedColumnArgs = new ArrayList<>(); + private List metadataColumn = new ArrayList<>(); private TypeMapping typeMapping = TypeMapping.defaultMapping(); public MySqlSyncTableAction( @@ -143,6 +146,11 @@ public MySqlSyncTableAction withTypeMapping(TypeMapping typeMapping) { return this; } + public MySqlSyncTableAction withMetadataKeys(List metadataKeys) { + this.metadataColumn = metadataKeys; + return this; + } + @Override public void build(StreamExecutionEnvironment env) throws Exception { checkArgument( @@ -172,13 +180,26 @@ public void build(StreamExecutionEnvironment env) throws Exception { FileStoreTable table; List computedColumns = buildComputedColumns(computedColumnArgs, tableInfo.schema()); + + CdcMetadataConverter[] metadataConverters = + metadataColumn.stream() + .map( + key -> + Stream.of(MySqlMetadataProcessor.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(MySqlMetadataProcessor::getConverter) + .toArray(CdcMetadataConverter[]::new); + Schema fromMySql = CdcActionCommonUtils.buildPaimonSchema( partitionKeys, primaryKeys, computedColumns, tableConfig, - tableInfo.schema()); + tableInfo.schema(), + metadataConverters); try { table = (FileStoreTable) catalog.getTable(identifier); if (computedColumns.size() > 0) { @@ -211,7 +232,11 @@ public void build(StreamExecutionEnvironment env) throws Exception { EventParser.Factory parserFactory = () -> new MySqlDebeziumJsonEventParser( - zoneId, caseSensitive, computedColumns, typeMapping); + zoneId, + caseSensitive, + computedColumns, + typeMapping, + metadataConverters); CdcSinkBuilder sinkBuilder = new CdcSinkBuilder() diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java index dd5828b69612..93383c71287d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionFactory.java @@ -65,6 +65,10 @@ public Optional create(MultipleParameterTool params) { new ArrayList<>(params.getMultiParameter("computed-column"))); } + if (params.has("metadata-column")) { + action.withMetadataKeys(new ArrayList<>(params.getMultiParameter("metadata-column"))); + } + if (params.has("type-mapping")) { String[] options = params.get("type-mapping").split(","); action.withTypeMapping(TypeMapping.parse(options)); @@ -88,6 +92,7 @@ public void printHelp() { + "[--primary-keys ] " + "[--type-mapping ] " + "[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] " + + "[--metadata-column ] " + "[--mysql-conf [--mysql-conf ...]] " + "[--catalog-conf [--catalog-conf ...]] " + "[--table-conf [--table-conf ...]]"); @@ -112,6 +117,10 @@ public void printHelp() { System.out.println("Please see doc for usage of --computed-column."); System.out.println(); + System.out.println( + "--metadata-column is used to specify which metadata columns to include in the output schema of the connector. Please see the doc for usage."); + System.out.println(); + System.out.println("MySQL CDC source conf syntax:"); System.out.println(" key=value"); System.out.println( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java index 48ab55ba4157..6520a075fe9a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java @@ -20,6 +20,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -140,6 +141,7 @@ public boolean hasHistoryRecord() { } /** Get table changes in history record. */ + @JsonIgnore public Iterator getTableChanges() throws IOException { return DebeziumEventUtils.getTableChanges(historyRecord).iterator(); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index f4e1a4577d43..919a04b51582 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -213,6 +213,7 @@ protected abstract static class SyncTableActionBuilder { protected final List primaryKeys = new ArrayList<>(); protected final List computedColumnArgs = new ArrayList<>(); protected final List typeMappingModes = new ArrayList<>(); + protected final List metadataColumn = new ArrayList<>(); public SyncTableActionBuilder(Map sourceConfig) { this.sourceConfig = sourceConfig; @@ -252,6 +253,11 @@ public SyncTableActionBuilder withTypeMappingModes(String... typeMappingModes return this; } + public SyncTableActionBuilder withMetadataColumn(List metadataColumn) { + this.metadataColumn.addAll(metadataColumn); + return this; + } + public abstract T build(); } @@ -270,6 +276,7 @@ protected abstract static class SyncDatabaseActionBuilder { @Nullable protected String excludingTables; @Nullable protected String mode; protected final List typeMappingModes = new ArrayList<>(); + protected final List metadataColumn = new ArrayList<>(); public SyncDatabaseActionBuilder(Map sourceConfig) { this.sourceConfig = sourceConfig; @@ -325,6 +332,11 @@ public SyncDatabaseActionBuilder withTypeMappingModes(String... typeMappingMo return this; } + public SyncDatabaseActionBuilder withMetadataColumn(List metadataColumn) { + this.metadataColumn.addAll(metadataColumn); + return this; + } + public abstract T build(); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index e93b0e03e18b..937f96229ffb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -128,6 +128,7 @@ public MySqlSyncTableAction build() { args.addAll(listToArgs("--type-mapping", typeMappingModes)); args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); + args.addAll(listToMultiArgs("--metadata-column", metadataColumn)); MultipleParameterTool params = MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0]))); @@ -164,6 +165,7 @@ public MySqlSyncDatabaseAction build() { args.addAll(nullableToArgs("--mode", mode)); args.addAll(listToArgs("--type-mapping", typeMappingModes)); + args.addAll(listToArgs("--metadata-column", metadataColumn)); MultipleParameterTool params = MultipleParameterTool.fromArgs(args.toArray(args.toArray(new String[0]))); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 265c4ea375dd..0911b6e5a4a7 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -1197,6 +1197,74 @@ public void testCatalogAndTableConfig() { .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value")); } + @Test + @Timeout(60) + public void testMetadataColumns() throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "metadata"); + + MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED; + MySqlSyncDatabaseAction action = + syncDatabaseActionBuilder(mySqlConfig) + .withTableConfig(getBasicTableConfig()) + .withMode(mode.configString()) + .withMetadataColumn(Arrays.asList("table_name", "database_name")) + .build(); + runActionWithDefaultEnv(action); + + try (Statement statement = getStatement()) { + statement.executeUpdate("INSERT INTO metadata.t1 VALUES (1, 'db1_1')"); + statement.executeUpdate("INSERT INTO metadata.t1 VALUES (2, 'db1_2')"); + + statement.executeUpdate("INSERT INTO metadata.t1 VALUES (3, 'db2_3')"); + statement.executeUpdate("INSERT INTO metadata.t1 VALUES (4, 'db2_4')"); + + FileStoreTable table = getFileStoreTable("t1"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull() + }, + new String[] {"k", "v1", "table_name", "database_name"}); + waitForResult( + Arrays.asList( + "+I[1, db1_1, t1, metadata]", + "+I[2, db1_2, t1, metadata]", + "+I[3, db2_3, t1, metadata]", + "+I[4, db2_4, t1, metadata]"), + table, + rowType, + Collections.singletonList("k")); + + statement.executeUpdate("INSERT INTO metadata.t2 VALUES (1, 'db1_1')"); + statement.executeUpdate("INSERT INTO metadata.t2 VALUES (2, 'db1_2')"); + statement.executeUpdate("INSERT INTO metadata.t2 VALUES (3, 'db1_3')"); + statement.executeUpdate("INSERT INTO metadata.t2 VALUES (4, 'db1_4')"); + table = getFileStoreTable("t2"); + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull() + }, + new String[] {"k", "v1", "table_name", "database_name"}); + waitForResult( + Arrays.asList( + "+I[1, db1_1, t2, metadata]", + "+I[2, db1_2, t2, metadata]", + "+I[3, db1_3, t2, metadata]", + "+I[4, db1_4, t2, metadata]"), + table, + rowType, + Collections.singletonList("k")); + } + } + private class SyncNewTableJob implements Runnable { private final int ith; diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index c8f2cf77574c..dfabca908188 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -843,6 +843,45 @@ public void testSyncShards() throws Exception { Arrays.asList("pk", "pt")); } + @Test + @Timeout(60) + public void testMetadataColumns() throws Exception { + try (Statement statement = getStatement()) { + statement.execute("USE metadata"); + statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (1, '2023-07-30')"); + statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (2, '2023-07-30')"); + } + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "metadata"); + mySqlConfig.put("table-name", "test_metadata_columns"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withPrimaryKeys("pk") + .withMetadataColumn(Arrays.asList("table_name", "database_name")) + .build(); + + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull() + }, + new String[] {"pk", "_date", "table_name", "database_name"}); + waitForResult( + Arrays.asList( + "+I[1, 2023-07-30, test_metadata_columns, metadata]", + "+I[2, 2023-07-30, test_metadata_columns, metadata]"), + table, + rowType, + Collections.singletonList("pk")); + } + @Test public void testCatalogAndTableConfig() { MySqlSyncTableAction action = diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql index 5232f47710d4..fa66301c990d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql @@ -449,3 +449,23 @@ CREATE TABLE t3 ( k INT, v2 VARCHAR(10) ); + + +-- ################################################################################ +-- testMetadataColumns +-- ################################################################################ + +CREATE DATABASE metadata; +USE metadata; + +CREATE TABLE t1 ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); + +CREATE TABLE t2 ( + k INT, + v1 VARCHAR(10), + PRIMARY KEY (k) +); diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 121f875601e9..2c1abc2691e6 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -314,3 +314,16 @@ CREATE TABLE t2 ( _date VARCHAR(10), PRIMARY KEY (pk) ); + +-- ################################################################################ +-- testMetadataColumns +-- ################################################################################ + +CREATE DATABASE metadata; +USE metadata; + +CREATE TABLE test_metadata_columns ( + pk INT, + _date VARCHAR(10), + PRIMARY KEY (pk) +); \ No newline at end of file