diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index 6781d4514613..8d1bb08bc048 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -24,6 +24,8 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkConf import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException +import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog} import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog @@ -122,13 +124,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { override def supportsIndex: Boolean = true override def testIndexProperties(jdbcTable: SupportsIndex): Unit = { - val properties = new util.Properties(); + val properties = new util.HashMap[String, String](); properties.put("KEY_BLOCK_SIZE", "10") properties.put("COMMENT", "'this is a comment'") // MySQL doesn't allow property set on individual column, so use empty Array for // column properties jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")), - Array.empty[util.Map[NamedReference, util.Properties]], properties) + new util.HashMap[NamedReference, util.Map[String, String]](), properties) var index = jdbcTable.listIndexes() // The index property size is actually 1. Even though the index is created @@ -137,4 +139,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { assert(index(0).properties.size == 1) assert(index(0).properties.get("COMMENT").equals("this is a comment")) } + + override def testIndexUsingSQL(tbl: String): Unit = { + val loaded = Catalogs.load("mysql", conf) + val jdbcTable = loaded.asInstanceOf[TableCatalog] + .loadTable(Identifier.of(Array.empty[String], "new_table")) + .asInstanceOf[SupportsIndex] + assert(jdbcTable.indexExists("i1") == false) + assert(jdbcTable.indexExists("i2") == false) + + val indexType = "DUMMY" + var m = intercept[UnsupportedOperationException] { + sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)") + }.getMessage + assert(m.contains(s"Index Type $indexType is not supported." 
+ + s" The supported Index Types are: BTREE and HASH")) + + sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") + sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" + + s" OPTIONS (KEY_BLOCK_SIZE=10)") + + assert(jdbcTable.indexExists("i1") == true) + assert(jdbcTable.indexExists("i2") == true) + + m = intercept[IndexAlreadyExistsException] { + sql(s"CREATE index i1 ON $catalogName.new_table (col1)") + }.getMessage + assert(m.contains("Failed to create index: i1 in new_table")) + } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index f3e3b34356c8..a97adf94ed1e 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -189,6 +189,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def supportsIndex: Boolean = false def testIndexProperties(jdbcTable: SupportsIndex): Unit = {} + def testIndexUsingSQL(tbl: String): Unit = {} test("SPARK-36913: Test INDEX") { if (supportsIndex) { @@ -202,28 +203,28 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(jdbcTable.indexExists("i1") == false) assert(jdbcTable.indexExists("i2") == false) - val properties = new util.Properties(); + val properties = new util.HashMap[String, String](); val indexType = "DUMMY" var m = intercept[UnsupportedOperationException] { jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")), - Array.empty[util.Map[NamedReference, util.Properties]], properties) + new util.HashMap[NamedReference, util.Map[String, String]](), properties) }.getMessage assert(m.contains(s"Index Type $indexType is not supported." 
+ s" The supported Index Types are: BTREE and HASH")) jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")), - Array.empty[util.Map[NamedReference, util.Properties]], properties) + new util.HashMap[NamedReference, util.Map[String, String]](), properties) jdbcTable.createIndex("i2", "", Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")), - Array.empty[util.Map[NamedReference, util.Properties]], properties) + new util.HashMap[NamedReference, util.Map[String, String]](), properties) assert(jdbcTable.indexExists("i1") == true) assert(jdbcTable.indexExists("i2") == true) m = intercept[IndexAlreadyExistsException] { jdbcTable.createIndex("i1", "", Array(FieldReference("col1")), - Array.empty[util.Map[NamedReference, util.Properties]], properties) + new util.HashMap[NamedReference, util.Map[String, String]](), properties) }.getMessage assert(m.contains("Failed to create index: i1 in new_table")) @@ -276,5 +277,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu } } } -} + test("SPARK-36895: Test INDEX Using SQL") { + withTable(s"$catalogName.new_table") { + sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)") + testIndexUsingSQL(s"$catalogName.new_table") + } + } +} diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 8aca8eda190e..32b080cdbb08 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -112,9 +112,9 @@ statement | CREATE namespace (IF NOT EXISTS)? multipartIdentifier (commentSpec | locationSpec | - (WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace + (WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace | ALTER namespace multipartIdentifier - SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties + SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties | ALTER namespace multipartIdentifier SET locationSpec #setNamespaceLocation | DROP namespace (IF EXISTS)? multipartIdentifier @@ -130,7 +130,7 @@ statement rowFormat | createFileFormat | locationSpec | - (TBLPROPERTIES tableProps=tablePropertyList))* #createTableLike + (TBLPROPERTIES tableProps=propertyList))* #createTableLike | replaceTableHeader ('(' colTypeList ')')? tableProvider? createTableClauses (AS? query)? #replaceTable @@ -155,9 +155,9 @@ statement | ALTER (TABLE | VIEW) from=multipartIdentifier RENAME TO to=multipartIdentifier #renameTable | ALTER (TABLE | VIEW) multipartIdentifier - SET TBLPROPERTIES tablePropertyList #setTableProperties + SET TBLPROPERTIES propertyList #setTableProperties | ALTER (TABLE | VIEW) multipartIdentifier - UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList #unsetTableProperties + UNSET TBLPROPERTIES (IF EXISTS)? propertyList #unsetTableProperties | ALTER TABLE table=multipartIdentifier (ALTER | CHANGE) COLUMN? column=multipartIdentifier alterColumnAction? #alterTableAlterColumn @@ -168,9 +168,9 @@ statement REPLACE COLUMNS '(' columns=qualifiedColTypeWithPositionList ')' #hiveReplaceColumns | ALTER TABLE multipartIdentifier (partitionSpec)? - SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe + SET SERDE STRING (WITH SERDEPROPERTIES propertyList)? #setTableSerDe | ALTER TABLE multipartIdentifier (partitionSpec)? 
- SET SERDEPROPERTIES tablePropertyList #setTableSerDe + SET SERDEPROPERTIES propertyList #setTableSerDe | ALTER (TABLE | VIEW) multipartIdentifier ADD (IF NOT EXISTS)? partitionSpecLocation+ #addTablePartition | ALTER TABLE multipartIdentifier @@ -187,11 +187,11 @@ statement identifierCommentList? (commentSpec | (PARTITIONED ON identifierList) | - (TBLPROPERTIES tablePropertyList))* + (TBLPROPERTIES propertyList))* AS query #createView | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW tableIdentifier ('(' colTypeList ')')? tableProvider - (OPTIONS tablePropertyList)? #createTempViewUsing + (OPTIONS propertyList)? #createTempViewUsing | ALTER VIEW multipartIdentifier AS? query #alterViewQuery | CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)? multipartIdentifier AS className=STRING @@ -204,7 +204,7 @@ statement | SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)? LIKE pattern=STRING partitionSpec? #showTableExtended | SHOW TBLPROPERTIES table=multipartIdentifier - ('(' key=tablePropertyKey ')')? #showTblProperties + ('(' key=propertyKey ')')? #showTblProperties | SHOW COLUMNS (FROM | IN) table=multipartIdentifier ((FROM | IN) ns=multipartIdentifier)? #showColumns | SHOW VIEWS ((FROM | IN) multipartIdentifier)? @@ -228,7 +228,7 @@ statement | REFRESH FUNCTION multipartIdentifier #refreshFunction | REFRESH (STRING | .*?) #refreshResource | CACHE LAZY? TABLE multipartIdentifier - (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable + (OPTIONS options=propertyList)? (AS? query)? #cacheTable | UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable | CLEAR CACHE #clearCache | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE @@ -247,6 +247,10 @@ statement | SET .*? #setConfiguration | RESET configKey #resetQuotedConfiguration | RESET .*? #resetConfiguration + | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? + multipartIdentifier (USING indexType=identifier)? + '(' columns=multipartIdentifierPropertyList ')' + (OPTIONS options=propertyList)? #createIndex | unsupportedHiveNativeCommands .*? #failNativeCommand ; @@ -341,7 +345,7 @@ insertInto : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? identifierList? #insertOverwriteTable | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? identifierList? #insertIntoTable | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir - | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir + | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir ; partitionSpecLocation @@ -387,7 +391,7 @@ tableProvider ; createTableClauses - :((OPTIONS options=tablePropertyList) | + :((OPTIONS options=propertyList) | (PARTITIONED BY partitioning=partitionFieldList) | skewSpec | bucketSpec | @@ -395,23 +399,23 @@ createTableClauses createFileFormat | locationSpec | commentSpec | - (TBLPROPERTIES tableProps=tablePropertyList))* + (TBLPROPERTIES tableProps=propertyList))* ; -tablePropertyList - : '(' tableProperty (',' tableProperty)* ')' +propertyList + : '(' property (',' property)* ')' ; -tableProperty - : key=tablePropertyKey (EQ? value=tablePropertyValue)? +property + : key=propertyKey (EQ? value=propertyValue)? ; -tablePropertyKey +propertyKey : identifier ('.' 
identifier)* | STRING ; -tablePropertyValue +propertyValue : INTEGER_VALUE | DECIMAL_VALUE | booleanValue @@ -437,7 +441,7 @@ fileFormat ; storageHandler - : STRING (WITH SERDEPROPERTIES tablePropertyList)? + : STRING (WITH SERDEPROPERTIES propertyList)? ; resource @@ -726,7 +730,7 @@ tableAlias ; rowFormat - : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=tablePropertyList)? #rowFormatSerde + : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde | ROW FORMAT DELIMITED (FIELDS TERMINATED BY fieldsTerminatedBy=STRING (ESCAPED BY escapedBy=STRING)?)? (COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=STRING)? @@ -743,6 +747,14 @@ multipartIdentifier : parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)* ; +multipartIdentifierPropertyList + : multipartIdentifierProperty (',' multipartIdentifierProperty)* + ; + +multipartIdentifierProperty + : multipartIdentifier (OPTIONS options=propertyList)? + ; + tableIdentifier : (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier ; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java index 4181cf5f2511..9cf39eb95dbf 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java @@ -18,7 +18,6 @@ package org.apache.spark.sql.connector.catalog.index; import java.util.Map; -import java.util.Properties; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException; @@ -38,7 +37,8 @@ public interface SupportsIndex extends Table { * Creates an index. * * @param indexName the name of the index to be created - * @param indexType the IndexType of the index to be created + * @param indexType the type of the index to be created. If this is not specified, Spark + * will use empty String. * @param columns the columns on which index to be created * @param columnsProperties the properties of the columns on which index to be created * @param properties the properties of the index to be created @@ -47,8 +47,8 @@ public interface SupportsIndex extends Table { void createIndex(String indexName, String indexType, NamedReference[] columns, - Map[] columnsProperties, - Properties properties) + Map> columnsProperties, + Map properties) throws IndexAlreadyExistsException; /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 82c8891730ef..d36c7ac82e9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2780,14 +2780,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Convert a table property list into a key-value map. + * Convert a property list into a key-value map. * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. 
*/ - override def visitTablePropertyList( - ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.tableProperty.asScala.map { property => - val key = visitTablePropertyKey(property.key) - val value = visitTablePropertyValue(property.value) + override def visitPropertyList( + ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) key -> value } // Check for duplicate property names. @@ -2796,10 +2796,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. + * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. */ - def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = { - val props = visitTablePropertyList(ctx) + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) val badKeys = props.collect { case (key, null) => key } if (badKeys.nonEmpty) { operationNotAllowed( @@ -2809,10 +2809,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Parse a list of keys from a [[TablePropertyListContext]], assuming no values are specified. + * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified. */ - def visitPropertyKeys(ctx: TablePropertyListContext): Seq[String] = { - val props = visitTablePropertyList(ctx) + def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { + val props = visitPropertyList(ctx) val badKeys = props.filter { case (_, v) => v != null }.keys if (badKeys.nonEmpty) { operationNotAllowed( @@ -2822,11 +2822,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * A table property key can either be String or a collection of dot separated elements. This - * function extracts the property key based on whether its a string literal or a table property + * A property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether its a string literal or a property * identifier. */ - override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { + override def visitPropertyKey(key: PropertyKeyContext): String = { if (key.STRING != null) { string(key.STRING) } else { @@ -2835,10 +2835,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * A table property value can be String, Integer, Boolean or Decimal. This function extracts + * A property value can be String, Integer, Boolean or Decimal. This function extracts * the property value based on whether its a string, integer, boolean or decimal literal. 
*/ - override def visitTablePropertyValue(value: TablePropertyValueContext): String = { + override def visitPropertyValue(value: PropertyValueContext): String = { if (value == null) { null } else if (value.STRING != null) { @@ -3048,7 +3048,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg throw QueryParsingErrors.propertiesAndDbPropertiesBothSpecifiedError(ctx) } - var properties = ctx.tablePropertyList.asScala.headOption + var properties = ctx.propertyList.asScala.headOption .map(visitPropertyKeyValues) .getOrElse(Map.empty) @@ -3096,7 +3096,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitSetNamespaceProperties(ctx: SetNamespacePropertiesContext): LogicalPlan = { withOrigin(ctx) { - val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.tablePropertyList), ctx) + val properties = cleanNamespaceProperties(visitPropertyKeyValues(ctx.propertyList), ctx) SetNamespaceProperties( UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)), properties) @@ -3237,7 +3237,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg import ctx._ SerdeInfo( serde = Some(string(name)), - serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + serdeProperties = Option(propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) } /** @@ -3809,7 +3809,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitSetTableProperties( ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) { - val properties = visitPropertyKeyValues(ctx.tablePropertyList) + val properties = visitPropertyKeyValues(ctx.propertyList) val cleanedTableProperties = cleanTableProperties(ctx, properties) if (ctx.VIEW != null) { SetViewProperties( @@ -3840,7 +3840,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitUnsetTableProperties( ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) { - val properties = visitPropertyKeys(ctx.tablePropertyList) + val properties = visitPropertyKeys(ctx.propertyList) val cleanedProperties = cleanTableProperties(ctx, properties.map(_ -> "").toMap).keys.toSeq val ifExists = ctx.EXISTS != null @@ -4278,7 +4278,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg "ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]", alterTableTypeMismatchHint), Option(ctx.STRING).map(string), - Option(ctx.tablePropertyList).map(visitPropertyKeyValues), + Option(ctx.propertyList).map(visitPropertyKeyValues), // TODO a partition spec is allowed to have optional values. This is currently violated. Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)) } @@ -4329,7 +4329,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg ctx: ShowTblPropertiesContext): LogicalPlan = withOrigin(ctx) { ShowTableProperties( createUnresolvedTableOrView(ctx.table, "SHOW TBLPROPERTIES"), - Option(ctx.key).map(visitTablePropertyKey)) + Option(ctx.key).map(visitPropertyKey)) } /** @@ -4404,6 +4404,38 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg CommentOnTable(createUnresolvedTable(ctx.multipartIdentifier, "COMMENT ON TABLE"), comment) } + /** + * Create an index, returning a [[CreateIndex]] logical plan. 
+ * For example: + * {{{ + * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) + * [OPTIONS indexPropertyList] + * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] + * indexPropertyList: index_property_name [= index_property_value] [ , . . . ] + * }}} + */ + override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { + val (indexName, indexType) = if (ctx.identifier.size() == 1) { + (ctx.identifier(0).getText, "") + } else { + (ctx.identifier(0).getText, ctx.identifier(1).getText) + } + + val columns = ctx.columns.multipartIdentifierProperty.asScala + .map(_.multipartIdentifier.getText).toSeq + val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala + .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + + CreateIndex( + createUnresolvedTable(ctx.multipartIdentifier(), "CREATE INDEX"), + indexName, + indexType, + ctx.EXISTS != null, + columns.map(FieldReference(_).asInstanceOf[FieldReference]).zip(columnsProperties), + options) + } + private def alterViewTypeMismatchHint: Option[String] = Some("Please use ALTER TABLE instead.") private def alterTableTypeMismatchHint: Option[String] = Some("Please use ALTER VIEW instead.") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 31fdb6cea627..e3498229e9b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema import org.apache.spark.sql.catalyst.trees.BinaryLike import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.connector.expressions.{NamedReference, Transform} import org.apache.spark.sql.connector.write.Write import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder, StringType, StructType} @@ -1056,3 +1056,17 @@ case class UncacheTable( override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true) } + +/** + * The logical plan of the CREATE INDEX command. 
+ */ +case class CreateIndex( + child: LogicalPlan, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(NamedReference, Map[String, String])], + properties: Map[String, String]) extends UnaryCommand { + override protected def withNewChildInternal(newChild: LogicalPlan): CreateIndex = + copy(child = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index eb8985d50d04..dcaa62229171 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2366,4 +2366,8 @@ object QueryCompilationErrors { errorClass = "INVALID_JSON_SCHEMA_MAPTYPE", messageParameters = Array(schema.toString)) } + + def tableIndexNotSupportedError(errorMessage: String): Throwable = { + new AnalysisException(errorMessage) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 7bcc2b7f0c3d..798c130e0421 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2282,6 +2282,24 @@ class DDLParserSuite extends AnalysisTest { RefreshFunction(UnresolvedFunc(Seq("a", "b", "c")))) } + test("CREATE INDEX") { + parseCompare("CREATE index i1 ON a.b.c USING BTREE (col1)", + CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", false, + Array(FieldReference("col1")).toSeq.zip(Seq(Map.empty[String, String])), Map.empty)) + + parseCompare("CREATE index IF NOT EXISTS i1 ON TABLE a.b.c USING BTREE" + + " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) ", + CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "BTREE", true, + Array(FieldReference("col1"), FieldReference("col2")).toSeq + .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map.empty)) + + parseCompare("CREATE index i1 ON a.b.c" + + " (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2')) OPTIONS ('k3'='v3', 'k4'='v4')", + CreateIndex(UnresolvedTable(Seq("a", "b", "c"), "CREATE INDEX", None), "i1", "", false, + Array(FieldReference("col1"), FieldReference("col2")).toSeq + .zip(Seq(Map("k1" -> "v1"), Map("k2" -> "v2"))), Map("k3" -> "v3", "k4" -> "v4"))) + } + private case class TableSpec( name: Seq[String], schema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d26b26191715..b63306be6b04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -338,7 +338,7 @@ class SparkSqlAstBuilder extends AstBuilder { replace = ctx.REPLACE != null, global = ctx.GLOBAL != null, provider = ctx.tableProvider.multipartIdentifier.getText, - options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) + options = Option(ctx.propertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) } /** @@ -464,7 +464,7 @@ class SparkSqlAstBuilder extends AstBuilder { throw QueryParsingErrors.createViewWithBothIfNotExistsAndReplaceError(ctx) } - val properties = 
ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues) + val properties = ctx.propertyList.asScala.headOption.map(visitPropertyKeyValues) .getOrElse(Map.empty) if (ctx.TEMPORARY != null && !properties.isEmpty) { operationNotAllowed("TBLPROPERTIES can't coexist with CREATE TEMPORARY VIEW", ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index d482245fb31f..4402f2709be7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1021,8 +1021,8 @@ object JdbcUtils extends Logging with SQLConfHelper { indexType: String, tableName: String, columns: Array[NamedReference], - columnsProperties: Array[util.Map[NamedReference, util.Properties]], - properties: util.Properties, + columnsProperties: util.Map[NamedReference, util.Map[String, String]], + properties: util.Map[String, String], options: JDBCOptions): Unit = { val dialect = JdbcDialects.get(options.url) executeStatement(conn, options, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala new file mode 100644 index 000000000000..78bdf64ab77a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.index.SupportsIndex +import org.apache.spark.sql.connector.expressions.NamedReference + +/** + * Physical plan node for creating an index. 
+ */ +case class CreateIndexExec( + table: SupportsIndex, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(NamedReference, Map[String, String])], + properties: Map[String, String]) + extends LeafV2CommandExec { + override protected def run(): Seq[InternalRow] = { + val colProperties = new util.HashMap[NamedReference, util.Map[String, String]] + columns.foreach { + case (column, map) => colProperties.put(column, map.asJava) + } + try { + table.createIndex( + indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava) + } catch { + case _: IndexAlreadyExistsException if ignoreIfExists => + logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.") + } + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 56e7abcb801f..66ee43130976 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog} +import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue} import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEndsWith, StringStartsWith => V2StringStartsWith} import org.apache.spark.sql.connector.read.LocalScan @@ -429,6 +430,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val table = a.table.asInstanceOf[ResolvedTable] AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil + case CreateIndex(ResolvedTable(_, _, table, _), + indexName, indexType, ifNotExists, columns, properties) => + table match { + case s: SupportsIndex => + CreateIndexExec(s, indexName, indexType, ifNotExists, columns, properties):: Nil + case _ => throw QueryCompilationErrors.tableIndexNotSupportedError( + s"CreateIndex is not supported in this table ${table.name}.") + } + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 23ff50330846..c61d89061759 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -56,8 +56,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt 
indexName: String, indexType: String, columns: Array[NamedReference], - columnsProperties: Array[util.Map[NamedReference, util.Properties]], - properties: util.Properties): Unit = { + columnsProperties: util.Map[NamedReference, util.Map[String, String]], + properties: util.Map[String, String]): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => JdbcUtils.classifyException(s"Failed to create index: $indexName in $name", JdbcDialects.get(jdbcOptions.url)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index eb3986ce79b3..9e54ba7ce27e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -306,8 +306,8 @@ abstract class JdbcDialect extends Serializable with Logging{ indexType: String, tableName: String, columns: Array[NamedReference], - columnsProperties: Array[util.Map[NamedReference, util.Properties]], - properties: util.Properties): String = { + columnsProperties: util.Map[NamedReference, util.Map[String, String]], + properties: util.Map[String, String]): String = { throw new UnsupportedOperationException("createIndex is not supported") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 7e85b3bbb84e..73b36f1d1b75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -118,13 +118,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexType: String, tableName: String, columns: Array[NamedReference], - columnsProperties: Array[util.Map[NamedReference, util.Properties]], - properties: util.Properties): String = { + columnsProperties: util.Map[NamedReference, util.Map[String, String]], + properties: util.Map[String, String]): String = { val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head)) var indexProperties: String = "" - val scalaProps = properties.asScala if (!properties.isEmpty) { - scalaProps.foreach { case (k, v) => + properties.asScala.foreach { case (k, v) => indexProperties = indexProperties + " " + s"$k $v" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 360c8beff436..10a70e2fd6d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2969,6 +2969,17 @@ class DataSourceV2SQLSuite Row("testcat"), Row("testcat2"))) } + test("CREATE INDEX should fail") { + val t = "testcat.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo") + val ex = intercept[AnalysisException] { + sql(s"CREATE index i1 ON $t(col1)") + } + assert(ex.getMessage.contains(s"CreateIndex is not supported in this table $t.")) + } + } + private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams")
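
As a usage sketch (not part of the patch itself), the new statement can be exercised end to end against a JDBC catalog roughly as below. The catalog name `mysql`, the table `new_table`, and its columns mirror the integration test above and are assumptions rather than anything this diff introduces.

// Hypothetical driver code: assumes a "mysql" JDBC catalog is configured and
// that mysql.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT) exists.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The index type is optional; when omitted, an empty string is passed through
// to the connector, which then applies its own default.
spark.sql("CREATE INDEX i1 ON mysql.new_table USING BTREE (col1)")
spark.sql("CREATE INDEX IF NOT EXISTS i2 ON mysql.new_table (col2, col3, col5) OPTIONS (KEY_BLOCK_SIZE=10)")

// Per-column OPTIONS are also accepted by the parser, as covered in DDLParserSuite:
// CREATE INDEX i3 ON mysql.new_table (col1 OPTIONS ('k1'='v1'), col2 OPTIONS ('k2'='v2'))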
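
The same operation can be driven through the revised SupportsIndex API directly. The sketch below only illustrates the new java.util.Map-based signature and mirrors what V2JDBCTest does; the helper name and surrounding setup are assumptions.

// Minimal sketch: `jdbcTable` is assumed to be a table loaded from a JDBC
// catalog that implements SupportsIndex, as in V2JDBCTest above.
import java.util

import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

def createSimpleIndex(jdbcTable: SupportsIndex): Unit = {
  // Index-level properties now travel as a java.util.Map instead of java.util.Properties.
  val properties = new util.HashMap[String, String]()
  properties.put("KEY_BLOCK_SIZE", "10")

  // Column-level properties are keyed by column reference; left empty here because
  // MySQL does not support per-column index properties.
  val columnsProperties = new util.HashMap[NamedReference, util.Map[String, String]]()

  jdbcTable.createIndex(
    "i1",
    "BTREE",
    Array[NamedReference](FieldReference("col1")),
    columnsProperties,
    properties)
}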