Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,28 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")

/**
 * Describes one side of the join exercised by `testBucketing`: how the table is written and
 * what the physical plan for that side is expected to contain.
 *
 * @param bucketSpec      bucketing to apply when the table is saved; `None` writes the table
 *                        without bucketing
 * @param numPartitions   number of partitions the source DataFrame is repartitioned into before
 *                        it is written
 * @param expectedShuffle expectation passed to `testBucketing` as the shuffle flag for this side
 *                        (presumably whether a shuffle should appear in this side's plan; the
 *                        check itself is not visible here)
 * @param expectedSort    whether a `SortExec` is expected in this side's child of the join
 *                        operator
 */
case class BucketedTableTestSpec(
bucketSpec: Option[BucketSpec],
numPartitions: Int = 10,
expectedShuffle: Boolean = true,
expectedSort: Boolean = true)

/**
* A helper method to test the bucket read functionality using join. It will save `df1` and `df2`
* to hive tables, bucketed or not, according to the given bucket specifics. Next we will join
* these 2 tables, and firstly make sure the answer is correct, and then check if the shuffle
* exists as the user expected according to `shuffleLeft` and `shuffleRight`.
*/
private def testBucketing(
bucketSpecLeft: Option[BucketSpec],
bucketSpecRight: Option[BucketSpec],
bucketedTableTestSpecLeft: BucketedTableTestSpec,
bucketedTableTestSpecRight: BucketedTableTestSpec,
joinType: String = "inner",
joinCondition: (DataFrame, DataFrame) => Column,
shuffleLeft: Boolean,
shuffleRight: Boolean,
sortLeft: Boolean = true,
sortRight: Boolean = true): Unit = {
joinCondition: (DataFrame, DataFrame) => Column): Unit = {
val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
bucketedTableTestSpecLeft
val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
bucketedTableTestSpecRight

withTable("bucketed_table1", "bucketed_table2") {
def withBucket(
writer: DataFrameWriter[Row],
Expand All @@ -263,8 +270,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
}.getOrElse(writer)
}

withBucket(df1.write.format("parquet"), bucketSpecLeft).saveAsTable("bucketed_table1")
withBucket(df2.write.format("parquet"), bucketSpecRight).saveAsTable("bucketed_table2")
withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft)
.saveAsTable("bucketed_table1")
withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight)
.saveAsTable("bucketed_table2")

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
Expand All @@ -291,10 +300,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
// check existence of sort
assert(
joinOperator.left.find(_.isInstanceOf[SortExec]).isDefined == sortLeft,
s"expected sort in plan to be $shuffleLeft but found\n${joinOperator.left}")
s"expected sort in the left child to be $sortLeft but found\n${joinOperator.left}")
assert(
joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
s"expected sort in plan to be $shuffleRight but found\n${joinOperator.right}")
s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
}
}
}
Expand All @@ -305,138 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet

test("avoid shuffle when join 2 bucketed tables") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = false
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

// Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
ignore("avoid shuffle when join keys are a super-set of bucket keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = false
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

test("only shuffle one side when join bucketed table and non-bucketed table") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(None, expectedShuffle = true)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = None,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

test("only shuffle one side when 2 bucketed tables have different bucket number") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil))
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing(
bucketSpecLeft = bucketSpec1,
bucketSpecRight = bucketSpec2,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

test("only shuffle one side when 2 bucketed tables have different bucket keys") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil))
val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil))
val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil))
val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpecLeft, expectedShuffle = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpecRight, expectedShuffle = true)
testBucketing(
bucketSpecLeft = bucketSpec1,
bucketSpecRight = bucketSpec2,
joinCondition = joinCondition(Seq("i")),
shuffleLeft = false,
shuffleRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i"))
)
}

test("shuffle when join keys are not equal to bucket keys") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
joinCondition = joinCondition(Seq("j")),
shuffleLeft = true,
shuffleRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("j"))
)
}

test("shuffle when join 2 bucketed tables with bucketing disabled") {
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = true)
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = true,
shuffleRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}
}

test("avoid shuffle and sort when bucket and sort columns are join keys") {
test("check sort and shuffle when bucket and sort columns are join keys") {
// In case of bucketing, it's possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files is locally sorted,
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set.
// Therefore, we still need to keep the Sort in both sides.
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))

val bucketedTableTestSpecLeft1 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
val bucketedTableTestSpecRight1 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = false
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft1,
bucketedTableTestSpecRight = bucketedTableTestSpecRight1,
joinCondition = joinCondition(Seq("i", "j"))
)

val bucketedTableTestSpecLeft2 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight2 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft2,
bucketedTableTestSpecRight = bucketedTableTestSpecRight2,
joinCondition = joinCondition(Seq("i", "j"))
)

val bucketedTableTestSpecLeft3 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
val bucketedTableTestSpecRight3 = BucketedTableTestSpec(
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft3,
bucketedTableTestSpecRight = bucketedTableTestSpecRight3,
joinCondition = joinCondition(Seq("i", "j"))
)

val bucketedTableTestSpecLeft4 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight4 = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft4,
bucketedTableTestSpecRight = bucketedTableTestSpecRight4,
joinCondition = joinCondition(Seq("i", "j"))
)
}

test("avoid shuffle and sort when sort columns are a super set of join keys") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
bucketSpecLeft = bucketSpec1,
bucketSpecRight = bucketSpec2,
joinCondition = joinCondition(Seq("i")),
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = false
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i"))
)
}

test("only sort one side when sort columns are different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketSpecLeft = bucketSpec1,
bucketSpecRight = bucketSpec2,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

test("only sort one side when sort columns are same but their ordering is different") {
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
testBucketing(
bucketSpecLeft = bucketSpec1,
bucketSpecRight = bucketSpec2,
joinCondition = joinCondition(Seq("i", "j")),
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = true
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinCondition = joinCondition(Seq("i", "j"))
)
}

Expand Down Expand Up @@ -470,20 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet

test("SPARK-17698 Join predicates should not contain filter clauses") {
val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
val bucketedTableTestSpecLeft = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
val bucketedTableTestSpecRight = BucketedTableTestSpec(
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
testBucketing(
bucketSpecLeft = bucketSpec,
bucketSpecRight = bucketSpec,
bucketedTableTestSpecLeft = bucketedTableTestSpecLeft,
bucketedTableTestSpecRight = bucketedTableTestSpecRight,
joinType = "fullouter",
joinCondition = (left: DataFrame, right: DataFrame) => {
val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)
val filterLeft = left("i") === Literal("1")
val filterRight = right("i") === Literal("1")
joinPredicates && filterLeft && filterRight
},
shuffleLeft = false,
shuffleRight = false,
sortLeft = false,
sortRight = false
}
)
}

Expand Down