Skip to content

Commit c55bf66

Browse files
committed
Free buffer once iterator has been fully consumed.
1 parent 62ab054 commit c55bf66

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeGeneratedAggregate.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,6 @@ case class UnsafeGeneratedAggregate(
261261
val buffers = new BytesToBytesMap(MemoryAllocator.HEAP, 128)
262262

263263
// Set up the mutable "pointers" that we'll re-use when pointing to key and value rows
264-
val keyPointer: UnsafeRow = new UnsafeRow()
265264
val currentBuffer: UnsafeRow = new UnsafeRow()
266265

267266
// We're going to need to allocate a lot of empty aggregation buffers, so let's do it
@@ -365,11 +364,21 @@ case class UnsafeGeneratedAggregate(
365364
valueAddress.getBaseOffset,
366365
aggregationBufferSchema.length,
367366
aggregationBufferSchema)
368-
// TODO: once the iterator has been fully consumed, we need to free the map so that
369-
// its off-heap memory is reclaimed. This may mean that we'll have to perform an extra
370-
// defensive copy of the last row so that we can free that memory before returning
371-
// to the caller.
372-
resultProjection(joinedRow(key, value))
367+
val result = resultProjection(joinedRow(key, value))
368+
if (hasNext) {
369+
result
370+
} else {
371+
// This is the last element in the iterator, so let's free the buffer. Before we do,
372+
// though, we need to make a defensive copy of the result so that we don't return an
373+
// object that might contain dangling pointers to the freed memory
374+
val resultCopy = result.copy()
375+
buffers.free()
376+
resultCopy
377+
}
378+
}
379+
380+
override def finalize(): Unit = {
381+
buffers.free()
373382
}
374383
}
375384
}

0 commit comments

Comments
 (0)