Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ case class CachedRDDBuilder(
tableName: Option[String])(
@transient private var _cachedColumnBuffers: RDD[CachedBatch] = null) {

override def toString: String = s"CachedRDDBuilder($useCompression, $batchSize, $storageLevel)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My major point is whether we need to output $useCompression, $batchSize. How useful are they? Our explain output is already pretty long. Maybe we can skip them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I think the output should be the same with one in v2.3;

scala> val df = Seq((1, 2), (3, 4)).toDF("a", "b")
scala> val testDf = df.join(df, "a").join(df, "a").cache
scala> testDf.groupBy("a").count().explain
== Physical Plan ==
*(2) HashAggregate(keys=[a#309], functions=[count(1)])
+- Exchange hashpartitioning(a#309, 200)
   +- *(1) HashAggregate(keys=[a#309], functions=[partial_count(1)])
      +- *(1) InMemoryTableScan [a#309]
            +- InMemoryRelation [a#309, b#310, b#314, b#319], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(3) Project [a#60, b#61, b#212, b#217]
                     +- *(3) BroadcastHashJoin [a#60], [a#216], Inner, BuildRight
                        :- *(3) Project [a#60, b#61, b#212]
                        :  +- *(3) BroadcastHashJoin [a#60], [a#211], Inner, BuildRight
                        :     :- *(3) InMemoryTableScan [a#60, b#61]
                        :     :     +- InMemoryRelation [a#60, b#61], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :     :           +- LocalTableScan [a#15, b#16]
                        :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                        :        +- *(1) InMemoryTableScan [a#211, b#212]
                        :              +- InMemoryRelation [a#211, b#212], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :                    +- LocalTableScan [a#15, b#16]
                        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                           +- *(2) InMemoryTableScan [a#216, b#217]
                                 +- InMemoryRelation [a#216, b#217], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- LocalTableScan [a#15, b#16]

The output of this current pr is still different, so can you fix that way? @onursatici


val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator

def cachedColumnBuffers: RDD[CachedBatch] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
// first time use, load cache
checkDataset(df5, Row(10))
}

test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
val inMemoryRelation = InMemoryRelation(
true,
1000,
StorageLevel.MEMORY_ONLY,
dummyQueryExecution.sparkPlan,
Some("test-relation"),
dummyQueryExecution.logical)

assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
assert(inMemoryRelation.simpleString.contains(
"CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true and 1000 look confusing to end users. Can we improve it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we might not need the batch size in the plan.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just comparing explain output results like the query in this pr description?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile tried to keep this close to its default value, maybe we can do something like CachedRDDBuilder(useCompression = true, batchSize = 1000, ...)? But that will break the consistency across logging case classes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu wouldn't that be testing the same thing, as explain calls plan.treeString which calls elem.simpleString for every child? I think testing for InMemoryRelation.simpleString covers other possible places where a plan.treeString is logged. Happy to change if you have concerns

Copy link
Member

@maropu maropu Jul 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, but you don't need to fill useCompression and batchSize in the test case.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just comparing explain results?

    val df = Seq((1, 2)).toDF("a", "b").cache
    val outputStream = new java.io.ByteArrayOutputStream()
    Console.withOut(outputStream) {
      df.explain(false)
    }
    assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
      "InMemoryRelation [a#x, b#x], StorageLevel(disk, memory, deserialized, 1 replicas)"))

}