Skip to content

Commit 7df6008

Browse files
committed
Optimizations related to zeroing out memory:
- Do not zero out all allocated memory; the zeroing isn't free and in many cases it isn't necessary. - There are some cases where we do want to clear the memory, such as in BitSet. It shouldn't be the BitSet object's responsibility to zero out the memory block passed to it (since maybe we're passing some memory created by someone else and want to interpret it as a bitset). To make the caller's life easier, though, I added a MemoryBlock.zero() method for clearing the block. - In UnsafeGeneratedAggregate, use Arrays.fill to clear the re-used temporary row buffer, since this is likely to be much faster than Unsafe.setMemory; see http://psy-lob-saw.blogspot.com/2015/04/on-arraysfill-intrinsics-superword-and.html for more details.
1 parent c1b3813 commit 7df6008

File tree

7 files changed

+21
-19
lines changed

7 files changed

+21
-19
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717

1818
package org.apache.spark.sql.execution
1919

20+
import java.util
21+
2022
import org.apache.spark.annotation.DeveloperApi
2123
import org.apache.spark.rdd.RDD
2224
import org.apache.spark.sql.catalyst.trees._
2325
import org.apache.spark.sql.catalyst.expressions._
2426
import org.apache.spark.sql.catalyst.plans.physical._
2527
import org.apache.spark.sql.types._
2628
import org.apache.spark.unsafe.PlatformDependent
27-
import org.apache.spark.unsafe.array.ByteArrayMethods
2829
import org.apache.spark.unsafe.map.BytesToBytesMap
2930
import org.apache.spark.unsafe.memory.MemoryAllocator
3031

@@ -295,8 +296,7 @@ case class UnsafeGeneratedAggregate(
295296
// Zero out the buffer that's used to hold the current row. This is necessary in order
296297
// to ensure that rows hash properly, since garbage data from the previous row could
297298
// otherwise end up as padding in this row.
298-
ByteArrayMethods.zeroBytes(
299-
unsafeRowBuffer, PlatformDependent.LONG_ARRAY_OFFSET, unsafeRowBuffer.length)
299+
util.Arrays.fill(unsafeRowBuffer, 0)
300300
// Grab the next row from our input iterator and compute its group projection.
301301
// In the long run, it might be nice to use Unsafe rows for this as well, but for now
302302
// we'll just rely on the existing code paths to compute the projection.

unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,6 @@ public static int roundNumberOfBytesToNearestWord(int numBytes) {
4040
}
4141
}
4242

43-
public static void zeroBytes(
44-
Object baseObject,
45-
long baseOffset,
46-
long lengthInBytes) {
47-
for (int i = 0; i < lengthInBytes; i++) {
48-
PlatformDependent.UNSAFE.putByte(baseObject, baseOffset + i, (byte) 0);
49-
}
50-
}
51-
5243
/**
5344
* Optimized equality check for equal-length byte arrays.
5445
* @return true if the arrays are equal, false otherwise

unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import org.apache.spark.unsafe.array.LongArray;
2323
import org.apache.spark.unsafe.bitset.BitSet;
2424
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
25-
import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
26-
import org.apache.spark.unsafe.memory.MemoryAllocator;
27-
import org.apache.spark.unsafe.memory.MemoryBlock;
28-
import org.apache.spark.unsafe.memory.MemoryLocation;
25+
import org.apache.spark.unsafe.memory.*;
2926

3027
import java.lang.IllegalStateException;
3128
import java.lang.Long;
@@ -452,7 +449,7 @@ public void storeKeyAndValue(
452449
private void allocate(long capacity) {
453450
capacity = java.lang.Math.max(nextPowerOf2(capacity), 64);
454451
longArray = new LongArray(allocator.allocate(capacity * 8 * 2));
455-
bitset = new BitSet(allocator.allocate(capacity / 8));
452+
bitset = new BitSet(allocator.allocate(capacity / 8).zero());
456453

457454
this.growthThreshold = (long) (capacity * loadFactor);
458455
this.mask = capacity - 1;
@@ -525,6 +522,9 @@ private void growAndRehash() {
525522
}
526523
}
527524

525+
// TODO: we should probably have a try-finally block here to make sure that we free the allocated
526+
// memory even if an error occurs.
527+
528528
// Deallocate the old data structures.
529529
allocator.free(oldLongArray.memoryBlock());
530530
allocator.free(oldBitSet.memoryBlock());

unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
public interface MemoryAllocator {
2121

22+
/**
23+
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
24+
* to be zeroed out (call `zero()` on the result if this is necessary).
25+
*/
2226
public MemoryBlock allocate(long size) throws OutOfMemoryError;
2327

2428
public void free(MemoryBlock memory);

unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public long size() {
4040
return length;
4141
}
4242

43+
/**
44+
* Clear the contents of this memory block. Returns `this` to facilitate chaining.
45+
*/
46+
public MemoryBlock zero() {
47+
PlatformDependent.UNSAFE.setMemory(obj, offset, length, (byte) 0);
48+
return this;
49+
}
50+
4351
/**
4452
* Creates a memory block pointing to the memory used by the byte array.
4553
*/

unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
2727
@Override
2828
public MemoryBlock allocate(long size) throws OutOfMemoryError {
2929
long address = PlatformDependent.UNSAFE.allocateMemory(size);
30-
PlatformDependent.UNSAFE.setMemory(address, size, (byte) 0);
3130
return new MemoryBlock(null, address, size);
3231
}
3332

unsafe/src/test/java/org/apache/spark/unsafe/bitset/TestBitSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TestBitSet {
2727

2828
private BitSet createBitSet(int capacity) {
2929
assert capacity % 64 == 0;
30-
return new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]));
30+
return new BitSet(MemoryBlock.fromLongArray(new long[capacity / 64]).zero());
3131
}
3232

3333
@Test

0 commit comments

Comments
 (0)