diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index cd4eed6e2937..8441bf87efe5 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -17,6 +17,7 @@ arpack_combined_all-0.1.jar arrow-format-0.12.0.jar arrow-memory-0.12.0.jar arrow-vector-0.12.0.jar +audience-annotations-0.7.0.jar automaton-1.11-8.jar avro-1.8.2.jar avro-ipc-1.8.2.jar @@ -162,13 +163,13 @@ orc-shims-1.5.5.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar paranamer-2.8.jar -parquet-column-1.10.1.jar -parquet-common-1.10.1.jar -parquet-encoding-1.10.1.jar -parquet-format-2.4.0.jar -parquet-hadoop-1.10.1.jar +parquet-column-1.11.0.jar +parquet-common-1.11.0.jar +parquet-encoding-1.11.0.jar +parquet-format-structures-1.11.0.jar +parquet-hadoop-1.11.0.jar parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.10.1.jar +parquet-jackson-1.11.0.jar protobuf-java-2.5.0.jar py4j-0.10.8.1.jar pyrolite-4.13.jar diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index 6f3bbce3e746..3f0a665c54ef 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -181,13 +181,13 @@ orc-shims-1.5.5.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar paranamer-2.8.jar -parquet-column-1.10.1.jar -parquet-common-1.10.1.jar -parquet-encoding-1.10.1.jar -parquet-format-2.4.0.jar -parquet-hadoop-1.10.1.jar +parquet-column-1.11.0.jar +parquet-common-1.11.0.jar +parquet-encoding-1.11.0.jar +parquet-format-structures-1.11.0.jar +parquet-hadoop-1.11.0.jar parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.10.1.jar +parquet-jackson-1.11.0.jar protobuf-java-2.5.0.jar py4j-0.10.8.1.jar pyrolite-4.13.jar diff --git a/dev/run-tests.py b/dev/run-tests.py index dfad2991077b..c159f4d11ea2 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -294,7 +294,8 @@ def get_hadoop_profiles(hadoop_version): def build_spark_maven(hadoop_version): # Enable all of the profiles for the build: build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags - mvn_goals = ["clean", "package", "-DskipTests"] + mvn_goals = ["dependency:purge-local-repository", "-Dinclude=org.apache.parquet", + "clean", "package", "-DskipTests"] profiles_and_goals = build_profiles + mvn_goals print("[info] Building Spark (w/Hive 1.2.1) using Maven with these arguments: ", diff --git a/pom.xml b/pom.xml index 19775050dee7..8d2864b5b474 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ 2.2.0 10.12.1.1 - 1.10.1 + 1.11.0 1.5.5 nohive com.twitter diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index ba26b57567e6..69aaa6980e18 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -30,7 +30,8 @@ import org.apache.parquet.column.page.*; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.catalyst.util.DateTimeUtils; @@ -40,6 +41,8 @@ import org.apache.spark.sql.types.DecimalType; import static 
org.apache.parquet.column.ValuesType.REPETITION_LEVEL; +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS; +import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS; import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator; import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator; @@ -96,20 +99,23 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; - private final OriginalType originalType; + private final LogicalTypeAnnotation logicalTypeAnnotation; // The timezone conversion to apply to int96 timestamps. Null if no conversion. private final TimeZone convertTz; + private final TimeZone sessionLocalTz; private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC(); public VectorizedColumnReader( ColumnDescriptor descriptor, - OriginalType originalType, + LogicalTypeAnnotation logicalTypeAnnotation, PageReader pageReader, - TimeZone convertTz) throws IOException { + TimeZone convertTz, + TimeZone sessionLocalTz) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; - this.originalType = originalType; + this.sessionLocalTz = sessionLocalTz; + this.logicalTypeAnnotation = logicalTypeAnnotation; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -179,7 +185,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException { if (column.hasDictionary() || (rowId == 0 && (typeName == PrimitiveType.PrimitiveTypeName.INT32 || (typeName == PrimitiveType.PrimitiveTypeName.INT64 && - originalType != OriginalType.TIMESTAMP_MILLIS) || + !(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation)) || typeName == PrimitiveType.PrimitiveTypeName.FLOAT || typeName == PrimitiveType.PrimitiveTypeName.DOUBLE || typeName == PrimitiveType.PrimitiveTypeName.BINARY))) { @@ -287,17 +293,24 @@ private void decodeDictionaryIds( case INT64: if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType()) || - originalType == OriginalType.TIMESTAMP_MICROS) { + isTimestampWithUnit(logicalTypeAnnotation, MICROS)) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); + long time = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + if (needsTimezoneAdjustment()) { + time = DateTimeUtils.convertTz(time, sessionLocalTz, UTC); + } + column.putLong(i, time); } } - } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { + } else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - column.putLong(i, - DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i)))); + long time = DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))); + if (needsTimezoneAdjustment()) { + time = DateTimeUtils.convertTz(time, sessionLocalTz, UTC); + } + column.putLong(i, time); } } } else { @@ -425,13 +438,29 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro // This is where we implement support for the valid type conversions. 
if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType()) || - originalType == OriginalType.TIMESTAMP_MICROS) { - defColumn.readLongs( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { + isTimestampWithUnit(logicalTypeAnnotation, MICROS)) { + if (needsTimezoneAdjustment()) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + long timestamp = dataColumn.readLong(); + timestamp = DateTimeUtils.convertTz(timestamp, sessionLocalTz, UTC); + column.putLong(rowId + i, timestamp); + } else { + column.putNull(rowId + i); + } + } + } else { + defColumn.readLongs( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } + } else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong())); + long timestamp = DateTimeUtils.fromMillis(dataColumn.readLong()); + if (needsTimezoneAdjustment()) { + timestamp = DateTimeUtils.convertTz(timestamp, sessionLocalTz, UTC); + } + column.putLong(rowId + i, timestamp); } else { column.putNull(rowId + i); } @@ -441,6 +470,18 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro } } + private boolean needsTimezoneAdjustment() { + return logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation && + !((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC(); + } + + private boolean isTimestampWithUnit( + LogicalTypeAnnotation logicalTypeAnnotation, + LogicalTypeAnnotation.TimeUnit timeUnit) { + return (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) && + ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == timeUnit; + } + private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException { // This is where we implement support for the valid type conversions. // TODO: support implicit cast to double? diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index f02861355c40..6c72b7523435 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -88,6 +88,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private TimeZone convertTz = null; + private TimeZone sessionLocalTz = null; + /** * columnBatch object that is used for batch decoding. This is created on first use and triggers * batched decoding. It is not valid to interleave calls to the batched interface with the row @@ -116,8 +118,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private final MemoryMode MEMORY_MODE; - public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) { + public VectorizedParquetRecordReader(TimeZone convertTz, TimeZone sessionLocalTz, boolean useOffHeap, int capacity) { this.convertTz = convertTz; + this.sessionLocalTz = sessionLocalTz; MEMORY_MODE = useOffHeap ? 
MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.capacity = capacity; } @@ -308,8 +311,8 @@ private void checkEndOfRowGroup() throws IOException { columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; - columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz); + columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getLogicalTypeAnnotation(), + pages.getPageReader(columns.get(i)), convertTz, sessionLocalTz); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index efa4f3f166d9..991dcef45b93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -363,6 +363,8 @@ class ParquetFileFormat null) val sharedConf = broadcastedHadoopConf.value.value + val sessionLocalTz = DateTimeUtils.getTimeZone( + sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)) lazy val footerFileMetaData = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData @@ -370,7 +372,7 @@ class ParquetFileFormat val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal, - pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive, sessionLocalTz) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -408,7 +410,10 @@ class ParquetFileFormat val taskContext = Option(TaskContext.get()) if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + convertTz.orNull, + sessionLocalTz, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion lister before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) @@ -426,9 +431,10 @@ class ParquetFileFormat // ParquetRecordReader returns UnsafeRow val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz, sessionLocalTz), + parquetFilter) } else { - new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz)) + new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz, sessionLocalTz)) } val iter = new RecordReaderIterator(reader) // SPARK-23457 Register a task completion lister before `initialization`. 
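Note on the reader changes above: the `sessionLocalTz` threaded from `ParquetFileFormat` into `VectorizedParquetRecordReader`/`VectorizedColumnReader` only matters for INT64 columns whose `TimestampLogicalTypeAnnotation` has `isAdjustedToUTC == false`, i.e. timezone-naive local timestamps that the new `LogicalTypeAnnotation` API in Parquet 1.11.0 can describe. Such values are reinterpreted in the session time zone and normalized to the UTC-based microseconds that Spark's `TimestampType` stores. The sketch below illustrates that decision and conversion with `java.time` only; it is a minimal, self-contained approximation rather than the patch's actual code path (which goes through Spark's internal `DateTimeUtils.convertTz`), and the object and method names are hypothetical.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset}

import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation

// Hypothetical helper; shown only to illustrate the intended semantics.
object TimestampNormalizationSketch {

  // True when the column stores local (timezone-naive) timestamps that need shifting.
  def needsTimezoneAdjustment(annotation: LogicalTypeAnnotation): Boolean = annotation match {
    case t: TimestampLogicalTypeAnnotation => !t.isAdjustedToUTC
    case _ => false
  }

  // Reinterpret naive epoch micros as a wall clock in `sessionZone`; return UTC-based micros.
  def toUtcMicros(naiveMicros: Long, sessionZone: ZoneId): Long = {
    val wallClock = LocalDateTime.ofEpochSecond(
      Math.floorDiv(naiveMicros, 1000000L),
      (Math.floorMod(naiveMicros, 1000000L) * 1000L).toInt,
      ZoneOffset.UTC)
    val instant: Instant = wallClock.atZone(sessionZone).toInstant
    instant.getEpochSecond * 1000000L + instant.getNano / 1000L
  }
}
```

The same `sessionLocalTz` is also handed to `ParquetFilters` (next file), so timestamp literals in pushed-down predicates are shifted by the same rule and therefore compare consistently against the stored local values.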
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 21ab9c78e53d..e66a4959c443 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -20,15 +20,16 @@ package org.apache.spark.sql.execution.datasources.parquet import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Timestamp} -import java.util.Locale +import java.util.{Locale, TimeZone} import scala.collection.JavaConverters.asScalaBufferConverter +import scala.language.existentials import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator} -import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema._ +import org.apache.parquet.schema.LogicalTypeAnnotation._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ @@ -46,7 +47,8 @@ private[parquet] class ParquetFilters( pushDownDecimal: Boolean, pushDownStartWith: Boolean, pushDownInFilterThreshold: Int, - caseSensitive: Boolean) { + caseSensitive: Boolean, + sessionLocalTz : TimeZone) { /** * Holds a single field information stored in the underlying parquet file. @@ -59,23 +61,25 @@ private[parquet] class ParquetFilters( fieldType: ParquetSchemaType) private case class ParquetSchemaType( - originalType: OriginalType, - primitiveTypeName: PrimitiveTypeName, - length: Int, - decimalMetadata: DecimalMetadata) - - private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null) - private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) - private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) - private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) - private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null) - private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null) - private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null) - private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null) - private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null) - private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null) - private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null) - private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null) + logicalType: LogicalTypeAnnotation, + logicalTypeClass: Class[_ <: LogicalTypeAnnotation], + primitiveTypeName: PrimitiveTypeName, + length: Int) + + private val ParquetBooleanType = ParquetSchemaType(null, null, BOOLEAN, 0) + private val ParquetByteType = ParquetSchemaType(intType(8, true), + classOf[IntLogicalTypeAnnotation], INT32, 0) + private val ParquetShortType = ParquetSchemaType(intType(16, true), + classOf[IntLogicalTypeAnnotation], INT32, 0) + private val ParquetIntegerType = ParquetSchemaType(null, null, INT32, 0) + private val ParquetLongType = ParquetSchemaType(null, null, INT64, 0) + private val ParquetFloatType = ParquetSchemaType(null, null, FLOAT, 0) + private 
val ParquetDoubleType = ParquetSchemaType(null, null, DOUBLE, 0) + private val ParquetStringType = ParquetSchemaType(stringType(), + classOf[StringLogicalTypeAnnotation], BINARY, 0) + private val ParquetBinaryType = ParquetSchemaType(null, null, BINARY, 0) + private val ParquetDateType = ParquetSchemaType(dateType(), + classOf[DateLogicalTypeAnnotation], INT32, 0) private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) @@ -100,6 +104,25 @@ private[parquet] class ParquetFilters( Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes) } + private def timestampValue(timestampType: TimestampLogicalTypeAnnotation, v: Any): JLong = + Option(v).map((timestampType.getUnit, timestampType.isAdjustedToUTC) match { + case (TimeUnit.MICROS, true) => + t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) + .asInstanceOf[JLong] + case (TimeUnit.MICROS, false) => + t => DateTimeUtils.convertTz( + DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]), + DateTimeUtils.TimeZoneUTC, sessionLocalTz).asInstanceOf[JLong] + case (TimeUnit.MILLIS, true) => + _.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong] + case (TimeUnit.MILLIS, false) => + t => DateTimeUtils.toMillis(DateTimeUtils.convertTz( + DateTimeUtils.fromMillis(t.asInstanceOf[Timestamp].getTime), + DateTimeUtils.TimeZoneUTC, sessionLocalTz)).asInstanceOf[JLong] + case _ => throw new IllegalArgumentException(s"Unsupported timestamp type: " + + s"TIMESTAMP(${timestampType.getUnit}, ${timestampType.isAdjustedToUTC})") + }).orNull + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { case ParquetBooleanType => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[JBoolean]) @@ -127,28 +150,26 @@ private[parquet] class ParquetFilters( (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) - case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.eq( - longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) - case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.eq( - longColumn(n), - Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) - - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.eq( - intColumn(n), - Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.eq( longColumn(n), - Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.eq( - binaryColumn(n), - Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => + (n: String, v: Any) => FilterApi.eq( + intColumn(n), + Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == 
classOf[DecimalLogicalTypeAnnotation] => + (n: String, v: Any) => FilterApi.eq( + longColumn(n), + Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => + (n: String, v: Any) => FilterApi.eq( + binaryColumn(n), + Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) } private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { @@ -177,25 +198,23 @@ private[parquet] class ParquetFilters( (n: String, v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) - case ParquetTimestampMicrosType if pushDownTimestamp => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.notEq( - longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) - case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.notEq( - longColumn(n), - Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull) - - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => - (n: String, v: Any) => FilterApi.notEq( - intColumn(n), - Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + longColumn(n), + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => + (n: String, v: Any) => FilterApi.notEq( + intColumn(n), + Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) @@ -221,22 +240,22 @@ private[parquet] class ParquetFilters( case ParquetDateType if pushDownDate => (n: String, v: Any) => FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) - case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.lt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) - case ParquetTimestampMillisType if pushDownTimestamp => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.lt( longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => 
FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -261,22 +280,21 @@ private[parquet] class ParquetFilters( case ParquetDateType if pushDownDate => (n: String, v: Any) => FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) - case ParquetTimestampMicrosType if pushDownTimestamp => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.ltEq( longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) - case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.ltEq( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) - - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.ltEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -301,22 +319,22 @@ private[parquet] class ParquetFilters( case ParquetDateType if pushDownDate => (n: String, v: Any) => FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) - case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) - case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gt( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) - - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => + (n: String, v: Any) => + FilterApi.gt( + longColumn(n), + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case 
ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -341,22 +359,22 @@ private[parquet] class ParquetFilters( case ParquetDateType if pushDownDate => (n: String, v: Any) => FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer]) - case ParquetTimestampMicrosType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gtEq( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) - case ParquetTimestampMillisType if pushDownTimestamp => - (n: String, v: Any) => FilterApi.gtEq( - longColumn(n), - v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]) - - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(logicalType, _class, INT64, _) if pushDownTimestamp && + _class == classOf[TimestampLogicalTypeAnnotation] => + (n: String, v: Any) => + FilterApi.gtEq( + longColumn(n), + timestampValue(logicalType.asInstanceOf[TimestampLogicalTypeAnnotation], v)) + case ParquetSchemaType(_, _class, INT32, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, INT64, _) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_, _class, FIXED_LEN_BYTE_ARRAY, length) if pushDownDecimal && + _class == classOf[DecimalLogicalTypeAnnotation] => (n: String, v: Any) => FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -371,8 +389,9 @@ private[parquet] class ParquetFilters( val primitiveFields = dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => f.getName -> ParquetField(f.getName, - ParquetSchemaType(f.getOriginalType, - f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) + ParquetSchemaType(f.getLogicalTypeAnnotation, + if (f.getLogicalTypeAnnotation == null) null else f.getLogicalTypeAnnotation.getClass, + f.getPrimitiveTypeName, f.getTypeLength)) } if (caseSensitive) { primitiveFields.toMap @@ -412,9 +431,9 @@ private[parquet] class ParquetFilters( canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. 
- def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + def isDecimalMatched(value: Any, dec: DecimalLogicalTypeAnnotation): Boolean = value match { case decimal: JBigDecimal => - decimal.scale == decimalMeta.getScale + decimal.scale == dec.getScale case _ => false } @@ -430,14 +449,16 @@ private[parquet] class ParquetFilters( case ParquetStringType => value.isInstanceOf[String] case ParquetBinaryType => value.isInstanceOf[Array[Byte]] case ParquetDateType => value.isInstanceOf[Date] - case ParquetTimestampMicrosType | ParquetTimestampMillisType => - value.isInstanceOf[Timestamp] - case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) - case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) + case ParquetSchemaType(_, _class, INT64, _) + if _class == classOf[TimestampLogicalTypeAnnotation] => + value.isInstanceOf[Timestamp] + case ParquetSchemaType(decimal, _class, INT32, _) + if _class == classOf[DecimalLogicalTypeAnnotation] => + isDecimalMatched(value, decimal.asInstanceOf[DecimalLogicalTypeAnnotation]) + case ParquetSchemaType(decimal, _class, INT64, _) => + isDecimalMatched(value, decimal.asInstanceOf[DecimalLogicalTypeAnnotation]) + case ParquetSchemaType(decimal, _class, FIXED_LEN_BYTE_ARRAY, _) => + isDecimalMatched(value, decimal.asInstanceOf[DecimalLogicalTypeAnnotation]) case _ => false }) } @@ -570,7 +591,7 @@ private[parquet] class ParquetFilters( } override def keep(value: Binary): Boolean = { - UTF8String.fromBytes(value.getBytes).startsWith( + value != null && UTF8String.fromBytes(value.getBytes).startsWith( UTF8String.fromBytes(strToBinary.getBytes)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 3319e73f2b31..7873f533793a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema._ +import org.apache.parquet.schema.LogicalTypeAnnotation.{listType, ListLogicalTypeAnnotation} import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging @@ -49,7 +50,9 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport( + val convertTz: Option[TimeZone], + val sessionLocalTz: TimeZone) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -57,7 +60,7 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz value directly, and the value here // is ignored. 
- this(None) + this(None, null) } /** @@ -107,7 +110,8 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), - convertTz) + convertTz, + sessionLocalTz) } } @@ -183,11 +187,12 @@ private[parquet] object ParquetReadSupport { // Unannotated repeated group should be interpreted as required list of required element, so // list element type is just the group itself. Clip it. - if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { + if (parquetList.getLogicalTypeAnnotation == null + && parquetList.isRepetition(Repetition.REPEATED)) { clipParquetType(parquetList, elementType, caseSensitive) } else { assert( - parquetList.getOriginalType == OriginalType.LIST, + parquetList.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation], "Invalid Parquet schema. " + "Original type of annotated Parquet lists must be LIST: " + parquetList.toString) @@ -215,7 +220,7 @@ private[parquet] object ParquetReadSupport { ) { Types .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) + .as(listType()) .addField(clipParquetType(repeatedGroup, elementType, caseSensitive)) .named(parquetList.getName) } else { @@ -223,7 +228,7 @@ private[parquet] object ParquetReadSupport { // repetition. Types .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) + .as(listType()) .addField( Types .repeatedGroup() @@ -254,14 +259,14 @@ private[parquet] object ParquetReadSupport { val clippedRepeatedGroup = Types .repeatedGroup() - .as(repeatedGroup.getOriginalType) + .as(repeatedGroup.getLogicalTypeAnnotation) .addField(clipParquetType(parquetKeyType, keyType, caseSensitive)) .addField(clipParquetType(parquetValueType, valueType, caseSensitive)) .named(repeatedGroup.getName) Types .buildGroup(parquetMap.getRepetition) - .as(parquetMap.getOriginalType) + .as(parquetMap.getLogicalTypeAnnotation) .addField(clippedRepeatedGroup) .named(parquetMap.getName) } @@ -279,7 +284,7 @@ private[parquet] object ParquetReadSupport { val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive) Types .buildGroup(parquetRecord.getRepetition) - .as(parquetRecord.getOriginalType) + .as(parquetRecord.getLogicalTypeAnnotation) .addFields(clippedParquetFields: _*) .named(parquetRecord.getName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index b2459dd0e8bb..4941de5b118b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -36,11 +36,13 @@ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, - convertTz: Option[TimeZone]) + convertTz: Option[TimeZone], + sessionLocalTz: TimeZone) extends RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater) + new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, + sessionLocalTz, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 004a96d13413..613d53c2d636 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -26,9 +26,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} -import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} -import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96} +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType, Type} +import org.apache.parquet.schema.LogicalTypeAnnotation.{TimeUnit, _} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow @@ -106,10 +106,10 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * 5 converters will be created: * * - a root [[ParquetRowConverter]] for [[MessageType]] `root`, which contains: - * - a [[ParquetPrimitiveConverter]] for required [[INT_32]] field `f1`, and + * - a [[ParquetPrimitiveConverter]] for required [[intType()]] field `f1`, and * - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains: * - a [[ParquetPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and - * - a [[ParquetStringConverter]] for optional [[UTF8]] string field `f22` + * - a [[ParquetStringConverter]] for optional [[stringType()]] string field `f22` * * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. @@ -126,6 +126,7 @@ private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, convertTz: Option[TimeZone], + sessionLocalTz: TimeZone, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -209,6 +210,12 @@ private[parquet] class ParquetRowConverter( } } + def isTimestampWithUnit(parquetType: Type, timeUnit: LogicalTypeAnnotation.TimeUnit): Boolean = { + val logicalType = parquetType.getLogicalTypeAnnotation + logicalType.isInstanceOf[TimestampLogicalTypeAnnotation] && + logicalType.asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == timeUnit + } + /** * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type * `catalystType`. Converted values are handled by `updater`. 
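The `isTimestampWithUnit` helper added above replaces the old `parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS`-style checks: with Parquet 1.11.0 the annotation is an object (`LogicalTypeAnnotation`) that carries both the time unit and the `isAdjustedToUTC` flag, so the code matches on the annotation's class instead of comparing enum constants. Below is a small, self-contained sketch of that pattern against the parquet-mr 1.11 schema API; the object name and the sample field are illustrative only.

```scala
import org.apache.parquet.schema.{LogicalTypeAnnotation, Type, Types}
import org.apache.parquet.schema.LogicalTypeAnnotation.{TimeUnit, TimestampLogicalTypeAnnotation}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

object LogicalTypeMatchSketch {

  // Same shape as the helper in ParquetRowConverter: true only for TIMESTAMP columns of `unit`.
  def isTimestampWithUnit(parquetType: Type, unit: TimeUnit): Boolean =
    parquetType.getLogicalTypeAnnotation match {
      case t: TimestampLogicalTypeAnnotation => t.getUnit == unit
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    // An INT64 field annotated as a local (not adjusted to UTC) microsecond timestamp.
    val field = Types.optional(PrimitiveTypeName.INT64)
      .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS))
      .named("event_time")

    println(isTimestampWithUnit(field, TimeUnit.MICROS)) // true
    println(isTimestampWithUnit(field, TimeUnit.MILLIS)) // false
  }
}
```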
@@ -257,17 +264,25 @@ private[parquet] class ParquetRowConverter( case StringType => new ParquetStringConverter(updater) - case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS => + case TimestampType if isTimestampWithUnit(parquetType, TimeUnit.MICROS) => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { - updater.setLong(value) + val utc = parquetType.getLogicalTypeAnnotation + .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC + val adjTime = if (utc) value else DateTimeUtils.convertTz(value, sessionLocalTz, UTC) + updater.setLong(adjTime.asInstanceOf[Long]) } } - case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => + case TimestampType if isTimestampWithUnit(parquetType, TimeUnit.MILLIS) => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { - updater.setLong(DateTimeUtils.fromMillis(value)) + val utc = parquetType.getLogicalTypeAnnotation + .asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC + val rawTime = DateTimeUtils.fromMillis(value) + val adjTime = if (utc) rawTime else DateTimeUtils.convertTz(rawTime, + sessionLocalTz, UTC) + updater.setLong(adjTime.asInstanceOf[Long]) } } @@ -301,7 +316,8 @@ private[parquet] class ParquetRowConverter( // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor // annotated by `LIST` or `MAP` should be interpreted as a required list of required // elements where the element type is the type of the field. - case t: ArrayType if parquetType.getOriginalType != LIST => + case t: ArrayType if + !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] => if (parquetType.isPrimitive) { new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) } else { @@ -316,7 +332,12 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, convertTz, new ParentContainerUpdater { + schemaConverter, + parquetType.asGroupType(), + t, + convertTz, + sessionLocalTz, + new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 8ce8a86d2f02..e821ce399e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema._ -import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.LogicalTypeAnnotation._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition._ @@ -92,10 +92,10 @@ class ParquetToSparkSchemaConverter( private def convertPrimitiveField(field: PrimitiveType): DataType = { val typeName = field.getPrimitiveTypeName - val originalType = field.getOriginalType + val logicalTypeAnnotation = field.getLogicalTypeAnnotation def typeString = - if (originalType == null) s"$typeName" else s"$typeName ($originalType)" + if (logicalTypeAnnotation == null) s"$typeName" else s"$typeName ($logicalTypeAnnotation)" def 
typeNotSupported() = throw new AnalysisException(s"Parquet type not supported: $typeString") @@ -110,8 +110,9 @@ class ParquetToSparkSchemaConverter( // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored // as binaries with variable lengths. def makeDecimalType(maxPrecision: Int = -1): DecimalType = { - val precision = field.getDecimalMetadata.getPrecision - val scale = field.getDecimalMetadata.getScale + val decimalType = field.getLogicalTypeAnnotation.asInstanceOf[DecimalLogicalTypeAnnotation] + val precision = decimalType.getPrecision + val scale = decimalType.getScale ParquetSchemaConverter.checkConversionRequirement( maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, @@ -128,26 +129,27 @@ class ParquetToSparkSchemaConverter( case DOUBLE => DoubleType case INT32 => - originalType match { - case INT_8 => ByteType - case INT_16 => ShortType - case INT_32 | null => IntegerType - case DATE => DateType - case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS) - case UINT_8 => typeNotSupported() - case UINT_16 => typeNotSupported() - case UINT_32 => typeNotSupported() - case TIME_MILLIS => typeNotImplemented() + logicalTypeAnnotation match { + case i: IntLogicalTypeAnnotation => + if (!i.isSigned) typeNotSupported() else i.getBitWidth match { + case 8 => ByteType + case 16 => ShortType + case 32 => IntegerType + } + case null => IntegerType + case _: DateLogicalTypeAnnotation => DateType + case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_INT_DIGITS) + case _: TimeLogicalTypeAnnotation => typeNotImplemented() case _ => illegalType() } case INT64 => - originalType match { - case INT_64 | null => LongType - case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS) - case UINT_64 => typeNotSupported() - case TIMESTAMP_MICROS => TimestampType - case TIMESTAMP_MILLIS => TimestampType + logicalTypeAnnotation match { + case i: IntLogicalTypeAnnotation => + if (!i.isSigned) typeNotSupported() else LongType + case null => LongType + case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_LONG_DIGITS) + case _: TimestampLogicalTypeAnnotation => TimestampType case _ => illegalType() } @@ -159,19 +161,21 @@ class ParquetToSparkSchemaConverter( TimestampType case BINARY => - originalType match { - case UTF8 | ENUM | JSON => StringType + logicalTypeAnnotation match { + case _: StringLogicalTypeAnnotation | + _: EnumLogicalTypeAnnotation | _: JsonLogicalTypeAnnotation => StringType case null if assumeBinaryIsString => StringType case null => BinaryType - case BSON => BinaryType - case DECIMAL => makeDecimalType() + case _: BsonLogicalTypeAnnotation => BinaryType + case _: DecimalLogicalTypeAnnotation => makeDecimalType() case _ => illegalType() } case FIXED_LEN_BYTE_ARRAY => - originalType match { - case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) - case INTERVAL => typeNotImplemented() + logicalTypeAnnotation match { + case _: DecimalLogicalTypeAnnotation => + makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) + case _: IntervalLogicalTypeAnnotation => typeNotImplemented() case _ => illegalType() } @@ -180,7 +184,7 @@ class ParquetToSparkSchemaConverter( } private def convertGroupField(field: GroupType): DataType = { - Option(field.getOriginalType).fold(convert(field): DataType) { + Option(field.getLogicalTypeAnnotation).fold(convert(field): DataType) { // A Parquet list is represented as a 3-level structure: // // group (LIST) { @@ -194,7 +198,7 @@ class 
ParquetToSparkSchemaConverter( // we need to check whether the 2nd level or the 3rd level refers to list element type. // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - case LIST => + case _: ListLogicalTypeAnnotation => ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") @@ -214,7 +218,7 @@ class ParquetToSparkSchemaConverter( // `MAP_KEY_VALUE` is for backwards-compatibility // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 // scalastyle:on - case MAP | MAP_KEY_VALUE => + case _: MapLogicalTypeAnnotation | _: MapKeyValueTypeAnnotation => ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, s"Invalid map type: $field") @@ -346,10 +350,10 @@ class SparkToParquetSchemaConverter( Types.primitive(BOOLEAN, repetition).named(field.name) case ByteType => - Types.primitive(INT32, repetition).as(INT_8).named(field.name) + Types.primitive(INT32, repetition).as(intType(8, true)).named(field.name) case ShortType => - Types.primitive(INT32, repetition).as(INT_16).named(field.name) + Types.primitive(INT32, repetition).as(intType(16, true)).named(field.name) case IntegerType => Types.primitive(INT32, repetition).named(field.name) @@ -364,10 +368,10 @@ class SparkToParquetSchemaConverter( Types.primitive(DOUBLE, repetition).named(field.name) case StringType => - Types.primitive(BINARY, repetition).as(UTF8).named(field.name) + Types.primitive(BINARY, repetition).as(stringType()).named(field.name) case DateType => - Types.primitive(INT32, repetition).as(DATE).named(field.name) + Types.primitive(INT32, repetition).as(dateType()).named(field.name) // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or // TIMESTAMP_MILLIS. 
TIMESTAMP_MICROS is recommended but INT96 is the default to keep the @@ -388,9 +392,11 @@ class SparkToParquetSchemaConverter( case SQLConf.ParquetOutputTimestampType.INT96 => Types.primitive(INT96, repetition).named(field.name) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => - Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name) + Types.primitive(INT64, repetition).as(timestampType(true, TimeUnit.MICROS)) + .named(field.name) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => - Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name) + Types.primitive(INT64, repetition).as(timestampType(true, TimeUnit.MILLIS)) + .named(field.name) } case BinaryType => @@ -407,9 +413,7 @@ class SparkToParquetSchemaConverter( case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(decimalType(scale, precision)) .length(Decimal.minBytesForPrecision(precision)) .named(field.name) @@ -422,9 +426,7 @@ class SparkToParquetSchemaConverter( if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat => Types .primitive(INT32, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(decimalType(scale, precision)) .named(field.name) // Uses INT64 for 1 <= precision <= 18 @@ -432,18 +434,14 @@ class SparkToParquetSchemaConverter( if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat => Types .primitive(INT64, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(decimalType(scale, precision)) .named(field.name) // Uses FIXED_LEN_BYTE_ARRAY for all other precisions case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(decimalType(scale, precision)) .length(Decimal.minBytesForPrecision(precision)) .named(field.name) @@ -468,7 +466,7 @@ class SparkToParquetSchemaConverter( // `array` as its element name as below. Therefore, we build manually // the correct group type here via the builder. (See SPARK-16777) Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(listType()) .addField(Types .buildGroup(REPEATED) // "array" is the name chosen by parquet-hive (1.7.0 and prior version) @@ -486,7 +484,7 @@ class SparkToParquetSchemaConverter( // Here too, we should not use `listOfElements`. 
(See SPARK-16777) Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(listType()) // "array" is the name chosen by parquet-avro (1.7.0 and prior version) .addField(convertField(StructField("array", elementType, nullable), REPEATED)) .named(field.name) @@ -517,7 +515,7 @@ class SparkToParquetSchemaConverter( // } // } Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(listType()) .addField( Types.repeatedGroup() .addField(convertField(StructField("element", elementType, containsNull))) @@ -532,7 +530,7 @@ class SparkToParquetSchemaConverter( // } // } Types - .buildGroup(repetition).as(MAP) + .buildGroup(repetition).as(mapType()) .addField( Types .repeatedGroup() diff --git a/sql/core/src/test/resources/test-data/timestamp_dictionary.parq b/sql/core/src/test/resources/test-data/timestamp_dictionary.parq new file mode 100644 index 000000000000..cef7a4f5d4c6 Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp_dictionary.parq differ diff --git a/sql/core/src/test/resources/test-data/timestamp_dictionary.txt b/sql/core/src/test/resources/test-data/timestamp_dictionary.txt new file mode 100644 index 000000000000..bbbc002d1155 --- /dev/null +++ b/sql/core/src/test/resources/test-data/timestamp_dictionary.txt @@ -0,0 +1,4 @@ +-144674181;1965-06-01 05:43:39.000;1965-06-01 12:43:39.000;1965-06-01 05:43:39.000;1965-06-01 12:43:39.000 +0;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000 +0;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000 +0;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000 diff --git a/sql/core/src/test/resources/test-data/timestamp_plain.parq b/sql/core/src/test/resources/test-data/timestamp_plain.parq new file mode 100644 index 000000000000..e262a0cdbbdb Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp_plain.parq differ diff --git a/sql/core/src/test/resources/test-data/timestamp_plain.txt b/sql/core/src/test/resources/test-data/timestamp_plain.txt new file mode 100644 index 000000000000..e439d6745565 --- /dev/null +++ b/sql/core/src/test/resources/test-data/timestamp_plain.txt @@ -0,0 +1,2 @@ +-144674181;1965-06-01 05:43:39.000;1965-06-01 12:43:39.000;1965-06-01 05:43:39.000;1965-06-01 12:43:39.000 +0;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000;1969-12-31 16:00:00.000;1970-01-01 00:00:00.000 diff --git a/sql/core/src/test/resources/test-data/timestamp_pushdown.parq b/sql/core/src/test/resources/test-data/timestamp_pushdown.parq new file mode 100644 index 000000000000..a7e72ad7663b Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp_pushdown.parq differ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index bd2470ee2066..0a0e0e144d53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.TimeZone import scala.collection.JavaConverters._ import scala.util.Random @@ -165,7 +166,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper { files.map(_.asInstanceOf[String]).foreach { p => val reader = new 
VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -199,7 +200,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper { files.map(_.asInstanceOf[String]).foreach { p => val reader = new VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -454,7 +455,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper { var sum = 0 files.map(_.asInstanceOf[String]).foreach { p => val reader = new VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("c1" :: "c2" :: Nil).asJava) val batch = reader.resultBatch() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 0a1141c050ec..8f10b2cf105c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { case plan: InMemoryRelation => plan }.head // InMemoryRelation's stats is file size before the underlying RDD is materialized - assert(inMemoryRelation.computeStats().sizeInBytes === 868) + assert(inMemoryRelation.computeStats().sizeInBytes === 894) // InMemoryRelation's stats is updated after materializing RDD dfFromFile.collect() @@ -519,7 +519,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats // is calculated - assert(inMemoryRelation2.computeStats().sizeInBytes === 868) + assert(inMemoryRelation2.computeStats().sizeInBytes === 894) // InMemoryRelation's stats should be updated after calculating stats of the table // clear cache to simulate a fresh environment diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index 6e08ee3c4ba3..da653dabf95c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { import testImplicits._ Seq(1.0, 0.5).foreach { compressionFactor => withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString, - "spark.sql.autoBroadcastJoinThreshold" -> "434") { + "spark.sql.autoBroadcastJoinThreshold" -> "544") { withTempPath { workDir => // the file size is 740 bytes val workDirPath = workDir.getAbsolutePath diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index bd2470ee2066..0a0e0e144d53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
+import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -165,7 +166,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
         files.map(_.asInstanceOf[String]).foreach { p =>
           val reader = new VectorizedParquetRecordReader(
-            null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+            null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize)
           try {
             reader.initialize(p, ("id" :: Nil).asJava)
             val batch = reader.resultBatch()
@@ -199,7 +200,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
         files.map(_.asInstanceOf[String]).foreach { p =>
           val reader = new VectorizedParquetRecordReader(
-            null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+            null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize)
           try {
             reader.initialize(p, ("id" :: Nil).asJava)
             val batch = reader.resultBatch()
@@ -454,7 +455,7 @@ object DataSourceReadBenchmark extends BenchmarkBase with SQLHelper {
         var sum = 0
         files.map(_.asInstanceOf[String]).foreach { p =>
           val reader = new VectorizedParquetRecordReader(
-            null, enableOffHeapColumnVector, vectorizedReaderBatchSize)
+            null, TimeZone.getDefault, enableOffHeapColumnVector, vectorizedReaderBatchSize)
           try {
             reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
             val batch = reader.resultBatch()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0a1141c050ec..8f10b2cf105c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -506,7 +506,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
       case plan: InMemoryRelation => plan
     }.head
     // InMemoryRelation's stats is file size before the underlying RDD is materialized
-    assert(inMemoryRelation.computeStats().sizeInBytes === 868)
+    assert(inMemoryRelation.computeStats().sizeInBytes === 894)
 
     // InMemoryRelation's stats is updated after materializing RDD
     dfFromFile.collect()
@@ -519,7 +519,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
 
     // Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
     // is calculated
-    assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
+    assert(inMemoryRelation2.computeStats().sizeInBytes === 894)
 
     // InMemoryRelation's stats should be updated after calculating stats of the table
     // clear cache to simulate a fresh environment
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index 6e08ee3c4ba3..da653dabf95c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
     import testImplicits._
     Seq(1.0, 0.5).foreach { compressionFactor =>
       withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
+        "spark.sql.autoBroadcastJoinThreshold" -> "544") {
         withTempPath { workDir =>
           // the file size is 740 bytes
           val workDirPath = workDir.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index db73bfa149aa..d3aefed87c4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.util.TimeZone
+
 import scala.collection.JavaConverters._
 
 import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -42,7 +44,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null,
+        TimeZone.getDefault,
+        conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file.asInstanceOf[String], null)
       val batch = reader.resultBatch()
       assert(reader.nextBatch())
@@ -69,7 +74,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null,
+        TimeZone.getDefault,
+        conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file.asInstanceOf[String], null)
       val batch = reader.resultBatch()
       assert(reader.nextBatch())
@@ -100,7 +108,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null,
+        TimeZone.getDefault,
+        conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       reader.initialize(file, null /* set columns to null to project all columns */)
       val column = reader.resultBatch().column(0)
       assert(reader.nextBatch())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 255f7db8d135..cc5fdbaceb60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -62,7 +63,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   private lazy val parquetFilters =
     new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-      conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis)
+      conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis,
+      DateTimeUtils.defaultTimeZone())
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -1177,7 +1179,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
       new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
         conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-        conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
+        conf.parquetFilterPushDownInFilterThreshold, caseSensitive, DateTimeUtils.defaultTimeZone())
     }
     val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true)
     val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6b05b9c0f720..6e3b8b76e784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Locale
+import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -658,7 +658,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     {
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null, TimeZone.getDefault, conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       try {
         reader.initialize(file, null)
         val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -677,7 +678,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     {
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null, TimeZone.getDefault, conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       try {
         reader.initialize(file, ("_2" :: Nil).asJava)
         val result = mutable.ArrayBuffer.empty[(String)]
@@ -695,7 +697,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     {
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null, TimeZone.getDefault, conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       try {
         reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
         val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -714,7 +717,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     {
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null, TimeZone.getDefault, conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       try {
         reader.initialize(file, List[String]().asJava)
         var result = 0
@@ -755,7 +759,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       val schema = StructType(StructField("pcol", dt) :: Nil)
       val conf = sqlContext.conf
       val vectorizedReader = new VectorizedParquetRecordReader(
-        null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
+        null, TimeZone.getDefault, conf.offHeapColumnVectorEnabled,
+        conf.parquetVectorizedReaderBatchSize)
       val partitionValues = new GenericInternalRow(Array(v))
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index f06e1867151e..33d6e343dc9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.util.TimeZone
 
 import scala.language.existentials
 
@@ -27,10 +28,11 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
 
 class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
   test("parquet files with different physical schemas but share the same logical schema") {
@@ -98,7 +100,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
     }
   }
 
-  test("parquet timestamp conversion") {
+  test("parquet Impala timestamp conversion") {
     // Make a table with one parquet file written by impala, and one parquet file written by spark.
     // We should only adjust the timestamps in the impala file, and only if the conf is set
     val impalaFile = "test-data/impala_timestamp.parq"
@@ -204,4 +206,125 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
       }
     }
   }
+
+  test("parquet timestamp read path") {
+    Seq("timestamp_plain", "timestamp_dictionary").foreach({ file =>
+      val timestampPath = Thread.currentThread()
+        .getContextClassLoader.getResource("test-data/" + file + ".parq").toURI.getPath
+      val expectedPath = Thread.currentThread()
+        .getContextClassLoader.getResource("test-data/" + file + ".txt").toURI.getPath
+
+      val schema = StructType(Array(
+        StructField("rawValue", LongType, false),
+        StructField("millisUtc", TimestampType, false),
+        StructField("millisNonUtc", TimestampType, false),
+        StructField("microsUtc", TimestampType, false),
+        StructField("microsNonUtc", TimestampType, false)))
+
+      withTempPath { tableDir =>
+        val textValues = spark.read
+          .schema(schema)
+          .option("inferSchema", false)
+          .option("header", false)
+          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")
+          .option("delimiter", ";").csv(expectedPath).collect
+        val timestamps = textValues.map(
+          row => (row.getLong(0),
+            row.getTimestamp(1),
+            row.getTimestamp(2),
+            row.getTimestamp(3),
+            row.getTimestamp(4)
+          )
+        )
+        FileUtils.copyFile(new File(timestampPath), new File(tableDir, "part-00001.parq"))
+
+        Seq(false, true).foreach { vectorized =>
+          withSQLConf(
+            (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
+          ) {
+            val readBack = spark.read.parquet(tableDir.getAbsolutePath).collect
+
+            val expected = timestamps.map(_.toString).sorted
+            val actual = readBack.map(
+              row => (row.getLong(0),
+                row.getTimestamp(1),
+                row.getTimestamp(2),
+                row.getTimestamp(3),
+                row.getTimestamp(4)
+              )
+            ).map(_.toString).sorted
+            assert(readBack.size === expected.size)
+            withClue(s"vectorized = $vectorized, file = $file") {
+              assert(actual === expected)
+            }
+          }
+        }
+      }
+    })
+  }
+
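The expected strings consumed by the read-path test above come from the .txt fixtures added earlier; the *Utc and *NonUtc columns are the same instant rendered with and without the session-time-zone adjustment. A quick way to reproduce the first fixture row by hand (illustrative only, plain java.time, not part of the suite):

    import java.time.{Instant, ZoneId}
    import java.time.format.DateTimeFormatter

    // Epoch second -144674181 is the rawValue of the first row in timestamp_plain.txt.
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
    val instant = Instant.ofEpochSecond(-144674181L)
    fmt.format(instant.atZone(ZoneId.of("UTC")))                  // "1965-06-01 12:43:39.000"
    fmt.format(instant.atZone(ZoneId.of("America/Los_Angeles")))  // "1965-06-01 05:43:39.000"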
+  // For every timestamp value in the file, compare the result of each comparison predicate
+  // (=, !=, >, >=, <, <=) with and without timestamp predicate pushdown, for both the
+  // vectorized and non-vectorized readers and under two different session time zones.
+  test("parquet timestamp predicate pushdown") {
+    val timestampPath = Thread.currentThread()
+      .getContextClassLoader.getResource("test-data/timestamp_pushdown.parq").toURI.getPath
+
+    def verifyPredicate(dataFrame: DataFrame, column: String,
+        item: String, predicate: String, vectorized: Boolean): Unit = {
+      val filter = s"$column $predicate to_timestamp('$item')"
+      withSQLConf(
+        (SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key, "true")
+      ) {
+        val withPushdown = dataFrame.where(filter).collect
+        withSQLConf(
+          (SQLConf.PARQUET_FILTER_PUSHDOWN_TIMESTAMP_ENABLED.key, "false")
+        ) {
+          val withoutPushdown = dataFrame.where(filter).collect
+          withClue(s"vectorized = $vectorized, column = $column, item = $item") {
+            assert(withPushdown === withoutPushdown)
+          }
+        }
+      }
+    }
+
+    def withTimeZone(timeZone: String)(f: => Unit): Unit = {
+      val tz = TimeZone.getDefault
+      TimeZone.setDefault(TimeZone.getTimeZone(timeZone))
+      try f finally TimeZone.setDefault(tz)
+    }
+
+    withTempPath { tableDir =>
+      FileUtils.copyFile(new File(timestampPath), new File(tableDir, "part-00001.parq"))
+
+      Seq("America/Los_Angeles", "Australia/Perth").foreach({ timeZone =>
+        withTimeZone(timeZone) {
+          Seq(false, true).foreach { vectorized =>
+            withSQLConf(
+              (SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized.toString())
+            ) {
+              val losAngeles = spark.read.parquet(tableDir.getAbsolutePath).select("inLosAngeles")
+                .collect.map(_.getString(0))
+              val utc = spark.read.parquet(tableDir.getAbsolutePath).select("inUtc")
+                .collect.map(_.getString(0))
+              val perth = spark.read.parquet(tableDir.getAbsolutePath).select("inPerth")
+                .collect.map(_.getString(0))
+              Seq(losAngeles, utc, perth).foreach(values => values.foreach(item =>
+                Seq("millisUtc", "millisNonUtc", "microsUtc", "microsNonUtc").foreach(column => {
+                  val dataFrame = spark.read.parquet(tableDir.getAbsolutePath).select(column)
+                  verifyPredicate(dataFrame, column, item, "=", vectorized)
+                  verifyPredicate(dataFrame, column, item, "!=", vectorized)
+                  verifyPredicate(dataFrame, column, item, ">", vectorized)
+                  verifyPredicate(dataFrame, column, item, ">=", vectorized)
+                  verifyPredicate(dataFrame, column, item, "<", vectorized)
+                  verifyPredicate(dataFrame, column, item, "<=", vectorized)
+                })
+              ))
+            }
+          }
+        }
+      })
+    }
+  }
 }
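While debugging a failure in the pushdown test above, it can help to confirm which predicates actually reach the Parquet scan. A hypothetical snippet (not asserted anywhere in the suite; `tableDir` is the same temp directory the test copies the fixture into, and the column and literal are just examples):

    val df = spark.read.parquet(tableDir.getAbsolutePath)
      .where("microsUtc >= to_timestamp('1970-01-01 00:00:00')")
    // The Parquet scan node of the physical plan lists the translated predicates
    // under "PushedFilters" in the explain output.
    df.explain(true)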