@@ -117,4 +117,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
}

override def supportsIndex: Boolean = true

override def indexOptions: String = "KEY_BLOCK_SIZE=10"
}
@@ -80,4 +80,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
}

override def supportsTableSample: Boolean = true

override def supportsIndex: Boolean = true

override def indexOptions: String = "FILLFACTOR=70"
}
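
These two overrides parameterize the OPTIONS clause of the shared index test in V2JDBCTest below. Roughly, the DDL the suite ends up issuing looks like this (a sketch; the catalog names are assumed):

// Spark SQL sent by the shared test, per dialect:
//   CREATE INDEX i2 ON mysql.new_table (col2, col3, col5) OPTIONS (KEY_BLOCK_SIZE=10)
//   CREATE INDEX i2 ON postgresql.new_table (col2, col3, col5) OPTIONS (FILLFACTOR=70)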
@@ -191,6 +191,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {

def supportsIndex: Boolean = false

def indexOptions: String = ""

test("SPARK-36895: Test INDEX Using SQL") {
if (supportsIndex) {
withTable(s"$catalogName.new_table") {
@@ -208,11 +210,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {
sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
}.getMessage
assert(m.contains(s"Index Type $indexType is not supported." +
s" The supported Index Types are: BTREE and HASH"))
s" The supported Index Types are:"))

sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
s" OPTIONS (KEY_BLOCK_SIZE=10)")
s" OPTIONS ($indexOptions)")

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)
@@ -23,6 +23,8 @@ import java.util
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import scala.util.control.NonFatal

@@ -38,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
- import org.apache.spark.sql.connector.catalog.index.TableIndex
+ import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1073,6 +1075,73 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

/**
* Check if an index exists on a table.
*/
def checkIfIndexExists(
conn: Connection,
sql: String,
options: JDBCOptions): Boolean = {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
val rs = statement.executeQuery(sql)
rs.next
} catch {
case _: Exception =>
logWarning("Cannot retrieved index info.")
false
} finally {
statement.close()
}
}

/**
* Process index properties and return a tuple of the index type and a list of the remaining index properties.
*/
def processIndexProperties(
properties: util.Map[String, String],
catalogName: String): (String, Array[String]) = {
var indexType = ""
val indexPropertyList: ArrayBuffer[String] = ArrayBuffer[String]()
val supportedIndexTypeList = getSupportedIndexTypeList(catalogName)

if (!properties.isEmpty) {
properties.asScala.foreach { case (k, v) =>
if (k.equals(SupportsIndex.PROP_TYPE)) {
if (containsIndexTypeIgnoreCase(supportedIndexTypeList, v)) {
indexType = s"USING $v"
} else {
throw new UnsupportedOperationException(s"Index Type $v is not supported." +
s" The supported Index Types are: ${supportedIndexTypeList.mkString(" AND ")}")
}
} else {
indexPropertyList.append(s"$k = $v")
}
}
}
(indexType, indexPropertyList.toArray)
}
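
A minimal usage sketch (not part of the patch) of what the helper returns, assuming a properties map carrying the type key plus one storage option:

import java.util
import org.apache.spark.sql.connector.catalog.index.SupportsIndex

val props = new util.HashMap[String, String]()
props.put(SupportsIndex.PROP_TYPE, "BTREE") // becomes the USING clause
props.put("KEY_BLOCK_SIZE", "10")           // passed through as "KEY_BLOCK_SIZE = 10"

val (indexType, rest) = JdbcUtils.processIndexProperties(props, "mysql")
// indexType == "USING BTREE"
// rest.mkString(" ") == "KEY_BLOCK_SIZE = 10"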

def containsIndexTypeIgnoreCase(supportedIndexTypeList: Array[String], value: String): Boolean = {
if (supportedIndexTypeList.isEmpty) {
throw new UnsupportedOperationException(
"Cannot specify 'USING index_type' in 'CREATE INDEX'")
}
for (indexType <- supportedIndexTypeList) {
if (value.equalsIgnoreCase(indexType)) return true
}
false
}

def getSupportedIndexTypeList(catalogName: String): Array[String] = {
catalogName match {
case "mysql" => Array("BTREE", "HASH")
case "postgresql" => Array("BTREE", "HASH", "BRIN")
case _ => Array.empty
}
}
Contributor:
I think it may be better to put this in JdbcDialect and let the subclasses implement this. WDYT? @cloud-fan

Contributor Author:

Yeah, I agree with @huaxingao. Should we move it to JdbcDialect, @cloud-fan?

Contributor:
I don't think so. This is for failing earlier in createIndex, where the JDBC dialect implementation already has full control.

Contributor (@cloud-fan, Dec 13, 2021):
So the only benefit of adding this API to JdbcDialect would be to share the code that checks for unsupported index types earlier, which I don't think is worthwhile.
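
For reference, a hypothetical sketch of the alternative discussed in this thread (it was not adopted):

// Hypothetical only: the dialect would advertise its supported index types, and
// JdbcUtils would consult the dialect instead of matching on the catalog name.
trait DialectIndexTypes {
  def supportedIndexTypes: Array[String] = Array.empty
}
// MySQLDialect would then override supportedIndexTypes to Array("BTREE", "HASH"),
// and PostgresDialect to Array("BTREE", "HASH", "BRIN").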


def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
val statement = conn.createStatement
try {
@@ -21,12 +21,10 @@ import java.sql.{Connection, SQLException, Types}
import java.util
import java.util.Locale

- import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
- import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
+ import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -120,25 +118,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
- var indexProperties: String = ""
- var indexType = ""
- if (!properties.isEmpty) {
-   properties.asScala.foreach { case (k, v) =>
-     if (k.equals(SupportsIndex.PROP_TYPE)) {
-       if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) {
-         indexType = s"USING $v"
-       } else {
-         throw new UnsupportedOperationException(s"Index Type $v is not supported." +
-           " The supported Index Types are: BTREE and HASH")
-       }
-     } else {
-       indexProperties = indexProperties + " " + s"$k $v"
-     }
-   }
- }
+ val (indexType, indexPropertyList) = JdbcUtils.processIndexProperties(properties, "mysql")

// columnsProperties doesn't apply to MySQL so it is ignored
s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" +
-   s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
+   s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" +
+   s" ${indexPropertyList.mkString(" ")}"
}
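
Illustrative only: with the test suite's OPTIONS (KEY_BLOCK_SIZE=10) from above, the strings built here come out roughly as:

// Sketch of the generated MySQL DDL (identifiers quoted with backticks):
//   CREATE INDEX `i1` USING BTREE ON `new_table` (`col1`)
//   CREATE INDEX `i2` ON `new_table` (`col2`, `col3`, `col5`) KEY_BLOCK_SIZE = 10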

// SHOW INDEX syntax
@@ -148,21 +133,8 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
indexName: String,
tableName: String,
options: JDBCOptions): Boolean = {
val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}"
try {
val rs = JdbcUtils.executeQuery(conn, options, sql)
while (rs.next()) {
val retrievedIndexName = rs.getString("key_name")
if (conf.resolver(retrievedIndexName, indexName)) {
return true
}
}
false
} catch {
case _: Exception =>
logWarning("Cannot retrieved index info.")
false
}
val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'"
JdbcUtils.checkIfIndexExists(conn, sql, options)
}

override def dropIndex(indexName: String, tableName: String): String = {
@@ -17,15 +17,20 @@

package org.apache.spark.sql.jdbc

- import java.sql.{Connection, Types}
+ import java.sql.{Connection, SQLException, Types}
import java.util
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._


- private object PostgresDialect extends JdbcDialect {
+ private object PostgresDialect extends JdbcDialect with SQLConfHelper {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
@@ -164,4 +169,56 @@ private object PostgresDialect extends JdbcDialect {
s"TABLESAMPLE BERNOULLI" +
s" (${(sample.upperBound - sample.lowerBound) * 100}) REPEATABLE (${sample.seed})"
}

// CREATE INDEX syntax
// https://www.postgresql.org/docs/14/sql-createindex.html
override def createIndex(
indexName: String,
tableName: String,
columns: Array[NamedReference],
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): String = {
Contributor:
The implementation is similar to MySQL. We probably want to move this to JdbcUtils too for code reuse.

Contributor Author:
@huaxingao The implementation is not totally similar to MySQL's; the variables are assembled differently. So we should create a shared function, parameterized by the JDBC dialect, that returns the exact SQL, right? Thanks!

val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
var indexProperties = ""
val (indexType, indexPropertyList) = JdbcUtils.processIndexProperties(properties, "postgresql")

if (indexPropertyList.nonEmpty) {
indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")"
}

s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" +
s" $indexType (${columnList.mkString(", ")}) $indexProperties"
}
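
Illustrative only: with the suite's FILLFACTOR=70 from above, the statement comes out roughly as:

// Sketch of the generated PostgreSQL DDL (identifiers quoted with double quotes):
//   CREATE INDEX "i2" ON "new_table" ("col2", "col3", "col5") WITH (FILLFACTOR = 70)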

// SHOW INDEX syntax
// https://www.postgresql.org/docs/14/view-pg-indexes.html
override def indexExists(
conn: Connection,
indexName: String,
tableName: String,
options: JDBCOptions): Boolean = {
val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
s" indexname = '$indexName'"
JdbcUtils.checkIfIndexExists(conn, sql, options)
}

// DROP INDEX syntax
// https://www.postgresql.org/docs/14/sql-dropindex.html
override def dropIndex(indexName: String, tableName: String): String = {
s"DROP INDEX ${quoteIdentifier(indexName)}"
}

override def classifyException(message: String, e: Throwable): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.postgresql.org/docs/14/errcodes-appendix.html
case "42P07" => throw new IndexAlreadyExistsException(message, cause = Some(e))
case "42704" => throw new NoSuchIndexException(message, cause = Some(e))
Comment on lines +216 to +217
Contributor Author:

I use Postgres error codes to handle the exception, but getErrorCode returns 0. So I use getSQLState instead.
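
A small illustration with hypothetical values for a duplicate-index failure:

sqlException.getErrorCode // 0 (the PostgreSQL JDBC driver leaves the vendor code unset)
sqlException.getSQLState  // "42P07" (duplicate_table: the relation already exists)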

case _ => super.classifyException(message, e)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(message, e)
}
}
}