diff --git a/assembly/20ts/._SUCCESS.crc b/assembly/20ts/._SUCCESS.crc
new file mode 100644
index 0000000000000..3b7b044936a89
Binary files /dev/null and b/assembly/20ts/._SUCCESS.crc differ
diff --git a/assembly/20ts/.part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet.crc b/assembly/20ts/.part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet.crc
new file mode 100644
index 0000000000000..2cb31be7432c8
Binary files /dev/null and b/assembly/20ts/.part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet.crc differ
diff --git a/assembly/20ts/_SUCCESS b/assembly/20ts/_SUCCESS
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/assembly/20ts/part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet b/assembly/20ts/part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet
new file mode 100644
index 0000000000000..3cf5e170f43ee
Binary files /dev/null and b/assembly/20ts/part-00000-00140901-522c-4627-9227-5ac0947a4b18.snappy.parquet differ
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 0e46e062c94bb..e667473b05076 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -140,7 +140,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
           + " out of: " + Arrays.toString(foundRowGroupOffsets)
           + " in range " + split.getStart() + ", " + split.getEnd());
       }
-      this.reader = new ParquetFileReader(configuration, file, footer);
+      this.reader = ParquetFileReader.open(configuration, file, footer);
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 173ee97ae4a9f..4e00397f908f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -304,7 +304,7 @@ class ParquetFileFormat
            override def call(): ParquetFileSplitter =
              createParquetFileSplits(root, hadoopConf, schema)
          })
-        root -> splits.buildSplitter(filters)
+        root -> splits.buildSplitter(filters, sparkSession.sessionState.conf)
      }.toMap
     val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
       val filePath = stat.getPath
@@ -382,13 +382,14 @@ class ParquetFileFormat
         requiredSchema).asInstanceOf[StructType]
     ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
+    val int96AsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
       sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      int96AsTimestamp)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
@@ -397,7 +398,7 @@ class ParquetFileFormat
         // Collects all converted Parquet filter predicates. Notice that not all predicates can be
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
         // is used here.
-        .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+        .flatMap(ParquetFilters.createFilter(requiredSchema, _, int96AsTimestamp))
         .reduceOption(FilterApi.and)
     } else {
       None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
index b17814e952c17..48e68fb169dac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
@@ -25,19 +25,21 @@ import scala.concurrent.{ExecutionContext, Future}
 import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
 import org.apache.parquet.hadoop.metadata.BlockMetaData
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ThreadUtils
 
+
 abstract class ParquetFileSplitter {
-  def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])
+  def buildSplitter(filters: Seq[Filter], sQLConf: SQLConf): (FileStatus => Seq[FileSplit])
 
   def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
     Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
@@ -45,9 +47,9 @@ abstract class ParquetFileSplitter {
 }
 
 object ParquetDefaultFileSplitter extends ParquetFileSplitter {
-  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
-    stat => singleFileSplit(stat)
-  }
+  override def buildSplitter(
+      filters: Seq[Filter],
+      sQLConf: SQLConf): (FileStatus => Seq[FileSplit]) = singleFileSplit
 }
 
 class ParquetMetadataFileSplitter(
@@ -65,7 +67,9 @@
     .concurrencyLevel(1)
     .build()
 
-  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
+  override def buildSplitter(
+      filters: Seq[Filter],
+      sQLConf: SQLConf): (FileStatus => Seq[FileSplit]) = {
     val (applied, unapplied, filteredBlocks) = this.synchronized {
       val (applied, unapplied) = filters.partition(filterSets.getIfPresent(_) != null)
       val filteredBlocks = filterSets.getAllPresent(applied.asJava).values().asScala
@@ -78,7 +82,8 @@
       (applied, unapplied, filteredBlocks)
     }
 
-    val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
+    val eligible = applyParquetFilter(unapplied, filteredBlocks,
+      sQLConf.isParquetINT96AsTimestamp).map { bmd =>
       val blockPath = new Path(root, bmd.getPath)
       new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
     }
@@ -97,14 +102,15 @@
   private def applyParquetFilter(
       filters: Seq[Filter],
-      blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
+      blocks: Seq[BlockMetaData],
+      int96AsTimestamp: Boolean): Seq[BlockMetaData] = {
     val predicates = filters.flatMap {
-      ParquetFilters.createFilter(schema, _)
+      ParquetFilters.createFilter(schema, _, int96AsTimestamp)
     }
     if (predicates.nonEmpty) {
       // Asynchronously build bitmaps
       Future {
-        buildFilterBitMaps(filters)
+        buildFilterBitMaps(filters, int96AsTimestamp)
       }(ParquetMetadataFileSplitter.executionContext)
 
       val predicate = predicates.reduce(FilterApi.and)
@@ -114,21 +120,21 @@
     }
   }
 
-  private def buildFilterBitMaps(filters: Seq[Filter]): Unit = {
+  private def buildFilterBitMaps(filters: Seq[Filter], int96AsTimestamp: Boolean): Unit = {
     this.synchronized {
       // Only build bitmaps for filters that don't exist.
       val sets = filters
         .filter(filterSets.getIfPresent(_) == null)
         .flatMap { filter =>
           val bitmap = new RoaringBitmap
-          ParquetFilters.createFilter(schema, filter)
+          ParquetFilters.createFilter(schema, filter, int96AsTimestamp)
             .map((filter, _, bitmap))
         }
       var i = 0
       val blockLen = blocks.size
       while (i < blockLen) {
         val bmd = blocks(i)
-        sets.foreach { case (filter, parquetFilter, bitmap) =>
+        sets.foreach { case (_, parquetFilter, bitmap) =>
           if (!StatisticsFilter.canDrop(parquetFilter, bmd.getColumns)) {
             bitmap.add(i)
           }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 28a544c67ba3c..e6633eae0bef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -49,6 +49,11 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.eq(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -70,6 +75,12 @@
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.notEq(
+        intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -88,6 +99,12 @@
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.lt(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.lt(
+        intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -106,6 +123,12 @@
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.ltEq(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.ltEq(
+        intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,6 +147,12 @@
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.gt(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.gt(
+        intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -142,6 +171,12 @@
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+    case TimestampType =>
+      (n: String, v: Any) => FilterApi.gtEq(
+        longColumn(n), v.asInstanceOf[java.sql.Timestamp].asInstanceOf[java.lang.Long])
+    case DateType =>
+      (n: String, v: Any) => FilterApi.gtEq(
+        intColumn(n), v.asInstanceOf[java.sql.Date].asInstanceOf[Integer])
   }
 
   /**
@@ -153,23 +188,29 @@
    * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
    * Here we filter out such fields.
    */
-  private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
-    case StructType(fields) =>
-      // Here we don't flatten the fields in the nested schema but just look up through
-      // root fields. Currently, accessing to nested fields does not push down filters
-      // and it does not support to create filters for them.
-      fields.filter { f =>
-        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
-          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType).toMap
-    case _ => Map.empty[String, DataType]
-  }
+  private def getFieldMap(dataType: DataType, int96AsTimestamp: Boolean): Map[String, DataType] =
+    dataType match {
+      case StructType(fields) =>
+        // Here we don't flatten the fields in the nested schema but just look up through
+        // root fields. Currently, accessing to nested fields does not push down filters
+        // and it does not support to create filters for them.
+        fields.filter { f =>
+          !int96AsTimestamp ||
+            f.dataType != DataTypes.TimestampType ||
+            !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
+            !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
+        }.map(f => f.name -> f.dataType).toMap
+      case _ => Map.empty[String, DataType]
+    }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
-  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+  def createFilter(
+      schema: StructType,
+      predicate: sources.Filter,
+      int96AsTimestamp: Boolean): Option[FilterPredicate] = {
+    val dataTypeOf = getFieldMap(schema, int96AsTimestamp)
 
     // NOTE:
     //
@@ -221,18 +262,20 @@
         // Pushing one side of AND down is only safe to do at the top level.
         // You can see ParquetRelation's initializeLocalJobFunc method as an example.
         for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
+          lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
+          rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
         } yield FilterApi.and(lhsFilter, rhsFilter)
 
       case sources.Or(lhs, rhs) =>
        for {
-          lhsFilter <- createFilter(schema, lhs)
-          rhsFilter <- createFilter(schema, rhs)
+          lhsFilter <- createFilter(schema, lhs, int96AsTimestamp)
+          rhsFilter <- createFilter(schema, rhs, int96AsTimestamp)
        } yield FilterApi.or(lhsFilter, rhsFilter)
 
      case sources.Not(pred) =>
-        createFilter(schema, pred).map(FilterApi.not).map(LogicalInverseRewriter.rewrite)
+        createFilter(schema, pred, int96AsTimestamp)
+          .map(FilterApi.not)
+          .map(LogicalInverseRewriter.rewrite)
 
      case sources.In(name, values) if dataTypeOf.contains(name) =>
        val eq = makeEq.lift(dataTypeOf(name))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 33dcf2f3fd167..fec62e2e90617 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -252,8 +252,10 @@ private[parquet] class ParquetRowConverter(
       case StringType =>
         new ParquetStringConverter(updater)
 
+      case _: TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
+        new ParquetPrimitiveConverter(updater)
+
       case TimestampType =>
-        // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
         new ParquetPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index b4f36ce3752c0..a7214d8af7134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -159,6 +159,7 @@
           case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
           case UINT_64 => typeNotSupported()
           case TIMESTAMP_MILLIS => typeNotImplemented()
+          case TIMESTAMP_MICROS => TimestampType
           case _ => illegalType()
         }
 
@@ -368,14 +369,8 @@
     // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
     // a timestamp into a `Long`. This design decision is subject to change though, for example,
    // we may resort to microsecond precision in the future.
-    //
-    // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
-    // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
-    // hasn't implemented `TIMESTAMP_MICROS` yet.
-    //
-    // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
     case TimestampType =>
-      Types.primitive(INT96, repetition).named(field.name)
+      Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name)
 
     case BinaryType =>
       Types.primitive(BINARY, repetition).named(field.name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index a31d2b9c37e9d..5a08973078e20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.nio.{ByteBuffer, ByteOrder}
 import java.util
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
@@ -32,7 +31,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -154,20 +152,8 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
             Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))
 
       case TimestampType =>
-        (row: SpecializedGetters, ordinal: Int) => {
-          // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
-          // Currently we only support timestamps stored as INT96, which is compatible with Hive
-          // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS`
-          // defined in the parquet-format spec. But up until writing, the most recent parquet-mr
-          // version (1.8.1) hasn't implemented it yet.
-
-          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
-          // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
-          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
-          val buf = ByteBuffer.wrap(timestampBuffer)
-          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-          recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
-        }
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addLong(row.getLong(ordinal))
 
       case BinaryType =>
         (row: SpecializedGetters, ordinal: Int) =>
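
Editor's note, not part of the patch: the sketch below illustrates how the new three-argument ParquetFilters.createFilter is meant to be driven from SQLConf, mirroring the pushdown block added to ParquetFileFormat and ParquetMetadataFileSplitter above. The object TimestampPushdownSketch and its method are hypothetical; since ParquetFilters is private[parquet], the sketch assumes it lives in the same package.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical helper (illustration only): converts data source filters into a single
// Parquet predicate, threading the INT96 flag through the way the changed call sites do.
object TimestampPushdownSketch {
  def pushedPredicate(
      schema: StructType,
      filters: Seq[Filter],
      conf: SQLConf): Option[FilterPredicate] = {
    // Read the flag once and pass it to every createFilter call, as the ParquetFileFormat
    // change above does. When INT96 timestamps are in use, getFieldMap excludes optional
    // TimestampType columns, so no predicate is produced for them.
    val int96AsTimestamp = conf.isParquetINT96AsTimestamp
    filters
      .flatMap(ParquetFilters.createFilter(schema, _, int96AsTimestamp))
      .reduceOption(FilterApi.and)
  }
}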