
Commit 4659468

adrian-wang authored and liancheng committed
[SPARK-4985] [SQL] parquet support for date type
This PR might have some issues with apache#3732, and it would have merge conflicts with apache#3820, so the review can be delayed until those two are merged.

Author: Daoyuan Wang <[email protected]>

Closes apache#3822 from adrian-wang/parquetdate and squashes the following commits:

2c5d54d [Daoyuan Wang] add a test case
faef887 [Daoyuan Wang] parquet support for primitive date
97e9080 [Daoyuan Wang] parquet support for date type
1 parent 2bf40c5 commit 4659468

File tree: 5 files changed, +35 −1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 12 additions & 0 deletions
Lines changed: 12 additions & 0 deletions

@@ -127,6 +127,12 @@ private[sql] object CatalystConverter {
             parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType])
         }
       }
+      case DateType => {
+        new CatalystPrimitiveConverter(parent, fieldIndex) {
+          override def addInt(value: Int): Unit =
+            parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType])
+        }
+      }
       case d: DecimalType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addBinary(value: Binary): Unit =

@@ -192,6 +198,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     updateField(fieldIndex, value)

+  protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    updateField(fieldIndex, value)
+
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)

@@ -388,6 +397,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     current.setInt(fieldIndex, value)

+  override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    current.update(fieldIndex, value)
+
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
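
The new DateType case forwards the raw INT32 straight through: in this version Catalyst's DateType.JvmType is Int, a count of days since the Unix epoch, which is exactly what Parquet's DATE-annotated INT32 stores. Below is a minimal standalone sketch of that representation; the helper names are illustrative and not Spark's DateUtils.

import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DateAsDayCount {
  private val epoch = LocalDate.of(1970, 1, 1)

  // Parquet DATE / Catalyst DateType: an Int counting days since 1970-01-01.
  def toDays(date: LocalDate): Int = ChronoUnit.DAYS.between(epoch, date).toInt
  def fromDays(days: Int): LocalDate = epoch.plusDays(days.toLong)

  def main(args: Array[String]): Unit = {
    assert(toDays(LocalDate.of(1970, 1, 2)) == 1)
    assert(fromDays(365) == LocalDate.of(1971, 1, 1))
    assert(toDays(LocalDate.of(2015, 1, 1)) == 16436)
  }
}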

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 2 additions & 0 deletions
@@ -212,6 +212,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
       case DoubleType => writer.addDouble(value.asInstanceOf[Double])
       case FloatType => writer.addFloat(value.asInstanceOf[Float])
       case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+      case DateType => writer.addInteger(value.asInstanceOf[Int])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
           sys.error(s"Unsupported datatype $d, cannot write to consumer")

@@ -358,6 +359,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
       case DoubleType => writer.addDouble(record.getDouble(index))
       case FloatType => writer.addFloat(record.getFloat(index))
       case BooleanType => writer.addBoolean(record.getBoolean(index))
+      case DateType => writer.addInteger(record.getInt(index))
       case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
       case d: DecimalType =>
         if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
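
On the write path the value for a date column is already that Int day count, so it is emitted with writer.addInteger exactly like an ordinary integer; only the schema annotation (see ParquetTypes.scala below) distinguishes the two. The following is a toy sketch of that dispatch using hypothetical stand-in types, not Spark's Catalyst types or Parquet's RecordConsumer.

import scala.collection.mutable

// Hypothetical stand-ins; not Spark's or parquet-mr's classes.
sealed trait ToyType
case object ToyIntegerType extends ToyType
case object ToyDateType extends ToyType

class RecordingWriter {
  val written = mutable.Buffer[Int]()
  def addInteger(v: Int): Unit = written += v
}

object WriteDispatchSketch {
  // Both cases emit a plain integer; a date value is its day count.
  def writeValue(t: ToyType, value: Any, writer: RecordingWriter): Unit = t match {
    case ToyIntegerType => writer.addInteger(value.asInstanceOf[Int])
    case ToyDateType    => writer.addInteger(value.asInstanceOf[Int]) // days since epoch
  }

  def main(args: Array[String]): Unit = {
    val w = new RecordingWriter
    writeValue(ToyDateType, 16436, w) // 2015-01-01 as a day count
    assert(w.written == Seq(16436))
  }
}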

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 4 additions & 0 deletions
@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
     case ParquetPrimitiveTypeName.DOUBLE => DoubleType
     case ParquetPrimitiveTypeName.FLOAT => FloatType
+    case ParquetPrimitiveTypeName.INT32
+      if originalType == ParquetOriginalType.DATE => DateType
     case ParquetPrimitiveTypeName.INT32 => IntegerType
     case ParquetPrimitiveTypeName.INT64 => LongType
     case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType

@@ -222,6 +224,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // There is no type for Byte or Short so we promote them to INT32.
     case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
     case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+    case DateType => Some(ParquetTypeInfo(
+      ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
     case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
     case TimestampType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT96))
     case DecimalType.Fixed(precision, scale) if precision <= 18 =>
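
Note the ordering in the read-side mapping: INT32 backs several Catalyst types, so the DATE-annotated case has to be matched before the bare ParquetPrimitiveTypeName.INT32 => IntegerType fallback. Here is a simplified sketch of that two-way lookup with illustrative string tags standing in for the real Parquet enums.

object DateTypeMappingSketch {
  // Illustrative tags; the real code matches parquet's PrimitiveTypeName/OriginalType enums.
  sealed trait ToyCatalystType
  case object ToyIntegerType extends ToyCatalystType
  case object ToyDateType extends ToyCatalystType

  def toCatalyst(primitive: String, originalType: Option[String]): ToyCatalystType =
    (primitive, originalType) match {
      case ("INT32", Some("DATE")) => ToyDateType   // must precede the bare INT32 case
      case ("INT32", _)            => ToyIntegerType
      case other                   => sys.error(s"unsupported: $other")
    }

  def toParquet(t: ToyCatalystType): (String, Option[String]) = t match {
    case ToyDateType    => ("INT32", Some("DATE"))
    case ToyIntegerType => ("INT32", None)
  }

  def main(args: Array[String]): Unit = {
    assert(toCatalyst("INT32", Some("DATE")) == ToyDateType)
    assert(toCatalyst("INT32", None) == ToyIntegerType)
    val expected = ("INT32", Some("DATE"))
    assert(toParquet(ToyDateType) == expected)
  }
}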

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -135,6 +135,21 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     }
   }

+  test("date type") {
+    def makeDateRDD(): DataFrame =
+      sparkContext
+        .parallelize(0 to 1000)
+        .map(i => Tuple1(DateUtils.toJavaDate(i)))
+        .toDF()
+        .select($"_1")
+
+    withTempPath { dir =>
+      val data = makeDateRDD()
+      data.saveAsParquetFile(dir.getCanonicalPath)
+      checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+    }
+  }
+
   test("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
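
For reference, the same round trip can be done outside the test harness against the Spark 1.3-era API the suite uses. This is a hedged sketch: it assumes an existing SparkContext, the path is illustrative, and saveAsParquetFile/parquetFile were later superseded by the read/write API.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Assumes a live SparkContext (e.g. `sc` in spark-shell); the output path is illustrative.
def roundTripDates(sc: SparkContext): Unit = {
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val millisPerDay = 24L * 60 * 60 * 1000
  val dates = sc.parallelize(0 to 10)
    .map(i => Tuple1(new java.sql.Date(i * millisPerDay)))
    .toDF("d")

  dates.saveAsParquetFile("/tmp/dates.parquet")            // stored as int32 (DATE)
  val readBack = sqlContext.parquetFile("/tmp/dates.parquet")
  readBack.printSchema()                                   // d: date (nullable = true)
  assert(readBack.count() == 11)
}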

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -57,14 +57,15 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)

-  testSchema[(Byte, Short, Int, Long)](
+  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
       |message root {
       |  required int32 _1 (INT_8);
       |  required int32 _2 (INT_16);
       |  required int32 _3 (INT_32);
       |  required int64 _4 (INT_64);
+      |  optional int32 _5 (DATE);
       |}
     """.stripMargin)
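
The expected schema text can also be inspected directly with parquet-mr's parser. This is a hedged sketch: the parquet.schema package prefix matches the pre-Apache parquet-mr releases Spark 1.3 depended on (later releases use org.apache.parquet).

import parquet.schema.{MessageTypeParser, OriginalType}

object DateSchemaSketch {
  def main(args: Array[String]): Unit = {
    val schema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 _3 (INT_32);
        |  optional int32 _5 (DATE);
        |}
      """.stripMargin)

    // At the storage level the date column is still a plain INT32; only the
    // original-type annotation marks it as a DATE.
    assert(schema.getType("_5").getOriginalType == OriginalType.DATE)
    assert(schema.getType("_5").asPrimitiveType().getPrimitiveTypeName.name() == "INT32")
  }
}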
