Fix WAGED Instance Capacity NPE during rebalance failures #3010

Open · wants to merge 1 commit into master
@@ -201,6 +201,12 @@ public synchronized boolean checkAndReduceInstanceCapacity(String instance, Stri
       return true;
     }

+    if (!_instanceCapacityMap.containsKey(instance)) {
+      LOG.error("Instance: " + instance + " not found in instance capacity map. Cluster may be using previous "
+          + "idealState that includes an instance that is no longer part of the cluster.");
+      return false;
+    }
+

Comment on lines +204 to +209
Contributor:

The interesting question is whether we should skip this instance and continue, or fail the entire pipeline and use the previous computation.

Contributor Author:

Throwing a HelixException at computeBestPossibleStates will just lead to falling back to the previous best possible. However, throwing an exception here would prevent computeBestPossibleStateForPartition from assigning correct states based on the previous best possible we fell back to. Stack trace below.

We'd need to add error handling around the computeBestPossiblePartitionState call and have some fallback mechanism.

3732 [HelixController-pipeline-default-TestWagedNPE_cluster-(70412709_DEFAULT)] ERROR org.apache.helix.controller.GenericHelixController [] - Exception while executing DEFAULT pipeline for cluster TestWagedNPE_cluster. Will not continue to next pipeline
org.apache.helix.HelixException: Instance: localhost_0 not found in instance capacity map. Cluster may be using previous idealState that includes an instance that is no longer part of the cluster.
	at org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity.checkAndReduceInstanceCapacity(WagedInstanceCapacity.java:209) ~[classes/:?]
	at org.apache.helix.controller.dataproviders.ResourceControllerDataProvider.checkAndReduceCapacity(ResourceControllerDataProvider.java:543) ~[classes/:?]
	at org.apache.helix.controller.rebalancer.DelayedAutoRebalancer.computeBestPossibleStateForPartition(DelayedAutoRebalancer.java:378) ~[classes/:?]
	at org.apache.helix.controller.rebalancer.DelayedAutoRebalancer.computeBestPossiblePartitionState(DelayedAutoRebalancer.java:271) ~[classes/:?]
	at org.apache.helix.controller.rebalancer.DelayedAutoRebalancer.computeBestPossiblePartitionState(DelayedAutoRebalancer.java:54) ~[classes/:?]
	at org.apache.helix.controller.rebalancer.waged.WagedRebalancer.lambda$computeNewIdealStates$0(WagedRebalancer.java:281) ~[classes/:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:?]
	at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1693) ~[?:?]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) ~[?:?]
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746) ~[?:?]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?]
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408) ~[?:?]
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) ~[?:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) ~[?:?]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:?]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) ~[?:?]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661) ~[?:?]
	at org.apache.helix.controller.rebalancer.waged.WagedRebalancer.computeNewIdealStates(WagedRebalancer.java:277) ~[classes/:?]
	at org.apache.helix.controller.stages.BestPossibleStateCalcStage.computeResourceBestPossibleStateWithWagedRebalancer(BestPossibleStateCalcStage.java:445) ~[classes/:?]
	at org.apache.helix.controller.stages.BestPossibleStateCalcStage.compute(BestPossibleStateCalcStage.java:289) ~[classes/:?]
	at org.apache.helix.controller.stages.BestPossibleStateCalcStage.process(BestPossibleStateCalcStage.java:94) ~[classes/:?]
	at org.apache.helix.controller.pipeline.Pipeline.handle(Pipeline.java:75) ~[classes/:?]
	at org.apache.helix.controller.GenericHelixController.handleEvent(GenericHelixController.java:905) [classes/:?]
	at org.apache.helix.controller.GenericHelixController$ClusterEventProcessor.run(GenericHelixController.java:1556) [classes/:?]

Contributor:

Then we should have a way to handle the state change (an urgent rebalance pipeline to handle this). No skipping in the urgent rebalance pipeline.

Contributor:

Simply ignoring it in the main logic can cause miscomputation of placement for the full / partial rebalance.

Contributor Author:

> Then we should have a way to handle the state change (an urgent rebalance pipeline to handle this). No skipping in the urgent rebalance pipeline.

I'm not sure I understand what you mean. As long as there are rebalance failures, WAGED will fall back to the previous best possible, which may include nodes that were removed from the cluster during the rebalance failures. Subsequent pipeline runs will face this same issue until the cause of the rebalance failure is addressed. The instanceCapacityMap should be the source of truth for assignable nodes in WagedInstanceCapacity.

If we fail the pipeline entirely (the current behavior, because of the NPE), then we will not distribute states across the available nodes from the previous IS.
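The fallback mechanism discussed in this thread could be sketched roughly as follows. This is purely illustrative: `computeWithFallback`, the partition-to-instance map shape, and all names here are hypothetical stand-ins, not Helix's actual pipeline code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative only: wrap a per-partition computation so that a failure
// falls back to a previously persisted assignment instead of aborting
// the whole pipeline (the alternative discussed in this thread).
public class FallbackSketch {
  static Map<String, String> computeWithFallback(
      Supplier<Map<String, String>> compute, Map<String, String> previousAssignment) {
    try {
      return compute.get();
    } catch (RuntimeException e) {
      System.out.println("compute failed, falling back: " + e.getMessage());
      return previousAssignment;
    }
  }

  public static void main(String[] args) {
    Map<String, String> previous = new HashMap<>();
    previous.put("partition_0", "localhost_1");

    // Simulated rebalance failure (e.g. the NPE from the stack trace above).
    Map<String, String> result = computeWithFallback(() -> {
      throw new NullPointerException("instance capacity missing");
    }, previous);

    System.out.println(result.get("partition_0"));
  }
}
```

The point of the sketch is only that the failure is contained per computation, so one bad instance does not take down the entire pipeline run.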

     Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
     Map<String, Integer> processedCapacity = new HashMap<>();
     for (String key : instanceCapacity.keySet()) {
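To make the effect of the new `containsKey` guard concrete, here is a minimal standalone sketch. `CapacityGuardSketch` and its fields are hypothetical stand-ins, not Helix's `WagedInstanceCapacity`; it only mirrors the guard-then-reduce shape of the patched method, so a lookup for an instance that has left the cluster returns `false` instead of throwing an NPE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WagedInstanceCapacity (not the real Helix class):
// applies the same containsKey guard as the patch, so asking about an
// instance missing from the capacity map returns false instead of
// throwing an NPE on the subsequent map lookup.
public class CapacityGuardSketch {
  private final Map<String, Map<String, Integer>> instanceCapacityMap = new HashMap<>();

  boolean checkAndReduceInstanceCapacity(String instance, String capacityKey, int weight) {
    if (!instanceCapacityMap.containsKey(instance)) {
      System.out.println("Instance: " + instance + " not found in instance capacity map");
      return false; // without the guard, the get(instance) below would NPE
    }
    Map<String, Integer> capacity = instanceCapacityMap.get(instance);
    int remaining = capacity.getOrDefault(capacityKey, 0);
    if (remaining < weight) {
      return false; // not enough headroom for this partition
    }
    capacity.put(capacityKey, remaining - weight);
    return true;
  }

  public static void main(String[] args) {
    CapacityGuardSketch sketch = new CapacityGuardSketch();
    Map<String, Integer> cap = new HashMap<>();
    cap.put("TestCapacityKey", 3);
    sketch.instanceCapacityMap.put("localhost_1", cap);

    // Live instance: capacity is reduced and the call succeeds.
    System.out.println(sketch.checkAndReduceInstanceCapacity("localhost_1", "TestCapacityKey", 1));
    // Stale instance from a previous idealState: guarded, returns false.
    System.out.println(sketch.checkAndReduceInstanceCapacity("localhost_0", "TestCapacityKey", 1));
  }
}
```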
@@ -2,19 +2,16 @@

 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -23,11 +20,15 @@

 public class TestWagedNPE extends ZkTestBase {

-  public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
-  public static int PARTICIPANT_COUNT = 3;
-  public static List<MockParticipantManager> _participants = new ArrayList<>();
-  public static ClusterControllerManager _controller;
-  public static ConfigAccessor _configAccessor;
+  private final String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
+  private final int PARTICIPANT_COUNT = 3;
+  private final int PARTITION_COUNT = 3;
+  private final int REPLICA_COUNT = 3;
+  private final int DEFAULT_VERIFIER_TIMEOUT = 15000;
+  private List<MockParticipantManager> _participants = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+  private BestPossibleExternalViewVerifier _verifier;

   @BeforeClass
   public void beforeClass() {
@@ -45,60 +46,108 @@ public void beforeClass() {
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     String testCapacityKey = "TestCapacityKey";
     clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
-    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 3));
     clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
     clusterConfig.setPersistBestPossibleAssignment(true);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    _verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
   }

   // This test was constructed to capture the bug described in issue 2891
   // https://github.com/apache/helix/issues/2891
   @Test
-  public void testNPE() throws Exception {
-    int numPartition = 3;
-    BestPossibleExternalViewVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
-
+  public void testNPEonNewResource() {
+    System.out.println("Start test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
     // Create 1 WAGED Resource
     String firstDB = "firstDB";
-    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
-        IdealState.RebalanceMode.FULL_AUTO.name(), null);
-    IdealState idealStateOne =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
-    idealStateOne.setMinActiveReplicas(2);
-    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
-    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
-    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+    addWagedResource(firstDB, PARTITION_COUNT, REPLICA_COUNT);

     // Wait for cluster to converge
-    Assert.assertTrue(verifier.verifyByPolling());
+    Assert.assertTrue(_verifier.verifyByPolling());

     // Drop resource
     _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, firstDB);

     // Wait for cluster to converge
-    Assert.assertTrue(verifier.verifyByPolling());
+    Assert.assertTrue(_verifier.verifyByPolling());

     // add instance
-    addParticipant("instance_to_add");
+    MockParticipantManager instanceToAdd = addParticipant("instance_to_add");

     // Wait for cluster to converge
-    Assert.assertTrue(verifier.verifyByPolling());
+    Assert.assertTrue(_verifier.verifyByPolling());

     // Add a new resource
-    String secondDb = "secondDB";
-    _configAccessor.setResourceConfig(CLUSTER_NAME, secondDb, new ResourceConfig(secondDb));
-    _gSetupTool.addResourceToCluster(CLUSTER_NAME, secondDb, numPartition, "LeaderStandby",
-        IdealState.RebalanceMode.FULL_AUTO.name(), null);
-    IdealState idealStateTwo =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, secondDb);
-    idealStateTwo.setMinActiveReplicas(2);
-    idealStateTwo.setRebalancerClassName(WagedRebalancer.class.getName());
-    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, secondDb, idealStateTwo);
-    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, secondDb, 3);
+    String secondDB = "secondDB";
+    addWagedResource(secondDB, PARTITION_COUNT, REPLICA_COUNT);

     // Confirm cluster can converge. Cluster will not converge if NPE occurs during pipeline run
-    Assert.assertTrue(verifier.verifyByPolling());
+    Assert.assertTrue(_verifier.verifyByPolling());
+
+    // Reset cluster
+    _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, secondDB);
+    instanceToAdd.syncStop();
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToAdd.getInstanceName()));
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME).isEmpty());
+    Assert.assertTrue(_verifier.verifyByPolling());
+    System.out.println("End test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
   }

+  @Test(dependsOnMethods = "testNPEonNewResource")
+  public void testNPEonRebalanceFailure() throws Exception {
+    System.out.println("Start test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
+    // Add 1 WAGED resource that will be successfully placed
+    String firstDB = "firstDB";
+    addWagedResource(firstDB, PARTITION_COUNT, REPLICA_COUNT);
+
+    // Wait for cluster to converge
+    Assert.assertTrue(_verifier.verifyByPolling());
+
+    // Add a 2nd WAGED resource that will fail to place
+    String secondDB = "secondDB";
+    addWagedResource(secondDB, PARTITION_COUNT, REPLICA_COUNT);
+
+    // Kill 1 instance
+    MockParticipantManager instanceToKill = _participants.get(0);
+    instanceToKill.syncStop();
+    Assert.assertTrue(TestHelper.verify(() ->
+        !_gZkClient.exists("/" + CLUSTER_NAME + "/LIVEINSTANCES/" + instanceToKill.getInstanceName()),
+        DEFAULT_VERIFIER_TIMEOUT));
+
+    // Assert that each partition for firstDB has a LEADER replica
+    _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB)
+        .getRecord().getMapFields().forEach((partition, partitionStateMap) -> {
+          Assert.assertFalse(partitionStateMap.containsKey(instanceToKill.getInstanceName()));
+          Assert.assertTrue(partitionStateMap.containsValue("LEADER"));
+        });
+
+    // Drop the dead instance
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToKill.getInstanceName()));
+
+    // Kill another instance
+    MockParticipantManager instanceToKill2 = _participants.get(1);
+    instanceToKill2.syncStop();
+    Assert.assertTrue(TestHelper.verify(() ->
+        !_gZkClient.exists("/" + CLUSTER_NAME + "/LIVEINSTANCES/" + instanceToKill2.getInstanceName()),
+        DEFAULT_VERIFIER_TIMEOUT));
+
+    // Assert that each partition for firstDB still has a LEADER replica
+    Assert.assertTrue(TestHelper.verify(() -> {
+      AtomicBoolean verified = new AtomicBoolean(true);
+      _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, firstDB)
+          .getRecord().getMapFields().forEach((partition, partitionStateMap) -> {
+            boolean result = !partitionStateMap.containsKey(instanceToKill.getInstanceName())
+                && !partitionStateMap.containsKey(instanceToKill2.getInstanceName())
+                && partitionStateMap.containsValue("LEADER");
+            if (!result) {
+              verified.set(result);
+            }
+          });
+      return verified.get();
+    }, DEFAULT_VERIFIER_TIMEOUT));
+    System.out.println("End test " + TestHelper.getTestClassName() + "." + TestHelper.getTestMethodName());
+  }

@@ -108,4 +157,15 @@ public MockParticipantManager addParticipant(String instanceName) {
     _participants.add(participant);
     return participant;
   }

+  private void addWagedResource(String resourceName, int partitions, int replicas) {
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, resourceName, partitions, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO.name(), null);
+    IdealState idealStateOne =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName);
+    idealStateOne.setMinActiveReplicas(2);
+    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, resourceName, idealStateOne);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, replicas);
+  }
 }