diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index 548f1a82f6d83..5c312b0217c1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -22,10 +22,13 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +43,10 @@ public class AbstractRouterRpcFairnessPolicyController LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); /** Hash table to hold semaphore for each configured name service. */ - private Map permits; + private Map permits; + private final Map permitSizes = new HashMap<>(); + private Map rejectedPermitsPerNs; + private Map acceptedPermitsPerNs; public void init(Configuration conf) { this.permits = new HashMap<>(); @@ -71,7 +77,7 @@ public void shutdown() { } protected void insertNameServiceWithPermits(String nsId, int maxPermits) { - this.permits.put(nsId, new Semaphore(maxPermits)); + this.permits.put(nsId, new AdjustableSemaphore(maxPermits)); } protected int getAvailablePermits(String nsId) { @@ -81,7 +87,7 @@ protected int getAvailablePermits(String nsId) { @Override public String getAvailableHandlerOnPerNs() { JSONObject json = new JSONObject(); - for (Map.Entry entry : permits.entrySet()) { + for (Map.Entry entry : permits.entrySet()) { try { String nsId = entry.getKey(); int availableHandler = entry.getValue().availablePermits(); @@ -92,4 +98,39 @@ public String getAvailableHandlerOnPerNs() { } return json.toString(); } + + @Override + public String getPermitCapacityPerNs() { + JSONObject json = new JSONObject(); + for (Map.Entry entry : permitSizes.entrySet()) { + try { + json.put(entry.getKey(), entry.getValue()); + } catch (JSONException e) { + LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e); + } + } + return json.toString(); + } + + @Override + public void setMetrics(Map rejectedPermits, + Map acceptedPermits) { + this.rejectedPermitsPerNs = rejectedPermits; + this.acceptedPermitsPerNs = acceptedPermits; + } + + protected Map getPermits() { + return permits; + } + + public Map getRejectedPermitsPerNs() { + return rejectedPermitsPerNs; + } + public Map getAcceptedPermitsPerNs() { + return acceptedPermitsPerNs; + } + + protected Map getPermitSizes() { + return permitSizes; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..899020b62b1cf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + +/** + * Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController} + * and fetching handlers from configuration for all available name services. + * The handlers count changes according to traffic to namespaces. + * Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY + * but will not exceed initial handler count + number of nameservices. + */ +public class DynamicRouterRpcFairnessPolicyController + extends StaticRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class); + + private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors + .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build()); + private PermitsResizerService permitsResizerService; + private ScheduledFuture refreshTask; + private int handlerCount; + private int minimumHandlerPerNs; + + /** + * Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController} + * and starts a periodic semaphore resizer thread. + * + * @param conf configuration + */ + public DynamicRouterRpcFairnessPolicyController(Configuration conf) { + super(conf); + minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, + DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + long refreshInterval = + conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY, + DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT, + TimeUnit.SECONDS); + permitsResizerService = new PermitsResizerService(); + refreshTask = SCHEDULED_EXECUTOR + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.SECONDS); + } + + @VisibleForTesting + public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) { + super(conf); + minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, + DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + permitsResizerService = new PermitsResizerService(); + refreshTask = SCHEDULED_EXECUTOR + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.SECONDS); + } + + @VisibleForTesting + public void refreshPermitsCap() { + permitsResizerService.run(); + } + + @Override + public void shutdown() { + super.shutdown(); + if (refreshTask != null) { + refreshTask.cancel(true); + } + if (SCHEDULED_EXECUTOR != null) { + SCHEDULED_EXECUTOR.shutdown(); + } + } + + class PermitsResizerService implements Runnable { + + @Override + public synchronized void run() { + if ((getRejectedPermitsPerNs() == null) || (getAcceptedPermitsPerNs() == null)) { + return; + } + long totalOps = 0; + Map nsOps = new HashMap<>(); + for (Map.Entry entry : getPermits().entrySet()) { + long ops = (getRejectedPermitsPerNs().containsKey(entry.getKey()) ? + getRejectedPermitsPerNs().get(entry.getKey()).longValue() : + 0) + (getAcceptedPermitsPerNs().containsKey(entry.getKey()) ? + getAcceptedPermitsPerNs().get(entry.getKey()).longValue() : + 0); + nsOps.put(entry.getKey(), ops); + totalOps += ops; + } + + List underMinimumNss = new ArrayList<>(); + List overMinimumNss = new ArrayList<>(); + int effectiveOps = 0; + + // First iteration: split namespaces into those underused and those that are not. + for (Map.Entry entry : getPermits().entrySet()) { + String ns = entry.getKey(); + int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount); + + if (newPermitCap <= minimumHandlerPerNs) { + underMinimumNss.add(ns); + } else { + overMinimumNss.add(ns); + effectiveOps += nsOps.get(ns); + } + } + + // Second iteration part 1: assign minimum handlers + for (String ns: underMinimumNss) { + resizeNsHandlerCapacity(ns, minimumHandlerPerNs); + } + // Second iteration part 2: assign handlers to the rest + int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size(); + for (String ns: overMinimumNss) { + int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits); + resizeNsHandlerCapacity(ns, newPermitCap); + } + } + + private void resizeNsHandlerCapacity(String ns, int newPermitCap) { + AdjustableSemaphore semaphore = getPermits().get(ns); + int oldPermitCap = getPermitSizes().get(ns); + if (newPermitCap <= minimumHandlerPerNs) { + newPermitCap = minimumHandlerPerNs; + } + getPermitSizes().put(ns, newPermitCap); + if (newPermitCap > oldPermitCap) { + semaphore.release(newPermitCap - oldPermitCap); + } else if (newPermitCap < oldPermitCap) { + semaphore.reducePermits(oldPermitCap - newPermitCap); + } + LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java index 3b85da59e1f52..6ce8ac2afd663 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.federation.fairness; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; + import org.apache.hadoop.conf.Configuration; /** @@ -51,4 +54,15 @@ public void shutdown() { public String getAvailableHandlerOnPerNs(){ return "N/A"; } + + @Override + public String getPermitCapacityPerNs() { + return "N/A"; + } + + @Override + public void setMetrics(Map rejectedPermitsPerNs, + Map acceptedPermitsPerNs) { + // Nothing + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java index 354383a168f4e..d6921add562cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.federation.fairness; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -67,4 +70,15 @@ public interface RouterRpcFairnessPolicyController { * Returns the JSON string of the available handler for each Ns. */ String getAvailableHandlerOnPerNs(); + + /** + * Returns the JSON string of the max handler count for each ns. + */ + String getPermitCapacityPerNs(); + + /** + * Attaches permits access metrics to the controller. + */ + void setMetrics(Map rejectedPermitsPerNs, + Map acceptedPermitsPerNs); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java index aa0777fc03d69..191ad46d96c91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java @@ -27,6 +27,8 @@ import java.util.HashSet; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; @@ -45,6 +47,9 @@ public class StaticRouterRpcFairnessPolicyController extends public static final String ERROR_MSG = "Configured handlers " + DFS_ROUTER_HANDLER_COUNT_KEY + '=' + " %d is less than the minimum required handlers %d"; + public static final String ERROR_NS_MSG = + "Configured handlers %s=%d is less than the minimum required handlers %d"; + public StaticRouterRpcFairnessPolicyController(Configuration conf) { init(conf); @@ -78,6 +83,7 @@ public void init(Configuration conf) handlerCount -= dedicatedHandlers; insertNameServiceWithPermits(nsId, dedicatedHandlers); logAssignment(nsId, dedicatedHandlers); + getPermitSizes().put(nsId, dedicatedHandlers); } else { unassignedNS.add(nsId); } @@ -92,6 +98,7 @@ public void init(Configuration conf) for (String nsId : unassignedNS) { insertNameServiceWithPermits(nsId, handlersPerNS); logAssignment(nsId, handlersPerNS); + getPermitSizes().put(nsId, handlersPerNS); } } @@ -103,6 +110,7 @@ public void init(Configuration conf) LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers); insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers); + getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers); } LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS)); @@ -116,15 +124,23 @@ private static void logAssignment(String nsId, int count) { private void validateHandlersCount(Configuration conf, int handlerCount, Set allConfiguredNS) { int totalDedicatedHandlers = 0; + int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, + DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT); for (String nsId : allConfiguredNS) { int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); if (dedicatedHandlers > 0) { + if (dedicatedHandlers < minimumHandlerPerNs) { + String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, + handlerCount, minimumHandlerPerNs); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } // Total handlers should not be less than sum of dedicated handlers. totalDedicatedHandlers += dedicatedHandlers; } else { - // Each NS should have at least one handler assigned. - totalDedicatedHandlers++; + // Each NS has to have a minimum number of handlers assigned. + totalDedicatedHandlers += minimumHandlerPerNs; } } if (totalDedicatedHandlers > handlerCount) { @@ -134,5 +150,4 @@ private void validateHandlersCount(Configuration conf, int handlerCount, throw new IllegalArgumentException(msg); } } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 979e7504a872b..3499962d73b2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -115,6 +115,12 @@ public interface FederationRPCMBean { */ String getAvailableHandlerOnPerNs(); + /** + * JSON representation of max handler count per ns. + * @return JSON string representation. + */ + String getPermitCapacityPerNs(); + /** * Get the JSON representation of the async caller thread pool. * @return JSON string representation of the async caller thread pool. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 823bc7b8af21c..1ec49b37d33e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -247,6 +247,11 @@ public String getAvailableHandlerOnPerNs() { getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs(); } + @Override + public String getPermitCapacityPerNs() { + return rpcServer.getRPCClient().getRouterRpcFairnessPolicyController().getPermitCapacityPerNs(); + } + @Override public String getAsyncCallerPool() { return rpcServer.getRPCClient().getAsyncCallerPoolJson(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 7ff853946d700..30fae5caf3706 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -29,7 +29,9 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; @@ -271,6 +273,22 @@ public static RouterRpcFairnessPolicyController newFairnessPolicyController( return newInstance(conf, null, null, clazz); } + /** + * Creates an instance of an RouterRpcFairnessPolicyController + * from the configuration and attaches permits access metrics to the controller. + * + * @param conf Configuration that defines the fairness controller class. + * @param rejectedPermitsPerNs Metrics map ns:rejected permits + * @param acceptedPermitsPerNs Metrics map ns:accepted permits + * @return Fairness policy controller. + */ + public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf, + Map rejectedPermitsPerNs, Map acceptedPermitsPerNs) { + RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf); + instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); + return instance; + } + /** * Collect all configured nameservices. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 741e470c6fc3f..7ba982a95972c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.federation.router; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController; @@ -27,13 +29,11 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; -import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; - -import java.util.concurrent.TimeUnit; /** * Config fields for router-based hdfs federation. @@ -354,6 +354,13 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { NoRouterRpcFairnessPolicyController.class; public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; + public static final String DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "minimum.handler.count"; + public static final int DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT = 1; + public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT = + 600; + public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval.seconds"; // HDFS Router Federation Rename. public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 90d6c347ef73e..fabccf292ea8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -156,8 +156,8 @@ public RouterRpcClient(Configuration conf, Router router, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); this.connectionManager = new ConnectionManager(clientConf); this.connectionManager.start(); - this.routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + this.routerRpcFairnessPolicyController = FederationUtil + .newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs); int numThreads = conf.getInt( RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java new file mode 100644 index 0000000000000..34a0563232f56 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.utils; + +import java.util.concurrent.Semaphore; + +public class AdjustableSemaphore extends Semaphore { + + public AdjustableSemaphore(int permits) { + super(permits); + } + + public AdjustableSemaphore(int permits, boolean fair) { + super(permits, fair); + } + + public void reducePermits(int reduction) { + super.reducePermits(reduction); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index fcf6a28475fbd..b35cc569a4c47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -706,6 +706,24 @@ + + dfs.federation.router.fairness.minimum.handler.count + 1 + + Minimum number of handlers assigned per nameservice. + If any dedicated handler count is smaller than this number, + router initialization will fail. + + + + + dfs.federation.router.fairness.policy.controller.dynamic.refresh.interval.seconds + 600 + + Interval (in seconds) between each handler count resize by DynamicFairnessPolicyController + + + dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..057f7a3646870 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; + +/** + * Test functionality of {@link DynamicRouterRpcFairnessPolicyController). + */ +public class TestDynamicRouterRpcFairnessPolicyController { + + private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2, ns3.nn1"; + + @Test + public void testDynamicControllerSimple() throws InterruptedException, TimeoutException { +// verifyDynamicControllerSimple(true); + verifyDynamicControllerSimple(false); + } + + @Test + public void testDynamicControllerAllPermitsAcquired() throws InterruptedException { + verifyDynamicControllerAllPermitsAcquired(true); + verifyDynamicControllerAllPermitsAcquired(false); + } + + private void verifyDynamicControllerSimple(boolean manualRefresh) + throws InterruptedException, TimeoutException { + // 3 permits each ns + DynamicRouterRpcFairnessPolicyController controller; + if (manualRefresh) { + controller = getFairnessPolicyController(20); + } else { + controller = getFairnessPolicyController(20, 4); + } + + String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS}; + // Initial permit counts should be 5:5:5 + verifyRemainingPermitCounts(new int[] {5, 5, 5, 5}, nss, controller); + + // Release all permits + for (int i = 0; i < 5; i++) { + controller.releasePermit("ns1"); + controller.releasePermit("ns2"); + controller.releasePermit("ns3"); + controller.releasePermit(CONCURRENT_NS); + } + + // Inject dummy metrics + // Split half half for ns1 and concurrent + Map rejectedPermitsPerNs = new HashMap<>(); + Map acceptedPermitsPerNs = new HashMap<>(); + injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10); + injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0); + injectDummyMetrics(rejectedPermitsPerNs, "ns3", 10); + injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10); + controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); + + // Current permits count should be 6:3:6:6 + int[] newPermitCounts = new int[] {6, 3, 6, 6}; + + if (manualRefresh) { + controller.refreshPermitsCap(); + } else { + Thread.sleep(5000); + } + verifyRemainingPermitCounts(newPermitCounts, nss, controller); + + } + + public void verifyDynamicControllerAllPermitsAcquired(boolean manualRefresh) + throws InterruptedException { + // 10 permits each ns + DynamicRouterRpcFairnessPolicyController controller; + if (manualRefresh) { + controller = getFairnessPolicyController(40); + } else { + controller = getFairnessPolicyController(40, 4); + } + + String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS}; + verifyRemainingPermitCounts(new int[] {10, 10, 10, 10}, nss, controller); + + // Inject dummy metrics + Map rejectedPermitsPerNs = new HashMap<>(); + Map acceptedPermitsPerNs = new HashMap<>(); + injectDummyMetrics(rejectedPermitsPerNs, "ns1", 13); + injectDummyMetrics(rejectedPermitsPerNs, "ns2", 13); + injectDummyMetrics(rejectedPermitsPerNs, "ns3", 13); + injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 1); + // New permit capacity will be 13:13:13:3 + controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); + if (manualRefresh) { + controller.refreshPermitsCap(); + } else { + Thread.sleep(5000); + } + Assert.assertEquals("{\"concurrent\":-7,\"ns2\":3,\"ns1\":3,\"ns3\":3}", + controller.getAvailableHandlerOnPerNs()); + + // Can acquire 3 more permits for ns1, ns2, ns3 + verifyRemainingPermitCounts(new int[] {3, 3, 3, 0}, nss, controller); + // Need to release at least 8 permits for concurrent before it has any free permits + Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); + for (int i = 0; i < 7; i++) { + controller.releasePermit(CONCURRENT_NS); + } + Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); + controller.releasePermit(CONCURRENT_NS); + Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS)); + } + + private void verifyRemainingPermitCounts(int[] remainingPermitCounts, String[] nss, + RouterRpcFairnessPolicyController controller) { + assert remainingPermitCounts.length == nss.length; + for (int i = 0; i < remainingPermitCounts.length; i++) { + verifyRemainingPermitCount(remainingPermitCounts[i], nss[i], controller); + } + } + + private void verifyRemainingPermitCount(int remainingPermitCount, String nameservice, + RouterRpcFairnessPolicyController controller) { + for (int i = 0; i < remainingPermitCount; i++) { + Assert.assertTrue(controller.acquirePermit(nameservice)); + } + Assert.assertFalse(controller.acquirePermit(nameservice)); + } + + private void injectDummyMetrics(Map metrics, String ns, long value) { + metrics.computeIfAbsent(ns, k -> new LongAdder()).add(value); + } + + private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers, + long refreshInterval) { + return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), refreshInterval); + } + + private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers) { + return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), Long.MAX_VALUE); + } + + private Configuration createConf(int handlers, int minHandlersPerNs) { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers); + conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices); + conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, minHandlersPerNs); + conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + DynamicRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class); + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java index 8307f666b5d1c..261f070df090a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; @@ -102,6 +103,14 @@ public void testAllocationErrorForLowDefaultHandlersPerNS() { verifyInstantiationError(conf, 1, 3); } + @Test + public void testAllocationErrorTooFewDedicatedHandlers() { + Configuration conf = createConf(9); + conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, 3); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 1); + verifyInstantiationError(conf, CONCURRENT_NS, 9, 3); + } + @Test public void testGetAvailableHandlerOnPerNs() { RouterRpcFairnessPolicyController routerRpcFairnessPolicyController @@ -113,6 +122,18 @@ public void testGetAvailableHandlerOnPerNs() { routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); } + @Test + public void testGetPermitCapacityPerNs() { + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController + = getFairnessPolicyController(30); + assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}", + routerRpcFairnessPolicyController.getPermitCapacityPerNs()); + routerRpcFairnessPolicyController.acquirePermit("ns1"); + routerRpcFairnessPolicyController.acquirePermit("ns2"); + assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}", + routerRpcFairnessPolicyController.getPermitCapacityPerNs()); + } + @Test public void testGetAvailableHandlerOnPerNsForNoFairness() { Configuration conf = new Configuration(); @@ -170,6 +191,20 @@ private void verifyInstantiationError(Configuration conf, int handlerCount, logs.getOutput().contains(errorMsg)); } + private void verifyInstantiationError(Configuration conf, String ns, int handlerCount, + int minimumHandler) { + GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class)); + try { + FederationUtil.newFairnessPolicyController(conf); + } catch (IllegalArgumentException e) { + // Ignore the exception as it is expected here. + } + String errorMsg = String.format(StaticRouterRpcFairnessPolicyController.ERROR_NS_MSG, + DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + ns, handlerCount, minimumHandler); + assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg)); + } + private RouterRpcFairnessPolicyController getFairnessPolicyController( int handlers) { return FederationUtil.newFairnessPolicyController(createConf(handlers));