Skip to content

Commit 2c9e22e

Browse files
author
chenzy15
committed
[flink] CDC Ingestion supported metadata columns
1 parent 77f10a0 commit 2c9e22e

File tree

21 files changed

+476
-12
lines changed

21 files changed

+476
-12
lines changed

docs/content/how-to/cdc-ingestion.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ To use this feature through `flink run`, run the following shell command.
8686
[--primary-keys <primary-keys>] \
8787
[--type-mapping <option1,option2...>] \
8888
[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
89+
[--metadata-column <metadata-column>] \
8990
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
9091
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
9192
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
@@ -169,6 +170,7 @@ To use this feature through `flink run`, run the following shell command.
169170
[--including-tables <mysql-table-name|name-regular-expr>] \
170171
[--excluding-tables <mysql-table-name|name-regular-expr>] \
171172
[--mode <sync-mode>] \
173+
[--metadata-column <metadata-column>] \
172174
[--type-mapping <option1,option2...>] \
173175
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
174176
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \

docs/layouts/shortcodes/generated/mysql_sync_database.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
<td><h5>--mode</h5></td>
6262
<td>It is used to specify synchronization mode.<br />Possible values:<ul><li>"divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.</li><li>"combined": start a single combined sink for all tables, the new table will be automatically synchronized.</li></ul></td>
6363
</tr>
64+
<tr>
65+
<td><h5>--metadata-column</h5></td>
66+
<td>--metadata-column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata">document</a> for a complete list of available metadata.</td>
67+
</tr>
6468
<tr>
6569
<td><h5>--type-mapping</h5></td>
6670
<td>It is used to specify how to map MySQL data type to Paimon type.<br />

docs/layouts/shortcodes/generated/mysql_sync_table.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363
<td><h5>--computed-column</h5></td>
6464
<td>The definitions of computed columns. The argument field is from MySQL table field name. See <a href="#computed-functions">here</a> for a complete list of configurations. </td>
6565
</tr>
66+
<tr>
67+
<td><h5>--metadata-column</h5></td>
68+
<td>--metadata-column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, such as the `table_name`, `database_name`, and `op_ts`. Each configuration should be specified in the format "key=value". See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#available-metadata">document</a> for a complete list of available metadata.</td>
69+
</tr>
6670
<tr>
6771
<td><h5>--mysql-conf</h5></td>
6872
<td>The configuration for Flink CDC MySQL sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>

paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,5 +178,20 @@ public T deserialize(JsonParser parser, DeserializationContext context)
178178
});
179179
}
180180

181+
/**
182+
* Converts the given Java object into its corresponding {@link JsonNode} representation.
183+
*
184+
* <p>This method utilizes the Jackson {@link ObjectMapper}'s valueToTree functionality to
185+
* transform any Java object into a JsonNode, which can be useful for various JSON tree
186+
* manipulations without serializing the object into a string format first.
187+
*
188+
* @param <T> The type of the input object.
189+
* @param value The Java object to be converted.
190+
* @return The JsonNode representation of the given object.
191+
*/
192+
public static <T> JsonNode toTree(T value) {
193+
return OBJECT_MAPPER_INSTANCE.valueToTree(value);
194+
}
195+
181196
private JsonSerdeUtil() {}
182197
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ public static Schema buildPaimonSchema(
149149
List<String> specifiedPrimaryKeys,
150150
List<ComputedColumn> computedColumns,
151151
Map<String, String> tableConfig,
152-
Schema sourceSchema) {
152+
Schema sourceSchema,
153+
CdcMetadataConverter[] metadataConverters) {
153154
Schema.Builder builder = Schema.newBuilder();
154155

155156
// options
@@ -170,6 +171,10 @@ public static Schema buildPaimonSchema(
170171
builder.column(computedColumn.columnName(), computedColumn.columnType());
171172
}
172173

174+
for (CdcMetadataConverter metadataConverter : metadataConverters) {
175+
builder.column(metadataConverter.getColumnName(), metadataConverter.getDataType());
176+
}
177+
173178
// primary keys
174179
if (!specifiedPrimaryKeys.isEmpty()) {
175180
Map<String, Integer> sourceColumns =
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.action.cdc;
20+
21+
import org.apache.paimon.types.DataType;
22+
23+
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
24+
25+
import java.io.Serializable;
26+
import java.util.Map;
27+
28+
/**
29+
* A functional interface for converting CDC metadata.
30+
*
31+
* <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
32+
* {@link JsonNode} source. Implementations of this interface can be used to process and transform
33+
* metadata entries from CDC sources.
34+
*/
35+
public interface CdcMetadataConverter extends Serializable {
36+
37+
Map<String, String> read(JsonNode var1);
38+
39+
DataType getDataType();
40+
41+
String getColumnName();
42+
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.flink.FlinkConnectorOptions;
2525
import org.apache.paimon.flink.action.Action;
2626
import org.apache.paimon.flink.action.ActionBase;
27+
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
2728
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2829
import org.apache.paimon.flink.action.cdc.TypeMapping;
2930
import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
@@ -157,7 +158,12 @@ public void build(StreamExecutionEnvironment env) throws Exception {
157158
buildComputedColumns(computedColumnArgs, kafkaSchema);
158159
Schema fromKafka =
159160
buildPaimonSchema(
160-
partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema);
161+
partitionKeys,
162+
primaryKeys,
163+
computedColumns,
164+
tableConfig,
165+
kafkaSchema,
166+
new CdcMetadataConverter[] {});
161167

162168
try {
163169
table = (FileStoreTable) catalog.getTable(identifier);

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.catalog.Identifier;
2323
import org.apache.paimon.flink.FlinkConnectorOptions;
2424
import org.apache.paimon.flink.action.ActionBase;
25+
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
2526
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2627
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
2728
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -141,7 +142,8 @@ public void build(StreamExecutionEnvironment env) throws Exception {
141142
Collections.emptyList(),
142143
computedColumns,
143144
tableConfig,
144-
mongodbSchema);
145+
mongodbSchema,
146+
new CdcMetadataConverter[] {});
145147
// Check if table exists before trying to get or create it
146148
if (catalog.tableExists(identifier)) {
147149
table = (FileStoreTable) catalog.getTable(identifier);

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
package org.apache.paimon.flink.action.cdc.mysql;
2525

2626
import org.apache.paimon.catalog.Identifier;
27+
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
2728
import org.apache.paimon.flink.action.cdc.ComputedColumn;
2829
import org.apache.paimon.flink.action.cdc.TableNameConverter;
2930
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -35,6 +36,7 @@
3536
import org.apache.paimon.types.DataField;
3637
import org.apache.paimon.types.RowKind;
3738
import org.apache.paimon.utils.DateTimeUtils;
39+
import org.apache.paimon.utils.JsonSerdeUtil;
3840
import org.apache.paimon.utils.Preconditions;
3941
import org.apache.paimon.utils.StringUtils;
4042

@@ -102,12 +104,14 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
102104
// NOTE: current table name is not converted by tableNameConverter
103105
private String currentTable;
104106
private boolean shouldSynchronizeCurrentTable;
107+
private final CdcMetadataConverter[] metadataConverters;
105108

106109
public MySqlDebeziumJsonEventParser(
107110
ZoneId serverTimeZone,
108111
boolean caseSensitive,
109112
List<ComputedColumn> computedColumns,
110-
TypeMapping typeMapping) {
113+
TypeMapping typeMapping,
114+
CdcMetadataConverter[] metadataConverters) {
111115
this(
112116
serverTimeZone,
113117
caseSensitive,
@@ -116,7 +120,8 @@ public MySqlDebeziumJsonEventParser(
116120
new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive, typeMapping),
117121
null,
118122
null,
119-
typeMapping);
123+
typeMapping,
124+
metadataConverters);
120125
}
121126

122127
public MySqlDebeziumJsonEventParser(
@@ -126,7 +131,8 @@ public MySqlDebeziumJsonEventParser(
126131
NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
127132
@Nullable Pattern includingPattern,
128133
@Nullable Pattern excludingPattern,
129-
TypeMapping typeMapping) {
134+
TypeMapping typeMapping,
135+
CdcMetadataConverter[] metadataConverters) {
130136
this(
131137
serverTimeZone,
132138
caseSensitive,
@@ -135,7 +141,8 @@ public MySqlDebeziumJsonEventParser(
135141
schemaBuilder,
136142
includingPattern,
137143
excludingPattern,
138-
typeMapping);
144+
typeMapping,
145+
metadataConverters);
139146
}
140147

141148
public MySqlDebeziumJsonEventParser(
@@ -146,7 +153,8 @@ public MySqlDebeziumJsonEventParser(
146153
NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
147154
@Nullable Pattern includingPattern,
148155
@Nullable Pattern excludingPattern,
149-
TypeMapping typeMapping) {
156+
TypeMapping typeMapping,
157+
CdcMetadataConverter[] metadataConverters) {
150158
this.serverTimeZone = serverTimeZone;
151159
this.caseSensitive = caseSensitive;
152160
this.computedColumns = computedColumns;
@@ -155,6 +163,7 @@ public MySqlDebeziumJsonEventParser(
155163
this.includingPattern = includingPattern;
156164
this.excludingPattern = excludingPattern;
157165
this.typeMapping = typeMapping;
166+
this.metadataConverters = metadataConverters;
158167
}
159168

160169
@Override
@@ -416,6 +425,10 @@ else if (Date.SCHEMA_NAME.equals(className)) {
416425
computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
417426
}
418427

428+
for (CdcMetadataConverter metadataConverters : metadataConverters) {
429+
resultMap.putAll(metadataConverters.read(JsonSerdeUtil.toTree(root.payload())));
430+
}
431+
419432
return resultMap;
420433
}
421434

0 commit comments

Comments
 (0)