From c5b57e83c57350e3e59db8f6396b4f1640c96453 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 Jul 2022 05:28:52 -0700 Subject: [PATCH 1/8] YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation. --- .../yarn/server/router/RouterMetrics.java | 107 ++++++++++++++++-- .../clientrm/FederationClientInterceptor.java | 90 +++++++++++---- .../clientrm/RouterYarnClientUtils.java | 68 ++++++++++- .../yarn/server/router/TestRouterMetrics.java | 99 ++++++++++++++++ 4 files changed, 336 insertions(+), 28 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 42a22600d2b8b..d6ce729b16522 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -91,6 +91,12 @@ public final class RouterMetrics { private MutableGaugeInt numGetResourceProfilesFailedRetrieved; @Metric("# of getResourceProfile failed to be retrieved") private MutableGaugeInt numGetResourceProfileFailedRetrieved; + @Metric("# of getAttributesToNodes failed to be retrieved") + private MutableGaugeInt numGetAttributesToNodesFailedRetrieved; + @Metric("# of getClusterNodeAttributes failed to be retrieved") + private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved; + @Metric("# of getNodesToAttributes failed to be retrieved") + private MutableGaugeInt numGetNodesToAttributesFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -101,14 +107,11 @@ public final class RouterMetrics { private MutableRate totalSucceededAppsCreated; @Metric("Total number of successful Retrieved app reports and latency(ms)") private MutableRate totalSucceededAppsRetrieved; - @Metric("Total number of successful Retrieved multiple apps reports and " - + "latency(ms)") + @Metric("Total number of successful Retrieved multiple apps reports and latency(ms)") private MutableRate totalSucceededMultipleAppsRetrieved; - @Metric("Total number of successful Retrieved " + - "appAttempt reports and latency(ms)") + @Metric("Total number of successful Retrieved appAttempt reports and latency(ms)") private MutableRate totalSucceededAppAttemptsRetrieved; - @Metric("Total number of successful Retrieved getClusterMetrics and " - + "latency(ms)") + @Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)") private MutableRate totalSucceededGetClusterMetricsRetrieved; @Metric("Total number of successful Retrieved getClusterNodes and latency(ms)") private MutableRate totalSucceededGetClusterNodesRetrieved; @@ -144,9 +147,14 @@ public final class RouterMetrics { private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved; @Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)") private MutableRate totalSucceededGetResourceProfilesRetrieved; - @Metric("Total number of successful Retrieved getResourceProfile and latency(ms)") private MutableRate totalSucceededGetResourceProfileRetrieved; + @Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)") + private MutableRate totalSucceededGetAttributesToNodesRetrieved; + @Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)") + private MutableRate totalSucceededGetClusterNodeAttributesRetrieved; + @Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)") + private MutableRate totalSucceededGetNodesToAttributesRetrieved; /** * Provide quantile counters for all latencies. @@ -176,6 +184,10 @@ public final class RouterMetrics { private MutableQuantiles moveApplicationAcrossQueuesLatency; private MutableQuantiles getResourceProfilesLatency; private MutableQuantiles getResourceProfileLatency; + private MutableQuantiles getAttributesToNodesLatency; + private MutableQuantiles getClusterNodeAttributesLatency; + + private MutableQuantiles getNodesToAttributesLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -274,6 +286,18 @@ private RouterMetrics() { getResourceProfileLatency = registry.newQuantiles("getResourceProfileLatency", "latency of get resource profile timeouts", "ops", "latency", 10); + + getAttributesToNodesLatency = + registry.newQuantiles("getAttributesToNodesLatency", + "latency of get attributes to nodes timeouts", "ops", "latency", 10); + + getClusterNodeAttributesLatency = + registry.newQuantiles("getClusterNodeAttributesLatency", + "latency of get cluster node attributes timeouts", "ops", "latency", 10); + + getNodesToAttributesLatency = + registry.newQuantiles("getNodesToAttributesLatency", + "latency of get nodes to attributes timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -420,6 +444,21 @@ public long getNumSucceededGetResourceProfileRetrieved() { return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededGetAttributesToNodesRetrieved() { + return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetClusterNodeAttributesRetrieved() { + return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetNodesToAttributesRetrieved() { + return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -545,6 +584,21 @@ public double getLatencySucceededGetResourceProfileRetrieved() { return totalSucceededGetResourceProfileRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededGetAttributesToNodesRetrieved() { + return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetClusterNodeAttributesRetrieved() { + return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetNodesToAttributesRetrieved() { + return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -666,6 +720,18 @@ public int getResourceProfileFailedRetrieved() { return numGetResourceProfileFailedRetrieved.value(); } + public int getAttributesToNodesFailedRetrieved() { + return numGetAttributesToNodesFailedRetrieved.value(); + } + + public int getClusterNodeAttributesFailedRetrieved() { + return numGetClusterNodeAttributesFailedRetrieved.value(); + } + + public int getNodesToAttributesFailedRetrieved() { + return numGetNodesToAttributesFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -791,6 +857,21 @@ public void succeededGetResourceProfileRetrieved(long duration) { getResourceProfileLatency.add(duration); } + public void succeededGetAttributesToNodesRetrieved(long duration) { + totalSucceededGetAttributesToNodesRetrieved.add(duration); + getAttributesToNodesLatency.add(duration); + } + + public void succeededGetClusterNodeAttributesRetrieved(long duration) { + totalSucceededGetClusterNodeAttributesRetrieved.add(duration); + getClusterNodeAttributesLatency.add(duration); + } + + public void succeededGetNodesToAttributesRetrieved(long duration) { + totalSucceededGetNodesToAttributesRetrieved.add(duration); + getNodesToAttributesLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -890,4 +971,16 @@ public void incrGetResourceProfilesFailedRetrieved() { public void incrGetResourceProfileFailedRetrieved() { numGetResourceProfileFailedRetrieved.incr(); } + + public void incrGetAttributesToNodesFailedRetrieved() { + numGetAttributesToNodesFailedRetrieved.incr(); + } + + public void incrGetClusterNodeAttributesFailedRetrieved() { + numGetClusterNodeAttributesFailedRetrieved.incr(); + } + + public void incrGetNodesToAttributesFailedRetrieved() { + numGetNodesToAttributesFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 7e7bd5afcf5f7..e1bcc4d154b6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -175,7 +175,6 @@ public void init(String userName) { federationFacade = FederationStateStoreFacade.getInstance(); rand = new Random(System.currentTimeMillis()); - int numThreads = getConf().getInt( YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE); @@ -197,10 +196,9 @@ public void init(String userName) { numSubmitRetries = conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - clientRMProxies = - new ConcurrentHashMap(); + clientRMProxies = new ConcurrentHashMap<>(); routerMetrics = RouterMetrics.getMetrics(); returnPartialReport = conf.getBoolean( @@ -227,19 +225,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( ApplicationClientProtocol clientRMProxy = null; try { boolean serviceAuthEnabled = getConf().getBoolean( - CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); UserGroupInformation realUser = user; if (serviceAuthEnabled) { - realUser = UserGroupInformation.createProxyUser( - user.getShortUserName(), UserGroupInformation.getLoginUser()); + realUser = UserGroupInformation.createProxyUser(user.getShortUserName(), + UserGroupInformation.getLoginUser()); } clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), ApplicationClientProtocol.class, subClusterId, realUser); } catch (Exception e) { RouterServerUtil.logAndThrowException( - "Unable to create the interface to reach the SubCluster " - + subClusterId, - e); + "Unable to create the interface to reach the SubCluster " + subClusterId, e); } clientRMProxies.put(subClusterId, clientRMProxy); @@ -287,8 +283,7 @@ public GetNewApplicationResponse getNewApplication( for (int i = 0; i < numSubmitRetries; ++i) { SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); - LOG.debug( - "getNewApplication try #{} on SubCluster {}", i, subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); GetNewApplicationResponse response = null; @@ -410,7 +405,7 @@ public SubmitApplicationResponse submitApplication( ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); - List blacklist = new ArrayList(); + List blacklist = new ArrayList<>(); for (int i = 0; i < numSubmitRetries; ++i) { @@ -561,8 +556,8 @@ public KillApplicationResponse forceKillApplication( } if (response == null) { - LOG.error("No response when attempting to kill the application " - + applicationId + " to SubCluster " + subClusterId.getId()); + LOG.error("No response when attempting to kill the application {} to SubCluster {}.", + applicationId, subClusterId.getId()); } long stopTime = clock.getTime(); @@ -1528,20 +1523,75 @@ public void shutdown() { @Override public GetAttributesToNodesResponse getAttributesToNodes( GetAttributesToNodesRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getNodeAttributes() == null) { + routerMetrics.incrGetAttributesToNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getAttributesToNodes request " + + "or nodeAttributes.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes", + new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request}); + Collection attributesToNodesResponses = null; + try { + attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetAttributesToNodesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetAttributesToNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.", + ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses); } @Override public GetClusterNodeAttributesResponse getClusterNodeAttributes( - GetClusterNodeAttributesRequest request) - throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + GetClusterNodeAttributesRequest request) throws YarnException, IOException { + if (request == null) { + routerMetrics.incrGetClusterNodeAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes", + new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request}); + Collection clusterNodeAttributesResponses = null; + try { + clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetClusterNodeAttributesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetClusterNodeAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " + + " to exception.", ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses); } @Override public GetNodesToAttributesResponse getNodesToAttributes( GetNodesToAttributesRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getHostNames() == null) { + routerMetrics.incrGetNodesToAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getNodesToAttributes request or " + + "hostNames.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes", + new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request}); + Collection nodesToAttributesResponses = null; + try { + nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetNodesToAttributesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetNodesToAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get nodes to attributes due " + + " to exception.", ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses); } protected SubClusterId getApplicationHomeSubCluster( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java index d72e72a6cff17..80f3ee452594e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -468,5 +474,65 @@ public static GetResourceProfileResponse mergeClusterResourceProfileResponse( profileResponse.setResource(resource); return profileResponse; } + + /** + * Merges a list of GetAttributesToNodesResponse. + * + * @param responses a list of GetAttributesToNodesResponse to merge. + * @return the merged GetAttributesToNodesResponse. + */ + public static GetAttributesToNodesResponse mergeAttributesToNodesResponse( + Collection responses) { + GetAttributesToNodesResponse attributesToNodesResponse = + GetAttributesToNodesResponse.newInstance(new HashMap<>()); + Map> nodeAttributeMap = new HashMap<>(); + for (GetAttributesToNodesResponse response : responses) { + if (response != null && response.getAttributesToNodes() != null) { + nodeAttributeMap.putAll(response.getAttributesToNodes()); + } + } + attributesToNodesResponse.setAttributeToNodes(nodeAttributeMap); + return attributesToNodesResponse; + } + + /** + * Merges a list of GetClusterNodeAttributesResponse. + * + * @param responses a list of GetClusterNodeAttributesResponse to merge. + * @return the merged GetClusterNodeAttributesResponse. + */ + public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse( + Collection responses) { + GetClusterNodeAttributesResponse attributesResponse = + GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); + Set nodeAttributeInfo = new HashSet<>(); + for (GetClusterNodeAttributesResponse response : responses) { + if (response != null && response.getNodeAttributes() != null) { + nodeAttributeInfo.addAll(response.getNodeAttributes()); + } + } + attributesResponse.setNodeAttributes(nodeAttributeInfo); + return attributesResponse; + } + + /** + * Merges a list of GetNodesToAttributesResponse. + * + * @param responses a list of GetNodesToAttributesResponse to merge. + * @return the merged GetNodesToAttributesResponse. + */ + public static GetNodesToAttributesResponse mergeNodesToAttributesResponse( + Collection responses) { + GetNodesToAttributesResponse attributesResponse = + GetNodesToAttributesResponse.newInstance(new HashMap<>()); + Map> attributesMap = new HashMap<>(); + for (GetNodesToAttributesResponse response : responses) { + if (response != null && response.getNodeToAttributes() != null) { + attributesMap.putAll(response.getNodeToAttributes()); + } + } + attributesResponse.setNodeToAttributes(attributesMap); + return attributesResponse; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 61fcd5385a40c..455cb229e99da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -438,6 +438,21 @@ public void getResourceProfileFailed() { LOG.info("Mocked: failed getResourceProfileFailed call"); metrics.incrGetResourceProfileFailedRetrieved(); } + + public void getAttributesToNodesFailed() { + LOG.info("Mocked: failed getAttributesToNodesFailed call"); + metrics.incrGetAttributesToNodesFailedRetrieved(); + } + + public void getClusterNodeAttributesFailed() { + LOG.info("Mocked: failed getClusterNodeAttributesFailed call"); + metrics.incrGetClusterNodeAttributesFailedRetrieved(); + } + + public void getNodesToAttributesFailed() { + LOG.info("Mocked: failed getNodesToAttributesFailed call"); + metrics.incrGetNodesToAttributesFailedRetrieved(); + } } // Records successes for all calls @@ -573,6 +588,21 @@ public void getResourceProfileRetrieved(long duration) { LOG.info("Mocked: successful getResourceProfile call with duration {}", duration); metrics.succeededGetResourceProfileRetrieved(duration); } + + public void getAttributesToNodesRetrieved(long duration) { + LOG.info("Mocked: successful getAttributesToNodes call with duration {}", duration); + metrics.succeededGetAttributesToNodesRetrieved(duration); + } + + public void getClusterNodeAttributesRetrieved(long duration) { + LOG.info("Mocked: successful getClusterNodeAttributes call with duration {}", duration); + metrics.succeededGetClusterNodeAttributesRetrieved(duration); + } + + public void getNodesToAttributesRetrieved(long duration) { + LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration); + metrics.succeededGetNodesToAttributesRetrieved(duration); + } } @Test @@ -970,4 +1000,73 @@ public void testGetResourceProfileRetrievedFailed() { Assert.assertEquals(totalBadBefore + 1, metrics.getResourceProfileFailedRetrieved()); } + + @Test + public void testSucceededGetAttributesToNodesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetAttributesToNodesRetrieved(); + goodSubCluster.getAttributesToNodesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetAttributesToNodesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getAttributesToNodesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetAttributesToNodesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetAttributesToNodesRetrievedFailed() { + long totalBadBefore = metrics.getAttributesToNodesFailedRetrieved(); + badSubCluster.getAttributesToNodesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getAttributesToNodesFailedRetrieved()); + } + + @Test + public void testGetClusterNodeAttributesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetClusterNodeAttributesRetrieved(); + goodSubCluster.getClusterNodeAttributesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetClusterNodeAttributesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getClusterNodeAttributesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetClusterNodeAttributesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetClusterNodeAttributesRetrievedFailed() { + long totalBadBefore = metrics.getClusterNodeAttributesFailedRetrieved(); + badSubCluster.getClusterNodeAttributesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getClusterNodeAttributesFailedRetrieved()); + } + + @Test + public void testGetNodesToAttributesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetNodesToAttributesRetrieved(); + goodSubCluster.getNodesToAttributesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetNodesToAttributesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getNodesToAttributesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetNodesToAttributesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetNodesToAttributesRetrievedFailed() { + long totalBadBefore = metrics.getNodesToAttributesFailedRetrieved(); + badSubCluster.getNodesToAttributesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getNodesToAttributesFailedRetrieved()); + } } From fc85c272efde28e4086233ec282b4a9fe193058d Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 Jul 2022 07:51:21 -0700 Subject: [PATCH 2/8] YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation. --- .../clientrm/TestRouterYarnClientUtils.java | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index 0ab2e8a7e5ce5..a0d88e049597b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -53,6 +55,11 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; @@ -610,4 +617,100 @@ public void testMergeResourceProfile() { Assert.assertEquals(3, resource.getVirtualCores()); Assert.assertEquals(3072, resource.getMemorySize()); } + + @Test + public void testMergeAttributesToNodesResponse() { + // normal response1 + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + Map> map1 = new HashMap<>(); + List lists1 = new ArrayList<>(); + NodeToAttributeValue attributeValue1 = NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue()); + lists1.add(attributeValue1); + map1.put(gpu.getAttributeKey(), lists1); + GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1); + + // normal response2 + NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + Map> map2 = new HashMap<>(); + List lists2 = new ArrayList<>(); + NodeToAttributeValue attributeValue2 = NodeToAttributeValue.newInstance("node2", docker.getAttributeValue()); + lists2.add(attributeValue2); + map2.put(docker.getAttributeKey(), lists2); + GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2); + + // empty response3 + GetAttributesToNodesResponse response3 = GetAttributesToNodesResponse.newInstance(new HashMap<>()); + + // null response4 + GetAttributesToNodesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetAttributesToNodesResponse response = + RouterYarnClientUtils.mergeAttributesToNodesResponse(responses); + + Assert.assertNotNull(response); + Assert.assertEquals(2, response.getAttributesToNodes().size()); + + Map> attrs = response.getAttributesToNodes(); + Assert.assertTrue(findHostnameAndValInMapping("node2", "docker0", + attrs.get(docker.getAttributeKey()))); + } + + @Test + public void testMergeClusterNodeAttributesResponse() { + // normal response1 + NodeAttributeInfo nodeAttributeInfo1 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), NodeAttributeType.STRING); + Set attributes1 = new HashSet<>(); + attributes1.add(nodeAttributeInfo1); + GetClusterNodeAttributesResponse response1 = GetClusterNodeAttributesResponse.newInstance(attributes1); + + // normal response2 + NodeAttributeInfo nodeAttributeInfo2 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"), NodeAttributeType.STRING); + Set attributes2 = new HashSet<>(); + attributes2.add(nodeAttributeInfo2); + GetClusterNodeAttributesResponse response2 = GetClusterNodeAttributesResponse.newInstance(attributes2); + + // empty response3 + GetClusterNodeAttributesResponse response3 = GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); + + // null response4 + GetClusterNodeAttributesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetClusterNodeAttributesResponse response = + RouterYarnClientUtils.mergeClusterNodeAttributesResponse(responses); + + Assert.assertNotNull(response); + + Set nodeAttributeInfos = response.getNodeAttributes(); + Assert.assertEquals(2, nodeAttributeInfos.size()); + + Object[] objectArr = nodeAttributeInfos.toArray(); + Assert.assertEquals("rm.yarn.io/GPU(STRING)", objectArr[0].toString()); + Assert.assertEquals("rm.yarn.io/CPU(STRING)", objectArr[1].toString()); + } + + private boolean findHostnameAndValInMapping(String hostname, String attrVal, + List mappingVals) { + for (NodeToAttributeValue value : mappingVals) { + if (value.getHostname().equals(hostname)) { + return attrVal.equals(value.getAttributeValue()); + } + } + return false; + } } From df1db00f4abaf58ea8eb61c57280977b169ac13a Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 Jul 2022 15:19:14 -0700 Subject: [PATCH 3/8] YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation. --- .../clientrm/FederationClientInterceptor.java | 3 +- .../clientrm/RouterYarnClientUtils.java | 15 +---- .../clientrm/TestRouterYarnClientUtils.java | 67 +++++++++++++++---- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index e1bcc4d154b6f..d2214422390c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -195,7 +195,8 @@ public void init(String userName) { } numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); clientRMProxies = new ConcurrentHashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java index 80f3ee452594e..e70d5521ffcf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -483,16 +483,13 @@ public static GetResourceProfileResponse mergeClusterResourceProfileResponse( */ public static GetAttributesToNodesResponse mergeAttributesToNodesResponse( Collection responses) { - GetAttributesToNodesResponse attributesToNodesResponse = - GetAttributesToNodesResponse.newInstance(new HashMap<>()); Map> nodeAttributeMap = new HashMap<>(); for (GetAttributesToNodesResponse response : responses) { if (response != null && response.getAttributesToNodes() != null) { nodeAttributeMap.putAll(response.getAttributesToNodes()); } } - attributesToNodesResponse.setAttributeToNodes(nodeAttributeMap); - return attributesToNodesResponse; + return GetAttributesToNodesResponse.newInstance(nodeAttributeMap); } /** @@ -503,16 +500,13 @@ public static GetAttributesToNodesResponse mergeAttributesToNodesResponse( */ public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse( Collection responses) { - GetClusterNodeAttributesResponse attributesResponse = - GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); Set nodeAttributeInfo = new HashSet<>(); for (GetClusterNodeAttributesResponse response : responses) { if (response != null && response.getNodeAttributes() != null) { nodeAttributeInfo.addAll(response.getNodeAttributes()); } } - attributesResponse.setNodeAttributes(nodeAttributeInfo); - return attributesResponse; + return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo); } /** @@ -523,16 +517,13 @@ public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesRespons */ public static GetNodesToAttributesResponse mergeNodesToAttributesResponse( Collection responses) { - GetNodesToAttributesResponse attributesResponse = - GetNodesToAttributesResponse.newInstance(new HashMap<>()); Map> attributesMap = new HashMap<>(); for (GetNodesToAttributesResponse response : responses) { if (response != null && response.getNodeToAttributes() != null) { attributesMap.putAll(response.getNodeToAttributes()); } } - attributesResponse.setNodeToAttributes(attributesMap); - return attributesResponse; + return GetNodesToAttributesResponse.newInstance(attributesMap); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index a0d88e049597b..b51977cf4069a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -659,8 +660,12 @@ public void testMergeAttributesToNodesResponse() { Assert.assertEquals(2, response.getAttributesToNodes().size()); Map> attrs = response.getAttributesToNodes(); - Assert.assertTrue(findHostnameAndValInMapping("node2", "docker0", - attrs.get(docker.getAttributeKey()))); + + NodeAttributeKey gpuKey = gpu.getAttributeKey(); + Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString()); + + NodeAttributeKey dockerKey = docker.getAttributeKey(); + Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString()); } @Test @@ -698,19 +703,53 @@ public void testMergeClusterNodeAttributesResponse() { Set nodeAttributeInfos = response.getNodeAttributes(); Assert.assertEquals(2, nodeAttributeInfos.size()); - - Object[] objectArr = nodeAttributeInfos.toArray(); - Assert.assertEquals("rm.yarn.io/GPU(STRING)", objectArr[0].toString()); - Assert.assertEquals("rm.yarn.io/CPU(STRING)", objectArr[1].toString()); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1)); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2)); } - private boolean findHostnameAndValInMapping(String hostname, String attrVal, - List mappingVals) { - for (NodeToAttributeValue value : mappingVals) { - if (value.getHostname().equals(hostname)) { - return attrVal.equals(value.getAttributeValue()); - } - } - return false; + @Test + public void testMergeNodesToAttributesResponse() { + // normal response1 + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvida"); + NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", + NodeAttributeType.STRING, "windows64"); + NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", + NodeAttributeType.STRING, "3_0_2"); + Map> node1Map = new HashMap<>(); + node1Map.put("node1", ImmutableSet.of(gpu, os, dist)); + GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map); + + // normal response2 + NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + Map> node2Map = new HashMap<>(); + node2Map.put("node2", ImmutableSet.of(docker)); + GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map); + + // empty response3 + GetNodesToAttributesResponse response3 = GetNodesToAttributesResponse.newInstance(new HashMap<>()); + + // null response4 + GetNodesToAttributesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetNodesToAttributesResponse response = + RouterYarnClientUtils.mergeNodesToAttributesResponse(responses); + + Assert.assertNotNull(response); + + Map> hostToAttrs = response.getNodeToAttributes(); + Assert.assertNotNull(hostToAttrs); + Assert.assertEquals(2, hostToAttrs.size()); + Assert.assertTrue(hostToAttrs.get("node1").contains(dist)); + Assert.assertTrue(hostToAttrs.get("node1").contains(gpu)); + Assert.assertTrue(hostToAttrs.get("node1").contains(os)); + Assert.assertTrue(hostToAttrs.get("node2").contains(docker)); } } From d1cde4f6e53bd0fa8d9267cf26d678114de0ae13 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 Jul 2022 15:49:08 -0700 Subject: [PATCH 4/8] YARN-11161. Fix CheckStyle. --- .../clientrm/FederationClientInterceptor.java | 4 ++-- .../TestFederationClientInterceptor.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index d2214422390c5..1d9834814ec9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1011,7 +1011,7 @@ public GetLabelsToNodesResponse getLabelsToNodes( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", - new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); + new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); Collection labelNodes; try { labelNodes = invokeAppClientProtocolMethod(true, remoteMethod, @@ -1036,7 +1036,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", - new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); + new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); Collection nodeLabels; try { nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 64995ca1dac93..a51d014d901bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -1233,4 +1235,19 @@ public void testGetResourceProfile() throws Exception { Assert.assertEquals(4096, response3.getResource().getMemorySize()); Assert.assertEquals(4, response3.getResource().getVirtualCores()); } + + @Test + public void testGetAttributesToNodes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " + + "or nodeAttributes.", () -> interceptor.getAttributesToNodes(null)); + + // normal request + GetAttributesToNodesResponse response = + interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance()); + + Assert.assertNotNull(response); + } } From 2a76b3a59314a97541db611924d81e678f5a7b6d Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 23 Jul 2022 02:00:39 -0700 Subject: [PATCH 5/8] YARN-11161. Fix CheckStyle. --- .../clientrm/TestRouterYarnClientUtils.java | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index b51977cf4069a..05a3a254f3648 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -626,7 +626,8 @@ public void testMergeAttributesToNodesResponse() { NodeAttributeType.STRING, "nvidia"); Map> map1 = new HashMap<>(); List lists1 = new ArrayList<>(); - NodeToAttributeValue attributeValue1 = NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue()); + NodeToAttributeValue attributeValue1 = + NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue()); lists1.add(attributeValue1); map1.put(gpu.getAttributeKey(), lists1); GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1); @@ -636,13 +637,15 @@ public void testMergeAttributesToNodesResponse() { NodeAttributeType.STRING, "docker0"); Map> map2 = new HashMap<>(); List lists2 = new ArrayList<>(); - NodeToAttributeValue attributeValue2 = NodeToAttributeValue.newInstance("node2", docker.getAttributeValue()); + NodeToAttributeValue attributeValue2 = + NodeToAttributeValue.newInstance("node2", docker.getAttributeValue()); lists2.add(attributeValue2); map2.put(docker.getAttributeKey(), lists2); GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2); // empty response3 - GetAttributesToNodesResponse response3 = GetAttributesToNodesResponse.newInstance(new HashMap<>()); + GetAttributesToNodesResponse response3 = + GetAttributesToNodesResponse.newInstance(new HashMap<>()); // null response4 GetAttributesToNodesResponse response4 = null; @@ -672,20 +675,25 @@ public void testMergeAttributesToNodesResponse() { public void testMergeClusterNodeAttributesResponse() { // normal response1 NodeAttributeInfo nodeAttributeInfo1 = - NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), NodeAttributeType.STRING); + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), + NodeAttributeType.STRING); Set attributes1 = new HashSet<>(); attributes1.add(nodeAttributeInfo1); - GetClusterNodeAttributesResponse response1 = GetClusterNodeAttributesResponse.newInstance(attributes1); + GetClusterNodeAttributesResponse response1 = + GetClusterNodeAttributesResponse.newInstance(attributes1); // normal response2 NodeAttributeInfo nodeAttributeInfo2 = - NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"), NodeAttributeType.STRING); + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"), + NodeAttributeType.STRING); Set attributes2 = new HashSet<>(); attributes2.add(nodeAttributeInfo2); - GetClusterNodeAttributesResponse response2 = GetClusterNodeAttributesResponse.newInstance(attributes2); + GetClusterNodeAttributesResponse response2 = + GetClusterNodeAttributesResponse.newInstance(attributes2); // empty response3 - GetClusterNodeAttributesResponse response3 = GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); + GetClusterNodeAttributesResponse response3 = + GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); // null response4 GetClusterNodeAttributesResponse response4 = null; @@ -728,7 +736,8 @@ public void testMergeNodesToAttributesResponse() { GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map); // empty response3 - GetNodesToAttributesResponse response3 = GetNodesToAttributesResponse.newInstance(new HashMap<>()); + GetNodesToAttributesResponse response3 = + GetNodesToAttributesResponse.newInstance(new HashMap<>()); // null response4 GetNodesToAttributesResponse response4 = null; From 0ed079c9a7be4379c3b3050f8e6da9ad92e66cf5 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 24 Jul 2022 04:07:44 -0700 Subject: [PATCH 6/8] YARN-11161. Fix CheckStyle. --- .../server/router/clientrm/FederationClientInterceptor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 1d9834814ec9f..b9254aefd142a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -194,8 +194,7 @@ public void init(String userName) { LOG.error(e.getMessage()); } - numSubmitRetries = - conf.getInt( + numSubmitRetries = conf.getInt( YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); From 81edae4b18c771f7ad8476ea0d1c47fa89afb50a Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 24 Jul 2022 16:34:58 -0700 Subject: [PATCH 7/8] YARN-11161. Add Junit Test. --- .../TestFederationClientInterceptor.java | 70 +++++++++++++++++++ .../TestableFederationClientInterceptor.java | 34 +++++++++ 2 files changed, 104 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index a51d014d901bc..6d9d4ae84174d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -84,6 +84,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -98,6 +102,11 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -1249,5 +1258,66 @@ public void testGetAttributesToNodes() throws Exception { interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance()); Assert.assertNotNull(response); + Map> attributesToNodes = response.getAttributesToNodes(); + Assert.assertNotNull(attributesToNodes); + Assert.assertEquals(4, attributesToNodes.size()); + + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeToAttributeValue attributeValue1 = + NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue()); + NodeAttributeKey gpuKey = gpu.getAttributeKey(); + Assert.assertTrue(attributesToNodes.get(gpuKey).contains(attributeValue1)); + } + + @Test + public void testClusterNodeAttributes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.", + () -> interceptor.getClusterNodeAttributes(null)); + + // normal request + GetClusterNodeAttributesResponse response = + interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance()); + + Assert.assertNotNull(response); + Set nodeAttributeInfos = response.getNodeAttributes(); + Assert.assertNotNull(nodeAttributeInfos); + Assert.assertEquals(4, nodeAttributeInfos.size()); + + NodeAttributeInfo nodeAttributeInfo1 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), + NodeAttributeType.STRING); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1)); + + NodeAttributeInfo nodeAttributeInfo2 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"), + NodeAttributeType.STRING); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2)); + } + + @Test + public void testNodesToAttributes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToAttributes request or hostNames.", + () -> interceptor.getNodesToAttributes(null)); + + // normal request + Set hostNames = Collections.singleton("0-host1"); + GetNodesToAttributesResponse response = + interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames)); + Assert.assertNotNull(response); + + Map> nodeAttributeMap = response.getNodeToAttributes(); + Assert.assertNotNull(nodeAttributeMap); + Assert.assertEquals(1, nodeAttributeMap.size()); + + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvida"); + Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index af1f45924c19c..04215305779cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -24,12 +24,19 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -82,6 +89,7 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( } mockRMs.put(subClusterId, mockRM); } + initNodeAttributes(subClusterId,mockRM); return mockRM.getClientRMService(); } } @@ -127,4 +135,30 @@ public ConcurrentHashMap getMockRMs() { public ConcurrentHashMap getMockNMs() { return mockNMs; } + + private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM) { + String node1 = subClusterId.getId() +"-host1"; + String node2 = subClusterId.getId() +"-host2"; + NodeAttributesManager mgr = mockRM.getRMContext().getNodeAttributesManager(); + NodeAttribute gpu = + NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeAttribute os = + NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", + NodeAttributeType.STRING, "windows64"); + NodeAttribute docker = + NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + NodeAttribute dist = + NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", + NodeAttributeType.STRING, "3_0_2"); + Map> nodes = new HashMap<>(); + nodes.put(node1, ImmutableSet.of(gpu, os, dist)); + nodes.put(node2, ImmutableSet.of(docker, dist)); + try { + mgr.addNodeAttributes(nodes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } From 1b1719ce418abc159664c408772ab718d8aaa146 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 24 Jul 2022 19:32:14 -0700 Subject: [PATCH 8/8] YARN-11161. Fix CheckStyle. --- .../clientrm/TestFederationClientInterceptor.java | 11 ++++++----- .../clientrm/TestableFederationClientInterceptor.java | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index cd0272f98aa3f..f0aa48082b899 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -1258,16 +1258,16 @@ public void testGetAttributesToNodes() throws Exception { interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance()); Assert.assertNotNull(response); - Map> attributesToNodes = response.getAttributesToNodes(); - Assert.assertNotNull(attributesToNodes); - Assert.assertEquals(4, attributesToNodes.size()); + Map> attrs = response.getAttributesToNodes(); + Assert.assertNotNull(attrs); + Assert.assertEquals(4, attrs.size()); NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", NodeAttributeType.STRING, "nvidia"); NodeToAttributeValue attributeValue1 = NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue()); NodeAttributeKey gpuKey = gpu.getAttributeKey(); - Assert.assertTrue(attributesToNodes.get(gpuKey).contains(attributeValue1)); + Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1)); } @Test @@ -1303,7 +1303,8 @@ public void testNodesToAttributes() throws Exception { LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request."); // null request - LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToAttributes request or hostNames.", + LambdaTestUtils.intercept(YarnException.class, + "Missing getNodesToAttributes request or hostNames.", () -> interceptor.getNodesToAttributes(null)); // normal request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index 04215305779cb..7c82476ec4767 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -89,7 +89,7 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( } mockRMs.put(subClusterId, mockRM); } - initNodeAttributes(subClusterId,mockRM); + initNodeAttributes(subClusterId, mockRM); return mockRM.getClientRMService(); } }