MySQLIntegrationSuite.scala
@@ -18,11 +18,16 @@
package org.apache.spark.sql.jdbc.v2

import java.sql.{Connection, SQLFeatureNotSupportedException}
import java.util

import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.types._
@@ -115,4 +120,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}

override def testIndex(tbl: String): Unit = {
val loaded = Catalogs.load("mysql", conf)
val jdbcTable = loaded.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty[String], "new_table"))
.asInstanceOf[SupportsIndex]
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)

val properties = new util.Properties();
properties.put("KEY_BLOCK_SIZE", "10")
properties.put("COMMENT", "'this is a comment'")
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)

jdbcTable.createIndex("i2", "",
Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)

val m = intercept[IndexAlreadyExistsException] {
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
Array.empty[util.Map[NamedReference, util.Properties]], properties)
}.getMessage
assert(m.contains("Failed to create index: i1 in new_table"))
}
}
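
For context, the Catalogs.load("mysql", conf) call in testIndex resolves because the suite registers the JDBC catalog under the name "mysql" in its SparkConf. A minimal sketch of that wiring, consistent with the SparkConf and JDBCTableCatalog imports added above; the helper names (db.getJdbcUrl, dockerIp, externalPort) are the usual docker-suite members and the exact options in the real suite may differ:

  // Hedged sketch: register JDBCTableCatalog as catalog "mysql" so that
  // Catalogs.load("mysql", conf) in testIndex can resolve it.
  override def sparkConf: SparkConf = super.sparkConf
    .set("spark.sql.catalog.mysql", classOf[JDBCTableCatalog].getName)
    .set("spark.sql.catalog.mysql.url", db.getJdbcUrl(dockerIp, externalPort))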
V2JDBCTest.scala
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
testCreateTableWithProperty(s"$catalogName.new_table")
}
}

def testIndex(tbl: String): Unit = {}
Member: Did you want to add some test code here? If this is empty, the following test case is invalid.

Contributor Author: I intentionally leave it empty and let the subclass (e.g. MySQLIntegrationSuite) implement it.

Contributor: I understand that we leave it empty because other dialects do not support indexes yet. How about:

def supportsIndex: Boolean = false

test(...) {
  require(supportsIndex)
  // test code
}

Member: +1
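
For concreteness, a hedged sketch of what the suggested guard could look like inside V2JDBCTest, using ScalaTest's assume so unsupported dialects skip rather than fail (the reviewer wrote require); the diff below keeps the empty-default testIndex approach instead:

  // Hypothetical: dialects that support indexes override this to true.
  def supportsIndex: Boolean = false

  test("SPARK-36913: Test INDEX") {
    assume(supportsIndex)  // cancels the test on dialects without index support
    withTable(s"$catalogName.new_table") {
      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
      testIndex(s"$catalogName.new_table")
    }
  }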


test("SPARK-36913: Test INDEX") {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
testIndex(s"$catalogName.new_table")
Member: I am not sure if you want to put all index test code (create, list, delete...) together into one test method. Sounds better to have individual test methods for different APIs.

Contributor Author: It might be more logical to put these in one test method:

indexExists   // negative test
createIndex
indexExists   // positive test
listIndexes
dropIndex
indexExists   // negative test

In the next PR, I will add listIndexes/dropIndex and refine the tests.

}
}
}

SupportsIndex.java
@@ -40,14 +40,14 @@ public interface SupportsIndex extends Table {
* @param indexName the name of the index to be created
* @param indexType the IndexType of the index to be created
* @param columns the columns on which index to be created
* @param columnProperties the properties of the columns on which index to be created
* @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
* @throws IndexAlreadyExistsException If the index already exists (optional)
*/
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
Map<NamedReference, Properties>[] columnProperties,
Map<NamedReference, Properties>[] columnsProperties,
Member (@dongjoon-hyun, Oct 5, 2021): Ur, this looks orthogonal to this PR. Do we need this in this PR? Never mind.

Properties properties)
throws IndexAlreadyExistsException;

AlreadyExistException.scala
@@ -79,5 +79,14 @@ class PartitionsAlreadyExistException(message: String) extends AnalysisException
class FunctionAlreadyExistsException(db: String, func: String)
extends AnalysisException(s"Function '$func' already exists in database '$db'")

class IndexAlreadyExistsException(indexName: String, table: Identifier)
extends AnalysisException(s"Index '$indexName' already exists in table ${table.quoted}")
class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause)
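
The widened constructor lets a dialect wrap a driver error with a custom message and the original cause. A hedged sketch of how a MySQL-like dialect's classifyException could use it (error code 1061 is MySQL's ER_DUP_KEYNAME; the merged MySQLDialect code may differ):

  import java.sql.SQLException

  import org.apache.spark.sql.AnalysisException
  import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException

  // Hypothetical dialect-side mapping from a driver error to the new exception.
  override def classifyException(message: String, e: Throwable): AnalysisException = {
    e match {
      case sqlException: SQLException if sqlException.getErrorCode == 1061 =>
        new IndexAlreadyExistsException(message, cause = Some(e))
      case _ => super.classifyException(message, e)
    }
  }

This kind of mapping is what lets the MySQL test above assert on the "Failed to create index: i1 in new_table" message that JDBCTable.createIndex passes to classifyException.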
JdbcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.time.{Instant, LocalDate}
import java.util
import java.util.Locale
import java.util.concurrent.TimeUnit

@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
import org.apache.spark.sql.internal.SQLConf
@@ -1020,6 +1022,35 @@ object JdbcUtils extends Logging {
executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}")
}

/**
* Create an index.
*/
def createIndex(
conn: Connection,
indexName: String,
indexType: String,
tableName: String,
columns: Array[NamedReference],
columnsProperties: Array[util.Map[NamedReference, util.Properties]],
properties: util.Properties,
options: JDBCOptions): Unit = {
val dialect = JdbcDialects.get(options.url)
executeStatement(conn, options,
dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
}

/**
* Check if an index exists
*/
def indexExists(
conn: Connection,
indexName: String,
tableName: String,
options: JDBCOptions): Boolean = {
val dialect = JdbcDialects.get(options.url)
dialect.indexExists(conn, indexName, tableName, options)
}

private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
val statement = conn.createStatement
try {
@@ -1029,4 +1060,31 @@
statement.close()
}
}

def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
statement.executeQuery(sql)
} finally {
statement.close()
}
}

def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
try {
f
} catch {
case e: Throwable => throw dialect.classifyException(message, e)
}
}

def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
val conn = createConnectionFactory(options)()
try {
f(conn)
} finally {
conn.close()
}
}
}
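
These helpers delegate statement generation to the dialect. A hedged, MySQL-flavored sketch of what a dialect's createIndex hook might return, matching the KEY_BLOCK_SIZE and COMMENT properties used in the MySQL test above (assumed shape; the merged dialect code may differ):

  import java.util

  import scala.collection.JavaConverters._

  import org.apache.spark.sql.connector.expressions.NamedReference

  // Hypothetical JdbcDialect.createIndex for a MySQL-like dialect: builds e.g.
  // CREATE INDEX i1 ON new_table (col1) KEY_BLOCK_SIZE 10 COMMENT 'this is a comment'
  def createIndex(
      indexName: String,
      indexType: String,
      tableName: String,
      columns: Array[NamedReference],
      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
      properties: util.Properties): String = {
    val columnList = columns.map(_.fieldNames.mkString(".")).mkString(", ")
    val typeClause = if (indexType.isEmpty) "" else s" USING $indexType"
    // MySQL accepts index options as "NAME value" pairs, e.g. KEY_BLOCK_SIZE 10.
    val indexOptions = properties.asScala.map { case (k, v) => s"$k $v" }.mkString(" ")
    s"CREATE INDEX $indexName$typeClause ON $tableName ($columnList) $indexOptions".trim
  }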
JDBCTable.scala
@@ -23,13 +23,16 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
extends Table with SupportsRead with SupportsWrite {
extends Table with SupportsRead with SupportsWrite with SupportsIndex {

override def name(): String = ident.toString

@@ -48,4 +51,33 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
jdbcOptions.parameters.originalMap ++ info.options.asCaseSensitiveMap().asScala)
JDBCWriteBuilder(schema, mergedOptions)
}

override def createIndex(
indexName: String,
indexType: String,
columns: Array[NamedReference],
columnsProperties: Array[util.Map[NamedReference, util.Properties]],
properties: util.Properties): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.classifyException(s"Failed to create index: $indexName in $name",
JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
}
}
}

override def indexExists(indexName: String): Boolean = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
}
}

override def dropIndex(indexName: String): Boolean = {
throw new UnsupportedOperationException("dropIndex is not supported yet")
}

override def listIndexes(): Array[TableIndex] = {
throw new UnsupportedOperationException("listIndexes is not supported yet")
}
}
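
indexExists above similarly bottoms out in a dialect hook via JdbcUtils.indexExists. A hedged, MySQL-flavored sketch of that hook (SHOW INDEXES and the Key_name column are MySQL-specific, and the merged dialect code may differ):

  import java.sql.Connection

  import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

  // Hypothetical JdbcDialect.indexExists for a MySQL-like dialect. Note the
  // ResultSet is consumed before the statement is closed.
  def indexExists(
      conn: Connection,
      indexName: String,
      tableName: String,
      options: JDBCOptions): Boolean = {
    // Assumes identifiers were validated upstream before interpolation.
    val sql = s"SHOW INDEXES FROM $tableName WHERE Key_name = '$indexName'"
    val statement = conn.createStatement()
    try {
      statement.setQueryTimeout(options.queryTimeout)
      statement.executeQuery(sql).next()  // a returned row means the index exists
    } finally {
      statement.close()
    }
  }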
JDBCTableCatalog.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2.jdbc

import java.sql.{Connection, SQLException}
import java.sql.SQLException
import java.util

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -57,7 +57,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging

override def listTables(namespace: Array[String]): Array[Identifier] = {
checkNamespace(namespace)
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
val schemaPattern = if (namespace.length == 1) namespace.head else null
val rs = conn.getMetaData
.getTables(null, schemaPattern, "%", Array("TABLE"));
@@ -72,14 +72,14 @@
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
classifyException(s"Failed table existence check: $ident") {
withConnection(JdbcUtils.tableExists(_, writeOptions))
JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) {
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}

override def dropTable(ident: Identifier): Boolean = {
checkNamespace(ident.namespace())
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
try {
JdbcUtils.dropTable(conn, getTableName(ident), options)
true
@@ -91,8 +91,8 @@

override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
checkNamespace(oldIdent.namespace())
withConnection { conn =>
classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) {
JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
}
}
@@ -151,8 +151,8 @@

val writeOptions = new JdbcOptionsInWrite(tableOptions)
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
withConnection { conn =>
classifyException(s"Failed table creation: $ident") {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) {
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}
@@ -162,8 +162,8 @@

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
checkNamespace(ident.namespace())
withConnection { conn =>
classifyException(s"Failed table altering: $ident") {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
@@ -172,7 +172,7 @@

override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
val rs = conn.getMetaData.getSchemas(null, db)
while (rs.next()) {
if (rs.getString(1) == db) return true;
@@ -183,7 +183,7 @@
}

override def listNamespaces(): Array[Array[String]] = {
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
@@ -234,8 +234,8 @@
}
}
}
withConnection { conn =>
classifyException(s"Failed create name space: $db") {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed create name space: $db", dialect) {
JdbcUtils.createNamespace(conn, options, db, comment)
}
}
@@ -253,7 +253,7 @@
changes.foreach {
case set: NamespaceChange.SetProperty =>
if (set.property() == SupportsNamespaces.PROP_COMMENT) {
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.createNamespaceComment(conn, options, db, set.value)
}
} else {
@@ -262,7 +262,7 @@

case unset: NamespaceChange.RemoveProperty =>
if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
withConnection { conn =>
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.removeNamespaceComment(conn, options, db)
}
} else {
@@ -283,8 +283,8 @@
if (listTables(Array(db)).nonEmpty) {
throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
}
withConnection { conn =>
classifyException(s"Failed drop name space: $db") {
JdbcUtils.withConnection(options) { conn =>
JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
JdbcUtils.dropNamespace(conn, options, db)
true
}
@@ -301,24 +301,7 @@
}
}

private def withConnection[T](f: Connection => T): T = {
val conn = JdbcUtils.createConnectionFactory(options)()
try {
f(conn)
} finally {
conn.close()
}
}

private def getTableName(ident: Identifier): String = {
(ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
}

private def classifyException[T](message: String)(f: => T): T = {
try {
f
} catch {
case e: Throwable => throw dialect.classifyException(message, e)
}
}
}