-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-33888][SQL] JDBC SQL TIME type represents incorrectly as TimestampType, it should be physical Int in millis #30902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
656f0af
b7eed45
497e2f6
bd3c0bb
c8bcb3c
78d79c1
4a13c03
68e2afe
db366fb
c75037a
3513eb8
f55a7d6
6d5a3c4
63fad20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc | |
|
|
||
| import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException} | ||
| import java.util.Locale | ||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import scala.util.Try | ||
| import scala.util.control.NonFatal | ||
|
|
@@ -226,7 +227,7 @@ object JdbcUtils extends Logging { | |
| case java.sql.Types.SMALLINT => IntegerType | ||
| case java.sql.Types.SQLXML => StringType | ||
| case java.sql.Types.STRUCT => StringType | ||
| case java.sql.Types.TIME => TimestampType | ||
| case java.sql.Types.TIME => IntegerType | ||
| case java.sql.Types.TIME_WITH_TIMEZONE | ||
| => null | ||
| case java.sql.Types.TIMESTAMP => TimestampType | ||
|
|
@@ -303,11 +304,23 @@ object JdbcUtils extends Logging { | |
| } else { | ||
| rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls | ||
| } | ||
| val metadata = new MetadataBuilder().putLong("scale", fieldScale) | ||
| val metadata = new MetadataBuilder() | ||
| // SPARK-33888 | ||
| // - include scale in metadata for only DECIMAL & NUMERIC | ||
| // - include TIME type metadata | ||
| // - always build the metadata | ||
| dataType match { | ||
| // scalastyle:off | ||
| case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale) | ||
| case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale) | ||
| case java.sql.Types.TIME => metadata.putBoolean("logicaltimetype", true) | ||
| case _ => | ||
| // scalastyle:on | ||
| } | ||
| val columnType = | ||
| dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( | ||
| getCatalystType(dataType, fieldSize, fieldScale, isSigned)) | ||
| fields(i) = StructField(columnName, columnType, nullable) | ||
| fields(i) = StructField(columnName, columnType, nullable, metadata.build()) | ||
| i = i + 1 | ||
| } | ||
| new StructType(fields) | ||
|
|
@@ -408,6 +421,23 @@ object JdbcUtils extends Logging { | |
| (rs: ResultSet, row: InternalRow, pos: Int) => | ||
| row.setFloat(pos, rs.getFloat(pos + 1)) | ||
|
|
||
|
|
||
| // SPARK-33888 - sql TIME type represents as physical int in millis | ||
| // Represents a time of day, with no reference to a particular calendar, | ||
| // time zone or date, with a precision of one millisecond. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After a second thought, why do we pick millisecond precision? Why not microsecond? Is there a standard for it?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are converting to/from java.sql.Time, and according to the javadoc https://docs.oracle.com/javase/8/docs/api/java/sql/Time.html , it supports till milliseconds for constructor.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may confuse Spark users, as Spark timestamp is microsecond precision. After more thought, it's probably better to return timestamp when reading JDBC time, with a clear rule: we convert the time to timestamp by using "zero epoch" as the date part. It's also more useful as users can call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I agree with you on the user experience part ( Another reason why I went with Int was that with TimestampType, it breaks compatibility with Avro logical & Spark type converter (https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala). I'm not sure if a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks better if the avro schema converter can convert timestamp to time. After reading time column from JDBC, it becomes
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your suggestion makes sense. We can also stuff the info into the metadata field of the struct field. Let me find sometimes this weekend or over the new year to try out your suggestion? Will ping you back if I couldn't find time or manage to get a solution so we can revert the MRs. Thanks a lots for helping me out! Should I create a new JIRA ticket and cut a new MR or do you prefer to consolidate in here?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's create a new JIRA to track it. The PR is merged to master only so we have plenty of time to fix it before the 3.2 release :)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan btw, I did a quick check and it seems like if we use TimestampType, the time will always be converted to the JVM system timezone. So for the JDBCSuite test, given
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can do whatever we want. We can use JDBC API |
||
| // It stores the number of milliseconds after midnight, 00:00:00.000. | ||
| case IntegerType if metadata.contains("logicaltimetype") => | ||
| (rs: ResultSet, row: InternalRow, pos: Int) => { | ||
| val rawTime = rs.getTime(pos + 1) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sarutak do you mean what returns here is seconds (with certain precision) from a starting timestamp, while the timestamp is different between databases? I'm a bit surprised if the JDBC protocol was design this way, but if this is true, then this PR doesn't make sense...
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What returns here is Maybe some databases don't follow the requirement, but it doesn't matter, as we call
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That said, I think reading SQL TIME type as integer has a well-define semantic in Spark (after this PR): the integer represents the milliseconds of the time from 00:00:00.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. O.K, so the test seems wrong. |
||
| if (rawTime != null) { | ||
| val rawTimeInNano = rawTime.toLocalTime().toNanoOfDay() | ||
| val timeInMillis = Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(rawTimeInNano)) | ||
| row.setInt(pos, timeInMillis) | ||
| } else { | ||
| row.update(pos, null) | ||
| } | ||
| } | ||
|
|
||
| case IntegerType => | ||
| (rs: ResultSet, row: InternalRow, pos: Int) => | ||
| row.setInt(pos, rs.getInt(pos + 1)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc | |
| import java.math.BigDecimal | ||
| import java.sql.{Date, DriverManager, SQLException, Timestamp} | ||
| import java.util.{Calendar, GregorianCalendar, Properties} | ||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
||
|
|
@@ -610,7 +611,13 @@ class JDBCSuite extends QueryTest | |
| test("H2 time types") { | ||
| val rows = sql("SELECT * FROM timetypes").collect() | ||
| val cal = new GregorianCalendar(java.util.Locale.ROOT) | ||
| cal.setTime(rows(0).getAs[java.sql.Timestamp](0)) | ||
| val epochMillis = java.time.LocalTime.ofNanoOfDay( | ||
| TimeUnit.MILLISECONDS.toNanos(rows(0).getAs[Int](0))) | ||
| .atDate(java.time.LocalDate.ofEpochDay(0)) | ||
| .atZone(java.time.ZoneId.systemDefault()) | ||
| .toInstant() | ||
| .toEpochMilli() | ||
| cal.setTime(new Date(epochMillis)) | ||
| assert(cal.get(Calendar.HOUR_OF_DAY) === 12) | ||
| assert(cal.get(Calendar.MINUTE) === 34) | ||
| assert(cal.get(Calendar.SECOND) === 56) | ||
|
|
@@ -625,9 +632,24 @@ class JDBCSuite extends QueryTest | |
| assert(cal.get(Calendar.HOUR) === 11) | ||
| assert(cal.get(Calendar.MINUTE) === 22) | ||
| assert(cal.get(Calendar.SECOND) === 33) | ||
| assert(cal.get(Calendar.MILLISECOND) === 543) | ||
| assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000) | ||
| } | ||
|
|
||
| test("SPARK-33888 : test TIME types") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| val rows = spark.read.jdbc( | ||
| urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect() | ||
| val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) | ||
| .cache().collect() | ||
| val expectedTimeRaw = java.sql.Time.valueOf("12:34:56") | ||
| val expectedTimeMillis = java.util.concurrent.TimeUnit.SECONDS.toMillis( | ||
| expectedTimeRaw.toLocalTime().toSecondOfDay() | ||
| ).toInt | ||
| assert(rows(0).getAs[java.sql.Time](0) === expectedTimeMillis) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what was the result before? null?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Result before is |
||
| assert(rows(1).getAs[java.sql.Time](0) === expectedTimeMillis) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we comparing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops. The value is already converted to Int via |
||
| assert(cachedRows(0).getAs[java.sql.Time](0) === expectedTimeMillis) | ||
| } | ||
|
|
||
| test("test DATE types") { | ||
| val rows = spark.read.jdbc( | ||
| urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about
logical_time_type