Skip to content

Commit a7f3b36

Browse files
committed
[FLINK-21274] Block main thread when running the TaskManagerRunner
In order to ensure that the TaskManager properly shuts down we need to let the main thread block on the execution of the TaskManager. This will ensure that there is always a non-daemon thread as long as the TaskManager runs. This closes apache#14914.
1 parent a7f898a commit a7f3b36

File tree

6 files changed

+113
-70
lines changed

6 files changed

+113
-70
lines changed

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ public static void main(String[] args) {
3636
SignalHandler.register(LOG);
3737
JvmShutdownSafeguard.installAsShutdownHook(LOG);
3838

39-
TaskManagerRunner.runTaskManagerSecurely(args);
39+
TaskManagerRunner.runTaskManagerProcessSecurely(args);
4040
}
4141
}

flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ public static void main(String[] args) throws Exception {
6868
return;
6969
}
7070

71-
TaskManagerRunner.runTaskManagerSecurely(configuration);
71+
TaskManagerRunner.runTaskManagerProcessSecurely(configuration);
7272
}
7373
}

flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

+90-36
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.ArrayList;
8181
import java.util.Collection;
8282
import java.util.concurrent.CompletableFuture;
83+
import java.util.concurrent.ExecutionException;
8384
import java.util.concurrent.ExecutorService;
8485
import java.util.concurrent.Executors;
8586
import java.util.concurrent.TimeUnit;
@@ -92,15 +93,14 @@
9293
* constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
9394
* and starts them.
9495
*/
95-
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
96+
public class TaskManagerRunner implements FatalErrorHandler {
9697

9798
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
9899

99100
private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
100101

101-
private static final int STARTUP_FAILURE_RETURN_CODE = 1;
102-
103-
@VisibleForTesting static final int RUNTIME_FAILURE_RETURN_CODE = 2;
102+
private static final int SUCCESS_EXIT_CODE = 0;
103+
@VisibleForTesting static final int FAILURE_EXIT_CODE = 1;
104104

105105
private final Object lock = new Object();
106106

@@ -123,7 +123,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
123123

124124
private final TaskExecutorService taskExecutorService;
125125

126-
private final CompletableFuture<Void> terminationFuture;
126+
private final CompletableFuture<Result> terminationFuture;
127127

128128
private boolean shutdown;
129129

@@ -193,7 +193,8 @@ public TaskManagerRunner(
193193
this.shutdown = false;
194194
handleUnexpectedTaskExecutorServiceTermination();
195195

196-
MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
196+
MemoryLogger.startIfConfigured(
197+
LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
197198
}
198199

199200
private void handleUnexpectedTaskExecutorServiceTermination() {
@@ -220,8 +221,19 @@ public void start() throws Exception {
220221
taskExecutorService.start();
221222
}
222223

223-
@Override
224-
public CompletableFuture<Void> closeAsync() {
224+
public void close() throws Exception {
225+
try {
226+
closeAsync().get();
227+
} catch (ExecutionException e) {
228+
ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e));
229+
}
230+
}
231+
232+
public CompletableFuture<Result> closeAsync() {
233+
return closeAsync(Result.SUCCESS);
234+
}
235+
236+
private CompletableFuture<Result> closeAsync(Result terminationResult) {
225237
synchronized (lock) {
226238
if (!shutdown) {
227239
shutdown = true;
@@ -238,7 +250,7 @@ public CompletableFuture<Void> closeAsync() {
238250
if (throwable != null) {
239251
terminationFuture.completeExceptionally(throwable);
240252
} else {
241-
terminationFuture.complete(null);
253+
terminationFuture.complete(terminationResult);
242254
}
243255
});
244256
}
@@ -291,7 +303,7 @@ private CompletableFuture<Void> shutDownServices() {
291303
}
292304

293305
// export the termination future for caller to know it is terminated
294-
public CompletableFuture<Void> getTerminationFuture() {
306+
public CompletableFuture<Result> getTerminationFuture() {
295307
return terminationFuture;
296308
}
297309

@@ -314,17 +326,15 @@ public void onFatalError(Throwable exception) {
314326
&& !ExceptionUtils.isMetaspaceOutOfMemoryError(exception)) {
315327
terminateJVM();
316328
} else {
317-
closeAsync();
329+
closeAsync(Result.FAILURE);
318330

319331
FutureUtils.orTimeout(
320332
terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
321-
322-
terminationFuture.whenComplete((Void ignored, Throwable throwable) -> terminateJVM());
323333
}
324334
}
325335

326336
private void terminateJVM() {
327-
System.exit(RUNTIME_FAILURE_RETURN_CODE);
337+
System.exit(FAILURE_EXIT_CODE);
328338
}
329339

330340
// --------------------------------------------------------------------------------------------
@@ -345,49 +355,78 @@ public static void main(String[] args) throws Exception {
345355
LOG.info("Cannot determine the maximum number of open file descriptors");
346356
}
347357

348-
runTaskManagerSecurely(args);
358+
runTaskManagerProcessSecurely(args);
349359
}
350360

351361
public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
352362
return ConfigurationParserUtils.loadCommonConfiguration(
353363
args, TaskManagerRunner.class.getSimpleName());
354364
}
355365

356-
public static void runTaskManager(Configuration configuration, PluginManager pluginManager)
366+
public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
357367
throws Exception {
358-
final TaskManagerRunner taskManagerRunner =
359-
new TaskManagerRunner(
360-
configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
368+
final TaskManagerRunner taskManagerRunner;
361369

362-
taskManagerRunner.start();
363-
}
370+
try {
371+
taskManagerRunner =
372+
new TaskManagerRunner(
373+
configuration,
374+
pluginManager,
375+
TaskManagerRunner::createTaskExecutorService);
376+
taskManagerRunner.start();
377+
} catch (Exception exception) {
378+
throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
379+
}
364380

365-
public static void runTaskManagerSecurely(String[] args) {
366381
try {
367-
Configuration configuration = loadConfiguration(args);
368-
runTaskManagerSecurely(configuration);
382+
return taskManagerRunner.getTerminationFuture().get().getExitCode();
369383
} catch (Throwable t) {
370-
final Throwable strippedThrowable =
371-
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
372-
LOG.error("TaskManager initialization failed.", strippedThrowable);
373-
System.exit(STARTUP_FAILURE_RETURN_CODE);
384+
throw new FlinkException(
385+
"Unexpected failure during runtime of TaskManagerRunner.",
386+
ExceptionUtils.stripExecutionException(t));
387+
}
388+
}
389+
390+
public static void runTaskManagerProcessSecurely(String[] args) {
391+
Configuration configuration = null;
392+
393+
try {
394+
configuration = loadConfiguration(args);
395+
} catch (FlinkParseException fpe) {
396+
LOG.error("Could not load the configuration.", fpe);
397+
System.exit(FAILURE_EXIT_CODE);
374398
}
399+
400+
runTaskManagerProcessSecurely(checkNotNull(configuration));
375401
}
376402

377-
public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
403+
public static void runTaskManagerProcessSecurely(Configuration configuration) {
378404
replaceGracefulExitWithHaltIfConfigured(configuration);
379405
final PluginManager pluginManager =
380406
PluginUtils.createPluginManagerFromRootFolder(configuration);
381407
FileSystem.initialize(configuration, pluginManager);
382408

383-
SecurityUtils.install(new SecurityConfiguration(configuration));
409+
int exitCode;
410+
Throwable throwable = null;
384411

385-
SecurityUtils.getInstalledContext()
386-
.runSecured(
387-
() -> {
388-
runTaskManager(configuration, pluginManager);
389-
return null;
390-
});
412+
try {
413+
SecurityUtils.install(new SecurityConfiguration(configuration));
414+
415+
exitCode =
416+
SecurityUtils.getInstalledContext()
417+
.runSecured(() -> runTaskManager(configuration, pluginManager));
418+
} catch (Throwable t) {
419+
throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
420+
exitCode = FAILURE_EXIT_CODE;
421+
}
422+
423+
if (throwable != null) {
424+
LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
425+
} else {
426+
LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
427+
}
428+
429+
System.exit(exitCode);
391430
}
392431

393432
// --------------------------------------------------------------------------------------------
@@ -612,4 +651,19 @@ public interface TaskExecutorService extends AutoCloseableAsync {
612651

613652
CompletableFuture<Void> getTerminationFuture();
614653
}
654+
655+
public enum Result {
656+
SUCCESS(SUCCESS_EXIT_CODE),
657+
FAILURE(FAILURE_EXIT_CODE);
658+
659+
private final int exitCode;
660+
661+
Result(int exitCode) {
662+
this.exitCode = exitCode;
663+
}
664+
665+
public int getExitCode() {
666+
return exitCode;
667+
}
668+
}
615669
}

flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java

+13-22
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,19 @@
2525
import org.apache.flink.core.plugin.PluginManager;
2626
import org.apache.flink.core.plugin.PluginUtils;
2727
import org.apache.flink.runtime.clusterframework.types.ResourceID;
28-
import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
29-
import org.apache.flink.util.FlinkException;
3028
import org.apache.flink.util.TestLogger;
3129
import org.apache.flink.util.TimeUtils;
3230

3331
import org.junit.After;
34-
import org.junit.Before;
3532
import org.junit.Rule;
3633
import org.junit.Test;
3734
import org.junit.rules.Timeout;
3835

3936
import javax.annotation.Nonnull;
4037

4138
import java.net.InetAddress;
42-
import java.time.Duration;
4339
import java.util.concurrent.CompletableFuture;
4440

45-
import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
4641
import static org.hamcrest.Matchers.containsString;
4742
import static org.hamcrest.Matchers.equalTo;
4843
import static org.hamcrest.Matchers.is;
@@ -54,15 +49,8 @@ public class TaskManagerRunnerTest extends TestLogger {
5449

5550
@Rule public final Timeout timeout = Timeout.seconds(30);
5651

57-
private SystemExitTrackingSecurityManager systemExitTrackingSecurityManager;
5852
private TaskManagerRunner taskManagerRunner;
5953

60-
@Before
61-
public void before() {
62-
systemExitTrackingSecurityManager = new SystemExitTrackingSecurityManager();
63-
System.setSecurityManager(systemExitTrackingSecurityManager);
64-
}
65-
6654
@After
6755
public void after() throws Exception {
6856
System.setSecurityManager(null);
@@ -80,8 +68,9 @@ public void testShouldShutdownOnFatalError() throws Exception {
8068

8169
taskManagerRunner.onFatalError(new RuntimeException());
8270

83-
Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
84-
assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
71+
assertThat(
72+
taskManagerRunner.getTerminationFuture().join(),
73+
is(equalTo(TaskManagerRunner.Result.FAILURE)));
8574
}
8675

8776
@Test
@@ -91,8 +80,9 @@ public void testShouldShutdownIfRegistrationWithJobManagerFails() throws Excepti
9180
TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));
9281
taskManagerRunner = createTaskManagerRunner(configuration);
9382

94-
Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
95-
assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
83+
assertThat(
84+
taskManagerRunner.getTerminationFuture().join(),
85+
is(equalTo(TaskManagerRunner.Result.FAILURE)));
9686
}
9787

9888
@Test
@@ -169,10 +159,11 @@ public void testUnexpectedTaskManagerTerminationFailsRunnerFatally() throws Exce
169159
createConfiguration(),
170160
createTaskExecutorServiceFactory(taskExecutorService));
171161

172-
terminationFuture.completeExceptionally(new FlinkException("Test exception."));
162+
terminationFuture.complete(null);
173163

174-
Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
175-
assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
164+
assertThat(
165+
taskManagerRunner.getTerminationFuture().join(),
166+
is(equalTo(TaskManagerRunner.Result.FAILURE)));
176167
}
177168

178169
@Test
@@ -191,11 +182,11 @@ public void testUnexpectedTaskManagerTerminationAfterRunnerCloseWillBeIgnored()
191182

192183
taskManagerRunner.closeAsync();
193184

194-
terminationFuture.completeExceptionally(new FlinkException("Test exception."));
185+
terminationFuture.complete(null);
195186

196187
assertThat(
197-
systemExitTrackingSecurityManager.getSystemExitFuture(),
198-
willNotComplete(Duration.ofMillis(10L)));
188+
taskManagerRunner.getTerminationFuture().join(),
189+
is(equalTo(TaskManagerRunner.Result.SUCCESS)));
199190
}
200191

201192
@Nonnull

flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ public static void main(String[] args) {
336336

337337
TaskManagerRunner.runTaskManager(cfg, pluginManager);
338338
} catch (Throwable t) {
339-
LOG.error("Failed to start TaskManager process", t);
339+
LOG.error("Failed to run the TaskManager process", t);
340340
System.exit(1);
341341
}
342342
}

flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@
2929
import org.apache.flink.runtime.util.EnvironmentInformation;
3030
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
3131
import org.apache.flink.runtime.util.SignalHandler;
32-
import org.apache.flink.util.ExceptionUtils;
32+
import org.apache.flink.util.Preconditions;
3333

3434
import org.apache.hadoop.security.UserGroupInformation;
3535
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838

3939
import java.io.IOException;
40-
import java.lang.reflect.UndeclaredThrowableException;
4140
import java.util.Map;
4241

4342
/** This class is the executable entry point for running a TaskExecutor in a YARN container. */
@@ -76,23 +75,22 @@ public static void main(String[] args) {
7675
* @param args The command line arguments.
7776
*/
7877
private static void runTaskManagerSecurely(String[] args) {
78+
Configuration configuration = null;
79+
7980
try {
8081
LOG.debug("All environment variables: {}", ENV);
8182

8283
final String currDir = ENV.get(Environment.PWD.key());
8384
LOG.info("Current working Directory: {}", currDir);
8485

85-
final Configuration configuration = TaskManagerRunner.loadConfiguration(args);
86+
configuration = TaskManagerRunner.loadConfiguration(args);
8687
setupAndModifyConfiguration(configuration, currDir, ENV);
87-
88-
TaskManagerRunner.runTaskManagerSecurely(configuration);
8988
} catch (Throwable t) {
90-
final Throwable strippedThrowable =
91-
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
92-
// make sure that everything whatever ends up in the log
93-
LOG.error("YARN TaskManager initialization failed.", strippedThrowable);
89+
LOG.error("YARN TaskManager initialization failed.", t);
9490
System.exit(INIT_ERROR_EXIT_CODE);
9591
}
92+
93+
TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
9694
}
9795

9896
@VisibleForTesting

0 commit comments

Comments
 (0)