Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ public boolean accept(File dir, String name) {
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);

if(fs.getScheme().startsWith("file")) {
// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
Expand Down Expand Up @@ -627,8 +629,9 @@ public boolean accept(File dir, String name) {
+ "the full application log using this command:\n"
+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
+ "(It sometimes takes a few seconds until the logs are aggregated)");
System.exit(1);
}

System.exit(0);
}

private void printHelp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, Loc
localResource.setSize(jarStat.getLen());
localResource.setTimestamp(jarStat.getModificationTime());
localResource.setType(LocalResourceType.FILE);
localResource.setVisibility(LocalResourceVisibility.PUBLIC);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
}

public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ public void shutdownAM() throws Exception {

private void close() throws Exception {
if(!isClosed) {
jobManager.getInstanceManager().killTaskManagers();
jobManager.shutdown();
nmClient.close();
rmClient.close();
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*

Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ public boolean isAlive() {
return !isDead;
}

public void stopInstance() {
try {
this.getTaskManagerProxy().stop();
} catch (IOException e) {
if(Log.isDebugEnabled()) {
Log.debug("Error while stopping TaskManager", e);
}
}
}
public void markDead() {
if (isDead) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public InstanceManager(long heartbeatTimeout, long cleanupInterval) {
public long getHeartbeatTimeout() {
return heartbeatTimeout;
}

/**
* This method is only used by the Flink YARN client to self-destruct a Flink cluster
* by stopping the JVMs of the TaskManagers.
*/
public void killTaskManagers() {
for (Instance i : this.registeredHostsById.values()) {
i.stopInstance();
}
}

public void shutdown() {
synchronized (this.lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public interface TaskOperationProtocol extends VersionedProtocol {

TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;


void stop() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1299,4 +1299,10 @@ public int getTimeout() {
return timeout;
}
}

@Override
public void stop() throws IOException {
LOG.info("Killing TaskManager");
System.exit(0);
}
}