MySQLIntegrationSuite.scala
@@ -18,16 +18,11 @@
 package org.apache.spark.sql.jdbc.v2
 
 import java.sql.{Connection, SQLFeatureNotSupportedException}
-import java.util
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
-import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.types._
@@ -122,66 +117,4 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   }
 
   override def supportsIndex: Boolean = true
-
-  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
-    val properties = new util.HashMap[String, String]();
-    properties.put("KEY_BLOCK_SIZE", "10")
-    properties.put("COMMENT", "'this is a comment'")
-    // MySQL doesn't allow property set on individual column, so use empty Array for
-    // column properties
-    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-      new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
-    var index = jdbcTable.listIndexes()
-    // The index property size is actually 1. Even though the index is created
-    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
-    // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
-    assert(index(0).properties.size == 1)
-    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
-  }
-
-  override def testIndexUsingSQL(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
-
-    val indexType = "DUMMY"
-    var m = intercept[UnsupportedOperationException] {
-      sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
-    }.getMessage
-    assert(m.contains(s"Index Type $indexType is not supported." +
-      s" The supported Index Types are: BTREE and HASH"))
-
-    sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
-    sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
-      s" OPTIONS (KEY_BLOCK_SIZE=10)")
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    // This should pass without exception
-    sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
-
-    m = intercept[IndexAlreadyExistsException] {
-      sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
-    }.getMessage
-    assert(m.contains("Failed to create index i1 in new_table"))
-
-    sql(s"DROP index i1 ON $catalogName.new_table")
-    sql(s"DROP index i2 ON $catalogName.new_table")
-
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
-
-    // This should pass without exception
-    sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
-
-    m = intercept[NoSuchIndexException] {
-      sql(s"DROP index i1 ON $catalogName.new_table")
-    }.getMessage
-    assert(m.contains("Failed to drop index i1 in new_table"))
-  }
 }
V2JDBCTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.jdbc.v2
 
-import java.util
-
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
@@ -27,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample}
 import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
@@ -193,103 +190,58 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {
   }
 
   def supportsIndex: Boolean = false
-  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
-  def testIndexUsingSQL(tbl: String): Unit = {}
 
-  test("SPARK-36913: Test INDEX") {
+  test("SPARK-36895: Test INDEX Using SQL") {
     if (supportsIndex) {
       withTable(s"$catalogName.new_table") {
         sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
-          s" col4 INT, col5 INT)")
+          " col4 INT, col5 INT)")
         val loaded = Catalogs.load(catalogName, conf)
         val jdbcTable = loaded.asInstanceOf[TableCatalog]
           .loadTable(Identifier.of(Array.empty[String], "new_table"))
           .asInstanceOf[SupportsIndex]
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
 
-        val properties = new util.HashMap[String, String]();
         val indexType = "DUMMY"
         var m = intercept[UnsupportedOperationException] {
-          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
-            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
        }.getMessage
         assert(m.contains(s"Index Type $indexType is not supported." +
           s" The supported Index Types are: BTREE and HASH"))
 
-        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
-
-        jdbcTable.createIndex("i2", "",
-          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-          new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+        sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
+        sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
+          s" OPTIONS (KEY_BLOCK_SIZE=10)")
 
         assert(jdbcTable.indexExists("i1") == true)
         assert(jdbcTable.indexExists("i2") == true)
 
+        // This should pass without exception
+        sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
+
         m = intercept[IndexAlreadyExistsException] {
-          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-            new util.HashMap[NamedReference, util.Map[String, String]](), properties)
+          sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
         }.getMessage
         assert(m.contains("Failed to create index i1 in new_table"))
 
-        var index = jdbcTable.listIndexes()
-        assert(index.length == 2)
-
-        assert(index(0).indexName.equals("i1"))
-        assert(index(0).indexType.equals("BTREE"))
-        var cols = index(0).columns
-        assert(cols.length == 1)
-        assert(cols(0).describe().equals("col1"))
-        assert(index(0).properties.size == 0)
-
-        assert(index(1).indexName.equals("i2"))
-        assert(index(1).indexType.equals("BTREE"))
-        cols = index(1).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
-        assert(index(1).properties.size == 0)
-
-        jdbcTable.dropIndex("i1")
-        assert(jdbcTable.indexExists("i1") == false)
-        assert(jdbcTable.indexExists("i2") == true)
-
-        index = jdbcTable.listIndexes()
-        assert(index.length == 1)
-
-        assert(index(0).indexName.equals("i2"))
-        assert(index(0).indexType.equals("BTREE"))
-        cols = index(0).columns
-        assert(cols.length == 3)
-        assert(cols(0).describe().equals("col2"))
-        assert(cols(1).describe().equals("col3"))
-        assert(cols(2).describe().equals("col5"))
+        sql(s"DROP index i1 ON $catalogName.new_table")
+        sql(s"DROP index i2 ON $catalogName.new_table")
 
-        jdbcTable.dropIndex("i2")
         assert(jdbcTable.indexExists("i1") == false)
         assert(jdbcTable.indexExists("i2") == false)
-        index = jdbcTable.listIndexes()
-        assert(index.length == 0)
 
+        // This should pass without exception
+        sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
+
         m = intercept[NoSuchIndexException] {
-          jdbcTable.dropIndex("i2")
+          sql(s"DROP index i1 ON $catalogName.new_table")
         }.getMessage
-        assert(m.contains("Failed to drop index i2 in new_table"))
-
-        testIndexProperties(jdbcTable)
+        assert(m.contains("Failed to drop index i1 in new_table"))
       }
     }
   }
 
-  test("SPARK-36895: Test INDEX Using SQL") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndexUsingSQL(s"$catalogName.new_table")
-    }
-  }
-
   def supportsTableSample: Boolean = false
 
   private def samplePushed(df: DataFrame): Boolean = {
SupportsIndex.java
@@ -33,19 +33,21 @@
 @Evolving
 public interface SupportsIndex extends Table {
 
+  /**
+   * A reserved property to specify the index type.
+   */
+  String PROP_TYPE = "type";
+
   /**
    * Creates an index.
    *
    * @param indexName the name of the index to be created
-   * @param indexType the type of the index to be created. If this is not specified, Spark
-   *                  will use empty String.
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
    * @throws IndexAlreadyExistsException If the index already exists.
    */
   void createIndex(String indexName,
-      String indexType,
       NamedReference[] columns,
       Map<NamedReference, Map<String, String>> columnsProperties,
       Map<String, String> properties)
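Note: the javadoc above no longer lists `indexType`; the type now travels through the reserved `PROP_TYPE` property. A minimal Scala sketch of a caller under the new contract — the table handle, index name, and column name are illustrative, not from the PR:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// Sketch: create a BTREE index through the narrowed v2 API.
def createBtreeIndex(table: SupportsIndex): Unit = {
  val properties = new util.HashMap[String, String]()
  // The index type is now a reserved property instead of a method parameter.
  properties.put(SupportsIndex.PROP_TYPE, "BTREE")
  table.createIndex(
    "i1",
    Array[NamedReference](FieldReference("col1")),
    new util.HashMap[NamedReference, util.Map[String, String]](), // no per-column properties
    properties)
}
```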
JdbcUtils.scala
@@ -1018,15 +1018,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def createIndex(
       conn: Connection,
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String],
       options: JDBCOptions): Unit = {
     val dialect = JdbcDialects.get(options.url)
     executeStatement(conn, options,
-      dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+      dialect.createIndex(indexName, tableName, columns, columnsProperties, properties))
   }
 
   /**
CreateIndexExec.scala
@@ -39,13 +39,20 @@ case class CreateIndexExec(
     properties: Map[String, String])
   extends LeafV2CommandExec {
   override protected def run(): Seq[InternalRow] = {
+
+    val propertiesWithIndexType: Map[String, String] = if (indexType.nonEmpty) {
+      properties + (SupportsIndex.PROP_TYPE -> indexType)
+    } else {
+      properties
+    }
+
     val colProperties = new util.HashMap[NamedReference, util.Map[String, String]]
     columns.foreach {
       case (column, map) => colProperties.put(column, map.asJava)
     }
     try {
       table.createIndex(
-        indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava)
+        indexName, columns.unzip._1.toArray, colProperties, propertiesWithIndexType.asJava)
     } catch {
       case _: IndexAlreadyExistsException if ignoreIfExists =>
         logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.")
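Note: the `USING` clause still reaches this physical node as `indexType`; it is folded into the property map so that the narrowed `SupportsIndex.createIndex` receives it as `PROP_TYPE`. A REPL-style sketch of the same folding with illustrative values:

```scala
import org.apache.spark.sql.connector.catalog.index.SupportsIndex

// Values as they might arrive from a parsed
// CREATE INDEX ... USING BTREE ... OPTIONS (KEY_BLOCK_SIZE=10) statement.
val indexType = "BTREE"
val properties = Map("KEY_BLOCK_SIZE" -> "10")

// Mirrors run() above: a non-empty USING clause becomes the reserved
// "type" property; an absent one leaves the map untouched.
val propertiesWithIndexType =
  if (indexType.nonEmpty) properties + (SupportsIndex.PROP_TYPE -> indexType)
  else properties

assert(propertiesWithIndexType == Map("KEY_BLOCK_SIZE" -> "10", "type" -> "BTREE"))
```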
JDBCTable.scala
@@ -54,15 +54,14 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
 
   override def createIndex(
       indexName: String,
-      indexType: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): Unit = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
       JdbcUtils.classifyException(s"Failed to create index $indexName in $name",
         JdbcDialects.get(jdbcOptions.url)) {
         JdbcUtils.createIndex(
-          conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+          conn, indexName, name, columns, columnsProperties, properties, jdbcOptions)
       }
     }
   }
JdbcDialect.scala
@@ -304,7 +304,6 @@ abstract class JdbcDialect extends Serializable with Logging{
    */
   def createIndex(
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
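Note: third-party dialects override this hook. A sketch of a custom dialect under the new five-argument signature — the dialect name, URL scheme, and DDL shape are invented for illustration, and a real implementation would also honor `PROP_TYPE` the way MySQLDialect does below:

```scala
import java.util

import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.jdbc.JdbcDialect

case object MyDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // New signature: no indexType parameter; everything rides in `properties`.
  override def createIndex(
      indexName: String,
      tableName: String,
      columns: Array[NamedReference],
      columnsProperties: util.Map[NamedReference, util.Map[String, String]],
      properties: util.Map[String, String]): String = {
    val cols = columns.map(c => quoteIdentifier(c.fieldNames.head)).mkString(", ")
    // Sketch only: ignores the property map entirely.
    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)} ($cols)"
  }
}
```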
MySQLDialect.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
-import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -115,30 +115,29 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
       indexName: String,
-      indexType: String,
       tableName: String,
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
     val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
     var indexProperties: String = ""
+    var indexType = ""
     if (!properties.isEmpty) {
       properties.asScala.foreach { case (k, v) =>
-        indexProperties = indexProperties + " " + s"$k $v"
-      }
-    }
-    val iType = if (indexType.isEmpty) {
-      ""
-    } else {
-      if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") &&
-        !indexType.equalsIgnoreCase("HASH")) {
-        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
-          " The supported Index Types are: BTREE and HASH")
+        if (k.equals(SupportsIndex.PROP_TYPE)) {
+          if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) {
+            indexType = s"USING $v"
+          } else {
+            throw new UnsupportedOperationException(s"Index Type $v is not supported." +
+              " The supported Index Types are: BTREE and HASH")
+          }
+        } else {
+          indexProperties = indexProperties + " " + s"$k $v"
+        }
       }
-      s"USING $indexType"
     }
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
       s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
   }
 
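Note: to see what the rewritten method emits, one can resolve the dialect through the public registry and call it directly. A sketch, not a test from the PR — the URL and names are illustrative, and with a `HashMap` the iteration order of extra properties is not guaranteed:

```scala
import java.util

import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.jdbc.JdbcDialects

val dialect = JdbcDialects.get("jdbc:mysql://localhost/test")

val properties = new util.HashMap[String, String]()
properties.put("type", "BTREE") // SupportsIndex.PROP_TYPE
properties.put("KEY_BLOCK_SIZE", "10")

val ddl = dialect.createIndex(
  "i1",
  "new_table",
  Array[NamedReference](FieldReference("col1")),
  new util.HashMap[NamedReference, util.Map[String, String]](),
  properties)

// The PROP_TYPE entry becomes the USING clause; every other property is
// appended as a "key value" pair, giving roughly:
//   CREATE INDEX `i1` USING BTREE ON `new_table` (`col1`)  KEY_BLOCK_SIZE 10
println(ddl)
```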