Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,11 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
if (!metaClient.getFs().exists(partitionPath)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the partition path was created by another process/thread, we should invoke the #listStatus again.

metaClient.getFs().mkdirs(partitionPath);
return new FileStatus[0];
} else {
// in case the partition path was created by another caller
return metaClient.getFs().listStatus(partitionPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add one line comment here:

// in case the partition path was created by another caller.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, you may want to add the fix to

as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in general, I thought list partitions for any given partition can't have concurrent callers. across partitions we will parallelize, but for a given partition, I wasn't aware that there could be concurrent calls. can you throw some light on when this could happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In flink pipeline, there are cases that we need to look into the partition files through the fs view with this method call, the bucket assigner would check the small files under the partition and the partition may be new.

Multiple bucket assigner may operate on same partitions at the same time.

}
}
throw new HoodieIOException(String.format("Failed to list partition path: %s", partitionPath));
}

/**
Expand Down