Skip to content

Commit 56419cf

Browse files
Davies Liu (davies)
authored and committed
[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is not needed anymore, so it is removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from this (by triggering others' spilling). The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.

The following script fails with OOM before this PR; with this PR it finishes in 150 seconds with a 2G heap (it also works in the 1.5 branch, with similar duration).

```python
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print j.count()
```

For thread-safety, here is what I've got:
1) Without calling spill(), the operators should only be used by a single thread, so there are no safety problems.
2) spill() can be triggered in two cases: by the operator itself, or by other operators. We can check trigger == this in spill(); in that case it is still in the same thread, so there are no safety problems.
3) If it is triggered by other operators (right now cache will not trigger spill()), we only spill the data to disk when it is in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only; we only need to synchronize the iterator and change it.
4) During scanning, the iterator only uses one record in one page; we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not be triggered during scanning.
5) In order to avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in InMemorySorter).

Author: Davies Liu <[email protected]>

Closes #9241 from davies/force_spill.
1 parent d89be0b commit 56419cf

File tree

30 files changed

+1270
-834
lines changed

30 files changed

+1270
-834
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.memory;
19+
20+
21+
import java.io.IOException;
22+
23+
import org.apache.spark.unsafe.memory.MemoryBlock;
24+
25+
26+
/**
27+
* An memory consumer of TaskMemoryManager, which support spilling.
28+
*/
29+
public abstract class MemoryConsumer {
30+
31+
private final TaskMemoryManager taskMemoryManager;
32+
private final long pageSize;
33+
private long used;
34+
35+
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
36+
this.taskMemoryManager = taskMemoryManager;
37+
this.pageSize = pageSize;
38+
this.used = 0;
39+
}
40+
41+
protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
42+
this(taskMemoryManager, taskMemoryManager.pageSizeBytes());
43+
}
44+
45+
/**
46+
* Returns the size of used memory in bytes.
47+
*/
48+
long getUsed() {
49+
return used;
50+
}
51+
52+
/**
53+
* Force spill during building.
54+
*
55+
* For testing.
56+
*/
57+
public void spill() throws IOException {
58+
spill(Long.MAX_VALUE, this);
59+
}
60+
61+
/**
62+
* Spill some data to disk to release memory, which will be called by TaskMemoryManager
63+
* when there is not enough memory for the task.
64+
*
65+
* This should be implemented by subclass.
66+
*
67+
* Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
68+
*
69+
* @param size the amount of memory should be released
70+
* @param trigger the MemoryConsumer that trigger this spilling
71+
* @return the amount of released memory in bytes
72+
* @throws IOException
73+
*/
74+
public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
75+
76+
/**
77+
* Acquire `size` bytes memory.
78+
*
79+
* If there is not enough memory, throws OutOfMemoryError.
80+
*/
81+
protected void acquireMemory(long size) {
82+
long got = taskMemoryManager.acquireExecutionMemory(size, this);
83+
if (got < size) {
84+
taskMemoryManager.releaseExecutionMemory(got, this);
85+
taskMemoryManager.showMemoryUsage();
86+
throw new OutOfMemoryError("Could not acquire " + size + " bytes of memory, got " + got);
87+
}
88+
used += got;
89+
}
90+
91+
/**
92+
* Release `size` bytes memory.
93+
*/
94+
protected void releaseMemory(long size) {
95+
used -= size;
96+
taskMemoryManager.releaseExecutionMemory(size, this);
97+
}
98+
99+
/**
100+
* Allocate a memory block with at least `required` bytes.
101+
*
102+
* Throws IOException if there is not enough memory.
103+
*
104+
* @throws OutOfMemoryError
105+
*/
106+
protected MemoryBlock allocatePage(long required) {
107+
MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
108+
if (page == null || page.size() < required) {
109+
long got = 0;
110+
if (page != null) {
111+
got = page.size();
112+
freePage(page);
113+
}
114+
taskMemoryManager.showMemoryUsage();
115+
throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
116+
}
117+
used += page.size();
118+
return page;
119+
}
120+
121+
/**
122+
* Free a memory block.
123+
*/
124+
protected void freePage(MemoryBlock page) {
125+
used -= page.size();
126+
taskMemoryManager.freePage(page, this);
127+
}
128+
}

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 108 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717

1818
package org.apache.spark.memory;
1919

20-
import java.util.*;
20+
import javax.annotation.concurrent.GuardedBy;
21+
import java.io.IOException;
22+
import java.util.Arrays;
23+
import java.util.BitSet;
24+
import java.util.HashSet;
2125

2226
import com.google.common.annotations.VisibleForTesting;
2327
import org.slf4j.Logger;
2428
import org.slf4j.LoggerFactory;
2529

2630
import org.apache.spark.unsafe.memory.MemoryBlock;
31+
import org.apache.spark.util.Utils;
2732

2833
/**
2934
* Manages the memory allocated by an individual task.
@@ -100,30 +105,105 @@ public class TaskMemoryManager {
100105
*/
101106
private final boolean inHeap;
102107

108+
/**
109+
* The consumers that have requested execution memory through this manager.
110+
*/
111+
@GuardedBy("this")
112+
private final HashSet<MemoryConsumer> consumers;
113+
103114
/**
 * Creates the memory manager for one task attempt, starting with no registered consumers.
 */
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
  this.memoryManager = memoryManager;
  this.taskAttemptId = taskAttemptId;
  this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
  this.consumers = new HashSet<>();
}
111123

112124
/**
113-
* Acquire N bytes of memory for execution, evicting cached blocks if necessary.
125+
* Acquire N bytes of memory for a consumer. If there is no enough memory, it will call
126+
* spill() of consumers to release more memory.
127+
*
114128
* @return number of bytes successfully granted (<= N).
115129
*/
116-
public long acquireExecutionMemory(long size) {
117-
return memoryManager.acquireExecutionMemory(size, taskAttemptId);
130+
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
131+
assert(required >= 0);
132+
synchronized (this) {
133+
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
134+
135+
// try to release memory from other consumers first, then we can reduce the frequency of
136+
// spilling, avoid to have too many spilled files.
137+
if (got < required) {
138+
// Call spill() on other consumers to release memory
139+
for (MemoryConsumer c: consumers) {
140+
if (c != null && c != consumer && c.getUsed() > 0) {
141+
try {
142+
long released = c.spill(required - got, consumer);
143+
if (released > 0) {
144+
logger.info("Task {} released {} from {} for {}", taskAttemptId,
145+
Utils.bytesToString(released), c, consumer);
146+
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
147+
if (got >= required) {
148+
break;
149+
}
150+
}
151+
} catch (IOException e) {
152+
logger.error("error while calling spill() on " + c, e);
153+
throw new OutOfMemoryError("error while calling spill() on " + c + " : "
154+
+ e.getMessage());
155+
}
156+
}
157+
}
158+
}
159+
160+
// call spill() on itself
161+
if (got < required && consumer != null) {
162+
try {
163+
long released = consumer.spill(required - got, consumer);
164+
if (released > 0) {
165+
logger.info("Task {} released {} from itself ({})", taskAttemptId,
166+
Utils.bytesToString(released), consumer);
167+
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
168+
}
169+
} catch (IOException e) {
170+
logger.error("error while calling spill() on " + consumer, e);
171+
throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
172+
+ e.getMessage());
173+
}
174+
}
175+
176+
consumers.add(consumer);
177+
logger.debug("Task {} acquire {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
178+
return got;
179+
}
118180
}
119181

120182
/**
121-
* Release N bytes of execution memory.
183+
* Release N bytes of execution memory for a MemoryConsumer.
122184
*/
123-
public void releaseExecutionMemory(long size) {
185+
public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
186+
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
124187
memoryManager.releaseExecutionMemory(size, taskAttemptId);
125188
}
126189

190+
/**
191+
* Dump the memory usage of all consumers.
192+
*/
193+
public void showMemoryUsage() {
194+
logger.info("Memory used in task " + taskAttemptId);
195+
synchronized (this) {
196+
for (MemoryConsumer c: consumers) {
197+
if (c.getUsed() > 0) {
198+
logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
199+
}
200+
}
201+
}
202+
}
203+
204+
/**
205+
* Return the page size in bytes.
206+
*/
127207
public long pageSizeBytes() {
128208
return memoryManager.pageSizeBytes();
129209
}
@@ -134,42 +214,40 @@ public long pageSizeBytes() {
134214
*
135215
* Returns `null` if there was not enough memory to allocate the page.
136216
*/
137-
public MemoryBlock allocatePage(long size) {
217+
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
138218
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
139219
throw new IllegalArgumentException(
140220
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
141221
}
142222

223+
long acquired = acquireExecutionMemory(size, consumer);
224+
if (acquired <= 0) {
225+
return null;
226+
}
227+
143228
final int pageNumber;
144229
synchronized (this) {
145230
pageNumber = allocatedPages.nextClearBit(0);
146231
if (pageNumber >= PAGE_TABLE_SIZE) {
232+
releaseExecutionMemory(acquired, consumer);
147233
throw new IllegalStateException(
148234
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
149235
}
150236
allocatedPages.set(pageNumber);
151237
}
152-
final long acquiredExecutionMemory = acquireExecutionMemory(size);
153-
if (acquiredExecutionMemory != size) {
154-
releaseExecutionMemory(acquiredExecutionMemory);
155-
synchronized (this) {
156-
allocatedPages.clear(pageNumber);
157-
}
158-
return null;
159-
}
160-
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
238+
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
161239
page.pageNumber = pageNumber;
162240
pageTable[pageNumber] = page;
163241
if (logger.isTraceEnabled()) {
164-
logger.trace("Allocate page number {} ({} bytes)", pageNumber, size);
242+
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
165243
}
166244
return page;
167245
}
168246

169247
/**
170-
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}.
248+
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
171249
*/
172-
public void freePage(MemoryBlock page) {
250+
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
173251
assert (page.pageNumber != -1) :
174252
"Called freePage() on memory that wasn't allocated with allocatePage()";
175253
assert(allocatedPages.get(page.pageNumber));
@@ -182,14 +260,14 @@ public void freePage(MemoryBlock page) {
182260
}
183261
long pageSize = page.size();
184262
memoryManager.tungstenMemoryAllocator().free(page);
185-
releaseExecutionMemory(pageSize);
263+
releaseExecutionMemory(pageSize, consumer);
186264
}
187265

188266
/**
189267
* Given a memory page and offset within that page, encode this address into a 64-bit long.
190268
* This address will remain valid as long as the corresponding page has not been freed.
191269
*
192-
* @param page a data page allocated by {@link TaskMemoryManager#allocatePage(long)}/
270+
* @param page a data page allocated by {@link TaskMemoryManager#allocatePage}/
193271
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
194272
* this should be the value that you would pass as the base offset into an
195273
* UNSAFE call (e.g. page.baseOffset() + something).
@@ -261,17 +339,17 @@ public long getOffsetInPage(long pagePlusOffsetAddress) {
261339
* value can be used to detect memory leaks.
262340
*/
263341
public long cleanUpAllAllocatedMemory() {
264-
long freedBytes = 0;
265-
for (MemoryBlock page : pageTable) {
266-
if (page != null) {
267-
freedBytes += page.size();
268-
freePage(page);
342+
synchronized (this) {
343+
Arrays.fill(pageTable, null);
344+
for (MemoryConsumer c: consumers) {
345+
if (c != null && c.getUsed() > 0) {
346+
// In case of failed task, it's normal to see leaked memory
347+
logger.warn("leak " + Utils.bytesToString(c.getUsed()) + " memory from " + c);
348+
}
269349
}
350+
consumers.clear();
270351
}
271-
272-
freedBytes += memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
273-
274-
return freedBytes;
352+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
275353
}
276354

277355
/**

0 commit comments

Comments
 (0)