Skip to content

Commit 0cd8dc1

Browse files
committed
refine test cases
1 parent a782aac commit 0cd8dc1

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -803,24 +803,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
803803
assert(cachedData.collect === Seq(1001))
804804
}
805805

806-
test("non-cascading delete") {
807-
val df1 = testData.filter('key > 1)
808-
df1.cache()
809-
df1.count()
810-
assertCached(df1)
811-
812-
val df3 = df1.select('key)
813-
df3.cache()
814-
df3.count()
815-
assertCached(df3)
816-
817-
df1.unpersist(blocking = true)
818-
819-
assertCached(testData.filter('key > 1), 0)
820-
assertCached(testData.filter('key > 1).select('key))
821-
}
822-
823-
test("non-cascading delete 2") {
806+
test("SPARK-24596 Non-cascading Cache Invalidation") {
824807
withTempView("t1", "t2", "t3") {
825808
val rows = Seq(
826809
Row("p1", 30), Row("p2", 20), Row("p3", 25),

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,33 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
132132
df.unpersist()
133133
assert(df.storageLevel == StorageLevel.NONE)
134134
}
135+
136+
test("SPARK-24596 Non-cascading Cache Invalidation") {
  // Deliberately slow UDF: if a collect below ever re-evaluates it instead of
  // hitting a cache, the 10-row scan would take ~100s and trip failAfter.
  val slowUdf = udf((x: Int) => { Thread.sleep(10000); x })

  val base = spark.range(0, 10).toDF("a")
  val withB = base.withColumn("b", slowUdf($"a"))
  val grouped = withB.groupBy($"a").agg(sum($"b"))
  val totals = base.agg(sum($"a"))

  // Cache all three plans, but materialize only the grouped one.
  withB.cache()
  grouped.cache()
  grouped.collect()
  totals.cache()

  // Non-cascading invalidation: dropping withB's cache entry must not
  // cascade into the caches built on top of it.
  withB.unpersist(blocking = true)
  assert(withB.storageLevel == StorageLevel.NONE)

  // An equivalent grouped plan should still resolve to a cache entry...
  val reGrouped = withB.groupBy($"a").agg(sum($"b")).select("sum(b)")
  assertCached(reGrouped)
  // ...so collecting it must not re-run the slow UDF (reuse loaded cache).
  failAfter(5.seconds) {
    reGrouped.collect()
  }

  // The totals cache was registered but never materialized; this first use
  // should load it rather than find it invalidated.
  val filteredTotals = base.agg(sum($"a")).filter($"sum(a)" > 1)
  assertCached(filteredTotals)
  filteredTotals.collect()
}
135164
}

0 commit comments

Comments (0)