-
Notifications
You must be signed in to change notification settings - Fork 3k
API, Core, Spark:Add file groups failure in rewrite result #7361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API, Core, Spark:Add file groups failure in rewrite result #7361
Conversation
|
@szehon-ho and I should be able to review this at some point |
.palantir/revapi.yml
Outdated
| - code: "java.field.removedWithConstant" | ||
| old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER" | ||
| justification: "Removing deprecations for 1.3.0" | ||
| - code: "java.method.numberOfParametersChanged" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we typically don't want to introduce breaking API changes that haven't gone through a deprecation cycle first. See https://iceberg.apache.org/contribute/#minor-version-deprecations-required for some details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks nastra, It is not reasonable to modify here, I will fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waltczhang can you please revert all changes to the revapi.yml? Those shouldn't be necessary anymore once my other comments are applied
| private Set<DataFile> addedFiles = Collections.emptySet(); | ||
|
|
||
| public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) { | ||
| public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks, boolean isFailed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need to pass a boolean here or could we derive that info maybe in a different way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to set the default, thanks nastra.
szehon-ho
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I think its a good improvement suggestion, I had some implementation comments
| return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum(); | ||
| } | ||
|
|
||
| @Value.Default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of re-using the FileGroupRewriteResult, would it make more sense to have a FailedFileGroupRewriteResult, with just failure?
I dont think the bytesCount/fileCount make too much sense on failure, and we can also put the exception there? cc @aokolnychyi @RussellSpitzer for thoughts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is ok that have a FailedFileGroupRewriteResult. What about the following data structure?
FailedFileGroupRewriteResult {
FileGroupInfo info();
boolean isFailedGroup()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we would need a "boolean" wouldn't it be this class because it failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should probably have result have two new interfaces, something like
List<FileGroupFailureResult> rewriteFailures();
default long failedGroupCount() {
return rewriteFailures().size();
}
default long failedDataFilesRewrites() {
return rewriteFailures().stream().mapToInt(FailedFileGroup::filescount).sum()
}Then we won't mix failures and successes in the same list
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @RussellSpitzer reply,
Yes, "boolean" is redundant, They are great advice, I wiil fix them according to your advice.
| .onFailure( | ||
| (fileGroup, exception) -> | ||
| LOG.error("Failure during rewrite group {}", fileGroup.info(), exception)) | ||
| (fileGroup, exception) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we can't avoid touching the commitService. It doesnt seem like its really the responsibility of the commit service to handle the failed ones. Can't this method just keep a list internally? ie,
List<Failure> failures;
...
onFailure(
(fileGroup, exception) -> {
LOG.error("...")
failures.addFailure(fileGroup, exception)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is ok that keep a list internally, According to the comments above , it should be like this :
List< FailedFileGroupRewriteResult > failures;
...
onFailure(
(fileGroup, exception) -> {
LOG.error("...")
failures.addFailure(fileGroup, exception)
}
...
return ImmutableRewriteDataFiles.Result.builder()
.rewriteResults(rewriteResults)
.failedRewriteREsults(failures)
.build();
| commitReadyCommitGroups(); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method confuses me a bit,
- It's called failedRewrite but sounds like its only for a very specific case of failure
- It will never actually succeed since if it is called when service is closed it will fail the precondition and throw an exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks RussellSpitzer,
It has been modified here, using szehon-ho's suggestion above.
| "Cannot get results from a service which has not been closed"); | ||
| return committedRewrites; | ||
| results.addAll(committedRewrites); | ||
| results.addAll(failedRewrites); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we should mix these in the same list? Maybe we should split this into just two separate lists (or methods)?
Results() ++
Failures() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks RussellSpitzer,
It has been modified here.
8ee0aba to
945a548
Compare
nastra
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can solve this without breaking the API. I've added a few comments on how to achieve that
.palantir/revapi.yml
Outdated
| - code: "java.field.removedWithConstant" | ||
| old: "field org.apache.iceberg.TableProperties.HMS_TABLE_OWNER" | ||
| justification: "Removing deprecations for 1.3.0" | ||
| - code: "java.method.numberOfParametersChanged" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@waltczhang can you please revert all changes to the revapi.yml? Those shouldn't be necessary anymore once my other comments are applied
| interface Result { | ||
| List<FileGroupRewriteResult> rewriteResults(); | ||
|
|
||
| List<FileGroupFailureResult> rewriteFailures(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to avoid introducing API-breaking changes, what you'd rather want to do is
@Value.Default
default List<FileGroupFailureResult> rewriteFailures() {
return ImmutableList.of();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your tips, I know how to do and modify.
| private final List<FileGroupRewriteResult> rewriteResults; | ||
|
|
||
| public BaseRewriteDataFilesResult(List<FileGroupRewriteResult> rewriteResults) { | ||
| private final List<FileGroupFailureResult> rewriteFailures; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class is deprecated and not used anymore, so no need to add changes to it. Not modifying this class also ensures that no API-breaking changes are introduced (as indicated by RevAPI)
| RewriteDataFiles.Result result = spyRewrite.execute(); | ||
|
|
||
| Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); | ||
| Assert.assertEquals("Should have 3 failed fileGroups", result.rewriteFailures().size(), 3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to change this to
assertThat(result.rewriteResults()).hasSize(7);
assertThat(result.rewriteFailures()).hasSize(3);
assertThat(result.failedDataFilesCount()).isEqualTo(6);
the advantage here is that the content of result.rewriteResults() / result.rewriteFailures() will be shown if the assertion ever fails, which makes debugging a lot easier.
Could you please do the same changes further below in L801ff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your patience in explaining, I got it.
| RewriteDataFiles.Result result = spyRewrite.execute(); | ||
|
|
||
| Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); | ||
| assertThat(result.rewriteFailures()).hasSize(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you also update the line above please?
| RewriteDataFiles.Result result = spyRewrite.execute(); | ||
|
|
||
| Assert.assertEquals("Should have 7 fileGroups", result.rewriteResults().size(), 7); | ||
| assertThat(result.rewriteFailures()).hasSize(3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could you also update the line above please?
nastra
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've mainly been focusing on the API pieces and those LGTM, thanks @waltczhang
| commitManager.service(groupsPerCommit); | ||
| commitService.start(); | ||
|
|
||
| List<FileGroupFailureResult> rewriteFailures = Lists.newArrayList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race Condition here, need to use a concurrent structure.
The code below adds to this arraylist in a concurrent manner, if multiple groups fail at the same time we will end up call "add" simultaneously which will have some unexpected behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks RussellSpitzer for pointing out this problem.
RussellSpitzer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looking pretty good now
Just need to finish cleaning up
- The race condition
- Eduard's comments on the tests
Let me know when that's all done and we can go to merge
| commitManager.service(groupsPerCommit); | ||
| commitService.start(); | ||
|
|
||
| List<FileGroupFailureResult> rewriteFailures = Lists.newCopyOnWriteArrayList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general this would probably be an expensive data-structure for this operation since we never traverse. I don't think this is ever going to be that huge, but let's use something a bit more efficient.
I think something like
ConcurrentLinkedQueue would be fine here, and then we could just create a list out of it when we are done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to note, I don't think we actually would have performance issues with copy on write array list, it just seems a bit wasteful to me in this context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After reading the introduction of newCopyOnWriteArrayList and ConcurrentLinkedQueue, it is true that the latter is more efficient, thank you.
… ConcurrentLinkedQueue
|
LGTM, will merge after tests pass |
|
I noticed that the unit tests have all passed. Could you please merge them when you have time? Thanks. |
|
Merging, since we have 2 approving reviews and CI passed. Thanks @waltczhang. |
Background:
A table with schema (int id, timestamp ts), When configured 'partial-progress.enabled = true' in rewrite job , the job will return success but in fact rewrite failed, so in order to solve this kind of problem, rewrite result should return group failure information.