diff --git a/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java b/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java new file mode 100644 index 0000000000..4881e08ab9 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/util/TezRuntimeShutdownHandler.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.util; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class TezRuntimeShutdownHandler { + private static final Logger LOG = LoggerFactory.getLogger(TezRuntimeShutdownHandler.class); + + private static final List shutdownTasks = new ArrayList<>(); + + private TezRuntimeShutdownHandler() { + } + + public static void addShutdownTask(Runnable r) { + shutdownTasks.add(r); + } + + public static synchronized void shutdown() { + LOG.info("Handling {} shutdown tasks", shutdownTasks.size()); + for (Runnable shutdownTask : shutdownTasks) { + shutdownTask.run(); + } + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index bc8c2d8391..b89b12db2b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -71,6 +71,7 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.util.TezRuntimeShutdownHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -390,8 +391,10 @@ public void shutdown() { LOG.info("Shutting down container {}", containerIdString); // It's possible that there's pending tasks on the executor. Those should be cancelled. List pendingRunnables = executor.shutdownNow(); + LOG.info("There are {} runnables in shared executor, cancelling those...", pendingRunnables.size()); for (Runnable r : pendingRunnables) { - LOG.info("Cancelling pending runnables during TezChild shutdown for containerId={}", containerIdString); + LOG.info("Cancelling pending runnable ({}) during TezChild shutdown for containerId={}", r.hashCode(), + containerIdString); ((FutureTask)r).cancel(false); } if (taskReporter != null) { @@ -401,6 +404,8 @@ public void shutdown() { RPC.stopProxy(umbilical); } } + TezRuntimeShutdownHandler.shutdown(); + LOG.info("TezChild shutdown finished"); } public static class ContainerExecutionResult { @@ -522,7 +527,8 @@ public static void main(String[] args) throws IOException, InterruptedException, System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), credentials, Runtime.getRuntime().maxMemory(), System .getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim); - tezChild.run(); + ContainerExecutionResult result = tezChild.run(); + LOG.info("TezChild is about to exit from main(), run() returned result: {}", result.toString()); } private void handleError(Throwable t) { diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java index 63b8934821..215e63af58 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java @@ -35,6 +35,7 @@ import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; import org.apache.tez.util.StopWatch; +import org.apache.tez.util.TezRuntimeShutdownHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +103,16 @@ private void initClient(HttpConnectionParams httpConnParams) throws IOException .build(); DefaultAsyncHttpClientConfig config = builder.build(); httpAsyncClient = new DefaultAsyncHttpClient(config); + TezRuntimeShutdownHandler.addShutdownTask(() -> { + try { + if (httpAsyncClient != null) { + httpAsyncClient.close(); + httpAsyncClient = null; + } + } catch (IOException e) { + LOG.warn("Error while closing async client (this won't block shutdown)", e); + } + }); } } }