25 commits
050f881
[SPARK-36895][SQL] Add Create Index syntax support
huaxingao Sep 29, 2021
d3b39cc
fix 2.13 scala compile error
huaxingao Sep 30, 2021
c2b20b9
remove the specific index types for now
huaxingao Sep 30, 2021
e773f55
address comments
huaxingao Oct 3, 2021
74d607c
address comments
huaxingao Oct 3, 2021
0ed3401
address comments
huaxingao Oct 5, 2021
6f9347d
test IF NOT EXISTS
huaxingao Oct 5, 2021
d6ac2aa
add end to end test for createIndex
huaxingao Oct 9, 2021
2e3788a
remove index test from JDBCTableCatalogSuite
huaxingao Oct 10, 2021
78f300f
remove unused import
huaxingao Oct 10, 2021
7531b87
change CREATE [index_type] INDEX index_name ON [TABLE] table_name to …
huaxingao Oct 13, 2021
b619f7a
rebase
huaxingao Oct 13, 2021
a8382e5
fix index type syntax in test
huaxingao Oct 14, 2021
a4fa7c8
address comments
huaxingao Oct 14, 2021
6b84f83
rebase
huaxingao Oct 14, 2021
9d0781c
fix test failure
huaxingao Oct 14, 2021
03c046b
address comments and change Properties to Map
huaxingao Oct 16, 2021
b7801e4
Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasou…
huaxingao Oct 17, 2021
e962ecb
fix build failure
huaxingao Oct 17, 2021
a86503f
fix lint-java
huaxingao Oct 17, 2021
ab2908f
fix test failure
huaxingao Oct 18, 2021
f871d26
address comments
huaxingao Oct 18, 2021
2e066b3
address comments
huaxingao Oct 18, 2021
a8cb845
change columnsProperties from Array[util.Map[NamedReference, util.Map…
huaxingao Oct 18, 2021
fe0d4d3
address comments
huaxingao Oct 19, 2021
@@ -24,6 +24,8 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -122,13 +124,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
override def supportsIndex: Boolean = true

override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
- val properties = new util.Properties();
+ val properties = new util.HashMap[String, String]();
properties.put("KEY_BLOCK_SIZE", "10")
properties.put("COMMENT", "'this is a comment'")
// MySQL doesn't allow properties to be set on individual columns, so use an empty map
// for the column properties
jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-   Array.empty[util.Map[NamedReference, util.Properties]], properties)
+   new util.HashMap[NamedReference, util.Map[String, String]](), properties)

var index = jdbcTable.listIndexes()
// The index property size is actually 1. Even though the index is created
@@ -137,4 +139,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
assert(index(0).properties.size == 1)
assert(index(0).properties.get("COMMENT").equals("this is a comment"))
}

override def testIndexUsingSQL(tbl: String): Unit = {
val loaded = Catalogs.load("mysql", conf)
val jdbcTable = loaded.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty[String], "new_table"))
.asInstanceOf[SupportsIndex]
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)

val indexType = "DUMMY"
var m = intercept[UnsupportedOperationException] {
sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)")
}.getMessage
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))

sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
s" OPTIONS (KEY_BLOCK_SIZE=10)")

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)

m = intercept[IndexAlreadyExistsException] {
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
}.getMessage
assert(m.contains("Failed to create index: i1 in new_table"))
}
}
@@ -189,6 +189,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def supportsIndex: Boolean = false
def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
def testIndexUsingSQL(tbl: String): Unit = {}

test("SPARK-36913: Test INDEX") {
if (supportsIndex) {
@@ -202,28 +203,28 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(jdbcTable.indexExists("i1") == false)
assert(jdbcTable.indexExists("i2") == false)

- val properties = new util.Properties();
+ val properties = new util.HashMap[String, String]();
val indexType = "DUMMY"
var m = intercept[UnsupportedOperationException] {
jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
-   Array.empty[util.Map[NamedReference, util.Properties]], properties)
+   new util.HashMap[NamedReference, util.Map[String, String]](), properties)
}.getMessage
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))

jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
-   Array.empty[util.Map[NamedReference, util.Properties]], properties)
+   new util.HashMap[NamedReference, util.Map[String, String]](), properties)

jdbcTable.createIndex("i2", "",
Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-   Array.empty[util.Map[NamedReference, util.Properties]], properties)
+   new util.HashMap[NamedReference, util.Map[String, String]](), properties)

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)

m = intercept[IndexAlreadyExistsException] {
jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-   Array.empty[util.Map[NamedReference, util.Properties]], properties)
+   new util.HashMap[NamedReference, util.Map[String, String]](), properties)
}.getMessage
assert(m.contains("Failed to create index: i1 in new_table"))

@@ -276,5 +277,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}
}
}

test("SPARK-36895: Test INDEX Using SQL") {
withTable(s"$catalogName.new_table") {
sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
testIndexUsingSQL(s"$catalogName.new_table")
}
}
}
@@ -112,9 +112,9 @@ statement
| CREATE namespace (IF NOT EXISTS)? multipartIdentifier
(commentSpec |
locationSpec |
- (WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
+ (WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
| ALTER namespace multipartIdentifier
- SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties
+ SET (DBPROPERTIES | PROPERTIES) propertyList #setNamespaceProperties
| ALTER namespace multipartIdentifier
SET locationSpec #setNamespaceLocation
| DROP namespace (IF EXISTS)? multipartIdentifier
@@ -130,7 +130,7 @@ statement
rowFormat |
createFileFormat |
locationSpec |
- (TBLPROPERTIES tableProps=tablePropertyList))* #createTableLike
+ (TBLPROPERTIES tableProps=propertyList))* #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider?
createTableClauses
(AS? query)? #replaceTable
@@ -155,9 +155,9 @@ statement
| ALTER (TABLE | VIEW) from=multipartIdentifier
RENAME TO to=multipartIdentifier #renameTable
| ALTER (TABLE | VIEW) multipartIdentifier
- SET TBLPROPERTIES tablePropertyList #setTableProperties
+ SET TBLPROPERTIES propertyList #setTableProperties
| ALTER (TABLE | VIEW) multipartIdentifier
- UNSET TBLPROPERTIES (IF EXISTS)? tablePropertyList #unsetTableProperties
+ UNSET TBLPROPERTIES (IF EXISTS)? propertyList #unsetTableProperties
| ALTER TABLE table=multipartIdentifier
(ALTER | CHANGE) COLUMN? column=multipartIdentifier
alterColumnAction? #alterTableAlterColumn
@@ -168,9 +168,9 @@ statement
REPLACE COLUMNS
'(' columns=qualifiedColTypeWithPositionList ')' #hiveReplaceColumns
| ALTER TABLE multipartIdentifier (partitionSpec)?
- SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe
+ SET SERDE STRING (WITH SERDEPROPERTIES propertyList)? #setTableSerDe
| ALTER TABLE multipartIdentifier (partitionSpec)?
- SET SERDEPROPERTIES tablePropertyList #setTableSerDe
+ SET SERDEPROPERTIES propertyList #setTableSerDe
| ALTER (TABLE | VIEW) multipartIdentifier ADD (IF NOT EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER TABLE multipartIdentifier
@@ -187,11 +187,11 @@ statement
identifierCommentList?
(commentSpec |
(PARTITIONED ON identifierList) |
- (TBLPROPERTIES tablePropertyList))*
+ (TBLPROPERTIES propertyList))*
AS query #createView
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier ('(' colTypeList ')')? tableProvider
- (OPTIONS tablePropertyList)? #createTempViewUsing
+ (OPTIONS propertyList)? #createTempViewUsing
| ALTER VIEW multipartIdentifier AS? query #alterViewQuery
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
multipartIdentifier AS className=STRING
@@ -204,7 +204,7 @@ statement
| SHOW TABLE EXTENDED ((FROM | IN) ns=multipartIdentifier)?
LIKE pattern=STRING partitionSpec? #showTableExtended
| SHOW TBLPROPERTIES table=multipartIdentifier
- ('(' key=tablePropertyKey ')')? #showTblProperties
+ ('(' key=propertyKey ')')? #showTblProperties
| SHOW COLUMNS (FROM | IN) table=multipartIdentifier
((FROM | IN) ns=multipartIdentifier)? #showColumns
| SHOW VIEWS ((FROM | IN) multipartIdentifier)?
@@ -228,7 +228,7 @@ statement
| REFRESH FUNCTION multipartIdentifier #refreshFunction
| REFRESH (STRING | .*?) #refreshResource
| CACHE LAZY? TABLE multipartIdentifier
- (OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable
+ (OPTIONS options=propertyList)? (AS? query)? #cacheTable
| UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable
| CLEAR CACHE #clearCache
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
@@ -247,6 +247,10 @@ statement
| SET .*? #setConfiguration
| RESET configKey #resetQuotedConfiguration
| RESET .*? #resetConfiguration
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
Review comments on the TABLE? keyword:
Member: Just curious, why is TABLE optional here?
Contributor Author: That's the keyword TABLE, not a table name. Most DBMSs don't include the TABLE keyword in CREATE INDEX, but Delta Lake has this optional TABLE keyword:
CREATE BLOOMFILTER INDEX ON [TABLE] table_identifier [FOR COLUMNS(columnName1 [OPTIONS(..)], columnName2, ...)] [OPTIONS ( key1 [ = ] val1, key2 [ = ] val2, ... ) ]
Member: Yeah, I understand this is a keyword; I'm just not sure why we make it optional instead of always requiring it or never requiring it.
multipartIdentifier (USING indexType=identifier)?
'(' columns=multipartIdentifierPropertyList ')'
(OPTIONS options=propertyList)? #createIndex
| unsupportedHiveNativeCommands .*? #failNativeCommand
;
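The new createIndex alternative accepts a few surface forms. A hedged sketch of statements this grammar should parse (table, index, and column names are illustrative, `spark` is an assumed SparkSession, and whether a statement succeeds at runtime depends on the catalog's SupportsIndex implementation; see also the review thread above on the optional TABLE keyword):

// Illustrative statements matched by the createIndex rule; not taken from this PR.
spark.sql("CREATE INDEX i1 ON TABLE db.tbl (col1)")   // optional TABLE keyword present
spark.sql("CREATE INDEX i1 ON db.tbl (col1)")         // optional TABLE keyword omitted
spark.sql("CREATE INDEX IF NOT EXISTS i2 ON db.tbl USING BTREE (col2, col3)")
spark.sql("CREATE INDEX i3 ON db.tbl (col4) OPTIONS (KEY_BLOCK_SIZE=10)")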

@@ -341,7 +345,7 @@ insertInto
: INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? identifierList? #insertOverwriteTable
| INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? identifierList? #insertIntoTable
| INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
- | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
+ | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;

partitionSpecLocation
@@ -387,31 +391,31 @@ tableProvider
;

createTableClauses
- :((OPTIONS options=tablePropertyList) |
+ :((OPTIONS options=propertyList) |
(PARTITIONED BY partitioning=partitionFieldList) |
skewSpec |
bucketSpec |
rowFormat |
createFileFormat |
locationSpec |
commentSpec |
- (TBLPROPERTIES tableProps=tablePropertyList))*
+ (TBLPROPERTIES tableProps=propertyList))*
;

-tablePropertyList
-    : '(' tableProperty (',' tableProperty)* ')'
+propertyList
+    : '(' property (',' property)* ')'
;

-tableProperty
-    : key=tablePropertyKey (EQ? value=tablePropertyValue)?
+property
+    : key=propertyKey (EQ? value=propertyValue)?
;

-tablePropertyKey
+propertyKey
: identifier ('.' identifier)*
| STRING
;

-tablePropertyValue
+propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
@@ -437,7 +441,7 @@ fileFormat
;

storageHandler
- : STRING (WITH SERDEPROPERTIES tablePropertyList)?
+ : STRING (WITH SERDEPROPERTIES propertyList)?
;

resource
@@ -726,7 +730,7 @@ tableAlias
;

rowFormat
- : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=tablePropertyList)? #rowFormatSerde
+ : ROW FORMAT SERDE name=STRING (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde
| ROW FORMAT DELIMITED
(FIELDS TERMINATED BY fieldsTerminatedBy=STRING (ESCAPED BY escapedBy=STRING)?)?
(COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=STRING)?
@@ -743,6 +747,14 @@ multipartIdentifier
: parts+=errorCapturingIdentifier ('.' parts+=errorCapturingIdentifier)*
;

multipartIdentifierPropertyList
: multipartIdentifierProperty (',' multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (OPTIONS options=propertyList)?
;
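The multipartIdentifierProperty rule is what lets each index column carry its own OPTIONS list in CREATE INDEX. A minimal sketch, assuming a hypothetical column-level property key prop1 (some dialects, such as MySQL above, reject column-level properties entirely):

// Hypothetical per-column option; a grammar-level illustration only.
spark.sql("CREATE INDEX i1 ON db.tbl (col1 OPTIONS (prop1='v1'), col2)")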

tableIdentifier
: (db=errorCapturingIdentifier '.')? table=errorCapturingIdentifier
;
@@ -18,7 +18,6 @@
package org.apache.spark.sql.connector.catalog.index;

import java.util.Map;
import java.util.Properties;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException;
@@ -38,7 +37,8 @@ public interface SupportsIndex extends Table {
* Creates an index.
*
* @param indexName the name of the index to be created
-   * @param indexType the IndexType of the index to be created
+   * @param indexType the type of the index to be created. If this is not specified, Spark
+   *                  will use an empty string.
Review comments on this line:
Contributor: I think it's better to follow the table provider and treat the index type as a special index property; then it's easier to indicate when the index type is not specified. We can do it in a follow-up.
Contributor Author: Will fix this in the next PR when adding the DROP INDEX syntax.
* @param columns the columns on which the index is to be created
* @param columnsProperties the properties of the columns on which the index is to be created
* @param properties the properties of the index to be created
@@ -47,8 +47,8 @@ public interface SupportsIndex extends Table {
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
-      Map<NamedReference, Properties>[] columnsProperties,
-      Properties properties)
+      Map<NamedReference, Map<String, String>> columnsProperties,
+      Map<String, String> properties)
throws IndexAlreadyExistsException;
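For context, a minimal Scala sketch of calling the revised Map-based signature, mirroring the tests above (it assumes a `table` already loaded via TableCatalog.loadTable whose implementation supports indexes; index, column, and property names are illustrative):

import java.util
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// Assumed: `table` came from TableCatalog.loadTable and implements SupportsIndex.
val indexed = table.asInstanceOf[SupportsIndex]

val properties = new util.HashMap[String, String]()
properties.put("KEY_BLOCK_SIZE", "10")

// No per-column properties in this example, so pass an empty map; this single
// map replaces the old Array[util.Map[NamedReference, util.Properties]] parameter.
val columnsProperties = new util.HashMap[NamedReference, util.Map[String, String]]()

if (!indexed.indexExists("i1")) {
  indexed.createIndex("i1", "BTREE", Array(FieldReference("col1")),
    columnsProperties, properties)
}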
