Skip to content

Commit f1569bf

Browse files
committed
fix.
1 parent d158ce1 commit f1569bf

File tree

1 file changed

+129
-111
lines changed

1 file changed

+129
-111
lines changed

sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala

Lines changed: 129 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -227,22 +227,28 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
227227
private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
228228
private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
229229

230+
case class BucketTableTestSpec(
231+
bucketSpec: Option[BucketSpec],
232+
numPartitions: Int = 10,
233+
expectedShuffle: Boolean = true,
234+
expectedSort: Boolean = true)
235+
230236
/**
231237
* A helper method to test the bucket read functionality using join. It will save `df1` and `df2`
232238
* to hive tables, bucketed or not, according to the given bucket specifics. Next we will join
233239
* these 2 tables, and firstly make sure the answer is corrected, and then check if the shuffle
234240
* exists as user expected according to the `shuffleLeft` and `shuffleRight`.
235241
*/
236242
private def testBucketing(
237-
bucketSpecLeft: Option[BucketSpec],
238-
bucketSpecRight: Option[BucketSpec],
243+
bucketTableTestSpecLeft: BucketTableTestSpec,
244+
bucketTableTestSpecRight: BucketTableTestSpec,
239245
joinType: String = "inner",
240-
joinCondition: (DataFrame, DataFrame) => Column,
241-
shuffleLeft: Boolean,
242-
shuffleRight: Boolean,
243-
numPartitions: Int = 10,
244-
sortLeft: Boolean = true,
245-
sortRight: Boolean = true): Unit = {
246+
joinCondition: (DataFrame, DataFrame) => Column): Unit = {
247+
val BucketTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) =
248+
bucketTableTestSpecLeft
249+
val BucketTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) =
250+
bucketTableTestSpecRight
251+
246252
withTable("bucketed_table1", "bucketed_table2") {
247253
def withBucket(
248254
writer: DataFrameWriter[Row],
@@ -264,9 +270,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
264270
}.getOrElse(writer)
265271
}
266272

267-
withBucket(df1.repartition(numPartitions).write.format("parquet"), bucketSpecLeft)
273+
withBucket(df1.repartition(numPartitionsLeft).write.format("parquet"), bucketSpecLeft)
268274
.saveAsTable("bucketed_table1")
269-
withBucket(df2.repartition(numPartitions).write.format("parquet"), bucketSpecRight)
275+
withBucket(df2.repartition(numPartitionsRight).write.format("parquet"), bucketSpecRight)
270276
.saveAsTable("bucketed_table2")
271277

272278
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
@@ -308,161 +314,174 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
308314

309315
test("avoid shuffle when join 2 bucketed tables") {
310316
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
317+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false)
318+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false)
311319
testBucketing(
312-
bucketSpecLeft = bucketSpec,
313-
bucketSpecRight = bucketSpec,
314-
joinCondition = joinCondition(Seq("i", "j")),
315-
shuffleLeft = false,
316-
shuffleRight = false
320+
bucketTableTestSpecLeft,
321+
bucketTableTestSpecRight,
322+
joinCondition = joinCondition(Seq("i", "j"))
317323
)
318324
}
319325

320326
// Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
321327
ignore("avoid shuffle when join keys are a super-set of bucket keys") {
322328
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
329+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false)
330+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = false)
323331
testBucketing(
324-
bucketSpecLeft = bucketSpec,
325-
bucketSpecRight = bucketSpec,
326-
joinCondition = joinCondition(Seq("i", "j")),
327-
shuffleLeft = false,
328-
shuffleRight = false
332+
bucketTableTestSpecLeft,
333+
bucketTableTestSpecRight,
334+
joinCondition = joinCondition(Seq("i", "j"))
329335
)
330336
}
331337

332338
test("only shuffle one side when join bucketed table and non-bucketed table") {
333339
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
340+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = false)
341+
val bucketTableTestSpecRight = BucketTableTestSpec(None, expectedShuffle = true)
334342
testBucketing(
335-
bucketSpecLeft = bucketSpec,
336-
bucketSpecRight = None,
337-
joinCondition = joinCondition(Seq("i", "j")),
338-
shuffleLeft = false,
339-
shuffleRight = true
343+
bucketTableTestSpecLeft,
344+
bucketTableTestSpecRight,
345+
joinCondition = joinCondition(Seq("i", "j"))
340346
)
341347
}
342348

343349
test("only shuffle one side when 2 bucketed tables have different bucket number") {
344-
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil))
345-
val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil))
350+
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Nil))
351+
val bucketSpecRight = Some(BucketSpec(5, Seq("i", "j"), Nil))
352+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false)
353+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true)
346354
testBucketing(
347-
bucketSpecLeft = bucketSpec1,
348-
bucketSpecRight = bucketSpec2,
349-
joinCondition = joinCondition(Seq("i", "j")),
350-
shuffleLeft = false,
351-
shuffleRight = true
355+
bucketTableTestSpecLeft,
356+
bucketTableTestSpecRight,
357+
joinCondition = joinCondition(Seq("i", "j"))
352358
)
353359
}
354360

355361
test("only shuffle one side when 2 bucketed tables have different bucket keys") {
356-
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil))
357-
val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil))
362+
val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Nil))
363+
val bucketSpecRight = Some(BucketSpec(8, Seq("j"), Nil))
364+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpecLeft, expectedShuffle = false)
365+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpecRight, expectedShuffle = true)
358366
testBucketing(
359-
bucketSpecLeft = bucketSpec1,
360-
bucketSpecRight = bucketSpec2,
361-
joinCondition = joinCondition(Seq("i")),
362-
shuffleLeft = false,
363-
shuffleRight = true
367+
bucketTableTestSpecLeft,
368+
bucketTableTestSpecRight,
369+
joinCondition = joinCondition(Seq("i"))
364370
)
365371
}
366372

367373
test("shuffle when join keys are not equal to bucket keys") {
368374
val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
375+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true)
376+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true)
369377
testBucketing(
370-
bucketSpecLeft = bucketSpec,
371-
bucketSpecRight = bucketSpec,
372-
joinCondition = joinCondition(Seq("j")),
373-
shuffleLeft = true,
374-
shuffleRight = true
378+
bucketTableTestSpecLeft,
379+
bucketTableTestSpecRight,
380+
joinCondition = joinCondition(Seq("j"))
375381
)
376382
}
377383

378384
test("shuffle when join 2 bucketed tables with bucketing disabled") {
379385
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
386+
val bucketTableTestSpecLeft = BucketTableTestSpec(bucketSpec, expectedShuffle = true)
387+
val bucketTableTestSpecRight = BucketTableTestSpec(bucketSpec, expectedShuffle = true)
380388
withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
381389
testBucketing(
382-
bucketSpecLeft = bucketSpec,
383-
bucketSpecRight = bucketSpec,
384-
joinCondition = joinCondition(Seq("i", "j")),
385-
shuffleLeft = true,
386-
shuffleRight = true
390+
bucketTableTestSpecLeft,
391+
bucketTableTestSpecRight,
392+
joinCondition = joinCondition(Seq("i", "j"))
387393
)
388394
}
389395
}
390396

391-
test("avoid shuffle and sort when bucket and sort columns are join keys") {
392-
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
393-
testBucketing(
394-
bucketSpecLeft = bucketSpec,
395-
bucketSpecRight = bucketSpec,
396-
joinCondition = joinCondition(Seq("i", "j")),
397-
shuffleLeft = false,
398-
shuffleRight = false,
399-
numPartitions = 1,
400-
sortLeft = false,
401-
sortRight = false
402-
)
403-
}
404-
405-
test("sort when bucket and sort columns are join keys when having multiple files in a bucket") {
406-
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
397+
test("check sort and shuffle when bucket and sort columns are join keys") {
407398
// In case of bucketing, its possible to have multiple files belonging to the
408399
// same bucket in a given relation. Each of these files are locally sorted
409400
// but those files combined together are not globally sorted. Given that,
410401
// the RDD partition will not be sorted even if the relation has sort columns set
411402
// Therefore, we still need to keep the Sort in both sides.
403+
val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
404+
405+
val bucketTableTestSpecLeft1 = BucketTableTestSpec(
406+
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
407+
val bucketTableTestSpecRight1 = BucketTableTestSpec(
408+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
409+
testBucketing(
410+
bucketTableTestSpecLeft1,
411+
bucketTableTestSpecRight1,
412+
joinCondition = joinCondition(Seq("i", "j"))
413+
)
414+
415+
val bucketTableTestSpecLeft2 = BucketTableTestSpec(
416+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
417+
val bucketTableTestSpecRight2 = BucketTableTestSpec(
418+
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
412419
testBucketing(
413-
bucketSpecLeft = bucketSpec,
414-
bucketSpecRight = bucketSpec,
415-
joinCondition = joinCondition(Seq("i", "j")),
416-
shuffleLeft = false,
417-
shuffleRight = false,
418-
numPartitions = 50,
419-
sortLeft = true,
420-
sortRight = true
420+
bucketTableTestSpecLeft2,
421+
bucketTableTestSpecRight2,
422+
joinCondition = joinCondition(Seq("i", "j"))
423+
)
424+
425+
val bucketTableTestSpecLeft3 = BucketTableTestSpec(
426+
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
427+
val bucketTableTestSpecRight3 = BucketTableTestSpec(
428+
bucketSpec, numPartitions = 50, expectedShuffle = false, expectedSort = true)
429+
testBucketing(
430+
bucketTableTestSpecLeft3,
431+
bucketTableTestSpecRight3,
432+
joinCondition = joinCondition(Seq("i", "j"))
433+
)
434+
435+
val bucketTableTestSpecLeft4 = BucketTableTestSpec(
436+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
437+
val bucketTableTestSpecRight4 = BucketTableTestSpec(
438+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
439+
testBucketing(
440+
bucketTableTestSpecLeft4,
441+
bucketTableTestSpecRight4,
442+
joinCondition = joinCondition(Seq("i", "j"))
421443
)
422444
}
423445

424446
test("avoid shuffle and sort when sort columns are a super set of join keys") {
425-
val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
426-
val bucketSpec2 = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
447+
val bucketSpecLeft = Some(BucketSpec(8, Seq("i"), Seq("i", "j")))
448+
val bucketSpecRight = Some(BucketSpec(8, Seq("i"), Seq("i", "k")))
449+
val bucketTableTestSpecLeft = BucketTableTestSpec(
450+
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
451+
val bucketTableTestSpecRight = BucketTableTestSpec(
452+
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = false)
427453
testBucketing(
428-
bucketSpecLeft = bucketSpec1,
429-
bucketSpecRight = bucketSpec2,
430-
joinCondition = joinCondition(Seq("i")),
431-
shuffleLeft = false,
432-
shuffleRight = false,
433-
numPartitions = 1,
434-
sortLeft = false,
435-
sortRight = false
454+
bucketTableTestSpecLeft,
455+
bucketTableTestSpecRight,
456+
joinCondition = joinCondition(Seq("i"))
436457
)
437458
}
438459

439460
test("only sort one side when sort columns are different") {
440-
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
441-
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
461+
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
462+
val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("k")))
463+
val bucketTableTestSpecLeft = BucketTableTestSpec(
464+
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
465+
val bucketTableTestSpecRight = BucketTableTestSpec(
466+
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
442467
testBucketing(
443-
bucketSpecLeft = bucketSpec1,
444-
bucketSpecRight = bucketSpec2,
445-
joinCondition = joinCondition(Seq("i", "j")),
446-
shuffleLeft = false,
447-
shuffleRight = false,
448-
numPartitions = 1,
449-
sortLeft = false,
450-
sortRight = true
468+
bucketTableTestSpecLeft,
469+
bucketTableTestSpecRight,
470+
joinCondition = joinCondition(Seq("i", "j"))
451471
)
452472
}
453473

454474
test("only sort one side when sort columns are same but their ordering is different") {
455-
val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
456-
val bucketSpec2 = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
475+
val bucketSpecLeft = Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j")))
476+
val bucketSpecRight = Some(BucketSpec(8, Seq("i", "j"), Seq("j", "i")))
477+
val bucketTableTestSpecLeft = BucketTableTestSpec(
478+
bucketSpecLeft, numPartitions = 1, expectedShuffle = false, expectedSort = false)
479+
val bucketTableTestSpecRight = BucketTableTestSpec(
480+
bucketSpecRight, numPartitions = 1, expectedShuffle = false, expectedSort = true)
457481
testBucketing(
458-
bucketSpecLeft = bucketSpec1,
459-
bucketSpecRight = bucketSpec2,
460-
joinCondition = joinCondition(Seq("i", "j")),
461-
shuffleLeft = false,
462-
shuffleRight = false,
463-
numPartitions = 1,
464-
sortLeft = false,
465-
sortRight = true
482+
bucketTableTestSpecLeft,
483+
bucketTableTestSpecRight,
484+
joinCondition = joinCondition(Seq("i", "j"))
466485
)
467486
}
468487

@@ -496,21 +515,20 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
496515

497516
test("SPARK-17698 Join predicates should not contain filter clauses") {
498517
val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
518+
val bucketTableTestSpecLeft = BucketTableTestSpec(
519+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
520+
val bucketTableTestSpecRight = BucketTableTestSpec(
521+
bucketSpec, numPartitions = 1, expectedShuffle = false, expectedSort = false)
499522
testBucketing(
500-
bucketSpecLeft = bucketSpec,
501-
bucketSpecRight = bucketSpec,
523+
bucketTableTestSpecLeft,
524+
bucketTableTestSpecRight,
502525
joinType = "fullouter",
503526
joinCondition = (left: DataFrame, right: DataFrame) => {
504527
val joinPredicates = Seq("i").map(col => left(col) === right(col)).reduce(_ && _)
505528
val filterLeft = left("i") === Literal("1")
506529
val filterRight = right("i") === Literal("1")
507530
joinPredicates && filterLeft && filterRight
508-
},
509-
shuffleLeft = false,
510-
shuffleRight = false,
511-
numPartitions = 1,
512-
sortLeft = false,
513-
sortRight = false
531+
}
514532
)
515533
}
516534

0 commit comments

Comments
 (0)