-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[flink] CDC Ingestion supported metadata columns #2077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
697c743 to
fafa050
Compare
fafa050 to
fb760c8
Compare
c197aa7 to
92c26ea
Compare
92c26ea to
2c9e22e
Compare
|
Please rebase master |
|
@JingsongLi PTAL,thanks. |
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @MonsterChenzhuo , left some minor comments.
|
|
||
| @Override | ||
| public Map<String, String> read(JsonNode record) { | ||
| String dbName = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dbName?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
|
|
||
| @Override | ||
| public Map<String, String> read(JsonNode record) { | ||
| String dbName = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dbName?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| */ | ||
| public interface CdcMetadataConverter extends Serializable { | ||
|
|
||
| Map<String, String> read(JsonNode var1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var1 to a meaningful name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| computedColumn.eval(resultMap.get(computedColumn.fieldReference()))); | ||
| } | ||
|
|
||
| for (CdcMetadataConverter metadataConverters : metadataConverters) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metadataConverter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| record.get("source").get(AbstractSourceInfo.TABLE_NAME_KEY).asText(); | ||
| Map<String, String> resultMap = new HashMap<>(); | ||
| resultMap.put("table_name", dbName); | ||
| return resultMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.singletonMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| record.get("source").get(AbstractSourceInfo.DATABASE_NAME_KEY).asText(); | ||
| Map<String, String> resultMap = new HashMap<>(); | ||
| resultMap.put("database_name", dbName); | ||
| return resultMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.singletonMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
| record.get("source").get(AbstractSourceInfo.TIMESTAMP_KEY).asText(); | ||
| Map<String, String> resultMap = new HashMap<>(); | ||
| resultMap.put("op_ts", dbName); | ||
| return resultMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.singletonMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
(cherry picked from commit 04179e3) # Conflicts: # paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java # paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java # paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
(cherry picked from commit 04179e3)
Purpose
close #1985
Tests
MySqlSyncDatabaseActionITCase#testMetadataColumns()
MySqlSyncTableActionITCase#testMetadataColumns()
API and Format
Documentation