diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
index a2feac869ece..46c717fe9d5a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java
@@ -173,6 +173,8 @@ public InternalRow copy() {
         row.setInt(i, getInt(i));
       } else if (dt instanceof TimestampType) {
         row.setLong(i, getLong(i));
+      } else if (dt instanceof CalendarIntervalType) {
+        row.update(i, getInterval(i));
       } else {
         throw new RuntimeException("Not implemented. " + dt);
       }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index bce6aa28c42a..c3beed079c69 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -89,7 +89,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
     } else if (t instanceof CalendarIntervalType) {
       CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
       col.getChild(0).putInts(0, capacity, c.months);
-      col.getChild(1).putLongs(0, capacity, c.microseconds);
+      col.getChild(1).putInts(0, capacity, c.days);
+      col.getChild(2).putLongs(0, capacity, c.microseconds);
     } else if (t instanceof DateType) {
       col.putInts(0, capacity, row.getInt(fieldIdx));
     } else if (t instanceof TimestampType) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3615afcf86c7..03e0883852e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, Tex
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
+import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils
 
@@ -503,10 +503,8 @@ case class DataSource(
       outputColumnNames: Seq[String],
       physicalPlan: SparkPlan): BaseRelation = {
     val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
-    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
+    checkUnsupportedTypes(outputColumns.map(_.dataType))
 
     providingInstance() match {
       case dataSource: CreatableRelationProvider =>
         dataSource.createRelation(
@@ -540,9 +538,7 @@ case class DataSource(
    * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
    */
   def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
+    checkUnsupportedTypes(data.schema.map(_.dataType))
 
     providingInstance() match {
       case dataSource: CreatableRelationProvider =>
@@ -574,6 +570,15 @@ case class DataSource(
     DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf, checkEmptyGlobPath, checkFilesExist)
   }
+
+
+  private def checkUnsupportedTypes(dataTypes: Seq[DataType]): Unit = {
+    if (providingClass != classOf[ParquetFileFormat]) {
+      if (dataTypes.exists(_.isInstanceOf[CalendarIntervalType])) {
+        throw new AnalysisException("Cannot save interval data type into external storage.")
+      }
+    }
+  }
 }
 
 object DataSource extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f52aaf0140e1..b0097ae4cbd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -355,7 +355,7 @@ class ParquetFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
-    case _: AtomicType => true
+    case _: AtomicType | _: CalendarIntervalType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index ff5c724375c3..5811bb122e7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -34,9 +34,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
@@ -323,6 +324,23 @@ private[parquet] class ParquetRowConverter(
           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
         })
 
+    case CalendarIntervalType
+        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY =>
+      new ParquetPrimitiveConverter(updater) {
+        override def addBinary(value: Binary): Unit = {
+          assert(
+            value.length() == 12,
+            "Intervals are expected to be stored in 12-byte fixed len byte array, " +
+              s"but got a ${value.length()}-byte array.")
+
+          val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+          val microseconds = buf.getInt * MICROS_PER_MILLIS
+          val days = buf.getInt
+          val months = buf.getInt
+          updater.set(new CalendarInterval(months, days, microseconds))
+        }
+      }
+
     case t =>
       throw new RuntimeException(
         s"Unable to create Parquet converter for data type ${t.json} " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 8ce8a86d2f02..782304d8fb6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -171,7 +171,7 @@ class ParquetToSparkSchemaConverter(
       case FIXED_LEN_BYTE_ARRAY =>
         originalType match {
           case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
-          case INTERVAL => typeNotImplemented()
+          case INTERVAL => CalendarIntervalType
           case _ => illegalType()
         }
 
@@ -553,6 +553,11 @@ class SparkToParquetSchemaConverter(
     case udt: UserDefinedType[_] =>
       convertField(field.copy(dataType = udt.sqlType))
 
+    case i: CalendarIntervalType =>
+      Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(12)
+        .as(INTERVAL)
+        .named(field.name)
+
     case _ =>
       throw new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index f6490614ab05..40e1a76ec6da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -34,6 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -71,7 +72,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _
 
   // Reusable byte array used to write timestamps as Parquet INT96 values
-  private val timestampBuffer = new Array[Byte](12)
+  // or intervals as Parquet FIXED_LEN_BYTE_ARRAY values
+  private val reusableBuffer = new Array[Byte](12)
 
   // Reusable byte array used to write decimal values
   private val decimalBuffer =
@@ -173,9 +175,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
           case SQLConf.ParquetOutputTimestampType.INT96 =>
             (row: SpecializedGetters, ordinal: Int) =>
               val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
-              val buf = ByteBuffer.wrap(timestampBuffer)
+              val buf = ByteBuffer.wrap(reusableBuffer)
               buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-              recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
+              recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer))
 
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
             (row: SpecializedGetters, ordinal: Int) =>
@@ -207,7 +209,16 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       case t: UserDefinedType[_] => makeWriter(t.sqlType)
 
-      // TODO Adds IntervalType support
+      case CalendarIntervalType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val interval = row.getInterval(ordinal)
+          val buf = ByteBuffer.wrap(reusableBuffer)
+          buf.order(ByteOrder.LITTLE_ENDIAN)
+            .putInt(Math.toIntExact(interval.microseconds / MICROS_PER_MILLIS))
+            .putInt(interval.days)
+            .putInt(interval.months)
+          recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer))
+
       case _ => sys.error(s"Unsupported data type $dataType.")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index 2ad64b1aa524..cc91d0ac3e6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -58,6 +58,8 @@ case class ParquetTable(
     case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
 
+    case _: CalendarIntervalType => true
+
     case _ => false
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index a7f3e81904de..0029421ee802 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -330,13 +330,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
+  test("SPARK-24204 error handling for unsupported Interval data types - csv, json, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
       // TODO: test file source V2 after write path is fixed.
       Seq(true).foreach { useV1 =>
         val useV1List = if (useV1) {
-          "csv,json,orc,parquet"
+          "csv,json,orc"
         } else {
           ""
         }
@@ -349,7 +349,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
         withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
           // write path
-          Seq("csv", "json", "parquet", "orc").foreach { format =>
+          Seq("csv", "json", "orc").foreach { format =>
            val msg = intercept[AnalysisException] {
              sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
            }.getMessage
@@ -357,7 +357,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
          }
 
          // read path
-          Seq("parquet", "csv").foreach { format =>
+          Seq("csv").foreach { format =>
            var msg = intercept[AnalysisException] {
              val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
              spark.range(1).write.format(format).mode("overwrite").save(tempDir)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 39590b063f0a..36e406302c24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -114,12 +114,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         |  required fixed_len_byte_array(32) i(DECIMAL(32,0));
         |  required int64 j(TIMESTAMP_MILLIS);
         |  required int64 k(TIMESTAMP_MICROS);
+        |  required fixed_len_byte_array(12) l(INTERVAL);
         |}
       """.stripMargin)
 
     val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
       DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
-      TimestampType, TimestampType)
+      TimestampType, TimestampType, CalendarIntervalType)
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
@@ -735,7 +736,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     val dataTypes =
       Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
-        FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+        FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType, CalendarIntervalType)
 
     val constantValues =
       Seq(
@@ -749,7 +750,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         0.75D,
         Decimal("1234.23456"),
         DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
-        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+        DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")),
+        IntervalUtils.safeStringToInterval(
+          UTF8String.fromString("interval 1 month 2 microsecond")))
 
     dataTypes.zip(constantValues).foreach { case (dt, v) =>
       val schema = StructType(StructField("pcol", dt) :: Nil)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 61e93a7acd3f..2776ce5e6a71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -903,6 +903,32 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
     testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
   }
+
+  test("interval written and read as Parquet INTERVAL") {
+    withTempPath { file =>
+      val df = Seq(
+        "interval 0 seconds",
+        "interval 1 month 1 millisecond",
+        "interval -1 month -1 millisecond",
+        "interval 1 year 2 month 3 weeks 4 days 5 hours 6 minutes 7 second 8 millisecond",
+        "interval -1 year -2 month -3 weeks -4 days -5 hours -6 minutes -7 second -8 millisecond",
+        "interval 3650000 days",
+        "interval -3650000 days",
+        "interval 9999 years 12 months 1 millisecond",
+        "interval 9999 years 12 months 23 hours 59 minutes 59 seconds 999 milliseconds",
+        "interval -9999 years -12 months -23 hours -59 minutes -59 seconds -999 milliseconds",
+        "interval 1000 months 1000 days 10000000 microseconds",
+        "").toDF("intervalStr")
+        .selectExpr("CAST(intervalStr AS interval) AS i")
+      df.write.parquet(file.getCanonicalPath)
+      ("true" :: "false" :: Nil).foreach { vectorized =>
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
+          val df2 = spark.read.parquet(file.getCanonicalPath)
+          checkAnswer(df2, df.collect().toSeq)
+        }
+      }
+    }
+  }
 }
 
 class ParquetV1QuerySuite extends ParquetQuerySuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 1274995fd677..efea4e8cf168 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1010,6 +1010,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     writeLegacyParquetFormat = true,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
 
+  testSchema(
+    "Interval written and read as fixed_len_byte_array(12) with INTERVAL",
+    StructType(Seq(StructField("f1", CalendarIntervalType))),
+    """message root {
+      |  optional fixed_len_byte_array(12) f1 (INTERVAL);
+      |}
+    """.stripMargin,
+    binaryAsString = true,
+    int96AsTimestamp = false,
+    writeLegacyParquetFormat = true)
+
   private def testSchemaClipping(
       testName: String,
       parquetSchema: String,
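
A minimal, Spark-independent Scala sketch of the 12-byte layout the write and read paths above agree on: a FIXED_LEN_BYTE_ARRAY(12) annotated as INTERVAL, holding (milliseconds, days, months) as little-endian 32-bit integers. The object and helper names below are illustrative only, not part of the patch; as in ParquetWriteSupport, microseconds are truncated to millisecond precision on write.

import java.nio.{ByteBuffer, ByteOrder}

// Illustrative sketch mirroring the encoding used by ParquetWriteSupport (write)
// and ParquetRowConverter (read) in the diff above. Names are hypothetical.
object ParquetIntervalLayoutSketch {

  // Spark's CalendarInterval carries (months, days, microseconds); the Parquet
  // INTERVAL annotation stores (millis, days, months) in a 12-byte little-endian array.
  def toParquetInterval(months: Int, days: Int, microseconds: Long): Array[Byte] = {
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(Math.toIntExact(microseconds / 1000L)) // micros truncated to millis, as in the write path
      .putInt(days)
      .putInt(months)
    buf.array()
  }

  def fromParquetInterval(bytes: Array[Byte]): (Int, Int, Long) = {
    require(bytes.length == 12, s"expected a 12-byte array, got ${bytes.length}")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val micros = buf.getInt.toLong * 1000L // millis widened back to micros
    val days = buf.getInt
    val months = buf.getInt
    (months, days, micros)
  }

  def main(args: Array[String]): Unit = {
    // 14 months, 3 days, 2 seconds survives the round trip; a sub-millisecond
    // component would be dropped by the millis-based on-disk format.
    val (months, days, micros) = fromParquetInterval(toParquetInterval(14, 3, 2000000L))
    assert(months == 14 && days == 3 && micros == 2000000L)
  }
}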