From 3618b304532a80ddff1e78a26ed4005ce31defb1 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 4 Mar 2024 11:08:56 +0800
Subject: [PATCH 01/10] [SPARK-47265][SQL][TESTS] Enable test of v2 data
 sources in `FileBasedDataSourceSuite`

---
 .../spark/sql/FileBasedDataSourceSuite.scala | 46 ++++++++++---------
 1 file changed, 25 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 36af4121f30b..32b0fbc67c13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -500,8 +500,7 @@ class FileBasedDataSourceSuite extends QueryTest
   test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") {
     withTempDir { dir =>
       val tempDir = new File(dir, "files").getCanonicalPath
-      // TODO: test file source V2 after write path is fixed.
-      Seq(true).foreach { useV1 =>
+      Seq(true, false).foreach { useV1 =>
         val useV1List = if (useV1) {
           "csv,json,orc,parquet"
         } else {
@@ -560,8 +559,7 @@ class FileBasedDataSourceSuite extends QueryTest
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    // TODO: test file source V2 after write path is fixed.
-    Seq(true).foreach { useV1 =>
+    Seq(true, false).foreach { useV1 =>
       val useV1List = if (useV1) {
         "csv,orc,parquet"
       } else {
@@ -683,24 +681,30 @@ class FileBasedDataSourceSuite extends QueryTest
   }
 
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
-    // TODO: Test CSV V2 as well after it implements [[SupportsReportStatistics]].
-    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "csv") {
-      withTempPath { p =>
-        val path = p.getAbsolutePath
-        spark.range(1000).repartition(1).write.csv(path)
-        val bytesReads = new mutable.ArrayBuffer[Long]()
-        val bytesReadListener = new SparkListener() {
-          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-            bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+    Seq(true, false).foreach { useV1 =>
+      val useV1List = if (useV1) {
+        "csv"
+      } else {
+        ""
+      }
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+        withTempPath { p =>
+          val path = p.getAbsolutePath
+          spark.range(1000).repartition(1).write.csv(path)
+          val bytesReads = new mutable.ArrayBuffer[Long]()
+          val bytesReadListener = new SparkListener() {
+            override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+              bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+            }
+          }
+          sparkContext.addSparkListener(bytesReadListener)
+          try {
+            spark.read.csv(path).limit(1).collect()
+            sparkContext.listenerBus.waitUntilEmpty()
+            assert(bytesReads.sum === 7860)
+          } finally {
+            sparkContext.removeSparkListener(bytesReadListener)
          }
-        }
-        sparkContext.addSparkListener(bytesReadListener)
-        try {
-          spark.read.csv(path).limit(1).collect()
-          sparkContext.listenerBus.waitUntilEmpty()
-          assert(bytesReads.sum === 7860)
-        } finally {
-          sparkContext.removeSparkListener(bytesReadListener)
-        }
       }
     }
   }

From 38cc4810d26e0270f56bab34522c9ce769bf1151 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Mon, 4 Mar 2024 19:35:55 +0800
Subject: [PATCH 02/10] [SPARK-47265][SQL][TESTS] Use `createTable(...,
 columns: Array[Column], ...)` instead of `createTable(..., schema:
 StructType, ...)` in UT

---
 .../connect/ProtoToParsedPlanTestSuite.scala | 8 +-
 .../sql/jdbc/v2/V2JDBCNamespaceTest.scala | 12 +-
 .../catalog/StagingTableCatalog.java | 2 +-
.../sql/connector/catalog/CatalogSuite.scala | 212 +++++++++--------- .../catalog/InMemoryTableCatalog.scala | 17 ++ ...pportsAtomicPartitionManagementSuite.scala | 10 +- .../SupportsPartitionManagementSuite.scala | 18 +- .../DataSourceV2DataFrameSuite.scala | 6 +- .../WriteDistributionAndOrderingSuite.scala | 26 ++- .../InMemoryTableMetricSuite.scala | 6 +- .../v2/V2SessionCatalogSuite.scala | 194 ++++++++-------- 11 files changed, 265 insertions(+), 246 deletions(-) diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala index e0c4e21503e9..9d16efd3bea8 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala @@ -35,10 +35,10 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.connect.config.Connect import org.apache.spark.sql.connect.planner.SparkConnectPlanner import org.apache.spark.sql.connect.service.SessionHolder -import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, Identifier, InMemoryCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.LongType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -137,12 +137,12 @@ class ProtoToParsedPlanTestSuite inMemoryCatalog.createNamespace(Array("tempdb"), emptyProps) inMemoryCatalog.createTable( Identifier.of(Array("tempdb"), "myTable"), - new StructType().add("id", "long"), + Array(Column.create("id", LongType)), Array.empty[Transform], emptyProps) inMemoryCatalog.createTable( Identifier.of(Array("tempdb"), "myStreamingTable"), - new StructType().add("id", "long"), + Array(Column.create("id", LongType)), Array.empty[Transform], emptyProps) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 4eacfbfbd880..e4cc88cec0f5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -26,12 +26,12 @@ import org.apache.logging.log4j.Level import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException -import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, NamespaceChange} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.tags.DockerTest @DockerTest @@ -39,9 +39,9 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte val catalog = 
new JDBCTableCatalog() private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] - private val schema: StructType = new StructType() - .add("id", IntegerType) - .add("data", StringType) + private val columns: Array[Column] = Array( + Column.create("id", IntegerType), + Column.create("data", StringType)) def builtinNamespaces: Array[Array[String]] @@ -116,7 +116,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte // Drop non empty namespace without cascade catalog.createNamespace(Array("foo"), commentMap.asJava) assert(catalog.namespaceExists(Array("foo")) === true) - catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps) + catalog.createTable(ident1, columns, Array.empty[Transform], emptyProps) if (supportsDropSchemaRestrict) { intercept[NonEmptyNamespaceException] { catalog.dropNamespace(Array("foo"), cascade = false) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java index e69fc3324c69..a8e1757a492d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java @@ -38,7 +38,7 @@ * SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE * TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first * drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via - * {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform + * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and then perform * the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}. * However, if the write operation fails, the catalog will have already dropped the table, and the * planner cannot roll back the dropping of the table. 
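For reference, and not part of the patch itself: the test hunks below all follow the same migration, away from the deprecated `createTable(..., schema: StructType, ...)` overload and onto `createTable(..., columns: Array[Column], ...)`. A minimal sketch of the new call shape, assuming the test-only `InMemoryCatalog` from the spark-catalyst test artifact and the `Column.create` factory already used throughout these hunks (the `ns` / `test_table` names are purely illustrative):

import java.util.Collections

import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object CreateTableWithColumnsSketch {
  def main(args: Array[String]): Unit = {
    val catalog = new InMemoryCatalog
    catalog.initialize("test", CaseInsensitiveStringMap.empty())
    catalog.createNamespace(Array("ns"), Collections.emptyMap[String, String])

    // The v2 Column array replaces the StructType taken by the deprecated overload.
    val columns: Array[Column] = Array(
      Column.create("id", IntegerType),
      Column.create("data", StringType))

    // TableCatalog.createTable(Identifier, Column[], Transform[], Map)
    val table = catalog.createTable(
      Identifier.of(Array("ns"), "test_table"),
      columns,
      Array.empty[Transform],
      Collections.emptyMap[String, String])

    // The table reports its shape as v2 columns, so the updated tests assert on
    // columns() rather than on a StructType.
    assert(table.columns().map(_.name).sameElements(Array("id", "data")))
  }
}

The `StructType`-based overload stays available (and is explicitly deprecated in the `InMemoryTableCatalog` hunk later in this patch) so existing callers keep compiling; `CatalogV2Util.v2ColumnsToStructType` bridges the two representations where a `StructType` is still needed.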
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 2935b01649ca..145bfd286123 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CatalogSuite extends SparkFunSuite { @@ -38,9 +38,9 @@ class CatalogSuite extends SparkFunSuite { private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] private val emptyTrans: Array[Transform] = Array.empty - private val schema: StructType = new StructType() - .add("id", IntegerType) - .add("data", StringType) + private val columns: Array[Column] = Array( + Column.create("id", IntegerType), + Column.create("data", StringType)) private def newCatalog(): InMemoryCatalog = { val newCatalog = new InMemoryCatalog @@ -75,13 +75,13 @@ class CatalogSuite extends SparkFunSuite { intercept[NoSuchNamespaceException](catalog.listTables(Array("ns"))) - catalog.createTable(ident1, schema, emptyTrans, emptyProps) + catalog.createTable(ident1, columns, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) intercept[NoSuchNamespaceException](catalog.listTables(Array("ns2"))) - catalog.createTable(ident3, schema, emptyTrans, emptyProps) - catalog.createTable(ident2, schema, emptyTrans, emptyProps) + catalog.createTable(ident3, columns, emptyTrans, emptyProps) + catalog.createTable(ident2, columns, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) @@ -101,11 +101,11 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.schema == schema) + assert(table.columns === columns) assert(table.properties.asScala == Map()) assert(catalog.tableExists(testIdent)) @@ -119,11 +119,11 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.createTable(testIdent, columns, emptyTrans, properties) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("test", "`", ".", "test_table")) - assert(table.schema == schema) + assert(table.columns === columns) assert(table.properties == properties) assert(catalog.tableExists(testIdent)) @@ -134,10 +134,10 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - val table = 
catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val exc = intercept[TableAlreadyExistsException] { - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) } checkErrorTableAlreadyExists(exc, testIdentQuoted) @@ -150,7 +150,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -162,7 +162,7 @@ class CatalogSuite extends SparkFunSuite { test("loadTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) assert(table.name == loaded.name) @@ -183,7 +183,7 @@ class CatalogSuite extends SparkFunSuite { test("invalidateTable") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) catalog.invalidateTable(testIdent) val loaded = catalog.loadTable(testIdent) @@ -204,7 +204,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(table.properties.asScala == Map()) @@ -223,7 +223,7 @@ class CatalogSuite extends SparkFunSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.createTable(testIdent, columns, emptyTrans, properties) assert(table.properties.asScala == Map("prop-1" -> "1")) @@ -242,7 +242,7 @@ class CatalogSuite extends SparkFunSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - val table = catalog.createTable(testIdent, schema, emptyTrans, properties) + val table = catalog.createTable(testIdent, columns, emptyTrans, properties) assert(table.properties.asScala == Map("prop-1" -> "1")) @@ -258,7 +258,7 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(table.properties.asScala == Map()) @@ -274,66 +274,66 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) - assert(updated.schema == schema.add("ts", TimestampType)) + assert(updated.columns === columns :+ Column.create("ts", TimestampType)) } test("alterTable: add required column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + 
assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false)) - assert(updated.schema == schema.add("ts", TimestampType, nullable = false)) + assert(updated.columns === columns :+ Column.create("ts", TimestampType, false)) } test("alterTable: add column with comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) - val field = StructField("ts", TimestampType, nullable = false).withComment("comment text") - assert(updated.schema == schema.add(field)) + val tsColumn = Column.create("ts", TimestampType, false, "comment text", null) + assert(updated.columns === (columns :+ tsColumn)) } test("alterTable: add nested column") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.addColumn(Array("point", "z"), DoubleType)) - val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType)) + val expectedColumns = columns :+ Column.create("point", pointStruct.add("z", DoubleType)) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -343,15 +343,15 @@ class CatalogSuite extends SparkFunSuite { parameters = Map("name" -> "data")) // the table has not changed - assert(catalog.loadTable(testIdent).schema == schema) + assert(catalog.loadTable(testIdent).columns === columns) } test("alterTable: add field to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -365,9 +365,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: update column data type") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) @@ -378,12 +378,12 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: update column nullability") { val catalog = newCatalog() - val originalSchema = new StructType() - .add("id", IntegerType, nullable = false) - .add("data", 
StringType) - val table = catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + val originalColumns = Array( + Column.create("id", IntegerType, false), + Column.create("data", StringType)) + val table = catalog.createTable(testIdent, originalColumns, emptyTrans, emptyProps) - assert(table.schema == originalSchema) + assert(table.columns === originalColumns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnNullability(Array("id"), true)) @@ -395,9 +395,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: update missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -411,9 +411,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -427,9 +427,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: replace comment") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -446,9 +446,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -462,9 +462,9 @@ class CatalogSuite extends SparkFunSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) @@ -477,46 +477,46 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first")) val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ Column.create("point", newPointStruct) - 
assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: rename struct column") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p")) val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val expectedSchema = schema.add("p", newPointStruct) + val expectedColumns = columns :+ Column.create("p", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: rename missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -531,28 +531,28 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first"), TableChange.renameColumn(Array("point", "y"), "second")) val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns() === expectedColumns) } test("alterTable: delete top-level column") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) val updated = catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false)) @@ -565,27 +565,27 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val updated = catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false)) val newPointStruct = new StructType().add("x", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ 
Column.create("point", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: delete missing column fails") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) - assert(table.schema == schema) + assert(table.columns === columns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -596,18 +596,18 @@ class CatalogSuite extends SparkFunSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col"), true)) - assert(table.schema == schema) + assert(table.columns === columns) } test("alterTable: delete missing nested column fails") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - val table = catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) checkError( exception = intercept[SparkIllegalArgumentException] { @@ -618,7 +618,7 @@ class CatalogSuite extends SparkFunSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z"), true)) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) } test("alterTable: table does not exist") { @@ -636,7 +636,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -668,7 +668,7 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) catalog.renameTable(testIdent, testIdentNew) @@ -693,8 +693,8 @@ class CatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) - catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) + catalog.createTable(testIdentNew, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) assert(catalog.tableExists(testIdentNew)) @@ -720,8 +720,8 @@ class CatalogSuite extends SparkFunSuite { val ident1 = Identifier.of(Array("ns1", "ns2"), "test_table_1") val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") - catalog.createTable(ident1, schema, emptyTrans, emptyProps) - catalog.createTable(ident2, schema, emptyTrans, emptyProps) + catalog.createTable(ident1, columns, emptyTrans, emptyProps) + catalog.createTable(ident2, columns, emptyTrans, emptyProps) assert(catalog.listNamespaces === Array(Array("ns1"))) assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) @@ -735,8 +735,8 @@ class CatalogSuite extends SparkFunSuite { val ident2 = Identifier.of(Array("ns1", "ns2"), "test_table_2") catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava) - 
catalog.createTable(ident1, schema, emptyTrans, emptyProps) - catalog.createTable(ident2, schema, emptyTrans, emptyProps) + catalog.createTable(ident1, columns, emptyTrans, emptyProps) + catalog.createTable(ident2, columns, emptyTrans, emptyProps) assert(catalog.listNamespaces === Array(Array("ns1"))) assert(catalog.listNamespaces(Array()) === Array(Array("ns1"))) @@ -757,7 +757,7 @@ class CatalogSuite extends SparkFunSuite { test("loadNamespaceMetadata: no metadata, table exists") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val metadata = catalog.loadNamespaceMetadata(testNs) @@ -778,7 +778,7 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val metadata = catalog.loadNamespaceMetadata(testNs) @@ -811,7 +811,7 @@ class CatalogSuite extends SparkFunSuite { test("createNamespace: fail if namespace already exists from table") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map.empty) @@ -853,7 +853,7 @@ class CatalogSuite extends SparkFunSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.dropNamespace(testNs, cascade = true)) @@ -883,7 +883,7 @@ class CatalogSuite extends SparkFunSuite { test("alterNamespace: create metadata if missing and table exists") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) @@ -903,7 +903,7 @@ class CatalogSuite extends SparkFunSuite { test("truncate non-partitioned table") { val catalog = newCatalog() - val table = catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps) .asInstanceOf[InMemoryTable] table.withData(Array( new BufferedRows("3").withRow(InternalRow(0, "abc", "3")), @@ -918,9 +918,9 @@ class CatalogSuite extends SparkFunSuite { val table = partCatalog.createTable( testIdent, - new StructType() - .add("col0", IntegerType) - .add("part0", IntegerType), + Array( + Column.create("col0", IntegerType), + Column.create("part0", IntegerType)), Array[Transform](LogicalExpressions.identity(LogicalExpressions.parseReference("part0"))), util.Collections.emptyMap[String, String]) val partTable = table.asInstanceOf[InMemoryPartitionTable] @@ -933,7 +933,7 @@ class CatalogSuite extends SparkFunSuite { new BufferedRows("1").withRow(InternalRow(1, 1)) )) assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) - assert(!partTable.rows.isEmpty) + assert(partTable.rows.nonEmpty) assert(partTable.truncateTable()) assert(partTable.listPartitionIdentifiers(Array.empty, InternalRow.empty).length == 2) assert(partTable.rows.isEmpty) diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index f1f84fbfeb9f..c794f3f29eab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -103,6 +103,23 @@ class BasicInMemoryTableCatalog extends TableCatalog { createTable(ident, schema, partitions, properties) } + def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String], + distribution: Distribution, + ordering: Array[SortOrder], + requiredNumPartitions: Option[Int], + advisoryPartitionSize: Option[Long], + distributionStrictlyRequired: Boolean, + numRowsPerSplit: Int): Table = { + val schema = CatalogV2Util.v2ColumnsToStructType(columns) + createTable(ident, schema, partitions, properties, distribution, ordering, + requiredNumPartitions, advisoryPartitionSize, distributionStrictlyRequired, numRowsPerSplit) + } + + @deprecated("use createTable(..., columns: Array[Column], ...) instead", "4.0.0") def createTable( ident: Identifier, schema: StructType, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala index 4d25fda92ec1..1aa0b408366b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class SupportsAtomicPartitionManagementSuite extends SparkFunSuite { @@ -37,10 +37,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite { newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog.createTable( ident, - new StructType() - .add("id", IntegerType) - .add("data", StringType) - .add("dt", StringType), + Array( + Column.create("id", IntegerType), + Column.create("data", StringType), + Column.create("dt", StringType)), Array[Transform](LogicalExpressions.identity(ref("dt"))), util.Collections.emptyMap[String, String]) newCatalog diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index 698ac5b0ba40..06a23e7fda20 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsu import 
org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} -import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class SupportsPartitionManagementSuite extends SparkFunSuite { @@ -39,10 +39,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog.createTable( ident, - new StructType() - .add("id", IntegerType) - .add("data", StringType) - .add("dt", StringType), + Array( + Column.create("id", IntegerType), + Column.create("data", StringType), + Column.create("dt", StringType)), Array[Transform](LogicalExpressions.identity(ref("dt"))), util.Collections.emptyMap[String, String]) newCatalog @@ -163,10 +163,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { partCatalog.initialize("test", CaseInsensitiveStringMap.empty()) val table = partCatalog.createTable( ident, - new StructType() - .add("col0", IntegerType) - .add("part0", IntegerType) - .add("part1", StringType), + Array( + Column.create("col0", IntegerType), + Column.create("part0", IntegerType), + Column.create("part1", StringType)), Array[Transform]( LogicalExpressions.identity(ref("part0")), LogicalExpressions.identity(ref("part1"))), util.Collections.emptyMap[String, String]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index b8e0d50dc9c0..7d48459a8a51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -22,12 +22,12 @@ import java.util.Collections import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.CalendarIntervalType import org.apache.spark.sql.util.QueryExecutionListener class DataSourceV2DataFrameSuite @@ -178,7 +178,7 @@ class DataSourceV2DataFrameSuite val testCatalog = spark.sessionState.catalogManager.catalog("testcat").asTableCatalog testCatalog.createTable( Identifier.of(Array(), "table_name"), - new StructType().add("i", "interval"), + Array(Column.create("i", CalendarIntervalType)), Array.empty[Transform], Collections.emptyMap[String, String]) val df = sql(s"select interval 1 millisecond as i") val v2Writer = df.writeTo("testcat.table_name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index 40938eb64247..d1b6af24af6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, import org.apache.spark.sql.catalyst.expressions.objects.Invoke import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.{CoalescedBoundary, CoalescedHashPartitioning, HashPartitioning, RangePartitioning, UnknownPartitioning} -import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.{Column, Identifier} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions._ @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger} -import org.apache.spark.sql.types.{DateType, IntegerType, LongType, ObjectType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{DateType, IntegerType, LongType, ObjectType, StringType, TimestampType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.tags.SlowSQLTest @@ -69,10 +69,10 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase private val ident = Identifier.of(namespace, "test_table") private val tableNameAsString = "testcat." 
+ ident.toString private val emptyProps = Collections.emptyMap[String, String] - private val schema = new StructType() - .add("id", IntegerType) - .add("data", StringType) - .add("day", DateType) + private val columns = Array( + Column.create("id", IntegerType), + Column.create("data", StringType), + Column.create("day", DateType)) test("ordered distribution and sort with same exprs: append") { checkOrderedDistributionAndSortWithSameExprsInVariousCases("append") @@ -977,7 +977,8 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) val distribution = Distributions.ordered(ordering) - catalog.createTable(ident, schema, Array.empty, emptyProps, distribution, ordering, None, None) + catalog.createTable(ident, columns, Array.empty, emptyProps, distribution, ordering, None, None, + distributionStrictlyRequired = true, numRowsPerSplit = Int.MaxValue) withTempDir { checkpointDir => val inputData = ContinuousMemoryStream[(Long, String, Date)] @@ -1005,7 +1006,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase } test("continuous mode allows unspecified distribution and empty ordering") { - catalog.createTable(ident, schema, Array.empty[Transform], emptyProps) + catalog.createTable(ident, columns, Array.empty[Transform], emptyProps) withTempDir { checkpointDir => val inputData = ContinuousMemoryStream[(Long, String, Date)] @@ -1217,9 +1218,9 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase coalesce: Boolean = false): Unit = { // scalastyle:on argcount - catalog.createTable(ident, schema, Array.empty, emptyProps, tableDistribution, + catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution, tableOrdering, tableNumPartitions, tablePartitionSize, - distributionStrictlyRequired) + distributionStrictlyRequired, numRowsPerSplit = Int.MaxValue) val df = if (!dataSkewed) { spark.createDataFrame(Seq( @@ -1320,8 +1321,9 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase outputMode: String = "append", expectAnalysisException: Boolean = false): Unit = { - catalog.createTable(ident, schema, Array.empty, emptyProps, tableDistribution, - tableOrdering, tableNumPartitions, tablePartitionSize) + catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution, + tableOrdering, tableNumPartitions, tablePartitionSize, + distributionStrictlyRequired = true, numRowsPerSplit = Int.MaxValue) withTempDir { checkpointDir => val inputData = MemoryStream[(Long, String, Date)] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala index 33e2fc46ccba..dee8d7ac3e79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala @@ -22,11 +22,11 @@ import org.scalatest.BeforeAndAfter import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.functions.lit import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType +import 
org.apache.spark.sql.types.IntegerType class InMemoryTableMetricSuite extends QueryTest with SharedSparkSession with BeforeAndAfter { @@ -51,7 +51,7 @@ class InMemoryTableMetricSuite testCatalog.createTable( Identifier.of(Array(), "table_name"), - new StructType().add("i", "int"), + Array(Column.create("i", IntegerType)), Array.empty[Transform], Collections.emptyMap[String, String]) func("testcat.table_name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 2195768e3b08..07478d0bf7f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -31,19 +31,19 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter { val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] val emptyTrans: Array[Transform] = Array.empty - val schema: StructType = new StructType() - .add("id", IntegerType) - .add("data", StringType) + val columns: Array[Column] = Array( + Column.create("id", IntegerType), + Column.create("data", StringType)) val testNs: Array[String] = Array("db") val defaultNs: Array[String] = Array("default") @@ -98,13 +98,13 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(catalog.listTables(Array("ns")).isEmpty) - catalog.createTable(ident1, schema, emptyTrans, emptyProps) + catalog.createTable(ident1, columns, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1)) assert(catalog.listTables(Array("ns2")).isEmpty) - catalog.createTable(ident3, schema, emptyTrans, emptyProps) - catalog.createTable(ident2, schema, emptyTrans, emptyProps) + catalog.createTable(ident3, columns, emptyTrans, emptyProps) + catalog.createTable(ident2, columns, emptyTrans, emptyProps) assert(catalog.listTables(Array("ns")).toSet == Set(ident1, ident2)) assert(catalog.listTables(Array("ns2")).toSet == Set(ident3)) @@ -126,12 +126,12 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) val parsed = 
CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) - assert(table.schema == schema) + assert(table.columns === columns) assert(filterV2TableProperties(table.properties) == Map()) assert(catalog.tableExists(testIdent)) @@ -145,12 +145,12 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) assert(parsed == Seq("db", "test_table")) - assert(table.schema == schema) + assert(table.columns === columns) assert(filterV2TableProperties(table.properties).asJava == properties) assert(catalog.tableExists(testIdent)) @@ -161,14 +161,14 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name) .map(part => quoteIdentifier(part)).mkString(".") val exc = intercept[TableAlreadyExistsException] { - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) } checkErrorTableAlreadyExists(exc, parsed) @@ -189,7 +189,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) @@ -197,21 +197,21 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { // relative path properties.put(TableCatalog.PROP_LOCATION, "relative/path") - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val t2 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t2.catalogTable.location === makeQualifiedPathWithWarehouse("db.db/relative/path")) catalog.dropTable(testIdent) // absolute path without scheme properties.put(TableCatalog.PROP_LOCATION, "/absolute/path") - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val t3 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t3.catalogTable.location.toString === "file:///absolute/path") catalog.dropTable(testIdent) // absolute path with scheme properties.put(TableCatalog.PROP_LOCATION, "file:/absolute/path") - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val t4 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t4.catalogTable.location.toString === "file:/absolute/path") catalog.dropTable(testIdent) @@ -222,7 +222,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -234,11 +234,11 @@ class V2SessionCatalogTableSuite 
extends V2SessionCatalogBaseSuite { test("loadTable") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val loaded = catalog.loadTable(testIdent) assert(loaded.name == testIdent.toString) - assert(loaded.schema == schema) + assert(loaded.columns === columns) } test("loadTable: table does not exist") { @@ -254,7 +254,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("invalidateTable") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) catalog.invalidateTable(testIdent) @@ -276,7 +276,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add property") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) @@ -297,7 +297,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) @@ -318,7 +318,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val properties = new util.HashMap[String, String]() properties.put("prop-1", "1") - catalog.createTable(testIdent, schema, emptyTrans, properties) + catalog.createTable(testIdent, columns, emptyTrans, properties) val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map("prop-1" -> "1")) @@ -336,7 +336,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: remove missing property") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) assert(filterV2TableProperties(table.properties) == Map()) @@ -354,75 +354,75 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add top-level column") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType)) val updated = catalog.loadTable(testIdent) - assert(updated.schema == schema.add("ts", TimestampType)) + assert(updated.columns() === columns :+ Column.create("ts", TimestampType)) } test("alterTable: add required column") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false)) val updated = catalog.loadTable(testIdent) - assert(updated.schema == schema.add("ts", TimestampType, nullable = 
false)) + assert(updated.columns() === columns :+ Column.create("ts", TimestampType, false)) } test("alterTable: add column with comment") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.addColumn(Array("ts"), TimestampType, false, "comment text")) val updated = catalog.loadTable(testIdent) - val field = StructField("ts", TimestampType, nullable = false).withComment("comment text") - assert(updated.schema == schema.add(field)) + val tsColumn = Column.create("ts", TimestampType, false, "comment text", null) + assert(updated.columns === columns :+ tsColumn) } test("alterTable: add nested column") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) catalog.alterTable(testIdent, TableChange.addColumn(Array("point", "z"), DoubleType)) val updated = catalog.loadTable(testIdent) - val expectedSchema = schema.add("point", pointStruct.add("z", DoubleType)) + val expectedColumns = columns :+ Column.create("point", pointStruct.add("z", DoubleType)) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: add column to primitive field fails") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, TableChange.addColumn(Array("data", "ts"), TimestampType)) @@ -432,16 +432,16 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(exc.getMessage.contains("data")) // the table has not changed - assert(catalog.loadTable(testIdent).schema == schema) + assert(catalog.loadTable(testIdent).columns === columns) } test("alterTable: add field to missing column fails") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, @@ -455,10 +455,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update column data type") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType)) val updated = catalog.loadTable(testIdent) @@ -489,10 +489,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: update missing column fails") 
{ val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, @@ -506,10 +506,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -524,10 +524,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: replace comment") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text")) @@ -545,10 +545,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: add comment to missing column fails") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, @@ -562,10 +562,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { test("alterTable: rename top-level column") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id")) val updated = catalog.loadTable(testIdent) @@ -579,50 +579,50 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first")) val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("y", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: rename struct column") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", 
pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p")) val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val expectedSchema = schema.add("p", newPointStruct) + val expectedColumns = columns :+ Column.create("p", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: rename missing column fails") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, @@ -637,12 +637,12 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) catalog.alterTable(testIdent, TableChange.renameColumn(Array("point", "x"), "first"), @@ -650,18 +650,18 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("first", DoubleType).add("second", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ Column.create("point", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: delete top-level column") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false)) val updated = catalog.loadTable(testIdent) @@ -674,29 +674,29 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "y"), false)) val updated = catalog.loadTable(testIdent) val newPointStruct = new StructType().add("x", DoubleType) - val expectedSchema = schema.add("point", newPointStruct) + val expectedColumns = columns :+ 
Column.create("point", newPointStruct) - assert(updated.schema == expectedSchema) + assert(updated.columns === expectedColumns) } test("alterTable: delete missing column fails") { val catalog = newCatalog() - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == schema) + assert(table.columns === columns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col"), false)) @@ -707,19 +707,19 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("missing_col"), true)) - assert(table.schema == schema) + assert(table.columns === columns) } test("alterTable: delete missing nested column fails") { val catalog = newCatalog() val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType) - val tableSchema = schema.add("point", pointStruct) + val tableColumns = columns :+ Column.create("point", pointStruct) - catalog.createTable(testIdent, tableSchema, emptyTrans, emptyProps) + catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) val exc = intercept[IllegalArgumentException] { catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z"), false)) @@ -730,7 +730,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { // with if exists it should pass catalog.alterTable(testIdent, TableChange.deleteColumn(Array("point", "z"), true)) - assert(table.schema == tableSchema) + assert(table.columns === tableColumns) } test("alterTable: table does not exist") { @@ -748,7 +748,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) // default location - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val t1 = catalog.loadTable(testIdent).asInstanceOf[V1Table] assert(t1.catalogTable.location === spark.sessionState.catalog.defaultTablePath(testIdent.asTableIdentifier)) @@ -777,7 +777,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -804,7 +804,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) catalog.renameTable(testIdent, testIdentNew) @@ -829,8 +829,8 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNew)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) - catalog.createTable(testIdentNew, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) + catalog.createTable(testIdentNew, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) assert(catalog.tableExists(testIdentNew)) @@ -849,7 +849,7 @@ class 
V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { assert(!catalog.tableExists(testIdent)) assert(!catalog.tableExists(testIdentNewOtherDb)) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) assert(catalog.tableExists(testIdent)) @@ -1036,7 +1036,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === false) val exc = intercept[NoSuchDatabaseException] { - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) } assert(exc.getMessage.contains(testNs.quoted)) @@ -1070,7 +1070,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() catalog.createNamespace(testNs, Map("property" -> "value").asJava) - catalog.createTable(testIdent, schema, emptyTrans, emptyProps) + catalog.createTable(testIdent, columns, emptyTrans, emptyProps) val exc = intercept[AnalysisException] { catalog.dropNamespace(testNs, cascade = false) From cf0bbb4a62320e52fa8790e1bf4f7d2f552f3d86 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 4 Mar 2024 19:36:06 +0800 Subject: [PATCH 03/10] Revert "[SPARK-47265][SQL][TESTS] Enable test of v2 data sources in `FileBasedDataSourceSuite`" This reverts commit 3618b304532a80ddff1e78a26ed4005ce31defb1. --- .../spark/sql/FileBasedDataSourceSuite.scala | 46 +++++++++---------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 32b0fbc67c13..36af4121f30b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -500,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest test("SPARK-24204 error handling for unsupported Interval data types - csv, json, parquet, orc") { withTempDir { dir => val tempDir = new File(dir, "files").getCanonicalPath - Seq(true, false).foreach { useV1 => + // TODO: test file source V2 after write path is fixed. + Seq(true).foreach { useV1 => val useV1List = if (useV1) { "csv,json,orc,parquet" } else { @@ -559,7 +560,8 @@ class FileBasedDataSourceSuite extends QueryTest } test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") { - Seq(true, false).foreach { useV1 => + // TODO: test file source V2 after write path is fixed. 
+ Seq(true).foreach { useV1 => val useV1List = if (useV1) { "csv,orc,parquet" } else { @@ -681,31 +683,25 @@ class FileBasedDataSourceSuite extends QueryTest } test("SPARK-25237 compute correct input metrics in FileScanRDD") { - Seq(true, false).foreach { useV1 => - val useV1List = if (useV1) { - "csv" - } else { - "" - } - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) { - withTempPath { p => - val path = p.getAbsolutePath - spark.range(1000).repartition(1).write.csv(path) - val bytesReads = new mutable.ArrayBuffer[Long]() - val bytesReadListener = new SparkListener() { - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead - } - } - sparkContext.addSparkListener(bytesReadListener) - try { - spark.read.csv(path).limit(1).collect() - sparkContext.listenerBus.waitUntilEmpty() - assert(bytesReads.sum === 7860) - } finally { - sparkContext.removeSparkListener(bytesReadListener) + // TODO: Test CSV V2 as well after it implements [[SupportsReportStatistics]]. + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "csv") { + withTempPath { p => + val path = p.getAbsolutePath + spark.range(1000).repartition(1).write.csv(path) + val bytesReads = new mutable.ArrayBuffer[Long]() + val bytesReadListener = new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead } } + sparkContext.addSparkListener(bytesReadListener) + try { + spark.read.csv(path).limit(1).collect() + sparkContext.listenerBus.waitUntilEmpty() + assert(bytesReads.sum === 7860) + } finally { + sparkContext.removeSparkListener(bytesReadListener) + } } } } From 64909cfc29ff9a39af848afc7bc5804cc0f0501b Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 4 Mar 2024 20:16:04 +0800 Subject: [PATCH 04/10] improve --- .../catalog/InMemoryTableCatalog.scala | 26 +-- .../KeyGroupedPartitioningSuite.scala | 173 +++++++++--------- .../WriteDistributionAndOrderingSuite.scala | 9 +- 3 files changed, 94 insertions(+), 114 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index c794f3f29eab..fad54d1cf8cd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -90,8 +90,8 @@ class BasicInMemoryTableCatalog extends TableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - createTable(ident, schema, partitions, properties, Distributions.unspecified(), - Array.empty, None, None) + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) } override def createTable( @@ -99,8 +99,8 @@ class BasicInMemoryTableCatalog extends TableCatalog { columns: Array[Column], partitions: Array[Transform], properties: util.Map[String, String]): Table = { - val schema = CatalogV2Util.v2ColumnsToStructType(columns) - createTable(ident, schema, partitions, properties) + createTable(ident, columns, partitions, properties, Distributions.unspecified(), + Array.empty, None, None) } def createTable( @@ -112,25 +112,9 @@ class BasicInMemoryTableCatalog extends TableCatalog { ordering: Array[SortOrder], requiredNumPartitions: Option[Int], advisoryPartitionSize: Option[Long], - 
distributionStrictlyRequired: Boolean, - numRowsPerSplit: Int): Table = { - val schema = CatalogV2Util.v2ColumnsToStructType(columns) - createTable(ident, schema, partitions, properties, distribution, ordering, - requiredNumPartitions, advisoryPartitionSize, distributionStrictlyRequired, numRowsPerSplit) - } - - @deprecated("use createTable(..., columns: Array[Column], ...) instead", "4.0.0") - def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String], - distribution: Distribution, - ordering: Array[SortOrder], - requiredNumPartitions: Option[Int], - advisoryPartitionSize: Option[Long], distributionStrictlyRequired: Boolean = true, numRowsPerSplit: Int = Int.MaxValue): Table = { + val schema = CatalogV2Util.v2ColumnsToStructType(columns) if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident.asMultipartIdentifier) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index e6448d4d80fd..f2c5591aab1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions._ @@ -64,16 +63,16 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { Collections.emptyMap[String, String] } private val table: String = "tbl" - private val schema = new StructType() - .add("id", IntegerType) - .add("data", StringType) - .add("ts", TimestampType) + private val columns: Array[Column] = Array( + Column.create("id", IntegerType), + Column.create("data", StringType), + Column.create("ts", TimestampType)) test("clustered distribution: output partitioning should be KeyGroupedPartitioning") { val partitions: Array[Transform] = Array(Expressions.years("ts")) // create a table with 3 partitions, partitioned by `years` transform - createTable(table, schema, partitions) + createTable(table, columns, partitions) sql(s"INSERT INTO testcat.ns.$table VALUES " + s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " + s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " + @@ -98,7 +97,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("non-clustered distribution: no partition") { val partitions: Array[Transform] = Array(bucket(32, "ts")) - createTable(table, schema, partitions) + createTable(table, columns, partitions) val df = sql(s"SELECT * FROM testcat.ns.$table") val distribution = physical.ClusteredDistribution( @@ -109,7 +108,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("non-clustered distribution: single partition") { val partitions: Array[Transform] = Array(bucket(32, "ts")) - createTable(table, schema, partitions) + 
createTable(table, columns, partitions) sql(s"INSERT INTO testcat.ns.$table VALUES (0, 'aaa', CAST('2020-01-01' AS timestamp))") val df = sql(s"SELECT * FROM testcat.ns.$table") @@ -127,7 +126,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val nonFunctionCatalog = spark.sessionState.catalogManager.catalog("testcat2") .asInstanceOf[InMemoryTableCatalog] val partitions: Array[Transform] = Array(bucket(32, "ts")) - createTable(table, schema, partitions, catalog = nonFunctionCatalog) + createTable(table, columns, partitions, catalog = nonFunctionCatalog) sql(s"INSERT INTO testcat2.ns.$table VALUES " + s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " + s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " + @@ -147,7 +146,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { catalog.clearFunctions() val partitions: Array[Transform] = Array(bucket(32, "ts")) - createTable(table, schema, partitions) + createTable(table, columns, partitions) sql(s"INSERT INTO testcat.ns.$table VALUES " + s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " + s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " + @@ -162,7 +161,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("non-clustered distribution: V2 bucketing disabled") { withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "false") { val partitions: Array[Transform] = Array(bucket(32, "ts")) - createTable(table, schema, partitions) + createTable(table, columns, partitions) sql(s"INSERT INTO testcat.ns.$table VALUES " + s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " + s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " + @@ -182,7 +181,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { ) // create a table with 3 partitions, partitioned by `truncate` transform - createTable(table, schema, partitions) + createTable(table, columns, partitions) sql(s"INSERT INTO testcat.ns.$table VALUES " + s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " + s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " + @@ -227,24 +226,24 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { private def createTable( table: String, - schema: StructType, + columns: Array[Column], partitions: Array[Transform], catalog: InMemoryTableCatalog = catalog): Unit = { catalog.createTable(Identifier.of(Array("ns"), table), - schema, partitions, emptyProps, Distributions.unspecified(), Array.empty, None, None, + columns, partitions, emptyProps, Distributions.unspecified(), Array.empty, None, None, numRowsPerSplit = 1) } private val customers: String = "customers" - private val customers_schema = new StructType() - .add("customer_name", StringType) - .add("customer_age", IntegerType) - .add("customer_id", LongType) + private val customersColumns: Array[Column] = Array( + Column.create("customer_name", StringType), + Column.create("customer_age", IntegerType), + Column.create("customer_id", LongType)) private val orders: String = "orders" - private val orders_schema = new StructType() - .add("order_amount", DoubleType) - .add("customer_id", LongType) + private val ordersColumns: Array[Column] = Array( + Column.create("order_amount", DoubleType), + Column.create("customer_id", LongType)) private def selectWithMergeJoinHint(t1: String, t2: String): String = { s"SELECT /*+ MERGE($t1, $t2) */ " @@ -269,11 +268,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { customers_partitions: Array[Transform], orders_partitions: Array[Transform], 
expectedNumOfShuffleExecs: Int): Unit = { - createTable(customers, customers_schema, customers_partitions) + createTable(customers, customersColumns, customers_partitions) sql(s"INSERT INTO testcat.ns.$customers VALUES " + s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)") - createTable(orders, orders_schema, orders_partitions) + createTable(orders, ordersColumns, orders_partitions) sql(s"INSERT INTO testcat.ns.$orders VALUES " + s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)") @@ -329,21 +328,21 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } private val items: String = "items" - private val items_schema: StructType = new StructType() - .add("id", LongType) - .add("name", StringType) - .add("price", FloatType) - .add("arrive_time", TimestampType) + private val itemsColumns: Array[Column] = Array( + Column.create("id", LongType), + Column.create("name", StringType), + Column.create("price", FloatType), + Column.create("arrive_time", TimestampType)) private val purchases: String = "purchases" - private val purchases_schema: StructType = new StructType() - .add("item_id", LongType) - .add("price", FloatType) - .add("time", TimestampType) + private val purchasesColumns: Array[Column] = Array( + Column.create("item_id", LongType), + Column.create("price", FloatType), + Column.create("time", TimestampType)) test("partitioned join: join with two partition keys and matching & sorted partitions") { val items_partitions = Array(bucket(8, "id"), days("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + @@ -352,7 +351,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(bucket(8, "item_id"), days("time")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + @@ -375,7 +374,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("partitioned join: join with two partition keys and unsorted partitions") { val items_partitions = Array(bucket(8, "id"), days("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + @@ -384,7 +383,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp))") val purchases_partitions = Array(bucket(8, "item_id"), days("time")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(2, 11.0, cast('2020-01-01' as timestamp)), " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + @@ -407,7 +406,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("partitioned join: join with two partition keys and different # of partition keys") { val items_partitions = Array(bucket(8, "id"), 
days("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + @@ -415,7 +414,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(bucket(8, "item_id"), days("time")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(2, 11.0, cast('2020-01-01' as timestamp))") @@ -440,7 +439,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41413: partitioned join: partition values from one side are subset of those from " + "the other side") { val items_partitions = Array(bucket(4, "id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + @@ -448,7 +447,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(bucket(4, "item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + @@ -472,7 +471,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41413: partitioned join: partition values from both sides overlaps") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + @@ -480,7 +479,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(2, 19.5, cast('2020-02-01' as timestamp)), " + @@ -504,14 +503,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41413: partitioned join: non-overlapping partition values from both sides") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(4, 42.0, cast('2020-01-01' as timestamp)), " + "(5, 19.5, cast('2020-02-01' as timestamp)), " + @@ -535,14 +534,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially 
clustered: with same partition keys and one side fully clustered") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + @@ -573,7 +572,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: with same partition keys and both sides partially " + "clustered") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -581,7 +580,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + @@ -617,7 +616,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: with different partition keys and both sides partially " + "clustered") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -626,7 +625,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + @@ -667,7 +666,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: with different partition keys and missing keys on " + "left-hand side") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -675,7 +674,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, 
purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(1, 50.0, cast('2020-01-02' as timestamp)), " + @@ -714,7 +713,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: with different partition keys and missing keys on " + "right-hand side") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -722,7 +721,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(2, 15.0, cast('2020-01-02' as timestamp)), " + s"(2, 20.0, cast('2020-01-03' as timestamp)), " + @@ -755,7 +754,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: left outer join") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -764,7 +763,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(2, 20.0, cast('2020-01-01' as timestamp)), " + s"(3, 20.0, cast('2020-02-01' as timestamp)), " + @@ -802,7 +801,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: right outer join") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -810,7 +809,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(2, 15.0, cast('2020-01-01' as timestamp)), " + @@ -853,7 +852,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: full outer join is not applicable") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, 
cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " + @@ -861,7 +860,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 45.0, cast('2020-01-01' as timestamp)), " + s"(2, 15.0, cast('2020-01-01' as timestamp)), " + @@ -907,7 +906,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + @@ -916,7 +915,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + @@ -946,7 +945,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-42038: partially clustered: with dynamic partition filtering") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + @@ -956,7 +955,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(4, 'dd', 18.0, cast('2023-01-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + @@ -1008,14 +1007,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41471: shuffle one side: only one side reports partitioning") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp))") @@ -1038,14 +1037,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41471: shuffle one side: shuffle side has more partition value") 
{ val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp)), " + @@ -1084,14 +1083,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41471: shuffle one side: only one side reports partitioning with two identity") { val items_partitions = Array(identity("id"), identity("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp))") @@ -1114,14 +1113,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41471: shuffle one side: partitioning with transform") { val items_partitions = Array(years("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2021-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2021-02-01' as timestamp))") @@ -1147,14 +1146,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-41471: shuffle one side: work with group partition split") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + "(3, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(3, 19.5, cast('2020-02-01' as timestamp)), " + @@ -1174,7 +1173,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-44641: duplicated records when SPJ is not triggered") { val items_partitions = Array(bucket(8, "id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s""" INSERT INTO testcat.ns.$items VALUES (1, 'aa', 40.0, cast('2020-01-01' as timestamp)), @@ -1184,7 +1183,7 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase { (3, 'cc', 15.5, cast('2020-02-01' as timestamp))""") val purchases_partitions = Array(bucket(8, "item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"""INSERT INTO testcat.ns.$purchases VALUES (1, 42.0, cast('2020-01-01' as timestamp)), (1, 44.0, cast('2020-01-15' as timestamp)), @@ -1227,7 +1226,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val table1 = "tab1e1" val table2 = "table2" val partition = Array(identity("id"), identity("data")) - createTable(table1, schema, partition) + createTable(table1, columns, partition) sql(s"INSERT INTO testcat.ns.$table1 VALUES " + "(1, 'aa', cast('2020-01-01' as timestamp)), " + "(2, 'bb', cast('2020-01-01' as timestamp)), " + @@ -1237,7 +1236,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(3, 'ee', cast('2020-01-01' as timestamp)), " + "(3, 'ee', cast('2020-01-01' as timestamp))") - createTable(table2, schema, partition) + createTable(table2, columns, partition) sql(s"INSERT INTO testcat.ns.$table2 VALUES " + "(4, 'zz', cast('2020-01-01' as timestamp)), " + "(4, 'zz', cast('2020-01-01' as timestamp)), " + @@ -1314,13 +1313,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { val table1 = "tab1e1" val table2 = "table2" val partition = Array(identity("id"), identity("data")) - createTable(table1, schema, partition) + createTable(table1, columns, partition) sql(s"INSERT INTO testcat.ns.$table1 VALUES " + "(1, 'aa', cast('2020-01-01' as timestamp)), " + "(2, 'bb', cast('2020-01-02' as timestamp)), " + "(3, 'cc', cast('2020-01-03' as timestamp))") - createTable(table2, schema, partition) + createTable(table2, columns, partition) sql(s"INSERT INTO testcat.ns.$table2 VALUES " + "(4, 'aa', cast('2020-01-01' as timestamp)), " + "(5, 'bb', cast('2020-01-02' as timestamp)), " + @@ -1371,7 +1370,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-44647: test join key is the second partition key and a transform") { val items_partitions = Array(bucket(8, "id"), days("arrive_time")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + @@ -1380,7 +1379,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(bucket(8, "item_id"), days("time")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + @@ -1441,7 +1440,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { test("SPARK-44647: shuffle one side and join keys are less than partition keys") { val items_partitions = Array(identity("id"), identity("name")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + "(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + @@ -1449,7 +1448,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { "(3, 'bb', 10.0, 
cast('2020-01-01' as timestamp)), " + "(4, 'cc', 15.5, cast('2020-02-01' as timestamp))") - createTable(purchases, purchases_schema, Array.empty) + createTable(purchases, purchasesColumns, Array.empty) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + "(1, 42.0, cast('2020-01-01' as timestamp)), " + "(1, 89.0, cast('2020-01-03' as timestamp)), " + @@ -1486,7 +1485,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") { val items_partitions = Array(identity("id")) - createTable(items, items_schema, items_partitions) + createTable(items, itemsColumns, items_partitions) sql(s"INSERT INTO testcat.ns.$items VALUES " + s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + @@ -1495,7 +1494,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") val purchases_partitions = Array(identity("item_id")) - createTable(purchases, purchases_schema, purchases_partitions) + createTable(purchases, purchasesColumns, purchases_partitions) sql(s"INSERT INTO testcat.ns.$purchases VALUES " + s"(1, 42.0, cast('2020-01-01' as timestamp)), " + s"(1, 44.0, cast('2020-01-15' as timestamp)), " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala index d1b6af24af6a..12d5f13df01c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala @@ -977,8 +977,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase ) val distribution = Distributions.ordered(ordering) - catalog.createTable(ident, columns, Array.empty, emptyProps, distribution, ordering, None, None, - distributionStrictlyRequired = true, numRowsPerSplit = Int.MaxValue) + catalog.createTable(ident, columns, Array.empty, emptyProps, distribution, ordering, None, None) withTempDir { checkpointDir => val inputData = ContinuousMemoryStream[(Long, String, Date)] @@ -1219,8 +1218,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase // scalastyle:on argcount catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution, - tableOrdering, tableNumPartitions, tablePartitionSize, - distributionStrictlyRequired, numRowsPerSplit = Int.MaxValue) + tableOrdering, tableNumPartitions, tablePartitionSize, distributionStrictlyRequired) val df = if (!dataSkewed) { spark.createDataFrame(Seq( @@ -1322,8 +1320,7 @@ class WriteDistributionAndOrderingSuite extends DistributionAndOrderingSuiteBase expectAnalysisException: Boolean = false): Unit = { catalog.createTable(ident, columns, Array.empty, emptyProps, tableDistribution, - tableOrdering, tableNumPartitions, tablePartitionSize, - distributionStrictlyRequired = true, numRowsPerSplit = Int.MaxValue) + tableOrdering, tableNumPartitions, tablePartitionSize) withTempDir { checkpointDir => val inputData = MemoryStream[(Long, String, Date)] From f532e1540154aa9199ed8e760e0e2e5b5c0aad35 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 4 Mar 2024 22:01:32 +0800 Subject: [PATCH 05/10] fix unused import --- .../spark/sql/connector/KeyGroupedPartitioningSuite.scala | 
2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index f2c5591aab1d..7fdc703007c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions._ From 8f3abbe7fc688c275f9b6070be00ce9a8fa5dad3 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 5 Mar 2024 10:12:20 +0800 Subject: [PATCH 06/10] fix UT --- .../catalog/InMemoryPartitionTableCatalog.scala | 10 ++++++++++ .../InMemoryRowLevelOperationTableCatalog.scala | 10 ++++++++++ .../catalog/InMemoryTableWithV2FilterCatalog.scala | 10 ++++++++++ .../apache/spark/sql/connector/LocalScanSuite.scala | 11 ++++++++++- .../spark/sql/connector/V1ReadFallbackSuite.scala | 12 +++++++++++- .../sql/streaming/test/DataStreamTableAPISuite.scala | 11 ++++++++++- 6 files changed, 61 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala index 9a45d6420983..f45053fa3780 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala @@ -31,12 +31,22 @@ class InMemoryPartitionTableCatalog extends InMemoryTableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident.asMultipartIdentifier) } InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) + val schema = CatalogV2Util.v2ColumnsToStructType(columns) val table = new InMemoryAtomicPartitionTable( s"$name.${ident.quoted}", schema, partitions, properties) tables.put(ident, table) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala index 94e694761221..befd8fa80af7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala @@ -31,6 +31,15 @@ class 
InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident.asMultipartIdentifier) } @@ -38,6 +47,7 @@ class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog { InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) val tableName = s"$name.${ident.quoted}" + val schema = CatalogV2Util.v2ColumnsToStructType(columns) val table = new InMemoryRowLevelOperationTable(tableName, schema, partitions, properties) tables.put(ident, table) namespaces.putIfAbsent(ident.namespace.toList, Map()) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala index 240550fdcf99..bfdb9912bb66 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala @@ -31,6 +31,15 @@ class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident.asMultipartIdentifier) } @@ -38,6 +47,7 @@ class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog { InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties) val tableName = s"$name.${ident.quoted}" + val schema = CatalogV2Util.v2ColumnsToStructType(columns) val table = new InMemoryTableWithV2Filter(tableName, schema, partitions, properties) tables.put(ident, table) namespaces.putIfAbsent(ident.namespace.toList, Map()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala index e3d61a846fdb..61a1dc123bbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{LocalScan, Scan, ScanBuilder} import org.apache.spark.sql.execution.LocalTableScanExec @@ -60,6 +60,15 @@ class TestLocalScanCatalog extends BasicInMemoryTableCatalog { schema: StructType, partitions: 
Array[Transform], properties: java.util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { val table = new TestLocalScanTable(ident.toString) tables.put(ident, table) table diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala index b876240ddc37..7726874df4b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext} -import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan} import org.apache.spark.sql.execution.RowDataSourceScanExec @@ -104,7 +104,17 @@ class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog { schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { // To simplify the test implementation, only support fixed schema. 
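For reference, the two CatalogV2Util helpers used throughout these hunks convert between the legacy schema and v2 columns. The snippet below is an illustrative sketch only (it is not part of the patch) and assumes plain columns with no comments or default values.

    import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    val schema = new StructType()
      .add("id", IntegerType, nullable = false)
      .add("data", StringType)

    // StructType -> Array[Column], as done by the deprecated overloads above.
    val columns: Array[Column] = CatalogV2Util.structTypeToV2Columns(schema)

    // Equivalent hand-built form of the same columns.
    val sameColumns: Array[Column] = Array(
      Column.create("id", IntegerType, false),
      Column.create("data", StringType))

    // Array[Column] -> StructType, as done where a table still needs a schema.
    // For simple columns like these the round trip is expected to preserve the schema.
    assert(CatalogV2Util.v2ColumnsToStructType(columns) == schema)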
+ val schema = CatalogV2Util.v2ColumnsToStructType(columns) if (schema != V1ReadFallbackCatalog.schema || partitions.nonEmpty) { throw SparkUnsupportedOperationException() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index eecc9468649d..fc6e57b6b990 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog} -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper} @@ -652,6 +652,15 @@ class InMemoryStreamTableCatalog extends InMemoryTableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { if (tables.containsKey(ident)) { throw new TableAlreadyExistsException(ident.asMultipartIdentifier) } From 7a2bcbb4528d516ca80ef4e06a4b8c0482d3757f Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 6 Mar 2024 14:39:27 +0800 Subject: [PATCH 07/10] remove redundant override createTable --- .../InMemoryPartitionTableCatalog.scala | 10 --------- ...nMemoryRowLevelOperationTableCatalog.scala | 10 --------- .../catalog/InMemoryTableCatalog.scala | 3 ++- .../InMemoryTableWithV2FilterCatalog.scala | 10 --------- .../datasources/v2/V2SessionCatalog.scala | 21 +++++++++++-------- .../v2/jdbc/JDBCTableCatalog.scala | 14 ++++++++++++- .../spark/sql/connector/LocalScanSuite.scala | 11 +--------- .../connector/TestV2SessionCatalogBase.scala | 12 ++--------- .../sql/connector/V1ReadFallbackSuite.scala | 9 -------- .../test/DataStreamTableAPISuite.scala | 11 +--------- 10 files changed, 31 insertions(+), 80 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala index f45053fa3780..3b8020003aa4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTableCatalog.scala @@ -21,20 +21,10 @@ import java.util import 
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.types.StructType class InMemoryPartitionTableCatalog extends InMemoryTableCatalog { import CatalogV2Implicits._ - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala index befd8fa80af7..deb200650bd5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala @@ -21,20 +21,10 @@ import java.util import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.types.StructType class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog { import CatalogV2Implicits._ - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index fad54d1cf8cd..ef4afaa18e74 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -84,7 +84,8 @@ class BasicInMemoryTableCatalog extends TableCatalog { invalidatedTables.add(ident) } - // TODO: remove it when no tests calling this deprecated method. 
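With the redundant overrides removed, a test catalog only needs the Column-based createTable; the deprecated StructType overload is inherited from the base class. A hypothetical sketch (ExampleTestCatalog is not part of the patch), assuming the in-memory test catalogs behave as shown in these hunks:

    import java.util

    import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, Table}
    import org.apache.spark.sql.connector.expressions.Transform

    class ExampleTestCatalog extends InMemoryTableCatalog {
      // Only the Column-based variant is overridden; the deprecated
      // createTable(..., StructType, ...) comes from the parent class.
      override def createTable(
          ident: Identifier,
          columns: Array[Column],
          partitions: Array[Transform],
          properties: util.Map[String, String]): Table = {
        // Extra test-specific bookkeeping could go here.
        super.createTable(ident, columns, partitions, properties)
      }
    }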
+ // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` + // in the parent interface `TableCatalog` is removed override def createTable( ident: Identifier, schema: StructType, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala index bfdb9912bb66..7ec1cab304ad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2FilterCatalog.scala @@ -21,20 +21,10 @@ import java.util import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.types.StructType class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog { import CatalogV2Implicits._ - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 0cb3f8dca38f..63809ddf5642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -151,15 +151,6 @@ class V2SessionCatalog(catalog: SessionCatalog) columns: Array[Column], partitions: Array[Transform], properties: util.Map[String, String]): Table = { - createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties) - } - - // TODO: remove it when no tests calling this deprecated method. - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName) val tableProperties = properties.asScala.toMap @@ -173,6 +164,7 @@ class V2SessionCatalog(catalog: SessionCatalog) CatalogTableType.MANAGED } + val schema = CatalogV2Util.v2ColumnsToStructType(columns) val (newSchema, newPartitions) = DataSourceV2Utils.getTableProvider(provider, conf) match { // If the provider does not support external metadata, users should not be allowed to // specify custom schema when creating the data source table, since the schema will not @@ -244,6 +236,17 @@ class V2SessionCatalog(catalog: SessionCatalog) null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT. 
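On the caller side nothing needs a StructType any more. A hedged sketch of creating a table through a TableCatalog with v2 columns; the helper name, the column choices, and the empty partitions/properties are assumptions for illustration only:

    import java.util.Collections

    import org.apache.spark.sql.connector.catalog.{Column, Identifier, Table, TableCatalog}
    import org.apache.spark.sql.connector.expressions.Transform
    import org.apache.spark.sql.types.{IntegerType, StringType}

    def createExampleTable(catalog: TableCatalog, ident: Identifier): Table = {
      val columns: Array[Column] = Array(
        Column.create("id", IntegerType, false),
        Column.create("data", StringType))
      // No partitions and no table properties for this illustration.
      catalog.createTable(ident, columns, Array.empty[Transform], Collections.emptyMap[String, String]())
    }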
} + // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` + // in the parent interface `TableCatalog` is removed + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + private def toOptions(properties: Map[String, String]): Map[String, String] = { properties.filter { case (k, _) => k.startsWith(TableCatalog.OPTION_PREFIX) }.map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index a5ef9bd5e6b8..e04bf0fc1424 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException -import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryExecutionErrors} @@ -139,11 +139,22 @@ class JDBCTableCatalog extends TableCatalog } } + // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` + // in the parent interface `TableCatalog` is removed override def createTable( ident: Identifier, schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { + val columns = CatalogV2Util.structTypeToV2Columns(schema) + createTable(ident, columns, partitions, properties) + } + + override def createTable( + ident: Identifier, + columns: Array[Column], + partitions: Array[Transform], + properties: java.util.Map[String, String]): Table = { checkNamespace(ident.namespace()) if (partitions.nonEmpty) { throw QueryExecutionErrors.cannotCreateJDBCTableWithPartitionsError() @@ -180,6 +191,7 @@ class JDBCTableCatalog extends TableCatalog val writeOptions = new JdbcOptionsInWrite(tableOptions) val caseSensitive = SQLConf.get.caseSensitiveAnalysis + val schema = CatalogV2Util.v2ColumnsToStructType(columns) JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( errorClass = "FAILED_JDBC.CREATE_TABLE", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala index 61a1dc123bbd..fc808c835bb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/LocalScanSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, 
CatalogV2Util, Column, Identifier, SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Column, Identifier, SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{LocalScan, Scan, ScanBuilder} import org.apache.spark.sql.execution.LocalTableScanExec @@ -55,15 +55,6 @@ class LocalScanSuite extends QueryTest with SharedSparkSession { } class TestLocalScanCatalog extends BasicInMemoryTableCatalog { - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: java.util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala index 719b006c1460..3b2fc0379340 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala @@ -69,15 +69,6 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating columns: Array[Column], partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { - createTable(ident, CatalogV2Util.v2ColumnsToStructType(columns), partitions, properties) - } - - // TODO: remove it when no tests calling this deprecated method. - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: java.util.Map[String, String]): Table = { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper val key = TestV2SessionCatalogBase.SIMULATE_ALLOW_EXTERNAL_PROPERTY val propsWithLocation = if (properties.containsKey(key)) { @@ -93,7 +84,8 @@ private[connector] trait TestV2SessionCatalogBase[T <: Table] extends Delegating } else { properties } - super.createTable(ident, schema, partitions, propsWithLocation) + super.createTable(ident, columns, partitions, propsWithLocation) + val schema = CatalogV2Util.v2ColumnsToStructType(columns) val t = newTable(ident.quoted, schema, partitions, propsWithLocation) addTable(ident, t) t diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala index 7726874df4b3..50272fac4a4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala @@ -99,15 +99,6 @@ class V1ReadFallbackWithCatalogSuite extends V1ReadFallbackSuite { } class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog { - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: java.util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index 
fc6e57b6b990..af07aceaed14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper} @@ -647,15 +647,6 @@ class NonStreamV2Table(override val name: String) class InMemoryStreamTableCatalog extends InMemoryTableCatalog { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - override def createTable( - ident: Identifier, - schema: StructType, - partitions: Array[Transform], - properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) - } - override def createTable( ident: Identifier, columns: Array[Column], From fc368b6b5a4239d510b9f3be73176548ba7037f7 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 7 Mar 2024 09:35:35 +0800 Subject: [PATCH 08/10] call deprecated method throw internal exception --- .../spark/sql/errors/QueryCompilationErrors.scala | 10 ++++++++++ .../sql/connector/catalog/InMemoryTableCatalog.scala | 4 ++-- .../execution/datasources/v2/V2SessionCatalog.scala | 3 +-- .../datasources/v2/jdbc/JDBCTableCatalog.scala | 3 +-- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 95a66f015729..244011f2d8fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3996,4 +3996,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat origin = e.origin ) } + + private def callDeprecatedMethodError(oldMethod: String, newMethod: String): Throwable = { + SparkException.internalError(s"The method `$oldMethod` is deprecated, " + + s"please use `$newMethod` instead.") + } + + def createTableDeprecatedError(): Throwable = { + callDeprecatedMethodError("createTable(..., StructType, ...)", + "createTable(..., Array[Column], ...)") + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index ef4afaa18e74..803e26aa93f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{SortOrder, Transform} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -91,8 +92,7 @@ class BasicInMemoryTableCatalog extends TableCatalog { schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) + throw QueryCompilationErrors.createTableDeprecatedError() } override def createTable( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3bb16421d5b5..cbf2d358cc30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -248,8 +248,7 @@ class V2SessionCatalog(catalog: SessionCatalog) schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) + throw QueryCompilationErrors.createTableDeprecatedError() } private def toOptions(properties: Map[String, String]): Map[String, String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index e04bf0fc1424..c7bd470f7a4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -146,8 +146,7 @@ class JDBCTableCatalog extends TableCatalog schema: StructType, partitions: Array[Transform], properties: java.util.Map[String, String]): Table = { - val columns = CatalogV2Util.structTypeToV2Columns(schema) - createTable(ident, columns, partitions, properties) + throw QueryCompilationErrors.createTableDeprecatedError() } override def createTable( From 9cd53b215f944b19b3ce81bdc0b6e5d379fc06df Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 7 Mar 2024 10:52:33 +0800 Subject: [PATCH 09/10] fix V2SessionCatalogSuite --- .../datasources/v2/V2SessionCatalogSuite.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 07478d0bf7f9..4de74af25000 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -470,20 +470,24 @@ class V2SessionCatalogTableSuite extends 
V2SessionCatalogBaseSuite { test("alterTable: update column nullability") { val catalog = newCatalog() - val originalSchema = new StructType() - .add("id", IntegerType, nullable = false) - .add("data", StringType) - catalog.createTable(testIdent, originalSchema, emptyTrans, emptyProps) + val originalColumns: Array[Column] = Array( + Column.create("id", IntegerType, false), + Column.create("data", StringType)) + catalog.createTable(testIdent, originalColumns, emptyTrans, emptyProps) val table = catalog.loadTable(testIdent) - assert(table.schema == originalSchema) + assert(table.columns === originalColumns) catalog.alterTable(testIdent, TableChange.updateColumnNullability(Array("id"), true)) val updated = catalog.loadTable(testIdent) val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType) - assert(updated.schema == expectedSchema) + val expectedColumns: Array[Column] = Array( + Column.create("id", IntegerType), + Column.create("data", StringType) + ) + assert(updated.columns === expectedColumns) } test("alterTable: update missing column fails") { From e3dcf4c951634d76dc72e76f629865f0cda8d81f Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 8 Mar 2024 13:58:45 +0800 Subject: [PATCH 10/10] remove comment --- .../spark/sql/connector/catalog/InMemoryTableCatalog.scala | 2 -- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 2 -- .../sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala | 2 -- 3 files changed, 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 803e26aa93f3..d511477ef5d3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -85,8 +85,6 @@ class BasicInMemoryTableCatalog extends TableCatalog { invalidatedTables.add(ident) } - // TODO: remove it when the deprecated method `createTable(..., StructType, ...)` - // in the parent interface `TableCatalog` is removed override def createTable( ident: Identifier, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index cbf2d358cc30..15166c822903 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -241,8 +241,6 @@ class V2SessionCatalog(catalog: SessionCatalog) null // Return null to save the `loadTable` call for CREATE TABLE without AS SELECT. 
   }
 
-  // TODO: remove it when the deprecated method `createTable(..., StructType, ...)`
-  // in the parent interface `TableCatalog` is removed
   override def createTable(
       ident: Identifier,
       schema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index c7bd470f7a4e..b6e361946477 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -139,8 +139,6 @@ class JDBCTableCatalog extends TableCatalog
     }
   }
 
-  // TODO: remove it when the deprecated method `createTable(..., StructType, ...)`
-  // in the parent interface `TableCatalog` is removed
   override def createTable(
       ident: Identifier,
       schema: StructType,
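After this series, any remaining caller of the StructType overload on these catalogs fails fast instead of silently converting. A hedged, test-style sketch, assuming a ScalaTest suite with `catalog`, `testIdent`, `emptyTrans`, and `emptyProps` fixtures like those used in V2SessionCatalogSuite:

    import org.apache.spark.SparkException
    import org.apache.spark.sql.types.{IntegerType, StructType}

    // The deprecated entry point now surfaces as an internal error,
    // so stray callers are caught immediately by the tests.
    val e = intercept[SparkException] {
      catalog.createTable(
        testIdent,
        new StructType().add("id", IntegerType, nullable = false),
        emptyTrans,
        emptyProps)
    }
    assert(e.getMessage.contains("deprecated"))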