
Commit f532b3c

Merge remote-tracking branch 'upstream/master' into codegen_in

2 parents: 446bbcd + 9e952ec
File tree

190 files changed: +5978 −1808 lines


conf/spark-env.sh.template

Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@
 # - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
+# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
 # - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
 # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")

core/pom.xml

Lines changed: 0 additions & 20 deletions

@@ -46,30 +46,10 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm-commons</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill-java</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.ow2.asm</groupId>
-          <artifactId>asm-commons</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java

Lines changed: 24 additions & 3 deletions

@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
+import javax.annotation.Nullable;
 
 import scala.Tuple2;
 
@@ -86,9 +87,12 @@ final class UnsafeShuffleExternalSorter {
 
   private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
 
+  /** Peak memory used by this sorter so far, in bytes. **/
+  private long peakMemoryUsedBytes;
+
   // These variables are reset after spilling:
-  private UnsafeShuffleInMemorySorter sorter;
-  private MemoryBlock currentPage = null;
+  @Nullable private UnsafeShuffleInMemorySorter sorter;
+  @Nullable private MemoryBlock currentPage = null;
   private long currentPagePosition = -1;
   private long freeSpaceInCurrentPage = 0;
 
@@ -106,6 +110,7 @@ public UnsafeShuffleExternalSorter(
     this.blockManager = blockManager;
     this.taskContext = taskContext;
     this.initialSize = initialSize;
+    this.peakMemoryUsedBytes = initialSize;
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
@@ -279,10 +284,26 @@ private long getMemoryUsage() {
     for (MemoryBlock page : allocatedPages) {
       totalPageSize += page.size();
     }
-    return sorter.getMemoryUsage() + totalPageSize;
+    return ((sorter == null) ? 0 : sorter.getMemoryUsage()) + totalPageSize;
+  }
+
+  private void updatePeakMemoryUsed() {
+    long mem = getMemoryUsage();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
+    }
+  }
+
+  /**
+   * Return the peak memory used so far, in bytes.
+   */
+  long getPeakMemoryUsedBytes() {
+    updatePeakMemoryUsed();
+    return peakMemoryUsedBytes;
   }
 
   private long freeMemory() {
+    updatePeakMemoryUsed();
     long memoryFreed = 0;
     for (MemoryBlock block : allocatedPages) {
       memoryManager.freePage(block);
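
Note: the changes above follow a simple high-watermark pattern — sample current usage at the points where memory is queried or about to be released (freeMemory), and keep the maximum. A minimal self-contained sketch of that pattern; the getMemoryUsage() body here is a stand-in (the real sorter sums allocated page sizes plus the in-memory sorter's usage):

    // Sketch only: illustrates the watermark bookkeeping, not Spark's classes.
    final class PeakMemoryTracker {
      private long peakMemoryUsedBytes = 0;

      private long getMemoryUsage() {
        // stand-in metric; the real code sums page sizes + sorter usage
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
      }

      private void updatePeakMemoryUsed() {
        long mem = getMemoryUsage();
        if (mem > peakMemoryUsedBytes) {
          peakMemoryUsedBytes = mem;
        }
      }

      // Must be driven from every release point (e.g., freeMemory()),
      // otherwise a peak reached between reads is silently lost.
      long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        return peakMemoryUsedBytes;
      }
    }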

core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java

Lines changed: 35 additions & 3 deletions

@@ -27,6 +27,7 @@
 import scala.collection.JavaConversions;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
+import scala.collection.immutable.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
@@ -78,8 +79,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
 
-  private MapStatus mapStatus = null;
-  private UnsafeShuffleExternalSorter sorter = null;
+  @Nullable private MapStatus mapStatus;
+  @Nullable private UnsafeShuffleExternalSorter sorter;
+  private long peakMemoryUsedBytes = 0;
 
   /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
   private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
@@ -131,9 +133,28 @@ public UnsafeShuffleWriter(
 
   @VisibleForTesting
   public int maxRecordSizeBytes() {
+    assert(sorter != null);
     return sorter.maxRecordSizeBytes;
   }
 
+  private void updatePeakMemoryUsed() {
+    // sorter can be null if this writer is closed
+    if (sorter != null) {
+      long mem = sorter.getPeakMemoryUsedBytes();
+      if (mem > peakMemoryUsedBytes) {
+        peakMemoryUsedBytes = mem;
+      }
+    }
+  }
+
+  /**
+   * Return the peak memory used so far, in bytes.
+   */
+  public long getPeakMemoryUsedBytes() {
+    updatePeakMemoryUsed();
+    return peakMemoryUsedBytes;
+  }
+
   /**
    * This convenience method should only be called in test code.
    */
@@ -144,7 +165,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
-    // Keep track of success so we know if we ecountered an exception
+    // Keep track of success so we know if we encountered an exception
     // We do this rather than a standard try/catch/re-throw to handle
     // generic throwables.
     boolean success = false;
@@ -189,6 +210,8 @@ private void open() throws IOException {
 
   @VisibleForTesting
   void closeAndWriteOutput() throws IOException {
+    assert(sorter != null);
+    updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
     final SpillInfo[] spills = sorter.closeAndGetSpills();
@@ -209,6 +232,7 @@ void closeAndWriteOutput() throws IOException {
 
   @VisibleForTesting
   void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
+    assert(sorter != null);
     final K key = record._1();
     final int partitionId = partitioner.getPartition(key);
     serBuffer.reset();
@@ -431,6 +455,14 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
   @Override
   public Option<MapStatus> stop(boolean success) {
     try {
+      // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
+      Map<String, Accumulator<Object>> internalAccumulators =
+        taskContext.internalMetricsToAccumulators();
+      if (internalAccumulators != null) {
+        internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
+          .add(getPeakMemoryUsedBytes());
+      }
+
       if (stopping) {
         return Option.apply(null);
       } else {
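
Note: the new block in stop() pushes the writer's peak memory into the task's internal PEAK_EXECUTION_MEMORY accumulator, guarding against a missing accumulator map (as in UnsafeShuffleWriterSuite). A rough sketch of that reporting step, using a plain java.util.Map as a hypothetical stand-in for Spark's accumulator map — names here are illustrative, not Spark's API:

    import java.util.HashMap;
    import java.util.Map;

    final class PeakMetricReporter {
      // assumed key name, standing in for InternalAccumulator.PEAK_EXECUTION_MEMORY()
      static final String PEAK_EXECUTION_MEMORY = "peakExecutionMemory";

      // Mirrors the null guard in stop(): tests may construct the writer
      // without a fully wired TaskContext, so the map can be absent.
      static void report(Map<String, Long> internalAccumulators, long peakBytes) {
        if (internalAccumulators != null) {
          internalAccumulators.merge(PEAK_EXECUTION_MEMORY, peakBytes, Long::sum);
        }
      }

      public static void main(String[] args) {
        Map<String, Long> accums = new HashMap<>();
        report(accums, 64L * 1024 * 1024);
        System.out.println(accums); // {peakExecutionMemory=67108864}
      }
    }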

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 5 additions & 3 deletions

@@ -505,7 +505,7 @@ public boolean putNewKey(
     // Here, we'll copy the data into our data pages. Because we only store a relative offset from
     // the key address instead of storing the absolute address of the value, the key and value
     // must be stored in the same memory page.
-    // (8 byte key length) (key) (8 byte value length) (value)
+    // (8 byte key length) (key) (value)
     final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
 
     // --- Figure out where to insert the new record ---------------------------------------------
@@ -655,7 +655,10 @@ public long getPageSizeBytes() {
     return pageSizeBytes;
   }
 
-  /** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
+  /**
+   * Returns the total amount of memory, in bytes, consumed by this map's managed structures.
+   * Note that this is also the peak memory used by this map, since the map is append-only.
+   */
   public long getTotalMemoryConsumption() {
     long totalDataPagesSize = 0L;
     for (MemoryBlock dataPage : dataPages) {
@@ -674,7 +677,6 @@ public long getTimeSpentResizingNs() {
     return timeSpentResizingNs;
   }
 
-
   /**
    * Returns the average number of probes per key lookup.
    */
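
Note: the corrected comment says each record carries a single 8-byte length header followed by the key and value bytes — which is exactly why requiredSize is 8 + keyLengthBytes + valueLengthBytes. An illustrative (non-Spark) sketch of that layout packed into a heap buffer:

    import java.nio.ByteBuffer;

    // Hypothetical illustration only, not BytesToBytesMap's actual code.
    final class RecordLayoutSketch {
      static byte[] pack(byte[] key, byte[] value) {
        // mirrors the diff: 8 bytes of header + key + value
        final int requiredSize = 8 + key.length + value.length;
        ByteBuffer buf = ByteBuffer.allocate(requiredSize);
        buf.putLong(key.length); // one 8-byte length header; the value length
                                 // is recoverable from the total record size
        buf.put(key);
        buf.put(value);
        return buf.array();
      }
    }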

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 27 additions & 4 deletions

@@ -70,13 +70,14 @@ public final class UnsafeExternalSorter {
   private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
 
   // These variables are reset after spilling:
-  private UnsafeInMemorySorter inMemSorter;
+  @Nullable private UnsafeInMemorySorter inMemSorter;
   // Whether the in-mem sorter is created internally, or passed in from outside.
   // If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
   private boolean isInMemSorterExternal = false;
   private MemoryBlock currentPage = null;
   private long currentPagePosition = -1;
   private long freeSpaceInCurrentPage = 0;
+  private long peakMemoryUsedBytes = 0;
 
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
       TaskMemoryManager taskMemoryManager,
@@ -183,6 +184,7 @@ public void closeCurrentPage() {
    * Sort and spill the current records in response to memory pressure.
    */
   public void spill() throws IOException {
+    assert(inMemSorter != null);
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
       Thread.currentThread().getId(),
       Utils.bytesToString(getMemoryUsage()),
@@ -219,7 +221,22 @@ private long getMemoryUsage() {
     for (MemoryBlock page : allocatedPages) {
       totalPageSize += page.size();
     }
-    return inMemSorter.getMemoryUsage() + totalPageSize;
+    return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
+  }
+
+  private void updatePeakMemoryUsed() {
+    long mem = getMemoryUsage();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
+    }
+  }
+
+  /**
+   * Return the peak memory used so far, in bytes.
+   */
+  public long getPeakMemoryUsedBytes() {
+    updatePeakMemoryUsed();
+    return peakMemoryUsedBytes;
   }
 
   @VisibleForTesting
@@ -233,6 +250,7 @@ public int getNumberOfAllocatedPages() {
    * @return the number of bytes freed.
    */
   public long freeMemory() {
+    updatePeakMemoryUsed();
     long memoryFreed = 0;
     for (MemoryBlock block : allocatedPages) {
       taskMemoryManager.freePage(block);
@@ -277,7 +295,8 @@ public void deleteSpillFiles() {
    * @return true if the record can be inserted without requiring more allocations, false otherwise.
    */
   private boolean haveSpaceForRecord(int requiredSpace) {
-    assert (requiredSpace > 0);
+    assert(requiredSpace > 0);
+    assert(inMemSorter != null);
     return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
   }
 
@@ -290,6 +309,7 @@ private boolean haveSpaceForRecord(int requiredSpace) {
    * the record size.
    */
   private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+    assert(inMemSorter != null);
     // TODO: merge these steps to first calculate total memory requirements for this insert,
     // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
     // data page.
@@ -350,6 +370,7 @@ public void insertRecord(
     if (!haveSpaceForRecord(totalSpaceRequired)) {
       allocateSpaceForRecord(totalSpaceRequired);
     }
+    assert(inMemSorter != null);
 
     final long recordAddress =
       taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ -382,6 +403,7 @@ public void insertKVRecord(
     if (!haveSpaceForRecord(totalSpaceRequired)) {
       allocateSpaceForRecord(totalSpaceRequired);
     }
+    assert(inMemSorter != null);
 
     final long recordAddress =
       taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ -405,7 +427,8 @@ public void insertKVRecord(
   }
 
   public UnsafeSorterIterator getSortedIterator() throws IOException {
-    final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator();
+    assert(inMemSorter != null);
+    final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
     int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
     if (spillWriters.isEmpty()) {
       return inMemoryIterator;
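
Note: the discipline applied throughout this file is to annotate fields that are reset after spilling as @Nullable, assert non-null on mutation paths, and degrade gracefully in aggregate queries such as getMemoryUsage(). A small standalone sketch of that discipline (assumes the JSR-305 @Nullable annotation is on the classpath, as in the files above, and -ea for assertions):

    import javax.annotation.Nullable;

    // Sketch only; class and field names are illustrative.
    final class NullableAfterSpillSketch {
      @Nullable private StringBuilder buffer = new StringBuilder();

      long memoryUsage() {
        // aggregate query: tolerate a released buffer instead of NPE-ing
        return (buffer == null) ? 0 : buffer.length();
      }

      void append(String s) {
        // mutation path: must never run after release, so say so explicitly
        assert(buffer != null);
        buffer.append(s);
      }

      void release() {
        buffer = null; // "reset after spilling"
      }
    }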

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 3 additions & 3 deletions

@@ -133,7 +133,7 @@ public void insertRecord(long recordPointer, long keyPrefix) {
     pointerArrayInsertPosition++;
   }
 
-  private static final class SortedIterator extends UnsafeSorterIterator {
+  public static final class SortedIterator extends UnsafeSorterIterator {
 
     private final TaskMemoryManager memoryManager;
     private final int sortBufferInsertPosition;
@@ -144,7 +144,7 @@ private static final class SortedIterator extends UnsafeSorterIterator {
     private long keyPrefix;
     private int recordLength;
 
-    SortedIterator(
+    private SortedIterator(
         TaskMemoryManager memoryManager,
         int sortBufferInsertPosition,
         long[] sortBuffer) {
@@ -186,7 +186,7 @@ public void loadNext() {
    * Return an iterator over record pointers in sorted order. For efficiency, all calls to
    * {@code next()} will return the same mutable object.
    */
-  public UnsafeSorterIterator getSortedIterator() {
+  public SortedIterator getSortedIterator() {
     sorter.sort(pointerArray, 0, pointerArrayInsertPosition / 2, sortComparator);
     return new SortedIterator(memoryManager, pointerArrayInsertPosition, pointerArray);
   }
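
Note: making SortedIterator public while its constructor becomes private lets callers (e.g., UnsafeExternalSorter.getSortedIterator above) name the concrete type, while instantiation stays confined to the enclosing sorter. A minimal sketch of that visibility pattern, with illustrative names:

    // Sketch only: a public nested class with a private constructor can be
    // referenced by callers but created only by its enclosing class.
    final class Sorter {
      public static final class SortedIterator {
        private final int[] data;
        private int pos = 0;

        private SortedIterator(int[] data) { // only Sorter can construct this
          this.data = data;
        }

        public boolean hasNext() { return pos < data.length; }
        public int next() { return data[pos++]; }
      }

      private final int[] values;

      Sorter(int[] values) { this.values = values.clone(); }

      // Returning the concrete type rather than a supertype lets callers use
      // SortedIterator-specific members without a cast.
      public SortedIterator getSortedIterator() {
        java.util.Arrays.sort(values);
        return new SortedIterator(values);
      }
    }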

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 9 additions & 1 deletion

@@ -207,7 +207,7 @@ span.additional-metric-title {
 /* Hide all additional metrics by default. This is done here rather than using JavaScript to
  * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
 .scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time {
+.serialization_time, .getting_result_time, .peak_execution_memory {
   display: none;
 }
 
@@ -224,3 +224,11 @@ span.additional-metric-title {
 a.expandbutton {
   cursor: pointer;
 }
+
+.executor-thread {
+  background: #E6E6E6;
+}
+
+.non-executor-thread {
+  background: #FAFAFA;
+}
