Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -496,4 +496,15 @@ public List<N> getNodesPerPartition(String partition) {
}
return nodesPerPartition;
}

public List<String> getPartitions() {
List<String> partitions = null;
readLock.lock();
try {
partitions = new ArrayList(nodesPerLabel.keySet());
} finally {
readLock.unlock();
}
return partitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,

/**
* Schedule on all nodes by starting at a random point.
* Schedule on all partitions by starting at a random partition
* when multiNodePlacementEnabled is true.
* @param cs
*/
static void schedule(CapacityScheduler cs) throws InterruptedException{
Expand All @@ -544,44 +546,79 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
if(nodeSize == 0) {
return;
}
int start = random.nextInt(nodeSize);

// To avoid too verbose DEBUG logging, only print debug log once for
// every 10 secs.
boolean printSkipedNodeLogging = false;
if (Time.monotonicNow() / 1000 % 10 == 0) {
printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
} else {
printedVerboseLoggingForAsyncScheduling = false;
}
if (!cs.multiNodePlacementEnabled) {
int start = random.nextInt(nodeSize);

// To avoid too verbose DEBUG logging, only print debug log once for
// every 10 secs.
boolean printSkipedNodeLogging = false;
if (Time.monotonicNow() / 1000 % 10 == 0) {
printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
} else {
printedVerboseLoggingForAsyncScheduling = false;
}

// Allocate containers of node [start, end)
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
}

// Allocate containers of node [start, end)
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
current = 0;

// Allocate containers of node [0, start)
for (FiCaSchedulerNode node : nodes) {
if (current++ > start) {
break;
}
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
}

current = 0;

// Allocate containers of node [0, start)
for (FiCaSchedulerNode node : nodes) {
if (current++ > start) {
break;
if (printSkipedNodeLogging) {
printedVerboseLoggingForAsyncScheduling = true;
}
if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
continue;
} else {
// Get all partitions
List<String> partitions = cs.nodeTracker.getPartitions();
int partitionSize = partitions.size();
// First randomize the start point
int start = random.nextInt(partitionSize);
// Allocate containers of partition [start, end)
for (String partititon : partitions) {
if (current++ >= start) {
CandidateNodeSet<FiCaSchedulerNode> candidates =
cs.getCandidateNodeSet(partititon);
if (candidates == null) {
continue;
}
cs.allocateContainersToNode(candidates, false);
}
}
cs.allocateContainersToNode(node.getNodeID(), false);
}

if (printSkipedNodeLogging) {
printedVerboseLoggingForAsyncScheduling = true;
}
current = 0;

// Allocate containers of partition [0, start)
for (String partititon : partitions) {
if (current++ > start) {
break;
}
CandidateNodeSet<FiCaSchedulerNode> candidates =
cs.getCandidateNodeSet(partititon);
if (candidates == null) {
continue;
}
cs.allocateContainersToNode(candidates, false);
}

}
Thread.sleep(cs.getAsyncScheduleInterval());
}

Expand Down Expand Up @@ -1486,17 +1523,34 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
}

private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
FiCaSchedulerNode node) {
String partition) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
List<FiCaSchedulerNode> nodes = nodeTracker
.getNodesPerPartition(partition);
if (nodes != null && !nodes.isEmpty()) {
//Filter for node heartbeat too long
nodes.stream()
.filter(node -> !shouldSkipNodeSchedule(node, this, true))
.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
nodesByPartition, partition);
}
return candidates;
}

private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
FiCaSchedulerNode node) {
CandidateNodeSet<FiCaSchedulerNode> candidates = null;
candidates = new SimpleCandidateNodeSet<>(node);
if (multiNodePlacementEnabled) {
Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
List<FiCaSchedulerNode> nodes = nodeTracker
.getNodesPerPartition(node.getPartition());
.getNodesPerPartition(node.getPartition());
if (nodes != null && !nodes.isEmpty()) {
nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
nodesByPartition, node.getPartition());
nodesByPartition, node.getPartition());
}
}
return candidates;
Expand All @@ -1513,8 +1567,8 @@ private void allocateContainersToNode(NodeId nodeId,
int offswitchCount = 0;
int assignedContainers = 0;

CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
node);
CandidateNodeSet<FiCaSchedulerNode> candidates =
getCandidateNodeSet(node);
CSAssignment assignment = allocateContainersToNode(candidates,
withNodeHeartbeat);
// Only check if we can allocate more container on the same node when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public class TestCapacitySchedulerAsyncScheduling {

private NMHeartbeatThread nmHeartbeatThread = null;

private static final String POLICY_CLASS_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler" +
".placement.ResourceUsageMultiNodeLookupPolicy";

@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
Expand All @@ -111,6 +115,21 @@ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
testAsyncContainerAllocation(3);
}

@Test(timeout = 300000)
public void testAsyncContainerAllocationWithMultiNode() throws Exception {
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
"resource-based");
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
"resource-based");
String policyName =
CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based" + ".class";
conf.set(policyName, POLICY_CLASS_NAME);
conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
true);
testAsyncContainerAllocation(2);
}

public void testAsyncContainerAllocation(int numThreads) throws Exception {
conf.setInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
Expand Down