
Commit 2ae2857

mateiz authored and marmbrus committed
[SPARK-3091] [SQL] Add support for caching metadata on Parquet files
For larger Parquet files, reading the file footers (which is done in parallel on up to 5 threads) and HDFS block locations (which is serial) can take multiple seconds. We can add an option to cache this data within FilteringParquetInputFormat. Unfortunately ParquetInputFormat only caches footers within each instance of ParquetInputFormat, not across them.

Note: this PR leaves this turned off by default for 1.1, but I believe it's safe to turn it on after. The keys in the hash maps are FileStatus objects that include a modification time, so this will work fine if files are modified. The location cache could become invalid if files have moved within HDFS, but that's rare so I just made it invalidate entries every 15 minutes.

Author: Matei Zaharia <[email protected]>

Closes apache#2005 from mateiz/parquet-cache and squashes the following commits:

dae8efe [Matei Zaharia] Bug fix
c71e9ed [Matei Zaharia] Handle empty statuses directly
22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata
8fb56ce [Matei Zaharia] Cache file block locations too
453bd21 [Matei Zaharia] Bug fix
4094df6 [Matei Zaharia] First attempt at caching Parquet footers

(cherry picked from commit 9eb74c7)
Signed-off-by: Michael Armbrust <[email protected]>
1 parent 496f62d commit 2ae2857
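For readers skimming the change, the mechanism described above comes down to two process-wide Guava caches keyed by FileStatus. The snippet below is a minimal standalone sketch of that pattern, not code from the patch itself: the object name MetadataCacheSketch and the locationsFor helper are illustrative, while the key type, sizes, and the 15-minute expiry mirror the diff further down.

import java.util.concurrent.{Callable, TimeUnit}

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem}
import parquet.hadoop.Footer

// Illustrative sketch only; the names below are not from the patch.
object MetadataCacheSketch {
  // Parquet footers are keyed by FileStatus and capped at 20,000 entries.
  val footerCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .build[FileStatus, Footer]()

  // Block locations can go stale if HDFS moves a file's blocks,
  // so these entries additionally expire 15 minutes after being written.
  val blockLocationCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .expireAfterWrite(15, TimeUnit.MINUTES)
    .build[FileStatus, Array[BlockLocation]]()

  // On a cache miss the Callable computes the locations once; later lookups
  // for the same FileStatus return the cached array until it expires.
  def locationsFor(fs: FileSystem, status: FileStatus): Array[BlockLocation] =
    blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
      def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
    })
}

Because the caches live on a companion object rather than on each input-format instance, footers read for one query can be reused by later queries in the same JVM, which is exactly the cross-instance sharing the commit message says plain ParquetInputFormat lacks.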

File tree

2 files changed (+72, −13 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"

   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
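The new key defaults to "false" for 1.1, so metadata caching has to be enabled explicitly. One way to do that is sketched below, assuming an existing SQLContext named sqlContext and an illustrative HDFS path:

// Opt in to caching of Parquet footers and HDFS block locations (off by default in 1.1).
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")

// Later Parquet scans in this JVM can then reuse the cached metadata.
val data = sqlContext.parquetFile("hdfs:///path/to/parquet-table")
data.count()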

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 71 additions & 13 deletions
@@ -17,22 +17,23 @@
 
 package org.apache.spark.sql.parquet
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.Try
-
 import java.io.IOException
 import java.lang.{Long => JLong}
 import java.text.SimpleDateFormat
-import java.util.{Date, List => JList}
+import java.util.concurrent.{Callable, TimeUnit}
+import java.util.{ArrayList, Collections, Date, List => JList}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.Try
+
+import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
 import parquet.hadoop._
 import parquet.hadoop.api.{InitContext, ReadSupport}
 import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
       ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
     }
 
+    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
+    conf.set(
+      SQLConf.PARQUET_CACHE_METADATA,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+
     sc.newAPIHadoopRDD(
       conf,
       classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 
   override def getFooters(jobContext: JobContext): JList[Footer] = {
+    import FilteringParquetRowInputFormat.footerCache
+
     if (footers eq null) {
+      val conf = ContextUtil.getConfiguration(jobContext)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
+      if (statuses.isEmpty) {
+        footers = Collections.emptyList[Footer]
+      } else if (!cacheMetadata) {
+        // Read the footers from HDFS
+        footers = getFooters(conf, statuses)
+      } else {
+        // Read only the footers that are not in the footerCache
+        val foundFooters = footerCache.getAllPresent(statuses)
+        val toFetch = new ArrayList[FileStatus]
+        for (s <- statuses) {
+          if (!foundFooters.containsKey(s)) {
+            toFetch.add(s)
+          }
+        }
+        val newFooters = new mutable.HashMap[FileStatus, Footer]
+        if (toFetch.size > 0) {
+          val fetched = getFooters(conf, toFetch)
+          for ((status, i) <- toFetch.zipWithIndex) {
+            newFooters(status) = fetched.get(i)
+          }
+          footerCache.putAll(newFooters)
+        }
+        footers = new ArrayList[Footer](statuses.size)
+        for (status <- statuses) {
+          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
+        }
+      }
     }
 
     footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
       configuration: Configuration,
       footers: JList[Footer]): JList[ParquetInputSplit] = {
 
+    import FilteringParquetRowInputFormat.blockLocationCache
+
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+
     val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
     val minSplitSize: JLong =
       Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
     for (footer <- footers) {
       val fs = footer.getFile.getFileSystem(configuration)
       val file = footer.getFile
-      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
+      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
       val parquetMetaData = footer.getParquetMetadata
       val blocks = parquetMetaData.getBlocks
-      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
+      var blockLocations: Array[BlockLocation] = null
+      if (!cacheMetadata) {
+        blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
+      } else {
+        blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
+          def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
+        })
+      }
       splits.addAll(
         generateSplits.invoke(
           null,
           blocks,
-          fileBlockLocations,
-          fileStatus,
+          blockLocations,
+          status,
           parquetMetaData.getFileMetaData,
           readContext.getRequestedSchema.toString,
           readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@ private[parquet] class FilteringParquetRowInputFormat
   }
 }
 
+private[parquet] object FilteringParquetRowInputFormat {
+  private val footerCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .build[FileStatus, Footer]()
+
+  private val blockLocationCache = CacheBuilder.newBuilder()
+    .maximumSize(20000)
+    .expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
+    .build[FileStatus, Array[BlockLocation]]()
+}
+
 private[parquet] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
