Skip to content

Commit 3645d2b

Browse files
apache#2963 Fix NPE during task rebalancing
Reformatted touched files according to "helix-format"
1 parent dfe0076 commit 3645d2b

File tree

9 files changed

+444
-437
lines changed

9 files changed

+444
-437
lines changed

helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java

+103-119
Large diffs are not rendered by default.

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java

+19-16
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
2929
import java.util.concurrent.Future;
30+
3031
import org.apache.helix.HelixConstants;
3132
import org.apache.helix.HelixRebalanceException;
3233
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -46,7 +47,6 @@
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

49-
5050
/**
5151
* Global Rebalance does the baseline recalculation when certain changes happen.
5252
* The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled.
@@ -60,9 +60,9 @@ class GlobalRebalanceRunner implements AutoCloseable {
6060
// When any of the following changes happen, the rebalancer needs to do a global rebalance which
6161
// contains 1. baseline recalculation, 2. partial rebalance that is based on the new baseline.
6262
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
63-
ImmutableSet
64-
.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
65-
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
63+
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
64+
HelixConstants.ChangeType.IDEAL_STATE, HelixConstants.ChangeType.CLUSTER_CONFIG,
65+
HelixConstants.ChangeType.INSTANCE_CONFIG);
6666

6767
// To calculate the baseline asynchronously
6868
private final ExecutorService _baselineCalculateExecutor;
@@ -77,10 +77,8 @@ class GlobalRebalanceRunner implements AutoCloseable {
7777
private boolean _asyncGlobalRebalanceEnabled;
7878

7979
public GlobalRebalanceRunner(AssignmentManager assignmentManager,
80-
AssignmentMetadataStore assignmentMetadataStore,
81-
MetricCollector metricCollector,
82-
LatencyMetric writeLatency,
83-
CountMetric rebalanceFailureCount,
80+
AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
81+
LatencyMetric writeLatency, CountMetric rebalanceFailureCount,
8482
boolean isAsyncGlobalRebalanceEnabled) {
8583
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
8684
_assignmentManager = assignmentManager;
@@ -106,14 +104,17 @@ public GlobalRebalanceRunner(AssignmentManager assignmentManager,
106104
* @param algorithm
107105
* @throws HelixRebalanceException
108106
*/
109-
public void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
110-
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
107+
public void globalRebalance(ResourceControllerDataProvider clusterData,
108+
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
109+
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
111110
_changeDetector.updateSnapshots(clusterData);
112111
// Get all the changed items' information. Filter for the items that have content changed.
113-
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
112+
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
113+
_changeDetector.getAllChanges();
114114
Set<String> allAssignableInstances = clusterData.getAssignableInstances();
115115

116-
if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
116+
if (clusterChanges.keySet().stream()
117+
.anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
117118
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
118119
// Calculate the Baseline assignment for global rebalance.
119120
Future<Boolean> result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
@@ -153,8 +154,8 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
153154
*/
154155
private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
155156
Map<String, Resource> resourceMap, Set<String> allAssignableInstances,
156-
RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
157-
Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
157+
RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput,
158+
boolean shouldTriggerMainPipeline, Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
158159
throws HelixRebalanceException {
159160
LOG.info("Start calculating the new baseline.");
160161
_baselineCalcCounter.increment(1L);
@@ -165,7 +166,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
165166
// 1. Ignore node status (disable/offline).
166167
// 2. Use the previous Baseline as the only parameter about the previous assignment.
167168
Map<String, ResourceAssignment> currentBaseline =
168-
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
169+
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
170+
resourceMap.keySet());
169171
ClusterModel clusterModel;
170172
try {
171173
clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
@@ -175,7 +177,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
175177
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
176178
}
177179

178-
Map<String, ResourceAssignment> newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
180+
Map<String, ResourceAssignment> newBaseline =
181+
WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
179182
boolean isBaselineChanged =
180183
_assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
181184
// Write the new baseline to metadata store

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java

+38-34
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
2828
import java.util.concurrent.Future;
29+
2930
import org.apache.helix.HelixRebalanceException;
3031
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
3132
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
@@ -44,7 +45,6 @@
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
4647

47-
4848
/**
4949
* Compute the best possible assignment based on the Baseline and the previous Best Possible assignment.
5050
* The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a
@@ -68,10 +68,8 @@ class PartialRebalanceRunner implements AutoCloseable {
6868
private Future<Boolean> _asyncPartialRebalanceResult;
6969

7070
public PartialRebalanceRunner(AssignmentManager assignmentManager,
71-
AssignmentMetadataStore assignmentMetadataStore,
72-
MetricCollector metricCollector,
73-
CountMetric rebalanceFailureCount,
74-
boolean isAsyncPartialRebalanceEnabled) {
71+
AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
72+
CountMetric rebalanceFailureCount, boolean isAsyncPartialRebalanceEnabled) {
7573
_assignmentManager = assignmentManager;
7674
_assignmentMetadataStore = assignmentMetadataStore;
7775
_bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
@@ -82,16 +80,16 @@ public PartialRebalanceRunner(AssignmentManager assignmentManager,
8280
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
8381
CountMetric.class);
8482
_partialRebalanceLatency = metricCollector.getMetric(
85-
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
86-
.name(),
83+
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
8784
LatencyMetric.class);
8885
_baselineDivergenceGauge = metricCollector.getMetric(
8986
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
9087
BaselineDivergenceGauge.class);
9188
}
9289

93-
public void partialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
94-
Set<String> activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
90+
public void partialRebalance(ResourceControllerDataProvider clusterData,
91+
Map<String, Resource> resourceMap, Set<String> activeNodes,
92+
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
9593
throws HelixRebalanceException {
9694
// If partial rebalance is async and the previous result is not completed yet,
9795
// do not start another partial rebalance.
@@ -100,19 +98,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
10098
return;
10199
}
102100

103-
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
104-
try {
105-
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
106-
currentStateOutput);
107-
} catch (HelixRebalanceException e) {
108-
if (_asyncPartialRebalanceEnabled) {
109-
_rebalanceFailureCount.increment(1L);
110-
}
111-
LOG.error("Failed to calculate best possible assignment!", e);
112-
return false;
113-
}
114-
return true;
115-
}));
101+
_asyncPartialRebalanceResult =
102+
_bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
103+
try {
104+
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
105+
currentStateOutput);
106+
} catch (HelixRebalanceException e) {
107+
if (_asyncPartialRebalanceEnabled) {
108+
_rebalanceFailureCount.increment(1L);
109+
}
110+
LOG.error("Failed to calculate best possible assignment!", e);
111+
return false;
112+
}
113+
return true;
114+
}));
116115
if (!_asyncPartialRebalanceEnabled) {
117116
try {
118117
if (!_asyncPartialRebalanceResult.get()) {
@@ -131,9 +130,9 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
131130
* If the result differs from the persisted result, persist it to memory (only if the version is not stale);
132131
* If persisted, trigger the pipeline so that main thread logic can run again.
133132
*/
134-
private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
135-
Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
136-
throws HelixRebalanceException {
133+
private void doPartialRebalance(ResourceControllerDataProvider clusterData,
134+
Map<String, Resource> resourceMap, Set<String> activeNodes, RebalanceAlgorithm algorithm,
135+
CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
137136
LOG.info("Start calculating the new best possible assignment.");
138137
_partialRebalanceCounter.increment(1L);
139138
_partialRebalanceLatency.startMeasuringLatency();
@@ -142,27 +141,30 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<
142141
if (_assignmentMetadataStore != null) {
143142
newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
144143
} else {
145-
LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
144+
LOG.debug(
145+
"Assignment Metadata Store is null. Skip getting best possible assignment version.");
146146
}
147147

148148
// Read the baseline from metadata store
149149
Map<String, ResourceAssignment> currentBaseline =
150-
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
150+
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
151+
resourceMap.keySet());
151152

152153
// Read the best possible assignment from metadata store
153154
Map<String, ResourceAssignment> currentBestPossibleAssignment =
154155
_assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
155156
resourceMap.keySet());
156157
ClusterModel clusterModel;
157158
try {
158-
clusterModel = ClusterModelProvider
159-
.generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
160-
currentBaseline, currentBestPossibleAssignment);
159+
clusterModel =
160+
ClusterModelProvider.generateClusterModelForPartialRebalance(clusterData, resourceMap,
161+
activeNodes, currentBaseline, currentBestPossibleAssignment);
161162
} catch (Exception ex) {
162163
throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
163164
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
164165
}
165-
Map<String, ResourceAssignment> newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
166+
Map<String, ResourceAssignment> newAssignment =
167+
WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
166168

167169
// Asynchronously report baseline divergence metric before persisting to metadata store,
168170
// so that we still have the metric even if persisting fails.
@@ -177,12 +179,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<
177179
currentBaseline, newAssignmentCopy);
178180

179181
boolean bestPossibleUpdateSuccessful = false;
180-
if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
182+
if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(
183+
newAssignment)) {
181184
// This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache.
182185
// If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that
183186
// the pipeline is triggered. We schedule the pipeline to run below.
184-
bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
185-
newBestPossibleAssignmentVersion);
187+
bestPossibleUpdateSuccessful =
188+
_assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
189+
newBestPossibleAssignmentVersion);
186190
} else {
187191
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
188192
}

0 commit comments

Comments
 (0)