Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,17 @@ public TezConfiguration(boolean loadDefaults) {
public static final String TEZ_AM_YARN_SCHEDULER_CLASS_DEFAULT =
"org.apache.tez.dag.app.rm.YarnTaskSchedulerService";

/**
 * Int value. Time in milliseconds that the AM waits for its services to become ready when a DAG
 * is submitted before all of them have started. This can happen when the client RPC handler is
 * already up and able to accept DAGs but e.g. the task scheduler manager is not ready yet
 * (e.g. a task scheduler is waiting for external resources).
 * A value less than or equal to 0 is not supported and leads to an exception.
 * Defaults to 30000 (see {@link #TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT}).
 */
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS = TEZ_AM_PREFIX + "ready.for.submit.timeout.ms";
public static final int TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT = 30000;

/** Int value. The amount of memory in MB to be used by the AppMaster */
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
Expand Down
14 changes: 14 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public class DAGAppMaster extends AbstractService {
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private TaskSchedulerManager taskSchedulerManager;
private DAGAppMasterReadinessService appMasterReadinessService;
private WebUIService webUIService;
private HistoryEventHandler historyEventHandler;
private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
Expand Down Expand Up @@ -587,6 +588,8 @@ protected void serviceInit(final Configuration conf) throws Exception {
taskSchedulerManager);
addIfServiceDependency(taskSchedulerManager, clientRpcServer);

appMasterReadinessService = createAppMasterReadinessService();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue happens as a corner case, since the task scheduler could take longer to init. In this case, "LlapTaskSchedulerService" is possibly taking time to init due to its ZK dependency.

Instead of adding one more dependency, why not throw a RuntimeException in TaskSchedulerManager::getTaskSchedulerClassName (i.e., until all task schedulers are initialized w.r.t. appContext)?


this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
isLocal);
addIfService(containerLauncherManager, true);
Expand Down Expand Up @@ -662,6 +665,15 @@ protected TaskSchedulerManager createTaskSchedulerManager(
taskSchedulerDescriptors, isLocal, hadoopShim);
}

/**
 * Creates the {@link DAGAppMasterReadinessService} for this AM, named after its own class.
 * The new service is passed to {@code addIfService} (with {@code false} as second argument)
 * and then linked with {@code taskSchedulerManager} through {@code addIfServiceDependency}.
 * Protected so that tests can substitute their own instance.
 *
 * @return the newly created readiness service
 */
@VisibleForTesting
protected DAGAppMasterReadinessService createAppMasterReadinessService() {
  final String serviceName = DAGAppMasterReadinessService.class.getName();
  final DAGAppMasterReadinessService readinessService = new DAGAppMasterReadinessService(serviceName);
  addIfService(readinessService, false);
  // NOTE(review): presumably this makes the readiness service start only after the task
  // scheduler manager has started - confirm against addIfServiceDependency.
  addIfServiceDependency(readinessService, taskSchedulerManager);
  return readinessService;
}

@VisibleForTesting
protected ContainerSignatureMatcher createContainerSignatureMatcher() {
return new ContainerContextMatcher();
Expand Down Expand Up @@ -1291,6 +1303,8 @@ public Void run() throws Exception {

public String submitDAGToAppMaster(DAGPlan dagPlan,
Map<String, LocalResource> additionalResources) throws TezException {
appMasterReadinessService.waitToBeReady();

if (sessionStopped.get()) {
throw new SessionNotRunning("AM unable to accept new DAG submissions."
+ " In the process of shutting down");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is an artificial service to be used in DAGAppMaster,
 * which can be added to have dependencies that are crucial in order to be
 * able to run DAGs.
 *
 * <p>The service flips an internal flag in {@link #serviceStart()}; callers use
 * {@link #waitToBeReady()} to block until that has happened or until the timeout configured via
 * {@link TezConfiguration#TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS} elapses.
 */
public class DAGAppMasterReadinessService extends AbstractService {
  private static final Logger LOG = LoggerFactory.getLogger(DAGAppMasterReadinessService.class);

  /** Sleep time between two readiness checks in {@link #waitToBeReady()}. */
  private static final long POLL_INTERVAL_MS = 100L;
  /** Conversion factor from {@link System#nanoTime()} units to milliseconds. */
  private static final long NANOS_PER_MS = 1000000L;

  /** Set to true once {@link #serviceStart()} has run. */
  private final AtomicBoolean ready = new AtomicBoolean(false);
  /** Maximum wait in {@link #waitToBeReady()}, read from the configuration in serviceInit. */
  private int timeoutMs;

  /**
   * @param name the service name handed to {@link AbstractService}
   */
  public DAGAppMasterReadinessService(String name) {
    super(name);
  }

  /**
   * Reads the readiness timeout from the configuration and validates it.
   *
   * @param conf the configuration passed on to the superclass
   * @throws TezException if the configured timeout is less than or equal to 0
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    timeoutMs = getConfig().getInt(TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS,
        TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT);
    if (timeoutMs <= 0) {
      throw new TezException(
          "timeout <= 0 is not supported for " + TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS);
    }
  }

  /**
   * Marks this service as ready, which releases any caller blocked in {@link #waitToBeReady()}.
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    ready.set(true);
  }

  /**
   * The waitToBeReady waits until this service really starts. When the serviceStart
   * is called and this service is ready, we can make sure that the dependency services
   * have already been started too.
   *
   * <p>Returns immediately if the service is already ready. Otherwise polls the ready flag
   * every 100ms until it is set or the configured timeout is exceeded.
   *
   * @throws TezException if the service is not ready within the configured timeout, or if the
   *         waiting thread is interrupted (the interrupt status is restored before throwing)
   */
  public void waitToBeReady() throws TezException {
    if (ready.get()) {
      return;
    }
    // Log once at INFO; the per-poll message is DEBUG so a slow startup does not flood the log.
    LOG.info("App is not ready yet, waiting up to {}ms", timeoutMs);

    // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
    final long startNanos = System.nanoTime();
    while (!ready.get()) {
      long elapsedMs = (System.nanoTime() - startNanos) / NANOS_PER_MS;
      if (elapsedMs > timeoutMs) {
        throw new TezException("App Master is not ready within the configured time period (" + timeoutMs + "ms). "
            + "Please check logs for AM service states.");
      }
      try {
        LOG.debug("App is not ready yet, waiting {}ms", POLL_INTERVAL_MS);
        Thread.sleep(POLL_INTERVAL_MS);
      } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new TezException(e);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ public TestTokenIdentifier createIdentifier() {
public static class DAGAppMasterForTest extends DAGAppMaster {
private DAGAppMasterShutdownHandler mockShutdown;
private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
private DAGAppMasterReadinessService mockAppMasterReadinessService = mock(DAGAppMasterReadinessService.class);

public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) {
super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346,
Expand Down Expand Up @@ -774,5 +775,10 @@ protected TaskSchedulerManager createTaskSchedulerManager(
List<NamedEntityDescriptor> taskSchedulerDescriptors) {
return mockScheduler;
}

/**
 * Returns the Mockito mock held in {@code mockAppMasterReadinessService} instead of
 * creating a real readiness service.
 */
@Override
protected DAGAppMasterReadinessService createAppMasterReadinessService() {
return mockAppMasterReadinessService;
}
}
}