From c80e340e99e235780363d3d63be2f87cfabfdcd1 Mon Sep 17 00:00:00 2001 From: Deb Date: Sun, 14 Aug 2022 18:37:33 +0530 Subject: [PATCH 1/5] Numa support in DCE --- .../nodemanager/DefaultContainerExecutor.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 7d18016d95ade..cb6e30d756467 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -20,7 +20,9 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.classification.VisibleForTesting; import java.io.DataOutputStream; import java.io.File; @@ -55,6 +57,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; @@ -135,9 +140,10 @@ protected void setScriptExecutable(Path script, String owner) lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } + private NumaResourceAllocator numaResourceAllocator; @Override public void init(Context nmContext) throws IOException { - // nothing to do or verify here + numaResourceAllocator = new NumaResourceAllocator(nmContext); } @Override @@ -209,10 +215,11 @@ protected ContainerLocalizer createContainerLocalizer(String user, return localizer; } + private volatile Container container; @Override public int launchContainer(ContainerStartContext ctx) throws IOException, ConfigurationException { - Container container = ctx.getContainer(); + container = ctx.getContainer(); Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath(); Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath(); Path nmPrivateKeystorePath = ctx.getNmPrivateKeystorePath(); @@ -382,6 +389,9 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); + if(numaAwarenessEnabled(this.getConf())) { + addNumaCommands(command, container); + } LOG.info("launchContainer: {}", Arrays.toString(command)); return new ShellCommandExecutor( command, @@ -391,6 +401,26 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, false); } + + private void addNumaCommands(String[] commands, Container container){ + try { + NumaResourceAllocation numaAllocation = numaResourceAllocator.allocateNumaNodes(container); + if(numaAllocation != null){ + String[] numaCommand = new String[3]; + numaCommand[0] = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);; + numaCommand[1] = "--interleave=" + String.join(",", numaAllocation.getMemNodes()); + numaCommand[2] = "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()); + ArrayUtils.add(commands, numaCommand); + } + } catch (ResourceHandlerException e){ + LOG.warn("Exception assigning numa :- ", e); + } + + + + } + /** * Create a {@link LocalWrapperScriptBuilder} for the given container ID * and path that is appropriate to the current platform. From 84af21f462802ee7971fe21841b10e73be98459c Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Sun, 14 Aug 2022 18:38:56 +0530 Subject: [PATCH 2/5] Revert "Numa support in DCE" This reverts commit 571a0657ccfbae3ef9dc0bd1dddc032008b45f91. --- .../nodemanager/DefaultContainerExecutor.java | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index cb6e30d756467..7d18016d95ade 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -20,9 +20,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled; -import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.classification.VisibleForTesting; import java.io.DataOutputStream; import java.io.File; @@ -57,9 +55,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; @@ -140,10 +135,9 @@ protected void setScriptExecutable(Path script, String owner) lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } - private NumaResourceAllocator numaResourceAllocator; @Override public void init(Context nmContext) throws IOException { - numaResourceAllocator = new NumaResourceAllocator(nmContext); + // nothing to do or verify here } @Override @@ -215,11 +209,10 @@ protected ContainerLocalizer createContainerLocalizer(String user, return localizer; } - private volatile Container container; @Override public int launchContainer(ContainerStartContext ctx) throws IOException, ConfigurationException { - container = ctx.getContainer(); + Container container = ctx.getContainer(); Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath(); Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath(); Path nmPrivateKeystorePath = ctx.getNmPrivateKeystorePath(); @@ -389,9 +382,6 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); - if(numaAwarenessEnabled(this.getConf())) { - addNumaCommands(command, container); - } LOG.info("launchContainer: {}", Arrays.toString(command)); return new ShellCommandExecutor( command, @@ -401,26 +391,6 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, false); } - - private void addNumaCommands(String[] commands, Container container){ - try { - NumaResourceAllocation numaAllocation = numaResourceAllocator.allocateNumaNodes(container); - if(numaAllocation != null){ - String[] numaCommand = new String[3]; - numaCommand[0] = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, - YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);; - numaCommand[1] = "--interleave=" + String.join(",", numaAllocation.getMemNodes()); - numaCommand[2] = "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes()); - ArrayUtils.add(commands, numaCommand); - } - } catch (ResourceHandlerException e){ - LOG.warn("Exception assigning numa :- ", e); - } - - - - } - /** * Create a {@link LocalWrapperScriptBuilder} for the given container ID * and path that is appropriate to the current platform. From 6172d5fedad395f2c2465e9c073d7082c7706720 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Sun, 14 Aug 2022 18:42:19 +0530 Subject: [PATCH 3/5] Numa support in DCE Fix checkstyle error Remove container reference Adding test for the numa changes for DCE Remove unused variable creation releasing numa resources when container closes and reassign during resassign time Addressing to comments Fixing failed test case Fixing failed test case Fix extra spaces Add test for reaquiredContainer Fix test error correcting the test cases properly correcting the test cases properly correcting the test cases properly correcting the test cases properly correcting the test cases properly Putting extra logs --- .../nodemanager/DefaultContainerExecutor.java | 148 +++++++++++- .../WindowsSecureContainerExecutor.java | 4 +- .../resources/numa/NumaResourceAllocator.java | 2 +- .../TestDefaultContainerExecutor.java | 214 +++++++++++++++++- .../numa/TestNumaResourceAllocator.java | 2 +- 5 files changed, 355 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 7d18016d95ade..6d18654ef5f56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled; import org.apache.hadoop.classification.VisibleForTesting; import java.io.DataOutputStream; @@ -28,12 +29,13 @@ import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; + import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileContext; @@ -51,14 +53,19 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; @@ -86,6 +93,8 @@ public class DefaultContainerExecutor extends ContainerExecutor { private String logDirPermissions = null; + private NumaResourceAllocator numaResourceAllocator; + /** * Default constructor for use in testing. */ @@ -137,7 +146,17 @@ protected void setScriptExecutable(Path script, String owner) @Override public void init(Context nmContext) throws IOException { - // nothing to do or verify here + if(numaAwarenessEnabled(getConf())) { + numaResourceAllocator = new NumaResourceAllocator(nmContext); + try { + numaResourceAllocator.init(this.getConf()); + LOG.info("NUMA resources allocation is enabled in DefaultContainer Executor," + + " Successfully initialized NUMA resources allocator."); + } catch (YarnException e) { + LOG.warn("Improper NUMA configuration provided.", e); + throw new IOException("Failed to initialize configured numa subsystem!"); + } + } } @Override @@ -300,11 +319,28 @@ public int launchContainer(ContainerStartContext ctx) setScriptExecutable(launchDst, user); setScriptExecutable(sb.getWrapperScriptPath(), user); + // adding numa commands based on configuration + String[] numaCommands = new String[]{}; + + if (numaResourceAllocator != null) { + try { + NumaResourceAllocation numaResourceAllocation = + numaResourceAllocator.allocateNumaNodes(container); + if (numaResourceAllocation != null) { + numaCommands = getNumaCommands(numaResourceAllocation); + } + } catch (ResourceHandlerException e) { + LOG.error("NumaResource Allocation failed!", e); + throw new IOException("NumaResource Allocation Error!", e); + } + } + shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), - containerIdStr, user, pidFile, container.getResource(), - new File(containerWorkDir.toUri().getPath()), - container.getLaunchContext().getEnvironment()); - + containerIdStr, user, pidFile, container.getResource(), + new File(containerWorkDir.toUri().getPath()), + container.getLaunchContext().getEnvironment(), + numaCommands); + if (isContainerActive(containerId)) { shExec.execute(); } else { @@ -350,6 +386,7 @@ public int launchContainer(ContainerStartContext ctx) return exitCode; } finally { if (shExec != null) shExec.close(); + postComplete(containerId); } return 0; } @@ -372,16 +409,19 @@ public int relaunchContainer(ContainerStartContext ctx) * as the current working directory for the command. If null, * the current working directory is not modified. * @param environment the container environment + * @param numaCommands list of prefix numa commands * @return the new {@link ShellCommandExecutor} * @see ShellCommandExecutor */ - protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, - String containerIdStr, String user, Path pidFile, Resource resource, - File workDir, Map environment) { - + protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, + String containerIdStr, String user, Path pidFile, Resource resource, + File workDir, Map environment, String[] numaCommands) { + String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); + command = concatStringCommands(command, numaCommands); + LOG.info("launchContainer: {}", Arrays.toString(command)); return new ShellCommandExecutor( command, @@ -1040,4 +1080,92 @@ public void updateYarnSysFS(Context ctx, String user, String appId, String spec) throws IOException { throw new ServiceStateException("Implementation unavailable"); } + + @Override + public int reacquireContainer(ContainerReacquisitionContext ctx) + throws IOException, InterruptedException { + try { + if (numaResourceAllocator != null) { + numaResourceAllocator.recoverNumaResource(ctx.getContainerId()); + } + return super.reacquireContainer(ctx); + } finally { + postComplete(ctx.getContainerId()); + } + } + + /** + * clean up and release of resources. + * + * @param containerId containerId of running container + */ + public void postComplete(final ContainerId containerId) { + if (numaResourceAllocator != null) { + try { + numaResourceAllocator.releaseNumaResource(containerId); + } catch (ResourceHandlerException e) { + LOG.warn("NumaResource release failed for " + + "containerId: {}. Exception: ", containerId, e); + } + } + } + + /** + * @param resourceAllocation NonNull NumaResourceAllocation object reference + * @return Array of numa specific commands + */ + String[] getNumaCommands(NumaResourceAllocation resourceAllocation) { + String[] numaCommand = new String[3]; + numaCommand[0] = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD); + numaCommand[1] = "--interleave=" + String.join(",", resourceAllocation.getMemNodes()); + numaCommand[2] = "--cpunodebind=" + String.join(",", resourceAllocation.getCpuNodes()); + return numaCommand; + + } + + /** + * @param firstStringArray Array of String + * @param secondStringArray Array of String + * @return combined array of string where first elements are from firstStringArray + * and later are the elements from secondStringArray + */ + String[] concatStringCommands(String[] firstStringArray, String[] secondStringArray) { + + if(firstStringArray == null && secondStringArray == null) { + return null; + } + + int len = 0; + + if(firstStringArray != null){ + len = len + firstStringArray.length; + } + + if(secondStringArray != null){ + len = len + secondStringArray.length; + } + + if (len == 0) { + return new String[]{}; + } + + String[] ret = new String[len]; + int idx = 0; + for (int i=0; firstStringArray !=null && i < firstStringArray.length; i++) { + ret[idx] = firstStringArray[i]; + idx++; + } + for (int i=0; secondStringArray !=null && i < secondStringArray.length; i++) { + ret[idx] = secondStringArray[i]; + idx++; + } + return ret; + } + + @VisibleForTesting + public void setNumaResourceAllocator(NumaResourceAllocator numaResourceAllocator) { + this.numaResourceAllocator = numaResourceAllocator; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java index 6132e579ef982..9d57f8fff4fe5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -718,10 +718,10 @@ public void startLocalizer(LocalizerStartContext ctx) throws IOException, @Override protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, String userName, Path pidFile, Resource resource, - File wordDir, Map environment) { + File wordDir, Map environment, String[] numaCommands) { return new WintuilsProcessStubExecutor( wordDir.toString(), - containerIdStr, userName, pidFile.toString(), + containerIdStr, userName, pidFile.toString(), "cmd /c " + wrapperScriptPath); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java index 48d5bfe95a2d6..2deaf16880a0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceAllocator.java @@ -143,7 +143,7 @@ public void init(Configuration conf) throws YarnException { } @VisibleForTesting - String executeNGetCmdOutput(Configuration conf) throws YarnException { + public String executeNGetCmdOutput(Configuration conf) throws YarnException { String numaCtlCmd = conf.get( YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index a5c115283a021..3e5571964351e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -20,8 +20,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.isA; @@ -39,10 +39,12 @@ import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -60,23 +62,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.MockLocalizerHeartbeatResponse; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -86,6 +97,14 @@ public class TestDefaultContainerExecutor { private static Path BASE_TMP_PATH = new Path("target", TestDefaultContainerExecutor.class.getSimpleName()); + private YarnConfiguration yarnConfiguration; + + private DefaultContainerExecutor containerExecutor; + + private Container mockContainer; + + private NumaResourceAllocator numaResourceAllocator; + @AfterClass public static void deleteTmpFiles() throws IOException { FileContext lfs = FileContext.getLocalFSFileContext(); @@ -736,4 +755,197 @@ public void testPickDirectory() throws Exception { // new FsPermission(ApplicationLocalizer.LOGDIR_PERM), true); // } + @Before + public void setUp() throws IOException, YarnException { + yarnConfiguration = new YarnConfiguration(); + setNumaConfig(); + Context mockContext = createAndGetMockContext(); + NMStateStoreService nmStateStoreService = + mock(NMStateStoreService.class); + when(mockContext.getNMStateStore()).thenReturn(nmStateStoreService); + numaResourceAllocator = new NumaResourceAllocator(mockContext) { + @Override + public String executeNGetCmdOutput(Configuration config) + throws YarnRuntimeException { + return getNumaCmdOutput(); + } + }; + + numaResourceAllocator.init(yarnConfiguration); + FileContext lfs = FileContext.getLocalFSFileContext(); + containerExecutor = new DefaultContainerExecutor(lfs) { + @Override + public Configuration getConf() { + return yarnConfiguration; + } + }; + containerExecutor.setNumaResourceAllocator(numaResourceAllocator); + mockContainer = mock(Container.class); + } + + private void setNumaConfig() { + yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_ENABLED, "true"); + yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, "true"); + yarnConfiguration.set(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, "/usr/bin/numactl"); + } + + + private String getNumaCmdOutput() { + // architecture of 8 cpu cores + // randomly picked size of memory + return "available: 2 nodes (0-1)\n\t" + + "node 0 cpus: 0 2 4 6\n\t" + + "node 0 size: 73717 MB\n\t" + + "node 0 free: 73717 MB\n\t" + + "node 1 cpus: 1 3 5 7\n\t" + + "node 1 size: 73717 MB\n\t" + + "node 1 free: 73717 MB\n\t" + + "node distances:\n\t" + + "node 0 1\n\t" + + "0: 10 20\n\t" + + "1: 20 10"; + } + + private Context createAndGetMockContext() { + Context mockContext = mock(Context.class); + @SuppressWarnings("unchecked") + ConcurrentHashMap mockContainers = mock( + ConcurrentHashMap.class); + mockContainer = mock(Container.class); + when(mockContainer.getResourceMappings()) + .thenReturn(new ResourceMappings()); + when(mockContainers.get(any())).thenReturn(mockContainer); + when(mockContext.getContainers()).thenReturn(mockContainers); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2)); + return mockContext; + } + + private void testAllocateNumaResource(String containerId, Resource resource, + String memNodes, String cpuNodes) throws Exception { + when(mockContainer.getContainerId()) + .thenReturn(ContainerId.fromString(containerId)); + when(mockContainer.getResource()).thenReturn(resource); + NumaResourceAllocation numaResourceAllocation = + numaResourceAllocator.allocateNumaNodes(mockContainer); + String[] commands = containerExecutor.getNumaCommands(numaResourceAllocation); + assertEquals(Arrays.asList(commands), Arrays.asList("/usr/bin/numactl", + "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes)); + } + + @Test + public void testAllocateNumaMemoryResource() throws Exception { + // keeping cores constant for testing memory resources + + // allocates node 0 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000001", + Resource.newInstance(2048, 2), "0", "0"); + + // allocates node 1 for memory and cpu since allocator uses round robin assignment + testAllocateNumaResource("container_1481156246874_0001_01_000002", + Resource.newInstance(60000, 2), "1", "1"); + + // allocates node 0,1 for memory since there is no sufficient memory in any one node + testAllocateNumaResource("container_1481156246874_0001_01_000003", + Resource.newInstance(80000, 2), "0,1", "0"); + + // returns null since there are no sufficient resources available for the request + when(mockContainer.getContainerId()).thenReturn( + ContainerId.fromString("container_1481156246874_0001_01_000004")); + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(80000, 2)); + Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); + + // allocates node 1 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000005", + Resource.newInstance(1024, 2), "1", "1"); + } + + @Test + public void testAllocateNumaCpusResource() throws Exception { + // keeping memory constant + + // allocates node 0 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000001", + Resource.newInstance(2048, 2), "0", "0"); + + // allocates node 1 for memory and cpu since allocator uses round robin assignment + testAllocateNumaResource("container_1481156246874_0001_01_000002", + Resource.newInstance(2048, 2), "1", "1"); + + // allocates node 0,1 for cpus since there is are no sufficient cpus available in any one node + testAllocateNumaResource("container_1481156246874_0001_01_000003", + Resource.newInstance(2048, 3), "0", "0,1"); + + // returns null since there are no sufficient resources available for the request + when(mockContainer.getContainerId()).thenReturn( + ContainerId.fromString("container_1481156246874_0001_01_000004")); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2)); + Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); + + // allocates node 1 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000005", + Resource.newInstance(2048, 1), "1", "1"); + } + + @Test + public void testReacquireContainer() throws Exception { + @SuppressWarnings("unchecked") + ConcurrentHashMap mockContainers = mock( + ConcurrentHashMap.class); + Context mockContext = mock(Context.class); + NMStateStoreService mock = mock(NMStateStoreService.class); + when(mockContext.getNMStateStore()).thenReturn(mock); + ResourceMappings resourceMappings = new ResourceMappings(); + AssignedResources assignedRscs = new AssignedResources(); + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(142900, 2)); + ContainerId cid = ContainerId.fromString("container_1481156246874_0001_01_000001"); + when(mockContainer.getContainerId()).thenReturn(cid); + NumaResourceAllocation numaResourceAllocation = + numaResourceAllocator.allocateNumaNodes(mockContainer); + assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation)); + resourceMappings.addAssignedResources("numa", assignedRscs); + when(mockContainer.getResourceMappings()).thenReturn(resourceMappings); + when(mockContainers.get(any())).thenReturn(mockContainer); + when(mockContext.getContainers()).thenReturn(mockContainers); + + // recovered numa resources should be added to the used resources and + // remaining will be available for further allocation. + + ContainerReacquisitionContext containerReacquisitionContext = + new ContainerReacquisitionContext.Builder() + .setContainerId(cid) + .setUser("user") + .setContainer(mockContainer) + .build(); + + containerExecutor.reacquireContainer(containerReacquisitionContext); + + // returns null since there are no sufficient resources available for the request + when(mockContainer.getContainerId()).thenReturn( + ContainerId.fromString("container_1481156246874_0001_01_000004")); + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(156250, 2)); + Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); + } + + @Test + public void testConcatStringCommands() { + // test one array of string as null + assertEquals(containerExecutor.concatStringCommands(null, new String[]{"hello"})[0], + new String[]{"hello"}[0]); + // test both array of string as null + Assert.assertNull(containerExecutor.concatStringCommands(null, null)); + // test case when both arrays are not null and of equal length + String[] res = containerExecutor.concatStringCommands(new String[]{"one"}, + new String[]{"two"}); + assertEquals(res[0]+res[1], "one" + "two"); + // test both array of different length + res = containerExecutor.concatStringCommands(new String[]{"one"}, + new String[]{"two", "three"}); + assertEquals(res[0] + res[1] + res[2], "one" + "two" + "three"); + + } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java index c8072c327a9b6..42c495c88f8f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/TestNumaResourceAllocator.java @@ -95,7 +95,7 @@ public void testReadNumaTopologyFromCmdOutput() throws Exception { + "1: 20 10"; numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) { @Override - String executeNGetCmdOutput(Configuration config) + public String executeNGetCmdOutput(Configuration config) throws YarnRuntimeException { return cmdOutput; } From 0d5151e57bf44baae0257f1b710d06cd4eaf66a2 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Thu, 25 Aug 2022 18:05:10 +0530 Subject: [PATCH 4/5] address to comments --- .../nodemanager/DefaultContainerExecutor.java | 41 +++++++++++-------- .../TestDefaultContainerExecutor.java | 2 + 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 6d18654ef5f56..705ef88dee357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -95,6 +95,8 @@ public class DefaultContainerExecutor extends ContainerExecutor { private NumaResourceAllocator numaResourceAllocator; + + private String numactl; /** * Default constructor for use in testing. */ @@ -148,6 +150,8 @@ protected void setScriptExecutable(Path script, String owner) public void init(Context nmContext) throws IOException { if(numaAwarenessEnabled(getConf())) { numaResourceAllocator = new NumaResourceAllocator(nmContext); + numactl = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD); try { numaResourceAllocator.init(this.getConf()); LOG.info("NUMA resources allocation is enabled in DefaultContainer Executor," + @@ -420,7 +424,10 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); - command = concatStringCommands(command, numaCommands); + // check if numa commands are passed and append it as prefix commands + if(numaCommands != null && numaCommands.length!=0) { + command = concatStringCommands(numaCommands, command); + } LOG.info("launchContainer: {}", Arrays.toString(command)); return new ShellCommandExecutor( @@ -1116,8 +1123,7 @@ public void postComplete(final ContainerId containerId) { */ String[] getNumaCommands(NumaResourceAllocation resourceAllocation) { String[] numaCommand = new String[3]; - numaCommand[0] = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, - YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD); + numaCommand[0] = numactl; numaCommand[1] = "--interleave=" + String.join(",", resourceAllocation.getMemNodes()); numaCommand[2] = "--cpunodebind=" + String.join(",", resourceAllocation.getCpuNodes()); return numaCommand; @@ -1133,31 +1139,27 @@ String[] getNumaCommands(NumaResourceAllocation resourceAllocation) { String[] concatStringCommands(String[] firstStringArray, String[] secondStringArray) { if(firstStringArray == null && secondStringArray == null) { - return null; + return secondStringArray; } - int len = 0; - - if(firstStringArray != null){ - len = len + firstStringArray.length; + else if(firstStringArray == null || firstStringArray.length == 0) { + return secondStringArray; } - if(secondStringArray != null){ - len = len + secondStringArray.length; + else if(secondStringArray == null || secondStringArray.length == 0){ + return firstStringArray; } - if (len == 0) { - return new String[]{}; - } + int len = firstStringArray.length + secondStringArray.length; String[] ret = new String[len]; int idx = 0; - for (int i=0; firstStringArray !=null && i < firstStringArray.length; i++) { - ret[idx] = firstStringArray[i]; + for (String s : firstStringArray) { + ret[idx] = s; idx++; } - for (int i=0; secondStringArray !=null && i < secondStringArray.length; i++) { - ret[idx] = secondStringArray[i]; + for (String s : secondStringArray) { + ret[idx] = s; idx++; } return ret; @@ -1168,4 +1170,9 @@ public void setNumaResourceAllocator(NumaResourceAllocator numaResourceAllocator this.numaResourceAllocator = numaResourceAllocator; } + @VisibleForTesting + public void setNumactl(String numactl) { + this.numactl = numactl; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 3e5571964351e..7b05d33b61ac1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -827,6 +827,8 @@ private void testAllocateNumaResource(String containerId, Resource resource, when(mockContainer.getResource()).thenReturn(resource); NumaResourceAllocation numaResourceAllocation = numaResourceAllocator.allocateNumaNodes(mockContainer); + containerExecutor.setNumactl(containerExecutor.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD)); String[] commands = containerExecutor.getNumaCommands(numaResourceAllocation); assertEquals(Arrays.asList(commands), Arrays.asList("/usr/bin/numactl", "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes)); From 125c50e4c70ee5ac0470a8df2d1fc38df40ade48 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Thu, 25 Aug 2022 18:23:33 +0530 Subject: [PATCH 5/5] Fix test case for reaquireContainer --- .../nodemanager/TestDefaultContainerExecutor.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 7b05d33b61ac1..473292f35f749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -900,7 +900,7 @@ public void testReacquireContainer() throws Exception { ResourceMappings resourceMappings = new ResourceMappings(); AssignedResources assignedRscs = new AssignedResources(); when(mockContainer.getResource()) - .thenReturn(Resource.newInstance(142900, 2)); + .thenReturn(Resource.newInstance(147434, 2)); ContainerId cid = ContainerId.fromString("container_1481156246874_0001_01_000001"); when(mockContainer.getContainerId()).thenReturn(cid); NumaResourceAllocation numaResourceAllocation = @@ -923,11 +923,16 @@ public void testReacquireContainer() throws Exception { containerExecutor.reacquireContainer(containerReacquisitionContext); - // returns null since there are no sufficient resources available for the request + // reacquireContainer recovers all the numa resources , + // that should be free to use next + testAllocateNumaResource("container_1481156246874_0001_01_000001", + Resource.newInstance(147434, 2), "0,1", "1"); when(mockContainer.getContainerId()).thenReturn( ContainerId.fromString("container_1481156246874_0001_01_000004")); when(mockContainer.getResource()) - .thenReturn(Resource.newInstance(156250, 2)); + .thenReturn(Resource.newInstance(1024, 2)); + + // returns null since there are no sufficient resources available for the request Assert.assertNull(numaResourceAllocator.allocateNumaNodes(mockContainer)); }