@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
import scala.util.Try

import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
@@ -175,8 +174,8 @@ private[sql] class ParquetRelation2(
override def dataSchema: StructType = metadataCache.dataSchema

override private[sql] def refresh(): Unit = {
metadataCache.refresh()
super.refresh()
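// Refresh the base relation first: `metadataCache.refresh()` below reads the
// leaf file statuses via `cachedLeafStatuses()`.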
metadataCache.refresh()
}

// Parquet data source always uses Catalyst internal representations.
@@ -234,15 +233,15 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String]): RDD[Row] = {
inputFiles: Array[FileStatus]): RDD[Row] = {

val job = new Job(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)

ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

if (inputPaths.nonEmpty) {
FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
if (inputFiles.nonEmpty) {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

// Try to push down filters when filter push-down is enabled.
@@ -269,10 +268,7 @@
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)

val inputFileStatuses =
metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))

val footers = inputFileStatuses.map(metadataCache.footers)
val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +283,7 @@

val cacheMetadata = useMetadataCache

@transient val cachedStatuses = inputFileStatuses.map { f =>
@transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)
@@ -333,7 +329,7 @@ private[sql] class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
var footers: Map[FileStatus, Footer] = _
var footers: Map[Path, Footer] = _
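// (Now keyed by Path rather than FileStatus, so `buildScan` can look footers up
// directly via `FileStatus.getPath`.)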

// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -349,35 +345,30 @@ private[sql] class ParquetRelation2(
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
// Support either reading a collection of raw Parquet part-files, or a collection of folders
// containing Parquet files (e.g. partitioned Parquet table).
val baseStatuses = paths.distinct.flatMap { p =>
val path = new Path(p)
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption
}
assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))

// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
val leaves = baseStatuses.flatMap { f =>
val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}
}
val leaves = cachedLeafStatuses().filter { f =>
isSummaryFile(f.getPath) ||
!(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
}.toArray

dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
val parquetMetadata = ParquetFileReader.readFooter(
SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
footers = {
val conf = SparkHadoopUtil.get.conf
val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
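// When schemas are merged, every part-file footer is needed; otherwise Parquet
// may serve footers from the _metadata / _common_metadata summary files.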
val rawFooters = if (shouldMergeSchemas) {
ParquetFileReader.readAllFootersInParallel(
conf, seqAsJavaList(leaves), taskSideMetaData)
} else {
ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
conf, seqAsJavaList(leaves), taskSideMetaData)
}

rawFooters.map(footer => footer.getFile -> footer).toMap
}

dataSchema = {
val dataSchema0 =
@@ -444,7 +435,7 @@ private[sql] class ParquetRelation2(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")

ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
}
}
}
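For reference, a minimal sketch of how an implementation consumes the new `inputFiles: Array[FileStatus]` argument introduced above. This is illustrative only, not part of the patch; `InputPathsSketch` and `configureInputPaths` are hypothetical names.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object InputPathsSketch {
  // Feed the already-resolved FileStatuses straight into the Hadoop job,
  // instead of turning path strings back into Paths first.
  def configureInputPaths(job: Job, inputFiles: Array[FileStatus]): Unit = {
    if (inputFiles.nonEmpty) {
      FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
    }
  }
}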
@@ -17,20 +17,16 @@

package org.apache.spark.sql.sources

import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{UnionRDD, RDD}
import org.apache.spark.sql.Row
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}

/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, _) => t.buildScan(a)) :: Nil

// Scanning partitioned FSBasedRelation
// Scanning partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.partitionSpec.partitionColumns,
selectedPartitions) :: Nil

// Scanning non-partitioned FSBasedRelation
// Scanning non-partitioned HadoopFsRelation
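// The leaf-file listing that used to happen here has been removed; `t.paths` is
// now passed to the relation as-is.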
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
val name = path.getName
name.startsWith("_") || name.startsWith(".")
}.map(fs.makeQualified(_).toString)
}

pruneFilterProject(
l,
projectList,
filters,
(a, f) => t.buildScan(a, f, inputPaths)) :: Nil
(a, f) => t.buildScan(a, f, t.paths)) :: Nil

case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
// Paths to all data files within this partition
val dataFilePaths = {
val dirPath = new Path(dir)
val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
val name = path.getName
name.startsWith("_") || name.startsWith(".")
}.map(fs.makeQualified(_).toString)
}

// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
// Notice that the schema of data files, represented by `relation.dataSchema`, may contain
// some partition column(s).
@@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
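// e.g. for a partition directory like .../table/year=2015/month=05, only `dir`
// reaches `buildScan`; the corresponding `partitionValues` are merged back in by
// `mergeWithPartitionValues` below.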

// Merges data values with partition values.
mergeWithPartitionValues(