46 commits
bd7ab3f  Add internal accumulators to TaskContext  (Jul 28, 2015)
3c4f042  Track memory usage in ExternalAppendOnlyMap / ExternalSorter  (Jul 29, 2015)
a417592  Expose memory metrics in UnsafeExternalSorter  (Jul 29, 2015)
e6c3e2f  Move internal accumulators creation to Stage  (Jul 29, 2015)
4ef4cb1  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 29, 2015)
9e824f2  Add back execution memory tracking for *ExternalSort  (Jul 29, 2015)
9c605a4  Track execution memory in GeneratedAggregate  (Jul 29, 2015)
770ee54  Track execution memory in broadcast joins  (Jul 30, 2015)
d9b9015  Track execution memory in unsafe shuffles  (Jul 30, 2015)
eee5437  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 30, 2015)
92b4b6b  Display peak execution memory on the UI  (Jul 30, 2015)
5b5e6f3  Add peak execution memory to summary table + tooltip  (Jul 30, 2015)
9528d09  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 30, 2015)
59231e4  Fix tests  (Jul 30, 2015)
5107691  Add tests for internal accumulators  (Jul 30, 2015)
b5c51c1  Re-enable test in JavaAPISuite  (Jul 30, 2015)
a757550  Address comments  (Jul 30, 2015)
23c845d  Add tests for SQL operators  (Jul 31, 2015)
a919eb7  Add tests for unsafe shuffle writer  (Jul 31, 2015)
a7a39a5  Strengthen presence check for accumulator  (Jul 31, 2015)
111a05e  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 31, 2015)
0b6926c  Oops  (Jul 31, 2015)
1ecf678  Minor changes: comments, style, unused imports  (Jul 31, 2015)
5f1235b  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 31, 2015)
2480d84  Expand test coverage  (Jul 31, 2015)
d090a94  Fix style  (Jul 31, 2015)
663a303  UnsafeShuffleWriter: update peak memory before close  (Jul 31, 2015)
b889a68  Minor changes: comments, spacing, style  (Jul 31, 2015)
6aa2f7a  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Jul 31, 2015)
2840b7d  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 2, 2015)
d70874d  Fix test compile + address comments  (Aug 2, 2015)
a87b4d0  Fix compile?  (Aug 2, 2015)
17f4c2d  Fix compile?  (Aug 2, 2015)
10da1cd  Fix compile  (Aug 3, 2015)
c00a197  Fix potential NPEs  (Aug 3, 2015)
0625d73  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 3, 2015)
b3b92f6  Address comments  (Aug 3, 2015)
d0fef87  Fix tests?  (Aug 3, 2015)
40b4802  Fix style?  (Aug 3, 2015)
361a359  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 3, 2015)
876bfa4  Fix failing test after logical merge conflict  (Aug 3, 2015)
9de2a12  Fix tests due to another logical merge conflict  (Aug 3, 2015)
8eefbc5  Fix non-failing tests  (Aug 3, 2015)
d7df332  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 3, 2015)
f5b0d68  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 3, 2015)
9abecb9  Merge branch 'master' of github.com:apache/spark into expose-memory-m…  (Aug 3, 2015)
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import javax.annotation.Nullable;

import scala.Tuple2;

@@ -86,9 +87,12 @@ final class UnsafeShuffleExternalSorter {

private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();

/** Peak memory used by this sorter so far, in bytes. **/
private long peakMemoryUsedBytes;

// These variables are reset after spilling:
private UnsafeShuffleInMemorySorter sorter;
private MemoryBlock currentPage = null;
@Nullable private UnsafeShuffleInMemorySorter sorter;
@Nullable private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;

@@ -106,6 +110,7 @@ public UnsafeShuffleExternalSorter(
this.blockManager = blockManager;
this.taskContext = taskContext;
this.initialSize = initialSize;
this.peakMemoryUsedBytes = initialSize;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
@@ -279,10 +284,26 @@ private long getMemoryUsage() {
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return sorter.getMemoryUsage() + totalPageSize;
return ((sorter == null) ? 0 : sorter.getMemoryUsage()) + totalPageSize;
}

private void updatePeakMemoryUsed() {
long mem = getMemoryUsage();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
}
}

/**
* Return the peak memory used so far, in bytes.
*/
long getPeakMemoryUsedBytes() {
updatePeakMemoryUsed();
return peakMemoryUsedBytes;
}

private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
memoryManager.freePage(block);
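The hunks above introduce the peak-tracking pattern that recurs throughout this PR: keep a running high-water mark, reconcile it whenever current usage is sampled, and reconcile once more in freeMemory() before pages are released, since the peak would otherwise be lost with the freed memory. A minimal sketch of the pattern, using a hypothetical consumer class in place of UnsafeShuffleExternalSorter:

```scala
// Hypothetical stand-in for a memory consumer such as UnsafeShuffleExternalSorter.
// Only the peak-tracking logic mirrors the diff; everything else is illustrative.
class PeakTrackingConsumer {
  private var allocatedBytes = 0L
  private var peakMemoryUsedBytes = 0L

  private def getMemoryUsage: Long = allocatedBytes

  private def updatePeakMemoryUsed(): Unit = {
    val mem = getMemoryUsage
    if (mem > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = mem
    }
  }

  def allocatePage(bytes: Long): Unit = {
    allocatedBytes += bytes
  }

  def freeMemory(): Long = {
    updatePeakMemoryUsed() // snapshot BEFORE releasing, or the peak is lost
    val freed = allocatedBytes
    allocatedBytes = 0L
    freed
  }

  def getPeakMemoryUsedBytes: Long = {
    updatePeakMemoryUsed() // usage may have grown since the last snapshot
    peakMemoryUsedBytes
  }
}
```

Reconciling lazily at read and free time keeps the hot allocation path free of extra bookkeeping.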
@@ -27,6 +27,7 @@
import scala.collection.JavaConversions;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.collection.immutable.Map;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
@@ -78,8 +79,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;

private MapStatus mapStatus = null;
private UnsafeShuffleExternalSorter sorter = null;
@Nullable private MapStatus mapStatus;
@Nullable private UnsafeShuffleExternalSorter sorter;
private long peakMemoryUsedBytes = 0;

/** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
@@ -131,9 +133,28 @@ public UnsafeShuffleWriter(

@VisibleForTesting
public int maxRecordSizeBytes() {
assert(sorter != null);
return sorter.maxRecordSizeBytes;
}

private void updatePeakMemoryUsed() {
// sorter can be null if this writer is closed
if (sorter != null) {
long mem = sorter.getPeakMemoryUsedBytes();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
}
}
}

/**
* Return the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
updatePeakMemoryUsed();
return peakMemoryUsedBytes;
}

/**
* This convenience method should only be called in test code.
*/
@@ -144,7 +165,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we ecountered an exception
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
@@ -189,6 +210,8 @@ private void open() throws IOException {

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
@@ -209,6 +232,7 @@ void closeAndWriteOutput() throws IOException {

@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
serBuffer.reset();
@@ -431,6 +455,14 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
@Override
public Option<MapStatus> stop(boolean success) {
try {
// Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
Map<String, Accumulator<Object>> internalAccumulators =
taskContext.internalMetricsToAccumulators();
if (internalAccumulators != null) {
internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
.add(getPeakMemoryUsedBytes());
}

if (stopping) {
return Option.apply(null);
} else {
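stop() is where the writer's peak finally reaches the task metrics: the internal accumulator map is looked up from the TaskContext and, when present, receives getPeakMemoryUsedBytes(). A self-contained sketch of that hand-off; LongAccum, TaskContextLike, and WriterLike are hypothetical stand-ins, and only the null check and the "peakExecutionMemory" key mirror the diff:

```scala
// Hypothetical stand-ins; only the hand-off logic mirrors the diff.
final class LongAccum { private var sum = 0L; def add(v: Long): Unit = sum += v; def value: Long = sum }

trait TaskContextLike {
  // May return null when no real task is running, e.g. in UnsafeShuffleWriterSuite.
  def internalMetricsToAccumulators: Map[String, LongAccum]
}

class WriterLike(taskContext: TaskContextLike) {
  private var peakMemoryUsedBytes = 0L

  def recordPeak(bytes: Long): Unit =
    peakMemoryUsedBytes = math.max(peakMemoryUsedBytes, bytes)

  def stop(): Unit = {
    val accums = taskContext.internalMetricsToAccumulators
    if (accums != null) { // tests construct the writer without accumulators
      // "peakExecutionMemory" matches InternalAccumulator.PEAK_EXECUTION_MEMORY below
      accums("peakExecutionMemory").add(peakMemoryUsedBytes)
    }
  }
}
```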
@@ -505,7 +505,7 @@ public boolean putNewKey(
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
// (8 byte key length) (key) (8 byte value length) (value)
// (8 byte key length) (key) (value)
final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;

// --- Figure out where to insert the new record ---------------------------------------------
@@ -655,7 +655,10 @@ public long getPageSizeBytes() {
return pageSizeBytes;
}

/** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
/**
* Returns the total amount of memory, in bytes, consumed by this map's managed structures.
* Note that this is also the peak memory used by this map, since the map is append-only.
*/
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
@@ -674,7 +677,6 @@ public long getTimeSpentResizingNs() {
return timeSpentResizingNs;
}


/**
* Returns the average number of probes per key lookup.
*/
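The corrected layout comment matters: only the key length is stored as an 8-byte prefix, and the value's length is implied by the total record size, which is why requiredSize is 8 + keyLengthBytes + valueLengthBytes. A hedged illustration of that layout, using a heap ByteBuffer in place of the map's off-heap pages:

```scala
import java.nio.ByteBuffer

// Illustrative only: the real map writes into off-heap memory pages, not ByteBuffers.
object RecordLayoutSketch {
  def requiredSize(keyLengthBytes: Int, valueLengthBytes: Int): Int =
    8 + keyLengthBytes + valueLengthBytes // (8 byte key length) (key) (value)

  def packRecord(key: Array[Byte], value: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(requiredSize(key.length, value.length))
    buf.putLong(key.length.toLong) // 8-byte key length prefix
    buf.put(key)                   // key bytes
    buf.put(value)                 // value bytes; length is implied by the record size
    buf.array()
  }

  def main(args: Array[String]): Unit = {
    // A 3-byte key and a 5-byte value occupy 8 + 3 + 5 = 16 bytes.
    assert(packRecord(Array[Byte](1, 2, 3), Array[Byte](4, 5, 6, 7, 8)).length == 16)
  }
}
```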
@@ -70,13 +70,14 @@ public final class UnsafeExternalSorter {
private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();

// These variables are reset after spilling:
private UnsafeInMemorySorter inMemSorter;
@Nullable private UnsafeInMemorySorter inMemSorter;
// Whether the in-mem sorter is created internally, or passed in from outside.
// If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
private boolean isInMemSorterExternal = false;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
private long peakMemoryUsedBytes = 0;

public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
@@ -183,6 +184,7 @@ public void closeCurrentPage() {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
@@ -219,7 +221,22 @@ private long getMemoryUsage() {
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
return inMemSorter.getMemoryUsage() + totalPageSize;
return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
}

private void updatePeakMemoryUsed() {
long mem = getMemoryUsage();
if (mem > peakMemoryUsedBytes) {
peakMemoryUsedBytes = mem;
}
}

/**
* Return the peak memory used so far, in bytes.
*/
public long getPeakMemoryUsedBytes() {
updatePeakMemoryUsed();
return peakMemoryUsedBytes;
}

@VisibleForTesting
@@ … @@
* @return the number of bytes freed.
*/
public long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
taskMemoryManager.freePage(block);
@@ -277,7 +295,8 @@ public void deleteSpillFiles() {
* @return true if the record can be inserted without requiring more allocations, false otherwise.
*/
private boolean haveSpaceForRecord(int requiredSpace) {
assert (requiredSpace > 0);
assert(requiredSpace > 0);
assert(inMemSorter != null);
return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
}

@@ … @@
* the record size.
*/
private void allocateSpaceForRecord(int requiredSpace) throws IOException {
assert(inMemSorter != null);
// TODO: merge these steps to first calculate total memory requirements for this insert,
// then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
// data page.
@@ -350,6 +370,7 @@ public void insertRecord(
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
assert(inMemSorter != null);

final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ -382,6 +403,7 @@ public void insertKVRecord(
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
assert(inMemSorter != null);

final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ … @@
}

public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
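UnsafeExternalSorter gets the same peak plumbing plus assert(inMemSorter != null) guards: the sorter field is @Nullable because spilling resets it, yet getMemoryUsage() may still be called afterwards. A sketch of why the null guard matters, with hypothetical stand-in classes:

```scala
// Hypothetical stand-ins for UnsafeInMemorySorter / UnsafeExternalSorter.
object SpillNullGuardSketch {
  class InMemSorterLike { def getMemoryUsage: Long = 64L * 1024 }

  class ExternalSorterLike {
    private var inMemSorter: InMemSorterLike = new InMemSorterLike // reset after spilling
    private var totalPageSize: Long = 1L * 1024 * 1024

    def getMemoryUsage: Long =
      (if (inMemSorter == null) 0L else inMemSorter.getMemoryUsage) + totalPageSize

    def spill(): Unit = {
      // writing records to disk elided; the in-memory state is released afterwards
      inMemSorter = null
      totalPageSize = 0L
    }
  }

  def main(args: Array[String]): Unit = {
    val sorter = new ExternalSorterLike
    sorter.spill()
    // Without the null guard this read would throw an NPE after spilling
    // (compare the "Fix potential NPEs" commit above).
    assert(sorter.getMemoryUsage == 0L)
  }
}
```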
@@ -207,7 +207,7 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
.serialization_time, .getting_result_time {
.serialization_time, .getting_result_time, .peak_execution_memory {
display: none;
}

60 changes: 55 additions & 5 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -152,8 +152,14 @@ class Accumulable[R, T] private[spark] (
in.defaultReadObject()
value_ = zero
deserialized = true
val taskContext = TaskContext.get()
taskContext.registerAccumulator(this)
// Automatically register the accumulator when it is deserialized with the task closure.
// Note that internal accumulators are deserialized before the TaskContext is created and
// are registered in the TaskContext constructor.
if (!isInternal) {
val taskContext = TaskContext.get()
assume(taskContext != null, "Task context was null when deserializing user accumulators")
taskContext.registerAccumulator(this)
}
}

override def toString: String = if (value_ == null) "null" else value_.toString
@@ -248,10 +254,20 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T, T](initialValue, param, name) {
class Accumulator[T] private[spark] (
@transient initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
}

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
}
}

/**
@@ -342,3 +358,37 @@ private[spark] object Accumulators extends Logging {
}

}

private[spark] object InternalAccumulator {
val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
val TEST_ACCUMULATOR = "testAccumulator"

// For testing only.
// This needs to be a def since we don't want to reuse the same accumulator across stages.
private def maybeTestAccumulator: Option[Accumulator[Long]] = {
if (sys.props.contains("spark.testing")) {
Some(new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
} else {
None
}
}

/**
* Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
def create(): Seq[Accumulator[Long]] = {
Seq(
// Execution memory refers to the memory used by internal data structures created
// during shuffles, aggregations and joins. The value of this accumulator should be
// approximately the sum of the peak sizes across all such data structures created
// in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
) ++ maybeTestAccumulator.toSeq
}
}
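The scaladoc above is the heart of the design: one set of accumulators is created per stage, every task in the stage adds into the same instances, and the UI can then summarize per-task values. A self-contained sketch of that flow; the accumulator class and wiring are hypothetical, and only the "peakExecutionMemory" name comes from this PR:

```scala
object InternalAccumulatorSketch {
  // Hypothetical accumulator: Spark's Accumulator[Long] with LongAccumulatorParam
  // merges per-task values into a running sum on the driver.
  final class LongAccumulator(val name: String) {
    private var sum = 0L
    def add(v: Long): Unit = synchronized { sum += v }
    def value: Long = synchronized { sum }
  }

  // One set of accumulators per stage, mirroring InternalAccumulator.create().
  def createStageAccumulators(): Seq[LongAccumulator] =
    Seq(new LongAccumulator("peakExecutionMemory"))

  def main(args: Array[String]): Unit = {
    val accums = createStageAccumulators()
    val peak = accums.find(_.name == "peakExecutionMemory").get
    // Three tasks in the stage report their peaks into the same accumulator;
    // the per-task updates are what the stage page aggregates into a distribution.
    Seq(512L, 2048L, 1024L).foreach(peak.add)
    assert(peak.value == 3584L)
  }
}
```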