Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ private static void addDeprecatedKeys() {
public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME =
false;

/** Configure default application placement allocator. */
public static final String APPLICATION_PLACEMENT_TYPE_CLASS =
YARN_PREFIX + "scheduler.app-placement-allocator.class";

/** Configured scheduler queue placement rules. */
public static final String QUEUE_PLACEMENT_RULES = YARN_PREFIX
+ "scheduler.queue-placement-rules";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class AppSchedulingInfo {
private final int retryAttempts;
private boolean unmanagedAM;

private final String defaultResourceRequestAppPlacementType;

public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
ResourceUsage appResourceUsage,
Expand All @@ -129,6 +132,31 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
updateContext = new ContainerUpdateContext(this);
readLock = lock.readLock();
writeLock = lock.writeLock();

this.defaultResourceRequestAppPlacementType =
getDefaultResourceRequestAppPlacementType();
}

/**
* Set default App Placement Allocator.
*
* @return app placement class.
*/
public String getDefaultResourceRequestAppPlacementType() {
if (this.rmContext != null
&& this.rmContext.getYarnConfiguration() != null) {

String appPlacementClass = applicationSchedulingEnvs.get(
ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS);
if (null != appPlacementClass) {
return appPlacementClass;
} else {
Configuration conf = rmContext.getYarnConfiguration();
return conf.get(
YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS);
}
}
return null;
}

public ApplicationId getApplicationId() {
Expand Down Expand Up @@ -331,8 +359,7 @@ private boolean internalAddResourceRequests(
SchedulerRequestKey schedulerRequestKey = entry.getKey();
AppPlacementAllocator<SchedulerNode> appPlacementAllocator =
getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey,
applicationSchedulingEnvs.get(
ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
defaultResourceRequestAppPlacementType);

// Update AppPlacementAllocator
PendingAskUpdateResult pendingAmountChanges =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
Expand All @@ -32,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -175,4 +178,52 @@ public void testSchedulerKeyAccounting() {
info.updateResourceRequests(reqs, false);
Assert.assertEquals(0, info.getSchedulerKeys().size());
}

@Test
public void testApplicationPlacementType() {
String DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS =
LocalityAppPlacementAllocator.class.getName();
Configuration conf = new Configuration();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appIdImpl, 1);
Queue queue = mock(Queue.class);
AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue,
mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(),
rmContext, false);
Assert.assertEquals(info.getApplicationSchedulingEnvs(), new HashMap<>());
// This should return null as nothing is set in the conf.
Assert.assertNull(info.getDefaultResourceRequestAppPlacementType());
conf = new Configuration();
conf.set(YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS,
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
info = new AppSchedulingInfo(appAttemptId, "test", queue,
mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(),
rmContext, false);
Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(),
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS);
}

@Test
public void testApplicationPlacementTypeNotConfigured() {
Configuration conf = new Configuration();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
ApplicationId appIdImpl = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appIdImpl, 1);
Queue queue = mock(Queue.class);
HashMap<String, String> applicationSchedulingEnvs = new HashMap<>();
applicationSchedulingEnvs.put("APPLICATION_PLACEMENT_TYPE_CLASS",
LocalityAppPlacementAllocator.class.getName());
AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue,
mock(ActiveUsersManager.class), 0, new ResourceUsage(),
applicationSchedulingEnvs, rmContext, false);
// This should be set from applicationSchedulingEnvs
Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(),
LocalityAppPlacementAllocator.class.getName());
}
}