@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
2929import org .apache .spark .sql .hive .test .TestHive ._
3030import org .apache .spark .sql .hive .test .TestHive .implicits ._
3131import org .apache .spark .sql .sources .{InsertIntoDataSource , LogicalRelation }
32+ import org .apache .spark .sql .SaveMode
3233
3334// The data where the partitioning key exists only in the directory structure.
3435case class ParquetData (intField : Int , stringField : String )
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
409410 )
410411 """ )
411412 }
413+
414+ test(" SPARK-6016 make sure to use the latest footers are used" ) {
415+ sql(" drop table if exists spark_6016_fix" )
416+
417+ // Create a DataFrame with two partitions. So, the created table will have two parquet files.
418+ val df1 = jsonRDD(sparkContext.parallelize((1 to 10 ).map(i => s """ {"a": $i} """ ), 2 ))
419+ df1.saveAsTable(" spark_6016_fix" , " parquet" , SaveMode .Overwrite )
420+ checkAnswer(
421+ sql(" select * from spark_6016_fix" ),
422+ (1 to 10 ).map(i => Row (i))
423+ )
424+
425+ // Create a DataFrame with four partitions. So, the created table will have four parquet files.
426+ val df2 = jsonRDD(sparkContext.parallelize((1 to 10 ).map(i => s """ {"b": $i} """ ), 4 ))
427+ df2.saveAsTable(" spark_6016_fix" , " parquet" , SaveMode .Overwrite )
428+ // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
429+ // since the new table has four parquet files, we are trying to read new footers from two files
430+ // and then merge metadata in footers of these four (two outdated ones and two latest one),
431+ // which will cause an error.
432+ checkAnswer(
433+ sql(" select * from spark_6016_fix" ),
434+ (1 to 10 ).map(i => Row (i))
435+ )
436+
437+ sql(" drop table spark_6016_fix" )
438+ }
412439}
413440
414441class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
0 commit comments