
Commit 3ca84b2 (parent: 162caf7)

Only zero the used portion of groupingKeyConversionScratchSpace

2 files changed: +36 -8 lines

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java (8 additions, 4 deletions)

@@ -139,13 +139,17 @@ private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
    * return the same object.
    */
   public UnsafeRow getAggregationBuffer(Row groupingKey) {
-    // Zero out the buffer that's used to hold the current row. This is necessary in order
-    // to ensure that rows hash properly, since garbage data from the previous row could
-    // otherwise end up as padding in this row.
-    Arrays.fill(groupingKeyConversionScratchSpace, 0);
     final int groupingKeySize = groupingKeyToUnsafeRowConverter.getSizeRequirement(groupingKey);
+    // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
     if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
+      // This new array will be initially zero, so there's no need to zero it out here
       groupingKeyConversionScratchSpace = new long[groupingKeySize];
+    } else {
+      // Zero out the buffer that's used to hold the current row. This is necessary in order
+      // to ensure that rows hash properly, since garbage data from the previous row could
+      // otherwise end up as padding in this row. As a performance optimization, we only zero out
+      // the portion of the buffer that we'll actually write to.
+      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
     }
     final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
       groupingKey,
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala (28 additions, 4 deletions)

@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.spark.unsafe.memory.{MemoryManager, MemoryAllocator}
 import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
 
@@ -59,8 +62,8 @@ class UnsafeFixedWidthAggregationMapSuite extends FunSuite with Matchers with Be
       aggBufferSchema,
       groupKeySchema,
       memoryManager,
-      1024,
-      false
+      1024, // initial capacity
+      false // disable perf metrics
     )
     assert(!map.iterator().hasNext)
     map.free()
@@ -72,8 +75,8 @@ class UnsafeFixedWidthAggregationMapSuite extends FunSuite with Matchers with Be
       aggBufferSchema,
       groupKeySchema,
       memoryManager,
-      1024,
-      false
+      1024, // initial capacity
+      false // disable perf metrics
     )
     val groupKey = new GenericRow(Array[Any](UTF8String("cats")))
 
@@ -92,4 +95,25 @@ class UnsafeFixedWidthAggregationMapSuite extends FunSuite with Matchers with Be
     map.free()
   }
 
+  test("inserting large random keys") {
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      memoryManager,
+      128, // initial capacity
+      false // disable perf metrics
+    )
+    val rand = new Random(42)
+    val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
+    groupKeys.foreach { keyString =>
+      map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String(keyString))))
+    }
+    val seenKeys: Set[String] = map.iterator().asScala.map { entry =>
+      entry.key.getString(0)
+    }.toSet
+    seenKeys.size should be (groupKeys.size)
+    seenKeys should be (groupKeys)
+  }
+
 }
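
The new test exercises both branches of the patched code: 512 random 1024-character keys drive growth of the scratch buffer and its subsequent reuse across keys of varying size, and the final set comparison would catch any key corrupted by stale padding. The failure mode being guarded against can be shown in a few lines (a hypothetical toy demonstration, not code from this commit):

    import java.util.Arrays;

    public class StaleScratchDemo {
      public static void main(String[] args) {
        long[] scratch = new long[4];
        Arrays.fill(scratch, 0xDEADBEEFL); // leftovers from a previous, longer key
        int sizeRequired = 2;              // the next key needs only two words
        // Without this fill, padding bytes that the row writer does not
        // overwrite could still hold garbage, so two logically equal keys
        // could hash and compare differently.
        Arrays.fill(scratch, 0, sizeRequired, 0L);
        System.out.println(Arrays.toString(scratch));
        // prints: [0, 0, 3735928559, 3735928559]
      }
    }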
