
Commit 7edbd64

pwoody authored and robert3005 committed
SPARK-17059: Allow FileFormat to specify partition pruning strategy (apache#62)
1 parent aa4e13d commit 7edbd64

File tree: 4 files changed, +214 −73 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 12 additions & 7 deletions
@@ -438,14 +438,16 @@ case class FileSourceScanExec(
     val partitionFiles = selectedPartitions.flatMap { partition =>
       partition.files.map((_, partition.values))
     }
+    val format = fsRelation.fileFormat
+    val splitter =
+      format.buildSplitter(session, fsRelation.location,
+        dataFilters, schema, session.sessionState.newHadoopConf())
+
     val bucketed = partitionFiles.flatMap { case (file, values) =>
       val blockLocations = getBlockLocations(file)
       val filePath = file.getPath.toUri.toString
-      val format = fsRelation.fileFormat
-
       if (format.isSplitable(session, fsRelation.options, file.getPath)) {
-        val validSplits = format.getSplits(session, fsRelation.location, file,
-          dataFilters, schema, session.sessionState.newHadoopConf())
+        val validSplits = splitter(file)
         validSplits.map { split =>
           val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
           PartitionedFile(values, filePath, split.getStart, split.getLength, hosts)
@@ -492,15 +494,18 @@ case class FileSourceScanExec(
     val partitionFiles = selectedPartitions.flatMap { partition =>
       partition.files.map((_, partition.values))
     }
+    val format = fsRelation.fileFormat
+    val splitter =
+      format.buildSplitter(session, fsRelation.location,
+        dataFilters, schema, session.sessionState.newHadoopConf())
+
     val splitFiles = partitionFiles.flatMap { case (file, values) =>
       val blockLocations = getBlockLocations(file)
       val filePath = file.getPath.toUri.toString
-      val format = fsRelation.fileFormat

       // If the format is splittable, attempt to split and filter the file.
       if (format.isSplitable(session, fsRelation.options, file.getPath)) {
-        val validSplits = format.getSplits(session, fsRelation.location, file,
-          dataFilters, schema, session.sessionState.newHadoopConf())
+        val validSplits = splitter(file)
         validSplits.flatMap { split =>
           val splitOffset = split.getStart
           val end = splitOffset + split.getLength

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 4 additions & 5 deletions
@@ -90,16 +90,15 @@ trait FileFormat {
   }

   /**
-   * For a file, return valid splits that may pass the given data filter.
+   * Allow a splittable FileFormat to produce a function to split individual files.
    */
-  def getSplits(
+  def buildSplitter(
       sparkSession: SparkSession,
       fileIndex: FileIndex,
-      fileStatus: FileStatus,
       filters: Seq[Filter],
       schema: StructType,
-      hadoopConf: Configuration): Seq[FileSplit] = {
-    Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
+      hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
+    stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
   }

   /**
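
For context, the trait's default no longer computes splits eagerly for a given FileStatus; it returns a function, and the default behavior remains a single split covering the whole file. A small hedged sketch of that contract (local FileStatus construction only, no Spark session involved; the path is made up):

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input.FileSplit

object DefaultSplitterContractSketch {
  // Mirrors the trait's default: one whole-file split per FileStatus.
  val defaultSplitter: FileStatus => Seq[FileSplit] =
    stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty[String]))

  def main(args: Array[String]): Unit = {
    val stat = new FileStatus(1024L, false, 1, 128L, 0L, new Path("/tmp/example.parquet"))
    defaultSplitter(stat).foreach { split =>
      // Expect one split: start=0, length=1024
      println(s"${split.getPath} start=${split.getStart} length=${split.getLength}")
    }
  }
}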

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 50 additions & 61 deletions
@@ -19,19 +19,21 @@ package org.apache.spark.sql.execution.datasources.parquet

 import java.io.FileNotFoundException
 import java.net.URI
+import java.util.concurrent.{Callable, TimeUnit}
 import java.util.logging.{Logger => JLogger}

 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}

+import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.{Log => ApacheParquetLog}
-import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
+import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop._
@@ -278,75 +280,47 @@ class ParquetFileFormat
     true
   }

-  override def getSplits(
-      sparkSession: SparkSession,
-      fileIndex: FileIndex,
-      fileStatus: FileStatus,
-      filters: Seq[Filter],
-      schema: StructType,
-      hadoopConf: Configuration): Seq[FileSplit] = {
-    if (filters.isEmpty || !sparkSession.sessionState.conf.parquetPartitionPruningEnabled) {
+  override def buildSplitter(
+      sparkSession: SparkSession,
+      fileIndex: FileIndex,
+      filters: Seq[Filter],
+      schema: StructType,
+      hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
+    val pruningEnabled = sparkSession.sessionState.conf.parquetPartitionPruningEnabled
+    val defaultSplitter = super.buildSplitter(sparkSession, fileIndex, filters, schema, hadoopConf)
+    if (!pruningEnabled || filters.isEmpty) {
       // Return immediately to save FileSystem overhead
-      super.getSplits(sparkSession, fileIndex, fileStatus, filters, schema, hadoopConf)
+      defaultSplitter
     } else {
-      val filePath = fileStatus.getPath
-      val rootOption: Option[Path] = fileIndex.rootPaths
-        .find(root => filePath.toString.startsWith(root.toString))
-      val metadataOption = rootOption.flatMap { root =>
-        cachedMetadata.get(root).orElse(getMetadataForPath(filePath, root, hadoopConf))
-          .map { metadata =>
-            cachedMetadata.put(root, metadata)
-            metadata
-          }
+      val splitters = fileIndex.rootPaths.map { root =>
+        val splits = ParquetFileFormat.fileSplits.get(root,
+          new Callable[ParquetFileSplitter] {
+            override def call(): ParquetFileSplitter =
+              createParquetFileSplits(root, hadoopConf, schema)
+          })
+        root -> splits.buildSplitter(filters)
+      }.toMap
+      val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
+        val filePath = stat.getPath
+        val rootOption: Option[Path] = fileIndex.rootPaths
+          .find(root => filePath.toString.startsWith(root.toString))
+        val splitterForPath = rootOption.flatMap(splitters.get).getOrElse(defaultSplitter)
+        splitterForPath(stat)
       }
-      // If the metadata exists, filter the splits.
-      // Otherwise, fall back to the default implementation.
-      metadataOption
-        .map(filterToSplits(fileStatus, _, rootOption.get, filters, schema, hadoopConf))
-        .getOrElse(super.getSplits(sparkSession, fileIndex, fileStatus,
-          filters, schema, hadoopConf))
+      compositeSplitter
     }
   }

-  private def filterToSplits(
-      fileStatus: FileStatus,
-      metadata: ParquetMetadata,
-      metadataRoot: Path,
-      filters: Seq[Filter],
-      schema: StructType,
-      hadoopConf: Configuration): Seq[FileSplit] = {
-    val metadataBlocks = metadata.getBlocks
-
-    // Ensure that the metadata has an entry for the file.
-    // If it does not, do not filter at this stage.
-    val metadataContainsPath = metadataBlocks.asScala.exists { bmd =>
-      new Path(metadataRoot, bmd.getPath) == fileStatus.getPath
-    }
-    if (!metadataContainsPath) {
-      log.warn(s"Found _metadata file for $metadataRoot," +
-        s" but no entries for blocks in ${fileStatus.getPath}. Retaining whole file.")
-      return Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
-    }
-
-    val parquetSchema = metadata.getFileMetaData.getSchema
-    val filter = FilterCompat.get(filters
-      .flatMap(ParquetFilters.createFilter(schema, _))
-      .reduce(FilterApi.and))
-    val filteredMetadata =
-      RowGroupFilter.filterRowGroups(filter, metadataBlocks, parquetSchema).asScala
-    filteredMetadata.flatMap { bmd =>
-      val bmdPath = new Path(metadataRoot, bmd.getPath)
-      val fsPath = fileStatus.getPath
-      if (bmdPath == fsPath) {
-        Some(new FileSplit(bmdPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty))
-      } else {
-        None
-      }
-    }
+  private def createParquetFileSplits(
+      root: Path,
+      hadoopConf: Configuration,
+      schema: StructType): ParquetFileSplitter = {
+    getMetadataForPath(root, hadoopConf)
+      .map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
+      .getOrElse(ParquetDefaultFileSplitter)
   }

   private def getMetadataForPath(
-      filePath: Path,
       rootPath: Path,
       conf: Configuration): Option[ParquetMetadata] = {
     val fs = rootPath.getFileSystem(conf)
@@ -523,6 +497,21 @@ class ParquetFileFormat
 }

 object ParquetFileFormat extends Logging {
+
+  @transient private val fileSplits: Cache[Path, ParquetFileSplitter] =
+    CacheBuilder.newBuilder()
+      .expireAfterAccess(4, TimeUnit.HOURS)
+      .concurrencyLevel(1)
+      .softValues()
+      .removalListener(new RemovalListener[Path, ParquetFileSplitter] {
+        override def onRemoval(removalNotification:
+          RemovalNotification[Path, ParquetFileSplitter]): Unit = {
+          val path = removalNotification.getKey
+          log.info(s"Removing value for path $path from cache, " +
+            s"cause: ${removalNotification.getCause}")
+        }
+      }).build()
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
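
The Parquet path above resolves one ParquetFileSplitter per root path from the new fileSplits cache (soft values, expired four hours after last access) and uses it to drop splits whose row-group statistics cannot match the pushed-down filters. A hedged usage sketch of the kind of read that exercises this path; the table path and column name are illustrative, and pruning only kicks in when the pruning conf is enabled and usable summary metadata exists for the root:

import org.apache.spark.sql.SparkSession

object FilteredParquetReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parquet-splitter-sketch")
      .getOrCreate()

    // A pushed-down predicate gives buildSplitter non-empty filters, so
    // ParquetFileFormat consults the per-root splitter cache and filters
    // row groups by statistics before splits are handed to the scan.
    val matching = spark.read.parquet("/path/to/table")
      .where("id > 100")
      .count()
    println(s"rows matching: $matching")

    spark.stop()
  }
}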

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
+import org.apache.parquet.hadoop.metadata.BlockMetaData
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+
+abstract class ParquetFileSplitter {
+  def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])
+
+  def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
+    Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
+  }
+}
+
+object ParquetDefaultFileSplitter extends ParquetFileSplitter {
+  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
+    stat => singleFileSplit(stat)
+  }
+}
+
+class ParquetMetadataFileSplitter(
+    val root: Path,
+    val blocks: Seq[BlockMetaData],
+    val schema: StructType)
+  extends ParquetFileSplitter
+  with Logging {
+
+  private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet
+
+  private val filterSets: Cache[Filter, RoaringBitmap] =
+    CacheBuilder.newBuilder()
+      .expireAfterAccess(4, TimeUnit.HOURS)
+      .concurrencyLevel(1)
+      .build()
+
+  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
+    val (applied, unapplied, filteredBlocks) = this.synchronized {
+      val (applied, unapplied) = filters.partition(filterSets.getIfPresent(_) != null)
+      val filteredBlocks = filterSets.getAllPresent(applied.asJava).values().asScala
+        .reduceOption(RoaringBitmap.and)
+        .map { bitmap =>
+          blocks.zipWithIndex.filter { case (block, index) =>
+            bitmap.contains(index)
+          }.map(_._1)
+        }.getOrElse(blocks)
+      (applied, unapplied, filteredBlocks)
+    }
+
+    val eligible = parquetFilter(unapplied, filteredBlocks).map { bmd =>
+      val blockPath = new Path(root, bmd.getPath)
+      new FileSplit(blockPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty)
+    }
+
+    val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
+      if (referencedFiles.contains(stat.getPath)) {
+        eligible.filter(_.getPath == stat.getPath)
+      } else {
+        log.warn(s"Found _metadata file for $root," +
+          s" but no entries for blocks in ${stat.getPath}. Retaining whole file.")
+        singleFileSplit(stat)
+      }
+    }
+    statFilter
+  }
+
+  private def parquetFilter(
+      filters: Seq[Filter],
+      blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
+    if (filters.nonEmpty) {
+      // Asynchronously build bitmaps
+      Future {
+        buildFilterBitMaps(filters)
+      }(ParquetMetadataFileSplitter.executionContext)
+
+      val predicate = filters.flatMap {
+        ParquetFilters.createFilter(schema, _)
+      }.reduce(FilterApi.and)
+      blocks.filter(bmd => !StatisticsFilter.canDrop(predicate, bmd.getColumns))
+    } else {
+      blocks
+    }
+  }
+
+  private def buildFilterBitMaps(filters: Seq[Filter]): Unit = {
+    this.synchronized {
+      // Only build bitmaps for filters that don't exist.
+      val sets = filters
+        .filter(filterSets.getIfPresent(_) == null)
+        .flatMap { filter =>
+          val bitmap = new RoaringBitmap
+          ParquetFilters.createFilter(schema, filter)
+            .map((filter, _, bitmap))
+        }
+      var i = 0
+      val blockLen = blocks.size
+      while (i < blockLen) {
+        val bmd = blocks(i)
+        sets.foreach { case (filter, parquetFilter, bitmap) =>
+          if (!StatisticsFilter.canDrop(parquetFilter, bmd.getColumns)) {
+            bitmap.add(i)
+          }
+        }
+        i += 1
+      }
+      val mapping = sets.map { case (filter, _, bitmap) =>
+        bitmap.runOptimize()
+        filter -> bitmap
+      }.toMap.asJava
+      filterSets.putAll(mapping)
+    }
+  }
+}
+object ParquetMetadataFileSplitter {
+  private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("parquet-metadata-filter", 1))
+}
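
ParquetMetadataFileSplitter caches, per filter, a RoaringBitmap of the row-group indices that survive statistics filtering; a later scan with the same filters intersects the cached bitmaps instead of re-evaluating the statistics. A toy sketch of that bookkeeping follows; the row-group count and the indices kept by each filter are invented for illustration.

import org.roaringbitmap.RoaringBitmap

object RowGroupBitmapSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical table with 8 row groups; pretend filter A keeps 0, 2, 3
    // and filter B keeps 2, 3, 7 after checking column statistics.
    val keptByFilterA = RoaringBitmap.bitmapOf(0, 2, 3)
    val keptByFilterB = RoaringBitmap.bitmapOf(2, 3, 7)

    // A conjunction of filters keeps only row groups present in every bitmap.
    val survivors = RoaringBitmap.and(keptByFilterA, keptByFilterB)
    val surviving = (0 until 8).filter(i => survivors.contains(i))
    println(s"row groups to read: ${surviving.mkString(", ")}") // 2, 3
  }
}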
