diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index c7b74fe9265..05ec790e373 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -279,15 +279,7 @@ public synchronized void start() { LOGGER.info("Starting the Gobblin Cluster Manager"); this.eventBus.register(this); - this.multiManager.connect(); - - // Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone - // mode, such as YARN mode - if (!this.isStandaloneMode) { - this.multiManager.cleanUpJobs(); - } - - configureHelixQuotaBasedTaskScheduling(); + setupHelix(); if (this.isStandaloneMode) { // standalone mode starts non-daemon threads later, so need to have this thread to keep process up @@ -316,6 +308,18 @@ public void run() { this.started = true; } + public synchronized void setupHelix() { + this.multiManager.connect(); + + // Standalone mode registers a handler to clean up on manager leadership change, so only clean up for non-standalone + // mode, such as YARN mode + if (!this.isStandaloneMode) { + this.multiManager.cleanUpJobs(); + } + + configureHelixQuotaBasedTaskScheduling(); + } + /** * Stop the Gobblin Cluster Manager. 
*/ @@ -427,11 +431,18 @@ boolean isHelixManagerConnected() { */ @VisibleForTesting void initializeHelixManager() { - this.multiManager = new GobblinHelixMultiManager( - this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus) ; + this.multiManager = createMultiManager(); this.multiManager.addLeadershipChangeAwareComponent(this); } + /*** + * Can be overridden to inject mock GobblinHelixMultiManager + * @return a new GobblinHelixMultiManager + */ + public GobblinHelixMultiManager createMultiManager() { + return new GobblinHelixMultiManager(this.config, aVoid -> GobblinClusterManager.this.getUserDefinedMessageHandlerFactory(), this.eventBus, stopStatus); + } + @VisibleForTesting void sendShutdownRequest() { Criteria criteria = new Criteria(); diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index d6200296717..5614b7146e9 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -28,11 +28,13 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; @@ -448,6 +450,18 @@ public static List getLiveInstances(HelixManager helixManager) { return accessor.getChildNames(liveInstancesKey); } + /** + * Getting all instances (Helix Participants) in cluster at this moment. + * Note that the raw result could contain AppMaster node and replanner node. 
+ * @param filterString Helix instances whose name contains filterString will pass filtering. + */ + public static Set getParticipants(HelixDataAccessor helixDataAccessor, String filterString) { + PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); + PropertyKey liveInstance = keyBuilder.liveInstances(); + Map childValuesMap = helixDataAccessor.getChildValuesMap(liveInstance); + return childValuesMap.keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet()); + } + public static boolean isInstanceLive(HelixManager helixManager, String instanceName) { HelixDataAccessor accessor = helixManager.getHelixDataAccessor(); PropertyKey liveInstanceKey = accessor.keyBuilder().liveInstance(instanceName); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java index 16cb954802e..ff2cb0e1a63 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; @@ -32,6 +33,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; import org.apache.helix.messaging.handling.HelixTaskResult; import org.apache.helix.messaging.handling.MessageHandler; @@ -52,6 +56,8 @@ import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.GobblinClusterManager; import org.apache.gobblin.cluster.GobblinClusterUtils; 
+import org.apache.gobblin.cluster.GobblinHelixMultiManager; +import org.apache.gobblin.cluster.HelixUtils; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.PathUtils; @@ -135,6 +141,35 @@ protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() { return new ControllerUserDefinedMessageHandlerFactory(); } + @Override + public synchronized void setupHelix() { + super.setupHelix(); + this.disableTaskRunnersFromPreviousExecutions(this.multiManager); + } + + /** + * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application + * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use + * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the + * new cluster. + * + * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application + * completion, this method should be removed. 
+ */ + public static void disableTaskRunnersFromPreviousExecutions(GobblinHelixMultiManager multiManager) { + HelixManager helixManager = multiManager.getJobClusterHelixManager(); + HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); + String clusterName = helixManager.getClusterName(); + HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); + Set taskRunners = HelixUtils.getParticipants(helixDataAccessor, + GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX); + LOGGER.warn("Found {} task runners in the cluster.", taskRunners.size()); + for (String taskRunner : taskRunners) { + LOGGER.warn("Disabling instance: {}", taskRunner); + helixAdmin.enableInstance(clusterName, taskRunner, false); + } + } + /** * A custom {@link MultiTypeMessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}. diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 99c4094df50..48ac8947972 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -38,8 +38,6 @@ import org.apache.avro.Schema; import org.apache.commons.io.FileUtils; import org.apache.commons.mail.EmailException; -import org.apache.gobblin.util.hadoop.TokenUtils; -import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -71,7 +69,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import org.apache.helix.Criteria; -import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import 
org.apache.helix.InstanceType; @@ -117,8 +114,10 @@ import org.apache.gobblin.util.EmailUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.JvmUtils; +import org.apache.gobblin.util.hadoop.TokenUtils; import org.apache.gobblin.util.io.StreamUtils; import org.apache.gobblin.util.logs.LogCopier; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent; import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent; @@ -371,7 +370,6 @@ public void launch() throws IOException, YarnException, InterruptedException { this.applicationId = getReconnectableApplicationId(); if (!this.applicationId.isPresent()) { - disableLiveHelixInstances(); LOGGER.info("No reconnectable application found so submitting a new application"); this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress); this.applicationId = Optional.of(setupAndSubmitApplication()); @@ -454,7 +452,6 @@ public synchronized void stop() throws IOException, TimeoutException { if (!this.detachOnExitEnabled) { LOGGER.info("Disabling all live Helix instances.."); - disableLiveHelixInstances(); } disconnectHelixManager(); @@ -540,26 +537,6 @@ void connectHelixManager() { } } - /** - * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application - * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use - * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the - * new cluster. - * - * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application - * completion, this method should be removed. 
- */ - void disableLiveHelixInstances() { - String clusterName = this.helixManager.getClusterName(); - HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool(); - List liveInstances = HelixUtils.getLiveInstances(this.helixManager); - LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size()); - for (String instanceName: liveInstances) { - LOGGER.warn("Disabling instance: {}", instanceName); - helixAdmin.enableInstance(clusterName, instanceName, false); - } - } - @VisibleForTesting void disconnectHelixManager() { if (this.helixManager.isConnected()) { diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index e6683cfd383..63d85980d5a 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -30,12 +30,10 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import org.apache.commons.compress.utils.Sets; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; @@ -56,6 +54,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.HelixUtils; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; @@ -184,17 +183,6 @@ public void run() { } } - /** - * Getting all instances (Helix Participants) in cluster at this moment. - * Note that the raw result could contains AppMaster node and replanner node. - * @param filterString Helix instances whose name containing fitlerString will pass filtering. 
- */ - private Set getParticipants(String filterString) { - PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); - return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances()) - .keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet()); - } - private String getInuseParticipantForHelixPartition(JobContext jobContext, int partition) { if (jobContext.getPartitionNumAttempts(partition) > THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING) { log.warn("Helix task {} has been retried for {} times, please check the config to see how we can handle this task better", @@ -272,7 +260,7 @@ void runInternal() { } // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager // and potentially replanner-instance. - Set allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX); + Set allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX); // Find all joined participants not in-use for this round of inspection. // If idle time is beyond tolerance, mark the instance as unused by assigning timestamp as -1. diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java new file mode 100644 index 00000000000..947021b04d1 --- /dev/null +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinApplicationMasterTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.yarn; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import junit.framework.TestCase; + +import org.apache.gobblin.cluster.GobblinHelixMultiManager; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + + +public class GobblinApplicationMasterTest extends TestCase { + @Test + public void testDisableTaskRunnersFromPreviousExecutions() { + GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class); + + HelixManager mockHelixManager = Mockito.mock(HelixManager.class); + when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager); + + HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class); + when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin); + when(mockHelixManager.getClusterName()).thenReturn("mockCluster"); + + HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class); + when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor); + + PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class); + when(mockAccessor.keyBuilder()).thenReturn(mockBuilder); + + PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class); + 
when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey); + + int instanceCount = 3; + + // GobblinYarnTaskRunner prefix would be disabled, while GobblinClusterManager prefix will not + ArrayList gobblinYarnTaskRunnerPrefix = new ArrayList(); + ArrayList gobblinClusterManagerPrefix = new ArrayList(); + for (int i = 0; i < instanceCount; i++) { + gobblinYarnTaskRunnerPrefix.add("GobblinYarnTaskRunner_TestInstance_" + i); + gobblinClusterManagerPrefix.add("GobblinClusterManager_TestInstance_" + i); + } + + Map mockChildValues = new HashMap<>(); + for (int i = 0; i < instanceCount; i++) { + mockChildValues.put(gobblinYarnTaskRunnerPrefix.get(i), Mockito.mock(HelixProperty.class)); + mockChildValues.put(gobblinClusterManagerPrefix.get(i), Mockito.mock(HelixProperty.class)); + } + when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues); + + GobblinApplicationMaster.disableTaskRunnersFromPreviousExecutions(mockMultiManager); + + for (int i = 0; i < instanceCount; i++) { + Mockito.verify(mockHelixAdmin).enableInstance("mockCluster", gobblinYarnTaskRunnerPrefix.get(i), false); + Mockito.verify(mockHelixAdmin, times(0)).enableInstance("mockCluster", gobblinClusterManagerPrefix.get(i), false); + } + } +} \ No newline at end of file diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java index 2eb8c4f32f5..65e9dc26946 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; +import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; import org.apache.helix.model.Message; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -84,6 +89,7 @@ import org.apache.gobblin.testing.AssertWithBackoff; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; /** @@ -431,6 +437,25 @@ public void testJobCleanup() throws Exception { GobblinHelixMultiManager mockMultiManager = Mockito.mock(GobblinHelixMultiManager.class); appMaster.setMultiManager(mockMultiManager); + + HelixManager mockHelixManager = Mockito.mock(HelixManager.class); + when(mockMultiManager.getJobClusterHelixManager()).thenReturn(mockHelixManager); + + HelixAdmin mockHelixAdmin = Mockito.mock(HelixAdmin.class); + when(mockHelixManager.getClusterManagmentTool()).thenReturn(mockHelixAdmin); + + HelixDataAccessor mockAccessor = Mockito.mock(HelixDataAccessor.class); + when(mockHelixManager.getHelixDataAccessor()).thenReturn(mockAccessor); + + PropertyKey.Builder mockBuilder = Mockito.mock(PropertyKey.Builder.class); + when(mockAccessor.keyBuilder()).thenReturn(mockBuilder); + + PropertyKey mockLiveInstancesKey = Mockito.mock(PropertyKey.class); + when(mockBuilder.liveInstances()).thenReturn(mockLiveInstancesKey); + + Map mockChildValues = new HashMap<>(); + when(mockAccessor.getChildValuesMap(mockLiveInstancesKey)).thenReturn(mockChildValues); + appMaster.start(); Mockito.verify(mockMultiManager, times(1)).cleanUpJobs();