Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql.jdbc.v2

import java.sql.Connection
import java.sql.{Connection, SQLFeatureNotSupportedException}

import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.NamespaceChange
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.tags.DockerTest
Expand Down Expand Up @@ -55,11 +57,47 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespac

override def dataPreparation(conn: Connection): Unit = {}

override def builtinNamespaces: Array[Array[String]] = Array()
override def builtinNamespaces: Array[Array[String]] =
Array(Array("information_schema"), Array("mysql"), Array("performance_schema"), Array("sys"))

override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
Array(builtinNamespaces.head, namespace) ++ builtinNamespaces.tail
}

override val supportsSchemaComment: Boolean = false

// Cannot get namespaces with conn.getMetaData.getSchemas
// TODO testListNamespaces()
// TODO testDropNamespaces()
override val supportsDropSchemaRestrict: Boolean = false

testListNamespaces()
testDropNamespaces()

test("Create or remove comment of namespace unsupported") {
val e1 = intercept[AnalysisException] {
catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
}
assert(e1.getMessage.contains("Failed create name space: foo"))
assert(e1.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e1.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Create namespace comment is not supported"))
assert(catalog.namespaceExists(Array("foo")) === false)
catalog.createNamespace(Array("foo"), Map.empty[String, String].asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
val e2 = intercept[AnalysisException] {
catalog.alterNamespace(Array("foo"), NamespaceChange
.setProperty("comment", "comment for foo"))
}
assert(e2.getMessage.contains("Failed create comment on name space: foo"))
assert(e2.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e2.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Create namespace comment is not supported"))
val e3 = intercept[AnalysisException] {
catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment"))
}
assert(e3.getMessage.contains("Failed remove comment on name space: foo"))
assert(e3.getCause.isInstanceOf[SQLFeatureNotSupportedException])
assert(e3.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
.contains("Remove namespace comment is not supported"))
catalog.dropNamespace(Array("foo"), cascade = true)
assert(catalog.namespaceExists(Array("foo")) === false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte

def supportsDropSchemaCascade: Boolean = true

def supportsDropSchemaRestrict: Boolean = true

def testListNamespaces(): Unit = {
test("listNamespaces: basic behavior") {
val commentMap = if (supportsSchemaComment) {
Expand All @@ -78,7 +80,11 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
assert(createCommentWarning === false)
}

catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)
assert(catalog.listNamespaces() === builtinNamespaces)
val msg = intercept[AnalysisException] {
Expand All @@ -99,15 +105,21 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
}
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
catalog.dropNamespace(Array("foo"), cascade = false)
} else {
catalog.dropNamespace(Array("foo"), cascade = true)
}
assert(catalog.namespaceExists(Array("foo")) === false)

// Drop non empty namespace without cascade
catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
catalog.createNamespace(Array("foo"), commentMap.asJava)
assert(catalog.namespaceExists(Array("foo")) === true)
catalog.createTable(ident1, schema, Array.empty, emptyProps)
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
if (supportsDropSchemaRestrict) {
intercept[NonEmptyNamespaceException] {
catalog.dropNamespace(Array("foo"), cascade = false)
}
}

// Drop non empty namespace with cascade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1944,4 +1944,16 @@ object QueryExecutionErrors {
def MultipleBucketTransformsError(): Throwable = {
new UnsupportedOperationException("Multiple bucket transforms are not supported.")
}

def unsupportedCreateNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Create namespace comment is not supported")
}

def unsupportedRemoveNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Remove namespace comment is not supported")
}

def unsupportedDropNamespaceRestrictError(): Throwable = {
new SQLFeatureNotSupportedException("Drop namespace restrict is not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -971,53 +971,57 @@ object JdbcUtils extends Logging with SQLConfHelper {
}

/**
* Creates a namespace.
* Creates a schema.
*/
def createNamespace(
def createSchema(
conn: Connection,
options: JDBCOptions,
namespace: String,
schema: String,
comment: String): Unit = {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
val dialect = JdbcDialects.get(options.url)
dialect.createSchema(statement, schema, comment)
} finally {
statement.close()
}
}

def schemaExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
val dialect = JdbcDialects.get(options.url)
dialect.schemasExists(conn, options, schema)
}

def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}")
if (!comment.isEmpty) createNamespaceComment(conn, options, namespace, comment)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the actual change here? Get rid of the try-catch in createNamespaceComment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want keep the atomic of create namespace with comment.

dialect.listSchemas(conn, options)
}

def createNamespaceComment(
def alterSchemaComment(
conn: Connection,
options: JDBCOptions,
namespace: String,
schema: String,
comment: String): Unit = {
val dialect = JdbcDialects.get(options.url)
try {
executeStatement(
conn, options, dialect.getSchemaCommentQuery(namespace, comment))
} catch {
case e: Exception =>
logWarning("Cannot create JDBC catalog comment. The catalog comment will be ignored.")
}
executeStatement(conn, options, dialect.getSchemaCommentQuery(schema, comment))
}

def removeNamespaceComment(
def removeSchemaComment(
conn: Connection,
options: JDBCOptions,
namespace: String): Unit = {
schema: String): Unit = {
val dialect = JdbcDialects.get(options.url)
try {
executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace))
} catch {
case e: Exception =>
logWarning("Cannot drop JDBC catalog comment.")
}
executeStatement(conn, options, dialect.removeSchemaCommentQuery(schema))
}

/**
* Drops a namespace from the JDBC database.
* Drops a schema from the JDBC database.
*/
def dropNamespace(
conn: Connection, options: JDBCOptions, namespace: String, cascade: Boolean): Unit = {
def dropSchema(
conn: Connection, options: JDBCOptions, schema: String, cascade: Boolean): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options, dialect.dropSchema(namespace, cascade))
executeStatement(conn, options, dialect.dropSchema(schema, cascade))
}

/**
Expand Down Expand Up @@ -1148,11 +1152,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
def executeQuery(conn: Connection, options: JDBCOptions, sql: String)(
f: ResultSet => Unit): Unit = {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
statement.executeQuery(sql)
val rs = statement.executeQuery(sql)
try {
f(rs)
} finally {
rs.close()
}
} finally {
statement.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuilder

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
Expand Down Expand Up @@ -173,23 +172,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
JdbcUtils.withConnection(options) { conn =>
val rs = conn.getMetaData.getSchemas(null, db)
while (rs.next()) {
if (rs.getString(1) == db) return true;
}
false
JdbcUtils.schemaExists(conn, options, db)
}
case _ => false
}

override def listNamespaces(): Array[Array[String]] = {
JdbcUtils.withConnection(options) { conn =>
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
schemaBuilder += Array(rs.getString(1))
}
schemaBuilder.result
JdbcUtils.listSchemas(conn, options)
}
}

Expand Down Expand Up @@ -236,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed create name space: $db", dialect) {
JdbcUtils.createNamespace(conn, options, db, comment)
JdbcUtils.createSchema(conn, options, db, comment)
}
}

Expand All @@ -254,7 +244,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case set: NamespaceChange.SetProperty =>
if (set.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.createNamespaceComment(conn, options, db, set.value)
JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) {
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
}
}
} else {
throw QueryCompilationErrors.cannotSetJDBCNamespaceWithPropertyError(set.property)
Expand All @@ -263,7 +255,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case unset: NamespaceChange.RemoveProperty =>
if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.removeNamespaceComment(conn, options, db)
JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) {
JdbcUtils.removeSchemaComment(conn, options, db)
}
}
} else {
throw QueryCompilationErrors.cannotUnsetJDBCNamespaceWithPropertyError(unset.property)
Expand All @@ -284,7 +278,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case Array(db) if namespaceExists(namespace) =>
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
JdbcUtils.dropNamespace(conn, options, db, cascade)
JdbcUtils.dropSchema(conn, options, db, cascade)
true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.{Connection, Date, Timestamp}
import java.sql.{Connection, Date, Statement, Timestamp}
import java.time.{Instant, LocalDate}
import java.util

Expand Down Expand Up @@ -229,6 +229,45 @@ abstract class JdbcDialect extends Serializable with Logging{
}
}

/**
* Create schema with an optional comment. Empty string means no comment.
*/
def createSchema(statement: Statement, schema: String, comment: String): Unit = {
val schemaCommentQuery = if (comment.nonEmpty) {
// We generate comment query here so that it can fail earlier without creating the schema.
getSchemaCommentQuery(schema, comment)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave a comment: "We generate comment query here so that it can fail earlier without creating the schema."

} else {
comment
}
statement.executeUpdate(s"CREATE SCHEMA ${quoteIdentifier(schema)}")
if (comment.nonEmpty) {
statement.executeUpdate(schemaCommentQuery)
}
}

/**
* Check schema exists or not.
*/
def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
val rs = conn.getMetaData.getSchemas(null, schema)
while (rs.next()) {
if (rs.getString(1) == schema) return true;
}
false
}

/**
* Lists all the schemas in this table.
*/
def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
schemaBuilder += Array(rs.getString(1))
}
schemaBuilder.result
}

/**
* Return Some[true] iff `TRUNCATE TABLE` causes cascading default.
* Some[true] : TRUNCATE TABLE causes cascading.
Expand Down
Loading