-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Fix flink manifest location collision when there are multiple committers for multiple sink tables #3986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I found the ci failed: https://github.com/apache/iceberg/runs/4947986168?check_suite_focus=true#step:6:203 |
| import org.apache.iceberg.relocated.com.google.common.base.Strings; | ||
|
|
||
| class ManifestOutputFileFactory { | ||
| private static final AtomicInteger fileCount = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure making it static is the right fix. Instead, we probably should fix the generatePath method and include the full table name in the path.
private String generatePath(long checkpointId) {
return FileFormat.AVRO.addExtension(String.format("%s/%s-%05d-%d-%d-%05d",
fullTableName, flinkJobId, subTaskId,
attemptNumber, checkpointId, fileCount.incrementAndGet()));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu I think that including the full table name can not solve this problem because the target table may be the same one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. I didn't think about the case where target table can be the same. However, I also think the static variable may also not be enough. I think we can still get name collision when those committer operators run on different TMs. we may want to use this unique id from StreamingRuntimeContext
/**
* Returned value is guaranteed to be unique between operators within the same job and to be
* stable and the same across job submissions.
*
* <p>This operation is currently only supported in Streaming (DataStream) contexts.
*
* @return String representation of the operator's unique id.
*/
public String getOperatorUniqueID() {
return operatorUniqueID;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu This is the operator unique id is fine. I will fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu I have updated the pr. Could you please review it again? Thanks.
|
@coolderli can you create this PR for 1.14 only? we typically create a PR for the latest version. then we can create backport PR separately. In the backport PR, we show the diffs for the relevant sub folders. e.g. #3870 (comment) |
@stevenzwu Thanks for reminding me about that. I have removed the modify on 1.12 and 1.13. Could you please take a look again? |
| private String generatePath(long checkpointId) { | ||
| return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId, | ||
| attemptNumber, checkpointId, fileCount.incrementAndGet())); | ||
| return FileFormat.AVRO.addExtension(String.format("%s-%05d-%s-%d-%d-%05d", flinkJobId, subTaskId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: might be more consistent if we follow the order of jobId, operatorId, subtaskId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Little things like that to be more consistent make developer experience a lot smoother in the aggregate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, I'll fix it.
|
@coolderli This PR is almost ready. I left a nit comment. Also we probably should update the PR description. The essence of the collision problem is that we may have multiple committers for multiple sink tables, no matter it is the same TM or different TMs. |
| return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, | ||
| operatorUniqueId, attemptNumber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can we fit all of the arguments on one line if they're placed on the next line?
return new ManifestOutputFileFactory(
ops, table.io(), table.properties(), flinkJobId, subTaskId, operatorUniqueId, attemptNumber);You could also make the new parameter into operatorUid (or operatorId) to shorten it.
Not a blocker by any means but would be nice if possible. =)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kbendick Thanks, I'll fix it
| static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId, | ||
| long attemptNumber) { | ||
| static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId, | ||
| int subTaskId, long attemptNumber) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this indention seems not matching the current code style
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, but I found the ci had passed. Never mind, I have fixed it.
|
Looks good. Thanks for fixing this, @coolderli! |
(cherry picked from commit 0d9c63e)
When there is more than one iceberg-flink-committer on one task manager, the manifest location generated by flink will have the same location. And then it will have a conflict with each other. I think the
fileCountshould always increment.There is my problem. This is rare, but we'd better fix it.
I think the correct is we should union all sources, then there will be only one iceberg-flink-committer operator.