diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 426521b921c..026bfda1379 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -546,13 +547,22 @@ private void addContainerLocalResources(Path destDir, Map FileStatus[] statuses = this.fs.listStatus(destDir); if (statuses != null) { + Set libJarNames = new HashSet<>(Arrays.asList(this.config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).split(","))); + String containerJars = this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY) ? + this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY) : ""; for (FileStatus status : statuses) { + String fileName = status.getPath().getName(); + // Ensure that we are only adding jars that were uploaded by the YarnAppLauncher for this application + if (fileName.contains(".jar") && !(libJarNames.contains(fileName) || containerJars.contains(fileName))) { + continue; + } YarnHelixUtils.addFileAsLocalResource(this.fs, status.getPath(), LocalResourceType.FILE, resourceMap); } } } + protected ByteBuffer getSecurityTokens() throws IOException { Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); Closer closer = Closer.create(); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 4ce40f21dfb..09fae2b18b7 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -243,6 +243,8 @@ public class GobblinYarnAppLauncher { private final boolean jarCacheEnabled; + private final Set libJarNames = new HashSet<>(); // List of jars that are shared between appMaster and containers + public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException { this.config = config.withValue(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis())); @@ -308,7 +310,6 @@ public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_DETACH_ON_EXIT); this.appLauncherMode = ConfigUtils.getString(this.config, GOBBLIN_YARN_APP_LAUNCHER_MODE, DEFAULT_GOBBLIN_YARN_APP_LAUNCHER_MODE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); - try { config = addDynamicConfig(config); outputConfigToFile(config); @@ -668,6 +669,7 @@ private Map addAppMasterLocalResources(ApplicationId appl Path unsharedJarsDestDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME); addLibJars(new Path(this.config.getString(GobblinYarnConfigurationKeys.LIB_JARS_DIR_KEY)), Optional.of(appMasterResources), libJarsDestDir, unsharedJarsDestDir, localFs); + this.libJarNames.addAll(appMasterResources.keySet()); LOGGER.info("Added lib jars to directory: {} and execution-private directory: {}", libJarsDestDir, unsharedJarsDestDir); } if (this.config.hasPath(GobblinYarnConfigurationKeys.APP_MASTER_JARS_KEY)) { @@ -814,6 +816,7 @@ protected String buildApplicationMasterCommand(String applicationId, int memoryM .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR) .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(logFileName).append(".").append(ApplicationConstants.STDOUT) .append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY).append("=").append(config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_LAUNCHER_START_TIME_KEY)) + .append(" -D").append(GobblinYarnConfigurationKeys.YARN_APPLICATION_LIB_JAR_LIST).append("=").append(String.join(",", this.libJarNames)) .append(" ").append(JvmUtils.formatJvmArguments(this.appMasterJvmArgs)) .append(" ").append(appMasterClass.getName()) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java index 93cc23dba6b..5717b21fc29 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java @@ -55,6 +55,9 @@ public class GobblinYarnConfigurationKeys { public static final String JAR_CACHE_DIR = GOBBLIN_YARN_PREFIX + "jar.cache.dir"; + public static final String YARN_APPLICATION_LIB_JAR_LIST = GOBBLIN_YARN_PREFIX + "lib.jar.list"; + + // Used to store the start time of the app launcher to propagate to workers and appmaster public static final String YARN_APPLICATION_LAUNCHER_START_TIME_KEY = GOBBLIN_YARN_PREFIX + "application.start.time";