@@ -1,5 +1,5 @@
org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider
org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
org.apache.spark.sql.execution.datasources.noop.NoopDataSource
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
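This appears to be the DataSourceRegister service registration file, so the renamed provider has to be re-listed here as well. As a rough illustration (an assumption about the lookup mechanism, not code from this PR), short names such as "jdbc" are resolved by service-loading DataSourceRegister implementations:

import java.util.ServiceLoader

import org.apache.spark.sql.sources.DataSourceRegister

object FindJdbcProvider {
  def main(args: Array[String]): Unit = {
    // Scan every registered data source and print the class backing the "jdbc" short name.
    val providers = ServiceLoader.load(classOf[DataSourceRegister]).iterator()
    while (providers.hasNext) {
      val provider = providers.next()
      if (provider.shortName() == "jdbc") println(provider.getClass.getCanonicalName)
    }
  }
}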
@@ -38,7 +38,7 @@ import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -582,7 +582,7 @@ object DataSource extends Logging {

/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap: Map[String, String] = {
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val jdbc = classOf[JDBCRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
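For context, a minimal sketch of what the backward-compatibility lookup amounts to; the alias keys mirror the ones asserted in ResolvedDataSourceSuite below, but the helper itself is illustrative rather than Spark's actual implementation:

object ProviderAliases {
  // Legacy data source names that should keep resolving to the renamed JDBC provider.
  private val backwardCompatibilityMap: Map[String, String] = Map(
    "org.apache.spark.sql.jdbc" ->
      "org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider",
    "org.apache.spark.sql.execution.datasources.jdbc" ->
      "org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider"
  )

  // Fall back to the requested name when no legacy alias matches.
  def resolve(provider: String): String =
    backwardCompatibilityMap.getOrElse(provider, provider)
}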
@@ -53,14 +53,14 @@ object JDBCRDD extends Logging {
val url = options.url
val table = options.tableOrQuery
val dialect = JdbcDialects.get(url)
val conn: Connection = JdbcUtils.createConnectionFactory(options)()
val conn: Connection = JDBCUtils.createConnectionFactory(options)()
try {
val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
try {
statement.setQueryTimeout(options.queryTimeout)
val rs = statement.executeQuery()
try {
JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
JDBCUtils.getSchema(rs, dialect, alwaysNullable = true)
} finally {
rs.close()
}
@@ -158,7 +158,7 @@ object JDBCRDD extends Logging {
val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
new JDBCRDD(
sc,
JdbcUtils.createConnectionFactory(options),
JDBCUtils.createConnectionFactory(options),
pruneSchema(schema, requiredColumns),
quotedColumns,
filters,
@@ -302,7 +302,7 @@ private[jdbc] class JDBCRDD(
stmt.setFetchSize(options.fetchSize)
stmt.setQueryTimeout(options.queryTimeout)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)
val rowsIterator = JDBCUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

CompletionIterator[InternalRow, Iterator[InternalRow]](
new InterruptibleIterator(context, rowsIterator), close())
@@ -225,7 +225,7 @@ private[sql] object JDBCRelation extends Logging {
def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
case Some(customSchema) => JdbcUtils.getCustomSchema(
case Some(customSchema) => JDBCUtils.getCustomSchema(
tableSchema, customSchema, resolver)
case None => tableSchema
}
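A hedged usage sketch of the customSchema path exercised here; the URL, table, and column names are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("custom-schema-sketch").getOrCreate()

// customSchema overrides the column types that getSchema would otherwise resolve
// from the database; columns not listed keep their resolved types.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")  // placeholder URL
  .option("dbtable", "public.events")                        // placeholder table
  .option("customSchema", "id DECIMAL(38, 0), name STRING")
  .load()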
@@ -18,10 +18,10 @@
package org.apache.spark.sql.execution.datasources.jdbc

import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.execution.datasources.jdbc.JDBCUtils._
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, RelationProvider}

class JdbcRelationProvider extends CreatableRelationProvider
class JDBCRelationProvider extends CreatableRelationProvider
with RelationProvider with DataSourceRegister {

override def shortName(): String = "jdbc"
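Because shortName() still returns "jdbc", user code that addresses the source by its short name keeps working across the class rename; only code referencing the provider class directly is affected. A small write sketch with placeholder connection details, touching the save-mode handling shown in the next hunk:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("jdbc-short-name-sketch").getOrCreate()

// Resolved through DataSourceRegister by short name, not by the provider class name.
spark.range(10).toDF("id")
  .write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")  // placeholder URL
  .option("dbtable", "public.ids")                           // placeholder table
  .mode(SaveMode.Append)                                     // exercises the Append branch below
  .save()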
@@ -45,16 +45,16 @@ class JdbcRelationProvider extends CreatableRelationProvider
val options = new JdbcOptionsInWrite(parameters)
val isCaseSensitive = sqlContext.conf.caseSensitiveAnalysis

val conn = JdbcUtils.createConnectionFactory(options)()
val conn = JDBCUtils.createConnectionFactory(options)()
try {
val tableExists = JdbcUtils.tableExists(conn, options)
val tableExists = JDBCUtils.tableExists(conn, options)
if (tableExists) {
mode match {
case SaveMode.Overwrite =>
if (options.isTruncate && isCascadingTruncateTable(options.url) == Some(false)) {
// In this case, we should truncate table and then load.
truncateTable(conn, options)
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
val tableSchema = JDBCUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)
} else {
// Otherwise, do not truncate the table, instead drop and recreate it
@@ -64,7 +64,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
}

case SaveMode.Append =>
val tableSchema = JdbcUtils.getSchemaOption(conn, options)
val tableSchema = JDBCUtils.getSchemaOption(conn, options)
saveTable(df, tableSchema, isCaseSensitive, options)

case SaveMode.ErrorIfExists =>
@@ -44,7 +44,7 @@ import org.apache.spark.util.NextIterator
/**
* Util functions for JDBC tables.
*/
object JdbcUtils extends Logging {
object JDBCUtils extends Logging {
Review comment (Member):
@MaxGekk, as you can see from the change to the PostgreSQL dialect, this rename will break downstream custom dialects. Shall we avoid making this change?
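To make the concern concrete, a hypothetical downstream dialect (MyCustomDialect and its URL prefix are invented for illustration) that compiles against the current JdbcUtils name and would stop compiling under this rename:

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types.DataType

object MyCustomDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // Delegates to Spark's common type mapping, just like PostgresDialect further down;
  // this reference breaks once JdbcUtils is renamed to JDBCUtils.
  override def getJDBCType(dt: DataType): Option[JdbcType] =
    JdbcUtils.getCommonJDBCType(dt)
}

Such dialects are typically registered via JdbcDialects.registerDialect, so the breakage shows up at compile time in third-party code rather than inside Spark itself.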

/**
* Returns a factory for creating connections to the given JDBC URL.
*
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JDBCRDD, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JDBCRDD, JDBCUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.StructType
@@ -70,14 +70,14 @@ class JDBCTableCatalog extends TableCatalog with Logging {
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
withConnection(JdbcUtils.tableExists(_, writeOptions))
withConnection(JDBCUtils.tableExists(_, writeOptions))
}

override def dropTable(ident: Identifier): Boolean = {
checkNamespace(ident.namespace())
withConnection { conn =>
try {
JdbcUtils.dropTable(conn, getTableName(ident), options)
JDBCUtils.dropTable(conn, getTableName(ident), options)
true
} catch {
case _: SQLException => false
@@ -88,7 +88,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
checkNamespace(oldIdent.namespace())
withConnection { conn =>
JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
JDBCUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
}
}

@@ -123,7 +123,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
withConnection { conn =>
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
JDBCUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}

JDBCTable(ident, schema, writeOptions)
@@ -144,7 +144,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
}

private def withConnection[T](f: Connection => T): T = {
val conn = JdbcUtils.createConnectionFactory(options)()
val conn = JDBCUtils.createConnectionFactory(options)()
try {
f(conn)
} finally {
@@ -20,7 +20,7 @@ package org.apache.spark.sql.jdbc
import java.sql.{Connection, Types}
import java.util.Locale

import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCUtils}
import org.apache.spark.sql.types._


@@ -80,7 +80,7 @@ private object PostgresDialect extends JdbcDialect {
JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
getJDBCType(et).map(_.databaseTypeDefinition)
.orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
.orElse(JDBCUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
.map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
case _ => None
}
@@ -22,45 +22,45 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types._

class JdbcUtilsSuite extends SparkFunSuite {
class JDBCUtilsSuite extends SparkFunSuite {

val tableSchema = StructType(Seq(
StructField("C1", StringType, false), StructField("C2", IntegerType, false)))
val caseSensitive = org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution

test("Parse user specified column types") {
assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)
assert(JDBCUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
assert(JDBCUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)

assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
assert(JDBCUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
StructType(Seq(StructField("C1", DateType, false), StructField("C2", IntegerType, false))))
assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
assert(JDBCUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false))))

assert(
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
JDBCUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("C1", DateType, false), StructField("C2", StringType, false))))
assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
assert(JDBCUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, false))))

// Throw AnalysisException
val duplicate = intercept[AnalysisException]{
JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
JDBCUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
StructType(Seq(StructField("c1", DateType, false), StructField("c1", StringType, false)))
}
assert(duplicate.getMessage.contains(
"Found duplicate column(s) in the customSchema option value"))

// Throw ParseException
val dataTypeNotSupported = intercept[ParseException]{
JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
JDBCUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
}
assert(dataTypeNotSupported.getMessage.contains("DataType datee is not supported"))

val mismatchedInput = intercept[ParseException]{
JdbcUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) ===
JDBCUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) ===
StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
}
assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils
import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRDD, JDBCRelation, JDBCUtils}
import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -1138,7 +1138,7 @@ class JDBCSuite extends QueryTest
test("SPARK-16625: General data types to be mapped to Oracle") {

def getJdbcType(dialect: JdbcDialect, dt: DataType): String = {
dialect.getJDBCType(dt).orElse(JdbcUtils.getCommonJDBCType(dt)).
dialect.getJDBCType(dt).orElse(JDBCUtils.getCommonJDBCType(dt)).
map(_.databaseTypeDefinition).get
}

@@ -1184,7 +1184,7 @@ class JDBCSuite extends QueryTest

test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
val schema = JdbcUtils.schemaString(
val schema = JDBCUtils.schemaString(
df.schema,
df.sqlContext.conf.caseSensitiveAnalysis,
"jdbc:mysql://localhost:3306/temp")
@@ -29,7 +29,7 @@ import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -390,7 +390,7 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
val expectedSchemaStr =
colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ")

assert(JdbcUtils.schemaString(
assert(JDBCUtils.schemaString(
df.schema,
df.sqlContext.conf.caseSensitiveAnalysis,
url1,
@@ -35,13 +35,13 @@ class ResolvedDataSourceSuite extends SharedSparkSession {
test("jdbc") {
assert(
getProvidingClass("jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.execution.datasources.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider])
assert(
getProvidingClass("org.apache.spark.sql.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider])
classOf[org.apache.spark.sql.execution.datasources.jdbc.JDBCRelationProvider])
}

test("json") {
@@ -23,7 +23,7 @@ import org.apache.hive.jdbc.HiveDriver

import org.apache.spark.util.Utils

class JdbcConnectionUriSuite extends HiveThriftServer2Test {
class JDBCConnectionUriSuite extends HiveThriftServer2Test {
Utils.classForName(classOf[HiveDriver].getCanonicalName)

override def mode: ServerMode.Value = ServerMode.binary