[HUDI-3042] Refactoring clustering executors #4847
Conversation
```java
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
      .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
  // for the below execution strategy, new file group id would be same as old file group id
  if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
  }
  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
      .filter(fg -> !newFilesWritten.contains(fg))
      .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}
```
extracted to BaseCommitActionExecutor.java
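For context, a minimal sketch of what the engine-agnostic form could look like after extraction, with JavaRDD swapped for the HoodieData abstraction that appears later in this diff. The exact signature, the explicit clusteringPlan parameter, and the omission of the Spark single-file-sort special case are assumptions, not the verbatim result:

```java
// Sketch only, not the verbatim extraction. Assumes the shared method is
// generified over HoodieData and receives the clustering plan explicitly.
protected Map<String, List<String>> getPartitionToReplacedFileIds(
    HoodieClusteringPlan clusteringPlan,
    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
  // File groups newly written by this clustering action; every file group in
  // the plan that is not among them is reported as replaced.
  Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
      .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId()))
      .collect(Collectors.toSet());
  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
      .filter(fg -> !newFilesWritten.contains(fg))
      .collect(Collectors.groupingBy(
          HoodieFileGroupId::getPartitionPath,
          Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
```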
```java
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
}
```
extracted to BaseCommitActionExecutor.java
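Likewise, a sketch of the engine-agnostic form (assumed: only the JavaRDD type parameter changes to HoodieData, and the plan is passed in explicitly):

```java
// Sketch only: fail fast if clustering produced no write statuses at all,
// since the plan's input groups should yield at least the planned output groups.
private void validateWriteResult(HoodieClusteringPlan clusteringPlan,
                                 HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
  if (writeMetadata.getWriteStatuses().isEmpty()) {
    throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
        + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
        + clusteringPlan.getInputGroups().stream()
            .mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
        + " write statuses");
  }
}
```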
```java
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();

final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
    ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
        new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
    .performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
  HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
      extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
  writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
```
extracted to executeClustering() in BaseCommitActionExecutor.java
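Putting the extracted pieces together, the shared flow plausibly reads as below. Only the method signature is confirmed by this diff (see the snippet that follows); the body is a sketch that assumes HoodieData-based counterparts of the helpers above and map/collectAsList-style operations on HoodieData:

```java
// Sketch of the shared clustering flow in BaseCommitActionExecutor:
// transition the instant, run the configured strategy via reflection, then
// attach write stats, replaced file ids, and commit metadata.
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark instant as clustering inflight
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();

  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata =
      ((ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
              table, context, config))
          .performClustering(clusteringPlan, schema, instantTime);
  HoodieData<WriteStatus> statuses = updateIndex(writeMetadata.getWriteStatuses(), writeMetadata);
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
  validateWriteResult(clusteringPlan, writeMetadata);
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(
        writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
```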
```java
// (diff context from the tail of the preceding abstract method)
    Iterator<HoodieRecord<T>> recordItr) throws IOException;

protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
```
The BaseCommitActionExecutor responsibilities are a bit confusing: it handles the regular write paths such as insert and upsert, and with this patch clustering as well; then what about compaction?
Should we make a new base class for table services, then?
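To make that suggestion concrete, a hypothetical split could look like the skeleton below; BaseTableServiceActionExecutor is an invented name purely for illustration, not part of this PR:

```java
// Hypothetical hierarchy, not part of this PR. Write-path commits and table
// services would share only the generic plumbing in a common ancestor.
abstract class BaseActionExecutor<T, I, K, O, R> {
  // timeline access, config, engine context
}

abstract class BaseCommitActionExecutor<T, I, K, O, R> extends BaseActionExecutor<T, I, K, O, R> {
  // regular write paths: insert, upsert, bulk insert, delete
}

abstract class BaseTableServiceActionExecutor<T, I, K, O, R> extends BaseActionExecutor<T, I, K, O, R> {
  // table services: clustering, compaction (plan + execute + instant transitions)
}
```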
I think the goal is to revamp the commit executors and write pipeline altogether later on, so the refactoring here is limited to code reuse. @xushiyan is that the case?
@danny0405 agreed, it does look like there are some mixed responsibilities there. I'll make a clearer separation in https://issues.apache.org/jira/browse/HUDI-2439
yihua left a comment:
LGTM
```diff
  */
 public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
-    extends ClusteringExecutionStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+    extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
```
Is this Java-specific class going to be removed as a follow-up?
@yihua yes, I'll make another PR to deal with ClusteringExecutionStrategy and its subclasses, which can be a good separation.
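For readers following the type change: the strategy contract now speaks HoodieData, so a List-backed engine only needs to bridge at the boundary. A minimal sketch, assuming a List-backed HoodieData implementation with an eager(...) factory (the class and method names here are assumptions, not this PR's API):

```java
// Sketch only: wrap the Java engine's plain-List results into the
// HoodieData-based contract, and unwrap on the way back.
private HoodieData<WriteStatus> toHoodieData(List<WriteStatus> statuses) {
  return HoodieListData.eager(statuses); // assumed List-backed HoodieData factory
}

private List<WriteStatus> toList(HoodieData<WriteStatus> statuses) {
  return statuses.collectAsList(); // materialize back to a plain List
}
```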
Extract common code from the engine-specific clustering executors to BaseCommitActionExecutor.java.
Remove redundant engine-specific classes; use ClusteringPlanActionExecutor.java instead.
Committer checklist

- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR

For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.