Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ public void showMemoryUsage() {
logger.info(
"{} bytes of memory were used by task {} but are not associated with specific consumers",
memoryNotAccountedFor, taskAttemptId);
logger.info(
"{} bytes of memory are used for execution and {} bytes of memory are used for storage",
memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
}
}

Expand Down
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.OpenHashSet

/**
* A trait that allows a class to give [[SizeEstimator]] more accurate size estimation.
* When a class extends it, [[SizeEstimator]] will query the `estimatedSize` first.
* If `estimatedSize` does not return [[None]], [[SizeEstimator]] will use the returned size
* as the size of the object. Otherwise, [[SizeEstimator]] will do the estimation work.
*/
private[spark] trait SizeEstimation {
def estimatedSize: Option[Long]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return Long? If a class extends this, it should return a Long.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the driver side, UnsafeHashedRelation is using a java hashmap.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the BytesToBytesMap implement this interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need to do that.

SizeEstimator.estimate (the publish method of SizeEstimator) is used at two places. One is memory store and another one is trait SizeTracker (a utility trait used to implement collections that need to track estimated size). We do not put BytesToBytesMap to memory store, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BytesToBytesMap is used by UnsafeHashRelation, so it's put into memory store, that's the root cause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another approach could be remove the reference to BlockManager in BytesToBytesMap, using SparkEnv.get when needed, the difficulty could be how to fix the test (which use mocked BlockManager).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to get it merged first if there is no fundamental issue. So, we can unblock the preview package. I can make the change if we prefer to change BytesToBytesMap instead of UnsafeHashedRelation. I agree returning a Option is weird. But, I feel if it is possible, we should prefer changing UnsafeHashedRelation because it is the one used as the broadcast variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going publish a preview tonight or tomorrow morning? I will try to send out a patch to fix BytesToBytesMap, if I can't make it before publishing preview, feel free to merge this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The high level approach of not relying on reflection and object walking is a good one -- actually with dataset and dataframes, we don't really need size estimation. I also think relying on thread locals and SparkEnv is much less ideal than explicit dependencies.

Either way, this pull request is ok to merge in its current shape, given it's fairly critical. We can do more changes later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #9799.

@rxin We didn't passing BlockManager down to BytesToBytesMap, already rely on thread local.

}

/**
* :: DeveloperApi ::
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
Expand Down Expand Up @@ -199,10 +209,18 @@ object SizeEstimator extends Logging {
// the size estimator since it references the whole REPL. Do nothing in this case. In
// general all ClassLoaders and Classes will be shared between objects anyway.
} else {
val classInfo = getClassInfo(cls)
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
val estimatedSize = obj match {
case s: SizeEstimation => s.estimatedSize
case _ => None
}
if (estimatedSize.isDefined) {
state.size += estimatedSize.get
} else {
val classInfo = getClassInfo(cls)
state.size += alignSize(classInfo.shellSize)
for (field <- classInfo.pointerFields) {
state.enqueue(field.get(obj))
}
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ class DummyString(val arr: Array[Char]) {
@transient val hash32: Int = 0
}

class DummyClass8 extends SizeEstimation {
val x: Int = 0

override def estimatedSize: Option[Long] = Some(2015)
}

class DummyClass9 extends SizeEstimation {
val x: Int = 0

override def estimatedSize: Option[Long] = None
}

class SizeEstimatorSuite
extends SparkFunSuite
with BeforeAndAfterEach
Expand Down Expand Up @@ -214,4 +226,14 @@ class SizeEstimatorSuite
// Class should be 32 bytes on s390x if recognised as 64 bit platform
assertResult(32)(SizeEstimator.estimate(new DummyClass7))
}

test("SizeEstimation can provide the estimated size") {
// DummyClass8 provides its size estimation.
assertResult(2015)(SizeEstimator.estimate(new DummyClass8))
assertResult(20206)(SizeEstimator.estimate(Array.fill(10)(new DummyClass8)))

// DummyClass9 does not provide its size estimation.
assertResult(16)(SizeEstimator.estimate(new DummyClass9))
assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass9)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.MemoryLocation
import org.apache.spark.util.Utils
import org.apache.spark.util.{SizeEstimation, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{SparkConf, SparkEnv}

Expand Down Expand Up @@ -189,7 +189,9 @@ private[execution] object HashedRelation {
*/
private[joins] final class UnsafeHashedRelation(
private var hashTable: JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]])
extends HashedRelation with Externalizable {
extends HashedRelation
with SizeEstimation
with Externalizable {

private[joins] def this() = this(null) // Needed for serialization

Expand All @@ -215,6 +217,10 @@ private[joins] final class UnsafeHashedRelation(
}
}

override def estimatedSize: Option[Long] = {
Option(binaryMap).map(_.getTotalMemoryConsumption)
}

override def get(key: InternalRow): Seq[InternalRow] = {
val unsafeKey = key.asInstanceOf[UnsafeRow]

Expand Down