diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index ca575d4dfa..44c7ea98ac 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -122,5 +122,18 @@ public enum DAGCounter { * task assignments). This is typically exposed by a resource manager * client. */ - NODE_TOTAL_COUNT + NODE_TOTAL_COUNT, + + /* + * The maximum amount of time a task spent waiting as a task request before being scheduled. + */ + TASK_SCHEDULER_MAX_PENDING_TIME_MS, + /* + * The total accumulated time that all tasks spent waiting as task requests. + */ + TASK_SCHEDULER_SUM_PENDING_TIME_MS, + /* + * The average time tasks spent waiting as task requests before being scheduled. + */ + TASK_SCHEDULER_AVG_PENDING_TIME_MS } diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java index 42ff56f2ba..93f0d22fb9 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -54,7 +54,6 @@ public enum SchedulerTaskState { public TaskScheduler(TaskSchedulerContext taskSchedulerContext) { this.taskSchedulerContext = taskSchedulerContext; } - /** * An entry point for initialization. * Order of service setup. Constructor, initialize(), start() - when starting a service. @@ -280,4 +279,12 @@ protected void onContainersAllocated(List containers) { getContext().containerAllocated(container); } } + + /** + * Collects DAG-level counters from the TaskScheduler, which is then aggregated by the DAG implementation. + * @return null by default, handled in upper layers + */ + public TaskSchedulerStatistics getStatistics() { + return null; + } } diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerStatistics.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerStatistics.java new file mode 100644 index 0000000000..8ae1bf38cf --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerStatistics.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.util.Time; +import org.apache.tez.common.counters.DAGCounter; +import org.apache.tez.common.counters.TezCounters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Statistics aggregator for task scheduler requests. + */ +public class TaskSchedulerStatistics { + private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerStatistics.class); + + /* + * Tracking task allocation pending times. + */ + private int maxPendingTime = 0; + private int sumPendingTime = 0; + private int averagePendingTime = 0; + private int pendingTaskRequestSamples = 0; + + /** + * Called by task schedulers when a request is removed. + */ + public void trackRequestPendingTime(TaskRequestData request) { + if (request == null) { + LOG.info("Got null TaskRequestData, ignoring (it's fine in case of early shutdowns)"); + return; + } + long requestPendingTime = Time.now() - request.getCreatedTime(); + LOG.debug("Tracking request pending time: {}", requestPendingTime); + + sumPendingTime += (int) requestPendingTime; + pendingTaskRequestSamples += 1; + maxPendingTime = Math.max(maxPendingTime, (int) requestPendingTime); + } + + /** + * Calculates some derived statistics. + * @return this instance + */ + protected TaskSchedulerStatistics aggregate() { + // it's fine to lose float precision here, we're interested in average of milliseconds + this.averagePendingTime = pendingTaskRequestSamples > 0 ? sumPendingTime / pendingTaskRequestSamples : 0; + return this; + } + + /** + * Adds the stats from another aggregator to this one and calculates the derived fields by calling aggregate(). + * @return this instance with ready-to-use data + */ + public TaskSchedulerStatistics add(TaskSchedulerStatistics other) { + if (other == null || other.pendingTaskRequestSamples == 0) { + return aggregate(); + } + this.maxPendingTime = Math.max(maxPendingTime, other.maxPendingTime); + this.sumPendingTime += other.sumPendingTime; + this.pendingTaskRequestSamples += other.pendingTaskRequestSamples; + return aggregate(); + } + + public TezCounters getCounters() { + TezCounters counters = new TezCounters(); + LOG.info("Getting counters from statistics: {}", this.statsToString()); + // prevent filling the counters with useless 0 values if any (e.g. in case of unused TaskScheduler) + if (pendingTaskRequestSamples != 0) { + counters.findCounter(DAGCounter.TASK_SCHEDULER_MAX_PENDING_TIME_MS).setValue(maxPendingTime); + counters.findCounter(DAGCounter.TASK_SCHEDULER_SUM_PENDING_TIME_MS).setValue(sumPendingTime); + counters.findCounter(DAGCounter.TASK_SCHEDULER_AVG_PENDING_TIME_MS).setValue(averagePendingTime); + } + return counters; + } + + public TaskSchedulerStatistics clear() { + averagePendingTime = 0; + sumPendingTime = 0; + pendingTaskRequestSamples = 0; + maxPendingTime = 0; + return this; + } + + /** + * Classes implementing this interface tells basic characteristics about task requests they encapsulate. + */ + public interface TaskRequestData { + // The time when the request was created + long getCreatedTime(); + } + + public String statsToString(){ + return String.format("[pending: {max: %d, sum: %d, average: %d, samples: %d}]", maxPendingTime, sumPendingTime, + averagePendingTime, pendingTaskRequestSamples); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 57a100f566..c9dda7f762 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -2591,5 +2591,7 @@ private void updateCounters() { setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size()); setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size()); setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true)); + + dagCounters.incrAllCounters(appContext.getTaskScheduler().getCounters()); } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java index 131302a03a..fd76bb5bd5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java @@ -50,6 +50,7 @@ import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +158,8 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler private int lastPreemptionHeartbeat = 0; private long preemptionMaxWaitTime; + private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics(); + public DagAwareYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext) { super(taskSchedulerContext); signatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); @@ -595,6 +598,7 @@ private void informAppAboutAssignment(TaskRequest request, Container container) request.getCookie()); } else { getContext().taskAllocated(request.getTask(), request.getCookie(), container); + taskSchedulerStatistics.trackRequestPendingTime(request); } } @@ -1162,6 +1166,7 @@ public synchronized void dagComplete() { hc.resetMatchingLevel(); } vertexDescendants = null; + taskSchedulerStatistics.clear(); } @GuardedBy("this") @@ -1265,12 +1270,14 @@ public void updateBlacklist(List additions, List removals) { /** * A utility class to track a task allocation. */ - static class TaskRequest extends AMRMClient.ContainerRequest { + static class TaskRequest extends AMRMClient.ContainerRequest + implements TaskSchedulerStatistics.TaskRequestData { final Object task; final int vertexIndex; final Object signature; final Object cookie; final ContainerId affinityContainerId; + final long created; TaskRequest(Object task, int vertexIndex, Resource capability, String[] hosts, String[] racks, Priority priority, Object signature, Object cookie) { @@ -1285,6 +1292,7 @@ static class TaskRequest extends AMRMClient.ContainerRequest { this.signature = signature; this.cookie = cookie; this.affinityContainerId = affinityContainerId; + this.created = Time.now(); } Object getTask() { @@ -1313,6 +1321,11 @@ boolean hasLocality() { List racks = getRacks(); return (nodes != null && !nodes.isEmpty()) || (racks != null && !racks.isEmpty()); } + + @Override + public long getCreatedTime() { + return created; + } } private enum HeldContainerState { @@ -2103,4 +2116,9 @@ protected void afterExecute(Runnable r, Throwable t) { public int getHeldContainersCount() { return heldContainers.size(); } + + @Override + public TaskSchedulerStatistics getStatistics() { + return taskSchedulerStatistics; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 20f37119d9..521fc52ca1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -31,10 +31,15 @@ import com.google.common.primitives.Ints; +import org.apache.hadoop.util.Time; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.AbstractCounters; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -64,6 +69,8 @@ public class LocalTaskSchedulerService extends TaskScheduler { final String appTrackingUrl; final long customContainerAppId; + private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics(); + public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { super(taskSchedulerContext); taskRequestQueue = new LinkedBlockingQueue<>(); @@ -101,7 +108,7 @@ public int getClusterNodeCount() { } @Override - public void dagComplete() { + public void dagComplete( ) { taskRequestHandler.dagComplete(); } @@ -159,7 +166,7 @@ protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId), taskAllocations, getContext(), - conf); + conf, taskSchedulerStatistics); } @Override @@ -221,11 +228,13 @@ public Container createContainer(Resource capability, Priority priority) { static class SchedulerRequest { } - static class TaskRequest extends SchedulerRequest { + static class TaskRequest extends SchedulerRequest implements TaskSchedulerStatistics.TaskRequestData { final Object task; + final long created; public TaskRequest(Object task) { this.task = task; + this.created = Time.now(); } @Override @@ -251,6 +260,10 @@ public int hashCode() { return 7841 + (task != null ? task.hashCode() : 0); } + @Override + public long getCreatedTime() { + return created; + } } static class AllocateTaskRequest extends TaskRequest implements Comparable { @@ -344,6 +357,7 @@ static class AsyncDelegateRequestHandler implements Runnable { final HashMap taskAllocations; final TaskSchedulerContext taskSchedulerContext; private final Object descendantsLock = new Object(); + private final TaskSchedulerStatistics taskSchedulerStatistics; private ArrayList vertexDescendants = null; final int MAX_TASKS; @@ -351,7 +365,7 @@ static class AsyncDelegateRequestHandler implements Runnable { LocalContainerFactory localContainerFactory, HashMap taskAllocations, TaskSchedulerContext taskSchedulerContext, - Configuration conf) { + Configuration conf, TaskSchedulerStatistics taskSchedulerStatistics) { this.clientRequestQueue = clientRequestQueue; this.localContainerFactory = localContainerFactory; this.taskAllocations = taskAllocations; @@ -359,12 +373,14 @@ static class AsyncDelegateRequestHandler implements Runnable { this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); this.taskRequestQueue = new PriorityBlockingQueue<>(); + this.taskSchedulerStatistics = taskSchedulerStatistics; } void dagComplete() { synchronized (descendantsLock) { vertexDescendants = null; } + taskSchedulerStatistics.clear(); } private void ensureVertexDescendants() { synchronized (descendantsLock) { @@ -423,7 +439,8 @@ public void run() { while (!Thread.currentThread().isInterrupted()) { dispatchRequest(); while (shouldProcess()) { - allocateTask(); + AllocateTaskRequest request = allocateTask(); + taskSchedulerStatistics.trackRequestPendingTime(request); } } } @@ -468,15 +485,17 @@ void maybePreempt(AllocateTaskRequest request) { } } - void allocateTask() { + AllocateTaskRequest allocateTask() { try { AllocateTaskRequest request = taskRequestQueue.take(); Container container = localContainerFactory.createContainer(request.capability, request.priority); taskAllocations.put(request.task, new AllocatedTask(request, container)); taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container); + return request; } catch (InterruptedException e) { Thread.currentThread().interrupt(); + return null; } } @@ -518,4 +537,9 @@ void preemptTask(DeallocateContainerRequest request) { public int getHeldContainersCount() { return 0; } + + @Override + public TaskSchedulerStatistics getStatistics() { + return taskSchedulerStatistics; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index e311c23e86..ba9e345ae7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -35,6 +35,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.tez.Utils; +import org.apache.tez.common.counters.AbstractCounters; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; @@ -44,6 +47,7 @@ import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -1104,4 +1108,12 @@ public String getTaskSchedulerClassName(int taskSchedulerIndex) { public TaskScheduler getTaskScheduler(int taskSchedulerIndex) { return taskSchedulers[taskSchedulerIndex].getTaskScheduler(); } + + public AbstractCounters getCounters() { + TaskSchedulerStatistics stats = new TaskSchedulerStatistics(); + for (TaskSchedulerWrapper taskScheduler : taskSchedulers) { + stats.add(taskScheduler.getStatistics()); + } + return stats.getCounters(); + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java index 9e0d2ab3c0..8f8335c56b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; public class TaskSchedulerWrapper { @@ -88,6 +89,10 @@ public void dagComplete() throws Exception { real.dagComplete(); } + public TaskSchedulerStatistics getStatistics() { + return real.getStatistics(); + } + public TaskScheduler getTaskScheduler() { return real; } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index b299324721..1f2edd2d1c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -37,11 +37,16 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.counters.AbstractCounters; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.lang.exception.ExceptionUtils; @@ -158,6 +163,8 @@ public class YarnTaskSchedulerService extends TaskScheduler @VisibleForTesting protected AtomicBoolean shouldUnregister = new AtomicBoolean(false); + private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics(); + static class CRCookie { // Do not use these variables directly. Can caused mocked unit tests to fail. private Object task; @@ -183,9 +190,10 @@ Object getContainerSignature() { } } - class CookieContainerRequest extends ContainerRequest { + class CookieContainerRequest extends ContainerRequest implements TaskSchedulerStatistics.TaskRequestData { CRCookie cookie; ContainerId affinitizedContainerId; + private final long created; public CookieContainerRequest( Resource capability, @@ -195,6 +203,7 @@ public CookieContainerRequest( CRCookie cookie) { super(capability, hosts, racks, priority); this.cookie = cookie; + this.created = Time.now(); } public CookieContainerRequest( @@ -215,6 +224,11 @@ CRCookie getCookie() { ContainerId getAffinitizedContainer() { return affinitizedContainerId; } + + @Override + public long getCreatedTime() { + return created; + } } public YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { @@ -874,6 +888,7 @@ public synchronized void dagComplete() { synchronized(delayedContainerManager) { delayedContainerManager.notify(); } + taskSchedulerStatistics.clear(); } @Override @@ -1777,6 +1792,7 @@ private void releaseUnassignedContainers(Iterable containers) { private void informAppAboutAssignment(CookieContainerRequest assigned, Container container) { + taskSchedulerStatistics.trackRequestPendingTime(assigned); getContext().taskAllocated(getTask(assigned), assigned.getCookie().getAppCookie(), container); } @@ -2419,4 +2435,9 @@ public String toString() { public int getHeldContainersCount() { return heldContainers.size(); } + + @Override + public TaskSchedulerStatistics getStatistics() { + return taskSchedulerStatistics; + } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java index d7b516add2..4cb3ac351f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.junit.Assert; import org.junit.Test; @@ -37,6 +38,8 @@ import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory; import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.SchedulerRequest; +import static org.mockito.Mockito.mock; + public class TestLocalTaskScheduler { @@ -65,7 +68,7 @@ public void maxTasksAllocationsCannotBeExceeded() { containerFactory, taskAllocations, mockContext, - tezConf); + tezConf, mock(TaskSchedulerStatistics.class)); // Allocate up to max tasks for (int i = 0; i < MAX_TASKS; i++) { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index e193ee98f2..009e3d733b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -33,6 +33,7 @@ import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics; import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -233,7 +234,7 @@ public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId), taskAllocations, getContext(), - conf); + conf, mock(TaskSchedulerStatistics.class)); return requestHandler; } @@ -261,9 +262,10 @@ static class MockAsyncDelegateRequestHandler extends AsyncDelegateRequestHandler LinkedBlockingQueue taskRequestQueue, LocalContainerFactory localContainerFactory, HashMap taskAllocations, - TaskSchedulerContext appClientDelegate, Configuration conf) { + TaskSchedulerContext appClientDelegate, Configuration conf, + TaskSchedulerStatistics taskSchedulerStatistics) { super(taskRequestQueue, localContainerFactory, taskAllocations, - appClientDelegate, conf); + appClientDelegate, conf, taskSchedulerStatistics); } @Override @@ -273,9 +275,10 @@ void dispatchRequest() { } @Override - void allocateTask() { - super.allocateTask(); + AllocateTaskRequest allocateTask() { + AllocateTaskRequest request = super.allocateTask(); allocateCount++; + return request; } public void drainRequest(int count) {