Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,15 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.scoreLabelsWeight"),
// SPARK-46938: Javax -> Jakarta namespace change.
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.ProxyRedirectHandler$ResponseWrapper"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.DB2Dialect#DB2SQLBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.DB2Dialect#DB2SQLQueryBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.MsSqlServerDialect#MsSqlServerSQLBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.MsSqlServerDialect#MsSqlServerSQLQueryBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.MySQLDialect#MySQLSQLBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.MySQLDialect#MySQLSQLQueryBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.OracleDialect#OracleSQLBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.OracleDialect#OracleSQLQueryBuilder.this")
)

// Default exclude rules
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.spark.sql.jdbc.MySQLDialect
org.apache.spark.sql.jdbc.PostgresDialect
org.apache.spark.sql.jdbc.DB2Dialect
org.apache.spark.sql.jdbc.MsSqlServerDialect
org.apache.spark.sql.jdbc.DerbyDialect
org.apache.spark.sql.jdbc.OracleDialect
org.apache.spark.sql.jdbc.TeradataDialect
org.apache.spark.sql.jdbc.H2Dialect
org.apache.spark.sql.jdbc.SnowflakeDialect
org.apache.spark.sql.jdbc.DatabricksDialect
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._

private object DB2Dialect extends JdbcDialect {
private case class DB2Dialect() extends JdbcDialect {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._

private case object DatabricksDialect extends JdbcDialect {
private case class DatabricksDialect() extends JdbcDialect {

override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:databricks")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.types._


private object DerbyDialect extends JdbcDialect {
private case class DerbyDialect() extends JdbcDialect {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}

private[sql] object H2Dialect extends JdbcDialect {
private[sql] case class H2Dialect() extends JdbcDialect {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc
import java.sql.{Connection, Date, Driver, Statement, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime}
import java.util
import java.util.ServiceLoader

import scala.collection.mutable.ArrayBuilder
import scala.util.control.NonFatal
Expand All @@ -46,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProv
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -825,16 +827,14 @@ object JdbcDialects {

private[this] var dialects = List[JdbcDialect]()

registerDialect(MySQLDialect)
registerDialect(PostgresDialect)
registerDialect(DB2Dialect)
registerDialect(MsSqlServerDialect)
registerDialect(DerbyDialect)
registerDialect(OracleDialect)
registerDialect(TeradataDialect)
registerDialect(H2Dialect)
registerDialect(SnowflakeDialect)
registerDialect(DatabricksDialect)
/**
 * Registers every [[JdbcDialect]] implementation that `java.util.ServiceLoader`
 * can discover.
 *
 * Providers are resolved with the class loader returned by
 * `Utils.getContextOrSparkClassLoader`, and each discovered instance is handed
 * to `registerDialect` in the order the loader yields it.
 */
private def registerDialects(): Unit = {
  val discovered =
    ServiceLoader.load(classOf[JdbcDialect], Utils.getContextOrSparkClassLoader)
  // ServiceLoader is a java.lang.Iterable; forEach walks the same lazy provider
  // iterator that an explicit hasNext/next loop would.
  discovered.forEach((dialect: JdbcDialect) => registerDialect(dialect))
}
registerDialects()

/**
* Fetch the JdbcDialect class corresponding to a given database url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,11 @@ import org.apache.spark.sql.connector.expressions.{Expression, NullOrdering, Sor
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
import org.apache.spark.sql.types._


private object MsSqlServerDialect extends JdbcDialect {

// Special JDBC types in Microsoft SQL Server.
// https://github.com/microsoft/mssql-jdbc/blob/v9.4.1/src/main/java/microsoft/sql/Types.java
private object SpecificTypes {
val GEOMETRY = -157
val GEOGRAPHY = -158
}

private case class MsSqlServerDialect() extends JdbcDialect {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")

Expand Down Expand Up @@ -113,7 +106,7 @@ private object MsSqlServerDialect extends JdbcDialect {
// Reference doc: https://learn.microsoft.com/en-us/sql/t-sql/data-types
case java.sql.Types.SMALLINT | java.sql.Types.TINYINT => Some(ShortType)
case java.sql.Types.REAL => Some(FloatType)
case SpecificTypes.GEOMETRY | SpecificTypes.GEOGRAPHY => Some(BinaryType)
case GEOMETRY | GEOGRAPHY => Some(BinaryType)
case _ => None
}
}
Expand Down Expand Up @@ -226,3 +219,10 @@ private object MsSqlServerDialect extends JdbcDialect {

override def supportsLimit: Boolean = true
}

/**
 * Companion object of the `MsSqlServerDialect` case class. It only holds the
 * vendor-specific JDBC type codes; the dialect imports these names and maps
 * both of them to `BinaryType` when resolving Catalyst types.
 */
private object MsSqlServerDialect {
// Special JDBC types in Microsoft SQL Server.
// https://github.com/microsoft/mssql-jdbc/blob/v9.4.1/src/main/java/microsoft/sql/Types.java
// JDBC type code for the GEOMETRY type.
final val GEOMETRY = -157
// JDBC type code for the GEOGRAPHY type.
final val GEOGRAPHY = -158
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types._

private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {

override def canHandle(url : String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,11 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.OracleDialect._
import org.apache.spark.sql.types._


private case object OracleDialect extends JdbcDialect {
private[jdbc] val BINARY_FLOAT = 100
private[jdbc] val BINARY_DOUBLE = 101
private[jdbc] val TIMESTAMP_TZ = -101
// oracle.jdbc.OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE
private[jdbc] val TIMESTAMP_LTZ = -102


private case class OracleDialect() extends JdbcDialect {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")

Expand Down Expand Up @@ -230,3 +224,11 @@ private case object OracleDialect extends JdbcDialect {

override def supportsOffset: Boolean = true
}

private[jdbc] object OracleDialect {
Copy link
Member

@dongjoon-hyun dongjoon-hyun Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er, actually, `case object OracleDialect` provides more features (Serializable, hashCode, toString) than a plain `object OracleDialect`. Do we really need to drop those features like this?

Copy link
Member Author

@yaooqinn yaooqinn Mar 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only a holder for a few constant ints that represent some variants from the Oracle JDBC extensions, so a plain object is enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, the following errors occurred:

- simple scan with LIMIT *** FAILED *** (65 milliseconds)
[info]   org.apache.spark.SparkException: Task not serializable

final val BINARY_FLOAT = 100
final val BINARY_DOUBLE = 101
final val TIMESTAMP_TZ = -101
// oracle.jdbc.OracleType.TIMESTAMP_WITH_LOCAL_TIME_ZONE
final val TIMESTAMP_LTZ = -102
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._


private object PostgresDialect extends JdbcDialect with SQLConfHelper {
private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, DataType}

private case object SnowflakeDialect extends JdbcDialect {
private case class SnowflakeDialect() extends JdbcDialect {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types._


private case object TeradataDialect extends JdbcDialect {
private case class TeradataDialect() extends JdbcDialect {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.spark.sql.jdbc.DummyDatabaseDialect
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

/**
 * Minimal [[JdbcDialect]] used by the JDBC tests. It is listed in a service
 * provider file so that dialect discovery can be exercised, and it inherits
 * every behavior other than URL matching from [[JdbcDialect]].
 */
class DummyDatabaseDialect extends JdbcDialect {

  /** Accepts exactly those URLs that begin with the `jdbc:dummy` prefix (case-sensitive). */
  override def canHandle(url: String): Boolean = {
    val dummyPrefix = "jdbc:dummy"
    url.startsWith(dummyPrefix)
  }
}
61 changes: 35 additions & 26 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -786,12 +786,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

test("Default jdbc dialect registration") {
assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect)
assert(JdbcDialects.get("jdbc:derby:db") == DerbyDialect)
assert(JdbcDialects.get("test.invalid") == NoopDialect)
assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") === MySQLDialect())
assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") === PostgresDialect())
assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") === DB2Dialect())
assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") === MsSqlServerDialect())
assert(JdbcDialects.get("jdbc:derby:db") === DerbyDialect())
assert(JdbcDialects.get("test.invalid") === NoopDialect)
}

test("quote column names by jdbc dialect") {
Expand Down Expand Up @@ -846,13 +846,13 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

test("Dialect unregister") {
JdbcDialects.unregisterDialect(H2Dialect)
JdbcDialects.unregisterDialect(H2Dialect())
try {
JdbcDialects.registerDialect(testH2Dialect)
JdbcDialects.unregisterDialect(testH2Dialect)
assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect)
} finally {
JdbcDialects.registerDialect(H2Dialect)
JdbcDialects.registerDialect(H2Dialect())
}
}

Expand Down Expand Up @@ -997,7 +997,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
// JDBC url is a required option but is not used in this test.
val options = new JDBCOptions(Map("url" -> "jdbc:h2://host:port", "dbtable" -> "test"))
assert(
OracleDialect
OracleDialect()
.getJdbcSQLQueryBuilder(options)
.withColumns(Array("a", "b"))
.withLimit(123)
Expand Down Expand Up @@ -1053,7 +1053,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
// JDBC url is a required option but is not used in this test.
val options = new JDBCOptions(Map("url" -> "jdbc:h2://host:port", "dbtable" -> "test"))
assert(
MsSqlServerDialect
MsSqlServerDialect()
.getJdbcSQLQueryBuilder(options)
.withColumns(Array("a", "b"))
.withLimit(123)
Expand All @@ -1066,7 +1066,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
// JDBC url is a required option but is not used in this test.
val options = new JDBCOptions(Map("url" -> "jdbc:db2://host:port", "dbtable" -> "test"))
assert(
DB2Dialect
DB2Dialect()
.getJdbcSQLQueryBuilder(options)
.withColumns(Array("a", "b"))
.withLimit(123)
Expand Down Expand Up @@ -1938,20 +1938,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-28552: Case-insensitive database URLs in JdbcDialect") {
assert(JdbcDialects.get("jdbc:mysql://localhost/db") === MySQLDialect)
assert(JdbcDialects.get("jdbc:MySQL://localhost/db") === MySQLDialect)
assert(JdbcDialects.get("jdbc:postgresql://localhost/db") === PostgresDialect)
assert(JdbcDialects.get("jdbc:postGresql://localhost/db") === PostgresDialect)
assert(JdbcDialects.get("jdbc:db2://localhost/db") === DB2Dialect)
assert(JdbcDialects.get("jdbc:DB2://localhost/db") === DB2Dialect)
assert(JdbcDialects.get("jdbc:sqlserver://localhost/db") === MsSqlServerDialect)
assert(JdbcDialects.get("jdbc:sqlServer://localhost/db") === MsSqlServerDialect)
assert(JdbcDialects.get("jdbc:derby://localhost/db") === DerbyDialect)
assert(JdbcDialects.get("jdbc:derBy://localhost/db") === DerbyDialect)
assert(JdbcDialects.get("jdbc:oracle://localhost/db") === OracleDialect)
assert(JdbcDialects.get("jdbc:Oracle://localhost/db") === OracleDialect)
assert(JdbcDialects.get("jdbc:teradata://localhost/db") === TeradataDialect)
assert(JdbcDialects.get("jdbc:Teradata://localhost/db") === TeradataDialect)
assert(JdbcDialects.get("jdbc:mysql://localhost/db") === MySQLDialect())
assert(JdbcDialects.get("jdbc:MySQL://localhost/db") === MySQLDialect())
assert(JdbcDialects.get("jdbc:postgresql://localhost/db") === PostgresDialect())
assert(JdbcDialects.get("jdbc:postGresql://localhost/db") === PostgresDialect())
assert(JdbcDialects.get("jdbc:db2://localhost/db") === DB2Dialect())
assert(JdbcDialects.get("jdbc:DB2://localhost/db") === DB2Dialect())
assert(JdbcDialects.get("jdbc:sqlserver://localhost/db") === MsSqlServerDialect())
assert(JdbcDialects.get("jdbc:sqlServer://localhost/db") === MsSqlServerDialect())
assert(JdbcDialects.get("jdbc:derby://localhost/db") === DerbyDialect())
assert(JdbcDialects.get("jdbc:derBy://localhost/db") === DerbyDialect())
assert(JdbcDialects.get("jdbc:oracle://localhost/db") === OracleDialect())
assert(JdbcDialects.get("jdbc:Oracle://localhost/db") === OracleDialect())
assert(JdbcDialects.get("jdbc:teradata://localhost/db") === TeradataDialect())
assert(JdbcDialects.get("jdbc:Teradata://localhost/db") === TeradataDialect())
}

test("SQLContext.jdbc (deprecated)") {
Expand Down Expand Up @@ -2099,7 +2099,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-45139: DatabricksDialect url handling") {
assert(JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com") == DatabricksDialect)
assert(JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com") ===
DatabricksDialect())
}

test("SPARK-45139: DatabricksDialect catalyst type mapping") {
Expand Down Expand Up @@ -2154,4 +2155,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
val expected = Map("percentile_approx_val" -> 49)
assert(namedObservation.get === expected)
}

// Verifies that a dialect advertised only through a service provider file is
// picked up by JdbcDialects, and that it can be unregistered like any other
// dialect.
test("SPARK-47496: ServiceLoader support for JDBC dialects") {
  val dummyUrl = "jdbc:dummy:dummy_host:dummy_port/dummy_db"
  val dialect = JdbcDialects.get(dummyUrl)
  assert(dialect.isInstanceOf[DummyDatabaseDialect])
  try {
    JdbcDialects.unregisterDialect(dialect)
    // With the dummy dialect gone, nothing handles the URL any more.
    assert(JdbcDialects.get(dummyUrl) === NoopDialect)
  } finally {
    // JdbcDialects is global state shared by every test in the JVM; put the
    // dialect back so this test stays repeatable and does not leak into others.
    JdbcDialects.registerDialect(dialect)
  }
}
}
Loading