Flink: FLIP-27 Iceberg source split #3501
Conversation
@Override
public int hashCode() {
  return Objects.hashCode(splitId());
Using the value of toString() to calculate the hashCode is not good practice, because if some FileScanTask implementation does not implement toString() correctly, it will use the instance's hash code as its toString() value. That makes comparing two IcebergSourceSplit instances meaningless.
As we already have the cached serializedFormCache, why not use a lazy approach to get the serialized bytes and calculate the hash code from that byte array?
As mentioned in the other comment, serializedFormCache can't use the lazy approach because the serialization is done by a separate serializer class; IcebergSourceSplit doesn't know how to serialize itself. Hence we can't calculate the byte array's hash code.
We also aren't using the toString from FileScanTask. We are only using fileScanTask.file().path().toString().
I don't think that we have a guarantee that serializedFormCache will be set unless the split has in fact been serialized via IcebergSourceSplitSerializer.
@kbendick you are correct. I had a typo in my earlier comment; it should be "can't use the lazy approach because ...".
Then why not assign a unique integer number to the IcebergSourceSplit as splitId when planning the tasks in FlinkSplitPlanner#planIcebergSourceSplits? I don't think keeping the toString approach as the identifier answers the question I raised in the first comment.
I am not sure we want to assign a unique integer number as splitId (especially for long-running streaming jobs).
We would need to checkpoint the splitId counter, and what if we can't restore from the checkpoint (e.g. due to corrupted checkpoint state)? It is probably better to compute the splitId from the intrinsic properties of IcebergSourceSplit (like the path, start, and length of the data files).
I thought your first comment was about depending on FileScanTask#toString, which I explained is not the case: we don't call FileScanTask#toString. Instead, we depend on fileScanTask.file().path().toString(). Maybe I misunderstood your first comment. Can you elaborate a little more on your concern?
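To illustrate that idea concretely, here is a hypothetical sketch of deriving the splitId from intrinsic properties; the helper name and id format are made up for illustration and are not the PR's actual implementation:

private String splitId(CombinedScanTask task) {
  // concatenating each file's path, start, and length gives an id that is
  // stable across job restarts and needs no checkpointed counter
  return task.files().stream()
      .map(fileScanTask -> fileScanTask.file().path().toString() +
          "_" + fileScanTask.start() + "_" + fileScanTask.length())
      .collect(Collectors.joining(",")); // java.util.stream.Collectors
}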
I still don't think it is valid to build the hash code on top of FileScanTask#toString, because of this comment.
@openinx not sure where the miscommunication is, but we don't call FileScanTask#toString here. Instead, we only call FileScanTask.file().path().toString(). Please see the code snippet below.
private String toString(Collection<FileScanTask> files) {
  return Iterables.toString(files.stream().map(fileScanTask ->
      MoreObjects.toStringHelper(fileScanTask)
          .add("file", fileScanTask.file() != null ?
              fileScanTask.file().path().toString() :
              "NoDataFile")
          .add("start", fileScanTask.start())
          .add("length", fileScanTask.length())
          .toString()).collect(Collectors.toList()));
}
void serializedFormCache(byte[] cachedBytes) {
  this.serializedFormCache = cachedBytes;
}
Why not use a lazy approach to serialize the split, similar to Schema?
private Map<Integer, NestedField> lazyIdToField() {
Setting the cachedBytes from outside looks a bit strange to me.
Serialization is done by another class, IcebergSourceSplitSerializer, so the bytes can't be computed and cached internally. Hence this setter provides a way for IcebergSourceSplitSerializer to cache the serialized bytes.
lazyIdToField can work because everything is encapsulated within the Schema class.
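For context, the serializer side of this getter/setter protocol checks the cache before doing any work. This sketch is reconstructed from the serialize method excerpted later in this thread, using the post-rename accessor name (serializedBytesCache):

@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
  // serialize only once; later calls reuse the bytes cached on the split
  if (split.serializedBytesCache() == null) {
    byte[] result = serializeV1(split);
    split.serializedBytesCache(result);
  }
  return split.serializedBytesCache();
}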
Correct me if I'm wrong, but it seems like it is lazy and only set via the first call to IcebergSourceSplitSerializer.
Maybe the name is what looks a bit odd to you? I had to read it a few times myself, but I'm not as quick at Flink stuff as you two (or in general).
To me, the lazy pattern is encapsulated within the class. E.g., the Schema class knows how to compute the id-to-field mapping in its lazyIdToField method. Here, we have more of a getter/setter protocol between two classes for caching the serialized bytes.
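For contrast, a paraphrased sketch of the lazy pattern in Schema (the exact body in Iceberg may differ slightly):

private Map<Integer, NestedField> lazyIdToField() {
  if (idToField == null) {
    // the class computes its own cache from state it already owns
    this.idToField = TypeUtil.indexById(struct);
  }
  return idToField;
}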
| .add("file", fileScanTask.file() != null ? | ||
| fileScanTask.file().path().toString() : | ||
| "NoFile") |
I think FileScanTask#file() is guaranteed to provide a non-null value; otherwise start() and length() wouldn't have any meaning.
will remove the null check here
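With the null check removed, the helper shown earlier would reduce to roughly:

private String toString(Collection<FileScanTask> files) {
  return Iterables.toString(files.stream().map(fileScanTask ->
      MoreObjects.toStringHelper(fileScanTask)
          // file() is guaranteed non-null, so the "NoDataFile" fallback goes away
          .add("file", fileScanTask.file().path().toString())
          .add("start", fileScanTask.start())
          .add("length", fileScanTask.length())
          .toString()).collect(Collectors.toList()));
}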
package org.apache.iceberg.flink.source.split;

public enum IcebergSourceSplitStatus {
Is this class related to this source split?
This is not used by this PR. will remove
kbendick left a comment:
Left some comments.
Overall this looks really good to me. Would there be any benefit to using the FileSourceSplit interface instead? It seems like that's a bit too general for our use case, and the SerDe is versioned anyway.
I'm going to approve this, as it seems ready to me, give or take a few nits that I'll leave up to you and some questions for my own understanding. I'll still come back to follow up on comments, etc., but I think this is a good direction to build off of.
Appreciate all of the work you've put in on the FLIP-27 front. 👍
| * <a href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>. | ||
| */ | ||
| @Internal | ||
| public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> { |
Will this get used to serialize/deserialize the splits across task boundaries, or just for checkpoints?
The comment on the serializedFormCache field in IcebergSourceSplit mentions checkpoints, but I'm curious about crossing task boundaries, etc.
Question for my own understanding after looking through the SimpleVersionedSerializer docs:
It seems like SimpleVersionedSerializer can only handle one version at a time (hence the getVersion() function).
Is that correct? Are there any best practices to consider when working with SimpleVersionedSerializer? When we evolve the format, will we have two classes, or handle all known versions in one instance?
This class is used as the checkpoint state serializer; cross-process (JM -> TM) communication goes through Java serialization. Currently, we are using Java serialization inside this class too for a simpler start. This is not ideal, since Java serialization does not handle schema evolution well, and schema evolution is important for long-running streaming jobs (not so much for batch jobs).
In the class Javadoc, we linked to an issue for future improvement. Note that this is already an issue for the current FlinkSource in streaming mode:
#1698
SimpleVersionedSerializer always serializes with one (the latest) version. But during deserialization, it should handle multiple versions to support evolution (e.g. when we switch from Java serialization to some Avro serialization for FileScanTask):
@Override
public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
  switch (version) {
    case 1:
      return deserializeV1(serialized);
    default:
      throw new IOException("Unknown version: " + version);
  }
}
case 1:
  return deserializeV1(serialized);
default:
  throw new IOException("Unknown version: " + version);
Nit: It might help to mention the highest known version (assuming that we'll use monotonically increasing versioning). Or at the least, since most users won't be aware of SimpleVersionedSerializer, mention what an unknown version means (in theory this would happen with data written by a newer library version, or maybe with corrupted data / some kind of bug).
Something like: Failed to deserialize IcebergSourceSplit. Encountered unknown version $version. The maximum version that can be handled is ${currentVersion}.
"maximum version" is probably also not accurate. it implies all versions below this number are supported. We may drop support of deserializing older versions. I will change the error msg to sth like
Failed to deserialize IcebergSourceSplit. Encountered unsupported version: $version. Supported version are [1]"
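In code, that default branch might end up looking like this (a sketch based on the message above; the exact wording in the PR may differ):

default:
  throw new IOException(String.format(
      "Failed to deserialize IcebergSourceSplit. " +
      "Encountered unsupported version: %d. Supported versions are [1]", version));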
 * Caching the byte representation makes repeated serialization cheap.
 */
@Nullable
private transient byte[] serializedFormCache;
Nit: This is a great idea.
However, the name is a little confusing to me. Maybe lazySerializedBytes or something (sort of like @openinx's comment)? Just a nit, as I don't want to bike-shed on the name. Clever idea overall.
will change it to serializedBytesCache. This is not a "lazy" pattern, which is typically encapsulated within the class; this is a getter/setter interaction between IcebergSourceSplit and IcebergSourceSplitSerializer.
@kbendick the initial implementation does have

When we are ready to support vectorized readers in Flink, we need to make sure they support the delete filter properly. There is one open PR for ORC: #2566
Force-pushed from 84a662f to c03459f.
kbendick left a comment:
Looked at the updates and this looks good to me 👍 .
I agree on not extending FileSourceSplit. Seems like it's more complicated than we need it to be and we also need the delete filter support.
@openinx can you take another look?
Since we've enabled the checkstyle check for all engine versions, let's rerun the Travis CI again! #3550
if (context.includeColumnStats()) {
  scan = scan.includeColumnStats();
}
Why add this switch in this PR?
I'd recommend keeping this PR, which adds the FLIP-27 source split, as focused as possible, so I'd also recommend removing the unrelated changes.
column stats are needed for the event-time/watermark-aligned assigner. You are correct that it is not directly used by this PR. Right now, I am splitting sub-PRs at minimally connected files for easier creation of the sub-PRs. If you think it is important to avoid unrelated changes inside a file, I can revert this piece of the change.
@stevenzwu how do you feel about this comment?
I think this is a reasonable addition. I think the motivation is to change this file only in this PR and not in the other PRs that are part of FLIP-27.
/**
 * This returns splits for the FLIP-27 source
 */
public static List<IcebergSourceSplit> planIcebergSourceSplits(
Nit: I think we don't need to switch to a new line.
will fix
if (o == null || getClass() != o.getClass()) {
  return false;
}
final Position that = (Position) o;
Nit: we usually don't use final for a local variable in Iceberg.
yes. will remove
private static final long serialVersionUID = 1L;

private final CombinedScanTask task;
/**
Nit: could you please leave a separate empty line between the two different blocks?
There are two private final variables, hence I thought it was one block; it is just that the second variable has a Javadoc. If it is Iceberg's style convention to always have an empty line before a Javadoc comment, I am very happy to conform.
I think this minor comment needs to be addressed.
changed to // comments
public void updatePosition(int newFileOffset, long newRecordOffset) {
  position.update(newFileOffset, newRecordOffset);
}
No usage for this method?
It is not used here, but it is used by IcebergSourceRecordEmitter.
Again, I am creating the PR with a minimal set of connected files (not minimally connected code within files).
@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
  if (split.serializedBytesCache() == null) {
    final byte[] result = serializeV1(split);
Nit: remove the unnecessary final modifier.
will fix
public static List<IcebergSourceSplit> createFileSplits(
    TemporaryFolder temporaryFolder, int fileCount, int filesPerSplit) throws Exception {
  final File warehouseFile = temporaryFolder.newFolder();
Nit: I'd recommend removing all those unnecessary final modifiers to keep consistent with Iceberg's coding style.
Thanks. Will do a complete pass to find and remove the unnecessary final modifiers.
.map(files -> new BaseCombinedScanTask(files))
.map(combinedScanTask -> IcebergSourceSplit.fromCombinedScanTask(combinedScanTask));
It would be good to use method references here.
will do
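Assuming the upstream stream elements match the constructor and factory signatures, the two map calls become:

.map(BaseCombinedScanTask::new)
.map(IcebergSourceSplit::fromCombinedScanTask);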
    })
    .collect(Collectors.toList());
} finally {
  catalog.dropTable(TestFixtures.TABLE_IDENTIFIER);
Since we drop the table at the end, removing the data and metadata files, can we still read the IcebergSourceSplit in the outer method?
Good question. I see why this can be confusing.
Right now, this method is used only by TestIcebergSourceSplitSerializer to generate some realistic splits with actual paths (unlike createMockedSplits). I will add some comments both at the method level and here. Let me know if I should move this method into TestIcebergSourceSplitSerializer; I kept it here so that it might be useful for other tests.
The method has been renamed, and hopefully it won't be confusing anymore. Also updated the Javadoc.
final IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
Assert.assertEquals(split, deserialized2);
Why not just use deserialized?
We already asserted deserialized in line 48. This is to make sure deserialized2, from the second serializer.deserialize call (on the cached bytes), still yields the same split.
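A reconstruction of the full round-trip assertion (the first three lines are inferred from the test excerpt above and this explanation):

byte[] result = serializer.serialize(split);
IcebergSourceSplit deserialized = serializer.deserialize(serializer.getVersion(), result);
Assert.assertEquals(split, deserialized);

// the second serialize call should return the bytes cached on the split
byte[] cachedResult = serializer.serialize(split);
IcebergSourceSplit deserialized2 = serializer.deserialize(serializer.getVersion(), cachedResult);
Assert.assertEquals(split, deserialized2);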
try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
  List<IcebergSourceSplit> splits = Lists.newArrayList();
  tasksIterable.forEach(task -> splits.add(IcebergSourceSplit.fromCombinedScanTask(task)));
  return splits;
Lists.transform is another option here instead of separate lines.
Sure, will change to this one-liner:
return Lists.newArrayList(CloseableIterable.transform(tasksIterable,
    task -> IcebergSourceSplit.fromCombinedScanTask(task)));
  this.recordOffset += 1L;
}

public void update(int newFileOffset, long newRecordOffset) {
This seems more like a set method to me, since it directly sets the internal counters rather than advancing them by an amount.
Sure, will rename the method to set.
}
Position that = (Position) o;
return Objects.equals(fileOffset, that.fileOffset) &&
    Objects.equals(recordOffset, that.recordOffset);
These are both primitives, so you can use == instead of Objects.equals.
I tend to prefer Objects.equals, as it does an == check first anyway, and then I don't have to think about it in the future.
If the project style prefers == where possible, then by all means go with that.
Objects.equals code per OpenJDK 8:

public static boolean equals(Object a, Object b) {
  return (a == b) || (a != null && a.equals(b));
}
These are primitives, so calling equals(Object, Object) will box both values just to do a more expensive check.
Oh, good call, I hadn't considered the boxing.
will change to ==
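The resulting equals then compares the primitive fields directly (a sketch assuming fileOffset is an int and recordOffset a long, per the update signature above):

@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  Position that = (Position) o;
  // primitive comparison avoids the autoboxing Objects.equals would incur
  return fileOffset == that.fileOffset && recordOffset == that.recordOffset;
}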
 * </ul>
 */
@Internal
public class Position implements Serializable {
SplitPosition?
will rename the class to SplitPosition
@stevenzwu does this work with Flink 1.14?

This should work for 1.14. The convention in the repo is to do large PRs against one version and then backport in another PR (usually we apply it to the latest supported version, but this PR has existed for a while, so the latest at the time was 1.13). Unless you're aware of a specific reason this would not work with 1.14, I'm fairly certain it will, given that we're using the newer interfaces. And on top of that, we're using one of the higher-level split interfaces.
Force-pushed from 2ea704a to 1ad29e8.
…ceSplit split` arg
…expose fileOffset and recordOffset directly.
…the same package as ScanContext
Looks good now. Thanks, @stevenzwu!
This reverts commit d2c26a0.
This PR mainly implements IcebergSourceSplit and its serializer. Other classes connected to the source split (like FlinkSplitPlanner and ScanContext) are also included. As we only tried to minimize the set of connected files in this PR (not the lines of code), there could be some changes within files that aren't directly related to the main purpose of this change (although they are needed by the uber PR #2105).
This is against v1.13 only. Will port to v1.14 when it is ready. We will skip v1.12 for the FLIP-27 source.