sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (1 change: 1 addition & 0 deletions)
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
  val CODEGEN_ENABLED = "spark.sql.codegen"
  val DIALECT = "spark.sql.dialect"
  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"

  object Deprecated {
    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
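Note (not part of the diff): a minimal, hedged sketch of how this new key would be flipped on from user code, using the getConf/setConf accessors that the rest of this patch relies on; caching defaults to off.

// Illustrative only: turn Parquet metadata caching on for one SQLContext.
// `sqlContext` is assumed to be an existing org.apache.spark.sql.SQLContext.
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")
// Read it back; the second argument is the default used when the key is unset.
val enabled = sqlContext.getConf("spark.sql.parquet.cacheMetadata", "false").toBoolean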
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,22 +17,23 @@

package org.apache.spark.sql.parquet

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
import java.util.{Date, List => JList}
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
      ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
    }

    // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
    conf.set(
      SQLConf.PARQUET_CACHE_METADATA,
      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))

    sc.newAPIHadoopRDD(
      conf,
      classOf[FilteringParquetRowInputFormat],
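Reviewer note, not part of the patch: Hadoop's Configuration stores only string values, which is why the SQL setting is copied in with conf.set here and parsed back out with conf.getBoolean inside FilteringParquetRowInputFormat. A minimal round trip on a bare Configuration:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
conf.set("spark.sql.parquet.cacheMetadata", "true")                    // stored as a string
val cache = conf.getBoolean("spark.sql.parquet.cacheMetadata", false)  // parses back to true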
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
  }

  override def getFooters(jobContext: JobContext): JList[Footer] = {
    import FilteringParquetRowInputFormat.footerCache

    if (footers eq null) {
      val conf = ContextUtil.getConfiguration(jobContext)
      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
      val statuses = listStatus(jobContext)
      fileStatuses = statuses.map(file => file.getPath -> file).toMap
      footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
      if (statuses.isEmpty) {
        footers = Collections.emptyList[Footer]
      } else if (!cacheMetadata) {
        // Read the footers from HDFS
        footers = getFooters(conf, statuses)
      } else {
        // Read only the footers that are not in the footerCache
        val foundFooters = footerCache.getAllPresent(statuses)
        val toFetch = new ArrayList[FileStatus]
        for (s <- statuses) {
          if (!foundFooters.containsKey(s)) {
            toFetch.add(s)
          }
        }
        val newFooters = new mutable.HashMap[FileStatus, Footer]
        if (toFetch.size > 0) {
          val fetched = getFooters(conf, toFetch)
          for ((status, i) <- toFetch.zipWithIndex) {
            newFooters(status) = fetched.get(i)
          }
          footerCache.putAll(newFooters)
        }
        footers = new ArrayList[Footer](statuses.size)
        for (status <- statuses) {
          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
        }
      }
    }

    footers
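Reviewer note, not part of the patch: the caching branch above is Guava's bulk-lookup idiom, ask the cache for what it already holds with getAllPresent, read only the misses, then write them back. A self-contained sketch of the same idiom with hypothetical names (a String-keyed cache and a loadFooter stand-in for the real HDFS footer read; the patch batches the write-back with putAll, this sketch writes entries one by one with put):

import java.util.ArrayList
import com.google.common.cache.CacheBuilder

object FooterCacheSketch {
  // Stand-in cache; the real footerCache maps FileStatus -> Footer.
  private val cache = CacheBuilder.newBuilder().maximumSize(100).build[String, String]()

  // Hypothetical expensive loader standing in for reading a footer from HDFS.
  private def loadFooter(path: String): String = s"footer-of-$path"

  def lookupAll(paths: java.util.List[String]): java.util.List[String] = {
    val found = cache.getAllPresent(paths)        // entries that are already cached
    val results = new ArrayList[String](paths.size)
    val it = paths.iterator()
    while (it.hasNext) {
      val p = it.next()
      if (found.containsKey(p)) {
        results.add(found.get(p))                 // cache hit: no expensive read
      } else {
        val footer = loadFooter(p)                // cache miss: do the expensive read
        cache.put(p, footer)                      // remember it for the next query
        results.add(footer)
      }
    }
    results
  }
}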
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
      configuration: Configuration,
      footers: JList[Footer]): JList[ParquetInputSplit] = {

    import FilteringParquetRowInputFormat.blockLocationCache

    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)

    val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
    val minSplitSize: JLong =
      Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@ private[parquet] class FilteringParquetRowInputFormat
    for (footer <- footers) {
      val fs = footer.getFile.getFileSystem(configuration)
      val file = footer.getFile
      val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
      val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
      val parquetMetaData = footer.getParquetMetadata
      val blocks = parquetMetaData.getBlocks
      val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
      var blockLocations: Array[BlockLocation] = null
      if (!cacheMetadata) {
        blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
      } else {
        blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
          def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
        })
      }
      splits.addAll(
        generateSplits.invoke(
          null,
          blocks,
          fileBlockLocations,
          fileStatus,
          blockLocations,
          status,
          parquetMetaData.getFileMetaData,
          readContext.getRequestedSchema.toString,
          readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@ private[parquet] class FilteringParquetRowInputFormat
  }
}

private[parquet] object FilteringParquetRowInputFormat {
  private val footerCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .build[FileStatus, Footer]()

  private val blockLocationCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
    .build[FileStatus, Array[BlockLocation]]()
}
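Reviewer note, not part of the patch: the object above leans on two Guava idioms, a size-bounded cache with time-based expiry, and Cache.get(key, valueLoader) as compute-if-absent, where the Callable runs only on a miss and its result is cached. A hedged sketch with illustrative names (the real blockLocationCache holds Array[BlockLocation] keyed by FileStatus):

import java.util.concurrent.{Callable, TimeUnit}
import com.google.common.cache.CacheBuilder

object BlockLocationCacheSketch {
  // Bounded cache whose entries expire 15 minutes after being written, mirroring
  // how blockLocationCache guards against block locations going stale as HDFS moves data.
  private val cache = CacheBuilder.newBuilder()
    .maximumSize(1000)
    .expireAfterWrite(15, TimeUnit.MINUTES)
    .build[String, Array[String]]()

  def locations(path: String): Array[String] = {
    // Returns the cached value if present; otherwise runs the Callable once,
    // caches its result, and returns it. Stands in for fs.getFileBlockLocations.
    cache.get(path, new Callable[Array[String]] {
      def call(): Array[String] = Array(s"host-for-$path")  // hypothetical lookup
    })
  }
}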

private[parquet] object FileSystemHelper {
  def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
    val origPath = new Path(pathStr)