Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,27 @@ public interface OutputContext extends TaskContext {
* Output's data
* @return Name of the Destination Vertex
*/
public String getDestinationVertexName();

String getDestinationVertexName();

/**
* Returns a convenient, human-readable string describing the input and output vertices.
* @return the convenient string
*/
String getInputOutputVertexNames();

/**
* Get the index of the output in the set of all outputs for the task. The
* index will be consistent and valid only among the tasks of this vertex.
* @return index
*/
public int getOutputIndex();
int getOutputIndex();

/**
* Get an {@link OutputStatisticsReporter} for this {@link Output} that can
* be used to report statistics like data size
* @return {@link OutputStatisticsReporter}
*/
public OutputStatisticsReporter getStatisticsReporter();
OutputStatisticsReporter getStatisticsReporter();

/**
* Notify the context that at this point no more events should be sent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
initCommitter(jobConf, useNewApi);
}

LOG.info(getContext().getDestinationVertexName() + ": "
LOG.info(getContext().getInputOutputVertexNames() + ": "
+ "outputFormat=" + outputFormatClassName
+ ", using newmapreduce API=" + useNewApi);
return null;
Expand Down Expand Up @@ -576,7 +576,7 @@ public void handleEvents(List<Event> outputEvents) {
@Override
public synchronized List<Event> close() throws IOException {
flush();
LOG.info(getContext().getDestinationVertexName() + " closed");
LOG.info(getContext().getInputOutputVertexNames() + " closed");
long outputRecords = getContext().getCounters()
.findCounter(TaskCounter.OUTPUT_RECORDS).getValue();
getContext().getStatisticsReporter().reportItemsProcessed(outputRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public String getDestinationVertexName() {
return destinationVertexName;
}


@Override
public String getInputOutputVertexNames() {
return String.format("%s -> %s", getTaskVertexName(), getDestinationVertexName());
}

@Override
public void fatalError(Throwable exception, String message) {
super.signalFatalError(exception, message, sourceInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public ExternalSorter(OutputContext outputContext, Configuration conf, int numOu
rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();

if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " +
LOG.debug(outputContext.getInputOutputVertexNames() + ": Initial Mem bytes : " +
initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20)));
}
int assignedMb = (int) (initialMemoryAvailable >> 20);
Expand All @@ -201,7 +201,7 @@ public ExternalSorter(OutputContext outputContext, Configuration conf, int numOu
this.serializationContext = new SerializationContext(this.conf);
keySerializer = serializationContext.getKeySerializer();
valSerializer = serializationContext.getValueSerializer();
LOG.info(outputContext.getDestinationVertexName() + " using: "
LOG.info(outputContext.getInputOutputVertexNames() + " using: "
+ "memoryMb=" + assignedMb
+ ", keySerializerClass=" + serializationContext.getKeyClass()
+ ", valueSerializerClass=" + valSerializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO
}

StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ")
.append(outputContext.getDestinationVertexName()).append(": ");
.append(outputContext.getInputOutputVertexNames()).append(": ");
partitionBits = bitcount(partitions)+1;

boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
Expand Down Expand Up @@ -235,10 +235,9 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT);
sortmaster = Executors.newFixedThreadPool(sortThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Sorter {" + TezUtilsInternal
.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
.build());

.setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d")
.build());

valSerializer.open(span.out);
keySerializer.open(span.out);
Expand Down Expand Up @@ -336,7 +335,8 @@ public void sort() throws IOException {
boolean ret = spill(true);
stopWatch.stop();
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
LOG.debug(outputContext.getInputOutputVertexNames() + ": Time taken for spill "
+ (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms");
}
if (pipelinedShuffle && ret) {
sendPipelinedShuffleEvents();
Expand Down Expand Up @@ -380,7 +380,7 @@ private void sendPipelinedShuffleEvents() throws IOException{
partitions, sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
outputContext.sendEvents(events);
LOG.info(outputContext.getDestinationVertexName() +
LOG.info(outputContext.getInputOutputVertexNames() +
": Added spill event for spill (final update=false), spillId=" + (numSpills - 1));
}

Expand Down Expand Up @@ -496,7 +496,7 @@ private void spillSingleRecord(final Object key, final Object value,
ensureSpillFilePermissions(filename, rfs);

try {
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() +
LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() +
", indexFilename=" + indexFilename);
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
Expand Down Expand Up @@ -568,8 +568,9 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete");
throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e);
LOG.info(outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete");
throw new IOInterruptedException(
outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e);
}

// create spill file
Expand All @@ -581,7 +582,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
spillFilePaths.put(numSpills, filename);
out = rfs.create(filename, true, 4096);
ensureSpillFilePermissions(filename, rfs);
LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString());
LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
return false;
Expand Down Expand Up @@ -652,8 +653,9 @@ private boolean isThreadInterrupted() throws IOException {
cleanup();
}
sortmaster.shutdownNow();
LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
.isShutdown() + ", terminated=" + sortmaster.isTerminated());
LOG.info(outputContext.getInputOutputVertexNames()
+ ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster.isShutdown()
+ ", terminated=" + sortmaster.isTerminated());
return true;
}
return false;
Expand All @@ -674,7 +676,7 @@ public void flush() throws IOException {
}

try {
LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output");
LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output");
span.end();
merger.add(span.sort(sorter));
// force a spill in flush()
Expand All @@ -698,7 +700,7 @@ public void flush() throws IOException {
* NPE leading to distraction when debugging.
*/
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName()
LOG.debug(outputContext.getInputOutputVertexNames()
+ ": Index list is empty... returning");
}
return;
Expand All @@ -717,7 +719,8 @@ public void flush() throws IOException {
outputContext, i, indexCacheList.get(i), partitions,
sendEmptyPartitionDetails, pathComponent, partitionStats,
reportDetailedPartitionStats(), auxiliaryService, deflater);
LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update="
+ isLastEvent + "), spillId=" + i);
}
return;
}
Expand All @@ -736,7 +739,7 @@ public void flush() throws IOException {
sameVolRename(filename, finalOutputFile);
sameVolRename(indexFilename, finalIndexFile);
if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills +
LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills +
", finalOutputFile=" + finalOutputFile + ", "
+ "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
indexFilename);
Expand All @@ -759,7 +762,7 @@ public void flush() throws IOException {
mapOutputFile.getOutputIndexFileForWrite(0); //TODO

if (LOG.isDebugEnabled()) {
LOG.debug(outputContext.getDestinationVertexName() + ": " +
LOG.debug(outputContext.getInputOutputVertexNames() + ": " +
"numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+ finalIndexFile);
}
Expand Down Expand Up @@ -944,7 +947,7 @@ public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comp
}
ByteBuffer reserved = source.duplicate();
reserved.mark();
LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" +
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "reserved.remaining()=" +
reserved.remaining() + ", reserved.metasize=" + metasize);
reserved.position(metasize);
kvbuffer = reserved.slice();
Expand All @@ -966,8 +969,8 @@ public SpanIterator sort(IndexedSorter sorter) {
if(length() > 1) {
sorter.sort(this, 0, length(), progressable);
}
LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", "
+ "time=" + (System.currentTimeMillis() - start));
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "done sorting span=" + index + ", length=" + length()
+ ", " + "time=" + (System.currentTimeMillis() - start));
return new SpanIterator((SortSpan)this);
}

Expand Down Expand Up @@ -1042,8 +1045,9 @@ public SortSpan next() {
}
newSpan = new SortSpan(remaining, items, perItem, newComparator);
newSpan.index = index+1;
LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan
.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
LOG.info(
String.format(outputContext.getInputOutputVertexNames() + ": " + "New Span%d.length = %d, perItem = %d",
newSpan.index, newSpan.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue());
return newSpan;
}
return null;
Expand All @@ -1064,13 +1068,14 @@ public ByteBuffer end() {
return null;
}
int perItem = kvbuffer.position()/items;
LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
LOG.info(outputContext.getInputOutputVertexNames() + ": "
+ String.format("Span%d.length = %d, perItem = %d", index, length(), perItem));
if(remaining.remaining() < METASIZE+perItem) {
//Check if we can get the next Buffer from the main buffer list
ByteBuffer space = allocateSpace();
if (space != null) {
LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" +
mapOutputRecordCounter.getValue());
LOG.info(outputContext.getInputOutputVertexNames() + ": "
+ "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue());
reinit = true;
return space;
}
Expand Down Expand Up @@ -1403,7 +1408,7 @@ public final boolean ready() throws IOException, InterruptedException {
total += sp.span.length();
eq += sp.span.getEq();
}
LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString());
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Heap = " + sb.toString());
return true;
} catch(ExecutionException e) {
LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={},"
Expand Down
Loading