Skip to content

Commit 9ce714d

Browse files
sureshthalamati authored and gatorsmile committed
[SPARK-10655][SQL] Adding additional data type mappings to jdbc DB2dialect.
This patch adds DB2-specific data type mappings for decfloat, real, xml, and timestamp with time zone (a DB2Z-specific type) types on read, and for byte and short data types on write, to the jdbc data source DB2 dialect. The default mapping does not work for these types when reading from or writing to a DB2 database. Added a docker test and a JDBC unit test case. Author: sureshthalamati <[email protected]> Closes #9162 from sureshthalamati/db2dialect_enhancements-spark-10655.
1 parent b6b1088 commit 9ce714d

File tree

3 files changed

+66
-11
lines changed

3 files changed

+66
-11
lines changed

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ import java.math.BigDecimal
2121
import java.sql.{Connection, Date, Timestamp}
2222
import java.util.Properties
2323

24-
import org.scalatest._
24+
import org.scalatest.Ignore
2525

26+
import org.apache.spark.sql.Row
27+
import org.apache.spark.sql.types.{BooleanType, ByteType, ShortType, StructType}
2628
import org.apache.spark.tags.DockerTest
2729

30+
2831
@DockerTest
2932
@Ignore // AMPLab Jenkins needs to be updated before shared memory works on docker
3033
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
@@ -47,19 +50,22 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
4750
conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
4851

4952
conn.prepareStatement("CREATE TABLE numbers ( small SMALLINT, med INTEGER, big BIGINT, "
50-
+ "deci DECIMAL(31,20), flt FLOAT, dbl DOUBLE)").executeUpdate()
53+
+ "deci DECIMAL(31,20), flt FLOAT, dbl DOUBLE, real REAL, "
54+
+ "decflt DECFLOAT, decflt16 DECFLOAT(16), decflt34 DECFLOAT(34))").executeUpdate()
5155
conn.prepareStatement("INSERT INTO numbers VALUES (17, 77777, 922337203685477580, "
52-
+ "123456745.56789012345000000000, 42.75, 5.4E-70)").executeUpdate()
56+
+ "123456745.56789012345000000000, 42.75, 5.4E-70, "
57+
+ "3.4028234663852886e+38, 4.2999, DECFLOAT('9.999999999999999E19', 16), "
58+
+ "DECFLOAT('1234567891234567.123456789123456789', 34))").executeUpdate()
5359

5460
conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, ts TIMESTAMP )").executeUpdate()
5561
conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', "
5662
+ "'2009-02-13 23:31:30')").executeUpdate()
5763

5864
// TODO: Test locale conversion for strings.
59-
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c CLOB, d BLOB)")
60-
.executeUpdate()
61-
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', BLOB('fox'))")
65+
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c CLOB, d BLOB, e XML)")
6266
.executeUpdate()
67+
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', BLOB('fox'),"
68+
+ "'<cinfo cid=\"10\"><name>Kathy</name></cinfo>')").executeUpdate()
6369
}
6470

6571
test("Basic test") {
@@ -77,20 +83,28 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
7783
val rows = df.collect()
7884
assert(rows.length == 1)
7985
val types = rows(0).toSeq.map(x => x.getClass.toString)
80-
assert(types.length == 6)
86+
assert(types.length == 10)
8187
assert(types(0).equals("class java.lang.Integer"))
8288
assert(types(1).equals("class java.lang.Integer"))
8389
assert(types(2).equals("class java.lang.Long"))
8490
assert(types(3).equals("class java.math.BigDecimal"))
8591
assert(types(4).equals("class java.lang.Double"))
8692
assert(types(5).equals("class java.lang.Double"))
93+
assert(types(6).equals("class java.lang.Float"))
94+
assert(types(7).equals("class java.math.BigDecimal"))
95+
assert(types(8).equals("class java.math.BigDecimal"))
96+
assert(types(9).equals("class java.math.BigDecimal"))
8797
assert(rows(0).getInt(0) == 17)
8898
assert(rows(0).getInt(1) == 77777)
8999
assert(rows(0).getLong(2) == 922337203685477580L)
90100
val bd = new BigDecimal("123456745.56789012345000000000")
91101
assert(rows(0).getAs[BigDecimal](3).equals(bd))
92102
assert(rows(0).getDouble(4) == 42.75)
93103
assert(rows(0).getDouble(5) == 5.4E-70)
104+
assert(rows(0).getFloat(6) == 3.4028234663852886e+38)
105+
assert(rows(0).getDecimal(7) == new BigDecimal("4.299900000000000000"))
106+
assert(rows(0).getDecimal(8) == new BigDecimal("99999999999999990000.000000000000000000"))
107+
assert(rows(0).getDecimal(9) == new BigDecimal("1234567891234567.123456789123456789"))
94108
}
95109

96110
test("Date types") {
@@ -112,7 +126,7 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
112126
val rows = df.collect()
113127
assert(rows.length == 1)
114128
val types = rows(0).toSeq.map(x => x.getClass.toString)
115-
assert(types.length == 4)
129+
assert(types.length == 5)
116130
assert(types(0).equals("class java.lang.String"))
117131
assert(types(1).equals("class java.lang.String"))
118132
assert(types(2).equals("class java.lang.String"))
@@ -121,14 +135,27 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
121135
assert(rows(0).getString(1).equals("quick"))
122136
assert(rows(0).getString(2).equals("brown"))
123137
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](3), Array[Byte](102, 111, 120)))
138+
assert(rows(0).getString(4).equals("""<cinfo cid="10"><name>Kathy</name></cinfo>"""))
124139
}
125140

126141
test("Basic write test") {
127-
// val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
142+
// cast decflt column with precision value of 38 to DB2 max decimal precision value of 31.
143+
val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
144+
.selectExpr("small", "med", "big", "deci", "flt", "dbl", "real",
145+
"cast(decflt as decimal(31, 5)) as decflt")
128146
val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
129147
val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
130-
// df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
148+
df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
131149
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
132150
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
151+
// spark types that does not have exact matching db2 table types.
152+
val df4 = sqlContext.createDataFrame(
153+
sparkContext.parallelize(Seq(Row("1".toShort, "20".toByte, true))),
154+
new StructType().add("c1", ShortType).add("b", ByteType).add("c3", BooleanType))
155+
df4.write.jdbc(jdbcUrl, "otherscopy", new Properties)
156+
val rows = sqlContext.read.jdbc(jdbcUrl, "otherscopy", new Properties).collect()
157+
assert(rows(0).getInt(0) == 1)
158+
assert(rows(0).getInt(1) == 20)
159+
assert(rows(0).getString(2) == "1")
133160
}
134161
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,34 @@
1717

1818
package org.apache.spark.sql.jdbc
1919

20-
import org.apache.spark.sql.types.{BooleanType, DataType, StringType}
20+
import java.sql.Types
21+
22+
import org.apache.spark.sql.types._
2123

2224
private object DB2Dialect extends JdbcDialect {
2325

2426
override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
2527

28+
override def getCatalystType(
29+
sqlType: Int,
30+
typeName: String,
31+
size: Int,
32+
md: MetadataBuilder): Option[DataType] = sqlType match {
33+
case Types.REAL => Option(FloatType)
34+
case Types.OTHER =>
35+
typeName match {
36+
case "DECFLOAT" => Option(DecimalType(38, 18))
37+
case "XML" => Option(StringType)
38+
case t if (t.startsWith("TIMESTAMP")) => Option(TimestampType) // TIMESTAMP WITH TIMEZONE
39+
case _ => None
40+
}
41+
case _ => None
42+
}
43+
2644
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
2745
case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
2846
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
47+
case ShortType | ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
2948
case _ => None
3049
}
3150

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,15 @@ class JDBCSuite extends SparkFunSuite
713713
val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
714714
assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
715715
assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
716+
assert(db2Dialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get == "SMALLINT")
717+
assert(db2Dialect.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
718+
// test db2 dialect mappings on read
719+
assert(db2Dialect.getCatalystType(java.sql.Types.REAL, "REAL", 1, null) == Option(FloatType))
720+
assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "DECFLOAT", 1, null) ==
721+
Option(DecimalType(38, 18)))
722+
assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "XML", 1, null) == Option(StringType))
723+
assert(db2Dialect.getCatalystType(java.sql.Types.OTHER, "TIMESTAMP WITH TIME ZONE", 1, null) ==
724+
Option(TimestampType))
716725
}
717726

718727
test("PostgresDialect type mapping") {

0 commit comments

Comments (0)