@@ -173,6 +173,8 @@ public InternalRow copy() {
row.setInt(i, getInt(i));
} else if (dt instanceof TimestampType) {
row.setLong(i, getLong(i));
} else if (dt instanceof CalendarIntervalType) {
row.update(i, getInterval(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
@@ -89,7 +89,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int field
} else if (t instanceof CalendarIntervalType) {
CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
col.getChild(0).putInts(0, capacity, c.months);
col.getChild(1).putLongs(0, capacity, c.microseconds);
col.getChild(1).putInts(0, capacity, c.days);
col.getChild(2).putLongs(0, capacity, c.microseconds);
} else if (t instanceof DateType) {
col.putInts(0, capacity, row.getInt(fieldIdx));
} else if (t instanceof TimestampType) {
@@ -48,7 +48,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, Tex
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, DataType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

@@ -503,10 +503,8 @@ case class DataSource(
outputColumnNames: Seq[String],
physicalPlan: SparkPlan): BaseRelation = {
val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {

Contributor: Let's do this change after we officially make CalendarIntervalType public, i.e. move it to a public package.

Member Author: Just wondering, what is the relation between this PR and making CalendarIntervalType public? An INTERVAL column can appear as the result of subtracting two datetime columns, and a user may want to store it to a file system.

cloud-fan (Contributor, Nov 11, 2019): The interval type is kind of an internal type for now. It's a big decision whether we can read/write it from/to data sources.

Member: And Python and R need a proper conversion for both reads and writes as well.

Member Author: How are Python and R involved in reading and writing Parquet?

HyukjinKwon (Member, Nov 18, 2019): For instance, if the Scala API saves interval types:

df.write.parquet("...")

and Python reads them back:

spark.read.parquet("...").collect()

there is no way to map the values on the Python side via collect. In the case of DateType, values are mapped to datetime.date instances in Python.

Member: We have to make all of that supported before exposing the interval-related functionality (see #25022 (comment)).

throw new AnalysisException("Cannot save interval data type into external storage.")
}

checkUnsupportedTypes(outputColumns.map(_.dataType))
providingInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(
@@ -540,9 +538,7 @@ case class DataSource(
* Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
*/
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
checkUnsupportedTypes(data.schema.map(_.dataType))

providingInstance() match {
case dataSource: CreatableRelationProvider =>
@@ -574,6 +570,15 @@
DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
checkEmptyGlobPath, checkFilesExist)
}


private def checkUnsupportedTypes(dataTypes: Seq[DataType]): Unit = {
if (providingClass != classOf[ParquetFileFormat]) {
if (dataTypes.exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
}
}
}

object DataSource extends Logging {
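For orientation, here is a minimal sketch of the user-facing behavior the checkUnsupportedTypes change above is aiming at, assuming the Parquet read/write support added elsewhere in this PR is in place. The session setup, paths, and object name are illustrative only and mirror the tests further down:

import org.apache.spark.sql.{AnalysisException, SparkSession}

object IntervalWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // An interval column, produced the same way as in the tests below.
    val df = spark.sql("SELECT INTERVAL 1 DAYS AS i")

    // Parquet is exempted from checkUnsupportedTypes, so this write is expected to succeed.
    df.write.mode("overwrite").parquet("/tmp/interval-demo")

    // Other built-in file sources still reject interval columns.
    try {
      df.write.mode("overwrite").format("csv").save("/tmp/interval-demo-csv")
    } catch {
      case e: AnalysisException =>
        // "Cannot save interval data type into external storage."
        println(e.getMessage)
    }

    spark.stop()
  }
}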
@@ -355,7 +355,7 @@ class ParquetFileFormat
}

override def supportDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
case _: AtomicType | _: CalendarIntervalType => true

case st: StructType => st.forall { f => supportDataType(f.dataType) }

@@ -34,9 +34,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some
@@ -323,6 +324,23 @@ private[parquet] class ParquetRowConverter(
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})

case CalendarIntervalType
if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY =>
new ParquetPrimitiveConverter(updater) {
override def addBinary(value: Binary): Unit = {
assert(
value.length() == 12,
"Intervals are expected to be stored in 12-byte fixed len byte array, " +
s"but got a ${value.length()}-byte array.")

val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val microseconds = buf.getInt * MICROS_PER_MILLIS
val days = buf.getInt
val months = buf.getInt
updater.set(new CalendarInterval(months, days, microseconds))
}
}

case t =>
throw new RuntimeException(
s"Unable to create Parquet converter for data type ${t.json} " +
@@ -171,7 +171,7 @@ class ParquetToSparkSchemaConverter(
case FIXED_LEN_BYTE_ARRAY =>
originalType match {
case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength))
case INTERVAL => typeNotImplemented()
case INTERVAL => CalendarIntervalType
case _ => illegalType()
}

@@ -553,6 +553,11 @@ class SparkToParquetSchemaConverter(
case udt: UserDefinedType[_] =>
convertField(field.copy(dataType = udt.sqlType))

case i: CalendarIntervalType =>
Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(12)
.as(INTERVAL)
.named(field.name)

case _ =>
throw new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
}
@@ -34,6 +34,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -71,7 +72,8 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
private var outputTimestampType: SQLConf.ParquetOutputTimestampType.Value = _

// Reusable byte array used to write timestamps as Parquet INT96 values
private val timestampBuffer = new Array[Byte](12)
// or intervals as Parquet FIXED_LEN_BYTE_ARRAY values
private val reusableBuffer = new Array[Byte](12)

// Reusable byte array used to write decimal values
private val decimalBuffer =
@@ -173,9 +175,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
case SQLConf.ParquetOutputTimestampType.INT96 =>
(row: SpecializedGetters, ordinal: Int) =>
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val buf = ByteBuffer.wrap(timestampBuffer)
val buf = ByteBuffer.wrap(reusableBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer))

case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
(row: SpecializedGetters, ordinal: Int) =>
@@ -207,7 +209,16 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {

case t: UserDefinedType[_] => makeWriter(t.sqlType)

// TODO Adds IntervalType support
case CalendarIntervalType =>
(row: SpecializedGetters, ordinal: Int) =>
val interval = row.getInterval(ordinal)
val buf = ByteBuffer.wrap(reusableBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN)
.putInt(Math.toIntExact(interval.microseconds / MICROS_PER_MILLIS))
.putInt(interval.days)
.putInt(interval.months)
recordConsumer.addBinary(Binary.fromReusedByteArray(reusableBuffer))

case _ => sys.error(s"Unsupported data type $dataType.")
}
}
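For reference, here is a self-contained sketch of the 12-byte encoding shared by the converter and writer changes above, using the same little-endian (milliseconds, days, months) field order that appears in this diff. The Interval case class and IntervalCodec object are stand-ins (not Spark classes) so the snippet runs without Spark on the classpath:

import java.nio.{ByteBuffer, ByteOrder}

// Stand-in for org.apache.spark.unsafe.types.CalendarInterval.
final case class Interval(months: Int, days: Int, microseconds: Long)

object IntervalCodec {
  private val MicrosPerMillis = 1000L

  // Mirrors the write path: pack (millis, days, months) little-endian into 12 bytes.
  def encode(i: Interval): Array[Byte] = {
    val buf = ByteBuffer.wrap(new Array[Byte](12)).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(Math.toIntExact(i.microseconds / MicrosPerMillis))
      .putInt(i.days)
      .putInt(i.months)
    buf.array()
  }

  // Mirrors the read path: unpack the three fields in the same order.
  def decode(bytes: Array[Byte]): Interval = {
    require(bytes.length == 12, s"expected a 12-byte array, got ${bytes.length} bytes")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val micros = buf.getInt * MicrosPerMillis
    val days = buf.getInt
    val months = buf.getInt
    Interval(months, days, micros)
  }

  def main(args: Array[String]): Unit = {
    val original = Interval(months = 14, days = 25, microseconds = 5000000L)
    // Round trip is exact at millisecond granularity; sub-millisecond microseconds are
    // truncated on write, matching the division by MICROS_PER_MILLIS in the writer above.
    assert(decode(encode(original)) == original)
  }
}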
@@ -58,6 +58,8 @@ case class ParquetTable(

case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)

case _: CalendarIntervalType => true

case _ => false
}

@@ -330,13 +330,13 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
test("SPARK-24204 error handling for unsupported Interval data types - csv, json, orc") {
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath
// TODO: test file source V2 after write path is fixed.
Seq(true).foreach { useV1 =>
val useV1List = if (useV1) {
"csv,json,orc,parquet"
"csv,json,orc"
} else {
""
}
@@ -349,15 +349,15 @@

withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
// write path
Seq("csv", "json", "parquet", "orc").foreach { format =>
Seq("csv", "json", "orc").foreach { format =>
val msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format(format).mode("overwrite").save(tempDir)
}.getMessage
validateErrorMessage(msg)
}

// read path
Seq("parquet", "csv").foreach { format =>
Seq("csv").foreach { format =>
var msg = intercept[AnalysisException] {
val schema = StructType(StructField("a", CalendarIntervalType, true) :: Nil)
spark.range(1).write.format(format).mode("overwrite").save(tempDir)
@@ -43,7 +43,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -114,12 +114,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
| required int64 j(TIMESTAMP_MILLIS);
| required int64 k(TIMESTAMP_MICROS);
| required fixed_len_byte_array(12) l(INTERVAL);
|}
""".stripMargin)

val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
TimestampType, TimestampType)
TimestampType, TimestampType, CalendarIntervalType)

withTempPath { location =>
val path = new Path(location.getCanonicalPath)
@@ -735,7 +736,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession

val dataTypes =
Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType, CalendarIntervalType)

val constantValues =
Seq(
@@ -749,7 +750,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
0.75D,
Decimal("1234.23456"),
DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")),
IntervalUtils.safeStringToInterval(
UTF8String.fromString("interval 1 month 2 microsecond")))

dataTypes.zip(constantValues).foreach { case (dt, v) =>
val schema = StructType(StructField("pcol", dt) :: Nil)
@@ -903,6 +903,32 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS")
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
}

test("interval written and read as Parquet INTERVAL") {
withTempPath { file =>
val df = Seq(
"interval 0 seconds",
"interval 1 month 1 millisecond",
"interval -1 month -1 millisecond",
"interval 1 year 2 month 3 weeks 4 days 5 hours 6 minutes 7 second 8 millisecond",
"interval -1 year -2 month -3 weeks -4 days -5 hours -6 minutes -7 second -8 millisecond",
"interval 3650000 days",
"interval -3650000 days",
"interval 9999 years 12 months 1 millisecond",
"interval 9999 years 12 months 23 hours 59 minutes 59 seconds 999 milliseconds",
"interval -9999 years -12 months -23 hours -59 minutes -59 seconds -999 milliseconds",
"interval 1000 months 1000 days 10000000 microseconds",
"").toDF("intervalStr")
.selectExpr("CAST(intervalStr AS interval) AS i")
df.write.parquet(file.getCanonicalPath)
("true" :: "false" :: Nil).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
val df2 = spark.read.parquet(file.getCanonicalPath)
checkAnswer(df2, df.collect().toSeq)
}
}
}
}
}

class ParquetV1QuerySuite extends ParquetQuerySuite {
@@ -1010,6 +1010,17 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
writeLegacyParquetFormat = true,
outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)

testSchema(
"Interval written and read as fixed_len_byte_array(12) with INTERVAL",
StructType(Seq(StructField("f1", CalendarIntervalType))),
"""message root {
| optional fixed_len_byte_array(12) f1 (INTERVAL);
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = false,
writeLegacyParquetFormat = true)

private def testSchemaClipping(
testName: String,
parquetSchema: String,