Skip to content

Commit 8fb56ce

Browse files
committed
Cache file block locations too
1 parent 453bd21 commit 8fb56ce

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import scala.collection.mutable
2727
import scala.util.Try
2828

2929
import org.apache.hadoop.conf.Configuration
30-
import org.apache.hadoop.fs.{FileStatus, Path}
30+
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
3131
import org.apache.hadoop.mapreduce._
3232
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
3333
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
@@ -367,6 +367,8 @@ private[parquet] class FilteringParquetRowInputFormat
367367
configuration: Configuration,
368368
footers: JList[Footer]): JList[ParquetInputSplit] = {
369369

370+
import FilteringParquetRowInputFormat.blockLocationCache
371+
370372
val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
371373
val minSplitSize: JLong =
372374
Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -397,7 +399,18 @@ private[parquet] class FilteringParquetRowInputFormat
397399
val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
398400
val parquetMetaData = footer.getParquetMetadata
399401
val blocks = parquetMetaData.getBlocks
400-
val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
402+
var fileBlockLocations: Array[BlockLocation] = null
403+
blockLocationCache.synchronized {
404+
if (blockLocationCache.contains(fileStatus)) {
405+
fileBlockLocations = blockLocationCache(fileStatus)
406+
}
407+
}
408+
if (fileBlockLocations == null) {
409+
fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
410+
blockLocationCache.synchronized {
411+
blockLocationCache(fileStatus) = fileBlockLocations
412+
}
413+
}
401414
splits.addAll(
402415
generateSplits.invoke(
403416
null,
@@ -416,8 +429,9 @@ private[parquet] class FilteringParquetRowInputFormat
416429
}
417430

418431
private[parquet] object FilteringParquetRowInputFormat {
419-
// TODO: make this an LRU map with a bounded size
432+
// TODO: make these LRU maps with a bounded size
420433
private val footerCache = new mutable.HashMap[FileStatus, Footer]
434+
private val blockLocationCache = new mutable.HashMap[FileStatus, Array[BlockLocation]]
421435
}
422436

423437
private[parquet] object FileSystemHelper {

0 commit comments

Comments
 (0)