Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,18 @@ public enum DAGCounter {
* task assignments). This is typically exposed by a resource manager
* client.
*/
NODE_TOTAL_COUNT
NODE_TOTAL_COUNT,

/*
* The maximum amount of time a task spent waiting as a task request before being scheduled.
*/
TASK_SCHEDULER_MAX_PENDING_TIME_MS,
/*
* The total accumulated time that all tasks spent waiting as task requests.
*/
TASK_SCHEDULER_SUM_PENDING_TIME_MS,
/*
* The average time tasks spent waiting as task requests before being scheduled.
*/
TASK_SCHEDULER_AVG_PENDING_TIME_MS
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public enum SchedulerTaskState {
public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
this.taskSchedulerContext = taskSchedulerContext;
}

/**
* An entry point for initialization.
* Order of service setup. Constructor, initialize(), start() - when starting a service.
Expand Down Expand Up @@ -280,4 +279,12 @@ protected void onContainersAllocated(List<Container> containers) {
getContext().containerAllocated(container);
}
}

/**
* Collects DAG-level counters from the TaskScheduler, which is then aggregated by the DAG implementation.
* This base implementation tracks nothing; schedulers that collect statistics are expected to override it.
* @return null by default, handled in upper layers (so callers must be prepared for a null result)
*/
public TaskSchedulerStatistics getStatistics() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.serviceplugins.api;

import org.apache.hadoop.util.Time;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Statistics aggregator for task scheduler requests.
*/
/**
 * Statistics aggregator for task scheduler requests.
 *
 * <p>Records how long task requests stayed pending (the difference between {@link Time#now()} at tracking
 * time and {@link TaskRequestData#getCreatedTime()}), and exposes the maximum, sum and average of those
 * durations as DAG counters. All durations are in milliseconds.</p>
 *
 * <p>Instances perform no synchronization of their own.</p>
 */
public class TaskSchedulerStatistics {
  private static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerStatistics.class);

  /*
   * Tracking task allocation pending times (milliseconds).
   * These are longs on purpose: the accumulated sum exceeds Integer.MAX_VALUE (~2.1e9 ms) as soon as e.g.
   * 10,000 requests wait 5 minutes each, and counter values are longs anyway.
   */
  private long maxPendingTime = 0;
  private long sumPendingTime = 0;
  private long averagePendingTime = 0;
  private int pendingTaskRequestSamples = 0;

  /**
   * Called by task schedulers when a request is removed.
   * Adds one sample to the statistics: the time elapsed since the request was created.
   *
   * @param request the request that has just been served; a null value is logged and ignored
   */
  public void trackRequestPendingTime(TaskRequestData request) {
    if (request == null) {
      LOG.info("Got null TaskRequestData, ignoring (it's fine in case of early shutdowns)");
      return;
    }
    // Time.now() is wall-clock based, so a clock adjustment between creation and allocation could produce
    // a negative difference; a negative waiting time is meaningless, hence the clamp to 0.
    long requestPendingTime = Math.max(0L, Time.now() - request.getCreatedTime());
    LOG.debug("Tracking request pending time: {}", requestPendingTime);

    sumPendingTime += requestPendingTime;
    pendingTaskRequestSamples += 1;
    maxPendingTime = Math.max(maxPendingTime, requestPendingTime);
  }

  /**
   * Calculates some derived statistics.
   * Currently this is the average pending time, which is 0 when no samples have been tracked.
   *
   * @return this instance
   */
  protected TaskSchedulerStatistics aggregate() {
    // it's fine to lose float precision here, we're interested in average of milliseconds
    this.averagePendingTime = pendingTaskRequestSamples > 0 ? sumPendingTime / pendingTaskRequestSamples : 0;
    return this;
  }

  /**
   * Adds the stats from another aggregator to this one and calculates the derived fields by calling aggregate().
   *
   * @param other the statistics to merge in; null or sample-less instances contribute nothing
   * @return this instance with ready-to-use data
   */
  public TaskSchedulerStatistics add(TaskSchedulerStatistics other) {
    if (other == null || other.pendingTaskRequestSamples == 0) {
      return aggregate();
    }
    this.maxPendingTime = Math.max(maxPendingTime, other.maxPendingTime);
    this.sumPendingTime += other.sumPendingTime;
    this.pendingTaskRequestSamples += other.pendingTaskRequestSamples;
    return aggregate();
  }

  /**
   * Converts the current statistics to counters.
   * Note that the average is only as fresh as the last {@link #aggregate()} / {@link #add} call.
   *
   * @return a new TezCounters instance; it is left empty if no samples have been tracked
   */
  public TezCounters getCounters() {
    TezCounters counters = new TezCounters();
    LOG.info("Getting counters from statistics: {}", this.statsToString());
    // prevent filling the counters with useless 0 values if any (e.g. in case of unused TaskScheduler)
    if (pendingTaskRequestSamples != 0) {
      counters.findCounter(DAGCounter.TASK_SCHEDULER_MAX_PENDING_TIME_MS).setValue(maxPendingTime);
      counters.findCounter(DAGCounter.TASK_SCHEDULER_SUM_PENDING_TIME_MS).setValue(sumPendingTime);
      counters.findCounter(DAGCounter.TASK_SCHEDULER_AVG_PENDING_TIME_MS).setValue(averagePendingTime);
    }
    return counters;
  }

  /**
   * Resets every tracked and derived value to 0.
   *
   * @return this instance
   */
  public TaskSchedulerStatistics clear() {
    averagePendingTime = 0;
    sumPendingTime = 0;
    pendingTaskRequestSamples = 0;
    maxPendingTime = 0;
    return this;
  }

  /**
   * Classes implementing this interface tell basic characteristics about the task requests they encapsulate.
   */
  public interface TaskRequestData {
    /**
     * @return the time when the request was created; it is compared against {@link Time#now()}
     */
    long getCreatedTime();
  }

  /**
   * @return a human-readable, single-line summary of the current statistics
   */
  public String statsToString() {
    return String.format("[pending: {max: %d, sum: %d, average: %d, samples: %d}]", maxPendingTime, sumPendingTime,
        averagePendingTime, pendingTaskRequestSamples);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2591,5 +2591,7 @@ private void updateCounters() {
setDagCounter(DAGCounter.TOTAL_CONTAINERS_USED, containersUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_USED_COUNT, nodesUsedByCurrentDAG.size());
setDagCounter(DAGCounter.NODE_TOTAL_COUNT, appContext.getTaskScheduler().getNumClusterNodes(true));

dagCounters.incrAllCounters(appContext.getTaskScheduler().getCounters());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -157,6 +158,8 @@ public class DagAwareYarnTaskScheduler extends TaskScheduler
private int lastPreemptionHeartbeat = 0;
private long preemptionMaxWaitTime;

private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics();

public DagAwareYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext) {
super(taskSchedulerContext);
signatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
Expand Down Expand Up @@ -595,6 +598,7 @@ private void informAppAboutAssignment(TaskRequest request, Container container)
request.getCookie());
} else {
getContext().taskAllocated(request.getTask(), request.getCookie(), container);
taskSchedulerStatistics.trackRequestPendingTime(request);
}
}

Expand Down Expand Up @@ -1162,6 +1166,7 @@ public synchronized void dagComplete() {
hc.resetMatchingLevel();
}
vertexDescendants = null;
taskSchedulerStatistics.clear();
}

@GuardedBy("this")
Expand Down Expand Up @@ -1265,12 +1270,14 @@ public void updateBlacklist(List<String> additions, List<String> removals) {
/**
* A utility class to track a task allocation.
*/
static class TaskRequest extends AMRMClient.ContainerRequest {
static class TaskRequest extends AMRMClient.ContainerRequest
implements TaskSchedulerStatistics.TaskRequestData {
final Object task;
final int vertexIndex;
final Object signature;
final Object cookie;
final ContainerId affinityContainerId;
final long created;

TaskRequest(Object task, int vertexIndex, Resource capability, String[] hosts, String[] racks,
Priority priority, Object signature, Object cookie) {
Expand All @@ -1285,6 +1292,7 @@ static class TaskRequest extends AMRMClient.ContainerRequest {
this.signature = signature;
this.cookie = cookie;
this.affinityContainerId = affinityContainerId;
this.created = Time.now();
}

Object getTask() {
Expand Down Expand Up @@ -1313,6 +1321,11 @@ boolean hasLocality() {
List<String> racks = getRacks();
return (nodes != null && !nodes.isEmpty()) || (racks != null && !racks.isEmpty());
}

/**
* @return the value of the {@code created} field of this request
*/
@Override
public long getCreatedTime() {
return created;
}
}

private enum HeldContainerState {
Expand Down Expand Up @@ -2103,4 +2116,9 @@ protected void afterExecute(Runnable r, Throwable t) {
public int getHeldContainersCount() {
return heldContainers.size();
}

/**
* @return the statistics instance held in the {@code taskSchedulerStatistics} field of this scheduler
*/
@Override
public TaskSchedulerStatistics getStatistics() {
return taskSchedulerStatistics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@

import com.google.common.primitives.Ints;

import org.apache.hadoop.util.Time;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.AbstractCounters;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -64,6 +69,8 @@ public class LocalTaskSchedulerService extends TaskScheduler {
final String appTrackingUrl;
final long customContainerAppId;

private final TaskSchedulerStatistics taskSchedulerStatistics = new TaskSchedulerStatistics();

public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
super(taskSchedulerContext);
taskRequestQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -101,7 +108,7 @@ public int getClusterNodeCount() {
}

@Override
public void dagComplete() {
public void dagComplete( ) {
taskRequestHandler.dagComplete();
}

Expand Down Expand Up @@ -159,7 +166,7 @@ protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
taskAllocations,
getContext(),
conf);
conf, taskSchedulerStatistics);
}

@Override
Expand Down Expand Up @@ -221,11 +228,13 @@ public Container createContainer(Resource capability, Priority priority) {
static class SchedulerRequest {
}

static class TaskRequest extends SchedulerRequest {
static class TaskRequest extends SchedulerRequest implements TaskSchedulerStatistics.TaskRequestData {
final Object task;
final long created;

public TaskRequest(Object task) {
this.task = task;
this.created = Time.now();
}

@Override
Expand All @@ -251,6 +260,10 @@ public int hashCode() {
return 7841 + (task != null ? task.hashCode() : 0);
}

/**
* @return the value of the {@code created} field of this request
*/
@Override
public long getCreatedTime() {
return created;
}
}

static class AllocateTaskRequest extends TaskRequest implements Comparable<AllocateTaskRequest> {
Expand Down Expand Up @@ -344,27 +357,30 @@ static class AsyncDelegateRequestHandler implements Runnable {
final HashMap<Object, AllocatedTask> taskAllocations;
final TaskSchedulerContext taskSchedulerContext;
private final Object descendantsLock = new Object();
private final TaskSchedulerStatistics taskSchedulerStatistics;
private ArrayList<BitSet> vertexDescendants = null;
final int MAX_TASKS;

AsyncDelegateRequestHandler(LinkedBlockingQueue<SchedulerRequest> clientRequestQueue,
LocalContainerFactory localContainerFactory,
HashMap<Object, AllocatedTask> taskAllocations,
TaskSchedulerContext taskSchedulerContext,
Configuration conf) {
Configuration conf, TaskSchedulerStatistics taskSchedulerStatistics) {
this.clientRequestQueue = clientRequestQueue;
this.localContainerFactory = localContainerFactory;
this.taskAllocations = taskAllocations;
this.taskSchedulerContext = taskSchedulerContext;
this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
this.taskRequestQueue = new PriorityBlockingQueue<>();
this.taskSchedulerStatistics = taskSchedulerStatistics;
}

/**
* Drops the per-DAG state held by this handler: the cached vertex descendants and the collected
* task scheduler statistics.
*/
void dagComplete() {
// vertexDescendants is only touched while holding descendantsLock
synchronized (descendantsLock) {
vertexDescendants = null;
}
// the statistics are reset outside of descendantsLock
taskSchedulerStatistics.clear();
}
private void ensureVertexDescendants() {
synchronized (descendantsLock) {
Expand Down Expand Up @@ -423,7 +439,8 @@ public void run() {
while (!Thread.currentThread().isInterrupted()) {
dispatchRequest();
while (shouldProcess()) {
allocateTask();
AllocateTaskRequest request = allocateTask();
taskSchedulerStatistics.trackRequestPendingTime(request);
}
}
}
Expand Down Expand Up @@ -468,15 +485,17 @@ void maybePreempt(AllocateTaskRequest request) {
}
}

void allocateTask() {
/**
* Takes the next request from taskRequestQueue, creates a container for it, records the allocation in
* taskAllocations and notifies the TaskSchedulerContext.
* @return the request that was allocated, or null if the thread was interrupted
*/
AllocateTaskRequest allocateTask() {
try {
AllocateTaskRequest request = taskRequestQueue.take();
Container container = localContainerFactory.createContainer(request.capability,
request.priority);
taskAllocations.put(request.task, new AllocatedTask(request, container));
taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container);
return request;
} catch (InterruptedException e) {
// restore the interrupt flag so that the caller can observe the interruption
Thread.currentThread().interrupt();
return null;
}
}

Expand Down Expand Up @@ -518,4 +537,9 @@ void preemptTask(DeallocateContainerRequest request) {
public int getHeldContainersCount() {
return 0;
}

/**
* @return the statistics instance held in the {@code taskSchedulerStatistics} field of this scheduler
*/
@Override
public TaskSchedulerStatistics getStatistics() {
return taskSchedulerStatistics;
}
}
Loading