Skip to content

Commit 4094df6

Browse files
committed
First attempt at caching Parquet footers
1 parent 2fc8aca commit 4094df6

File tree

1 file changed

+41
-8
lines changed

1 file changed

+41
-8
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,21 @@
1717

1818
package org.apache.spark.sql.parquet
1919

20-
import scala.collection.JavaConversions._
21-
import scala.collection.mutable
22-
import scala.util.Try
23-
2420
import java.io.IOException
2521
import java.lang.{Long => JLong}
2622
import java.text.SimpleDateFormat
27-
import java.util.{Date, List => JList}
23+
import java.util.{ArrayList, Date, List => JList}
24+
25+
import scala.collection.JavaConversions._
26+
import scala.collection.mutable
27+
import scala.util.Try
2828

2929
import org.apache.hadoop.conf.Configuration
3030
import org.apache.hadoop.fs.{FileStatus, Path}
3131
import org.apache.hadoop.mapreduce._
3232
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
3333
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
3434
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
35-
3635
import parquet.hadoop._
3736
import parquet.hadoop.api.{InitContext, ReadSupport}
3837
import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +40,6 @@ import parquet.io.ParquetDecodingException
4140
import parquet.schema.MessageType
4241

4342
import org.apache.spark.rdd.RDD
44-
import org.apache.spark.sql.SQLContext
4543
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
4644
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
4745
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -323,10 +321,40 @@ private[parquet] class FilteringParquetRowInputFormat
323321
}
324322

325323
override def getFooters(jobContext: JobContext): JList[Footer] = {
  import FilteringParquetRowInputFormat.footerCache

  if (footers eq null) {
    val statuses = listStatus(jobContext)
    fileStatuses = statuses.map(file => file.getPath -> file).toMap
    footers = new ArrayList[Footer](statuses.size)
    val toFetch = new ArrayList[FileStatus]

    // First pass: take cached footers; remember cache misses so we can
    // fetch them in a single batch below. `null` is a placeholder that is
    // filled in after fetching, preserving the ordering of `statuses`.
    // BUG FIX: the condition was inverted — the original read the cache
    // when the key was ABSENT (NoSuchElementException on a cold cache)
    // and scheduled a fetch when the footer was already cached.
    footerCache.synchronized {
      for (status <- statuses) {
        if (footerCache.contains(status)) {
          footers.add(footerCache(status))
        } else {
          footers.add(null)
          toFetch.add(status)
        }
      }
    }

    if (toFetch.size > 0) {
      // Fetch missing footers outside the lock (may touch the filesystem),
      // then publish them to the shared cache.
      val fetched = getFooters(ContextUtil.getConfiguration(jobContext), toFetch)
      footerCache.synchronized {
        for ((status, i) <- toFetch.zipWithIndex) {
          footerCache(status) = fetched.get(i)
        }
      }
      // Replace the null placeholders. Both `statuses` and `toFetch`
      // preserve listing order, so a single forward scan suffices.
      var i = 0
      var j = 0
      while (i < toFetch.size) {
        while (statuses(j) ne toFetch.get(i)) {
          j += 1
        }
        footers(j) = fetched(i)
        i += 1
      }
    }
  }

  footers
}
@@ -387,6 +415,11 @@ private[parquet] class FilteringParquetRowInputFormat
387415
}
388416
}
389417

418+
private[parquet] object FilteringParquetRowInputFormat {
  // Shared, process-wide footer cache; callers must synchronize on it.
  // TODO: make this an LRU map with a bounded size
  private val footerCache = mutable.HashMap.empty[FileStatus, Footer]
}
422+
390423
private[parquet] object FileSystemHelper {
391424
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
392425
val origPath = new Path(pathStr)

0 commit comments

Comments
 (0)