diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index e20dfd4f6051..d465118729ab 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -189,7 +189,7 @@ class CatalogSuite extends SparkFunSuite {
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
+    assert(table.columns() === loaded.columns())
     assert(table.properties == loaded.properties)
   }
 
@@ -212,7 +212,7 @@ class CatalogSuite extends SparkFunSuite {
     val loaded = catalog.loadTable(testIdent)
 
     assert(table.name == loaded.name)
-    assert(table.schema == loaded.schema)
+    assert(table.columns() === loaded.columns())
     assert(table.properties == loaded.properties)
   }
 
@@ -394,8 +394,8 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnType(Array("id"), LongType))
 
-    val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(Column.create("id", LongType), Column.create("data", StringType))
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: update column nullability") {
@@ -411,8 +411,8 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnNullability(Array("id"), true))
 
-    val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(Column.create("id", IntegerType), Column.create("data", StringType))
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: update missing column fails") {
@@ -441,10 +441,10 @@ class CatalogSuite extends SparkFunSuite {
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "comment text"))
 
-    val expectedSchema = new StructType()
-      .add("id", IntegerType, nullable = true, "comment text")
-      .add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "comment text", null),
+      Column.create("data", StringType))
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: replace comment") {
@@ -456,14 +456,14 @@ class CatalogSuite extends SparkFunSuite {
 
     catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))
 
-    val expectedSchema = new StructType()
-      .add("id", IntegerType, nullable = true, "replacement comment")
-      .add("data", StringType)
+    val expectedColumns = Array(
+      Column.create("id", IntegerType, true, "replacement comment", null),
+      Column.create("data", StringType))
 
     val updated = catalog.alterTable(testIdent,
       TableChange.updateColumnComment(Array("id"), "replacement comment"))
 
-    assert(updated.schema == expectedSchema)
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: add comment to missing column fails") {
@@ -491,9 +491,11 @@ class CatalogSuite extends SparkFunSuite {
 
     val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
 
-    val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
+    val expectedColumns = Array(
+      Column.create("some_id", IntegerType),
+      Column.create("data", StringType))
 
-    assert(updated.schema == expectedSchema)
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: rename nested column") {
@@ -580,8 +582,8 @@ class CatalogSuite extends SparkFunSuite {
 
     val updated = catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
 
-    val expectedSchema = new StructType().add("data", StringType)
-    assert(updated.schema == expectedSchema)
+    val expectedColumns = Array(Column.create("data", StringType))
+    assert(updated.columns() === expectedColumns)
   }
 
   test("alterTable: delete nested column") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
index 1aa0b408366b..03b135de4fe7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagementSuite.scala
@@ -53,7 +53,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("createPartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -72,7 +75,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("createPartitions failed if partition already exists") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("4")
@@ -94,7 +100,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("dropPartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -112,7 +121,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("purgePartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
     partTable.createPartitions(
       partIdents,
@@ -129,7 +141,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("dropPartitions failed if partition not exists") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("4")
@@ -147,7 +162,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
   test("truncatePartitions") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryAtomicPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     partTable.createPartitions(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
index 06a23e7fda20..a1f635d2e620 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala
@@ -55,7 +55,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("createPartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -70,7 +73,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("dropPartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -88,7 +94,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("purgePartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     checkError(
       exception = intercept[SparkUnsupportedOperationException] {
         partTable.purgePartition(InternalRow.apply("3"))
@@ -101,7 +110,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("replacePartitionMetadata") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -123,7 +135,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("loadPartitionMetadata") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -140,7 +155,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("listPartitionIdentifiers") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
@@ -248,7 +266,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
   test("truncatePartition") {
     val table = catalog.loadTable(ident)
     val partTable = new InMemoryPartitionTable(
-      table.name(), table.schema(), table.partitioning(), table.properties())
+      table.name(),
+      CatalogV2Util.v2ColumnsToStructType(table.columns()),
+      table.partitioning(),
+      table.properties())
     assert(!hasPartitions(partTable))
 
     val partIdent = InternalRow.apply("3")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 59a566a3f296..9b0c15ab3006 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.SchemaRequiredDataSource
-import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, InMemoryPartitionTableCatalog}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -767,7 +767,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
     def checkSchema(df: DataFrame): Unit = {
       val schemas = df.queryExecution.analyzed.collect {
         case l: LogicalRelation => l.relation.schema
-        case d: DataSourceV2Relation => d.table.schema()
+        case d: DataSourceV2Relation => CatalogV2Util.v2ColumnsToStructType(d.table.columns())
       }
       assert(schemas.length == 1)
       assert(schemas.head.map(_.dataType) == Seq(StringType))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index e9e3432195a4..3aa3e1d0bd9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
@@ -711,7 +711,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
     assert(table.columns().head.dataType() == StringType(collationId))
 
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
-    checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
+    checkAnswer(spark.internalCreateDataFrame(rdd,
+      CatalogV2Util.v2ColumnsToStructType(table.columns())), Seq.empty)
 
     sql(s"INSERT INTO $tableName VALUES ('a'), ('A')")