@@ -19,10 +19,12 @@ package org.apache.spark
 
 import java.io._
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
@@ -299,6 +301,25 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-22357 test binaryFiles minPartitions") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.files.openCostInBytes", "0")
+      .set("spark.default.parallelism", "1"))
+
+    val tempDir = Utils.createTempDir()
+    val tempDirPath = tempDir.getAbsolutePath
+
+    for (i <- 0 until 8) {
+      val tempFile = new File(tempDir, s"part-0000$i")
+      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+        StandardCharsets.UTF_8)
+    }
+
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 1).getNumPartitions === 1)
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 2).getNumPartitions === 2)
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 8).getNumPartitions === 8)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
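Note (not part of the patch): the new test pins down that sc.binaryFiles honors its minPartitions argument once spark.files.openCostInBytes is set to 0, so the partition count is driven by the requested minimum rather than by the per-file open cost. A minimal standalone sketch of the same API call; the path, app name, and partition count below are illustrative, not taken from the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: names and paths here are hypothetical.
    val conf = new SparkConf()
      .setAppName("binaryFiles-demo")             // hypothetical app name
      .setMaster("local")
      .set("spark.files.openCostInBytes", "0")    // let minPartitions dominate the split decision
    val sc = new SparkContext(conf)
    try {
      // binaryFiles returns (path, PortableDataStream) pairs for each file
      // under the directory; here we ask for at least 4 partitions.
      val rdd = sc.binaryFiles("/tmp/binary-files-demo", minPartitions = 4)
      println(s"partitions = ${rdd.getNumPartitions}")
    } finally {
      sc.stop()
    }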