Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +42,8 @@ public class AbstractRouterRpcFairnessPolicyController
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);

/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;
private Map<String, AdjustableSemaphore> permits;
private final Map<String, Integer> permitSizes = new HashMap<>();

public void init(Configuration conf) {
this.permits = new HashMap<>();
Expand Down Expand Up @@ -72,7 +75,7 @@ public void shutdown() {
}

protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
this.permits.put(nsId, new Semaphore(maxPermits));
this.permits.put(nsId, new AdjustableSemaphore(maxPermits));
}

protected int getAvailablePermits(String nsId) {
Expand All @@ -82,7 +85,7 @@ protected int getAvailablePermits(String nsId) {
@Override
public String getAvailableHandlerOnPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
try {
String nsId = entry.getKey();
int availableHandler = entry.getValue().availablePermits();
Expand All @@ -93,4 +96,24 @@ public String getAvailableHandlerOnPerNs() {
}
return json.toString();
}

@Override
public String getPermitCapacityPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Integer> entry : permitSizes.entrySet()) {
try {
json.put(entry.getKey(), entry.getValue());
} catch (JSONException e) {
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
}
}
return json.toString();
}

protected Map<String, AdjustableSemaphore> getPermits() {
return permits;
}
protected Map<String, Integer> getPermitSizes() {
return permitSizes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;

/**
* Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController}
* and fetching handlers from configuration for all available name services.
* The handlers count changes according to traffic to namespaces.
* Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY
* but will not exceed initial handler count + number of nameservices.
*/
public class DynamicRouterRpcFairnessPolicyController
extends StaticRouterRpcFairnessPolicyController {

public static final Logger LOG =
LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);

private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
private PermitsResizerService permitsResizerService;
private ScheduledFuture<?> refreshTask;
private int handlerCount;
private int minimumHandlerPerNs;
private final Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
private final Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();

/**
* Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController}
* and starts a periodic semaphore resizer thread.
*
* @param conf configuration
*/
public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
super(conf);
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
long refreshInterval =
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY,
DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT,
TimeUnit.SECONDS);
permitsResizerService = new PermitsResizerService();
refreshTask = SCHEDULED_EXECUTOR
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.SECONDS);
}

@VisibleForTesting
public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) {
super(conf);
minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
permitsResizerService = new PermitsResizerService();
refreshTask = SCHEDULED_EXECUTOR
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.SECONDS);
}

@VisibleForTesting
public void refreshPermitsCap() {
permitsResizerService.run();
}

@VisibleForTesting
public PermitsResizerService getResizerService() {
return permitsResizerService;
}

@Override
public void shutdown() {
super.shutdown();
if (refreshTask != null) {
refreshTask.cancel(true);
}
}

@Override
public boolean acquirePermit(String nsId) {
boolean result = super.acquirePermit(nsId);
if (result) {
acceptedPermitsPerNs.computeIfAbsent(nsId, k -> new LongAdder()).increment();
} else {
rejectedPermitsPerNs.computeIfAbsent(nsId, k -> new LongAdder()).increment();
}
return result;
}

@VisibleForTesting
public void setAcceptedPermitsPerNs(Map<String, LongAdder> metrics) {
for (Map.Entry<String, LongAdder> entry: metrics.entrySet()) {
acceptedPermitsPerNs.put(entry.getKey(), new LongAdder());
acceptedPermitsPerNs.get(entry.getKey()).add(entry.getValue().longValue());
}
List<String> toRemove = new ArrayList<>();
for (String key: acceptedPermitsPerNs.keySet()) {
if (!metrics.containsKey(key)) {
toRemove.add(key);
}
}
for (String key: toRemove) {
acceptedPermitsPerNs.remove(key);
}
}

@VisibleForTesting
public void setRejectedPermitsPerNs(Map<String, LongAdder> metrics) {
for (Map.Entry<String, LongAdder> entry: metrics.entrySet()) {
rejectedPermitsPerNs.put(entry.getKey(), new LongAdder());
rejectedPermitsPerNs.get(entry.getKey()).add(entry.getValue().longValue());
}
List<String> toRemove = new ArrayList<>();
for (String key: rejectedPermitsPerNs.keySet()) {
if (!metrics.containsKey(key)) {
toRemove.add(key);
}
}
for (String key: toRemove) {
rejectedPermitsPerNs.remove(key);
}
}

class PermitsResizerService implements Runnable {

@Override
public synchronized void run() {
if (rejectedPermitsPerNs == null || acceptedPermitsPerNs == null || (
rejectedPermitsPerNs.isEmpty() && acceptedPermitsPerNs.isEmpty())) {
return;
}
long totalOps = 0;
Map<String, Long> nsOps = new HashMap<>();
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
rejectedPermitsPerNs.get(entry.getKey()).longValue() :
0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
acceptedPermitsPerNs.get(entry.getKey()).longValue() :
0);
nsOps.put(entry.getKey(), ops);
totalOps += ops;
}

List<String> underMinimumNss = new ArrayList<>();
List<String> overMinimumNss = new ArrayList<>();
int effectiveOps = 0;

// First iteration: split namespaces into those underused and those that are not.
for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
String ns = entry.getKey();
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount);

if (newPermitCap <= minimumHandlerPerNs) {
underMinimumNss.add(ns);
} else {
overMinimumNss.add(ns);
effectiveOps += nsOps.get(ns);
}
}

// Second iteration part 1: assign minimum handlers
for (String ns: underMinimumNss) {
resizeNsHandlerCapacity(ns, minimumHandlerPerNs);
}
// Second iteration part 2: assign handlers to the rest
int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size();
for (String ns: overMinimumNss) {
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits);
resizeNsHandlerCapacity(ns, newPermitCap);
}

// Reset the metrics
rejectedPermitsPerNs.replaceAll((k, v) -> new LongAdder());
acceptedPermitsPerNs.replaceAll((k, v) -> new LongAdder());
}

private void resizeNsHandlerCapacity(String ns, int newPermitCap) {
AdjustableSemaphore semaphore = getPermits().get(ns);
int oldPermitCap = getPermitSizes().get(ns);
if (newPermitCap <= minimumHandlerPerNs) {
newPermitCap = minimumHandlerPerNs;
}
getPermitSizes().put(ns, newPermitCap);
if (newPermitCap > oldPermitCap) {
semaphore.release(newPermitCap - oldPermitCap);
} else if (newPermitCap < oldPermitCap) {
semaphore.reducePermits(oldPermitCap - newPermitCap);
}
LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public void shutdown() {
public String getAvailableHandlerOnPerNs(){
return "N/A";
}

@Override
public String getPermitCapacityPerNs() {
return "N/A";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,9 @@ public interface RouterRpcFairnessPolicyController {
* Returns the JSON string of the available handler for each Ns.
*/
String getAvailableHandlerOnPerNs();

/**
* Returns the JSON string of the max handler count for each ns.
*/
String getPermitCapacityPerNs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.HashSet;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
Expand All @@ -45,6 +47,9 @@ public class StaticRouterRpcFairnessPolicyController extends
public static final String ERROR_MSG = "Configured handlers "
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
+ " %d is less than the minimum required handlers %d";
public static final String ERROR_NS_MSG =
"Configured handlers %s=%d is less than the minimum required handlers %d";


public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf);
Expand Down Expand Up @@ -78,6 +83,7 @@ public void init(Configuration conf)
handlerCount -= dedicatedHandlers;
insertNameServiceWithPermits(nsId, dedicatedHandlers);
logAssignment(nsId, dedicatedHandlers);
getPermitSizes().put(nsId, dedicatedHandlers);
} else {
unassignedNS.add(nsId);
}
Expand All @@ -92,6 +98,7 @@ public void init(Configuration conf)
for (String nsId : unassignedNS) {
insertNameServiceWithPermits(nsId, handlersPerNS);
logAssignment(nsId, handlersPerNS);
getPermitSizes().put(nsId, handlersPerNS);
}
}

Expand All @@ -103,6 +110,7 @@ public void init(Configuration conf)
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS,
existingPermits + leftOverHandlers);
getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers);
}
LOG.info("Final permit allocation for concurrent ns: {}",
getAvailablePermits(CONCURRENT_NS));
Expand All @@ -116,15 +124,23 @@ private static void logAssignment(String nsId, int count) {
private void validateHandlersCount(Configuration conf, int handlerCount,
Set<String> allConfiguredNS) {
int totalDedicatedHandlers = 0;
int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
if (dedicatedHandlers > 0) {
if (dedicatedHandlers < minimumHandlerPerNs) {
String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId,
dedicatedHandlers, minimumHandlerPerNs);
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
// Total handlers should not be less than sum of dedicated handlers.
totalDedicatedHandlers += dedicatedHandlers;
} else {
// Each NS should have at least one handler assigned.
totalDedicatedHandlers++;
// Each NS has to have a minimum number of handlers assigned.
totalDedicatedHandlers += minimumHandlerPerNs;
}
}
if (totalDedicatedHandlers > handlerCount) {
Expand All @@ -134,5 +150,4 @@ private void validateHandlersCount(Configuration conf, int handlerCount,
throw new IllegalArgumentException(msg);
}
}

}
Loading