Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2309,6 +2309,16 @@ static Set<String> getPropertySet() {
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";

/**
* Delay before the first thread dump is captured; subsequent dumps are then scheduled using
* {@link #TEZ_THREAD_DUMP_INTERVAL}. Supports TimeUnits (for example "500ms") and must not be negative.
* The default of "0ms" schedules the first dump without any delay. This is effective only
* when org.apache.tez.dag.app.ThreadDumpDAGHook is configured in tez.am.hooks or
* org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured in tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INITIAL_DELAY = "tez.thread.dump.initial.delay";
public static final String TEZ_THREAD_DUMP_INITIAL_DELAY_DEFAULT = "0ms";

/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_REMOTE_APP_LOG_DIR;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INITIAL_DELAY;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INITIAL_DELAY_DEFAULT;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL_DEFAULT;

Expand All @@ -28,6 +30,7 @@
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -49,7 +52,8 @@

public class TezThreadDumpHelper {

private final long duration;
private final long threadDumpFrequency;
private final long threadDumpInitialDelay;
private final Path basePath;
private final FileSystem fs;

Expand All @@ -58,8 +62,10 @@ public class TezThreadDumpHelper {

private ScheduledExecutorService periodicThreadDumpServiceExecutor;

private TezThreadDumpHelper(long duration, Configuration conf) throws IOException {
this.duration = duration;
private TezThreadDumpHelper(long threadDumpFrequency, long threadDumpInitialDelay, Configuration conf)
throws IOException {
this.threadDumpFrequency = threadDumpFrequency;
this.threadDumpInitialDelay = threadDumpInitialDelay;
Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
if (appender instanceof TezContainerLogAppender) {
this.basePath = new Path(((TezContainerLogAppender) appender).getContainerLogDir());
Expand All @@ -69,18 +75,21 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio
this.basePath = new Path(conf.get(NM_REMOTE_APP_LOG_DIR, DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.fs = this.basePath.getFileSystem(conf);
}
LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at " +
"path: {}", duration, basePath);
LOG.info("Periodic Thread Dump Capture Service Configured to capture Thread Dumps at {} ms frequency and at "
+ "path: {} with an initial delay of {}", threadDumpFrequency, basePath, threadDumpInitialDelay);
}

public static TezThreadDumpHelper getInstance(Configuration conf) {
long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL,
TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration",
TEZ_THREAD_DUMP_INTERVAL);
long threadDumpFrequency =
conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
Preconditions.checkArgument(threadDumpFrequency > 0, "%s must be positive duration", TEZ_THREAD_DUMP_INTERVAL);
long threadDumpInitialDelay =
conf.getTimeDuration(TEZ_THREAD_DUMP_INITIAL_DELAY, TEZ_THREAD_DUMP_INITIAL_DELAY_DEFAULT,
TimeUnit.MILLISECONDS);
Preconditions.checkArgument(threadDumpInitialDelay >= 0, "%s can not be negative", TEZ_THREAD_DUMP_INITIAL_DELAY);

try {
return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf);
return new TezThreadDumpHelper(threadDumpFrequency, threadDumpInitialDelay, conf);
} catch (IOException e) {
throw new TezUncheckedException("Can not initialize periodic thread dump service", e);
}
Expand All @@ -91,7 +100,8 @@ public TezThreadDumpHelper start(String name) {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("PeriodicThreadDumpService{" + name + "} #%d")
.build());
Runnable threadDumpCollector = new ThreadDumpCollector(basePath, name, fs);
periodicThreadDumpServiceExecutor.schedule(threadDumpCollector, duration, TimeUnit.MILLISECONDS);
periodicThreadDumpServiceExecutor.scheduleWithFixedDelay(threadDumpCollector, threadDumpInitialDelay,
threadDumpFrequency, TimeUnit.MILLISECONDS);
return this;
}

Expand Down Expand Up @@ -128,7 +138,7 @@ public void run() {
if (!Thread.interrupted()) {
try (FSDataOutputStream fsStream = fs.create(
new Path(path, name + "_" + System.currentTimeMillis() + ".jstack"));
PrintStream printStream = new PrintStream(fsStream, false, "UTF8")) {
PrintStream printStream = new PrintStream(fsStream, false, StandardCharsets.UTF_8)) {
printThreadInfo(printStream, name);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
6 changes: 6 additions & 0 deletions tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
Original file line number Diff line number Diff line change
Expand Up @@ -628,18 +628,24 @@ private static void validateThreadDumpCaptured(Path jstackPath) throws IOExcepti
RemoteIterator<LocatedFileStatus> files = localFs.listFiles(jstackPath, true);
boolean appMasterDumpFound = false;
boolean tezChildDumpFound = false;
int numAppMasterDumps = 0;
int numTezChildDumps = 0;
while (files.hasNext()) {
LocatedFileStatus file = files.next();
if (file.getPath().getName().endsWith(".jstack")) {
if (file.getPath().getName().contains("attempt")) {
tezChildDumpFound = true;
numTezChildDumps++;
} else {
appMasterDumpFound = true;
numAppMasterDumps++;
}
}
}
assertTrue(tezChildDumpFound);
assertTrue(appMasterDumpFound);
assertTrue(numAppMasterDumps >= 2);
assertTrue(numTezChildDumps >= 2);
}

/**
Expand Down
Loading