@@ -63,8 +63,6 @@ public final class UnsafeFixedWidthAggregationMap {
*/
private final UnsafeRow currentAggregationBuffer;

private final boolean enablePerfMetrics;

/**
* @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
* schema, false otherwise.
@@ -87,23 +85,20 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
* @param taskMemoryManager the memory manager used to allocate our Unsafe memory structures.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
* @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
*/
public UnsafeFixedWidthAggregationMap(
InternalRow emptyAggregationBuffer,
StructType aggregationBufferSchema,
StructType groupingKeySchema,
TaskMemoryManager taskMemoryManager,
int initialCapacity,
long pageSizeBytes,
boolean enablePerfMetrics) {
long pageSizeBytes) {
this.aggregationBufferSchema = aggregationBufferSchema;
this.currentAggregationBuffer = new UnsafeRow(aggregationBufferSchema.length());
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
this.map =
new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
this.enablePerfMetrics = enablePerfMetrics;
new BytesToBytesMap(taskMemoryManager, initialCapacity, pageSizeBytes, true);

// Initialize the buffer for aggregation value
final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
@@ -223,15 +218,11 @@ public void free() {
map.free();
}

@SuppressWarnings("UseOfSystemOutOrSystemErr")
public void printPerfMetrics() {
if (!enablePerfMetrics) {
throw new IllegalStateException("Perf metrics not enabled");
}
System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup());
System.out.println("Number of hash collisions: " + map.getNumHashCollisions());
System.out.println("Time spent resizing (ns): " + map.getTimeSpentResizingNs());
System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption());
/**
* Gets the average number of hash map probes per lookup for the underlying `BytesToBytesMap`.
*/
public double getAverageProbesPerLookup() {
return map.getAverageProbesPerLookup();
}

/**
@@ -59,7 +59,8 @@ case class HashAggregateExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
"avgHashmapProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hashmap probe"))

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

@@ -93,6 +94,7 @@ case class HashAggregateExec(
val numOutputRows = longMetric("numOutputRows")
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val avgHashmapProbe = longMetric("avgHashmapProbe")

child.execute().mapPartitions { iter =>

@@ -116,7 +118,8 @@
testFallbackStartsAt,
numOutputRows,
peakMemory,
spillSize)
spillSize,
avgHashmapProbe)
if (!hasInput && groupingExpressions.isEmpty) {
numOutputRows += 1
Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
@@ -157,7 +160,7 @@
}
}

// The variables used as aggregation buffer
// The variables used as aggregation buffer. Only used for aggregation without keys.
private var bufVars: Seq[ExprCode] = _

private def doProduceWithoutKeys(ctx: CodegenContext): String = {
@@ -312,8 +315,7 @@ case class HashAggregateExec(
groupingKeySchema,
TaskContext.get().taskMemoryManager(),
1024 * 16, // initial capacity
TaskContext.get().taskMemoryManager().pageSizeBytes,
false // disable tracking of performance metrics
TaskContext.get().taskMemoryManager().pageSizeBytes
)
}

@@ -341,7 +343,8 @@ case class HashAggregateExec(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter,
peakMemory: SQLMetric,
spillSize: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {
spillSize: SQLMetric,
avgHashmapProbe: SQLMetric): KVIterator[UnsafeRow, UnsafeRow] = {

// update peak execution memory
val mapMemory = hashMap.getPeakMemoryUsedBytes
@@ -351,6 +354,10 @@
peakMemory.add(maxMemory)
metrics.incPeakExecutionMemory(maxMemory)

// Update average hashmap probe
val avgProbes = hashMap.getAverageProbesPerLookup()
avgHashmapProbe.add(avgProbes.ceil.toLong)

if (sorter == null) {
// not spilled
return hashMap.iterator()
@@ -577,6 +584,7 @@ case class HashAggregateExec(
val doAgg = ctx.freshName("doAggregateWithKeys")
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val avgHashmapProbe = metricTerm(ctx, "avgHashmapProbe")

def generateGenerateCode(): String = {
if (isFastHashMapEnabled) {
@@ -602,7 +610,8 @@
${if (isFastHashMapEnabled) {
s"$iterTermForFastHashMap = $fastHashMapTerm.rowIterator();"} else ""}

$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize,
$avgHashmapProbe);
}
""")

@@ -792,6 +801,8 @@ case class HashAggregateExec(
| $unsafeRowBuffer =
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
| }
| // Can't allocate buffer from the hash map. Spill the map and fall back to sort-based
| // aggregation after processing all input rows.
| if ($unsafeRowBuffer == null) {
| if ($sorterTerm == null) {
| $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
@@ -800,7 +811,7 @@
| }
| $resetCounter
| // the hash map had be spilled, it should have enough memory now,
| // try to allocate buffer again.
| // try to allocate buffer again.
| $unsafeRowBuffer =
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
| if ($unsafeRowBuffer == null) {
@@ -88,7 +88,8 @@ class TungstenAggregationIterator(
testFallbackStartsAt: Option[(Int, Int)],
numOutputRows: SQLMetric,
peakMemory: SQLMetric,
spillSize: SQLMetric)
spillSize: SQLMetric,
avgHashmapProbe: SQLMetric)
extends AggregationIterator(
groupingExpressions,
originalInputAttributes,
@@ -162,8 +163,7 @@
StructType.fromAttributes(groupingExpressions.map(_.toAttribute)),
TaskContext.get().taskMemoryManager(),
1024 * 16, // initial capacity
TaskContext.get().taskMemoryManager().pageSizeBytes,
false // disable tracking of performance metrics
TaskContext.get().taskMemoryManager().pageSizeBytes
)

// The function used to read and process input rows. When processing input rows,
@@ -420,6 +420,10 @@ class TungstenAggregationIterator(
peakMemory += maxMemory
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(maxMemory)

// Update average hashmap probe if this is the last record.
val averageProbes = hashMap.getAverageProbesPerLookup()
avgHashmapProbe.add(averageProbes.ceil.toLong)
}
numOutputRows += 1
res
@@ -68,11 +68,11 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
}
}


object SQLMetrics {
private val SUM_METRIC = "sum"
private val SIZE_METRIC = "size"
private val TIMING_METRIC = "timing"
private val AVERAGE_METRIC = "average"

def createMetric(sc: SparkContext, name: String): SQLMetric = {
val acc = new SQLMetric(SUM_METRIC)
@@ -102,6 +102,22 @@ object SQLMetrics {
acc
}

/**
* Create a metric to report average information (including min, med, max), e.g. the average
* hash map probe. Because `SQLMetric` stores long values, the average is rounded up (ceil)
* before being stored. This metric records an average value computed at the end of a task and
* should be set at most once per task. The initial zero values of this metric are excluded
* when the final (min, med, max) result is computed.
*/
def createAverageMetric(sc: SparkContext, name: String): SQLMetric = {
// The final result of this metric in the physical operator UI may look like:
// probe avg (min, med, max):
// (1, 2, 6)
val acc = new SQLMetric(AVERAGE_METRIC)
acc.register(sc, name = Some(s"$name (min, med, max)"), countFailedValues = false)
acc
}

/**
* A function that defines how we aggregate the final accumulator results among all tasks,
* and represent it in string for a SQL physical operator.
@@ -110,6 +126,20 @@
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
numberFormat.format(values.sum)
} else if (metricsType == AVERAGE_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)

val validValues = values.filter(_ > 0)
val Seq(min, med, max) = {
val metric = if (validValues.isEmpty) {
Seq.fill(3)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
}
metric.map(numberFormat.format)
}
s"\n($min, $med, $max)"
} else {
val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
Utils.bytesToString
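
For illustration, here is a minimal standalone sketch (not part of this patch) of how the new AVERAGE_METRIC values would be aggregated and rendered. It mirrors the `stringValue` branch added above using plain Scala collections instead of the `SQLMetric` accumulator; the object name, helper name, and sample values are assumptions for the example only.

import java.text.NumberFormat
import java.util.Locale

object AverageMetricExample {
  // Mirrors the AVERAGE_METRIC branch of SQLMetrics.stringValue above:
  // the initial zero values are dropped, the rest are sorted, and the
  // minimum, median and maximum are reported.
  def stringValueForAverage(values: Seq[Long]): String = {
    val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
    val validValues = values.filter(_ > 0)
    val Seq(min, med, max) = {
      val metric = if (validValues.isEmpty) {
        Seq.fill(3)(0L)
      } else {
        val sorted = validValues.sorted
        Seq(sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
      }
      metric.map(numberFormat.format)
    }
    s"\n($min, $med, $max)"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical per-task probe averages: each task takes the ceil of
    // map.getAverageProbesPerLookup() before storing it, e.g. 1.3 -> 2, so any
    // task that performed a lookup contributes at least 1. A task that never
    // touched the map leaves the initial 0, which is excluded.
    val perTaskValues = Seq(0L, 2L, 1L, 6L, 3L)
    // Prints "(1, 3, 6)" on its own line, matching the
    // "probe avg (min, med, max)" display described above.
    println(stringValueForAverage(perTaskValues))
  }
}
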
@@ -111,8 +111,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
1024, // initial capacity,
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)
assert(!map.iterator().next())
map.free()
@@ -125,8 +124,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
1024, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)
val groupKey = InternalRow(UTF8String.fromString("cats"))

@@ -152,8 +150,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)
val rand = new Random(42)
val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
@@ -178,8 +175,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)

val keys = randomStrings(1024).take(512)
@@ -226,8 +222,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)
val sorter = map.destructAndCreateExternalSorter()

@@ -267,8 +262,7 @@ class UnsafeFixedWidthAggregationMapSuite
StructType(Nil),
taskMemoryManager,
128, // initial capacity
PAGE_SIZE_BYTES,
false // disable perf metrics
PAGE_SIZE_BYTES
)
(1 to 10).foreach { i =>
val buf = map.getAggregationBuffer(UnsafeRow.createFromByteArray(0, 0))
@@ -312,8 +306,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
128, // initial capacity
pageSize,
false // disable perf metrics
pageSize
)

val rand = new Random(42)
@@ -350,8 +343,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
taskMemoryManager,
128, // initial capacity
pageSize,
false // disable perf metrics
pageSize
)

val rand = new Random(42)