Skip to content

Commit fdd52a7

Browse files
committed
[FLINK-25022][rest] Run jars in separate threads
Use a dedicated thread to run each jar, so that pooled threads can't keep references to user-code (e.g., in a ThreadLocal).
1 parent b6ae903 commit fdd52a7

File tree

6 files changed

+245
-8
lines changed

6 files changed

+245
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.util.concurrent;
19+
20+
import org.apache.flink.util.Preconditions;
21+
22+
import javax.annotation.Nonnull;
23+
24+
import java.util.concurrent.Executor;
25+
import java.util.concurrent.ThreadFactory;
26+
27+
/** An {@link Executor} that runs every runnable in a separate thread. */
28+
public final class SeparateThreadExecutor implements Executor {
29+
private final ThreadFactory threadFactory;
30+
31+
public SeparateThreadExecutor(ThreadFactory threadFactory) {
32+
this.threadFactory = Preconditions.checkNotNull(threadFactory);
33+
}
34+
35+
@Override
36+
public void execute(@Nonnull Runnable command) {
37+
threadFactory.newThread(command).start();
38+
}
39+
}

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java

+52-5
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
package org.apache.flink.runtime.webmonitor;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.time.Time;
2223
import org.apache.flink.api.java.tuple.Tuple2;
24+
import org.apache.flink.client.deployment.application.ApplicationRunner;
2325
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
2426
import org.apache.flink.configuration.Configuration;
2527
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
2628
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
29+
import org.apache.flink.runtime.util.ExecutorThreadFactory;
2730
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
2831
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
2932
import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
@@ -36,6 +39,7 @@
3639
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
3740
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
3841
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
42+
import org.apache.flink.util.concurrent.SeparateThreadExecutor;
3943

4044
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
4145

@@ -45,13 +49,18 @@
4549
import java.util.Map;
4650
import java.util.concurrent.CompletableFuture;
4751
import java.util.concurrent.Executor;
52+
import java.util.function.Supplier;
4853

4954
/** Container for the web submission handlers. */
5055
public class WebSubmissionExtension implements WebMonitorExtension {
5156

5257
private final ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
5358
webSubmissionHandlers;
5459

60+
// for easier access during testing
61+
private final JarUploadHandler jarUploadHandler;
62+
private final JarRunHandler jarRunHandler;
63+
5564
public WebSubmissionExtension(
5665
Configuration configuration,
5766
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
@@ -61,10 +70,38 @@ public WebSubmissionExtension(
6170
Executor executor,
6271
Time timeout)
6372
throws Exception {
73+
this(
74+
configuration,
75+
leaderRetriever,
76+
responseHeaders,
77+
localAddressFuture,
78+
jarDir,
79+
executor,
80+
timeout,
81+
() -> new DetachedApplicationRunner(true));
82+
}
83+
84+
@VisibleForTesting
85+
WebSubmissionExtension(
86+
Configuration configuration,
87+
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
88+
Map<String, String> responseHeaders,
89+
CompletableFuture<String> localAddressFuture,
90+
Path jarDir,
91+
Executor executor,
92+
Time timeout,
93+
Supplier<ApplicationRunner> applicationRunnerSupplier)
94+
throws Exception {
6495

6596
webSubmissionHandlers = new ArrayList<>();
6697

67-
final JarUploadHandler jarUploadHandler =
98+
final Executor jarRunExecutor =
99+
new SeparateThreadExecutor(
100+
new ExecutorThreadFactory.Builder()
101+
.setPoolName("flink-jar-runner")
102+
.build());
103+
104+
jarUploadHandler =
68105
new JarUploadHandler(
69106
leaderRetriever,
70107
timeout,
@@ -84,16 +121,16 @@ public WebSubmissionExtension(
84121
configuration,
85122
executor);
86123

87-
final JarRunHandler jarRunHandler =
124+
jarRunHandler =
88125
new JarRunHandler(
89126
leaderRetriever,
90127
timeout,
91128
responseHeaders,
92129
JarRunHeaders.getInstance(),
93130
jarDir,
94131
configuration,
95-
executor,
96-
() -> new DetachedApplicationRunner(true));
132+
jarRunExecutor,
133+
applicationRunnerSupplier);
97134

98135
final JarDeleteHandler jarDeleteHandler =
99136
new JarDeleteHandler(
@@ -112,7 +149,7 @@ public WebSubmissionExtension(
112149
JarPlanGetHeaders.getInstance(),
113150
jarDir,
114151
configuration,
115-
executor);
152+
jarRunExecutor);
116153

117154
final JarPlanHandler postJarPlanHandler =
118155
new JarPlanHandler(
@@ -141,4 +178,14 @@ public CompletableFuture<Void> closeAsync() {
141178
public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers() {
142179
return webSubmissionHandlers;
143180
}
181+
182+
@VisibleForTesting
183+
JarUploadHandler getJarUploadHandler() {
184+
return jarUploadHandler;
185+
}
186+
187+
@VisibleForTesting
188+
JarRunHandler getJarRunHandler() {
189+
return jarRunHandler;
190+
}
144191
}

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.webmonitor.handlers;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.runtime.rest.messages.MessageParameters;
2223
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
2324
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
@@ -29,7 +30,8 @@
2930
/** Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. */
3031
abstract class JarMessageParameters extends MessageParameters {
3132

32-
final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
33+
@VisibleForTesting
34+
public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
3335

3436
final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
3537

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.webmonitor.handlers;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.time.Time;
2223
import org.apache.flink.client.deployment.application.ApplicationRunner;
2324
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -82,7 +83,8 @@ public JarRunHandler(
8283
}
8384

8485
@Override
85-
protected CompletableFuture<JarRunResponseBody> handleRequest(
86+
@VisibleForTesting
87+
public CompletableFuture<JarRunResponseBody> handleRequest(
8688
@Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
8789
@Nonnull final DispatcherGateway gateway)
8890
throws RestHandlerException {

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.webmonitor.handlers;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.common.time.Time;
2223
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
2324
import org.apache.flink.runtime.rest.handler.HandlerRequest;
@@ -68,7 +69,8 @@ public JarUploadHandler(
6869
}
6970

7071
@Override
71-
protected CompletableFuture<JarUploadResponseBody> handleRequest(
72+
@VisibleForTesting
73+
public CompletableFuture<JarUploadResponseBody> handleRequest(
7274
@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
7375
@Nonnull final RestfulGateway gateway)
7476
throws RestHandlerException {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.runtime.webmonitor;
19+
20+
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.api.common.time.Time;
22+
import org.apache.flink.client.deployment.application.ApplicationRunner;
23+
import org.apache.flink.client.program.PackagedProgram;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.runtime.concurrent.Executors;
26+
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
27+
import org.apache.flink.runtime.rest.handler.HandlerRequest;
28+
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
29+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
30+
import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter;
31+
import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
32+
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
33+
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
34+
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
35+
36+
import org.junit.Rule;
37+
import org.junit.Test;
38+
import org.junit.rules.TemporaryFolder;
39+
40+
import java.nio.file.Files;
41+
import java.nio.file.Path;
42+
import java.nio.file.Paths;
43+
import java.util.Collection;
44+
import java.util.Collections;
45+
import java.util.HashMap;
46+
import java.util.IdentityHashMap;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Set;
50+
import java.util.concurrent.CompletableFuture;
51+
import java.util.concurrent.TimeUnit;
52+
53+
import static org.junit.Assert.assertEquals;
54+
55+
/** Tests for the {@link WebSubmissionExtension}. */
56+
public class WebSubmissionExtensionTest {
57+
58+
private static final String JAR_NAME = "output-test-program.jar";
59+
60+
@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
61+
62+
@Test
63+
public void applicationsRunInSeparateThreads() throws Exception {
64+
final Path tempDir = temporaryFolder.getRoot().toPath();
65+
66+
final Path uploadDir = Files.createDirectories(tempDir.resolve("uploadDir"));
67+
// create a copy because the upload handler moves uploaded jars (because it assumes it to be
68+
// a temporary file)
69+
final Path jarFile =
70+
Files.copy(
71+
Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME),
72+
tempDir.resolve("app.jar"));
73+
74+
final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build();
75+
76+
final ThreadCapturingApplicationRunner threadCapturingApplicationRunner =
77+
new ThreadCapturingApplicationRunner();
78+
79+
final WebSubmissionExtension webSubmissionExtension =
80+
new WebSubmissionExtension(
81+
new Configuration(),
82+
() -> CompletableFuture.completedFuture(dispatcherGateway),
83+
Collections.emptyMap(),
84+
new CompletableFuture<>(),
85+
uploadDir,
86+
Executors.directExecutor(),
87+
Time.of(5, TimeUnit.SECONDS),
88+
() -> threadCapturingApplicationRunner);
89+
90+
final String jarPath = uploadJar(webSubmissionExtension, jarFile, dispatcherGateway);
91+
final String jarId = Paths.get(jarPath).getFileName().toString();
92+
93+
final JarRunHandler jarRunHandler = webSubmissionExtension.getJarRunHandler();
94+
95+
final Map<String, String> pathParameters = new HashMap<>();
96+
pathParameters.put(JarIdPathParameter.KEY, jarId);
97+
final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> runRequest =
98+
new HandlerRequest(
99+
new JarRunRequestBody(),
100+
new JarRunMessageParameters(),
101+
pathParameters,
102+
Collections.emptyMap());
103+
104+
// run several applications in sequence, and verify that each thread is unique
105+
int numApplications = 20;
106+
for (int i = 0; i < numApplications; i++) {
107+
jarRunHandler.handleRequest(runRequest, dispatcherGateway).get();
108+
}
109+
assertEquals(numApplications, threadCapturingApplicationRunner.getThreads().size());
110+
}
111+
112+
private static String uploadJar(
113+
WebSubmissionExtension extension, Path jarFile, DispatcherGateway dispatcherGateway)
114+
throws Exception {
115+
final JarUploadHandler jarUploadHandler = extension.getJarUploadHandler();
116+
117+
final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> uploadRequest =
118+
new HandlerRequest<>(
119+
EmptyRequestBody.getInstance(),
120+
EmptyMessageParameters.getInstance(),
121+
Collections.emptyMap(),
122+
Collections.emptyMap(),
123+
Collections.singletonList(jarFile.toFile()));
124+
125+
return jarUploadHandler.handleRequest(uploadRequest, dispatcherGateway).get().getFilename();
126+
}
127+
128+
private static class ThreadCapturingApplicationRunner implements ApplicationRunner {
129+
130+
private final Set<Thread> threads = Collections.newSetFromMap(new IdentityHashMap<>());
131+
132+
@Override
133+
public List<JobID> run(
134+
DispatcherGateway dispatcherGateway,
135+
PackagedProgram program,
136+
Configuration configuration) {
137+
threads.add(Thread.currentThread());
138+
return Collections.singletonList(new JobID());
139+
}
140+
141+
public Collection<Thread> getThreads() {
142+
return threads;
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)