Flink: Initial implementation of Flink source with the new FLIP-27 source interface #2105
Conversation
5c485c9 to
acd85f1
Compare
|     return inputFiles.get(location);
|   }
|
|   public void seek(CheckpointedPosition checkpointedPosition) {
Currently, we put those two-level iterators inside a single DataIterator, which makes the code a bit complex to read and understand. I'd prefer to split this into two different iterators (see the sketch after this list):
- FileRecordIterator, which will seek to the provided row offset and then continue to read the following records.
- CombinedTaskRecordIterator, which will hold multiple FileRecordIterators; it will locate the latest open FileRecordIterator and seek to the given row offset to read the following records.
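A minimal sketch of what that two-iterator split could look like. The names come from the comment above, but the signatures and bodies here are assumptions for illustration, not the actual Iceberg implementation:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Reads one file's records and can skip ahead to a row offset within that file.
interface FileRecordIterator<T> extends Iterator<T> {
  void seek(long rowOffset);
}

// Chains the per-file iterators of a combined task. On restore, it jumps to
// the file that was open at checkpoint time and seeks within it.
class CombinedTaskRecordIterator<T> implements Iterator<T> {
  private final List<FileRecordIterator<T>> files;
  private int currentFile = 0;

  CombinedTaskRecordIterator(List<FileRecordIterator<T>> files) {
    this.files = files;
  }

  void seek(int fileOffset, long rowOffset) {
    this.currentFile = fileOffset;
    files.get(fileOffset).seek(rowOffset);
  }

  @Override
  public boolean hasNext() {
    while (currentFile < files.size()) {
      if (files.get(currentFile).hasNext()) {
        return true;
      }
      currentFile++;
    }
    return false;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return files.get(currentFile).next();
  }
}
```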
That makes sense to me.
I have a question about the Map<String, InputFile> inputFiles. Right now, it is constructed per CombinedScanTask. Would it be OK to construct it per individual FileScanTask instead? I tried the change and the delete tests work fine, but I am not sure whether I might be missing something, since I am not familiar with merge-on-read for deleted rows.
@openinx can you take a look at my question in the comment above?
The inputFiles map in DataIterator is an in-memory cache for looking up the decrypted InputFile by location. We maintain those <location, decryptedInputFile> entries in a map because we are trying to fetch all the decrypted files at once (some EncryptionManager implementations use this to request them in a single batch RPC call). It has no relationship to row-level deletes in format v2; we could fetch each <location, decryptedInputFile> entry one by one, but that might produce many RPCs to a key server.
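For illustration, here is a sketch of how such a per-CombinedScanTask cache can be built so the EncryptionManager sees all files in one decrypt() call. This is a paraphrase of the idea (the helper name decryptAll is hypothetical), not the exact DataIterator code:

```java
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

static Map<String, InputFile> decryptAll(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
  // Wrap every data file of the combined task with its key metadata.
  Iterable<EncryptedInputFile> encrypted = task.files().stream()
      .map(fileTask -> EncryptedFiles.encryptedInput(
          io.newInputFile(fileTask.file().path().toString()),
          fileTask.file().keyMetadata()))
      .collect(Collectors.toList());

  // A batching EncryptionManager can turn this into a single key-server RPC.
  Map<String, InputFile> inputFiles = Maps.newHashMap();
  encryption.decrypt(encrypted).forEach(
      decrypted -> inputFiles.put(decrypted.location(), decrypted));
  return inputFiles;
}
```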
I think it's worth following the original comment because it clearly decouples the file and offset iterator code.
build.gradle:
| compile project(':iceberg-parquet')
| compile project(':iceberg-hive-metastore')
|
| compileOnly "org.apache.flink:flink-connector-base"
Why compileOnly? Does this assume that flink-connector-base would be supplied somehow? If so, what's recommended to users of the library, given that flink-dist doesn't bundle flink-connector-base?
All the Flink deps are defined as compileOnly in Iceberg. Yeah, it assumes Flink jars are provided at runtime.
This particular jar file won't be provided by the Flink dist. It should be a transitive dependency of the connector.
Right now, the iceberg-flink-runtime shadow jar doesn't bring in any Flink deps. If we include flink-connector-base as compile, it will be bundled in the iceberg-flink-runtime shadow jar. If a Flink app also pulls in flink-connector-base transitively via other deps (like the Flink Kafka connector), then we can get duplicate classes in jars.
@openinx maybe you can shed some light on how users get the Flink jars when using the Flink Iceberg connector.
I am also wondering whether flink-dist should actually include flink-connector-base.
https://issues.apache.org/jira/browse/FLINK-20098
It is not desirable to place such dependencies into flink-dist.
Regarding the transitive dependency: it would be surprising for the user to find that they have to add a flink-connector-base dependency to their project for the iceberg connector to work.
Regarding the dup classes: the user still has control over the transitive dependency if there is a version mismatch (which is why it should be a transitive dependency and not included via shadow).
@tweise thanks a lot for the context; now it all makes sense to me. I also didn't notice that iceberg-flink-runtime actually excludes all Flink jars. Updated with compile deps.
In the future, if Flink decides to move flink-connector-base and flink-connector-files into flink-dist (as hinted in FLINK-20472), we can revisit the compile dep status.
| @Override
| public Boundedness getBoundedness() {
|   return enumeratorConfig.splitDiscoveryInterval() == null ?
Shouldn't boundedness be based on whether the data being read has finite bounds, i.e., whether there's an end timestamp at which the source has to stop reading? You can have finite bounds but still have continuous discovery enabled if the end timestamp is sometime in the future.
Here is the relevant passage from the Javadoc. I think the scenario you described also falls into CONTINUOUS_UNBOUNDED. I know it is not totally intuitive.
"A CONTINUOUS_UNBOUNDED stream may also eventually stop at some point. But before that happens, Flink always assumes the sources are going to run forever."
Yeah, agree with Steven that it's not always intuitive, but it does fall in line with their definition.
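For reference, a minimal sketch of how that decision plays out in the snippet quoted above. The enumeratorConfig accessor comes from this PR; the branch values past the truncated ternary are my completion, hedged against the Javadoc semantics discussed here:

```java
import org.apache.flink.api.connector.source.Boundedness;

@Override
public Boundedness getBoundedness() {
  // No discovery interval: one-time split planning, so the source is BOUNDED.
  // With periodic discovery enabled, Flink must assume the source runs
  // forever, i.e. CONTINUOUS_UNBOUNDED, even if it eventually stops.
  return enumeratorConfig.splitDiscoveryInterval() == null
      ? Boundedness.BOUNDED
      : Boundedness.CONTINUOUS_UNBOUNDED;
}
```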
| final Table table = loadTable(tableLoader);
| if (enumeratorConfig.splitDiscoveryInterval() == null) {
|   final List<IcebergSourceSplit> splits = FlinkSplitGenerator.planIcebergSourceSplits(table, scanContext);
|   assigner.onDiscoveredSplits(splits);
nit: can we move this into the StaticIcebergEnumerator so that we keep the interactions between the enumerator and the assigner consistent?
agree. will change
Actually, this is done intentionally: if split planning fails, we fail fast during job initialization. If we instead did the one-time planning in the start method, it would fail at task start in the taskmanager. At the least, we should probably add some comments to explain this.
| * A {@link SourceEvent} representing the request for a split, typically sent from the
| * {@link SourceReader} to the {@link SplitEnumerator}.
| *
| * TODO: push change to Flink to carry the finished splitIds.
Is there a JIRA for this?
Yeah, I forgot to follow up on the Flink side. Created the JIRA and attached a PR to it: https://issues.apache.org/jira/browse/FLINK-21364
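In the meantime, a sketch of what a split-request event carrying finished split ids could look like. SplitRequestEvent and requesterHostname() appear in this PR's diffs; the exact class shape below is my assumption:

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.connector.source.SourceEvent;

// Sent by a reader to request the next split, while also reporting the
// splits it has fully consumed so the enumerator can track progress.
class SplitRequestEvent implements SourceEvent {
  private final Collection<String> finishedSplitIds;
  private final String requesterHostname;

  SplitRequestEvent(Collection<String> finishedSplitIds, String requesterHostname) {
    this.finishedSplitIds = finishedSplitIds;
    this.requesterHostname = requesterHostname;
  }

  Collection<String> finishedSplitIds() {
    return Collections.unmodifiableCollection(finishedSplitIds);
  }

  String requesterHostname() {
    return requesterHostname;
  }
}
```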
| readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
| assignSplits();
| } else {
|   LOG.error("Received unrecognized event from subtask {}: {}", subtaskId, sourceEvent);
Should this throw an exception?
Good question. Throwing an exception will cause the job to fail and restart; explicit failure is probably better.
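A sketch of that explicit-failure variant, i.e. replacing the LOG.error branch in the snippet quoted above with a thrown exception (the surrounding names come from the quoted diff):

```java
if (sourceEvent instanceof SplitRequestEvent) {
  SplitRequestEvent splitRequestEvent = (SplitRequestEvent) sourceEvent;
  readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
  assignSplits();
} else {
  // Fail the job explicitly instead of silently logging an unknown event.
  throw new IllegalArgumentException(String.format(
      "Received unknown event from subtask %d: %s", subtaskId, sourceEvent));
}
```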
| import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
|
| /**
|  * Enumerator should call the assigner APIs from the coordinator thread.
Maybe expand the javadoc a little to explain why this is a separate component (from the design doc)?
good suggestion. will add
| * If enumerator wasn't able to assign the split (e.g., reader disconnected),
| * enumerator should call {@link SplitAssigner#onUnassignedSplits} to return the split.
| */
| GetSplitResult getNext(@Nullable String hostname);
Also pass the subtask index so that it is possible for an implementation to assign splits to subtasks in a particular order? Multiple subtasks can share a host.
Originally, subtaskIndex was there. We removed it because we couldn't think of any use case needing it. I am definitely open to adding it back if there is a concrete use case. Can you elaborate a little?
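For context, a sketch of how an enumerator might drive this getNext contract. The GetSplitResult accessors (status(), split()) and the onUnassignedSplits signature are assumptions about this PR's shapes, not a definitive implementation:

```java
import java.util.Collections;
import javax.annotation.Nullable;

// Hypothetical enumerator-side wiring: ask the assigner for the next split,
// optionally passing the requesting reader's host as a locality hint.
private void assignNextSplit(int subtaskId, @Nullable String requesterHostname) {
  GetSplitResult result = assigner.getNext(requesterHostname);
  if (result.status() == GetSplitResult.Status.AVAILABLE) {
    try {
      enumeratorContext.assignSplit(result.split(), subtaskId);
    } catch (Exception e) {
      // e.g. the reader disconnected: hand the split back to the assigner.
      assigner.onUnassignedSplits(Collections.singleton(result.split()));
    }
  }
}
```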
| * under the License.
| */
|
| package org.apache.iceberg.flink.source.assigner;
What's the purpose of this class? How will metrics from the enumerator/assigner be reported to Flink?
Good catch. Originally, I was planning to have the enumerator poll the assigner for the stats; this class was for that purpose. @sundargates and I discussed it and think it is probably better to have the assigner publish metrics directly, so that we don't have to force a single value class like this on all assigners.
I cleaned up the assigner/enumerator code to avoid using this, but forgot to remove the class. Will delete it.
| // for batch jobs, discover splits eagerly during job initialization.
| // As FLINK-16866 supports non-blocking job submission since 1.12,
| // heavy job initialization won't lead to request timeout for job submission.
| assigner.onDiscoveredSplits(FlinkSplitGenerator.planIcebergSourceSplits(table, scanContext));
Maybe it would be better to rearrange this for clarity: when the assigner was created with enumState.pendingSplits(), we shouldn't perform eager split discovery here?
Great catch, this is actually a bug. Let me fix it and add a unit test.
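A sketch of the fix being discussed, guarding the eager planning from the quoted snippet with a restored-state check (the enumState null-check is my assumption about how the fix could look):

```java
if (enumState == null) {
  // Fresh start: plan splits eagerly so planning failures surface at
  // job initialization rather than later.
  assigner.onDiscoveredSplits(FlinkSplitGenerator.planIcebergSourceSplits(table, scanContext));
}
// On restore, the assigner was already seeded with enumState.pendingSplits(),
// so re-planning here would hand out duplicate splits.
```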
@stevenzwu would you mind breaking this big PR into several smaller PRs for review purposes?

@openinx yes, that is the plan as outlined in the description. I am actually preparing the next PR for the split reader.
👋 Is work still ongoing for the FLIP-27 Iceberg Flink source?

@klam-shop yes. This uber draft PR is meant to provide the full context for how things work together. It is being broken down into smaller PRs for easier code review. You can check the project for progress: https://github.com/apache/iceberg/projects/23. We are about 60% merged.

Thanks for the quick response @stevenzwu! Do you have an idea of when the FLIP-27 source will be completed?

@klam-shop it should be done before the end of Q2. Right now, the main challenge is committers' review bandwidth.
Hi @stevenzwu, is this PR nearly finished? I found that all the smaller PRs in project 23 are merged, and all the classes are ready except …
Closing this draft PR, as we are moving close to merging the MVP version of the FLIP-27 source.
Scope in the first version

This is the uber PR for reference of the complete context. Will submit smaller PRs for the code review.

The new IcebergSource will be marked as @Experimental, as the FLIP-27 source is maturing and we are making it production ready. Here is the design doc that my colleague (@sundargates) and I created, as mentioned in #1626.