diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index dc5c2ff927e4..fe418e610da8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -315,7 +315,14 @@ object InMemoryFileIndex extends Logging {
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
         try {
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
+            // Store BlockLocation objects to consume less memory
+            if (loc.getClass == classOf[BlockLocation]) {
+              loc
+            } else {
+              new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
+            }
+          }
           val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
             f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
           if (f.isSymlink) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 18bb4bfe661c..49e7af4a9896 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
@@ -248,6 +248,26 @@ class FileIndexSuite extends SharedSQLContext {
       assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape))
     }
   }
+
+  test("SPARK-25062 - InMemoryFileIndex stores BlockLocation objects no matter what subclass " +
+    "the FS returns") {
+    withSQLConf("fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) {
+      withTempDir { dir =>
+        val file = new File(dir, "text.txt")
+        stringToFile(file, "text")
+
+        val inMemoryFileIndex = new InMemoryFileIndex(
+          spark, Seq(new Path(file.getCanonicalPath)), Map.empty, None) {
+          def leafFileStatuses = leafFiles.values
+        }
+
+        val blockLocations = inMemoryFileIndex.leafFileStatuses.flatMap(
+          _.asInstanceOf[LocatedFileStatus].getBlockLocations)
+
+        assert(blockLocations.forall(_.getClass == classOf[BlockLocation]))
+      }
+    }
+  }
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {
@@ -257,3 +277,20 @@ class FakeParentPathFileSystem extends RawLocalFileSystem {
     URI.create("mockFs://some-bucket")
   }
 }
+
+class SpecialBlockLocationFileSystem extends RawLocalFileSystem {
+
+  class SpecialBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+    extends BlockLocation(names, hosts, offset, length)
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    Array(new SpecialBlockLocation(Array("dummy"), Array("dummy"), 0L, file.getLen))
+  }
+}
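
A minimal standalone sketch of the normalization the patch applies, not part of the diff itself: the object and method names below (BlockLocationNormalization, normalize) are illustrative, and the HdfsBlockLocation example in the comment is an assumption about one subclass a FileSystem may return. The idea, taken from the patch, is that any BlockLocation subclass is copied into a plain BlockLocation so the cached listing keeps only names, hosts, offset and length.

import org.apache.hadoop.fs.BlockLocation

object BlockLocationNormalization {
  // Pass plain BlockLocation instances through unchanged; copy subclass instances
  // (e.g. HdfsBlockLocation, which can hold extra references) into a plain
  // BlockLocation that retains only the four fields the file index needs.
  def normalize(loc: BlockLocation): BlockLocation =
    if (loc.getClass == classOf[BlockLocation]) {
      loc
    } else {
      new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
    }
}

In the patch this logic is inlined as a .map over the result of fs.getFileBlockLocations before the locations are stored in the LocatedFileStatus, which is what keeps the in-memory file index small regardless of which FileSystem implementation produced the locations.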