@@ -20,12 +20,14 @@ package org.apache.spark.sql.parquet
2020import java .io .IOException
2121import java .lang .{Long => JLong }
2222import java .text .SimpleDateFormat
23+ import java .util .concurrent .{Callable , TimeUnit }
2324import java .util .{ArrayList , Date , List => JList }
2425
2526import scala .collection .JavaConversions ._
2627import scala .collection .mutable
2728import scala .util .Try
2829
30+ import com .google .common .cache .CacheBuilder
2931import org .apache .hadoop .conf .Configuration
3032import org .apache .hadoop .fs .{BlockLocation , FileStatus , Path }
3133import org .apache .hadoop .mapreduce ._
@@ -40,6 +42,7 @@ import parquet.io.ParquetDecodingException
4042import parquet .schema .MessageType
4143
4244import org .apache .spark .rdd .RDD
45+ import org .apache .spark .sql .SQLConf
4346import org .apache .spark .sql .catalyst .expressions .{Attribute , Expression , Row }
4447import org .apache .spark .sql .execution .{LeafNode , SparkPlan , UnaryNode }
4548import org .apache .spark .{Logging , SerializableWritable , TaskContext }
@@ -94,6 +97,11 @@ case class ParquetTableScan(
9497 ParquetFilters .serializeFilterExpressions(columnPruningPred, conf)
9598 }
9699
100+ // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
101+ conf.set(
102+ SQLConf .PARQUET_CACHE_METADATA ,
103+ sqlContext.getConf(SQLConf .PARQUET_CACHE_METADATA , " false" ))
104+
97105 sc.newAPIHadoopRDD(
98106 conf,
99107 classOf [FilteringParquetRowInputFormat ],
@@ -324,35 +332,33 @@ private[parquet] class FilteringParquetRowInputFormat
324332 import FilteringParquetRowInputFormat .footerCache
325333
326334 if (footers eq null ) {
335+ val conf = ContextUtil .getConfiguration(jobContext)
336+ val cacheMetadata = conf.getBoolean(SQLConf .PARQUET_CACHE_METADATA , false )
327337 val statuses = listStatus(jobContext)
328338 fileStatuses = statuses.map(file => file.getPath -> file).toMap
329- footers = new ArrayList [Footer ](statuses.size)
330- val toFetch = new ArrayList [FileStatus ]
331- footerCache.synchronized {
332- for (status <- statuses) {
333- if (footerCache.contains(status)) {
334- footers.add(footerCache(status))
335- } else {
336- footers.add(null )
337- toFetch.add(status)
339+ if (! cacheMetadata) {
340+ // Read the footers from HDFS
341+ footers = getFooters(conf, statuses)
342+ } else {
343+ // Read only the footers that are not in the footerCache
344+ val foundFooters = footerCache.getAllPresent(statuses)
345+ val toFetch = new ArrayList [FileStatus ]
346+ for (s <- statuses) {
347+ if (! foundFooters.containsKey(s)) {
348+ toFetch.add(s)
338349 }
339350 }
340- }
341- if (toFetch.size > 0 ) {
342- val fetched = getFooters(ContextUtil .getConfiguration(jobContext) , toFetch)
343- footerCache. synchronized {
344- for ((status, i) <- toFetch.zipWithIndex) {
345- footerCache(status) = fetched.get(i )
351+ val newFooters = new mutable. HashMap [ FileStatus , Footer ]
352+ if (toFetch.size > 0 ) {
353+ val fetched = getFooters(conf , toFetch)
354+ for ((footer, i) <- fetched.zipWithIndex) {
355+ newFooters(statuses.get(i)) = footer
356+ footerCache.putAll(newFooters )
346357 }
347358 }
348- var i = 0
349- var j = 0
350- while (i < toFetch.size) {
351- while (statuses.get(j) ne toFetch.get(i)) {
352- j += 1
353- }
354- footers(j) = fetched(i)
355- i += 1
359+ footers = new ArrayList [Footer ](statuses.size)
360+ for (status <- statuses) {
361+ footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
356362 }
357363 }
358364 }
@@ -369,6 +375,8 @@ private[parquet] class FilteringParquetRowInputFormat
369375
370376 import FilteringParquetRowInputFormat .blockLocationCache
371377
378+ val cacheMetadata = configuration.getBoolean(SQLConf .PARQUET_CACHE_METADATA , false )
379+
372380 val maxSplitSize : JLong = configuration.getLong(" mapred.max.split.size" , Long .MaxValue )
373381 val minSplitSize : JLong =
374382 Math .max(getFormatMinSplitSize(), configuration.getLong(" mapred.min.split.size" , 0L ))
@@ -396,27 +404,23 @@ private[parquet] class FilteringParquetRowInputFormat
396404 for (footer <- footers) {
397405 val fs = footer.getFile.getFileSystem(configuration)
398406 val file = footer.getFile
399- val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
407+ val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
400408 val parquetMetaData = footer.getParquetMetadata
401409 val blocks = parquetMetaData.getBlocks
402- var fileBlockLocations : Array [BlockLocation ] = null
403- blockLocationCache.synchronized {
404- if (blockLocationCache.contains(fileStatus)) {
405- fileBlockLocations = blockLocationCache(fileStatus)
406- }
407- }
408- if (fileBlockLocations == null ) {
409- fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0 , fileStatus.getLen)
410- blockLocationCache.synchronized {
411- blockLocationCache(fileStatus) = fileBlockLocations
412- }
410+ var blockLocations : Array [BlockLocation ] = null
411+ if (! cacheMetadata) {
412+ blockLocations = fs.getFileBlockLocations(status, 0 , status.getLen)
413+ } else {
414+ blockLocations = blockLocationCache.get(status, new Callable [Array [BlockLocation ]] {
415+ def call (): Array [BlockLocation ] = fs.getFileBlockLocations(status, 0 , status.getLen)
416+ })
413417 }
414418 splits.addAll(
415419 generateSplits.invoke(
416420 null ,
417421 blocks,
418- fileBlockLocations ,
419- fileStatus ,
422+ blockLocations ,
423+ status ,
420424 parquetMetaData.getFileMetaData,
421425 readContext.getRequestedSchema.toString,
422426 readContext.getReadSupportMetadata,
@@ -429,9 +433,14 @@ private[parquet] class FilteringParquetRowInputFormat
429433}
430434
/**
 * Caches shared by all [[FilteringParquetRowInputFormat]] instances in this JVM.
 *
 * Both caches are bounded Guava caches keyed by Hadoop `FileStatus`, replacing the
 * earlier unbounded mutable hash maps.
 *
 * NOTE(review): entries are looked up by `FileStatus`; confirm that its equality
 * semantics distinguish a rewritten file from the original, otherwise a stale footer
 * could be served for a file that was replaced in place.
 */
private[parquet] object FilteringParquetRowInputFormat {
  // Upper bound on the number of entries each cache keeps before evicting.
  private final val MaxCacheEntries = 20000

  // Lifetime of a cached block-location entry, in minutes.
  private final val BlockLocationExpiryMinutes = 15

  // Parquet footers per file; bounded by size only, with no time-based expiry.
  private val footerCache = CacheBuilder.newBuilder()
    .maximumSize(MaxCacheEntries)
    .build[FileStatus, Footer]()

  // Block locations per file; expire after write since HDFS nodes might fail.
  private val blockLocationCache = CacheBuilder.newBuilder()
    .maximumSize(MaxCacheEntries)
    .expireAfterWrite(BlockLocationExpiryMinutes, TimeUnit.MINUTES)
    .build[FileStatus, Array[BlockLocation]]()
}
436445
437446private [parquet] object FileSystemHelper {
0 commit comments