@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.storage.StorageLevel
@@ -80,6 +80,14 @@ class CacheManager extends Logging {
cachedData.isEmpty
}

private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = {
if (plan.stats.rowCount.isDefined) {
Contributor: Why this condition? Even if there is no row count, the sizeInBytes is still better than defaultSizeInBytes.

Member: I had the same question. It seems @CodingCat's claim is that the sizeInBytes from the relation can cause an OOM issue: #19864 (comment).

Contributor: If we don't trust sizeInBytes, we should fix it in all places, not just here.

Member: +1

Contributor Author: The current logic is that we only put the value here when we have enough stats for the table of interest. I agree that we should choose a better value for formats like Parquet, where the actual in-memory size is much larger than sizeInBytes (i.e. the on-disk size). My questions are:

  1. What is the expected value if we have a HadoopFsRelation in Parquet format?

  2. Do we want to do that in this PR?

Contributor: The expected value should be rowCount * avgRowSize. Without CBO, I think the file size is the best we can get, although it may not be accurate.

That is to say, without CBO a Parquet relation may have an underestimated size and cause OOM; users need to turn on CBO to fix it. So the same thing should happen for the table cache.

We can fix this by defining sizeInBytes as file size * some factor, but that belongs in another PR.
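A minimal sketch of the sizing heuristic discussed above. The helper object, its name, and the 4x factor are illustrative assumptions, not part of this PR or of Spark; the actual follow-up is tracked in the SPARK-22790 JIRA mentioned below.

// Illustrative only: prefer CBO stats (rowCount * avgRowSize) when available,
// otherwise scale the on-disk size by an assumed decode factor.
object CacheSizeEstimate {
  // Columnar formats such as Parquet typically decode to several times their
  // on-disk footprint; 4 is an assumed, not a measured, factor.
  private val assumedCompressionFactor = 4

  def estimatedInMemoryBytes(
      onDiskBytes: BigInt,
      rowCount: Option[BigInt],
      avgRowSizeBytes: Option[Long]): BigInt = (rowCount, avgRowSizeBytes) match {
    case (Some(rows), Some(avg)) => rows * BigInt(avg)
    case _ => onDiskBytes * assumedCompressionFactor
  }
}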

Contributor Author (@CodingCat, Dec 15, 2017): Changed the code here and filed a JIRA: https://issues.apache.org/jira/browse/SPARK-22790

Some(plan.stats)
} else {
None
}
}

/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
@@ -94,14 +102,13 @@ class CacheManager extends Logging {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
cachedData.add(CachedData(
planToCache,
InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName)))
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName,
extractStatsOfPlanForCache(planToCache))
cachedData.add(CachedData(planToCache, inMemoryRelation))
}
}

@@ -148,7 +155,8 @@
batchSize = cd.cachedRepresentation.batchSize,
storageLevel = cd.cachedRepresentation.storageLevel,
child = spark.sessionState.executePlan(cd.plan).executedPlan,
tableName = cd.cachedRepresentation.tableName)
tableName = cd.cachedRepresentation.tableName,
statsOfPlanToCache = extractStatsOfPlanForCache(cd.plan))
needToRecache += cd.copy(cachedRepresentation = newCache)
}
}
@@ -37,8 +37,10 @@ object InMemoryRelation {
batchSize: Int,
storageLevel: StorageLevel,
child: SparkPlan,
tableName: Option[String]): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
tableName: Option[String],
statsOfPlanToCache: Option[Statistics]): InMemoryRelation =
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
statsOfPlanToCache = statsOfPlanToCache)
}


@@ -60,7 +62,8 @@ case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
statsOfPlanToCache: Option[Statistics] = None)
Contributor: Where shall we put the stats parameter, in the main constructor or in the curried constructor? The major difference is whether we want to include it in equals/hashCode.

Contributor Author: My two cents: I didn't look into the code that makes this affect the equals/hashCode logic, but we may not want to make equals/hashCode depend on this. In Spark SQL we usually compare plans based on their string representation rather than on stats info, e.g. we try to reuse a cached plan based on the execution plan's string representation, not on the plan plus its stats.

Contributor: Yeah, the secondary argument list seems a better place. I don't think we should incorporate the stats in the hashCode/equals methods.
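For context on the choice above, a minimal, self-contained sketch (the Relation class and field names are hypothetical, not Spark code) showing that only a case class's first parameter list participates in the generated equals/hashCode, which is why keeping statsOfPlanToCache in the curried list leaves plan equality and hash-based cache lookups unaffected:

case class Relation(output: Seq[String])(val stats: Option[Long] = None)

object CurriedParamsDemo extends App {
  val a = Relation(Seq("col1"))(Some(100L))
  val b = Relation(Seq("col1"))(None)
  // The generated equals/hashCode use only the first parameter list, so the
  // differing `stats` values do not affect equality or hash-based lookups.
  assert(a == b && a.hashCode == b.hashCode)
  println(s"equal despite different stats: ${a == b}")
}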

extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -71,9 +74,10 @@

override def computeStats(): Statistics = {
if (batchStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Contributor: // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.

Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache when
// applicable
statsOfPlanToCache.getOrElse(Statistics(sizeInBytes =
child.sqlContext.conf.defaultSizeInBytes))
Contributor: Mweh - this seems very arbitrary.

Contributor Author: I just followed the original implementation; I don't have a better value to put here. If we use sizeInBytes, the risk is with data sources like Parquet-formatted files, where sizeInBytes (the on-disk size) is much smaller than the actual size in memory. Your suggestion?

Contributor: If we don't trust the size in bytes for Parquet then we should fix that in the data source, not here. The old version did not use statistics proper at all; now that we do, we should use them.

} else {
Statistics(sizeInBytes = batchStats.value.longValue)
}
@@ -142,7 +146,7 @@ case class InMemoryRelation(
def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
InMemoryRelation(
newOutput, useCompression, batchSize, storageLevel, child, tableName)(
_cachedColumnBuffers, batchStats)
_cachedColumnBuffers, batchStats, statsOfPlanToCache)
}

override def newInstance(): this.type = {
@@ -154,11 +158,12 @@
child,
tableName)(
_cachedColumnBuffers,
batchStats).asInstanceOf[this.type]
batchStats,
statsOfPlanToCache).asInstanceOf[this.type]
}

def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers

override protected def otherCopyArgs: Seq[AnyRef] =
Seq(_cachedColumnBuffers, batchStats)
Seq(_cachedColumnBuffers, batchStats, statsOfPlanToCache)
}
@@ -30,6 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.util.Utils

class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -40,7 +41,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
data.createOrReplaceTempView(s"testData$dataType")
val storageLevel = MEMORY_ONLY
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None)
val inMemoryRelation = InMemoryRelation(useCompression = true, 5, storageLevel, plan, None,
None)

assert(inMemoryRelation.cachedColumnBuffers.getStorageLevel == storageLevel)
inMemoryRelation.cachedColumnBuffers.collect().head match {
@@ -116,7 +118,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {

test("simple columnar query") {
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None)

checkAnswer(scan, testData.collect().toSeq)
}
@@ -133,7 +135,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {

test("projection") {
val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None)

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -149,7 +151,7 @@

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None, None)

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
@@ -323,7 +325,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-17549: cached table size should be correctly calculated") {
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, None)

// Materialize the data.
val expectedAnswer = data.collect()
@@ -449,7 +451,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") {
val attribute = AttributeReference("a", IntegerType)()
val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY,
LocalTableScanExec(Seq(attribute), Nil), None)
LocalTableScanExec(Seq(attribute), Nil), None, None)
val tableScanExec = InMemoryTableScanExec(Seq(attribute),
Seq(In(attribute, Nil)), testRelation)
assert(tableScanExec.partitionFilters.isEmpty)
@@ -479,4 +481,35 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
withSQLConf("spark.sql.cbo.enabled" -> "true") {
val workDir = s"${Utils.createTempDir()}/table1"
Contributor: nit: use withTempDir

Contributor Author: done

val data = Seq(100, 200, 300, 400).toDF("count")
data.write.parquet(workDir)
val dfFromFile = spark.read.parquet(workDir).cache()
val inMemoryRelation = dfFromFile.queryExecution.optimizedPlan.collect {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is Long.MaxValue before the underlying RDD is materialized
assert(inMemoryRelation.computeStats().sizeInBytes === Long.MaxValue)

// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
assert(inMemoryRelation.computeStats().sizeInBytes === 16)

// test of catalog table
val dfFromTable = spark.catalog.createTable("table1", workDir).cache()
Contributor: nit: wrap with withTable, which will clean up the table automatically at the end.

Contributor Author: done
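A sketch of the wrapping these two nits ask for, using the withTempDir, withTable, and withSQLConf helpers this suite already mixes in via SharedSQLContext; the test body is elided and this is not the final committed code:

test("SPARK-22673: InMemoryRelation should utilize existing stats whenever possible") {
  withSQLConf("spark.sql.cbo.enabled" -> "true") {
    withTempDir { dir =>
      withTable("table1") {
        // withTempDir deletes the directory and withTable drops "table1" when the
        // block exits, even if an assertion fails in between.
        val workDir = new java.io.File(dir, "table1").getCanonicalPath
        Seq(100, 200, 300, 400).toDF("count").write.parquet(workDir)
        // ... same assertions as the version shown below ...
      }
    }
  }
}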

val inMemoryRelation2 = dfFromTable.queryExecution.optimizedPlan.
collect { case plan: InMemoryRelation => plan }.head

// Even with CBO enabled, InMemoryRelation's stats stays at the default value before the
// table's stats are calculated
assert(inMemoryRelation2.computeStats().sizeInBytes === Long.MaxValue)
Member: Properly inserting a few blank lines can make the test more readable.

Contributor Author: done


// InMemoryRelation's stats should be updated after calculating stats of the table
spark.sql("ANALYZE TABLE table1 COMPUTE STATISTICS")
assert(inMemoryRelation2.computeStats().sizeInBytes === 16)
Contributor: What happened here? Does InMemoryRelation.statsOfPlanToCache get updated automatically?

Contributor Author: It was a mistake here; AnalyzeTableCommand actually forces the table to be evaluated with count(), which populates the longAccumulator's value. Fixed in the latest commit.

}
}
}