Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
// Index into groupedSplit.wrappedSplits of the next split to open;
// advanced by initNextRecordReader() after each reader is obtained.
int idx = 0;
long progress;
// Reader currently in use; assigned by initNextRecordReader() and exposed via getCurReader().
RecordReader<K, V> curReader;

// Next wrapped-split index to hand out to an asynchronous reader-init task.
private final AtomicInteger initIndex;
// min(number of wrapped splits, configured TEZ_GROUPING_SPLIT_INIT_RECORDREADERS).
// Asynchronous pre-initialization is only enabled when this is greater than 1.
private final int numReaders;
// Thread pool that opens readers in the background; stays null unless numReaders > 1.
private ExecutorService initReaderExecService;
// Futures of readers being opened in the background, in submission order;
// initNextRecordReader() takes from the head. Stays null unless numReaders > 1.
private BlockingDeque<Future<RecordReader<K, V>>> initedReaders;
// Set once opening any reader fails; init tasks check it and return null
// instead of opening further readers.
private AtomicBoolean failureOccurred = new AtomicBoolean(false);

/**
 * Creates a record reader over all wrapped splits of {@code split} and opens
 * the first wrapped reader by calling {@link #initNextRecordReader()}.
 *
 * <p>When the effective number of readers to pre-initialize (the configured
 * {@code TEZ_GROUPING_SPLIT_INIT_RECORDREADERS}, capped at the number of
 * wrapped splits) is greater than 1, a daemon thread pool is created to open
 * readers asynchronously; otherwise readers are opened synchronously.
 *
 * @param split the grouped split whose wrapped splits are read in turn
 * @param job the job configuration passed to the wrapped input format
 * @param reporter the reporter passed to the wrapped input format
 * @throws IOException if opening the first reader fails with an I/O error
 */
public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
    Reporter reporter) throws IOException {
  this.groupedSplit = split;
  this.job = job;
  this.reporter = reporter;
  this.initIndex = new AtomicInteger(0);
  this.numReaders = Math.min(groupedSplit.wrappedSplits.size(),
      conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_RECORDREADERS,
          TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_RECORDREADERS_DEFAULT));
  // init the async split opening executor service if numReaders are greater than 1
  if (numReaders > 1) {
    // newFixedThreadPool rejects a size <= 0 with IllegalArgumentException, so a
    // misconfigured (zero or negative) thread count falls back to a single thread.
    int numThreads = Math.max(1,
        conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
            TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT));
    this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setPriority(Thread.MAX_PRIORITY)
            .setNameFormat("TEZ-Split-Init-Thread-%d")
            .build());
    this.initedReaders = new LinkedBlockingDeque<>();
  }
  initNextRecordReader();
}

/**
 * Tops up the queue of asynchronously opened record readers so that at most
 * {@code numReaders} futures are outstanding at any time.
 *
 * <p>Does nothing when asynchronous opening is disabled (no executor) or when
 * every wrapped split has already been handed to a task. The split index is
 * claimed here, at submission time, so the order of futures in
 * {@code initedReaders} always matches the order of the wrapped splits and a
 * queued future never corresponds to an out-of-range index.
 */
private void preInitReaders() {
  if (initReaderExecService == null) {
    return;
  }
  final int totalSplits = groupedSplit.wrappedSplits.size();
  // Only refill up to numReaders pending futures; the caller consumes one per call,
  // so submitting a full batch every time would let open readers accumulate unbounded.
  while (initedReaders.size() < numReaders) {
    if (initIndex.get() >= totalSplits) {
      // every wrapped split already has a task
      return;
    }
    final int index = initIndex.getAndIncrement();
    initedReaders.offer(this.initReaderExecService.submit(() -> {
      if (failureOccurred.get()) {
        // another reader already failed; do not open more
        return null;
      }
      try {
        InputSplit s = groupedSplit.wrappedSplits.get(index);
        RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
        LOG.debug("Init Thread processed reader number {} initialization", index);
        return reader;
      } catch (Exception e) {
        failureOccurred.set(true);
        if (e instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        // cancelFutures() may cancel this task's own future, after which get() on it
        // reports CancellationException instead of this cause, so record it here.
        LOG.error("Failed to initialize record reader for split index {}", index, e);
        cancelFutures();
        throw new RuntimeException(e);
      }
    }));
  }
}

/**
 * Returns the wrapped record reader that is currently in use.
 *
 * @return the current reader, or {@code null} if none is set
 */
public RecordReader<K, V> getCurReader() {
  return this.curReader;
}

@Override
public boolean next(K key, V value) throws IOException {
Expand Down Expand Up @@ -171,7 +235,7 @@ public void close() throws IOException {
curReader = null;
}
}

protected boolean initNextRecordReader() throws IOException {
if (curReader != null) {
curReader.close();
Expand All @@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws IOException {

// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
if (initReaderExecService != null) {
LOG.info("Shutting down the init record reader threadpool");
initReaderExecService.shutdownNow();
}
return false;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Init record reader for index " + idx + " of " +
LOG.debug("Init record reader for index " + idx + " of " +
groupedSplit.wrappedSplits.size());
}

// get a record reader for the idx-th chunk
try {
curReader = wrappedInputFormat.getRecordReader(
groupedSplit.wrappedSplits.get(idx), job, reporter);
// get the cur reader directly when async split opening is disabled
if (initReaderExecService == null) {
curReader = wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, reporter);
} else {
preInitReaders();
curReader = initedReaders.take().get();
}
} catch (Exception e) {
throw new RuntimeException (e);
failureOccurred.set(true);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (initedReaders != null) {
cancelFutures();
}
throw new RuntimeException(e);
}
idx++;
return true;
return curReader != null;
}

/**
 * Requests cancellation, with interruption, of every reader-init future that
 * is still queued in {@code initedReaders}.
 */
private void cancelFutures() {
  initedReaders.forEach(pending -> pending.cancel(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.common.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.util.RackResolver;
Expand Down Expand Up @@ -102,6 +103,20 @@ public abstract class TezSplitGrouper {
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;

/**
 * Number of threads used to initialize the grouped splits, i.e. the size of the
 * thread pool that asynchronously opens the wrapped record readers. Only used
 * when asynchronous reader initialization is enabled (see
 * {@link #TEZ_GROUPING_SPLIT_INIT_RECORDREADERS}).
 */
public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init.threads";
public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;

/**
 * Number of record readers to asynchronously and proactively initialize.
 * The value is capped at the number of wrapped splits in the grouped split, and
 * asynchronous initialization is only enabled when the resulting value is
 * greater than 1, so the default of 1 leaves it disabled.
 * In order for upstream apps to use this feature, the objects created in the
 * upstream apps as part of the TezGroupedSplitsRecordReader call must be thread safe.
 */
@InterfaceStability.Unstable
public static final String TEZ_GROUPING_SPLIT_INIT_RECORDREADERS = "tez.grouping.split.init.recordreaders";
public static final int TEZ_GROUPING_SPLIT_INIT_RECORDREADERS_DEFAULT = 1;

static class LocationHolder {
List<SplitContainer> splits;
Expand Down