Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override val jdbcPort: Int = 3306

override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
s"jdbc:mysql://$ip:$port/" +
s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
}

override def sparkConf: SparkConf = super.sparkConf
Expand All @@ -59,7 +60,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {

override val connectionTimeout = timeout(7.minutes)

override def dataPreparation(conn: Connection): Unit = {}
private var mySQLVersion = -1

override def dataPreparation(conn: Connection): Unit = {
mySQLVersion = conn.getMetaData.getDatabaseMajorVersion
}

override def testUpdateColumnType(tbl: String): Unit = {
sql(s"CREATE TABLE $tbl (ID INTEGER) USING _")
Expand All @@ -77,11 +82,26 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int"))
}

override def testRenameColumn(tbl: String): Unit = {
assert(mySQLVersion > 0)
if (mySQLVersion < 8) {
// Rename is unsupported for mysql versions < 8.0.
val exception = intercept[AnalysisException] {
sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
}
assert(exception.getCause != null, s"Wrong exception thrown: $exception")
val msg = exception.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
assert(msg.contains("Rename column is only supported for MySQL version 8.0 and above."))
} else {
super.testRenameColumn(tbl)
}
}

override def testUpdateColumnNullability(tbl: String): Unit = {
sql("CREATE TABLE mysql.alt_table (ID STRING NOT NULL) USING _")
sql(s"CREATE TABLE $tbl (ID STRING NOT NULL) USING _")
// Update nullability is unsupported for mysql db.
val msg = intercept[AnalysisException] {
sql("ALTER TABLE mysql.alt_table ALTER COLUMN ID DROP NOT NULL")
sql(s"ALTER TABLE $tbl ALTER COLUMN ID DROP NOT NULL")
}.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage

assert(msg.contains("UpdateColumnNullability is not supported"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.tags.DockerTest

@DockerTest
trait V2JDBCTest extends SharedSparkSession {
private[v2] trait V2JDBCTest extends SharedSparkSession {
val catalogName: String
// dialect specific update column type test
def testUpdateColumnType(tbl: String): Unit
Expand All @@ -46,6 +46,14 @@ trait V2JDBCTest extends SharedSparkSession {
assert(msg.contains("Cannot update missing field bad_column"))
}

def testRenameColumn(tbl: String): Unit = {
sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
val t = spark.table(s"$tbl")
val expectedSchema = new StructType().add("RENAMED", StringType, nullable = true)
.add("ID1", StringType, nullable = true).add("ID2", StringType, nullable = true)
assert(t.schema === expectedSchema)
}

test("SPARK-33034: ALTER TABLE ... add new columns") {
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING) USING _")
Expand Down Expand Up @@ -110,6 +118,24 @@ trait V2JDBCTest extends SharedSparkSession {
assert(msg.contains("Table not found"))
}

test("SPARK-33034: ALTER TABLE ... rename column") {
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL," +
s" ID1 STRING NOT NULL, ID2 STRING NOT NULL) USING _")
testRenameColumn(s"$catalogName.alt_table")
// Rename to already existing column
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.alt_table RENAME COLUMN ID1 TO ID2")
}.getMessage
assert(msg.contains("Cannot rename column, because ID2 already exists"))
}
// Rename a column in a not existing table
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}.getMessage
assert(msg.contains("Table not found"))
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
withTable(s"$catalogName.alt_table") {
testUpdateColumnNullability(s"$catalogName.alt_table")
Expand All @@ -121,3 +147,4 @@ trait V2JDBCTest extends SharedSparkSession {
assert(msg.contains("Table not found"))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -895,19 +895,20 @@ object JdbcUtils extends Logging {
changes: Seq[TableChange],
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
val metaData = conn.getMetaData
if (changes.length == 1) {
executeStatement(conn, options, dialect.alterTable(tableName, changes)(0))
executeStatement(conn, options, dialect.alterTable(tableName, changes,
metaData.getDatabaseMajorVersion)(0))
} else {
val metadata = conn.getMetaData
if (!metadata.supportsTransactions) {
if (!metaData.supportsTransactions) {
throw new SQLFeatureNotSupportedException("The target JDBC server does not support " +
"transaction and can only support ALTER TABLE with a single action.")
} else {
conn.setAutoCommit(false)
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
for (sql <- dialect.alterTable(tableName, changes)) {
for (sql <- dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion)) {
statement.executeUpdate(sql)
}
conn.commit()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ abstract class JdbcDialect extends Serializable {
* @param changes Changes to apply to the table.
* @return The SQL statements to use for altering the table.
*/
def alterTable(tableName: String, changes: Seq[TableChange]): Array[String] = {
def alterTable(
tableName: String,
changes: Seq[TableChange],
dbMajorVersion: Int): Array[String] = {
val updateClause = ArrayBuilder.make[String]
for (change <- changes) {
change match {
Expand All @@ -215,7 +218,7 @@ abstract class JdbcDialect extends Serializable {
updateClause += getAddColumnQuery(tableName, name(0), dataType)
case rename: RenameColumn if rename.fieldNames.length == 1 =>
val name = rename.fieldNames
updateClause += getRenameColumnQuery(tableName, name(0), rename.newName)
updateClause += getRenameColumnQuery(tableName, name(0), rename.newName, dbMajorVersion)
case delete: DeleteColumn if delete.fieldNames.length == 1 =>
val name = delete.fieldNames
updateClause += getDeleteColumnQuery(tableName, name(0))
Expand All @@ -237,7 +240,11 @@ abstract class JdbcDialect extends Serializable {
def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String =
s"ALTER TABLE $tableName ADD COLUMN ${quoteIdentifier(columnName)} $dataType"

def getRenameColumnQuery(tableName: String, columnName: String, newName: String): String =
def getRenameColumnQuery(
tableName: String,
columnName: String,
newName: String,
dbMajorVersion: Int): String =
s"ALTER TABLE $tableName RENAME COLUMN ${quoteIdentifier(columnName)} TO" +
s" ${quoteIdentifier(newName)}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,25 @@ private case object MySQLDialect extends JdbcDialect {
s"ALTER TABLE $tableName MODIFY COLUMN ${quoteIdentifier(columnName)} $newDataType"
}

// See Old Syntax: https://dev.mysql.com/doc/refman/5.6/en/alter-table.html
// According to https://dev.mysql.com/worklog/task/?id=10761 old syntax works for
// both versions of MySQL i.e. 5.x and 8.0
// The old syntax requires us to have type definition. Since we do not have type
// information, we throw the exception for old version.
override def getRenameColumnQuery(
tableName: String,
columnName: String,
newName: String,
dbMajorVersion: Int): String = {
if (dbMajorVersion >= 8) {
s"ALTER TABLE $tableName RENAME COLUMN ${quoteIdentifier(columnName)} TO" +
s" ${quoteIdentifier(newName)}"
} else {
throw new SQLFeatureNotSupportedException(
s"Rename column is only supported for MySQL version 8.0 and above.")
}
}

// See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
// require to have column data type to change the column nullability
// ALTER TABLE tbl_name MODIFY [COLUMN] col_name column_definition
Expand Down