Skip to content

Commit 765243d

Browse files
committed
Enable optional performance metrics for hash map.
1 parent 23a440a commit 765243d

File tree

3 files changed

+77
-5
lines changed

3 files changed

+77
-5
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ public final class UnsafeFixedWidthAggregationMap {
6868
*/
6969
private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
7070

71+
private final boolean enablePerfMetrics;
72+
7173
/**
7274
* @return true if UnsafeFixedWidthAggregationMap supports grouping keys with the given schema,
7375
* false otherwise.
@@ -102,19 +104,22 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
102104
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
103105
* @param allocator the memory allocator used to allocate our Unsafe memory structures.
104106
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
107+
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
105108
*/
106109
public UnsafeFixedWidthAggregationMap(
107110
Row emptyAggregationBuffer,
108111
StructType aggregationBufferSchema,
109112
StructType groupingKeySchema,
110113
MemoryAllocator allocator,
111-
int initialCapacity) {
114+
int initialCapacity,
115+
boolean enablePerfMetrics) {
112116
this.emptyAggregationBuffer =
113117
convertToUnsafeRow(emptyAggregationBuffer, aggregationBufferSchema);
114118
this.aggregationBufferSchema = aggregationBufferSchema;
115119
this.groupingKeyToUnsafeRowConverter = new UnsafeRowConverter(groupingKeySchema);
116120
this.groupingKeySchema = groupingKeySchema;
117-
this.map = new BytesToBytesMap(allocator, initialCapacity);
121+
this.map = new BytesToBytesMap(allocator, initialCapacity, enablePerfMetrics);
122+
this.enablePerfMetrics = enablePerfMetrics;
118123
}
119124

120125
/**
@@ -232,4 +237,13 @@ public void free() {
232237
map.free();
233238
}
234239

240+
public void printPerfMetrics() {
241+
if (!enablePerfMetrics) {
242+
throw new IllegalStateException("Perf metrics not enabled");
243+
}
244+
System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup());
245+
System.out.println("Time spent resizing (ms): " + map.getTimeSpentResizingMs());
246+
System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption());
247+
}
248+
235249
}

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ case class GeneratedAggregate(
290290
aggregationBufferSchema,
291291
groupKeySchema,
292292
MemoryAllocator.UNSAFE,
293-
1024 * 16
293+
1024 * 16,
294+
false
294295
)
295296

296297
while (iter.hasNext) {

unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,38 @@ public final class BytesToBytesMap {
139139

140140
private final Location loc;
141141

142+
private final boolean enablePerfMetrics;
142143

143-
public BytesToBytesMap(MemoryAllocator allocator, int initialCapacity, double loadFactor) {
144+
private long timeSpentResizingMs = 0;
145+
146+
private int numResizes = 0;
147+
148+
private long numProbes = 0;
149+
150+
private long numKeyLookups = 0;
151+
152+
public BytesToBytesMap(
153+
MemoryAllocator allocator,
154+
int initialCapacity,
155+
double loadFactor,
156+
boolean enablePerfMetrics) {
144157
this.inHeap = allocator instanceof HeapMemoryAllocator;
145158
this.allocator = allocator;
146159
this.loadFactor = loadFactor;
147160
this.loc = new Location();
161+
this.enablePerfMetrics = enablePerfMetrics;
148162
allocate(initialCapacity);
149163
}
150164

151165
public BytesToBytesMap(MemoryAllocator allocator, int initialCapacity) {
152-
this(allocator, initialCapacity, 0.70);
166+
this(allocator, initialCapacity, 0.70, false);
167+
}
168+
169+
public BytesToBytesMap(
170+
MemoryAllocator allocator,
171+
int initialCapacity,
172+
boolean enablePerfMetrics) {
173+
this(allocator, initialCapacity, 0.70, enablePerfMetrics);
153174
}
154175

155176
@Override
@@ -205,10 +226,16 @@ public Location lookup(
205226
Object keyBaseObject,
206227
long keyBaseOffset,
207228
int keyRowLengthBytes) {
229+
if (enablePerfMetrics) {
230+
numKeyLookups++;
231+
}
208232
final int hashcode = HASHER.hashUnsafeWords(keyBaseObject, keyBaseOffset, keyRowLengthBytes);
209233
int pos = hashcode & mask;
210234
int step = 1;
211235
while (true) {
236+
if (enablePerfMetrics) {
237+
numProbes++;
238+
}
212239
if (!bitset.isSet(pos)) {
213240
// This is a new key.
214241
return loc.with(pos, hashcode, false);
@@ -484,10 +511,36 @@ public long getTotalMemoryConsumption() {
484511
longArray.memoryBlock().size());
485512
}
486513

514+
/**
515+
* Returns the total amount of time spent resizing this map (in milliseconds).
516+
*/
517+
public long getTimeSpentResizingMs() {
518+
if (!enablePerfMetrics) {
519+
throw new IllegalStateException();
520+
}
521+
return timeSpentResizingMs;
522+
}
523+
524+
525+
/**
526+
* Returns the average number of probes per key lookup.
527+
*/
528+
public double getAverageProbesPerLookup() {
529+
if (!enablePerfMetrics) {
530+
throw new IllegalStateException();
531+
}
532+
return (1.0 * numProbes) / numKeyLookups;
533+
}
534+
487535
/**
488536
* Grows the size of the hash table and re-hash everything.
489537
*/
490538
private void growAndRehash() {
539+
long resizeStartTime = -1;
540+
if (enablePerfMetrics) {
541+
numResizes++;
542+
resizeStartTime = System.currentTimeMillis();
543+
}
491544
// Store references to the old data structures to be used when we re-hash
492545
final LongArray oldLongArray = longArray;
493546
final BitSet oldBitSet = bitset;
@@ -526,6 +579,10 @@ private void growAndRehash() {
526579
// Deallocate the old data structures.
527580
allocator.free(oldLongArray.memoryBlock());
528581
allocator.free(oldBitSet.memoryBlock());
582+
if (enablePerfMetrics) {
583+
System.out.println("Resizing took " + (System.currentTimeMillis() - resizeStartTime) + " ms");
584+
timeSpentResizingMs += System.currentTimeMillis() - resizeStartTime;
585+
}
529586
}
530587

531588
/** Returns the next number greater or equal num that is power of 2. */

0 commit comments

Comments
 (0)