Flink: Commit both data files and delete files to iceberg transaction. #1939
    } else {
      // To be compatible with iceberg format V2.
      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
        // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and
I will provide a unit test to address it.
I've addressed this case in this unit test here.
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.jetbrains.annotations.NotNull;
Is this used?
Em, this could be removed now.
      return this;
    }

    public Builder add(Iterable<WriteResult> results) {
Typically, we would follow the Java collection convention and use addAll.
OK, renaming it to addAll sounds great to me.
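The convention discussed above can be sketched as follows. This is a minimal, hypothetical builder (the names `add`/`addAll` follow the discussion; the field and class are illustrative, not Iceberg's actual code): the single-element method keeps the name `add`, while the `Iterable` variant is named `addAll`, matching `java.util.Collection`.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative builder following the java.util collection naming convention.
public class BuilderNamingSketch {
  private final List<String> files = new ArrayList<>();

  // Single-element variant keeps the short name.
  public BuilderNamingSketch add(String file) {
    files.add(file);
    return this;
  }

  // Iterable variant is named addAll, like Collection#addAll.
  public BuilderNamingSketch addAll(Iterable<String> more) {
    for (String f : more) {
      files.add(f);
    }
    return this;
  }

  public int size() {
    return files.size();
  }
}
```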
    // The completed files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed
    // to the 'dataFilesPerCheckpoint'.
    private final List<DataFile> dataFilesOfCurrentCheckpoint = Lists.newArrayList();
    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
Is it correct for this to be a list of write results if a write result keeps track of a list of data files and a list of delete files?
Yes, it's correct here. If there are, say, 5 IcebergStreamWriter instances, each writer will emit a WriteResult. The single-parallelism IcebergFilesCommitter collects all the WriteResults in this writeResultsOfCurrentCkpt cache and then merges them into a single WriteResult. Finally, it writes those files into delete + data manifests and updates the Flink state backend.
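The merging step described above can be sketched with a simplified model. The real `WriteResult` lives in `org.apache.iceberg.io` and also tracks referenced data files; this stand-in class is illustrative only and shows just the shape of the merge the committer performs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of merging per-writer WriteResults.
public class WriteResultMergeSketch {

  // Stand-in for WriteResult: each stream writer emits one per checkpoint.
  public static class WriteResult {
    final List<String> dataFiles = new ArrayList<>();
    final List<String> deleteFiles = new ArrayList<>();
  }

  // The single-parallelism committer collects every writer's result and
  // merges them into one WriteResult before writing the manifests.
  public static WriteResult merge(List<WriteResult> pending) {
    WriteResult merged = new WriteResult();
    for (WriteResult r : pending) {
      merged.dataFiles.addAll(r.dataFiles);
      merged.deleteFiles.addAll(r.deleteFiles);
    }
    return merged;
  }
}
```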
Looks good overall, though I didn't look into the tests very thoroughly. Since this is getting into quite a bit of Flink logic, I'd appreciate it if @JingsongLi and @stevenzwu could also take a look and review.
    ManifestFile manifestFile =
        SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
    DeltaManifests deltaManifests =
        SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
We will need to maintain the Flink state's compatibility. If the encoding version is 1, then we should read the byte[] the FlinkManifestSerializer way.
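The backward-compatibility concern above can be sketched as a version switch in the deserializer. Everything here is illustrative (the method name, the string payloads, and the version numbers are assumptions based on the discussion, not the actual Iceberg Flink code): version 1 state holds only the old single data manifest, while the new version carries data plus delete manifests.

```java
// Hedged sketch of backward-compatible state decoding.
public class StateCompatSketch {

  public static String decodeDeltaManifests(int version, byte[] serialized) {
    switch (version) {
      case 1:
        // Old layout: a single data manifest, previously written by
        // the FlinkManifestSerializer path.
        return "data-manifest-only:" + new String(serialized);
      case 2:
        // New layout: a data manifest plus an optional delete manifest.
        return "delta-manifests:" + new String(serialized);
      default:
        throw new IllegalArgumentException("Unknown serializer version: " + version);
    }
  }
}
```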
…ts in the latest successful checkpoint.
    @Override
    public ManifestFile deserialize(int version, byte[] serialized) throws IOException {
      return ManifestFiles.decode(serialized);

    public Iterator<ManifestFile> iterator() {
Does this need to extend Iterable? It seems only needed for Iterables.addAll(manifests, deltaManifests); is it simpler to directly call the two getters?
OK, agreed, we don't have to introduce the Iterable complexity.
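The agreed-on simpler shape can be sketched like this. This is a hypothetical stand-in for DeltaManifests (string fields instead of real ManifestFile objects): the class exposes its two manifests through plain getters-style access instead of implementing Iterable, and callers collect the non-null ones directly.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for DeltaManifests without the Iterable interface.
public class DeltaManifestsSketch {
  private final String dataManifest;    // may be null when no data files were written
  private final String deleteManifest;  // may be null when no delete files were written

  public DeltaManifestsSketch(String dataManifest, String deleteManifest) {
    this.dataManifest = dataManifest;
    this.deleteManifest = deleteManifest;
  }

  // Callers gather the non-null manifests directly from the two fields,
  // rather than iterating over the object itself.
  public List<String> manifests() {
    List<String> all = new ArrayList<>(2);
    if (dataManifest != null) {
      all.add(dataManifest);
    }
    if (deleteManifest != null) {
      all.add(deleteManifest);
    }
    return all;
  }
}
```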
      deleteManifest = deleteManifestWriter.toManifestFile();
    }

    return new DeltaManifests(dataManifest, deleteManifest);
do we need to check if WriteResult is empty (no data and delete files)?
We had a similar discussion here. Even if the WriteResult is empty (NOT null — null means nobody emitted a result to the IcebergFilesCommitter, while an empty WriteResult means the IcebergStreamWriter did not write any new data but still emitted a WriteResult with zero data files and zero delete files to the downstream IcebergFilesCommitter), we'd better commit to the Iceberg txn, so that the Flink streaming job won't fail easily when expiring an old snapshot (since at that time we did not even write any new records).
About this question, I think we'd better keep the dummy DeltaManifests in state, even though it has no delete files or data files.
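The null-vs-empty distinction above can be illustrated with a small decision sketch. The method and return values here are entirely hypothetical; they just encode the rule from the discussion: null means nothing was emitted (skip), while an empty result still produces a commit so the table's snapshot history keeps advancing with each checkpoint.

```java
import java.util.List;

// Illustrative only: encodes the null-vs-empty commit rule discussed above.
public class EmptyResultSketch {
  public static String decide(List<String> dataFiles, List<String> deleteFiles) {
    if (dataFiles == null && deleteFiles == null) {
      return "skip";            // no writer emitted anything at all
    }
    if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
      return "commit-empty";    // still commit a snapshot with zero new files
    }
    return "commit";            // normal commit with data and/or delete files
  }
}
```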
    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
  }

  static DeltaManifests writeCompletedFiles(WriteResult result,
Just for my own education: referencedDataFiles from WriteResult doesn't seem to be used (except in unit tests). What is it for? Do we need to serialize it too?
We should serialize it and add it to the commit. This is the set of files that is referenced by any positional delete, which identifies deleted rows by file and row position. The commit will validate that all of the files still exist in the table.
This isn't strictly needed for this use case because we know that the position deletes only refer to files that are created in this commit. Since the files are being added in the commit, it isn't possible for some other process to delete some of them from metadata. But it is still good to configure the commit properly in case this gets reused later.
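The validation described above can be modeled in a simplified form. The class and method below are illustrative stand-ins (they conceptually mirror the existence check discussed for RowDelta, but are not Iceberg's API): before committing, every data file referenced by a positional delete is checked against the set of files still live in the table.

```java
import java.util.Set;

// Simplified model of validating referenced data files before a commit.
public class ReferencedFilesSketch {
  public static void validateDataFilesExist(Set<String> liveDataFiles, Set<String> referencedDataFiles) {
    for (String path : referencedDataFiles) {
      if (!liveDataFiles.contains(path)) {
        // A concurrent process removed a file that our position deletes point at,
        // so committing would leave deletes referencing a missing file.
        throw new IllegalStateException("Referenced data file no longer exists: " + path);
      }
    }
  }
}
```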
Thanks for the explanation, @rdblue . I think it's correct to validate the data files in RowDelta#commit. Will provide an extra unit test to address it.
This unit test addressed the data files validation issue
    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId);
    int numFiles = 0;
    for (WriteResult result : pendingResults.values()) {
We are using this API from the AppendFiles interface. When we had an extended outage and accumulated a few hundred transactions/manifests in Flink checkpoint state, this helped avoid rewriting those manifest files; otherwise, the commit can take very long. @rdblue can probably explain it better than I do.

    AppendFiles appendManifest(ManifestFile file);

Here we are merging data files, potentially from multiple checkpoint cycles/manifests, into a single manifest file. Maybe we can add a similar API in the DeleteFiles interface?

    DeleteFiles deleteManifest(ManifestFile file);
That sounds like a separate improvement, so I created an issue for it; let's discuss there: #1959.
Maybe we can add a similar API in DeleteFiles interface?
We don't currently do this because we need delete entries to exist when we delete files. That way we can track when something was deleted and clean it up incrementally in ExpireSnapshots. If we did have a method like this, it would always rewrite the manifest with deletes, or would need to ensure that the manifest that is added contains only deletes, and these requirements are not very obvious. I think it is better to pass the deleted files through the existing methods.
@stevenzwu Thanks for your review, I've addressed everything except the separate issue #1959. Any other concerns?
stevenzwu
left a comment
LGTM. thanks for opening the jira to track the append manifest file
+1 It would be great to have a review from @JingsongLi as well, but I'm going to go ahead and commit this since it looks good to @stevenzwu.
@stevenzwu @rdblue Thanks for the review and merge. @JingsongLi is currently busy with internal Flink/Blink development work, so he may not have time to double-check right now.