-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: FLIP-27 source reader #4269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This PR has no unit test as it mainly contains the plumbing classes for Iceberg source. We will have full test with Iceberg source. |
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSourceEvents.java
Outdated
Show resolved
Hide resolved
...4/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
Outdated
Show resolved
Hide resolved
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java
Outdated
Show resolved
Hide resolved
...4/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReaderMetrics.java
Outdated
Show resolved
Hide resolved
....14/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
Outdated
Show resolved
Hide resolved
|
|
||
| IcebergSourceSplit nextSplit = splits.poll(); | ||
| if (nextSplit == null) { | ||
| throw new IOException("No split remaining"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment to explain why this is the right behavior. It seems odd to me that it should throw an exception when it hasn't received new splits. Maybe that is the expectation from how fetch is called, but it is definitely concerning to see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great question. I looked into Flink code again. Throwing an IOException will cause the fetcher to be closed so that we don't have to poll the fetcher again. that is not a problem, because new split later will create new fetcher.
This is the FetchTask that calls splitReader.fetch. We can see that the return value can't be null. I agree that this is a little weird behavior. cc @tweise
public boolean run() throws IOException {
try {
if (!isWakenUp() && lastRecords == null) {
lastRecords = splitReader.fetch();
}
if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
if (elementsQueue.put(fetcherIndex, lastRecords)) {
if (!lastRecords.finishedSplits().isEmpty()) {
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
}
lastRecords = null;
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aded a method comment section to explain the IOException behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably better as an inline comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to inline comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making it a blocking poll that can be waken up by #wakeUp per
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java#L37-L48
It's ok to keep it simple at first though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yittg that is a good question. In theory, I agree it is a good behavior. I haven't figured a good way to implement the wakeUp behavior
Say there is no split available from the queue. if fetch() method doesn't return, it would need to poll the splits queue to check for new split in some loop with sleep. if the wakerUp is called and set the wakeUp flag for the fetch method to break out of the loop and return an empty RecordsWithSplitIds with no records and no finished splits.
We can't use wait-notify to signal btw wakeUp and fetch. Otherwise, fetch method won't do a splits queue polling and discover new splits during wait. That is also undesirable.
Actually fetch, add splits, wakeup tasks are all enqueued executed by SingleThreadFetcherManager. not sure if we can leverage notify here.
Maybe there is other way I haven't thought about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My colleague suggested a good way. We can just return an empty result in this case. it will cause fetcher to be idle and SplitFetcherManager will closes idle fetcher. At least, it is like a normal flow (than IOException). I pushed a commit for the change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM roughly, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to mention. For the change of return an empty result for no split case, it has been validated by the unit test in the uber draft PR, which contains MiniCluster unit tests for both batch and streaming executions. #2105
| private IcebergSourceSplit currentSplit; | ||
| private String currentSplitId; | ||
|
|
||
| IcebergSourceSplitReader(ReaderFunction<T> readerFunction, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be better to call this openSplitFunction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still feel ReaderFunction is more accurate. here is the interface from previously merged PR. It basically read a split and return an iterator of record batch.
public interface ReaderFunction<T> extends Serializable,
Function<IcebergSourceSplit, CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>>> {
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed the variable to openSplitFunction
| } | ||
|
|
||
| /** Sets the position without setting a record. */ | ||
| public void position(int newFileOffset, long newRecordOffset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removing an unused method
efbd7a7 to
849346d
Compare
| public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) { | ||
| if (!(splitsChange instanceof SplitsAddition)) { | ||
| throw new UnsupportedOperationException(String.format( | ||
| "The SplitChange type of %s is not supported.", splitsChange.getClass())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsupported split change: %s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx. done
| "The SplitChange type of %s is not supported.", splitsChange.getClass())); | ||
| } | ||
|
|
||
| LOG.info("Add splits to reader: {}", splitsChange.splits()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May want to convert this to a count rather than locations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…e split fetcher to be idle. SplitFetcherManager closes idle fetcher.
No description provided.