diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java index 92c69b62f9976..22172ed315102 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java @@ -343,7 +343,9 @@ public boolean accept(File dir, String name) { // Create a local resource to point to the destination jar path final FileSystem fs = FileSystem.get(conf); - if(fs.getScheme().startsWith("file")) { + // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method. + if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && + fs.getScheme().startsWith("file")) { LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values." + "The Flink YARN client needs to store its files in a distributed file system"); @@ -627,8 +629,9 @@ public boolean accept(File dir, String name) { + "the full application log using this command:\n" + "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n" + "(It sometimes takes a few seconds until the logs are aggregated)"); + System.exit(1); } - + System.exit(0); } private void printHelp() { diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 42a45922788f8..6b541bf9d923f 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -212,7 +212,7 @@ public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, Loc localResource.setSize(jarStat.getLen()); localResource.setTimestamp(jarStat.getModificationTime()); localResource.setType(LocalResourceType.FILE); - localResource.setVisibility(LocalResourceVisibility.PUBLIC); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); } public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException { diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java index 326491881fab0..a28538148632e 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java @@ -521,6 +521,7 @@ public void shutdownAM() throws Exception { private void close() throws Exception { if(!isClosed) { + jobManager.getInstanceManager().killTaskManagers(); jobManager.shutdown(); nmClient.close(); rmClient.close(); diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh index 9ff4ed3bc13c9..21da505dc662b 100644 --- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh +++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4 export FLINK_CONF_DIR -$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $* +$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 0cafcec01d8d3..439d3efc1b2fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -130,6 +130,15 @@ public boolean isAlive() { return !isDead; } + public void stopInstance() { + try { + this.getTaskManagerProxy().stop(); + } catch (IOException e) { + if(Log.isDebugEnabled()) { + Log.debug("Error while stopping TaskManager", e); + } + } + } public void markDead() { if (isDead) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index ced1afe304611..f779048bd1cdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -102,6 +102,16 @@ public InstanceManager(long heartbeatTimeout, long cleanupInterval) { public long getHeartbeatTimeout() { return heartbeatTimeout; } + + /** + * This method is only used by the Flink YARN client to self-destruct a Flink cluster + * by stopping the JVMs of the TaskManagers. + */ + public void killTaskManagers() { + for (Instance i : this.registeredHostsById.values()) { + i.stopInstance(); + } + } public void shutdown() { synchronized (this.lock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java index 7e39047924c96..e2bbfbad56264 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java @@ -36,5 +36,5 @@ public interface TaskOperationProtocol extends VersionedProtocol { TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException; - + void stop() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java index 10cbb8aa07f89..03bc8c27c6db1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java @@ -1299,4 +1299,10 @@ public int getTimeout() { return timeout; } } + + @Override + public void stop() throws IOException { + LOG.info("Killing TaskManager"); + System.exit(0); + } }