Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
import java.io.File
import java.net.URI

import scala.util.Random

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
Expand Down Expand Up @@ -47,11 +49,13 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
import testImplicits._

private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
private val maxI = 5
private val maxJ = 13
private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
private lazy val nullDF = (for {
i <- 0 to 50
s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
} yield (i % 5, s, i % 13)).toDF("i", "j", "k")
} yield (i % maxI, s, i % maxJ)).toDF("i", "j", "k")

// number of buckets that doesn't yield empty buckets when bucketing on column j on df/nullDF
// empty buckets before filtering might hide bugs in pruning logic
Expand All @@ -66,23 +70,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
.bucketBy(8, "j", "k")
.saveAsTable("bucketed_table")

for (i <- 0 until 5) {
Copy link
Member

@HyukjinKwon HyukjinKwon Oct 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh haha, I was struggling to find out why it was 5 and 13, and it looks like it's defined in df :-)

val table = spark.table("bucketed_table").filter($"i" === i)
val query = table.queryExecution
val output = query.analyzed.output
val rdd = query.toRdd

assert(rdd.partitions.length == 8)

val attrs = table.select("j", "k").queryExecution.analyzed.output
val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
val getBucketId = UnsafeProjection.create(
HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
output)
rows.map(row => getBucketId(row).getInt(0) -> index)
})
checkBucketId.collect().foreach(r => assert(r._1 == r._2))
}
val bucketValue = Random.nextInt(maxI)
val table = spark.table("bucketed_table").filter($"i" === bucketValue)
val query = table.queryExecution
val output = query.analyzed.output
val rdd = query.toRdd

assert(rdd.partitions.length == 8)

val attrs = table.select("j", "k").queryExecution.analyzed.output
val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
val getBucketId = UnsafeProjection.create(
HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
output)
rows.map(row => getBucketId(row).getInt(0) -> index)
})
checkBucketId.collect().foreach(r => assert(r._1 == r._2))
}
}

Expand Down Expand Up @@ -145,36 +148,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
.bucketBy(numBuckets, "j")
.saveAsTable("bucketed_table")

for (j <- 0 until 13) {
// Case 1: EqualTo
checkPrunedAnswers(
bucketSpec,
bucketValues = j :: Nil,
filterCondition = $"j" === j,
df)

// Case 2: EqualNullSafe
checkPrunedAnswers(
bucketSpec,
bucketValues = j :: Nil,
filterCondition = $"j" <=> j,
df)

// Case 3: In
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(j, j + 1, j + 2, j + 3),
filterCondition = $"j".isin(j, j + 1, j + 2, j + 3),
df)

// Case 4: InSet
val inSetExpr = expressions.InSet($"j".expr, Set(j, j + 1, j + 2, j + 3).map(lit(_).expr))
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(j, j + 1, j + 2, j + 3),
filterCondition = Column(inSetExpr),
df)
}
val bucketValue = Random.nextInt(maxJ)
// Case 1: EqualTo
checkPrunedAnswers(
bucketSpec,
bucketValues = bucketValue :: Nil,
filterCondition = $"j" === bucketValue,
df)

// Case 2: EqualNullSafe
checkPrunedAnswers(
bucketSpec,
bucketValues = bucketValue :: Nil,
filterCondition = $"j" <=> bucketValue,
df)

// Case 3: In
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
filterCondition = $"j".isin(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
df)

// Case 4: InSet
val inSetExpr = expressions.InSet($"j".expr,
Set(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3).map(lit(_).expr))
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
filterCondition = Column(inSetExpr),
df)
}
}

Expand All @@ -188,13 +191,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
.bucketBy(numBuckets, "j")
.saveAsTable("bucketed_table")

for (j <- 0 until 13) {
checkPrunedAnswers(
bucketSpec,
bucketValues = j :: Nil,
filterCondition = $"j" === j,
df)
}
val bucketValue = Random.nextInt(maxJ)
checkPrunedAnswers(
bucketSpec,
bucketValues = bucketValue :: Nil,
filterCondition = $"j" === bucketValue,
df)
}
}

Expand Down Expand Up @@ -236,40 +238,39 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
.bucketBy(numBuckets, "j")
.saveAsTable("bucketed_table")

for (j <- 0 until 13) {
checkPrunedAnswers(
bucketSpec,
bucketValues = j :: Nil,
filterCondition = $"j" === j && $"k" > $"j",
df)

checkPrunedAnswers(
bucketSpec,
bucketValues = j :: Nil,
filterCondition = $"j" === j && $"i" > j % 5,
df)

// check multiple bucket values OR condition
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(j, j + 1),
filterCondition = $"j" === j || $"j" === (j + 1),
df)

// check bucket value and none bucket value OR condition
checkPrunedAnswers(
bucketSpec,
bucketValues = Nil,
filterCondition = $"j" === j || $"i" === 0,
df)

// check AND condition in complex expression
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(j),
filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === j,
df)
}
val bucketValue = Random.nextInt(maxJ)
checkPrunedAnswers(
bucketSpec,
bucketValues = bucketValue :: Nil,
filterCondition = $"j" === bucketValue && $"k" > $"j",
df)

checkPrunedAnswers(
bucketSpec,
bucketValues = bucketValue :: Nil,
filterCondition = $"j" === bucketValue && $"i" > bucketValue % 5,
df)

// check multiple bucket values OR condition
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(bucketValue, bucketValue + 1),
filterCondition = $"j" === bucketValue || $"j" === (bucketValue + 1),
df)

// check bucket value and none bucket value OR condition
checkPrunedAnswers(
bucketSpec,
bucketValues = Nil,
filterCondition = $"j" === bucketValue || $"i" === 0,
df)

// check AND condition in complex expression
checkPrunedAnswers(
bucketSpec,
bucketValues = Seq(bucketValue),
filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === bucketValue,
df)
}
}

Expand Down