
Commit 37719e0

Davies Liu authored and rxin committed
[SPARK-8189] [SQL] use Long for TimestampType in SQL
This PR changes the internal type of TimestampType to Long for efficiency, which means it will lose the precision below 100ns.

Author: Davies Liu <[email protected]>

Closes apache#6733 from davies/timestamp and squashes the following commits:

d9565fa [Davies Liu] remove print
65cf2f1 [Davies Liu] fix Timestamp in SparkR
86fecfb [Davies Liu] disable two timestamp tests
8f77ee0 [Davies Liu] fix scala style
246ee74 [Davies Liu] address comments
309d2e1 [Davies Liu] use Long for TimestampType in SQL
1 parent b928f54 commit 37719e0

File tree

36 files changed: +272 -172 lines changed
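For orientation before the per-file diffs: internally, a timestamp is now a Long counting 100ns intervals since the Unix epoch, which is where the 100ns precision floor comes from. Below is a minimal sketch of that round-trip, consistent with the arithmetic used throughout the diffs; the object and method names are illustrative stand-ins for the PR's DateUtils.fromJavaTimestamp/toJavaTimestamp, whose implementation is not part of this diff.

import java.sql.Timestamp

object TimestampUnits {
  val HundredNanosPerSecond = 10000000L

  // java.sql.Timestamp -> 100ns intervals since the epoch (positive epoch values assumed)
  def toLong100ns(t: Timestamp): Long =
    t.getTime / 1000 * HundredNanosPerSecond + t.getNanos / 100

  // 100ns intervals -> java.sql.Timestamp; setNanos restores the sub-second part
  def fromLong100ns(units: Long): Timestamp = {
    val t = new Timestamp(units / HundredNanosPerSecond * 1000)
    t.setNanos((units % HundredNanosPerSecond).toInt * 100)
    t
  }
}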

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 13 additions & 4 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.api.r

 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}

 import scala.collection.JavaConversions._

@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }

-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }

   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
       case "java.sql.Time" =>
         writeType(dos, "time")
         writeTime(dos, value.asInstanceOf[Time])
+      case "java.sql.Timestamp" =>
+        writeType(dos, "time")
+        writeTime(dos, value.asInstanceOf[Timestamp])
       case "[B" =>
         writeType(dos, "raw")
         writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }

+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }

   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
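The R side keeps its wire format: a timestamp travels as a Double of fractional epoch seconds. The pair below is a standalone sketch restating the readTime/writeTime logic from the hunk above, not SerDe itself.

import java.sql.Timestamp

// Encode: epoch seconds as a Double; the fraction carries sub-second precision
def timestampToSeconds(t: Timestamp): Double =
  (t.getTime / 1000).toDouble + t.getNanos.toDouble / 1e9

// Decode: split the Double back into whole seconds and nanoseconds
def secondsToTimestamp(seconds: Double): Timestamp = {
  val sec = Math.floor(seconds).toLong
  val t = new Timestamp(sec * 1000L)
  t.setNanos(((seconds - sec) * 1e9).toInt)
  t
}

A Double carries roughly 15-16 significant decimal digits, so with about 10 of them spent on current epoch seconds, roughly microsecond resolution survives the trip to R.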

python/pyspark/sql/types.py

Lines changed: 11 additions & 0 deletions
@@ -19,6 +19,7 @@
 import decimal
 import time
 import datetime
+import calendar
 import keyword
 import warnings
 import json
@@ -654,6 +655,8 @@ def _need_python_to_sql_conversion(dataType):
         _need_python_to_sql_conversion(dataType.valueType)
     elif isinstance(dataType, UserDefinedType):
         return True
+    elif isinstance(dataType, TimestampType):
+        return True
     else:
         return False

@@ -707,6 +710,14 @@ def converter(obj):
         return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
     elif isinstance(dataType, UserDefinedType):
         return lambda obj: dataType.serialize(obj)
+    elif isinstance(dataType, TimestampType):
+
+        def to_posix_timstamp(dt):
+            if dt.tzinfo is None:
+                return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
+            else:
+                return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
+        return to_posix_timstamp
     else:
         raise ValueError("Unexpected type %r" % dataType)
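The Python converter above maps a datetime to the same 100ns unit: whole epoch seconds times 1e7 plus microseconds times 10. A worked example of that arithmetic for the tz-aware branch, written in Scala with an illustrative datetime:

// 2015-06-10 00:00:00.123456 UTC, i.e. calendar.timegm(dt.utctimetuple()) = 1433894400
val epochSeconds = 1433894400L
val micros = 123456L // dt.microsecond
val catalystValue = epochSeconds * 10000000L + micros * 10
// => 14338944001234560 ticks of 100ns since the epoch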

sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java

Lines changed: 6 additions & 0 deletions
@@ -19,6 +19,7 @@

 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;

 import scala.collection.Seq;
@@ -103,6 +104,11 @@ public Date getDate(int i) {
     throw new UnsupportedOperationException();
   }

+  @Override
+  public Timestamp getTimestamp(int i) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public <T> Seq<T> getSeq(int i) {
     throw new UnsupportedOperationException();

sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala

Lines changed: 7 additions & 1 deletion
@@ -260,9 +260,15 @@ trait Row extends Serializable {
    *
    * @throws ClassCastException when data type does not match.
    */
-  // TODO(davies): This is not the right default implementation, we use Int as Date internally
   def getDate(i: Int): java.sql.Date = apply(i).asInstanceOf[java.sql.Date]

+  /**
+   * Returns the value at position i of date type as java.sql.Timestamp.
+   *
+   * @throws ClassCastException when data type does not match.
+   */
+  def getTimestamp(i: Int): java.sql.Timestamp = apply(i).asInstanceOf[java.sql.Timestamp]
+
   /**
    * Returns the value at position i of array type as a Scala Seq.
    *
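A hypothetical caller of the new accessor; the schema and values here are invented for illustration. Note that external rows still carry java.sql.Timestamp, since the Long representation is internal only.

import java.sql.Timestamp
import org.apache.spark.sql.Row

val row = Row("alice", Timestamp.valueOf("2015-06-10 12:34:56.789"))
// Equivalent to row(1).asInstanceOf[java.sql.Timestamp], but self-documenting
val ts: Timestamp = row.getTimestamp(1)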

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala

Lines changed: 12 additions & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst

 import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
-import java.sql.Date
+import java.sql.{Timestamp, Date}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable

@@ -58,6 +58,7 @@ object CatalystTypeConverters {
     case structType: StructType => StructConverter(structType)
     case StringType => StringConverter
     case DateType => DateConverter
+    case TimestampType => TimestampConverter
     case dt: DecimalType => BigDecimalConverter
     case BooleanType => BooleanConverter
     case ByteType => ByteConverter
@@ -274,6 +275,15 @@ object CatalystTypeConverters {
     override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
   }

+  private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
+    override def toCatalystImpl(scalaValue: Timestamp): Long =
+      DateUtils.fromJavaTimestamp(scalaValue)
+    override def toScala(catalystValue: Any): Timestamp =
+      if (catalystValue == null) null
+      else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+    override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
+  }
+
   private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
     override def toCatalystImpl(scalaValue: Any): Decimal = scalaValue match {
       case d: BigDecimal => Decimal(d)
@@ -367,6 +377,7 @@ object CatalystTypeConverters {
   def convertToCatalyst(a: Any): Any = a match {
     case s: String => StringConverter.toCatalyst(s)
     case d: Date => DateConverter.toCatalyst(d)
+    case t: Timestamp => TimestampConverter.toCatalyst(t)
     case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
     case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
     case seq: Seq[Any] => seq.map(convertToCatalyst)
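In effect, convertToCatalyst now turns any external java.sql.Timestamp into the Long tick count on the way into Catalyst. A minimal stand-in for that dispatch; to100ns is an assumed equivalent of DateUtils.fromJavaTimestamp, whose body this diff does not show.

import java.sql.Timestamp

// Assumed 100ns-tick conversion, consistent with the arithmetic elsewhere in this PR
def to100ns(t: Timestamp): Long =
  t.getTime / 1000 * 10000000L + t.getNanos / 100

def convertToCatalyst(a: Any): Any = a match {
  case t: Timestamp => to100ns(t)
  case other => other // strings, decimals, etc. have their own converters
}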

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 28 additions & 34 deletions
@@ -113,7 +113,8 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
     case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
-    case TimestampType => buildCast[Timestamp](_, t => UTF8String(timestampToString(t)))
+    case TimestampType => buildCast[Long](_,
+      t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
     case _ => buildCast[Any](_, o => UTF8String(o.toString))
   }

@@ -127,7 +128,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case StringType =>
      buildCast[UTF8String](_, _.length() != 0)
     case TimestampType =>
-      buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
+      buildCast[Long](_, t => t != 0)
     case DateType =>
       // Hive would return null when cast from date to boolean
       buildCast[Int](_, d => null)
@@ -158,20 +159,21 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
         if (periodIdx != -1 && n.length() - periodIdx > 9) {
           n = n.substring(0, periodIdx + 10)
         }
-        try Timestamp.valueOf(n) catch { case _: java.lang.IllegalArgumentException => null }
+        try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+        catch { case _: java.lang.IllegalArgumentException => null }
       })
     case BooleanType =>
-      buildCast[Boolean](_, b => new Timestamp(if (b) 1 else 0))
+      buildCast[Boolean](_, b => (if (b) 1L else 0))
     case LongType =>
-      buildCast[Long](_, l => new Timestamp(l))
+      buildCast[Long](_, l => longToTimestamp(l))
     case IntegerType =>
-      buildCast[Int](_, i => new Timestamp(i))
+      buildCast[Int](_, i => longToTimestamp(i.toLong))
     case ShortType =>
-      buildCast[Short](_, s => new Timestamp(s))
+      buildCast[Short](_, s => longToTimestamp(s.toLong))
     case ByteType =>
-      buildCast[Byte](_, b => new Timestamp(b))
+      buildCast[Byte](_, b => longToTimestamp(b.toLong))
     case DateType =>
-      buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
+      buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -191,25 +193,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       })
   }

-  private[this] def decimalToTimestamp(d: Decimal) = {
-    val seconds = Math.floor(d.toDouble).toLong
-    val bd = (d.toBigDecimal - seconds) * 1000000000
-    val nanos = bd.intValue()
-
-    val millis = seconds * 1000
-    val t = new Timestamp(millis)
-
-    // remaining fractional portion as nanos
-    t.setNanos(nanos)
-    t
+  private[this] def decimalToTimestamp(d: Decimal): Long = {
+    (d.toBigDecimal * 10000000L).longValue()
   }

-  // Timestamp to long, converting milliseconds to seconds
-  private[this] def timestampToLong(ts: Timestamp) = Math.floor(ts.getTime / 1000.0).toLong
-
-  private[this] def timestampToDouble(ts: Timestamp) = {
-    // First part is the seconds since the beginning of time, followed by nanosecs.
-    Math.floor(ts.getTime / 1000.0).toLong + ts.getNanos.toDouble / 1000000000
+  // converting milliseconds to 100ns
+  private[this] def longToTimestamp(t: Long): Long = t * 10000L
+  // converting 100ns to seconds
+  private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong
+  // converting 100ns to seconds in double
+  private[this] def timestampToDouble(ts: Long): Double = {
+    ts / 10000000.0
   }

   // Converts Timestamp to string according to Hive TimestampWritable convention
@@ -234,7 +228,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case TimestampType =>
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-      buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
+      buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
     // Hive throws this exception as a Semantic Exception
     // It is never possible to compare result when hive return with exception,
     // so we can return null
@@ -253,7 +247,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t))
+      buildCast[Long](_, t => timestampToLong(t))
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b)
   }
@@ -269,7 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toInt)
+      buildCast[Long](_, t => timestampToLong(t).toInt)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b)
   }
@@ -285,7 +279,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toShort)
+      buildCast[Long](_, t => timestampToLong(t).toShort)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toShort
   }
@@ -301,7 +295,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToLong(t).toByte)
+      buildCast[Long](_, t => timestampToLong(t).toByte)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toInt(b).toByte
   }
@@ -334,7 +328,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       buildCast[Int](_, d => null) // date can't cast to decimal in Hive
     case TimestampType =>
       // Note that we lose precision here.
-      buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
+      buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
     case DecimalType() =>
       b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
     case LongType =>
@@ -358,7 +352,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
      buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToDouble(t))
+      buildCast[Long](_, t => timestampToDouble(t))
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)
   }
@@ -374,7 +368,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case DateType =>
       buildCast[Int](_, d => null)
     case TimestampType =>
-      buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
+      buildCast[Long](_, t => timestampToDouble(t).toFloat)
     case x: NumericType =>
       b => x.numeric.asInstanceOf[Numeric[Any]].toFloat(b)
   }
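The three helpers above pin down the unit conversions between Catalyst's ticks and ordinary numbers. Restated standalone with a worked example; the 1500ms input is illustrative:

def longToTimestamp(t: Long): Long = t * 10000L // milliseconds -> 100ns ticks
def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong // ticks -> whole seconds
def timestampToDouble(ts: Long): Double = ts / 10000000.0 // ticks -> fractional seconds

val ticks = longToTimestamp(1500L) // 15000000 ticks = 1.5s past the epoch
timestampToLong(ticks)   // 1 (floored)
timestampToDouble(ticks) // 1.5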

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala

Lines changed: 1 addition & 0 deletions
@@ -203,6 +203,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
       case BooleanType => new MutableBoolean
       case LongType => new MutableLong
       case DateType => new MutableInt // We use INT for DATE internally
+      case TimestampType => new MutableLong // We use Long for Timestamp internally
       case _ => new MutableAny
     }.toArray)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 3 additions & 1 deletion
@@ -122,7 +122,7 @@ class CodeGenContext {
     case BinaryType => "byte[]"
     case StringType => stringType
     case DateType => "int"
-    case TimestampType => "java.sql.Timestamp"
+    case TimestampType => "long"
     case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName
     case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName
     case _ => "Object"
@@ -140,6 +140,7 @@ class CodeGenContext {
     case FloatType => "Float"
     case BooleanType => "Boolean"
     case DateType => "Integer"
+    case TimestampType => "Long"
     case _ => javaType(dt)
   }

@@ -155,6 +156,7 @@ class CodeGenContext {
     case DoubleType => "-1.0"
     case IntegerType => "-1"
     case DateType => "-1"
+    case TimestampType => "-1L"
     case _ => "null"
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala

Lines changed: 7 additions & 3 deletions
@@ -73,7 +73,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {

   val specificAccessorFunctions = ctx.nativeTypes.map { dataType =>
     val cases = expressions.zipWithIndex.map {
-      case (e, i) if e.dataType == dataType =>
+      case (e, i) if e.dataType == dataType
+        || dataType == IntegerType && e.dataType == DateType
+        || dataType == LongType && e.dataType == TimestampType =>
         s"case $i: return c$i;"
       case _ => ""
     }.mkString("\n ")
@@ -96,7 +98,9 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {

   val specificMutatorFunctions = ctx.nativeTypes.map { dataType =>
     val cases = expressions.zipWithIndex.map {
-      case (e, i) if e.dataType == dataType =>
+      case (e, i) if e.dataType == dataType
+        || dataType == IntegerType && e.dataType == DateType
+        || dataType == LongType && e.dataType == TimestampType =>
         s"case $i: { c$i = value; return; }"
       case _ => ""
     }.mkString("\n")
@@ -119,7 +123,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
     val nonNull = e.dataType match {
       case BooleanType => s"$col ? 0 : 1"
       case ByteType | ShortType | IntegerType | DateType => s"$col"
-      case LongType => s"$col ^ ($col >>> 32)"
+      case LongType | TimestampType => s"$col ^ ($col >>> 32)"
       case FloatType => s"Float.floatToIntBits($col)"
       case DoubleType =>
         s"(int)(Double.doubleToLongBits($col) ^ (Double.doubleToLongBits($col) >>> 32))"
