Binary file added assembly/20ts/._SUCCESS.crc (binary file not shown)
Empty file added assembly/20ts/_SUCCESS
@@ -140,7 +140,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ " out of: " + Arrays.toString(foundRowGroupOffsets)
+ " in range " + split.getStart() + ", " + split.getEnd());
}
this.reader = new ParquetFileReader(configuration, file, footer);
this.reader = ParquetFileReader.open(configuration, file, footer);
}
this.fileSchema = footer.getFileMetaData().getSchema();
Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
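For context, the change above swaps the ParquetFileReader constructor for the static factory method, presumably because newer parquet-mr releases deprecate the constructors. A minimal Scala sketch of the same calls (the row-group loop is illustrative; configuration, file, and footer are the variables in this hunk):

val reader = ParquetFileReader.open(configuration, file, footer)
try {
  var pages = reader.readNextRowGroup()   // returns null once all row groups are consumed
  while (pages != null) {
    // ... hand the PageReadStore to a record reader ...
    pages = reader.readNextRowGroup()
  }
} finally {
  reader.close()
}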
@@ -304,7 +304,7 @@ class ParquetFileFormat
override def call(): ParquetFileSplitter =
createParquetFileSplits(root, hadoopConf, schema)
})
root -> splits.buildSplitter(filters)
root -> splits.buildSplitter(filters, sparkSession.sessionState.conf)
}.toMap
val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
val filePath = stat.getPath
@@ -382,13 +382,14 @@ class ParquetFileFormat
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

val int96AsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
int96AsTimestamp)

// Try to push down filters when filter push-down is enabled.
val pushed =
@@ -397,7 +398,7 @@ class ParquetFileFormat
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(requiredSchema, _))
.flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp))
.reduceOption(FilterApi.and)
} else {
None
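For context, a minimal sketch of how the pushed predicate above is assembled (the two source filters are hypothetical; createFilter and the reduce over FilterApi.and are exactly the calls in this hunk). Filters with no Parquet equivalent simply drop out of the flatMap, and whatever survives is AND-ed into a single predicate:

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.spark.sql.sources

val candidateFilters: Seq[sources.Filter] = Seq(
  sources.GreaterThan("id", 10L),          // convertible for a LongType column
  sources.StringContains("name", "abc"))   // no Parquet predicate, contributes nothing
val pushed: Option[FilterPredicate] = candidateFilters
  .flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp))
  .reduceOption(FilterApi.and)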
@@ -25,29 +25,31 @@ import scala.concurrent.{ExecutionContext, Future}
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
import org.apache.parquet.hadoop.metadata.BlockMetaData
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils



abstract class ParquetFileSplitter {
def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])
def buildSplitter(filters: Seq[Filter], sQLConf: SQLConf): (FileStatus => Seq[FileSplit])

def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
}
}

object ParquetDefaultFileSplitter extends ParquetFileSplitter {
override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
stat => singleFileSplit(stat)
}
override def buildSplitter(
filters: Seq[Filter],
sQLConf: SQLConf): (FileStatus => Seq[FileSplit]) = singleFileSplit
}
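For context, a rough usage sketch of this small API (the file statuses, filters, and sparkSession are hypothetical; FileStatus and FileSplit come from the imports above). A splitter is turned into a FileStatus => Seq[FileSplit] function once per root and then applied per file; the default implementation always returns a single split covering the whole file:

val splitFn: FileStatus => Seq[FileSplit] =
  ParquetDefaultFileSplitter.buildSplitter(Seq.empty, sparkSession.sessionState.conf)
val splits: Seq[FileSplit] = fileStatuses.flatMap(splitFn)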

class ParquetMetadataFileSplitter(
@@ -65,7 +67,9 @@ class ParquetMetadataFileSplitter(
.concurrencyLevel(1)
.build()

override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
override def buildSplitter(
filters: Seq[Filter],
sQLConf: SQLConf): (FileStatus => Seq[FileSplit]) = {
val (applied, unapplied, filteredBlocks) = this.synchronized {
val (applied, unapplied) = filters.partition(filterSets.getIfPresent(_) != null)
val filteredBlocks = filterSets.getAllPresent(applied.asJava).values().asScala
@@ -78,7 +82,8 @@ class ParquetMetadataFileSplitter(
(applied, unapplied, filteredBlocks)
}

val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
val eligible = applyParquetFilter(unapplied, filteredBlocks,
sQLConf.isParquetINT96AsTimestamp).map { bmd =>
val blockPath = new Path(root, bmd.getPath)
new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
}
@@ -97,14 +102,15 @@ class ParquetMetadataFileSplitter(

private def applyParquetFilter(
filters: Seq[Filter],
blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
blocks: Seq[BlockMetaData],
int96AsTimestamp: Boolean): Seq[BlockMetaData] = {
val predicates = filters.flatMap {
ParquetFilters.createFilter(schema, _)
ParquetFilters.createFilter(schema, _, int96AsTimestamp)
}
if (predicates.nonEmpty) {
// Asynchronously build bitmaps
Future {
buildFilterBitMaps(filters)
buildFilterBitMaps(filters, int96AsTimestamp)
}(ParquetMetadataFileSplitter.executionContext)

val predicate = predicates.reduce(FilterApi.and)
@@ -114,21 +120,21 @@
}
}

private def buildFilterBitMaps(filters: Seq[Filter]): Unit = {
private def buildFilterBitMaps(filters: Seq[Filter], int96AsTimestamp: Boolean): Unit = {
this.synchronized {
// Only build bitmaps for filters that don't exist.
val sets = filters
.filter(filterSets.getIfPresent(_) == null)
.flatMap { filter =>
val bitmap = new RoaringBitmap
ParquetFilters.createFilter(schema, filter)
ParquetFilters.createFilter(schema, filter, int96AsTimestamp)
.map((filter, _, bitmap))
}
var i = 0
val blockLen = blocks.size
while (i < blockLen) {
val bmd = blocks(i)
sets.foreach { case (filter, parquetFilter, bitmap) =>
sets.foreach { case (_, parquetFilter, bitmap) =>
if (!StatisticsFilter.canDrop(parquetFilter, bmd.getColumns)) {
bitmap.add(i)
}
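For context, a condensed sketch of the pruning that applyParquetFilter and buildFilterBitMaps implement together (the predicate is hypothetical; StatisticsFilter, RoaringBitmap, and BlockMetaData are the imports above). Row groups whose column statistics cannot rule out the predicate have their indices recorded in a bitmap and are kept as eligible splits:

val bitmap = new RoaringBitmap
blocks.zipWithIndex.foreach { case (bmd, i) =>
  // canDrop == true means the row group's min/max statistics prove no row can match.
  if (!StatisticsFilter.canDrop(parquetPredicate, bmd.getColumns)) {
    bitmap.add(i)
  }
}
val eligibleBlocks: Seq[BlockMetaData] =
  blocks.zipWithIndex.collect { case (bmd, i) if bitmap.contains(i) => bmd }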
@@ -49,6 +49,11 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case TimestampType =>
(n: String, v: Any) => FilterApi.eq(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -70,6 +75,12 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
case TimestampType =>
(n: String, v: Any) => FilterApi.notEq(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.notEq(
intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -88,6 +99,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.lt(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.lt(
intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -106,6 +123,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.ltEq(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.ltEq(
intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,6 +147,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.gt(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.gt(
intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -142,6 +171,12 @@
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
case TimestampType =>
(n: String, v: Any) => FilterApi.gtEq(
longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
case DateType =>
(n: String, v: Any) => FilterApi.gtEq(
intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
}
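For context, a minimal sketch of the predicates these new timestamp and date cases produce (column names and values are hypothetical; it assumes the comparison value has already been normalized to Spark's internal representation, microseconds since the epoch for timestamps and days since the epoch for dates, which is what the casts above rely on):

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.filter2.predicate.FilterApi.{intColumn, longColumn}

val tsMicros: java.lang.Long = 1500000000000000L                    // 2017-07-14 02:40:00 UTC in microseconds
val afterTs = FilterApi.gt(longColumn("event_time"), tsMicros)      // TimestampType => INT64 column
val onDate = FilterApi.eq(intColumn("event_date"), Int.box(17000))  // DateType => INT32 column, days since epoch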

/**
@@ -153,23 +188,29 @@
* using such fields, otherwise Parquet library will throw exception (PARQUET-389).
* Here we filter out such fields.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}
private def getFieldMap(dataType: DataType, int96AsTimestamp: Boolean): Map[String, DataType] =
dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filter { f =>
!int96AsTimestamp ||
f.dataType != DataTypes.TimestampType ||
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}

/**
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema)
def createFilter(
schema: StructType,
predicate: sources.Filter,
int96AsTimestamp: Boolean): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema, int96AsTimestamp)

// NOTE:
//
@@ -221,18 +262,20 @@
// Pushing one side of AND down is only safe to do at the top level.
// You can see ParquetRelation's initializeLocalJobFunc method as an example.
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.and(lhsFilter, rhsFilter)

case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilter(schema, lhs)
rhsFilter <- createFilter(schema, rhs)
lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
createFilter(schema, pred).map(FilterApi.not).map(LogicalInverseRewriter.rewrite)
createFilter(schema, pred, int96AsTimestamp)
.map(FilterApi.not)
.map(LogicalInverseRewriter.rewrite)

case sources.In(name, values) if dataTypeOf.contains(name) =>
val eq = makeEq.lift(dataTypeOf(name))
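For context, a small illustration of the rule stated in the comment above (the filters and schema are hypothetical): under a Not, dropping the unconvertible half of an And would widen the result instead of narrowing it, so createFilter only converts an And when both sides convert.

import org.apache.spark.sql.sources

val pred = sources.Not(sources.And(
  sources.EqualTo("id", 1L),                // convertible on its own
  sources.StringContains("name", "x")))     // has no Parquet predicate
// Converting only the EqualTo would turn Not(And(a, b)) into Not(a), which wrongly drops rows
// where a holds but b does not, so the whole expression yields None instead:
ParquetFilters.createFilter(schema, pred, int96AsTimestamp = false)   // None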
@@ -252,8 +252,10 @@ private[parquet] class ParquetRowConverter(
case StringType =>
new ParquetStringConverter(updater)

case _: TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
new ParquetPrimitiveConverter(updater)

case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
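For context, a hedged sketch of what the retained INT96 branch does with each value (the helper name int96ToMicros is made up; Binary.toByteBuffer and DateTimeUtils.fromJulianDay are existing calls). An INT96 timestamp is 12 bytes: an 8-byte nanos-of-day Long followed by a 4-byte Julian day Int, both little-endian:

import java.nio.ByteOrder
import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Decode one INT96 value into Spark's microsecond timestamp Long.
def int96ToMicros(value: Binary): Long = {
  val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong   // first 8 bytes: nanoseconds within the day
  val julianDay = buf.getInt         // last 4 bytes: Julian day number
  DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}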
@@ -159,6 +159,7 @@ private[parquet] class ParquetSchemaConverter(
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
case TIMESTAMP_MILLIS => typeNotImplemented()
case TIMESTAMP_MICROS => TimestampType
case _ => illegalType()
}

@@ -368,14 +369,8 @@ private[parquet] class ParquetSchemaConverter(
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
// a timestamp into a `Long`. This design decision is subject to change though, for example,
// we may resort to microsecond precision in the future.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
// currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet.
//
// TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)

Review comment:
Maybe in line with @ash211's question: how does this play with assumeInt96IsTimestamp, which is set from spark.sql.parquet.int96AsTimestamp and defaults to true?

Author's reply:
This code path is only hit when writing. We just stop writing INT96; I haven't removed the code path for reading INT96.

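For context on that exchange, a minimal sketch of the flag being discussed (standard public config API; the path is illustrative, and the value shown is the default). The read side still honors it, while after this change the write side no longer produces INT96 at all:

// spark.sql.parquet.int96AsTimestamp defaults to true, so INT96 columns written by Hive,
// Impala, or older Spark versions keep being read back as TimestampType.
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")
spark.read.parquet("/path/with/legacy/int96/files")
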
case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)
@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.nio.{ByteBuffer, ByteOrder}
import java.util

import scala.collection.JavaConverters.mapAsJavaMapConverter
@@ -32,7 +31,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -154,20 +152,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging
Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

case TimestampType =>
(row: SpecializedGetters, ordinal: Int) => {
// TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
// Currently we only support timestamps stored as INT96, which is compatible with Hive
// and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
// defined in the parquet-format spec. But up until writing, the most recent parquet-mr
// version (1.8.1) hasn't implemented it yet.

// NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
// precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
}
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addLong(row.getLong(ordinal))

case BinaryType =>
(row: SpecializedGetters, ordinal: Int) =>
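For context, a hedged side-by-side sketch of the two write encodings (writeInt96 is a made-up helper that mirrors the branch removed above; timestampBuffer is assumed to be the reusable 12-byte array it referenced). The old path packed each microsecond value into an INT96 of nanos-of-day plus Julian day; the new path hands the microsecond Long straight to recordConsumer.addLong as INT64 TIMESTAMP_MICROS:

import java.nio.{ByteBuffer, ByteOrder}
import org.apache.parquet.io.api.Binary
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Old path (removed): microseconds -> (Julian day, nanos of day) packed into 12 bytes.
def writeInt96(micros: Long, timestampBuffer: Array[Byte]): Binary = {
  val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(micros)
  ByteBuffer.wrap(timestampBuffer)
    .order(ByteOrder.LITTLE_ENDIAN)
    .putLong(timeOfDayNanos)
    .putInt(julianDay)
  Binary.fromReusedByteArray(timestampBuffer)
}

// New path (this PR): recordConsumer.addLong(row.getLong(ordinal)) writes the microsecond
// value directly as an INT64 annotated with TIMESTAMP_MICROS.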