Flink: Use Tag or Branch to scan data. #5029
Conversation
This has a dependency on https://github.com/apache/iceberg/pull/4428/files; I will follow up on that PR.
Force-pushed from c47b6f8 to 2db6835.
cc @amogh-jahagirdar @stevenzwu @rdblue, could you please take a look when you are available?
Force-pushed from be61f72 to f257c2e.
amogh-jahagirdar left a comment
Thanks for this @hililiwei! I left some comments/questions. Based on our discussion in #6660, it makes sense to just carry this PR forward.
scanContext.branch() == null
    ? table.currentSnapshot()
    : table.snapshot(scanContext.branch());
Preconditions.checkNotNull(
I think it'll help if at some point (probably options/conf default) we set the default branch to main and always assume a branch is set in this code. Then we can just do table.snapshot(branch).
Nit on log message:
"No snapshots on branch %s in table %s"
+1, can we actually do that in this PR? (Can we check what is done in Spark? I cannot remember now whether we decided to go with using the main branch or an if/else.)
Spark doesn't seem to use main as the default.
My main concern is that main is not compatible with older-version Iceberg tables.
When we read the table metadata with the current Iceberg version, we set the main branch (https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/TableMetadataParser.java#L444), so it will be compatible with older versions of tables.
long currentSnapshotId = currentSnapshot.snapshotId();
long startSnapshotId = table.snapshot(scanContext.startTag()).snapshotId();
Preconditions.checkState(
    SnapshotUtil.isAncestorOf(table, currentSnapshotId, startSnapshotId),
    "The option start-snapshot-id %s is not an ancestor of the current snapshot.",
    startSnapshotId);
...
  lastSnapshotId = startSnapshotId;
} else if (scanContext.startSnapshotId() != null) {
  Snapshot currentSnapshot =
      scanContext.branch() == null
          ? table.currentSnapshot()
          : table.snapshot(scanContext.branch());
  Preconditions.checkNotNull(
-     table.currentSnapshot(), "Don't have any available snapshot in table.");
+     currentSnapshot,
+     "Don't have any available snapshot for branch " + scanContext.branch() + " in table.");
Also sorry if I missed it, but at some point shouldn't there be validation that the start tag is an ancestor of the end tag?
They are eventually converted to snapshot IDs and checked during the planTasks phase.
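As a rough sketch of that check (not the PR's exact code; the tag resolution and SnapshotUtil call mirror the hunk quoted above), the validation could look like:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;

class TagRangeValidation {
  // Illustrative only: resolve start/end tags to snapshot IDs and verify the start snapshot
  // is an ancestor of the end snapshot before planning incremental scan tasks.
  static void validate(Table table, String startTag, String endTag) {
    long startSnapshotId = table.snapshot(startTag).snapshotId();
    long endSnapshotId = table.snapshot(endTag).snapshotId();
    Preconditions.checkState(
        SnapshotUtil.isAncestorOf(table, endSnapshotId, startSnapshotId),
        "Start tag %s (snapshot %s) is not an ancestor of end tag %s (snapshot %s)",
        startTag, startSnapshotId, endTag, endSnapshotId);
  }
}
```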
Resolved review thread (outdated): flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java
@stevenzwu since you are reviewing #6660, could you also take a look at this?
Force-pushed from 06aa678 to 44f0e5e.
Preconditions.checkArgument(
    branch == null,
    String.format(
        "Cannot scan table using ref %s configured for streaming reader yet", branch));
#5984 seems to be a prerequisite for Flink to implement streaming incremental reads on a branch.
} else if (scanContext.startTag() != null || scanContext.startSnapshotId() != null) {
  Preconditions.checkArgument(
      !(scanContext.startTag() != null && scanContext.startSnapshotId() != null),
      "START_SNAPSHOT_ID and START_TAG cannot be used both.");
nit: cannot both be set?
jackye1995 left a comment
Mostly looks good to me, just one nit comment.
Resolved review threads:
- flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java (outdated)
- flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/StreamingMonitorFunction.java (outdated)
- flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java (outdated)
- flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java (outdated)
- flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@stevenzwu addressed this round of comments, PTAL, thanks.
public void appendToTable(List<Record> records) throws IOException {
-   appendToTable(null, records);
+   appendToTable(null, null, records);
nit: this can be appendToTable(null, records), right?
Currently it won't work, because we have two methods:
appendToTable(String branch, List<Record> records)
and
appendToTable(StructLike partition, List<Record> records)
Unless I rearrange the parameters and place branch after records, but I prefer to leave branch where it currently is, at the front.
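A toy, self-contained illustration of the ambiguity (the class and stand-in types below are hypothetical, not the actual test helper):

```java
import java.util.List;

class OverloadDemo {
  interface StructLike {} // stand-in for org.apache.iceberg.StructLike
  interface Record {}     // stand-in for org.apache.iceberg.data.Record

  void appendToTable(String branch, List<Record> records) {}
  void appendToTable(StructLike partition, List<Record> records) {}
  void appendToTable(String branch, StructLike partition, List<Record> records) {}

  void caller(List<Record> records) {
    // appendToTable(null, records);       // does not compile: a bare null matches both
    //                                     // two-argument overloads, so javac reports an
    //                                     // ambiguous method call
    appendToTable(null, null, records);    // compiles: only the three-argument overload fits
    appendToTable((String) null, records); // compiles: the cast selects the branch overload
  }
}
```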
amogh-jahagirdar left a comment
Looks good to me, thanks @hililiwei for the contribution!
Looks like all comments are addressed and we have enough votes. Thanks for the work @hililiwei, and thanks for the review @stevenzwu and @amogh-jahagirdar!
Thanks for the review @jackye1995 @stevenzwu @amogh-jahagirdar |
What is the purpose of the change
Scan data using a specified tag or branch.
Brief change log
Add the following syntax:
SQL:
API:
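The SQL and API snippets did not survive extraction. As a purely illustrative sketch (table names, option keys, and the tag()/branch() builder methods below are assumptions based on the discussion above, not confirmed by this excerpt), usage might look roughly like:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class TagBranchScanExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // SQL: choose the ref to scan through table hints (option keys assumed).
    tEnv.executeSql("SELECT * FROM db.t /*+ OPTIONS('tag'='v1-tag') */");
    tEnv.executeSql("SELECT * FROM db.t /*+ OPTIONS('branch'='audit-branch') */");

    // API: equivalent scan via the source builder; tag()/branch() are assumed to be
    // the methods this PR adds to the builder.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/t");
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(loader)
        .tag("v1-tag")
        .build();
  }
}
```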
cc @amogh-jahagirdar