Skip to content

Commit 93f438d

Browse files
committed
parquet timestamp support
1 parent eccb9fb commit 93f438d

File tree

11 files changed

+241
-26
lines changed

11 files changed

+241
-26
lines changed

docs/sql-programming-guide.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
580580
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
581581
</td>
582582
</tr>
583+
<tr>
584+
<td><code>spark.sql.parquet.int96AsTimestamp</code></td>
585+
<td>true</td>
586+
<td>
587+
Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also
588+
store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This
589+
flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
590+
</td>
591+
</tr>
583592
<tr>
584593
<td><code>spark.sql.parquet.cacheMetadata</code></td>
585594
<td>true</td>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
<scala.binary.version>2.10</scala.binary.version>
150150
<jline.version>${scala.version}</jline.version>
151151
<jline.groupid>org.scala-lang</jline.groupid>
152+
<jodd.version>3.6.3</jodd.version>
152153
<codehaus.jackson.version>1.8.8</codehaus.jackson.version>
153154
<snappy.version>1.1.1.6</snappy.version>
154155

sql/core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@
6666
<artifactId>jackson-databind</artifactId>
6767
<version>2.3.0</version>
6868
</dependency>
69+
<dependency>
70+
<groupId>org.jodd</groupId>
71+
<artifactId>jodd-core</artifactId>
72+
<version>${jodd.version}</version>
73+
</dependency>
6974
<dependency>
7075
<groupId>junit</groupId>
7176
<artifactId>junit</artifactId>

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
3333
val DIALECT = "spark.sql.dialect"
3434

3535
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
36+
val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
3637
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
3738
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
3839
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
@@ -140,6 +141,12 @@ private[sql] class SQLConf extends Serializable {
140141
private[spark] def isParquetBinaryAsString: Boolean =
141142
getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
142143

144+
/**
145+
* When set to true, we always treat INT96Values in Parquet files as timestamp.
146+
*/
147+
private[spark] def isParquetINT96AsTimestamp: Boolean =
148+
getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
149+
143150
/**
144151
* When set to true, partition pruning for in-memory columnar tables is enabled.
145152
*/

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717

1818
package org.apache.spark.sql.parquet
1919

20+
import java.sql.Timestamp
21+
import java.util.{TimeZone, Calendar}
22+
2023
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
2124

25+
import jodd.datetime.JDateTime
2226
import parquet.column.Dictionary
2327
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
2428
import parquet.schema.MessageType
2529

2630
import org.apache.spark.sql.catalyst.expressions._
2731
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
2832
import org.apache.spark.sql.types._
33+
import org.apache.spark.sql.parquet.timestamp.NanoTime
2934

3035
/**
3136
* Collection of converters of Parquet types (group and primitive types) that
@@ -123,6 +128,12 @@ private[sql] object CatalystConverter {
123128
parent.updateDecimal(fieldIndex, value, d)
124129
}
125130
}
131+
case TimestampType => {
132+
new CatalystPrimitiveConverter(parent, fieldIndex) {
133+
override def addBinary(value: Binary): Unit =
134+
parent.updateTimestamp(fieldIndex, value)
135+
}
136+
}
126137
// All other primitive types use the default converter
127138
case ctype: PrimitiveType => { // note: need the type tag here!
128139
new CatalystPrimitiveConverter(parent, fieldIndex)
@@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
197208
protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
198209
updateField(fieldIndex, value)
199210

200-
protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
211+
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
212+
updateField(fieldIndex, readTimestamp(value))
213+
214+
protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
201215
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
202-
}
203216

204217
protected[parquet] def isRootConverter: Boolean = parent == null
205218

@@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
232245
unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
233246
dest.set(unscaled, precision, scale)
234247
}
248+
249+
/**
250+
* Read a Timestamp value from a Parquet Int96Value
251+
*/
252+
protected[parquet] def readTimestamp(value: Binary): Timestamp = {
253+
CatalystTimestampConverter.convertToTimestamp(value)
254+
}
235255
}
236256

237257
/**
@@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
384404
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
385405
current.setString(fieldIndex, value)
386406

407+
override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
408+
current.update(fieldIndex, readTimestamp(value))
409+
387410
override protected[parquet] def updateDecimal(
388411
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
389412
var decimal = current(fieldIndex).asInstanceOf[Decimal]
@@ -454,6 +477,75 @@ private[parquet] object CatalystArrayConverter {
454477
val INITIAL_ARRAY_SIZE = 20
455478
}
456479

480+
private[parquet] object CatalystTimestampConverter {
481+
// TODO most part of this comes from Hive-0.14
482+
// Hive code might have some issues, so we need to keep an eye on it.
483+
// Also we use NanoTime and Int96Values from parquet-examples.
484+
// We utilize jodd to convert between NanoTime and Timestamp
485+
val parquetTsCalendar = new ThreadLocal[Calendar]
486+
def getCalendar = {
487+
// this is a cache for the calendar instance.
488+
if (parquetTsCalendar.get == null) {
489+
parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
490+
}
491+
parquetTsCalendar.get
492+
}
493+
val NANOS_PER_SECOND = 1000000000
494+
val SECONDS_PER_MINUTE = 60
495+
val MINUTES_PER_HOUR = 60
496+
val NANOS_PER_MILLI = 1000000
497+
498+
def convertToTimestamp(value: Binary): Timestamp = {
499+
val nt = NanoTime.fromBinary(value)
500+
val timeOfDayNanos = nt.getTimeOfDayNanos
501+
val julianDay = nt.getJulianDay
502+
val jDateTime = new JDateTime(julianDay.toDouble)
503+
val calendar = getCalendar
504+
calendar.set(Calendar.YEAR, jDateTime.getYear)
505+
calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
506+
calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)
507+
508+
// written in command style
509+
var remainder = timeOfDayNanos
510+
calendar.set(
511+
Calendar.HOUR_OF_DAY,
512+
(remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
513+
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
514+
calendar.set(
515+
Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
516+
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
517+
calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
518+
// Hive-0.14 put all of the remainder into nanos, while we put the millis part away
519+
val nanos = remainder % NANOS_PER_SECOND
520+
val ts = new Timestamp(calendar.getTimeInMillis + nanos / NANOS_PER_MILLI)
521+
ts.setNanos((nanos % NANOS_PER_MILLI).toInt)
522+
ts
523+
}
524+
525+
def convertFromTimestamp(ts: Timestamp): Binary = {
526+
val calendar = getCalendar
527+
calendar.setTime(ts)
528+
val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
529+
calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
530+
// Hive-0.14 didn't set hour before get day number, while the day number should
531+
// has something to do with hour, since julian day number grows at 12h GMT
532+
// here we just follow what hive does.
533+
val julianDay = jDateTime.getJulianDayNumber
534+
535+
val hour = calendar.get(Calendar.HOUR_OF_DAY)
536+
val minute = calendar.get(Calendar.MINUTE)
537+
val second = calendar.get(Calendar.SECOND)
538+
// Hive-0.14 would not consider millis part in ts itself
539+
val nanos = ts.getNanos + ts.getTime % 1000 * NANOS_PER_MILLI
540+
// Hive-0.14 would use hours directly, that might be wrong, since the day starts
541+
// from 12h in Julian. here we just follow what hive does.
542+
val nanosOfDay = nanos + second * NANOS_PER_SECOND +
543+
minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
544+
hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
545+
NanoTime(julianDay, nanosOfDay).toBinary
546+
}
547+
}
548+
457549
/**
458550
* A `parquet.io.api.GroupConverter` that converts a single-element groups that
459551
* match the characteristics of an array (see

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ private[sql] case class ParquetRelation(
6565
ParquetTypesConverter.readSchemaFromFile(
6666
new Path(path.split(",").head),
6767
conf,
68-
sqlContext.conf.isParquetBinaryAsString)
69-
68+
sqlContext.conf.isParquetBinaryAsString,
69+
sqlContext.conf.isParquetINT96AsTimestamp)
7070
lazy val attributeMap = AttributeMap(output.map(o => o -> o))
7171

7272
override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
8383
// TODO: Why it can be null?
8484
if (schema == null) {
8585
log.debug("falling back to Parquet read schema")
86-
schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
86+
schema = ParquetTypesConverter.convertToAttributes(
87+
parquetSchema, false, true)
8788
}
8889
log.debug(s"list of attributes that will be read: $schema")
8990
new RowRecordMaterializer(parquetSchema, schema)
@@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
184185
case t @ StructType(_) => writeStruct(
185186
t,
186187
value.asInstanceOf[CatalystConverter.StructScalaType[_]])
187-
case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
188+
case _ => writePrimitive(schema.asInstanceOf[NativeType], value)
188189
}
189190
}
190191
}
191192

192-
private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
193+
private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
193194
if (value != null) {
194195
schema match {
195196
case StringType => writer.addBinary(
@@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
202203
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
203204
case ShortType => writer.addInteger(value.asInstanceOf[Short])
204205
case LongType => writer.addLong(value.asInstanceOf[Long])
206+
case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
205207
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
206208
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
207209
case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
307309
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
308310
}
309311

312+
private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
313+
val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
314+
writer.addBinary(binaryNanoTime)
315+
}
310316
}
311317

312318
// Optimized for non-nested rows
@@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
351357
case DoubleType => writer.addDouble(record.getDouble(index))
352358
case FloatType => writer.addFloat(record.getFloat(index))
353359
case BooleanType => writer.addBoolean(record.getBoolean(index))
360+
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
354361
case d: DecimalType =>
355362
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
356363
sys.error(s"Unsupported datatype $d, cannot write to consumer")

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job
2525
import org.apache.spark.sql.test.TestSQLContext
2626

2727
import parquet.example.data.{GroupWriter, Group}
28-
import parquet.example.data.simple.SimpleGroup
28+
import parquet.example.data.simple.{NanoTime, SimpleGroup}
2929
import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
3030
import parquet.hadoop.api.WriteSupport
3131
import parquet.hadoop.api.WriteSupport.WriteContext
@@ -63,6 +63,7 @@ private[sql] object ParquetTestData {
6363
optional int64 mylong;
6464
optional float myfloat;
6565
optional double mydouble;
66+
optional int96 mytimestamp;
6667
}"""
6768

6869
// field names for test assertion error messages
@@ -72,7 +73,8 @@ private[sql] object ParquetTestData {
7273
"mystring:String",
7374
"mylong:Long",
7475
"myfloat:Float",
75-
"mydouble:Double"
76+
"mydouble:Double",
77+
"mytimestamp:Timestamp"
7678
)
7779

7880
val subTestSchema =
@@ -98,6 +100,7 @@ private[sql] object ParquetTestData {
98100
optional int64 myoptlong;
99101
optional float myoptfloat;
100102
optional double myoptdouble;
103+
optional int96 mytimestamp;
101104
}
102105
"""
103106

@@ -236,6 +239,7 @@ private[sql] object ParquetTestData {
236239
record.add(3, i.toLong << 33)
237240
record.add(4, 2.5F)
238241
record.add(5, 4.5D)
242+
record.add(6, new NanoTime(1,2))
239243
writer.write(record)
240244
}
241245
writer.close()

0 commit comments

Comments
 (0)