From 58c419a3298425a3228020a9a6ee13fb4451557e Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 5 Oct 2018 17:33:51 +0800
Subject: [PATCH 1/2] Speed up BucketedReadSuite

---
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a9414200e70f..88421132cf04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -66,7 +66,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(8, "j", "k")
         .saveAsTable("bucketed_table")

-      for (i <- 0 until 5) {
+      for (i <- 0 until 3) {
         val table = spark.table("bucketed_table").filter($"i" === i)
         val query = table.queryExecution
         val output = query.analyzed.output
@@ -145,7 +145,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 13) {
+      for (j <- 0 until 2) {
         // Case 1: EqualTo
         checkPrunedAnswers(
           bucketSpec,
@@ -188,7 +188,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 13) {
+      for (j <- 0 until 2) {
         checkPrunedAnswers(
           bucketSpec,
           bucketValues = j :: Nil,
@@ -236,7 +236,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 13) {
+      for (j <- 0 until 2) {
         checkPrunedAnswers(
           bucketSpec,
           bucketValues = j :: Nil,

From 1c5bfef591932a281a2c91b035dfb0a482ed4e7a Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sat, 6 Oct 2018 02:00:46 +0800
Subject: [PATCH 2/2] Revise: pick a random bucket value instead of looping

---
 .../spark/sql/sources/BucketedReadSuite.scala | 181 +++++++++---------
 1 file changed, 91 insertions(+), 90 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 88421132cf04..a2bc651bb2bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI

+import scala.util.Random
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -47,11 +49,13 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
 abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._

-  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private val maxI = 5
+  private val maxJ = 13
+  private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
   private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
-  } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
+  } yield (i % maxI, s, i % maxJ)).toDF("i", "j", "k")

   // number of buckets that doesn't yield empty buckets when bucketing on column j on df/nullDF
   // empty buckets before filtering might hide bugs in pruning logic
@@ -66,23 +70,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(8, "j", "k")
         .saveAsTable("bucketed_table")

-      for (i <- 0 until 3) {
-        val table = spark.table("bucketed_table").filter($"i" === i)
-        val query = table.queryExecution
-        val output = query.analyzed.output
-        val rdd = query.toRdd
-
-        assert(rdd.partitions.length == 8)
-
-        val attrs = table.select("j", "k").queryExecution.analyzed.output
-        val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
-          val getBucketId = UnsafeProjection.create(
-            HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
-            output)
-          rows.map(row => getBucketId(row).getInt(0) -> index)
-        })
-        checkBucketId.collect().foreach(r => assert(r._1 == r._2))
-      }
+      val bucketValue = Random.nextInt(maxI)
+      val table = spark.table("bucketed_table").filter($"i" === bucketValue)
+      val query = table.queryExecution
+      val output = query.analyzed.output
+      val rdd = query.toRdd
+
+      assert(rdd.partitions.length == 8)
+
+      val attrs = table.select("j", "k").queryExecution.analyzed.output
+      val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
+        val getBucketId = UnsafeProjection.create(
+          HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
+          output)
+        rows.map(row => getBucketId(row).getInt(0) -> index)
+      })
+      checkBucketId.collect().foreach(r => assert(r._1 == r._2))
     }
   }

@@ -145,36 +148,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 2) {
-        // Case 1: EqualTo
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-
-        // Case 2: EqualNullSafe
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" <=> j,
-          df)
-
-        // Case 3: In
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = $"j".isin(j, j + 1, j + 2, j + 3),
-          df)
-
-        // Case 4: InSet
-        val inSetExpr = expressions.InSet($"j".expr, Set(j, j + 1, j + 2, j + 3).map(lit(_).expr))
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = Column(inSetExpr),
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      // Case 1: EqualTo
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
+
+      // Case 2: EqualNullSafe
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" <=> bucketValue,
+        df)
+
+      // Case 3: In
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = $"j".isin(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        df)
+
+      // Case 4: InSet
+      val inSetExpr = expressions.InSet($"j".expr,
+        Set(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3).map(lit(_).expr))
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = Column(inSetExpr),
+        df)
     }
   }

@@ -188,13 +191,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 2) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
     }
   }

@@ -236,40 +238,39 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")

-      for (j <- 0 until 2) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"k" > $"j",
-          df)
-
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"i" > j % 5,
-          df)
-
-        // check multiple bucket values OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1),
-          filterCondition = $"j" === j || $"j" === (j + 1),
-          df)
-
-        // check bucket value and none bucket value OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Nil,
-          filterCondition = $"j" === j || $"i" === 0,
-          df)
-
-        // check AND condition in complex expression
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j),
-          filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"k" > $"j",
+        df)
+
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"i" > bucketValue % 5,
+        df)
+
+      // check multiple bucket values OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1),
+        filterCondition = $"j" === bucketValue || $"j" === (bucketValue + 1),
+        df)
+
+      // check bucket value and non-bucket value OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Nil,
+        filterCondition = $"j" === bucketValue || $"i" === 0,
+        df)
+
+      // check AND condition in complex expression
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue),
+        filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === bucketValue,
+        df)
     }
   }
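
Note on the pattern both commits converge on: rather than asserting bucket
pruning for every value of "j" (13 iterations per case), each test now draws
one value uniformly from [0, maxJ) with scala.util.Random, trading exhaustive
coverage for run time while still typically exercising a different value on
each run. Below is a minimal, self-contained sketch of that sampling idea;
the object name, the explicit seed, and the seed logging are illustrative
additions that are not part of this patch, included because logging the seed
is what makes a failing randomized run reproducible:

    import scala.util.Random

    // Hypothetical standalone sketch, not part of BucketedReadSuite.
    object BucketValueSamplingSketch {
      def main(args: Array[String]): Unit = {
        val maxJ = 13 // mirrors the suite's modulus for column "j"
        // Fix a seed and print it so a failing run can be replayed exactly.
        val seed = System.nanoTime()
        val rng = new Random(seed)
        println(s"bucket-value sampling seed: $seed")
        // One uniformly drawn bucket value in [0, maxJ), as in the revised tests.
        val bucketValue = rng.nextInt(maxJ)
        assert(bucketValue >= 0 && bucketValue < maxJ)
        println(s"exercising bucket pruning with j = $bucketValue")
      }
    }

The patch itself calls Random.nextInt directly without a fixed seed, so each
test run samples an independent bucket value.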