Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +43,9 @@ public class AbstractRouterRpcFairnessPolicyController
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);

/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;
protected Map<String, AdjustableSemaphore> permits;
protected Map<String, LongAdder> rejectedPermitsPerNs;
protected Map<String, LongAdder> acceptedPermitsPerNs;

public void init(Configuration conf) {
this.permits = new HashMap<>();
Expand Down Expand Up @@ -71,7 +76,7 @@ public void shutdown() {
}

protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
this.permits.put(nsId, new Semaphore(maxPermits));
this.permits.put(nsId, new AdjustableSemaphore(maxPermits));
}

protected int getAvailablePermits(String nsId) {
Expand All @@ -81,7 +86,7 @@ protected int getAvailablePermits(String nsId) {
@Override
public String getAvailableHandlerOnPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
try {
String nsId = entry.getKey();
int availableHandler = entry.getValue().availablePermits();
Expand All @@ -92,4 +97,11 @@ public String getAvailableHandlerOnPerNs() {
}
return json.toString();
}

@Override
public void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
Map<String, LongAdder> acceptedPermitsPerNs) {
this.rejectedPermitsPerNs = rejectedPermitsPerNs;
this.acceptedPermitsPerNs = acceptedPermitsPerNs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
import org.apache.hadoop.util.concurrent.HadoopExecutors;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;

/**
* Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController}
* and fetching handlers from configuration for all available name services.
* The handlers count changes according to traffic to namespaces.
* Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY.
*/
public class DynamicRouterRpcFairnessPolicyController
extends StaticRouterRpcFairnessPolicyController {

private static final Logger LOG =
LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);

private static final ScheduledExecutorService scheduledExecutor = HadoopExecutors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
private PermitsResizerService permitsResizerService;
private ScheduledFuture<?> refreshTask;
private int handlerCount;

/**
* Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController}
* and starts a periodic semaphore resizer thread
*
* @param conf configuration
*/
public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
super(conf);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
long refreshInterval =
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY,
DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
permitsResizerService = new PermitsResizerService();
refreshTask = scheduledExecutor
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.MILLISECONDS);
}

@VisibleForTesting
public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) {
super(conf);
handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
permitsResizerService = new PermitsResizerService();
refreshTask = scheduledExecutor
.scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
TimeUnit.MILLISECONDS);
}

@VisibleForTesting
public void refreshPermitsCap() {
permitsResizerService.run();
}

@Override
public void shutdown() {
super.shutdown();
if (refreshTask != null) {
refreshTask.cancel(true);
}
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
}

class PermitsResizerService implements Runnable {

@Override
public synchronized void run() {
long totalOps = 0;
Map<String, Long> nsOps = new HashMap<>();
for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
rejectedPermitsPerNs.get(entry.getKey()).longValue() :
0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
acceptedPermitsPerNs.get(entry.getKey()).longValue() :
0);
nsOps.put(entry.getKey(), ops);
totalOps += ops;
}

for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
Comment thread
ferhui marked this conversation as resolved.
Outdated
String ns = entry.getKey();
AdjustableSemaphore semaphore = entry.getValue();
int oldPermitCap = permitSizes.get(ns);
int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount);
// Leave at least 1 handler even if there's no traffic
if (newPermitCap == 0) {
newPermitCap = 1;
Comment thread
kokonguyen191 marked this conversation as resolved.
Outdated
}
permitSizes.put(ns, newPermitCap);
if (newPermitCap > oldPermitCap) {
semaphore.release(newPermitCap - oldPermitCap);
} else if (newPermitCap < oldPermitCap) {
semaphore.reducePermits(oldPermitCap - newPermitCap);
}
LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.hdfs.server.federation.fairness;

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;

/**
Expand Down Expand Up @@ -51,4 +54,10 @@ public void shutdown() {
public String getAvailableHandlerOnPerNs(){
return "N/A";
}

@Override
public void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
Map<String, LongAdder> acceptedPermitsPerNs) {
// Nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hadoop.hdfs.server.federation.fairness;

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand Down Expand Up @@ -67,4 +70,10 @@ public interface RouterRpcFairnessPolicyController {
* Returns the JSON string of the available handler for each Ns.
*/
String getAvailableHandlerOnPerNs();

/**
* Attaches permits access metrics to the controller
*/
void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
Map<String, LongAdder> acceptedPermitsPerNs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;

Expand All @@ -46,6 +48,8 @@ public class StaticRouterRpcFairnessPolicyController extends
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
+ " %d is less than the minimum required handlers %d";

protected Map<String, Integer> permitSizes = new HashMap<>();

public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf);
}
Expand Down Expand Up @@ -78,6 +82,7 @@ public void init(Configuration conf)
handlerCount -= dedicatedHandlers;
insertNameServiceWithPermits(nsId, dedicatedHandlers);
logAssignment(nsId, dedicatedHandlers);
permitSizes.put(nsId, dedicatedHandlers);
} else {
unassignedNS.add(nsId);
}
Expand All @@ -92,6 +97,7 @@ public void init(Configuration conf)
for (String nsId : unassignedNS) {
insertNameServiceWithPermits(nsId, handlersPerNS);
logAssignment(nsId, handlersPerNS);
permitSizes.put(nsId, handlersPerNS);
}
}

Expand All @@ -103,6 +109,7 @@ public void init(Configuration conf)
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
insertNameServiceWithPermits(CONCURRENT_NS,
existingPermits + leftOverHandlers);
permitSizes.put(CONCURRENT_NS, existingPermits + leftOverHandlers);
}
LOG.info("Final permit allocation for concurrent ns: {}",
getAvailablePermits(CONCURRENT_NS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
Expand Down Expand Up @@ -271,6 +273,22 @@ public static RouterRpcFairnessPolicyController newFairnessPolicyController(
return newInstance(conf, null, null, clazz);
}

/**
* Creates an instance of an RouterRpcFairnessPolicyController
* from the configuration and attaches permits access metrics to the controller.
*
* @param conf Configuration that defines the fairness controller class.
* @param rejectedPermitsPerNs Metrics map ns -> rejected permits
* @param acceptedPermitsPerNs Metrics map ns -> accepted permits
* @return Fairness policy controller.
*/
public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf,
Map<String, LongAdder> rejectedPermitsPerNs, Map<String, LongAdder> acceptedPermitsPerNs) {
RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf);
instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
return instance;
}

/**
* Collect all configured nameservices.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT =
600000;
public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval";
Comment thread
kokonguyen191 marked this conversation as resolved.
Outdated

// HDFS Router Federation Rename.
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public RouterRpcClient(Configuration conf, Router router,
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start();
this.routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
this.routerRpcFairnessPolicyController = FederationUtil
.newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs);

int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.utils;

import java.util.concurrent.Semaphore;

public class AdjustableSemaphore extends Semaphore {

public AdjustableSemaphore(int permits) {
super(permits);
}

public AdjustableSemaphore(int permits, boolean fair) {
super(permits, fair);
}

public void reducePermits(int reduction) {
super.reducePermits(reduction);
}
}
Loading