Skip to content

Commit bb29f5e

Browse files
XCompzentol
authored andcommitted
[FLINK-21445][clients] Adds configuration to ClassPathPackagedProgramRetriever
1 parent 4ef1427 commit bb29f5e

File tree

5 files changed

+91
-23
lines changed

5 files changed

+91
-23
lines changed

flink-clients/src/main/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.client.program.PackagedProgramRetriever;
2525
import org.apache.flink.client.program.PackagedProgramUtils;
2626
import org.apache.flink.client.program.ProgramInvocationException;
27+
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.util.ExceptionUtils;
2829
import org.apache.flink.util.FileUtils;
2930
import org.apache.flink.util.FlinkException;
@@ -76,19 +77,23 @@ public class ClassPathPackagedProgramRetriever implements PackagedProgramRetriev
7677

7778
@Nullable private final File jarFile;
7879

80+
@Nonnull private final Configuration configuration;
81+
7982
private ClassPathPackagedProgramRetriever(
8083
@Nonnull String[] programArguments,
8184
@Nullable String jobClassName,
8285
@Nonnull Supplier<Iterable<File>> jarsOnClassPath,
8386
@Nullable File userLibDirectory,
84-
@Nullable File jarFile)
87+
@Nullable File jarFile,
88+
@Nonnull Configuration configuration)
8589
throws IOException {
8690
this.userLibDirectory = userLibDirectory;
8791
this.programArguments = requireNonNull(programArguments, "programArguments");
8892
this.jobClassName = jobClassName;
8993
this.jarsOnClassPath = requireNonNull(jarsOnClassPath);
9094
this.userClassPaths = discoverUserClassPaths(userLibDirectory);
9195
this.jarFile = jarFile;
96+
this.configuration = configuration;
9297
}
9398

9499
private Collection<URL> discoverUserClassPaths(@Nullable File jobDir) throws IOException {
@@ -129,6 +134,7 @@ public PackagedProgram getPackagedProgram() throws FlinkException {
129134
.setArguments(programArguments)
130135
.setJarFile(jarFile)
131136
.setEntryPointClassName(jobClassName)
137+
.setConfiguration(configuration)
132138
.build();
133139
}
134140

@@ -137,6 +143,7 @@ public PackagedProgram getPackagedProgram() throws FlinkException {
137143
.setUserClassPaths(new ArrayList<>(userClassPaths))
138144
.setEntryPointClassName(entryClass)
139145
.setArguments(programArguments)
146+
.setConfiguration(configuration)
140147
.build();
141148
} catch (ProgramInvocationException e) {
142149
throw new FlinkException("Could not load the provided entrypoint class.", e);
@@ -252,8 +259,11 @@ public static class Builder {
252259

253260
private File jarFile;
254261

255-
private Builder(String[] programArguments) {
262+
private final Configuration configuration;
263+
264+
private Builder(String[] programArguments, Configuration configuration) {
256265
this.programArguments = requireNonNull(programArguments);
266+
this.configuration = requireNonNull(configuration);
257267
}
258268

259269
public Builder setJobClassName(@Nullable String jobClassName) {
@@ -278,11 +288,16 @@ public Builder setJarFile(File file) {
278288

279289
public ClassPathPackagedProgramRetriever build() throws IOException {
280290
return new ClassPathPackagedProgramRetriever(
281-
programArguments, jobClassName, jarsOnClassPath, userLibDirectory, jarFile);
291+
programArguments,
292+
jobClassName,
293+
jarsOnClassPath,
294+
userLibDirectory,
295+
jarFile,
296+
configuration);
282297
}
283298
}
284299

285-
public static Builder newBuilder(String[] programArguments) {
286-
return new Builder(programArguments);
300+
public static Builder newBuilder(String[] programArguments, Configuration configuration) {
301+
return new Builder(programArguments, configuration);
287302
}
288303
}

flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java

+58-10
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,17 @@
3131
import org.apache.flink.configuration.CoreOptions;
3232
import org.apache.flink.configuration.PipelineOptions;
3333
import org.apache.flink.configuration.PipelineOptionsInternal;
34+
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
3435
import org.apache.flink.runtime.jobgraph.JobGraph;
3536
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
37+
import org.apache.flink.util.ChildFirstClassLoader;
3638
import org.apache.flink.util.ExceptionUtils;
3739
import org.apache.flink.util.FileUtils;
3840
import org.apache.flink.util.FlinkException;
3941
import org.apache.flink.util.TestLogger;
4042
import org.apache.flink.util.function.FunctionUtils;
4143

44+
import org.hamcrest.core.IsInstanceOf;
4245
import org.junit.Assert;
4346
import org.junit.BeforeClass;
4447
import org.junit.ClassRule;
@@ -65,6 +68,7 @@
6568
import static org.hamcrest.Matchers.hasItem;
6669
import static org.hamcrest.Matchers.hasProperty;
6770
import static org.hamcrest.Matchers.is;
71+
import static org.hamcrest.Matchers.not;
6872
import static org.junit.Assert.assertEquals;
6973
import static org.junit.Assert.assertThat;
7074
import static org.junit.Assert.assertTrue;
@@ -142,7 +146,7 @@ public void testJobGraphRetrieval()
142146
configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
143147

144148
final ClassPathPackagedProgramRetriever retrieverUnderTest =
145-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
149+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
146150
.setJobClassName(TestJob.class.getCanonicalName())
147151
.build();
148152

@@ -161,7 +165,7 @@ public void testJobGraphRetrievalFromJar()
161165
throws IOException, FlinkException, ProgramInvocationException {
162166
final File testJar = TestJob.getTestJobJar();
163167
final ClassPathPackagedProgramRetriever retrieverUnderTest =
164-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
168+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
165169
.setJarsOnClassPath(() -> Collections.singleton(testJar))
166170
.build();
167171

@@ -176,7 +180,7 @@ public void testJobGraphRetrievalJobClassNameHasPrecedenceOverClassPath()
176180
final File testJar = new File("non-existing");
177181

178182
final ClassPathPackagedProgramRetriever retrieverUnderTest =
179-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
183+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
180184
// Both a class name is specified and a JAR "is" on the class path
181185
// The class name should have precedence.
182186
.setJobClassName(TestJob.class.getCanonicalName())
@@ -200,7 +204,7 @@ public void testSavepointRestoreSettings()
200204
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);
201205

202206
final ClassPathPackagedProgramRetriever retrieverUnderTest =
203-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
207+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
204208
.setJobClassName(TestJob.class.getCanonicalName())
205209
.build();
206210

@@ -249,7 +253,7 @@ public void testJobGraphRetrievalFailIfJobDirDoesNotHaveEntryClass()
249253
throws IOException, ProgramInvocationException {
250254
final File testJar = TestJob.getTestJobJar();
251255
final ClassPathPackagedProgramRetriever retrieverUnderTest =
252-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
256+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
253257
.setJarsOnClassPath(() -> Collections.singleton(testJar))
254258
.setUserLibDirectory(userDirHasNotEntryClass)
255259
.build();
@@ -268,7 +272,7 @@ public void testJobGraphRetrievalFailIfJobDirDoesNotHaveEntryClass()
268272
public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir()
269273
throws IOException, ProgramInvocationException {
270274
final ClassPathPackagedProgramRetriever retrieverUnderTest =
271-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
275+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
272276
.setJobClassName(TestJobInfo.JOB_CLASS)
273277
.setJarsOnClassPath(Collections::emptyList)
274278
.setUserLibDirectory(userDirHasNotEntryClass)
@@ -288,7 +292,7 @@ public void testJobGraphRetrievalFailIfDoesNotFindTheEntryClassInTheJobDir()
288292
public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
289293
throws IOException, FlinkException, ProgramInvocationException {
290294
final ClassPathPackagedProgramRetriever retrieverUnderTest =
291-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
295+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
292296
.setJarsOnClassPath(Collections::emptyList)
293297
.setUserLibDirectory(userDirHasEntryClass)
294298
.build();
@@ -303,7 +307,7 @@ public void testRetrieveCorrectUserClasspathsWithoutSpecifiedEntryClass()
303307
public void testRetrieveCorrectUserClasspathsWithSpecifiedEntryClass()
304308
throws IOException, FlinkException, ProgramInvocationException {
305309
final ClassPathPackagedProgramRetriever retrieverUnderTest =
306-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
310+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
307311
.setJobClassName(TestJobInfo.JOB_CLASS)
308312
.setJarsOnClassPath(Collections::emptyList)
309313
.setUserLibDirectory(userDirHasEntryClass)
@@ -320,7 +324,7 @@ public void testRetrieveFromJarFileWithoutUserLib()
320324
throws IOException, FlinkException, ProgramInvocationException {
321325
final File testJar = TestJob.getTestJobJar();
322326
final ClassPathPackagedProgramRetriever retrieverUnderTest =
323-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
327+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
324328
.setJarFile(testJar)
325329
.build();
326330
final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
@@ -336,7 +340,7 @@ public void testRetrieveFromJarFileWithUserLib()
336340
throws IOException, FlinkException, ProgramInvocationException {
337341
final File testJar = TestJob.getTestJobJar();
338342
final ClassPathPackagedProgramRetriever retrieverUnderTest =
339-
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS)
343+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, new Configuration())
340344
.setJarFile(testJar)
341345
.setUserLibDirectory(userDirHasEntryClass)
342346
.build();
@@ -350,6 +354,50 @@ public void testRetrieveFromJarFileWithUserLib()
350354
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
351355
}
352356

357+
@Test
358+
public void testChildFirstDefaultConfiguration() throws FlinkException, IOException {
359+
// this is a sanity check to backup testConfigurationIsConsidered
360+
final Configuration configuration = new Configuration();
361+
// CHECK_LEAKED_CLASSLOADER has to be disabled to enable the instanceof check later on in
362+
// this test. Otherwise, the actual instance would be hidden by a wrapper
363+
configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
364+
365+
final ClassPathPackagedProgramRetriever retriever =
366+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
367+
.setUserLibDirectory(userDirHasEntryClass)
368+
.setJobClassName(TestJobInfo.JOB_CLASS)
369+
.build();
370+
371+
assertThat(
372+
retriever.getPackagedProgram().getUserCodeClassLoader(),
373+
IsInstanceOf.instanceOf(ChildFirstClassLoader.class));
374+
}
375+
376+
@Test
377+
public void testConfigurationIsConsidered() throws FlinkException, IOException {
378+
final String parentFirstConfigValue = "parent-first";
379+
// we want to make sure that parent-first is not set as a default
380+
assertThat(
381+
CoreOptions.CLASSLOADER_RESOLVE_ORDER.defaultValue(),
382+
not(is(parentFirstConfigValue)));
383+
384+
final Configuration configuration = new Configuration();
385+
configuration.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, parentFirstConfigValue);
386+
// CHECK_LEAKED_CLASSLOADER has to be disabled to enable the instanceof check later on in
387+
// this test. Otherwise, the actual instance would be hidden by a wrapper
388+
configuration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
389+
390+
final ClassPathPackagedProgramRetriever retriever =
391+
ClassPathPackagedProgramRetriever.newBuilder(PROGRAM_ARGUMENTS, configuration)
392+
.setUserLibDirectory(userDirHasEntryClass)
393+
.setJobClassName(TestJobInfo.JOB_CLASS)
394+
.build();
395+
396+
assertThat(
397+
retriever.getPackagedProgram().getUserCodeClassLoader(),
398+
IsInstanceOf.instanceOf(FlinkUserCodeClassLoaders.ParentFirstClassLoader.class));
399+
}
400+
353401
private JobGraph retrieveJobGraph(
354402
ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration)
355403
throws FlinkException, ProgramInvocationException, MalformedURLException {

flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ public static void main(String[] args) {
6363
new StandaloneApplicationClusterConfigurationParserFactory(),
6464
StandaloneApplicationClusterEntryPoint.class);
6565

66+
Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
6667
PackagedProgram program = null;
6768
try {
68-
program = getPackagedProgram(clusterConfiguration);
69+
program = getPackagedProgram(clusterConfiguration, configuration);
6970
} catch (Exception e) {
7071
LOG.error("Could not create application program.", e);
7172
System.exit(1);
7273
}
7374

74-
Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
7575
try {
7676
configureExecution(configuration, program);
7777
} catch (Exception e) {
@@ -96,20 +96,25 @@ static Configuration loadConfigurationFromClusterConfig(
9696
}
9797

9898
private static PackagedProgram getPackagedProgram(
99-
final StandaloneApplicationClusterConfiguration clusterConfiguration)
99+
final StandaloneApplicationClusterConfiguration clusterConfiguration,
100+
Configuration configuration)
100101
throws IOException, FlinkException {
101102
final PackagedProgramRetriever programRetriever =
102103
getPackagedProgramRetriever(
103-
clusterConfiguration.getArgs(), clusterConfiguration.getJobClassName());
104+
clusterConfiguration.getArgs(),
105+
clusterConfiguration.getJobClassName(),
106+
configuration);
104107
return programRetriever.getPackagedProgram();
105108
}
106109

107110
private static PackagedProgramRetriever getPackagedProgramRetriever(
108-
final String[] programArguments, @Nullable final String jobClassName)
111+
final String[] programArguments,
112+
@Nullable final String jobClassName,
113+
Configuration configuration)
109114
throws IOException {
110115
final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
111116
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
112-
ClassPathPackagedProgramRetriever.newBuilder(programArguments)
117+
ClassPathPackagedProgramRetriever.newBuilder(programArguments, configuration)
113118
.setUserLibDirectory(userLibDir)
114119
.setJobClassName(jobClassName);
115120
return retrieverBuilder.build();

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
109109

110110
final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
111111
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
112-
ClassPathPackagedProgramRetriever.newBuilder(programArguments)
112+
ClassPathPackagedProgramRetriever.newBuilder(programArguments, configuration)
113113
.setUserLibDirectory(userLibDir)
114114
.setJobClassName(jobClassName);
115115

flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnApplicationClusterEntryPoint.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private static PackagedProgramRetriever getPackagedProgramRetriever(
134134
final File userLibDir = YarnEntrypointUtils.getUsrLibDir(configuration).orElse(null);
135135
final File userApplicationJar = getUserApplicationJar(userLibDir, configuration);
136136
final ClassPathPackagedProgramRetriever.Builder retrieverBuilder =
137-
ClassPathPackagedProgramRetriever.newBuilder(programArguments)
137+
ClassPathPackagedProgramRetriever.newBuilder(programArguments, configuration)
138138
.setUserLibDirectory(userLibDir)
139139
.setJarFile(userApplicationJar)
140140
.setJobClassName(jobClassName);

0 commit comments

Comments
 (0)