-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32402][SQL] Implement ALTER TABLE in JDBC Table Catalog #29324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| statement.setQueryTimeout(options.queryTimeout) | ||
| for (sql <- dialect.alterTable(tableName, changes)) { | ||
| statement.executeUpdate(sql) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am debating if I should use statement.executeBatch. The code is simpler without using batch. Not sure if it is common for user to add lots of columns at one time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if one of the statements fails? Do we leave the table in partially modified state? Should we perform all the statements atomically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
Thanks for the work, @huaxingao . IMO we need to add exhaustive tests in integration test units for checking if all the patterns can work on our native supported databases (pg, oracle, db2, mariadb, and sqlserver). |
|
@maropu Will check all the supported databases and have a follow-up to override the dialect if needed. |
|
Test build #126906 has finished for PR 29324 at commit
|
|
retest this please |
| case _ => | ||
| throw new IllegalArgumentException(s"Unsupported TableChange fieldNames" + | ||
| s" ${add.fieldNames}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the field names will not be supported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fieldNames with 2 elements is not supported. It is for nested columns, e.g.
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
val tableSchema = schema.add("point", pointStruct)
TableChange.addColumn(Array("point", "z"), DoubleType))
I don't think add nested column is supported by any of these commonly used databases (Oracle, DB2, MS SQL Server, Postgres, etc)
|
Test build #126909 has finished for PR 29324 at commit
|
|
retest this please |
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
Outdated
Show resolved
Hide resolved
| val statement = conn.createStatement | ||
| try { | ||
| statement.setQueryTimeout(options.queryTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other methods have similar code:
val statement = ...
try {
statement.setQueryTimeout(options.queryTimeout)
statement.execute ...
} finally {
statement.close()
}Could you put it to a private method.
| statement.setQueryTimeout(options.queryTimeout) | ||
| for (sql <- dialect.alterTable(tableName, changes)) { | ||
| statement.executeUpdate(sql) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if one of the statements fails? Do we leave the table in partially modified state? Should we perform all the statements atomically?
|
|
||
| import java.sql.{Connection, Date, Timestamp} | ||
|
|
||
| import scala.collection.mutable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: import scala.collection.mutable.ArrayBuilder
| * | ||
| * @param tableName The name of the table to be altered. | ||
| * @param changes Changes to apply to the table. | ||
| * @return The SQL statement to use for altering the table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It returns multiple SQL statements, correct? And mapping of input changes to SQL statement isn't 1<->1. Could correct this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it returns multiple SQL statements. I think the mapping of input changes to SQL statement is 1 to 1.
| rename.fieldNames match { | ||
| case Array(name) => | ||
| updateClause += s"ALTER TABLE $tableName RENAME COLUMN $name TO ${rename.newName}" | ||
| case _ => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not clear for me when it happens. Could you write a test for the case.
| throw new IllegalArgumentException(s"Unsupported TableChange fieldNames" + | ||
| s" ${delete.fieldNames}") | ||
| } | ||
| case _ => throw new IllegalArgumentException(s"JDBC alterTable has Unsupported" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think throw new NotImplementedError fits better for the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could create sub-tasks for other changes like UpdateColumnNullability and add TODO here if you are not going to implement them in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added UpdateColumnType and UpdateColumnNullability. I didn't add UpdateColumnComment and UpdateColumnPosition. Seems most of the databases don't support these two, so I didn't implement these two.
I added UpdateColumnNullability, but didn't add a test for it yet, because the test failed at the check here
case update: UpdateColumnNullability =>
val field = findField("update", update.fieldNames)
val fieldName = update.fieldNames.quoted
if (!update.nullable && field.nullable) {
alter.failAnalysis(
s"Cannot change nullable column to non-nullable: $fieldName")
}
I am confused. I thought the whole point of UpdateColumnNullability is to change nullable column to non-nullable?
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
Outdated
Show resolved
Hide resolved
| withConnection { conn => | ||
| conn.prepareStatement("""CREATE TABLE "test"."alt_table" (id INTEGER)""").executeUpdate() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you use JDBC Table Catalog functionality:
sql("CREATE TABLE h2.test.alt_table ...")| assert(sql("DESCRIBE TABLE h2.test.alt_table").select("col_name").take(3) === | ||
| Seq(Row("ID"), Row("C1"), Row("C2"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could add a helper function which will check column existence in a table.
|
Test build #126911 has finished for PR 29324 at commit
|
…rces/jdbc/JdbcUtils.scala Co-authored-by: Maxim Gekk <[email protected]>
….scala Co-authored-by: Maxim Gekk <[email protected]>
|
Test build #126923 has finished for PR 29324 at commit
|
|
@MaxGekk Thank you very much for your review! I addressed all the comments. Could you please take one more look? Thanks! |
|
Test build #126927 has finished for PR 29324 at commit
|
|
retest this please |
|
Test build #126930 has finished for PR 29324 at commit
|
|
retest this please |
|
Test build #126941 has finished for PR 29324 at commit
|
|
retest this please |
|
Test build #126946 has finished for PR 29324 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
Outdated
Show resolved
Hide resolved
...rc/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
Outdated
Show resolved
Hide resolved
...rc/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
Outdated
Show resolved
Hide resolved
...rc/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
Outdated
Show resolved
Hide resolved
...rc/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
Outdated
Show resolved
Hide resolved
...rc/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
Outdated
Show resolved
Hide resolved
….scala Co-authored-by: Maxim Gekk <[email protected]>
…rces/v2/jdbc/JDBCTableCatalogSuite.scala Co-authored-by: Maxim Gekk <[email protected]>
…rces/v2/jdbc/JDBCTableCatalogSuite.scala Co-authored-by: Maxim Gekk <[email protected]>
…rces/v2/jdbc/JDBCTableCatalogSuite.scala Co-authored-by: Maxim Gekk <[email protected]>
| } else { | ||
| val metadata = conn.getMetaData | ||
| if (!metadata.supportsTransactions) { | ||
| throw new SQLFeatureNotSupportedException(s"${this.getClass.getName}.alterTable doesn't" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.getClass.getName is JdbcUtils, I don't think it's useful in the error message. How about
The target JDBC server does not support transaction and can only support
ALTER TABLE with a single action.
| change match { | ||
| case add: AddColumn => | ||
| add.fieldNames match { | ||
| case Array(name) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should have a better error message if the field name has more than one parts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems to me fieldName always has only one element.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do believe it always has one element, maybe add an assert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
People can alter a nested field, that why the type of fieldNames is Array[String]. e.g. ALTER TABLE t RENAME COLUMN a.b TO c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this look OK?
case _ =>
throw new SQLFeatureNotSupportedException("Nested column is not supported.")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add.fieldNames match {
case Array(name) =>
We will fail at this pattern match.
One way is
case add: AddColumn if add.fieldNames.length == 1 =>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan I changed code to what you suggested.
We don't support nested column in JDBC yet. In JdbcUtils.getCatalystType, we map java.sql.Types.STRUCT => StringType. In getCommonJDBCType, we don't have a match case for StructType and the default is None. Seems to me currently it is not possible to reach the code path of multiple parts fieldNames in JdbcDialects.alterTable, so I will not have a negative test case for this code path for now.
| assert(t.schema === expectedSchema) | ||
| sql("ALTER TABLE h2.test.alt_table ADD COLUMNS (C3 DOUBLE)") | ||
| t = spark.table("h2.test.alt_table") | ||
| expectedSchema = new StructType() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: expectedSchema = expectedSchema.add("C3", DoubleType)
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except some minor comments.
| throw e | ||
| } finally { | ||
| statement.close() | ||
| conn.setAutoCommit(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we record auto commit status and restore back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we don't need to record the original auto commit status. The default value of autocommit is true.
| update.fieldNames match { | ||
| case Array(name) => | ||
| val nullable = if (update.nullable()) "NULL" else "NOT NULL" | ||
| updateClause += s"ALTER TABLE $tableName ALTER COLUMN $name SET " + nullable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you already use s"", just put nullable into it too.
| s" support multiple alter table changes simultaneously for database server" + | ||
| s" that doesn't have transaction support.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need s"" here.
| case e: SQLException => | ||
| if (conn != null) conn.rollback() | ||
| throw e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we only need to rollback when SQLException? Isn't safer to rollback for all exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
Test build #127014 has finished for PR 29324 at commit
|
|
Thanks for reviewing. I have addressed the comments. |
| } else { | ||
| val metadata = conn.getMetaData | ||
| if (!metadata.supportsTransactions) { | ||
| throw new SQLFeatureNotSupportedException("The target JDBC server does not support" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: put a space at the end ... not support " +
|
Test build #127029 has finished for PR 29324 at commit
|
|
Test build #127041 has finished for PR 29324 at commit
|
|
retest this please |
|
Test build #127059 has finished for PR 29324 at commit
|
| } | ||
|
|
||
| private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = { | ||
| private[sql] def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's okay to remove private[sql] because execution is already in the private package (see also SPARK-16964)
|
|
||
| /** | ||
| * Alter an existing table. | ||
| * TODO (SPARK-32523): Override this method in the dialects that have different syntax. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because you will override this method in other places, not here. Remember to remove this later. :)
| update.fieldNames match { | ||
| case Array(name) => | ||
| val dataType = JdbcUtils.getJdbcType(update.newDataType(), this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We know fieldNames must be one element now. We don't need match and can just access fieldNames(0).
|
Test build #127082 has finished for PR 29324 at commit
|
|
Test build #127087 has finished for PR 29324 at commit
|
|
thanks, merging to master! |
|
Thanks everyone! |
What changes were proposed in this pull request?
Implement ALTER TABLE in JDBC Table Catalog
The following ALTER TABLE are implemented:
I haven't checked ALTER TABLE syntax for all the databases yet. I will check. If there are different syntax, I will have a follow-up to override the dialect.
Seems most of the databases don't support updating comments and column position, so I didn't implement UpdateColumnComment and UpdateColumnPosition.
Why are the changes needed?
Complete the JDBCTableCatalog implementation
Does this PR introduce any user-facing change?
Yes
JDBCTableCatalog.alterTableHow was this patch tested?
add new tests