Skip to content

Commit 4bbd744

Browse files
committed
[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail
## What changes were proposed in this pull request? This is a long-standing bug in `UnsafeKVExternalSorter` and was reported in the dev list multiple times. When creating `UnsafeKVExternalSorter` with `BytesToBytesMap`, we need to create a `UnsafeInMemorySorter` to sort the data in `BytesToBytesMap`. The data format of the sorter and the map is same, so no data movement is required. However, both the sorter and the map need a point array for some bookkeeping work. There is an optimization in `UnsafeKVExternalSorter`: reuse the point array between the sorter and the map, to avoid an extra memory allocation. This sounds like a reasonable optimization, the length of the `BytesToBytesMap` point array is at least 4 times larger than the number of keys(to avoid hash collision, the hash table size should be at least 2 times larger than the number of keys, and each key occupies 2 slots). `UnsafeInMemorySorter` needs the pointer array size to be 4 times of the number of entries, so we are safe to reuse the point array. However, the number of keys of the map doesn't equal to the number of entries in the map, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption of the above optimization and we may run out of space when inserting data into the sorter, and hit error ``` java.lang.IllegalStateException: There is no space for new record at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239) at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149) ... ``` This PR fixes this bug by creating a new point array if the existing one is not big enough. ## How was this patch tested? a new test Author: Wenchen Fan <[email protected]> Closes #20561 from cloud-fan/bug.
1 parent eacb62f commit 4bbd744

File tree

2 files changed

+62
-8
lines changed

2 files changed

+62
-8
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.spark.storage.BlockManager;
3535
import org.apache.spark.unsafe.KVIterator;
3636
import org.apache.spark.unsafe.Platform;
37+
import org.apache.spark.unsafe.array.LongArray;
3738
import org.apache.spark.unsafe.map.BytesToBytesMap;
3839
import org.apache.spark.unsafe.memory.MemoryBlock;
3940
import org.apache.spark.util.collection.unsafe.sort.*;
@@ -98,19 +99,33 @@ public UnsafeKVExternalSorter(
9899
numElementsForSpillThreshold,
99100
canUseRadixSort);
100101
} else {
101-
// The array will be used to do in-place sort, which require half of the space to be empty.
102-
// Note: each record in the map takes two entries in the array, one is record pointer,
103-
// another is the key prefix.
104-
assert(map.numKeys() * 2 <= map.getArray().size() / 2);
105-
// During spilling, the array in map will not be used, so we can borrow that and use it
106-
// as the underlying array for in-memory sorter (it's always large enough).
107-
// Since we will not grow the array, it's fine to pass `null` as consumer.
102+
// During spilling, the pointer array in `BytesToBytesMap` will not be used, so we can borrow
103+
// that and use it as the pointer array for `UnsafeInMemorySorter`.
104+
LongArray pointerArray = map.getArray();
105+
// `BytesToBytesMap`'s pointer array is only guaranteed to hold all the distinct keys, but
106+
// `UnsafeInMemorySorter`'s pointer array need to hold all the entries. Since
107+
// `BytesToBytesMap` can have duplicated keys, here we need a check to make sure the pointer
108+
// array can hold all the entries in `BytesToBytesMap`.
109+
// The pointer array will be used to do in-place sort, which requires half of the space to be
110+
// empty. Note: each record in the map takes two entries in the pointer array, one is record
111+
// pointer, another is key prefix. So the required size of pointer array is `numRecords * 4`.
112+
// TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
113+
// so that we can always reuse the pointer array.
114+
if (map.numValues() > pointerArray.size() / 4) {
115+
// Here we ask the map to allocate memory, so that the memory manager won't ask the map
116+
// to spill, if the memory is not enough.
117+
pointerArray = map.allocateArray(map.numValues() * 4L);
118+
}
119+
120+
// Since the pointer array(either reuse the one in the map, or create a new one) is guaranteed
121+
// to be large enough, it's fine to pass `null` as consumer because we won't allocate more
122+
// memory.
108123
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
109124
null,
110125
taskMemoryManager,
111126
comparatorSupplier.get(),
112127
prefixComparator,
113-
map.getArray(),
128+
pointerArray,
114129
canUseRadixSort);
115130

116131
// We cannot use the destructive iterator here because we are reusing the existing memory

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
2929
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeProjection, UnsafeRow}
3030
import org.apache.spark.sql.test.SharedSQLContext
3131
import org.apache.spark.sql.types._
32+
import org.apache.spark.unsafe.map.BytesToBytesMap
3233

3334
/**
3435
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
205206
spill = true
206207
)
207208
}
209+
210+
test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
211+
val memoryManager = new TestMemoryManager(new SparkConf())
212+
val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
213+
val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
214+
215+
// Key/value are a unsafe rows with a single int column
216+
val schema = new StructType().add("i", IntegerType)
217+
val key = new UnsafeRow(1)
218+
key.pointTo(new Array[Byte](32), 32)
219+
key.setInt(0, 1)
220+
val value = new UnsafeRow(1)
221+
value.pointTo(new Array[Byte](32), 32)
222+
value.setInt(0, 2)
223+
224+
for (_ <- 1 to 65) {
225+
val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
226+
loc.append(
227+
key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
228+
value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
229+
}
230+
231+
// Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
232+
// which has duplicated keys and the number of entries exceeds its capacity.
233+
try {
234+
TaskContext.setTaskContext(new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, null, null))
235+
new UnsafeKVExternalSorter(
236+
schema,
237+
schema,
238+
sparkContext.env.blockManager,
239+
sparkContext.env.serializerManager,
240+
taskMemoryManager.pageSizeBytes(),
241+
Int.MaxValue,
242+
map)
243+
} finally {
244+
TaskContext.unset()
245+
}
246+
}
208247
}

0 commit comments

Comments
 (0)