@@ -189,7 +189,7 @@ class CatalogSuite extends SparkFunSuite {
val loaded = catalog.loadTable(testIdent)

assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
+ assert(table.columns() === loaded.columns())
assert(table.properties == loaded.properties)
}

@@ -212,7 +212,7 @@ class CatalogSuite extends SparkFunSuite {
val loaded = catalog.loadTable(testIdent)

assert(table.name == loaded.name)
- assert(table.schema == loaded.schema)
+ assert(table.columns() === loaded.columns())
assert(table.properties == loaded.properties)
}

@@ -394,8 +394,8 @@ class CatalogSuite extends SparkFunSuite {

val updated = catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType))

- val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(Column.create("id", LongType), Column.create("data", StringType))
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: update column nullability") {
@@ -411,8 +411,8 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnNullability(Array("id"), true))

- val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(Column.create("id", IntegerType), Column.create("data", StringType))
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: update missing column fails") {
@@ -441,10 +441,10 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "comment text"))

- val expectedSchema = new StructType()
-   .add("id", IntegerType, nullable = true, "comment text")
-   .add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(
+   Column.create("id", IntegerType, true, "comment text", null),
+   Column.create("data", StringType))
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: replace comment") {
@@ -456,14 +456,14 @@ class CatalogSuite extends SparkFunSuite {

catalog.alterTable(testIdent, TableChange.updateColumnComment(Array("id"), "comment text"))

- val expectedSchema = new StructType()
-   .add("id", IntegerType, nullable = true, "replacement comment")
-   .add("data", StringType)
+ val expectedColumns = Array(
+   Column.create("id", IntegerType, true, "replacement comment", null),
+   Column.create("data", StringType))

val updated = catalog.alterTable(testIdent,
TableChange.updateColumnComment(Array("id"), "replacement comment"))

- assert(updated.schema == expectedSchema)
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: add comment to missing column fails") {
@@ -491,9 +491,11 @@ class CatalogSuite extends SparkFunSuite {

val updated = catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))

- val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
+ val expectedColumns = Array(
+   Column.create("some_id", IntegerType),
+   Column.create("data", StringType))

- assert(updated.schema == expectedSchema)
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: rename nested column") {
@@ -580,8 +582,8 @@ class CatalogSuite extends SparkFunSuite {
val updated = catalog.alterTable(testIdent,
TableChange.deleteColumn(Array("id"), false))

- val expectedSchema = new StructType().add("data", StringType)
- assert(updated.schema == expectedSchema)
+ val expectedColumns = Array(Column.create("data", StringType))
+ assert(updated.columns() === expectedColumns)
}

test("alterTable: delete nested column") {
@@ -53,7 +53,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("createPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -72,7 +75,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("createPartitions failed if partition already exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
@@ -94,7 +100,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("dropPartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
@@ -112,7 +121,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("purgePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
val partIdents = Array(InternalRow.apply("3"), InternalRow.apply("4"))
partTable.createPartitions(
partIdents,
@@ -129,7 +141,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("dropPartitions failed if partition not exists") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("4")
@@ -147,7 +162,10 @@ class SupportsAtomicPartitionManagementSuite extends SparkFunSuite {
test("truncatePartitions") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryAtomicPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

partTable.createPartitions(
@@ -55,7 +55,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("createPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
@@ -70,7 +73,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("dropPartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
@@ -88,7 +94,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("purgePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
checkError(
exception = intercept[SparkUnsupportedOperationException] {
partTable.purgePartition(InternalRow.apply("3"))
@@ -101,7 +110,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("replacePartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
@@ -123,7 +135,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("loadPartitionMetadata") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
@@ -140,7 +155,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("listPartitionIdentifiers") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")
@@ -248,7 +266,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("truncatePartition") {
val table = catalog.loadTable(ident)
val partTable = new InMemoryPartitionTable(
- table.name(), table.schema(), table.partitioning(), table.properties())
+ table.name(),
+ CatalogV2Util.v2ColumnsToStructType(table.columns()),
+ table.partitioning(),
+ table.properties())
assert(!hasPartitions(partTable))

val partIdent = InternalRow.apply("3")

Review thread on this hunk:

Contributor: Would it be better to add a constructor to InMemoryPartitionTable? Also, since InMemoryPartitionTable is code used for testing, it might be acceptable to directly modify the default constructor definition, but I'm not sure if there are third-party projects that depend on this test code.

Contributor (Author): I think we can create a new one and let the old one throw an exception?
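As one concrete reading of this exchange, a minimal sketch follows. It is illustrative only, not the PR's actual change: the class name, the four-parameter shape, and the use of CatalogV2Util.structTypeToV2Columns for the reverse conversion are assumptions here; CatalogV2Util.v2ColumnsToStructType is the helper the diff itself uses.

import java.util

import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical test-only table that accepts v2 Column metadata directly,
// so call sites no longer repeat the StructType conversion.
class ColumnBasedPartitionTableSketch(
    name: String,
    columns: Array[Column],
    partitioning: Array[Transform],
    properties: util.Map[String, String]) {

  // Legacy StructType view, derived once with the same helper the diff
  // above threads through every call site.
  val schema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)

  // Old signature kept as a secondary constructor. It delegates here;
  // per the author's reply it could instead throw to force migration.
  def this(
      name: String,
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]) =
    this(name, CatalogV2Util.structTypeToV2Columns(schema), partitioning, properties)
}

With a constructor like this, the repeated call sites above would collapse back to one line, e.g. new ColumnBasedPartitionTableSketch(table.name(), table.columns(), table.partitioning(), table.properties()), keeping the conversion in one place.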
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.SchemaRequiredDataSource
- import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, InMemoryPartitionTableCatalog}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -767,7 +767,7 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
def checkSchema(df: DataFrame): Unit = {
val schemas = df.queryExecution.analyzed.collect {
case l: LogicalRelation => l.relation.schema
- case d: DataSourceV2Relation => d.table.schema()
+ case d: DataSourceV2Relation => CatalogV2Util.v2ColumnsToStructType(d.table.columns())
}
assert(schemas.length == 1)
assert(schemas.head.map(_.dataType) == Seq(StringType))
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.connector.{DatasourceV2SQLBase, FakeV2ProviderWithCustomSchema}
- import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
@@ -711,7 +711,8 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
assert(table.columns().head.dataType() == StringType(collationId))

val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
- checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
+ checkAnswer(spark.internalCreateDataFrame(rdd,
+   CatalogV2Util.v2ColumnsToStructType(table.columns())), Seq.empty)

sql(s"INSERT INTO $tableName VALUES ('a'), ('A')")
