@@ -35,10 +35,10 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.connect.service.SessionHolder
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, Identifier, InMemoryCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.LongType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils

@@ -137,12 +137,12 @@ class ProtoToParsedPlanTestSuite
     inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps)
     inMemoryCatalog.createTable(
       Identifier.of(Array("tempdb"), "myTable"),
-      new StructType().add("id", "long"),
+      Array(Column.create("id", LongType)),
       Array.empty[Transform],
       emptyProps)
     inMemoryCatalog.createTable(
       Identifier.of(Array("tempdb"), "myStreamingTable"),
-      new StructType().add("id", "long"),
+      Array(Column.create("id", LongType)),
       Array.empty[Transform],
       emptyProps)

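For context, the two test hunks above (and the JDBC hunk below) perform the same mechanical migration: the deprecated StructType-based createTable overload is replaced by the Column[]-based overload added to the DataSource V2 catalog API in Spark 3.4. A minimal standalone sketch of that migration, assuming the test-scoped InMemoryCatalog and the Column.create(name, dataType) factory; the catalog name "testcat" is an arbitrary choice for illustration:

import java.util.Collections
import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val emptyProps = Collections.emptyMap[String, String]
val catalog = new InMemoryCatalog
catalog.initialize("testcat", CaseInsensitiveStringMap.empty())
catalog.createNamespace(Array("tempdb"), emptyProps)

catalog.createTable(
  Identifier.of(Array("tempdb"), "myTable"),
  Array(Column.create("id", LongType)),  // was: new StructType().add("id", "long")
  Array.empty[Transform],
  emptyProps)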
@@ -26,22 +26,22 @@ import org.apache.logging.log4j.Level

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
-import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
+import org.apache.spark.sql.connector.catalog.{Column, Identifier, NamespaceChange}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.tags.DockerTest

 @DockerTest
 private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerIntegrationFunSuite {
   val catalog = new JDBCTableCatalog()

   private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
-  private val schema: StructType = new StructType()
-    .add("id", IntegerType)
-    .add("data", StringType)
+  private val columns: Array[Column] = Array(
+    Column.create("id", IntegerType),
+    Column.create("data", StringType))

   def builtinNamespaces: Array[Array[String]]

@@ -116,7 +116,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     // Drop non empty namespace without cascade
     catalog.createNamespace(Array("foo"), commentMap.asJava)
     assert(catalog.namespaceExists(Array("foo")) === true)
-    catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
+    catalog.createTable(ident1, columns, Array.empty[Transform], emptyProps)
     if (supportsDropSchemaRestrict) {
       intercept[NonEmptyNamespaceException] {
         catalog.dropNamespace(Array("foo"), cascade = false)
@@ -38,7 +38,7 @@
  * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE
  * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
  * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
- * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
+ * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and then perform
  * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
  * However, if the write operation fails, the catalog will have already dropped the table, and the
  * planner cannot roll back the dropping of the table.
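To make the sequence in that javadoc concrete, here is a minimal sketch of the non-atomic drop/create/write fallback it describes, for a catalog that does not implement StagingTableCatalog. This is an illustration, not the planner's actual code, and the LogicalWriteInfo plumbing is elided:

import org.apache.spark.sql.connector.catalog.{Column, Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform

// Non-atomic REPLACE TABLE AS SELECT fallback (sketch only).
def nonAtomicReplace(
    catalog: TableCatalog,
    ident: Identifier,
    columns: Array[Column],
    props: java.util.Map[String, String]): Unit = {
  catalog.dropTable(ident)  // step 1: the old table is irrecoverably gone from here on
  val table = catalog.createTable(ident, columns, Array.empty[Transform], props)  // step 2
  // step 3 (elided): cast `table` to SupportsWrite, obtain a WriteBuilder via
  // newWriteBuilder(info), and execute the write. A failure in this step cannot
  // roll back steps 1 and 2, which is exactly the gap StagingTableCatalog closes.
}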
@@ -3996,4 +3996,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       origin = e.origin
     )
   }
+
+  private def callDeprecatedMethodError(oldMethod: String, newMethod: String): Throwable = {
+    SparkException.internalError(s"The method `$oldMethod` is deprecated, " +
+      s"please use `$newMethod` instead.")
+  }
+
+  def createTableDeprecatedError(): Throwable = {
+    callDeprecatedMethodError("createTable(..., StructType, ...)",
+      "createTable(..., Array[Column], ...)")
+  }
 }
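For illustration, one plausible call site for the new helper (an assumption on my part; the TableCatalog wiring is not shown in this diff): the deprecated StructType overload of createTable can fail fast with this error, steering implementers toward the Column[] overload. Note that QueryCompilationErrors is private[sql], so real callers must live inside org.apache.spark.sql:

import org.apache.spark.sql.connector.catalog.{Identifier, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType

// Hypothetical default body for the deprecated overload (assumed wiring).
@deprecated("use createTable(..., Array[Column], ...)", "3.4.0")
def createTable(
    ident: Identifier,
    schema: StructType,
    partitions: Array[Transform],
    properties: java.util.Map[String, String]): Table = {
  // Raises SparkException.internalError with the message built by the helper above:
  //   "The method `createTable(..., StructType, ...)` is deprecated,
  //    please use `createTable(..., Array[Column], ...)` instead."
  throw QueryCompilationErrors.createTableDeprecatedError()
}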