-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout #4597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout #4597
Changes from 2 commits
300d8ee
bfb9229
d985449
fcbdd2f
e5436d5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,9 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_MS; | ||
|
|
||
| /** | ||
| * Base fairness policy that implements @RouterRpcFairnessPolicyController. | ||
| * Internally a map of nameservice to Semaphore is used to control permits. | ||
|
|
@@ -42,15 +45,22 @@ public class AbstractRouterRpcFairnessPolicyController | |
| /** Hash table to hold semaphore for each configured name service. */ | ||
| private Map<String, Semaphore> permits; | ||
|
|
||
| private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT; | ||
|
|
||
| public void init(Configuration conf) { | ||
| this.permits = new HashMap<>(); | ||
| long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_MS, | ||
| DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); | ||
| if (timeoutMs >= 0) { | ||
| acquireTimeoutMs = timeoutMs; | ||
|
Comment on lines
+54
to
+55
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If there is an invalid entry configured and we are falling back to the default value, we should at least have a warn log. Kind of |
||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean acquirePermit(String nsId) { | ||
| try { | ||
| LOG.debug("Taking lock for nameservice {}", nsId); | ||
| return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS); | ||
| return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException e) { | ||
| LOG.debug("Cannot get a permit for nameservice {}", nsId); | ||
| } | ||
|
|
@@ -82,15 +92,13 @@ protected int getAvailablePermits(String nsId) { | |
| @Override | ||
| public String getAvailableHandlerOnPerNs() { | ||
| JSONObject json = new JSONObject(); | ||
| for (Map.Entry<String, Semaphore> entry : permits.entrySet()) { | ||
| permits.forEach((k, v) -> { | ||
| try { | ||
| String nsId = entry.getKey(); | ||
| int availableHandler = entry.getValue().availablePermits(); | ||
| json.put(nsId, availableHandler); | ||
| json.put(k, v.availablePermits()); | ||
| } catch (JSONException e) { | ||
| LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e); | ||
| LOG.warn("Cannot put {} into JSONObject", k, e); | ||
| } | ||
| } | ||
| }); | ||
| return json.toString(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -723,6 +723,14 @@ | |
| </description> | ||
| </property> | ||
|
|
||
| <property> | ||
| <name>dfs.federation.router.fairness.acquire.timeout.ms</name> | ||
| <value>1000</value> | ||
|
||
| <description> | ||
| The maximum time, in milliseconds, to wait for a permit. | ||
| </description> | ||
| </property> | ||
|
|
||
| <property> | ||
| <name>dfs.federation.router.federation.rename.bandwidth</name> | ||
| <value>10</value> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,10 +23,12 @@ | |
| import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; | ||
| import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; | ||
| import org.apache.hadoop.test.GenericTestUtils; | ||
| import org.apache.hadoop.util.Time; | ||
| import org.junit.Test; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_MS; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; | ||
| import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; | ||
|
|
@@ -83,6 +85,29 @@ public void testHandlerAllocationPreconfigured() { | |
| assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAcquireTimeout() { | ||
| Configuration conf = createConf(40); | ||
| conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30); | ||
| conf.setLong(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_MS, 100); | ||
|
||
| RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = | ||
| FederationUtil.newFairnessPolicyController(conf); | ||
|
|
||
| // ns1 should have 30 permits allocated | ||
| for (int i = 0; i < 30; i++) { | ||
| assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); | ||
| } | ||
| long acquireBeginTimeMs = Time.monotonicNow(); | ||
| assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); | ||
| long acquireEndTimeMs = Time.monotonicNow(); | ||
|
|
||
| long acquireTimeMs = acquireEndTimeMs - acquireBeginTimeMs; | ||
|
|
||
| // There are some other operations, so acquireTimeMs >= 100ms. | ||
| assertTrue(acquireTimeMs >= 100); | ||
| assertTrue(acquireTimeMs < 100 + 50); | ||
|
||
| } | ||
|
|
||
| @Test | ||
| public void testAllocationErrorWithZeroHandlers() { | ||
| Configuration conf = createConf(0); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use getTimeDuration(), we don't need the unit suffix (_MS) in the key DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_MS.
We should set the default in the XML to "1s".