diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java index 33fe772f85..f0de897fda 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java @@ -33,21 +33,27 @@ public interface OutputContext extends TaskContext { * Output's data * @return Name of the Destination Vertex */ - public String getDestinationVertexName(); - + String getDestinationVertexName(); + + /** + * Returns a human-readable string naming the task (source) vertex and the destination vertex, e.g. for log messages. + * @return the source and destination vertex names combined into a single string + */ + String getInputOutputVertexNames(); + /** * Get the index of the output in the set of all outputs for the task. The * index will be consistent and valid only among the tasks of this vertex. * @return index */ - public int getOutputIndex(); + int getOutputIndex(); /** * Get an {@link OutputStatisticsReporter} for this {@link Output} that can * be used to report statistics like data size * @return {@link OutputStatisticsReporter} */ - public OutputStatisticsReporter getStatisticsReporter(); + OutputStatisticsReporter getStatisticsReporter(); /** * Notify the context that at this point no more events should be sent.
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 19ece5a0f6..9aeae25bd9 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -458,7 +458,7 @@ protected List initializeBase() throws IOException, InterruptedException initCommitter(jobConf, useNewApi); } - LOG.info(getContext().getDestinationVertexName() + ": " + LOG.info(getContext().getInputOutputVertexNames() + ": " + "outputFormat=" + outputFormatClassName + ", using newmapreduce API=" + useNewApi); return null; @@ -576,7 +576,7 @@ public void handleEvents(List outputEvents) { @Override public synchronized List close() throws IOException { flush(); - LOG.info(getContext().getDestinationVertexName() + " closed"); + LOG.info(getContext().getInputOutputVertexNames() + " closed"); long outputRecords = getContext().getCounters() .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 20ec0622c7..a17bc8900d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -154,6 +154,12 @@ public String getDestinationVertexName() { return destinationVertexName; } + + @Override + public String getInputOutputVertexNames() { + return String.format("%s -> %s", getTaskVertexName(), getDestinationVertexName()); + } + @Override public void fatalError(Throwable exception, String message) { super.signalFatalError(exception, message, sourceInfo); diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 3ff74f72bb..758c069799 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -183,7 +183,7 @@ public ExternalSorter(OutputContext outputContext, Configuration conf, int numOu rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": Initial Mem bytes : " + + LOG.debug(outputContext.getInputOutputVertexNames() + ": Initial Mem bytes : " + initialMemoryAvailable + ", in MB=" + ((initialMemoryAvailable >> 20))); } int assignedMb = (int) (initialMemoryAvailable >> 20); @@ -201,7 +201,7 @@ public ExternalSorter(OutputContext outputContext, Configuration conf, int numOu this.serializationContext = new SerializationContext(this.conf); keySerializer = serializationContext.getKeySerializer(); valSerializer = serializationContext.getValueSerializer(); - LOG.info(outputContext.getDestinationVertexName() + " using: " + LOG.info(outputContext.getInputOutputVertexNames() + " using: " + "memoryMb=" + assignedMb + ", keySerializerClass=" + serializationContext.getKeyClass() + ", valueSerializerClass=" + valSerializer diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index b70d6c4360..08786c9b2c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -161,7 +161,7 @@ public 
PipelinedSorter(OutputContext outputContext, Configuration conf, int numO } StringBuilder initialSetupLogLine = new StringBuilder("Setting up PipelinedSorter for ") - .append(outputContext.getDestinationVertexName()).append(": "); + .append(outputContext.getInputOutputVertexNames()).append(": "); partitionBits = bitcount(partitions)+1; boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration @@ -235,10 +235,9 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT); sortmaster = Executors.newFixedThreadPool(sortThreads, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Sorter {" + TezUtilsInternal - .cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d") - .build()); - + .setNameFormat("Sorter {" + TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "} #%d") + .build()); valSerializer.open(span.out); keySerializer.open(span.out); @@ -336,7 +335,8 @@ public void sort() throws IOException { boolean ret = spill(true); stopWatch.stop(); if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms"); + LOG.debug(outputContext.getInputOutputVertexNames() + ": Time taken for spill " + + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms"); } if (pipelinedShuffle && ret) { sendPipelinedShuffleEvents(); @@ -380,7 +380,7 @@ private void sendPipelinedShuffleEvents() throws IOException{ partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater); outputContext.sendEvents(events); - LOG.info(outputContext.getDestinationVertexName() + + LOG.info(outputContext.getInputOutputVertexNames() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); } 
@@ -496,7 +496,7 @@ private void spillSingleRecord(final Object key, final Object value, ensureSpillFilePermissions(filename, rfs); try { - LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString() + + LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString() + ", indexFilename=" + indexFilename); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { @@ -568,8 +568,9 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException { } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOG.info(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete"); - throw new IOInterruptedException(outputContext.getDestinationVertexName() + ": Interrupted while waiting for mergers to complete", e); + LOG.info(outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete"); + throw new IOInterruptedException( + outputContext.getInputOutputVertexNames() + ": Interrupted while waiting for mergers to complete", e); } // create spill file @@ -581,7 +582,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException { spillFilePaths.put(numSpills, filename); out = rfs.create(filename, true, 4096); ensureSpillFilePermissions(filename, rfs); - LOG.info(outputContext.getDestinationVertexName() + ": Spilling to " + filename.toString()); + LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling to " + filename.toString()); for (int i = 0; i < partitions; ++i) { if (isThreadInterrupted()) { return false; @@ -652,8 +653,9 @@ private boolean isThreadInterrupted() throws IOException { cleanup(); } sortmaster.shutdownNow(); - LOG.info(outputContext.getDestinationVertexName() + ": Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster - .isShutdown() + ", terminated=" + sortmaster.isTerminated()); + LOG.info(outputContext.getInputOutputVertexNames() + + ": Thread 
interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster.isShutdown() + + ", terminated=" + sortmaster.isTerminated()); return true; } return false; @@ -674,7 +676,7 @@ public void flush() throws IOException { } try { - LOG.info(outputContext.getDestinationVertexName() + ": Starting flush of map output"); + LOG.info(outputContext.getInputOutputVertexNames() + ": Starting flush of map output"); span.end(); merger.add(span.sort(sorter)); // force a spill in flush() @@ -698,7 +700,7 @@ public void flush() throws IOException { * NPE leading to distraction when debugging. */ if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + LOG.debug(outputContext.getInputOutputVertexNames() + ": Index list is empty... returning"); } return; @@ -717,7 +719,8 @@ public void flush() throws IOException { outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater); - LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); + LOG.info(outputContext.getInputOutputVertexNames() + ": Adding spill event for spill (final update=" + + isLastEvent + "), spillId=" + i); } return; } @@ -736,7 +739,7 @@ public void flush() throws IOException { sameVolRename(filename, finalOutputFile); sameVolRename(indexFilename, finalIndexFile); if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": numSpills=" + numSpills + + LOG.debug(outputContext.getInputOutputVertexNames() + ": numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", " + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + indexFilename); @@ -759,7 +762,7 @@ public void flush() throws IOException { mapOutputFile.getOutputIndexFileForWrite(0); //TODO if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": 
" + + LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:" + finalIndexFile); } @@ -944,7 +947,7 @@ public SortSpan(ByteBuffer source, int maxItems, int perItem, RawComparator comp } ByteBuffer reserved = source.duplicate(); reserved.mark(); - LOG.info(outputContext.getDestinationVertexName() + ": " + "reserved.remaining()=" + + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "reserved.remaining()=" + reserved.remaining() + ", reserved.metasize=" + metasize); reserved.position(metasize); kvbuffer = reserved.slice(); @@ -966,8 +969,8 @@ public SpanIterator sort(IndexedSorter sorter) { if(length() > 1) { sorter.sort(this, 0, length(), progressable); } - LOG.info(outputContext.getDestinationVertexName() + ": " + "done sorting span=" + index + ", length=" + length() + ", " - + "time=" + (System.currentTimeMillis() - start)); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "done sorting span=" + index + ", length=" + length() + + ", " + "time=" + (System.currentTimeMillis() - start)); return new SpanIterator((SortSpan)this); } @@ -1042,8 +1045,9 @@ public SortSpan next() { } newSpan = new SortSpan(remaining, items, perItem, newComparator); newSpan.index = index+1; - LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan - .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); + LOG.info( + String.format(outputContext.getInputOutputVertexNames() + ": " + "New Span%d.length = %d, perItem = %d", + newSpan.index, newSpan.length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); return newSpan; } return null; @@ -1064,13 +1068,14 @@ public ByteBuffer end() { return null; } int perItem = kvbuffer.position()/items; - LOG.info(outputContext.getDestinationVertexName() + ": " + String.format("Span%d.length = %d, perItem = %d", index, length(), 
perItem)); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + + String.format("Span%d.length = %d, perItem = %d", index, length(), perItem)); if(remaining.remaining() < METASIZE+perItem) { //Check if we can get the next Buffer from the main buffer list ByteBuffer space = allocateSpace(); if (space != null) { - LOG.info(outputContext.getDestinationVertexName() + ": " + "Getting memory from next block in the list, recordsWritten=" + - mapOutputRecordCounter.getValue()); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + + "Getting memory from next block in the list, recordsWritten=" + mapOutputRecordCounter.getValue()); reinit = true; return space; } @@ -1403,7 +1408,7 @@ public final boolean ready() throws IOException, InterruptedException { total += sp.span.length(); eq += sp.span.getEq(); } - LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString()); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Heap = " + sb.toString()); return true; } catch(ExecutionException e) { LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={}," diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index dd6c083109..7c678749b2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -153,7 +153,7 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); if (confPipelinedShuffle) { - LOG.warn(outputContext.getDestinationVertexName() + ": " + + LOG.warn(outputContext.getInputOutputVertexNames() + ": " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " does not work " 
+ "with DefaultSorter. It is supported only with PipelinedSorter."); } @@ -371,7 +371,8 @@ synchronized void collect(Object key, Object value, final int partition kvindex = (int)(((long)kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity()); totalKeys++; } catch (MapBufferTooSmallException e) { - LOG.info(outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage()); + LOG.info( + outputContext.getInputOutputVertexNames() + ": Record too large for in-memory buffer: " + e.getMessage()); spillSingleRecord(key, value, partition); mapOutputRecordCounter.increment(1); return; @@ -390,7 +391,7 @@ private void setEquator(int pos) { // Cast one of the operands to long to avoid integer overflow kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; if (LOG.isInfoEnabled()) { - LOG.info(outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex + + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(EQUATOR) " + pos + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } } @@ -408,7 +409,7 @@ private void resetSpill() { // Cast one of the operands to long to avoid integer overflow kvstart = kvend = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4; if (LOG.isInfoEnabled()) { - LOG.info(outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" + + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "(RESET) equator " + e + " kv " + kvstart + "(" + (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")"); } } @@ -664,7 +665,7 @@ void interruptSpillThread() throws IOException { spillThread.interrupt(); spillThread.join(); } catch (InterruptedException e) { - LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted"); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted"); //Reset status 
Thread.currentThread().interrupt(); throw new IOInterruptedException("Spill failed", e); @@ -673,7 +674,7 @@ void interruptSpillThread() throws IOException { @Override public void flush() throws IOException { - LOG.info(outputContext.getDestinationVertexName() + ": " + "Starting flush of map output"); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Starting flush of map output"); outputContext.notifyProgress(); if (Thread.currentThread().isInterrupted()) { /** @@ -710,7 +711,7 @@ public void flush() throws IOException { bufend = bufmark; if (LOG.isInfoEnabled()) { LOG.info( - outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. " + outputContext.getInputOutputVertexNames() + ": " + "Sorting & Spilling map output. " + "bufstart = " + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid + "; " + "kvstart=" + kvstart + "(" + (kvstart * 4) + ")" + ", kvend = " + kvend + "(" + (kvend * 4) + ")" @@ -781,7 +782,7 @@ public void run() { spillLock.unlock(); sortAndSpill(sameKeyCount, totalKeysCount); } catch (Throwable t) { - LOG.warn(outputContext.getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t); + LOG.warn(outputContext.getInputOutputVertexNames() + ": " + "Got an exception in sortAndSpill", t); sortSpillException = t; } finally { spillLock.lock(); @@ -794,7 +795,7 @@ public void run() { } } } catch (InterruptedException e) { - LOG.info(outputContext.getDestinationVertexName() + ": " + "Spill thread interrupted"); + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Spill thread interrupted"); Thread.currentThread().interrupt(); } finally { spillLock.unlock(); @@ -830,7 +831,7 @@ private void startSpill() { bufend = bufmark; spillInProgress = true; if (LOG.isInfoEnabled()) { - LOG.info(outputContext.getDestinationVertexName() + ": Spilling map output." + LOG.info(outputContext.getInputOutputVertexNames() + ": Spilling map output." 
+ "bufstart=" + bufstart + ", bufend = " + bufmark + ", bufvoid = " + bufvoid +"; kvstart=" + kvstart + "(" + (kvstart * 4) + ")" +", kvend = " + kvend + "(" + (kvend * 4) + ")" @@ -936,7 +937,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun TezRawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": " + "Running combine processor"); + LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor"); } runCombineProcessor(kvIter, writer); } @@ -975,7 +976,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } - LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills + " at " + filename.toString()); ++numSpills; if (!isFinalMergeEnabled()) { @@ -1172,7 +1173,7 @@ private void maybeSendEventForSpill(List events, boolean isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, reportDetailedPartitionStats(), auxiliaryService, deflater); - LOG.info(outputContext.getDestinationVertexName() + ": " + + LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); if (sendEvent) { @@ -1339,7 +1340,7 @@ private void mergeParts() throws IOException, InterruptedException { segmentList.add(s); } if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": " + LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts + "Spill =" + i + "(" + indexRecord.getStartOffset() + "," + indexRecord.getRawLength() + ", " + diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 5ff2944766..faf75866b9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -104,7 +104,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit // Maybe setup a separate statistics class which can be shared between the // buffer and the main path instead of having multiple arrays. - private final String destNameTrimmed; + private final String sourceDestNameTrimmed; private final long availableMemory; @VisibleForTesting final WrappedBuffer[] buffers; @@ -206,7 +206,8 @@ public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration c Preconditions.checkArgument(availableMemoryBytes >= 0, "availableMemory should be >= 0 bytes"); - this.destNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); + this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(outputContext.getTaskVertexName()) + " -> " + + TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()); //Not checking for TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT as it might not add much value in // this case. Add it later if needed. 
boolean pipelinedShuffleConf = this.conf.getBoolean(TezRuntimeConfiguration @@ -257,7 +258,7 @@ public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration c buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer); numInitializedBuffers = 1; if (LOG.isDebugEnabled()) { - LOG.debug(destNameTrimmed + ": " + "Initializing Buffer #" + + LOG.debug(sourceDestNameTrimmed + ": " + "Initializing Buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer); } currentBuffer = buffers[0]; @@ -313,7 +314,7 @@ public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration c skipBuffers = false; writer = null; } - LOG.info(destNameTrimmed + ": " + LOG.info(sourceDestNameTrimmed + ": " + "numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer + ", skipBuffers=" + skipBuffers @@ -493,7 +494,7 @@ private void setupNextBuffer() throws IOException { // Update overall stats final int filledBufferCount = filledBuffers.size(); if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { - LOG.info(destNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount); + LOG.info(sourceDestNameTrimmed + ": " + "Moving to next buffer. Total filled buffers: " + filledBufferCount); } updateGlobalStats(currentBuffer); @@ -531,7 +532,7 @@ private boolean scheduleSpill(boolean block) throws IOException { final int filledBufferCount = filledBuffers.size(); if (LOG.isDebugEnabled() || (filledBufferCount % 10) == 0) { - LOG.info(destNameTrimmed + ": triggering spill. filledBuffers.size=" + filledBufferCount); + LOG.info(sourceDestNameTrimmed + ": triggering spill. 
filledBuffers.size=" + filledBufferCount); } pendingSpillCount.incrementAndGet(); int spillNumber = numSpills.getAndIncrement(); @@ -673,10 +674,10 @@ protected SpillResult callInternal() throws IOException { spillResult = new SpillResult(compressedLength, this.filledBuffers); handleSpillIndex(spillPathDetails, spillRecord); - LOG.info(destNameTrimmed + ": " + "Finished spill " + spillIndex); + LOG.info(sourceDestNameTrimmed + ": " + "Finished spill " + spillIndex); if (LOG.isDebugEnabled()) { - LOG.debug(destNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath=" + LOG.debug(sourceDestNameTrimmed + ": " + "Spill=" + spillIndex + ", indexPath=" + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); } return spillResult; @@ -754,7 +755,8 @@ public List close() throws IOException, InterruptedException { isShutdown.set(true); spillLock.lock(); try { - LOG.info(destNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get()); + LOG.info( + sourceDestNameTrimmed + ": " + "Waiting for all spills to complete : Pending : " + pendingSpillCount.get()); while (pendingSpillCount.get() != 0 && spillException == null) { spillInProgress.await(); } @@ -762,7 +764,7 @@ public List close() throws IOException, InterruptedException { spillLock.unlock(); } if (spillException != null) { - LOG.error(destNameTrimmed + ": " + "Error during spill, throwing"); + LOG.error(sourceDestNameTrimmed + ": " + "Error during spill, throwing"); // Assuming close will be called on the same thread as the write cleanup(); currentBuffer.cleanup(); @@ -773,7 +775,7 @@ public List close() throws IOException, InterruptedException { throw new IOException(spillException); } } else { - LOG.info(destNameTrimmed + ": " + "All spills complete"); + LOG.info(sourceDestNameTrimmed + ": " + "All spills complete"); // Assuming close will be called on the same thread as the write cleanup(); @@ -1082,7 +1084,8 @@ private void mergeAll() throws 
IOException { for (int i = 0; i < numPartitions; i++) { long segmentStart = out.getPos(); if (numRecordsPerPartition[i] == 0) { - LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records"); + LOG.info( + sourceDestNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records"); continue; } writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null); @@ -1136,7 +1139,7 @@ private void mergeAll() throws IOException { } finalSpillRecord.writeToFile(finalIndexPath, conf, localFs); fileOutputBytesCounter.increment(indexFileSizeEstimate); - LOG.info(destNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills"); + LOG.info(sourceDestNameTrimmed + ": " + "Finished final spill after merging : " + numSpills.get() + " spills"); } private void deleteIntermediateSpills() { @@ -1208,9 +1211,10 @@ private void writeLargeRecord(final Object key, final Object value, final int pa mayBeSendEventsForSpill(emptyPartitions, sizePerPartition, spillIndex, false); - LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex); + LOG.info(sourceDestNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + + spillIndex); if (LOG.isDebugEnabled()) { - LOG.debug(destNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath=" + LOG.debug(sourceDestNameTrimmed + ": " + "LargeRecord Spill=" + spillIndex + ", indexPath=" + spillPathDetails.indexFilePath + ", outputPath=" + spillPathDetails.outputFilePath); } @@ -1346,7 +1350,7 @@ private void mayBeSendEventsForSpill( try { events = generateEventForSpill(emptyPartitions, sizePerPartition, spillNumber, isFinalUpdate); - LOG.info(destNameTrimmed + ": " + "Adding spill event for spill" + LOG.info(sourceDestNameTrimmed + ": " + "Adding spill event for spill" + " (final update=" + isFinalUpdate + "), 
spillId=" + spillNumber); if (pipelinedShuffle) { //Send out an event for consuming. @@ -1355,7 +1359,7 @@ private void mayBeSendEventsForSpill( this.finalEvents.addAll(events); } } catch (IOException e) { - LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e); + LOG.error(sourceDestNameTrimmed + ": " + "Error in sending pipelined events", e); outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending events."); } @@ -1414,7 +1418,7 @@ public void onSuccess(SpillResult result) { availableBuffers.add(buffer); } } catch (Throwable e) { - LOG.error(destNameTrimmed + ": Failure while attempting to reset buffer after spill", e); + LOG.error(sourceDestNameTrimmed + ": Failure while attempting to reset buffer after spill", e); outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Failure while attempting to reset buffer after spill"); } @@ -1444,7 +1448,7 @@ public void onSuccess(SpillResult result) { public void onFailure(Throwable t) { // spillException setup to throw an exception back to the user. Requires synchronization. 
// Consider removing it in favor of having Tez kill the task - LOG.error(destNameTrimmed + ": " + "Failure while spilling to disk", t); + LOG.error(sourceDestNameTrimmed + ": " + "Failure while spilling to disk", t); spillException = t; outputContext.reportFailure(TaskFailureType.NON_FATAL, t, "Failure while spilling to disk"); spillLock.lock(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 676fe17a5f..44cb9d6aae 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -135,7 +135,7 @@ public synchronized void start() throws Exception { if (pipelinedShuffle) { if (finalMergeEnabled) { - LOG.info(getContext().getDestinationVertexName() + " disabling final merge as " + LOG.info(getContext().getInputOutputVertexNames() + " disabling final merge as " + TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED + " is enabled."); finalMergeEnabled = false; conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); @@ -194,7 +194,7 @@ public synchronized List close() throws IOException { returnEvents.addAll(generateEvents()); sorter = null; } else { - LOG.warn(getContext().getDestinationVertexName() + + LOG.warn(getContext().getInputOutputVertexNames() + ": Attempting to close output {} of type {} before it was started. 
Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = generateEmptyEvents(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index e7a4429d95..bcacc5238e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -106,7 +106,7 @@ public synchronized void start() throws Exception { this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, 1, memoryUpdateCallbackHandler.getMemoryAssigned()); isStarted.set(true); - LOG.info(getContext().getDestinationVertexName() + " started. MemoryAssigned=" + LOG.info(getContext().getInputOutputVertexNames() + " started. MemoryAssigned=" + memoryUpdateCallbackHandler.getMemoryAssigned()); } } @@ -130,7 +130,7 @@ public synchronized List close() throws Exception { returnEvents = kvWriter.close(); kvWriter = null; } else { - LOG.warn(getContext().getDestinationVertexName() + + LOG.warn(getContext().getInputOutputVertexNames() + ": Attempting to close output {} of type {} before it was started. 
Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = new LinkedList(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 439b732db5..9bc7ea40cd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -108,7 +108,7 @@ public synchronized List close() throws Exception { returnEvents = kvWriter.close(); kvWriter = null; } else { - LOG.warn(getContext().getDestinationVertexName() + + LOG.warn(getContext().getInputOutputVertexNames() + ": Attempting to close output {} of type {} before it was started. Generating empty events", getContext().getDestinationVertexName(), this.getClass().getSimpleName()); returnEvents = new LinkedList(); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java index b81c2bd036..a7c7ca28cd 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java @@ -69,6 +69,7 @@ static OutputContext createOutputContext(Configuration conf, Configuration userP }).when(ctx).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class)); doReturn(conf).when(ctx).getContainerConfiguration(); doReturn(TezUtils.createUserPayloadFromConf(userPayloadConf)).when(ctx).getUserPayload(); + doReturn("taskVertex").when(ctx).getTaskVertexName(); doReturn("destinationVertex").when(ctx).getDestinationVertexName(); doReturn("UUID").when(ctx).getUniqueIdentifier(); 
doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs(); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 2c9c3b2ace..7999d45fcd 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -406,6 +406,7 @@ private OutputContext createTezOutputContext() throws IOException { doReturn(payLoad).when(context).getUserPayload(); doReturn(5 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask(); doReturn(UniqueID).when(context).getUniqueIdentifier(); + doReturn("v0").when(context).getTaskVertexName(); doReturn("v1").when(context).getDestinationVertexName(); doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context) .getServiceProviderMetaData