1717
1818package org .apache .spark .sql .sources
1919
20- import java .io .IOException
21-
2220import scala .collection .mutable
2321
2422import com .google .common .base .Objects
@@ -27,9 +25,9 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2725import org .scalatest .BeforeAndAfter
2826
2927import org .apache .spark .rdd .RDD
28+ import org .apache .spark .sql ._
3029import org .apache .spark .sql .hive .test .TestHive
3130import org .apache .spark .sql .types ._
32- import org .apache .spark .sql ._
3331import org .apache .spark .util .Utils
3432
3533class SimpleFSBasedSource extends FSBasedRelationProvider {
@@ -143,7 +141,14 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
143141 StructField (" a" , IntegerType , nullable = false ),
144142 StructField (" b" , StringType , nullable = false )))
145143
146- val partitionColumns = StructType (StructField (" p1" , IntegerType , nullable = true ) :: Nil )
144+ val singlePartitionColumn = StructType (
145+ Seq (
146+ StructField (" p1" , IntegerType , nullable = true )))
147+
148+ val doublePartitionColumns = StructType (
149+ Seq (
150+ StructField (" p1" , IntegerType , nullable = true ),
151+ StructField (" p2" , StringType , nullable = true )))
147152
148153 val testDF = (for {
149154 i <- 1 to 3
@@ -240,7 +245,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
240245 options = Map (
241246 " path" -> basePath.toString,
242247 " schema" -> dataSchema.json,
243- " partitionColumns" -> partitionColumns .json),
248+ " partitionColumns" -> singlePartitionColumn .json),
244249 partitionColumns = Seq (" p1" ))
245250
246251 Thread .sleep(500 )
@@ -269,7 +274,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
269274 options = Map (
270275 " path" -> basePath.toString,
271276 " schema" -> dataSchema.json,
272- " partitionColumns" -> partitionColumns .json),
277+ " partitionColumns" -> singlePartitionColumn .json),
273278 partitionColumns = Seq (" p1" ))
274279
275280 // Written rows shouldn't contain dynamic partition column
@@ -281,34 +286,71 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
281286 }
282287 }
283288
284- test(" save() - partitioned table - Append" ) {
289+ test(" save() - partitioned table - Append - new partition values " ) {
285290 testDF.save(
286291 source = classOf [SimpleFSBasedSource ].getCanonicalName,
287292 mode = SaveMode .Overwrite ,
288293 options = Map (
289294 " path" -> basePath.toString,
290295 " schema" -> dataSchema.json,
291- " partitionColumns" -> partitionColumns .json),
296+ " partitionColumns" -> singlePartitionColumn .json),
292297 partitionColumns = Seq (" p1" ))
293298
294- testDF.save(
299+ val moreData = (for {
300+ i <- 1 to 3
301+ p <- 3 to 4
302+ } yield (i, s " val_ $i" , p)).toDF(" a" , " b" , " p1" )
303+
304+ moreData.save(
295305 source = classOf [SimpleFSBasedSource ].getCanonicalName,
296306 mode = SaveMode .Append ,
297307 options = Map (
298308 " path" -> basePath.toString,
299309 " schema" -> dataSchema.json,
300- " partitionColumns" -> partitionColumns .json),
310+ " partitionColumns" -> singlePartitionColumn .json),
301311 partitionColumns = Seq (" p1" ))
302312
303313 // Written rows shouldn't contain dynamic partition column
304314 val expectedRows = for (i <- 1 to 3 ; _ <- 1 to 4 ) yield Row (i, s " val_ $i" )
305315
306316 TestResult .synchronized {
307- assert(TestResult .writerPaths.size === 2 )
317+ assert(TestResult .writerPaths.size === 4 )
308318 assert(TestResult .writtenRows === expectedRows.toSet)
309319 }
310320 }
311321
322+ test(" save() - partitioned table - Append - mismatched partition columns" ) {
323+ val data = (for {
324+ i <- 1 to 3
325+ p1 <- 1 to 2
326+ p2 <- Array (" hello" , " world" )
327+ } yield (i, s " val_ $i" , p1, p2)).toDF(" a" , " b" , " p1" , " p2" )
328+
329+ // Using only a subset of all partition columns
330+ intercept[IllegalArgumentException ] {
331+ data.save(
332+ source = classOf [SimpleFSBasedSource ].getCanonicalName,
333+ mode = SaveMode .Append ,
334+ options = Map (
335+ " path" -> basePath.toString,
336+ " schema" -> dataSchema.json,
337+ " partitionColumns" -> doublePartitionColumns.json),
338+ partitionColumns = Seq (" p1" ))
339+ }
340+
341+ // Using different order of partition columns
342+ intercept[IllegalArgumentException ] {
343+ data.save(
344+ source = classOf [SimpleFSBasedSource ].getCanonicalName,
345+ mode = SaveMode .Append ,
346+ options = Map (
347+ " path" -> basePath.toString,
348+ " schema" -> dataSchema.json,
349+ " partitionColumns" -> doublePartitionColumns.json),
350+ partitionColumns = Seq (" p2" , " p1" ))
351+ }
352+ }
353+
312354 test(" save() - partitioned table - ErrorIfExists" ) {
313355 fs.delete(basePath, true )
314356
@@ -318,7 +360,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
318360 options = Map (
319361 " path" -> basePath.toString,
320362 " schema" -> dataSchema.json,
321- " partitionColumns" -> partitionColumns .json),
363+ " partitionColumns" -> singlePartitionColumn .json),
322364 partitionColumns = Seq (" p1" ))
323365
324366 // Written rows shouldn't contain dynamic partition column
@@ -336,7 +378,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
336378 options = Map (
337379 " path" -> basePath.toString,
338380 " schema" -> dataSchema.json,
339- " partitionColumns" -> partitionColumns .json),
381+ " partitionColumns" -> singlePartitionColumn .json),
340382 partitionColumns = Seq (" p1" ))
341383 }
342384 }
@@ -354,7 +396,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
354396 }
355397
356398 test(" save() - data sources other than FSBasedRelation" ) {
357- val cause = intercept[RuntimeException ] {
399+ intercept[RuntimeException ] {
358400 testDF.save(
359401 source = classOf [FilteredScanSource ].getCanonicalName,
360402 mode = SaveMode .Overwrite ,
@@ -363,16 +405,25 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
363405 }
364406 }
365407
366- test(" saveAsTable() - partitioned table - Overwrite" ) {
367- testDF.saveAsTable(
368- tableName = " t" ,
408+ def saveToPartitionedTable (
409+ df : DataFrame ,
410+ tableName : String ,
411+ relationPartitionColumns : StructType ,
412+ partitionedBy : Seq [String ],
413+ mode : SaveMode ): Unit = {
414+ df.saveAsTable(
415+ tableName = tableName,
369416 source = classOf [SimpleFSBasedSource ].getCanonicalName,
370- mode = SaveMode . Overwrite ,
417+ mode = mode ,
371418 options = Map (
372419 " path" -> basePath.toString,
373420 " schema" -> dataSchema.json,
374- " partitionColumns" -> partitionColumns.json),
375- partitionColumns = Seq (" p1" ))
421+ " partitionColumns" -> relationPartitionColumns.json),
422+ partitionColumns = partitionedBy)
423+ }
424+
425+ test(" saveAsTable() - partitioned table - Overwrite" ) {
426+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Array (" p1" ), SaveMode .Overwrite )
376427
377428 // Written rows shouldn't contain dynamic partition column
378429 val expectedRows = for (i <- 1 to 3 ; _ <- 1 to 2 ) yield Row (i, s " val_ $i" )
@@ -399,17 +450,9 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
399450 options = Map (
400451 " path" -> basePath.toString,
401452 " schema" -> dataSchema.json,
402- " partitionColumns" -> partitionColumns .json))
453+ " partitionColumns" -> singlePartitionColumn .json))
403454
404- df.saveAsTable(
405- tableName = " t" ,
406- source = classOf [SimpleFSBasedSource ].getCanonicalName,
407- mode = SaveMode .Overwrite ,
408- options = Map (
409- " path" -> basePath.toString,
410- " schema" -> dataSchema.json,
411- " partitionColumns" -> partitionColumns.json),
412- partitionColumns = Seq (" p1" ))
455+ saveToPartitionedTable(df, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .Overwrite )
413456
414457 // Written rows shouldn't contain dynamic partition column
415458 val expectedRows = for (i <- 1 to 3 ; _ <- 1 to 2 ) yield Row (i, s " val_ $i" )
@@ -429,25 +472,8 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
429472 }
430473
431474 test(" saveAsTable() - partitioned table - Append" ) {
432- testDF.saveAsTable(
433- tableName = " t" ,
434- source = classOf [SimpleFSBasedSource ].getCanonicalName,
435- mode = SaveMode .Overwrite ,
436- options = Map (
437- " path" -> basePath.toString,
438- " schema" -> dataSchema.json,
439- " partitionColumns" -> partitionColumns.json),
440- partitionColumns = Seq (" p1" ))
441-
442- testDF.saveAsTable(
443- tableName = " t" ,
444- source = classOf [SimpleFSBasedSource ].getCanonicalName,
445- mode = SaveMode .Append ,
446- options = Map (
447- " path" -> basePath.toString,
448- " schema" -> dataSchema.json,
449- " partitionColumns" -> partitionColumns.json),
450- partitionColumns = Seq (" p1" ))
475+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .Overwrite )
476+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .Append )
451477
452478 // Written rows shouldn't contain dynamic partition column
453479 val expectedRows = for (i <- 1 to 3 ; _ <- 1 to 2 ) yield Row (i, s " val_ $i" )
@@ -466,18 +492,30 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
466492 sql(" DROP TABLE t" )
467493 }
468494
495+ test(" saveAsTable() - partitioned table - Append - mismatched partition columns" ) {
496+ val data = (for {
497+ i <- 1 to 3
498+ p1 <- 1 to 2
499+ p2 <- Array (" hello" , " world" )
500+ } yield (i, s " val_ $i" , p1, p2)).toDF(" a" , " b" , " p1" , " p2" )
501+
502+ // Using only a subset of all partition columns
503+ intercept[IllegalArgumentException ] {
504+ saveToPartitionedTable(data, " t" , doublePartitionColumns, Seq (" p1" ), SaveMode .Append )
505+ }
506+
507+ // Using different order of partition columns
508+ intercept[IllegalArgumentException ] {
509+ saveToPartitionedTable(data, " t" , doublePartitionColumns, Seq (" p2" , " p1" ), SaveMode .Append )
510+ }
511+
512+ sql(" DROP TABLE t" )
513+ }
514+
469515 test(" saveAsTable() - partitioned table - ErrorIfExists" ) {
470516 fs.delete(basePath, true )
471517
472- testDF.saveAsTable(
473- tableName = " t" ,
474- source = classOf [SimpleFSBasedSource ].getCanonicalName,
475- mode = SaveMode .ErrorIfExists ,
476- options = Map (
477- " path" -> basePath.toString,
478- " schema" -> dataSchema.json,
479- " partitionColumns" -> partitionColumns.json),
480- partitionColumns = Seq (" p1" ))
518+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .ErrorIfExists )
481519
482520 // Written rows shouldn't contain dynamic partition column
483521 val expectedRows = for (i <- 1 to 3 ; _ <- 1 to 2 ) yield Row (i, s " val_ $i" )
@@ -494,33 +532,16 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
494532 }
495533
496534 intercept[AnalysisException ] {
497- testDF.saveAsTable(
498- tableName = " t" ,
499- source = classOf [SimpleFSBasedSource ].getCanonicalName,
500- mode = SaveMode .ErrorIfExists ,
501- options = Map (
502- " path" -> basePath.toString,
503- " schema" -> dataSchema.json,
504- " partitionColumns" -> partitionColumns.json),
505- partitionColumns = Seq (" p1" ))
535+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .ErrorIfExists )
506536 }
507537
508538 sql(" DROP TABLE t" )
509539 }
510540
511541 test(" saveAsTable() - partitioned table - Ignore" ) {
512- testDF.saveAsTable(
513- tableName = " t" ,
514- source = classOf [SimpleFSBasedSource ].getCanonicalName,
515- mode = SaveMode .Ignore ,
516- options = Map (
517- " path" -> basePath.toString,
518- " schema" -> dataSchema.json,
519- " partitionColumns" -> partitionColumns.json),
520- partitionColumns = Seq (" p1" ))
542+ saveToPartitionedTable(testDF, " t" , singlePartitionColumn, Seq (" p1" ), SaveMode .Ignore )
521543
522544 assert(TestResult .writtenRows.isEmpty)
523-
524545 assertResult(table(" t" ).schema) {
525546 StructType (
526547 dataSchema ++ Seq (
@@ -531,7 +552,7 @@ class FSBasedRelationSuite extends QueryTest with BeforeAndAfter {
531552 }
532553
533554 test(" saveAsTable() - data sources other than FSBasedRelation" ) {
534- val cause = intercept[RuntimeException ] {
555+ intercept[RuntimeException ] {
535556 testDF.saveAsTable(
536557 tableName = " t" ,
537558 source = classOf [FilteredScanSource ].getCanonicalName,
0 commit comments