@@ -98,7 +98,13 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventually {
   val connectionTimeout = timeout(2.minutes)
 
   private var docker: DockerClient = _
-  protected var externalPort: Int = _
+  // Configure networking (necessary for boot2docker / Docker Machine)
+  protected lazy val externalPort: Int = {
+    val sock = new ServerSocket(0)
+    val port = sock.getLocalPort
+    sock.close()
+    port
+  }
   private var containerId: String = _
   protected var jdbcUrl: String = _

@@ -122,13 +128,6 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventually {
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
docker.pull(db.imageName)
}
// Configure networking (necessary for boot2docker / Docker Machine)
externalPort = {
val sock = new ServerSocket(0)
val port = sock.getLocalPort
sock.close()
port
}
val hostConfigBuilder = HostConfig.builder()
.privileged(db.privileged)
.networkMode("bridge")
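
A note on this change: the old var was only assigned here, during container setup, so any code that read externalPort earlier, such as a subclass building its sparkConf (as OracleIntegrationSuite does below), would have seen the uninitialized default. Moving the probe into a lazy val runs it on first access and caches the result. A minimal self-contained sketch of the same free-port trick (names are hypothetical, not part of this PR):

import java.net.ServerSocket

object FreePortProbe {
  // Binding to port 0 asks the OS for any free ephemeral port;
  // the lazy val defers the probe to first access and caches the result.
  lazy val port: Int = {
    val sock = new ServerSocket(0)
    try sock.getLocalPort finally sock.close()
  }

  def main(args: Array[String]): Unit = {
    println(s"probed free port: $port")
    println(s"second read, same value: $port")
  }
}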
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.jdbc.v2

import java.sql.Connection

import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.DockerTest
Member: plz remove all the unused imports.


/**
 * The following are the steps to test this suite:
 * 1. Build the Oracle database image in Docker; see the link below for how to:
 *    https://github.com/oracle/docker-images/blob/master/OracleDatabase/SingleInstance/README.md
 * 2. export ORACLE_DOCKER_IMAGE_NAME=$ORACLE_DOCKER_IMAGE_NAME
 *    Pull the image if needed - docker pull $ORACLE_DOCKER_IMAGE_NAME
 * 3. Start docker - sudo service docker start
 * 4. Run the Spark test - ./build/sbt -Pdocker-integration-tests
 *    "test-only org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite"
 *
 * An actual sequence of commands to run the test is as follows:
 *
 *   $ git clone https://github.com/oracle/docker-images.git
 *   // Head SHA: 3e352a22618070595f823977a0fd1a3a8071a83c
 *   $ cd docker-images/OracleDatabase/SingleInstance/dockerfiles
 *   $ ./buildDockerImage.sh -v 18.4.0 -x
 *   $ export ORACLE_DOCKER_IMAGE_NAME=oracle/database:18.4.0-xe
 *   $ cd $SPARK_HOME
 *   $ ./build/sbt -Pdocker-integration-tests
 *     "test-only org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite"
 *
 * This suite has been validated with Oracle 18.4.0 Express Edition.
 */
@DockerTest
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
Member: Could we run the existing tests of o.a.s.s.jdbc.OracleIntegrationSuite for the V2 JDBC path? I think it is okay to fix it in a separate PR though.

MaxGekk (author, Oct 5, 2020): I opened the JIRA ticket for that, https://issues.apache.org/jira/browse/SPARK-33066; let's do it separately. Probably we will need to split the ticket per supported dialect.

Member: Yea, looks okay. Thanks for opening it, Max!

  override val db = new DatabaseOnDocker {
    override val imageName = sys.env("ORACLE_DOCKER_IMAGE_NAME")
    override val env = Map(
      "ORACLE_PWD" -> "oracle"
    )
    override val usesIpc = false
    override val jdbcPort: Int = 1521
    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
  }

  override def sparkConf: SparkConf = super.sparkConf
    .set("spark.sql.catalog.oracle", classOf[JDBCTableCatalog].getName)
    .set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort))

  override val connectionTimeout = timeout(7.minutes)
  override def dataPreparation(conn: Connection): Unit = {}
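
These two sparkConf entries are the whole V2 wiring: the first registers JDBCTableCatalog under the catalog name oracle, and the second points it at the container (port 1521 inside, mapped to the probed externalPort on the host), which is why the tests below can address tables as oracle.alt_table. A hedged sketch of the same setup in a standalone session; the URL and table name are illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Register a V2 catalog named "oracle" backed by JDBCTableCatalog.
  .config("spark.sql.catalog.oracle",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.oracle.url",
    "jdbc:oracle:thin:system/oracle@//127.0.0.1:1521/xe")  // illustrative URL
  .getOrCreate()

// Identifiers prefixed with the catalog name now route through the JDBC catalog:
// spark.table("oracle.some_table").printSchema()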

test("SPARK-33034: ALTER TABLE ... add new columns") {
withTable("oracle.alt_table") {
sql("CREATE TABLE oracle.alt_table (ID STRING) USING _")
sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
Member: Does this ALTER command always succeed? What if we add a new column with the same name as an existing column? Anyway, I think it is better to add some tests for error cases.

MaxGekk (author): We test dialect-specific changes here: ALTER TABLE ... ALTER COLUMN vs ALTER TABLE ... ADD. I believe the tests for error handling should be added to JDBCTableCatalogSuite, since error handling should be generic.

MaxGekk (author): I added negative tests to the common tests in #29945.

      var t = spark.table("oracle.alt_table")
      var expectedSchema = new StructType()
        .add("ID", StringType)
        .add("C1", StringType)
        .add("C2", StringType)
      assert(t.schema === expectedSchema)
      sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C3 STRING)")
      t = spark.table("oracle.alt_table")
      expectedSchema = expectedSchema.add("C3", StringType)
      assert(t.schema === expectedSchema)
      // Add an already existing column
      val msg = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C3 DOUBLE)")
      }.getMessage
      assert(msg.contains("Cannot add column, because C3 already exists"))
    }
    // Add a column to a non-existing table
    val msg = intercept[AnalysisException] {
      sql("ALTER TABLE oracle.not_existing_table ADD COLUMNS (C4 STRING)")
    }.getMessage
    assert(msg.contains("Table not found"))
  }

test("SPARK-33034: ALTER TABLE ... update column type") {
withTable("oracle.alt_table") {
sql("CREATE TABLE oracle.alt_table (ID INTEGER) USING _")
sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE STRING")
Member: ditto: Can we alter a column from a string type to an int one?

MaxGekk (author): No:

  org.apache.spark.sql.AnalysisException: Cannot update alt_table field ID: string cannot be cast to int; line 1 pos 0;
  [info] AlterTable org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog@3ebc40fe, alt_table, RelationV2[ID#25] alt_table, [org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnType@ce035e83]
  [info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  [info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$31(CheckAnalysis.scala:528)
  [info]   at scala.collection.immutable.List.foreach(List.scala:392)
  [info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:489)

I will add a check for that.

      val t = spark.table("oracle.alt_table")
      val expectedSchema = new StructType().add("ID", StringType)
      assert(t.schema === expectedSchema)
      // Update column type from STRING to INTEGER
      val msg1 = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE INTEGER")
      }.getMessage
      assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int"))
      // Update a non-existing column
      val msg2 = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
      }.getMessage
      assert(msg2.contains("Cannot update missing field bad_column"))
      // Update a column to an unsupported type
      val msg3 = intercept[ParseException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE bad_type")
      }.getMessage
      assert(msg3.contains("DataType bad_type is not supported"))
    }
    // Update column type in a non-existing table
    val msg = intercept[AnalysisException] {
      sql("ALTER TABLE oracle.not_existing_table ALTER COLUMN id TYPE DOUBLE")
    }.getMessage
    assert(msg.contains("Table not found"))
  }

test("SPARK-33034: ALTER TABLE ... update column nullability") {
withTable("oracle.alt_table") {
sql("CREATE TABLE oracle.alt_table (ID STRING NOT NULL) USING _")
sql("ALTER TABLE oracle.alt_table ALTER COLUMN ID DROP NOT NULL")
Member: ditto: What if we drop a non-existent column?

MaxGekk (author): I will add negative tests.

      val t = spark.table("oracle.alt_table")
      val expectedSchema = new StructType().add("ID", StringType, nullable = true)
      assert(t.schema === expectedSchema)
      // Update nullability of a non-existing column
      val msg = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column DROP NOT NULL")
      }.getMessage
      assert(msg.contains("Cannot update missing field bad_column"))
    }
    // Update column nullability in a non-existing table
    val msg = intercept[AnalysisException] {
      sql("ALTER TABLE oracle.not_existing_table ALTER COLUMN ID DROP NOT NULL")
    }.getMessage
    assert(msg.contains("Table not found"))
  }
}
@@ -30,7 +30,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap

 case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
   extends Table with SupportsRead with SupportsWrite {
-  assert(ident.namespace().length == 1)
 
   override def name(): String = ident.toString

@@ -212,7 +212,7 @@ abstract class JdbcDialect extends Serializable {
         case add: AddColumn if add.fieldNames.length == 1 =>
           val dataType = JdbcUtils.getJdbcType(add.dataType(), this).databaseTypeDefinition
           val name = add.fieldNames
-          updateClause += s"ALTER TABLE $tableName ADD COLUMN ${name(0)} $dataType"
+          updateClause += getAddColumnQuery(tableName, name(0), dataType)
         case rename: RenameColumn if rename.fieldNames.length == 1 =>
           val name = rename.fieldNames
           updateClause += s"ALTER TABLE $tableName RENAME COLUMN ${name(0)} TO ${rename.newName}"
@@ -223,17 +223,36 @@ abstract class JdbcDialect extends Serializable {
           val name = updateColumnType.fieldNames
           val dataType = JdbcUtils.getJdbcType(updateColumnType.newDataType(), this)
             .databaseTypeDefinition
-          updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} $dataType"
+          updateClause += getUpdateColumnTypeQuery(tableName, name(0), dataType)
         case updateNull: UpdateColumnNullability if updateNull.fieldNames.length == 1 =>
           val name = updateNull.fieldNames
-          val nullable = if (updateNull.nullable()) "NULL" else "NOT NULL"
-          updateClause += s"ALTER TABLE $tableName ALTER COLUMN ${name(0)} SET $nullable"
+          updateClause += getUpdateColumnNullabilityQuery(tableName, name(0), updateNull.nullable())
         case _ =>
           throw new SQLFeatureNotSupportedException(s"Unsupported TableChange $change")
       }
     }
     updateClause.result()
   }

+  def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = {
+    s"ALTER TABLE $tableName ADD COLUMN $columnName $dataType"
+  }
+
+  def getUpdateColumnTypeQuery(
+      tableName: String,
+      columnName: String,
+      newDataType: String): String = {
+    s"ALTER TABLE $tableName ALTER COLUMN $columnName $newDataType"
+  }
+
+  def getUpdateColumnNullabilityQuery(
+      tableName: String,
+      columnName: String,
+      isNullable: Boolean): String = {
+    val nullable = if (isNullable) "NULL" else "NOT NULL"
+    s"ALTER TABLE $tableName ALTER COLUMN $columnName SET $nullable"
+  }
}
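
These three hooks give a dialect a seam for reshaping just the generated DDL without reimplementing alterTable. As a rough sketch (MyDialect is hypothetical and not part of this change; it assumes a definition alongside JdbcDialect so the imports resolve), a database whose ALTER syntax uses MODIFY COLUMN would override only the type-change hook:

// Hypothetical sketch only: overriding a single DDL-generation hook.
private case object MyDialect extends JdbcDialect {
  // canHandle is JdbcDialect's one abstract member.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  override def getUpdateColumnTypeQuery(
      tableName: String,
      columnName: String,
      newDataType: String): String =
    s"ALTER TABLE $tableName MODIFY COLUMN $columnName $newDataType"
}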

@@ -112,4 +112,23 @@ private case object OracleDialect extends JdbcDialect {
case _ => s"TRUNCATE TABLE $table"
}
}

+  // see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001
+  override def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String =
+    s"ALTER TABLE $tableName ADD $columnName $dataType"
+
+  // see https://docs.oracle.com/cd/B28359_01/server.111/b28286/statements_3001.htm#SQLRF01001
+  override def getUpdateColumnTypeQuery(
+      tableName: String,
+      columnName: String,
+      newDataType: String): String =
+    s"ALTER TABLE $tableName MODIFY $columnName $newDataType"
+
+  override def getUpdateColumnNullabilityQuery(
+      tableName: String,
+      columnName: String,
+      isNullable: Boolean): String = {
+    val nullable = if (isNullable) "NULL" else "NOT NULL"
+    s"ALTER TABLE $tableName MODIFY $columnName $nullable"
+  }
}
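
For reference, a sketch of the statements these overrides produce, with illustrative table and column names (OracleDialect is private to its file, so the calls assume code in the same scope):

// Illustrative inputs and the SQL each override yields:
OracleDialect.getAddColumnQuery("employees", "nickname", "VARCHAR2(255)")
// -> ALTER TABLE employees ADD nickname VARCHAR2(255)

OracleDialect.getUpdateColumnTypeQuery("employees", "salary", "NUMBER(12,2)")
// -> ALTER TABLE employees MODIFY salary NUMBER(12,2)

OracleDialect.getUpdateColumnNullabilityQuery("employees", "nickname", isNullable = true)
// -> ALTER TABLE employees MODIFY nickname NULL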