@@ -127,16 +127,16 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
   }

   test("cache UDF result correctly") {
-    val expensiveUDF = udf({x: Int => Thread.sleep(5000); x})
-    val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
+    val expensiveUDF = udf({x: Int => Thread.sleep(2000); x})
mgaido91 (Contributor):
Hmm... since we fail after 2 seconds, we may pass this even if it doesn't work. Shall we put it to 3 seconds, or at least 2500 ms?

dilipbiswal (Contributor, Author), Oct 5, 2018:
@mgaido91 OK, please correct me on this one. We insert 2 rows, i.e. two invocations of the UDF, amounting to 2 * 2 sec = 4 sec of execution. So wouldn't a 2-second fail time be OK? Also, Marco, I did run this 10 times back to back without a problem, FYI.

mgaido91 (Contributor):
Well, I do think this will pass 100% of the time; my concern was that in case of a regression we might fail to detect it. But yes, with the repartition to 1 you're right, I hadn't considered it; otherwise the two invocations might have run in parallel. So this seems enough.

dilipbiswal (Contributor, Author):
@mgaido91 Thanks, Marco.

+    val df = spark.range(0, 2).toDF("a").repartition(1).withColumn("b", expensiveUDF($"a"))
     val df2 = df.agg(sum(df("b")))

     df.cache()
     df.count()
     assertCached(df2)

     // udf has been evaluated during caching, and thus should not be re-evaluated here
-    failAfter(3 seconds) {
+    failAfter(2 seconds) {
       df2.collect()
     }
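For reference, here is a minimal standalone sketch of the timing argument from the thread above. It is not part of DatasetCacheSuite; the object name, appName, master setting, and the elapsed-time printout are illustrative assumptions, while the UDF, repartition(1), cache, and aggregation mirror the test as changed in this diff.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, udf}

// Hypothetical object/app names; the DataFrame construction mirrors the test above.
object CachedUdfTimingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cached-udf-sketch").getOrCreate()
    import spark.implicits._

    // Each invocation sleeps for 2 seconds, like the expensive UDF in the test.
    val expensiveUDF = udf({ x: Int => Thread.sleep(2000); x })

    // Two rows forced into one partition: uncached, the two UDF calls run
    // serially, so evaluating the plan costs roughly 2 * 2 s = 4 s.
    val df = spark.range(0, 2).toDF("a").repartition(1).withColumn("b", expensiveUDF($"a"))
    val df2 = df.agg(sum(df("b")))

    df.cache()
    df.count() // materializes the cache, paying the ~4 s UDF cost exactly once

    val start = System.nanoTime()
    df2.collect() // reads the cached column, so the UDF is not re-evaluated
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"collect() after caching took $elapsedMs%.0f ms (expected well under 2000 ms)")

    spark.stop()
  }
}

Run locally, df.count() should take roughly 2 * 2 s = 4 s because the two UDF calls execute serially in the single partition, while the subsequent df2.collect() should return almost immediately since the UDF result is read from the cache; that gap is what the failAfter(2 seconds) bound in the test is meant to catch.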