[HUDI-4814] Schedules new clustering plan based on latest clustering instant #6574
```diff
@@ -48,10 +48,10 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O
   private final Option<Map<String, String>> extraMetadata;
 
   public ClusteringPlanActionExecutor(HoodieEngineContext context,
-                                     HoodieWriteConfig config,
-                                     HoodieTable<T, I, K, O> table,
-                                     String instantTime,
-                                     Option<Map<String, String>> extraMetadata) {
+                                      HoodieWriteConfig config,
+                                      HoodieTable<T, I, K, O> table,
+                                      String instantTime,
+                                      Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
   }
@@ -63,6 +63,7 @@ protected Option<HoodieClusteringPlan> createClusteringPlan() {
     int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
         .countInstants();
+
     if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
       LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
           + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
@@ -77,11 +78,14 @@ protected Option<HoodieClusteringPlan> createClusteringPlan() {
       return Option.empty();
     }
 
-    LOG.info("Generating clustering plan for table " + config.getBasePath());
-    ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
-        ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config), table, context, config);
+    ClusteringPlanStrategy strategy = null;
+    if (config.getAsyncClusterMaxCommits() <= commitsSinceLastClustering) {
+      LOG.info("Generating clustering plan for table " + config.getBasePath());
+      strategy = (ClusteringPlanStrategy)
+          ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config), table, context, config);
+    }
 
-    return strategy.generateClusteringPlan();
+    return strategy == null ? Option.empty() : strategy.generateClusteringPlan();
   }
 
   @Override
```
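For readers skimming the diff, here is a minimal standalone sketch of the scheduling decision after the patch. It is not Hudi code: plain ints and `java.util.Optional` stand in for `HoodieWriteConfig` and the timeline types, and all names are illustrative.

```java
import java.util.Optional;

public class SchedulingDecisionSketch {

  /** Mirrors the shape of createClusteringPlan() after the patch. */
  static Optional<String> createClusteringPlan(boolean inlineClusteringEnabled,
                                               int inlineClusterMaxCommits,
                                               int asyncClusterMaxCommits,
                                               int commitsSinceLastClustering) {
    // Inline case: not enough commits since the latest clustering instant.
    if (inlineClusteringEnabled && inlineClusterMaxCommits > commitsSinceLastClustering) {
      return Optional.empty();
    }
    // New guard: only load a strategy and build a plan once the commit count
    // reaches the configured threshold; otherwise return empty instead of
    // adding yet another replacecommit.requested to the timeline.
    if (asyncClusterMaxCommits <= commitsSinceLastClustering) {
      return Optional.of("clustering-plan");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // With clustering.delta_commits = 6, only the sixth commit since the
    // latest clustering instant yields a plan.
    for (int commits = 1; commits <= 6; commits++) {
      System.out.println(commits + " commit(s) -> "
          + createClusteringPlan(false, 4, 6, commits));
    }
  }
}
```

The essential change is the final return: when the threshold is not met, the executor now yields `Option.empty()` rather than unconditionally instantiating a `ClusteringPlanStrategy` and generating a plan.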
This and the condition below guarantee that the clustering is only scheduled based on the max_commits config. @eric9204 could you double check the logic?
@yihua Yes, this is indeed a redundant check; I'm testing whether this condition is needed.
By adding these two conditions, we can guarantee that only one clustering runs at a time, and that no new clustering plan is generated while there is no completed clustering.
First, I configured only these three parameters (schedule only, no async execution):
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'false',
'clustering.delta_commits' = '6',
Then I configured only these three parameters (schedule with async execution):
'clustering.schedule.enabled' = 'true',
'clustering.async.enabled' = 'true',
'clustering.delta_commits' = '6',
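These are Flink SQL table options for the Hudi connector. As a rough, self-contained sketch of where such options are set (the table name, schema, and path below are hypothetical, and the Hudi Flink bundle is assumed to be on the classpath):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClusteringOptionsExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Hypothetical table name, schema, and path; the three clustering
    // options mirror the second test case above (schedule + async execution).
    tEnv.executeSql(
        "CREATE TABLE hudi_sink (\n"
            + "  id INT PRIMARY KEY NOT ENFORCED,\n"
            + "  name STRING,\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_sink',\n"
            + "  'clustering.schedule.enabled' = 'true',\n"
            + "  'clustering.async.enabled' = 'true',\n"
            + "  'clustering.delta_commits' = '6'\n"
            + ")");
  }
}
```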
OK, so the issue we are trying to solve is:
there is a regular writer that only schedules clustering, and an async clustering job that executes it.
If clustering is pending (perhaps waiting to be executed by the async clustering job), every new successful commit from the regular writer keeps adding a new replacecommit.requested.
If yes, then the fix makes sense to me.
@yihua @danny0405: wdyt?
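A toy, non-Hudi simulation of the cadence described in this comment, under the assumption that the writer attempts to schedule after every successful commit and that the patched guard counts commits from the most recent clustering instant:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the active timeline; "commit-N" and "replacecommit.requested"
 * are plain strings, not Hudi instants.
 */
public class SchedulingCadenceSketch {

  static List<String> timeline(boolean patched, int deltaCommits, int totalCommits) {
    List<String> timeline = new ArrayList<>();
    int commitsSinceLastPlan = 0;
    for (int i = 1; i <= totalCommits; i++) {
      timeline.add("commit-" + i);
      commitsSinceLastPlan++;
      // Unpatched: the writer schedules a plan after every successful commit,
      // stacking replacecommit.requested entries while execution lags behind.
      // Patched: a plan is scheduled only once deltaCommits commits have
      // accumulated since the latest clustering instant.
      if (!patched || commitsSinceLastPlan >= deltaCommits) {
        timeline.add("replacecommit.requested");
        commitsSinceLastPlan = 0;
      }
    }
    return timeline;
  }

  public static void main(String[] args) {
    System.out.println("unpatched: " + timeline(false, 6, 12)); // 12 pending plans
    System.out.println("patched:   " + timeline(true, 6, 12));  // 2 plans
  }
}
```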
One thing I am finding hard to comprehend: with clustering, either both planning and execution are inline, or both are async, at least with the Spark datasource writer. So I'm not sure how the user ended up in a state where clustering was only scheduled without ever getting to completion.
Yes, the fix makes sense to me too.
The Flink writer schedules a clustering plan on each successful regular commit, and an async pipeline executes the clustering continuously; this patch solves the problem of clustering plans being scheduled too frequently while a clustering is still pending.
So, +1 from my side.