-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-35647][route] Allow symbol replacement to simplify routing rules #3428
Conversation
c5d3248
to
323fc05
Compare
flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
Show resolved
Hide resolved
flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full-with-repsym.yaml
Show resolved
Hide resolved
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Show resolved
Hide resolved
private TableId resolveReplacement( | ||
TableId originalTable, Tuple3<Selectors, String, String> route) { | ||
if (route.f2 != null) { | ||
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use Map<TableId, TableId> to reduce this replace and parse operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, used LoadingCache
to store table mapping relations. As for previously discussed hashCode
caching optimization, I'll open another PR to address it.
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank all contributors.
...-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @yuxiqian, @icchux and @WholeWorld-Timothy for the contribution, LGTM
@icchux and @WholeWorld-Timothy I've added you as co-authors in this PR, and thus I closed #2908 and #2947. |
This closes FLINK-35647, inspired by #2908 and #2947. Credit goes to @icchux and @WholeWorld-Timothy.
Currently, we must provide explicit sink-table in pipeline route rules, which means if we'd like to route all tables in specific database and change names in pattern, there's no choice but write all rules separately, which is a chore.
This PR allows specifying a special "replacement symbol" to resemble original table name in route rules like this:
Now,
<>
will be replaced by upstream table name, so any tables likedb.tableX
will be routed tonew_db.tableX
automatically.