2 changes: 2 additions & 0 deletions docs/content/how-to/cdc-ingestion.md
@@ -86,6 +86,7 @@ To use this feature through `flink run`, run the following shell command.
[--primary-keys <primary-keys>] \
[--type-mapping <option1,option2...>] \
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
[--metadata-column <metadata-column>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
@@ -169,6 +170,7 @@ To use this feature through `flink run`, run the following shell command.
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
[--mode <sync-mode>] \
[--metadata-column <metadata-column>] \
[--type-mapping <option1,option2...>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -61,6 +61,10 @@
<td><h5>--mode</h5></td>
<td>It is used to specify the synchronization mode.<br />Possible values:<ul><li>"divided" (the default mode if you haven't specified one): start a sink for each table; synchronizing a new table requires restarting the job.</li><li>"combined": start a single combined sink for all tables; new tables will be synchronized automatically.</li></ul></td>
</tr>
<tr>
<td><h5>--metadata-column</h5></td>
<td>It is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, such as `table_name`, `database_name`, and `op_ts`. See the <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata">MySQL CDC documentation</a> for a complete list of available metadata.</td>
</tr>
<tr>
<td><h5>--type-mapping</h5></td>
<td>It is used to specify how to map MySQL data types to Paimon types.<br />
4 changes: 4 additions & 0 deletions docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -63,6 +63,10 @@
<td><h5>--computed-column</h5></td>
<td>The definitions of computed columns. The argument field must be a MySQL table field name. See <a href="#computed-functions">here</a> for a complete list of configurations. </td>
</tr>
<tr>
<td><h5>--metadata-column</h5></td>
<td>It is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, such as `table_name`, `database_name`, and `op_ts`. See the <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata">MySQL CDC documentation</a> for a complete list of available metadata.</td>
</tr>
<tr>
<td><h5>--mysql-conf</h5></td>
<td>The configuration for Flink CDC MySQL sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
org/apache/paimon/utils/JsonSerdeUtil.java
@@ -204,5 +204,20 @@ public static <T extends JsonNode> T asSpecificNodeType(String json, Class<T> cl
return clazz.cast(resultNode);
}

/**
* Converts the given Java object into its corresponding {@link JsonNode} representation.
*
* <p>This method utilizes the Jackson {@link ObjectMapper}'s valueToTree functionality to
* transform any Java object into a JsonNode, which can be useful for various JSON tree
* manipulations without serializing the object into a string format first.
*
* @param <T> The type of the input object.
* @param value The Java object to be converted.
* @return The JsonNode representation of the given object.
*/
public static <T> JsonNode toTree(T value) {
return OBJECT_MAPPER_INSTANCE.valueToTree(value);
}

private JsonSerdeUtil() {}
}
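The new `toTree` helper converts any Java object into a Jackson tree without an intermediate string round-trip. A minimal usage sketch (the `ToTreeExample` class and sample values are hypothetical, assuming the shaded Jackson types used throughout Paimon):

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.utils.JsonSerdeUtil;

import java.util.HashMap;
import java.util.Map;

public class ToTreeExample {
    public static void main(String[] args) {
        // Build a plain Java map and convert it directly to a JSON tree.
        Map<String, Object> source = new HashMap<>();
        source.put("table", "orders");
        source.put("ts_ms", 1700000000000L);

        JsonNode node = JsonSerdeUtil.toTree(source);
        // Navigate the result like any other JsonNode.
        System.out.println(node.get("table").asText()); // prints: orders
    }
}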
@@ -150,7 +150,8 @@ public static Schema buildPaimonSchema(
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
Map<String, String> tableConfig,
Schema sourceSchema) {
Schema sourceSchema,
CdcMetadataConverter[] metadataConverters) {
Schema.Builder builder = Schema.newBuilder();

// options
Expand All @@ -171,6 +172,10 @@ public static Schema buildPaimonSchema(
builder.column(computedColumn.columnName(), computedColumn.columnType());
}

for (CdcMetadataConverter metadataConverter : metadataConverters) {
builder.column(metadataConverter.getColumnName(), metadataConverter.getDataType());
}

// primary keys
if (!specifiedPrimaryKeys.isEmpty()) {
Map<String, Integer> sourceColumns =
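As the hunk above suggests, metadata columns are appended to the schema after the source and computed columns. A minimal sketch of the resulting column order (hypothetical column names, assuming `org.apache.paimon.schema.Schema` as used by `buildPaimonSchema`):

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class SchemaOrderSketch {
    public static void main(String[] args) {
        Schema.Builder builder = Schema.newBuilder();
        builder.column("id", DataTypes.INT().notNull());            // source column
        builder.column("dt", DataTypes.STRING());                   // computed column
        builder.column("table_name", DataTypes.STRING().notNull()); // metadata column
        Schema schema = builder.build();
        // Fields appear in insertion order: id, dt, table_name.
        schema.fields().forEach(f -> System.out.println(f.name()));
    }
}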
org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.types.DataType;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.io.Serializable;
import java.util.Map;

/**
* An interface for converting CDC metadata.
*
* <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
* {@link JsonNode} source. Implementations of this interface can be used to process and transform
* metadata entries from CDC sources.
*/
public interface CdcMetadataConverter extends Serializable {

/** Reads metadata from the given CDC payload, keyed by output column name. */
Map<String, String> read(JsonNode payload);

/** Returns the Paimon data type of the metadata column. */
DataType getDataType();

/** Returns the name of the metadata column in the output schema. */
String getColumnName();
}
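The interface is small enough that custom converters are easy to write. A hypothetical example, sketched under the assumption that the Debezium payload carries the binlog file name under `source.file` (the class and column name are illustrative, not part of this PR):

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.util.Collections;
import java.util.Map;

/** Hypothetical converter exposing the binlog file name as a column. */
public class BinlogFileMetadataConverter implements CdcMetadataConverter {
    private static final long serialVersionUID = 1L;

    @Override
    public Map<String, String> read(JsonNode payload) {
        // Debezium nests source metadata under the "source" field.
        String file = payload.get("source").get("file").asText();
        return Collections.singletonMap(getColumnName(), file);
    }

    @Override
    public DataType getDataType() {
        return DataTypes.STRING();
    }

    @Override
    public String getColumnName() {
        return "binlog_file";
    }
}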
@@ -24,6 +24,7 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
@@ -157,7 +158,12 @@ public void build(StreamExecutionEnvironment env) throws Exception {
buildComputedColumns(computedColumnArgs, kafkaSchema);
Schema fromKafka =
buildPaimonSchema(
partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema);
partitionKeys,
primaryKeys,
computedColumns,
tableConfig,
kafkaSchema,
new CdcMetadataConverter[] {});

try {
table = (FileStoreTable) catalog.getTable(identifier);
@@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -141,7 +142,8 @@ public void build(StreamExecutionEnvironment env) throws Exception {
Collections.emptyList(),
computedColumns,
tableConfig,
mongodbSchema);
mongodbSchema,
new CdcMetadataConverter[] {});
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
table = (FileStoreTable) catalog.getTable(identifier);
org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -24,6 +24,7 @@
package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -35,6 +36,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

@@ -102,12 +104,14 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
// NOTE: current table name is not converted by tableNameConverter
private String currentTable;
private boolean shouldSynchronizeCurrentTable;
private final CdcMetadataConverter[] metadataConverters;

public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping) {
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
this(
serverTimeZone,
caseSensitive,
@@ -116,7 +120,8 @@ public MySqlDebeziumJsonEventParser(
new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive, typeMapping),
null,
null,
typeMapping);
typeMapping,
metadataConverters);
}

public MySqlDebeziumJsonEventParser(
@@ -126,7 +131,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
TypeMapping typeMapping) {
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
this(
serverTimeZone,
caseSensitive,
@@ -135,7 +141,8 @@ public MySqlDebeziumJsonEventParser(
schemaBuilder,
includingPattern,
excludingPattern,
typeMapping);
typeMapping,
metadataConverters);
}

public MySqlDebeziumJsonEventParser(
@@ -146,7 +153,8 @@ public MySqlDebeziumJsonEventParser(
NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
TypeMapping typeMapping) {
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
Expand All @@ -155,6 +163,7 @@ public MySqlDebeziumJsonEventParser(
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.typeMapping = typeMapping;
this.metadataConverters = metadataConverters;
}

@Override
@@ -416,6 +425,10 @@ else if (Date.SCHEMA_NAME.equals(className)) {
computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
}

for (CdcMetadataConverter metadataConverter : metadataConverters) {
resultMap.putAll(metadataConverter.read(JsonSerdeUtil.toTree(root.payload())));
}

return resultMap;
}

org/apache/paimon/flink/action/cdc/mysql/MySqlMetadataProcessor.java
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.mysql;

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import io.debezium.connector.AbstractSourceInfo;

import java.util.Collections;
import java.util.Map;

/**
* Enumerates the metadata processing behaviors for MySQL CDC-related data.
*
* <p>This enumeration provides definitions for various MySQL CDC metadata keys along with their
* associated data types and converters. Each enum entry represents a specific type of metadata
* related to MySQL CDC and provides a mechanism to read and process this metadata from a given
* {@link JsonNode} source.
*
* <p>The provided converters, which are of type {@link CdcMetadataConverter}, define how the raw
* metadata is transformed or processed for each specific metadata key.
*/
public enum MySqlMetadataProcessor {
/** Name of the table that contains the row. */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),
new CdcMetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Map<String, String> read(JsonNode record) {
String table =
record.get("source").get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
return Collections.singletonMap("table_name", table);
}

@Override
public DataType getDataType() {
return DataTypes.STRING().notNull();
}

@Override
public String getColumnName() {
return "table_name";
}
}),

/** Name of the database that contains the row. */
DATABASE_NAME(
"database_name",
DataTypes.STRING().notNull(),
new CdcMetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Map<String, String> read(JsonNode record) {
String database =
record.get("source").get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
return Collections.singletonMap("database_name", database);
}

@Override
public DataType getDataType() {
return DataTypes.STRING().notNull();
}

@Override
public String getColumnName() {
return "database_name";
}
}),

/**
* It indicates the time when the change was made in the database. If the record is read from a
* snapshot of the table instead of the binlog, the value is always 0.
*/
OP_TS(
"op_ts",
DataTypes.TIMESTAMP(3).notNull(),
new CdcMetadataConverter() {
private static final long serialVersionUID = 1L;

@Override
public Map<String, String> read(JsonNode record) {
String timestamp =
record.get("source").get(AbstractSourceInfo.TIMESTAMP_KEY).asText();
return Collections.singletonMap("op_ts", timestamp);
}

@Override
public DataType getDataType() {
return DataTypes.TIMESTAMP(3).notNull();
}

@Override
public String getColumnName() {
return "op_ts";
}
});

private final String key;

private final DataType dataType;

private final CdcMetadataConverter converter;

MySqlMetadataProcessor(String key, DataType dataType, CdcMetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.converter = converter;
}

public String getKey() {
return key;
}

public DataType getDataType() {
return dataType;
}

public CdcMetadataConverter getConverter() {
return converter;
}
}
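This diff does not show how the `--metadata-column` argument is wired to these converters. A plausible lookup, sketched purely as an assumption (the `MetadataConverterResolver` helper and its `resolve` method are hypothetical, placed in the same package as the enum):

import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;

import java.util.Arrays;

/** Hypothetical helper mapping "--metadata-column table_name,op_ts" to converters. */
public class MetadataConverterResolver {

    public static CdcMetadataConverter[] resolve(String metadataColumns) {
        // Split the comma-separated argument and look up each key in the enum.
        return Arrays.stream(metadataColumns.split(","))
                .map(String::trim)
                .map(MetadataConverterResolver::converterFor)
                .toArray(CdcMetadataConverter[]::new);
    }

    private static CdcMetadataConverter converterFor(String key) {
        for (MySqlMetadataProcessor processor : MySqlMetadataProcessor.values()) {
            if (processor.getKey().equals(key)) {
                return processor.getConverter();
            }
        }
        throw new IllegalArgumentException("Unknown metadata column: " + key);
    }
}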