@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
import java.util.Locale
+import java.util.concurrent.TimeUnit

import scala.util.Try
import scala.util.control.NonFatal
@@ -226,7 +227,7 @@ object JdbcUtils extends Logging {
case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
-case java.sql.Types.TIME => TimestampType
+case java.sql.Types.TIME => IntegerType
case java.sql.Types.TIME_WITH_TIMEZONE
=> null
case java.sql.Types.TIMESTAMP => TimestampType
@@ -303,11 +304,23 @@ object JdbcUtils extends Logging {
} else {
rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
}
-val metadata = new MetadataBuilder().putLong("scale", fieldScale)
+val metadata = new MetadataBuilder()
+// SPARK-33888
+// - include scale in metadata for only DECIMAL & NUMERIC
+// - include TIME type metadata
+// - always build the metadata
+dataType match {
+  // scalastyle:off
+  case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale)
+  case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale)
+  case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true)
+  case _ =>
+  // scalastyle:on
+}
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
getCatalystType(dataType, fieldSize, fieldScale, isSigned))
-fields(i) = StructField(columnName, columnType, nullable)
+fields(i) = StructField(columnName, columnType, nullable, metadata.build())
i = i + 1
}
new StructType(fields)
@@ -408,6 +421,23 @@ object JdbcUtils extends Logging {
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.setFloat(pos, rs.getFloat(pos + 1))


+// SPARK-33888 - the SQL TIME type is represented as a physical int in millis.
+// It represents a time of day, with no reference to a particular calendar,
+// time zone or date, with a precision of one millisecond.
Contributor: After a second thought, why do we pick millisecond precision? Why not microsecond? Is there a standard for it?

Contributor (author): Since we are converting to/from java.sql.Time, and according to its javadoc (https://docs.oracle.com/javase/8/docs/api/java/sql/Time.html), the constructor supports at most millisecond precision.

Contributor: It may confuse Spark users, as Spark timestamps have microsecond precision.

After more thought, it's probably better to return a timestamp when reading JDBC time, with a clear rule: we convert the time to a timestamp by using the "zero epoch" as the date part. It's also more useful, as users can call the hour function or similar ones to get field values. What do you think?
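The "zero epoch" rule suggested here can be prototyped with plain java.time APIs; the following is only an illustration of the proposed conversion (the helper name timeToEpochMicros is invented for this sketch), not code from the PR:

```scala
import java.sql.Time
import java.util.concurrent.TimeUnit

// With 1970-01-01 as the date part, microseconds since the epoch (in UTC)
// equal microseconds since midnight, which matches Spark's internal
// microsecond-precision TimestampType representation.
def timeToEpochMicros(t: Time): Long =
  TimeUnit.NANOSECONDS.toMicros(t.toLocalTime.toNanoOfDay)

// 12:34:56 is 45296 seconds after midnight.
assert(timeToEpochMicros(Time.valueOf("12:34:56")) == 45296000000L)
```

Under this rule, hour-style extraction functions would work as long as the value is interpreted in UTC rather than a session timezone.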

Contributor (author): Hmm, I agree with you on the user-experience part (the hour function and so on). I think it is hard (near impossible) to introduce a new data type (Time, displayed as HH:MM:SS.sss) and another function to convert an int without a date portion to a timestamp (most conversions parse strings), as that would have to go through multiple levels of approval and testing.

Another reason I went with Int is that TimestampType breaks compatibility with the Avro logical-type and Spark type converter (https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala).

I'm not sure whether a time_to_timestamp() helper function would be a better compromise, or whether we should revert the two MRs.

Contributor: It would look better if the Avro schema converter could convert timestamp to time. After reading a time column from JDBC, it becomes IntegerType, and there is no context indicating that this int comes from a JDBC time and means milliseconds. What if the Avro logical type is time-micros? With the timestamp type, at least we know the precision is microseconds.
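The relation between the two precisions being discussed is a straight unit conversion; a small sketch (the 12:34:56 value is illustrative):

```scala
import java.util.concurrent.TimeUnit

// A JDBC TIME read as millis-of-day (this PR) vs. Avro's time-micros
// logical type, which counts microseconds of the day.
val timeMillis = 45296000L  // 12:34:56 as milliseconds after midnight
val timeMicros = TimeUnit.MILLISECONDS.toMicros(timeMillis)
assert(timeMicros == 45296000000L)

// Going the other way loses sub-millisecond precision:
assert(TimeUnit.MICROSECONDS.toMillis(45296000123L) == 45296000L)
```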

Contributor (author): Your suggestion makes sense. We could also stuff the info into the metadata field of the struct field. Let me find some time this weekend or over the new year to try out your suggestion. I'll ping you back if I can't find the time, or once I have a solution, so we can revert the MRs if needed. Thanks a lot for helping me out!

Should I create a new JIRA ticket and cut a new MR, or do you prefer to consolidate it here?

Contributor: Let's create a new JIRA to track it. The PR is merged to master only, so we have plenty of time to fix it before the 3.2 release :)

Contributor (author): @cloud-fan btw, I did a quick check, and it seems that if we use TimestampType, the time is always converted to the JVM system timezone. So for the JDBCSuite test, given the time value 12:34:56, .select(hour("time")) will always reflect my local timezone's hour instead of 12. I don't know if we should proceed in that case.

Contributor: We can do whatever we want. We can use the JDBC getTime API to get the time value and construct the timestamp value in a reasonable way. It's under our control.

+// It stores the number of milliseconds after midnight, 00:00:00.000.
+case IntegerType if metadata.contains("logical_time_type") =>
+  (rs: ResultSet, row: InternalRow, pos: Int) => {
+    val rawTime = rs.getTime(pos + 1)
Contributor: @sarutak do you mean that what is returned here is seconds (at some precision) from a starting timestamp, and that the starting timestamp differs between databases? I'd be a bit surprised if the JDBC protocol were designed this way, but if that's true, then this PR doesn't make sense...

Contributor: What is returned here is a java.sql.Time, and its doc says:

The date components should be set to the "zero epoch"
value of January 1, 1970 and should not be accessed.

Maybe some databases don't follow the requirement, but it doesn't matter, as we call rawTime.toLocalTime, which only accesses the hour:minute:second components.

Contributor: That said, I think reading the SQL TIME type as an integer has a well-defined semantic in Spark (after this PR): the integer represents the milliseconds of the time from 00:00:00.

Member: O.K., so the test seems wrong. Actually, I'll fix it in another PR.

+    if (rawTime != null) {
+      val rawTimeInNano = rawTime.toLocalTime().toNanoOfDay()
+      val timeInMillis = Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(rawTimeInNano))
+      row.setInt(pos, timeInMillis)
+    } else {
+      row.update(pos, null)
+    }
+  }

case IntegerType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.setInt(pos, rs.getInt(pos + 1))
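The new TIME branch above can be exercised in isolation with plain java.sql and java.time APIs; a minimal sketch mirroring the reader logic (the helper name timeToMillisOfDay is invented for illustration):

```scala
import java.sql.Time
import java.util.concurrent.TimeUnit

// Mirror of the reader logic: SQL TIME -> millis after midnight, null-safe.
def timeToMillisOfDay(rawTime: Time): Option[Int] =
  Option(rawTime).map { t =>
    Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(t.toLocalTime.toNanoOfDay))
  }

// 12h * 3600 + 34m * 60 + 56s = 45296 seconds after midnight.
assert(timeToMillisOfDay(Time.valueOf("12:34:56")).contains(45296000))
assert(timeToMillisOfDay(null).isEmpty)
```

Going through toLocalTime rather than Time.getTime keeps the result independent of the JVM default timezone, which addresses the timezone concern raised in the review thread.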
@@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Date, DriverManager, SQLException, Timestamp}
import java.util.{Calendar, GregorianCalendar, Properties}
+import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

@@ -610,7 +611,13 @@ class JDBCSuite extends QueryTest
test("H2 time types") {
val rows = sql("SELECT * FROM timetypes").collect()
val cal = new GregorianCalendar(java.util.Locale.ROOT)
-cal.setTime(rows(0).getAs[java.sql.Timestamp](0))
+val epochMillis = java.time.LocalTime.ofNanoOfDay(
+    TimeUnit.MILLISECONDS.toNanos(rows(0).getAs[Int](0)))
+  .atDate(java.time.LocalDate.ofEpochDay(0))
+  .atZone(java.time.ZoneId.systemDefault())
+  .toInstant()
+  .toEpochMilli()
+cal.setTime(new Date(epochMillis))
assert(cal.get(Calendar.HOUR_OF_DAY) === 12)
assert(cal.get(Calendar.MINUTE) === 34)
assert(cal.get(Calendar.SECOND) === 56)
@@ -625,9 +632,26 @@ class JDBCSuite extends QueryTest
assert(cal.get(Calendar.HOUR) === 11)
assert(cal.get(Calendar.MINUTE) === 22)
assert(cal.get(Calendar.SECOND) === 33)
assert(cal.get(Calendar.MILLISECOND) === 543)
assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000)
}
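The reconstruction performed in the updated test can also be checked without Calendar; a minimal sketch of the int-millis round trip (assumes the 45296000 value that reading TIME '12:34:56' now produces):

```scala
import java.time.LocalTime
import java.util.concurrent.TimeUnit

val millisOfDay = 45296000  // what the JDBC reader now stores for 12:34:56
val localTime = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisOfDay))
assert(localTime.getHour == 12)
assert(localTime.getMinute == 34)
assert(localTime.getSecond == 56)
```

Unlike the Calendar-based assertion, this round trip never touches the system default timezone.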

+test("SPARK-33888: test TIME types") {
+  val rows = spark.read.jdbc(
+    urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()
+  val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
+    .cache().collect()
+  val expectedTimeRaw = java.sql.Time.valueOf("12:34:56")
+  val expectedTimeMillis = Math.toIntExact(
+    java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(
+      expectedTimeRaw.toLocalTime().toNanoOfDay()
+    )
+  )
+  assert(rows(0).getAs[Int](0) === expectedTimeMillis)
+  assert(rows(1).getAs[Int](0) === expectedTimeMillis)
+  assert(cachedRows(0).getAs[Int](0) === expectedTimeMillis)
+}

test("test DATE types") {
val rows = spark.read.jdbc(
urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect()