diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java new file mode 100644 index 0000000000000..8aefa28323b33 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java @@ -0,0 +1,58 @@ +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataType; + +/** + * Interface for a metadata column. + *
<p>
+ * A metadata column can expose additional metadata about a row. For example, rows from Kafka can + * use metadata columns to expose a message's topic, partition number, and offset. + *
<p>
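As a rough illustration of the Kafka example above (not part of this patch; the object and column names are hypothetical), a connector could implement this interface as a small Scala object, using the accessor methods declared below:

import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.{DataType, LongType}

// Hypothetical metadata column exposing a Kafka record's offset.
object KafkaOffsetColumn extends MetadataColumn {
  override def name: String = "offset"
  override def dataType: DataType = LongType
  override def isNullable: Boolean = false
  override def comment: String = "Offset of the record within its Kafka partition"
}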
+ * A metadata column could also be the result of a transform applied to a value in the row. For + * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In + * this case, {@link #transform()} should return a non-null {@link Transform} that produced the + * metadata column's values. + */ +@Evolving +public interface MetadataColumn { + /** + * The name of this metadata column. + * + * @return a String name + */ + String name(); + + /** + * The data type of values in this metadata column. + * + * @return a {@link DataType} + */ + DataType dataType(); + + /** + * @return whether values produced by this metadata column may be null + */ + default boolean isNullable() { + return true; + } + + /** + * Documentation for this metadata column, or null. + * + * @return a documentation String + */ + default String comment() { + return null; + } + + /** + * The {@link Transform} used to produce this metadata column from data rows, or null. + * + * @return a {@link Transform} used to produce the column's values, or null if there isn't one + */ + default Transform transform() { + return null; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java new file mode 100644 index 0000000000000..fc313491f2970 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java @@ -0,0 +1,37 @@ +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +/** + * An interface for exposing data columns for a table that are not in the table schema. For example, + * a file source could expose a "file" column that contains the path of the file that contained each + * row. + *
<p>
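To make the "file" column example concrete, here is a minimal sketch of a table that advertises such a column through this interface (not part of the patch; all names are hypothetical, and a real connector would also implement SupportsRead and produce the column's values in its readers):

import java.util

import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}

// Hypothetical metadata column carrying the source file path of each row.
object FilePathColumn extends MetadataColumn {
  override def name: String = "_file"
  override def dataType: DataType = StringType
  override def comment: String = "Path of the file this row was read from"
}

// Hypothetical minimal table that only declares the metadata column.
class ExampleFileTable extends Table with SupportsMetadataColumns {
  override def name: String = "example_file_table"
  override def schema: StructType =
    StructType(Seq(StructField("id", LongType), StructField("data", StringType)))
  override def capabilities: util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)
  override def metadataColumns: Array[MetadataColumn] = Array(FilePathColumn)
}

With such a table registered in a catalog, SELECT id, data, _file FROM example_file_table would be expected to resolve _file through the relation's metadata output, while SELECT * would still expand to the declared schema only, as the tests at the end of this change demonstrate.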
+ * The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in + * requested projections. Sources that implement this interface and column projection using + * {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to + * {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}. + *
<p>
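A rough sketch of how a source that implements column pruning could honor that contract (not part of this patch; ExampleScanBuilder and metadataColumnNames are hypothetical):

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical ScanBuilder: the schema passed to pruneColumns() may mix table
// columns and metadata columns, and both must survive into readSchema().
class ExampleScanBuilder(tableSchema: StructType, metadataColumnNames: Set[String])
    extends ScanBuilder with SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = tableSchema
  private var requestedMetadata: Seq[String] = Nil

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // Accept metadata fields instead of rejecting them as unknown columns, and
    // remember which ones were requested so the readers can synthesize their values.
    this.requiredSchema = requiredSchema
    this.requestedMetadata =
      requiredSchema.fieldNames.filter(metadataColumnNames.contains).toSeq
  }

  override def build(): Scan = new Scan {
    // A real connector would also implement toBatch() and pass requestedMetadata
    // to its reader factory, as the InMemoryTable change later in this diff does.
    override def readSchema(): StructType = requiredSchema
  }
}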
+ * If a table column and a metadata column have the same name, the metadata column will never be + * requested. It is recommended that Table implementations reject data column names that conflict + * with metadata column names. + */ +@Evolving +public interface SupportsMetadataColumns extends Table { + /** + * Metadata columns that are supported by this {@link Table}. + *
<p>
+ * The columns returned by this method may be passed as {@link StructField} in requested + * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}. + *
<p>
+ * If a table column and a metadata column have the same name, the metadata column will never be + * requested and is ignored. It is recommended that Table implementations reject data column names + * that conflict with metadata column names. + * + * @return an array of {@link MetadataColumn} + */ + MetadataColumn[] metadataColumns(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 14b50f481f387..8d95d8cf49d45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveRelations :: ResolveTables :: ResolvePartitionSpec :: + AddMetadataColumns :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: @@ -916,6 +917,29 @@ class Analyzer(override val catalogManager: CatalogManager) } } + /** + * Adds metadata columns to output for child relations when nodes are missing resolved attributes. + * + * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]], + * but the relation's output does not include the metadata columns until the relation is replaced + * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the + * relation's output, the analyzer will detect that nothing produces the columns. + * + * This rule only adds metadata columns when a node is resolved but is missing input from its + * children. This ensures that metadata columns are not added to the plan unless they are used. By + * checking only resolved nodes, this ensures that * expansion is already done so that metadata + * columns are not accidentally selected by *. + */ + object AddMetadataColumns extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty => + node resolveOperatorsUp { + case rel: DataSourceV2Relation => + rel.withMetadataColumns() + } + } + } + /** * Resolve table relations with concrete relations from v2 catalog. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 48dfc5fd57e63..ad5c3fd74e9b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -33,6 +33,9 @@ abstract class LogicalPlan with QueryPlanConstraints with Logging { + /** Metadata fields that can be projected from this node */ + def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput) + /** Returns true if this subtree has data from a streaming data source. 
*/ def isStreaming: Boolean = children.exists(_.isStreaming) @@ -86,7 +89,8 @@ abstract class LogicalPlan } } - private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output)) + private[this] lazy val childAttributes = + AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput)) private[this] lazy val outputAttributes = AttributeSeq(output) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 17bf704c6d67a..4e7923b45822b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -886,6 +886,12 @@ case class SubqueryAlias( val qualifierList = identifier.qualifier :+ alias child.output.map(_.withQualifier(qualifierList)) } + + override def metadataOutput: Seq[Attribute] = { + val qualifierList = identifier.qualifier :+ alias + child.metadataOutput.map(_.withQualifier(qualifierList)) + } + override def doCanonicalize(): LogicalPlan = child.canonicalized } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index dfacf6e83ef57..8d91ea7c50cde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -21,7 +21,9 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec} -import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap object DataSourceV2Implicits { @@ -78,6 +80,18 @@ object DataSourceV2Implicits { def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports) } + implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) { + def asStruct: StructType = { + val fields = metadata.map { metaCol => + val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable) + Option(metaCol.comment).map(field.withComment).getOrElse(field) + } + StructType(fields) + } + + def toAttributes: Seq[AttributeReference] = asStruct.toAttributes + } + implicit class OptionsHelper(options: Map[String, String]) { def asOptions: CaseInsensitiveStringMap = { new CaseInsensitiveStringMap(options.asJava) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 45d89498f5ae9..b09ccff39f842 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.connector.write.WriteBuilder +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -48,6 +49,21 @@ case class DataSourceV2Relation( import DataSourceV2Implicits._ + override lazy val metadataOutput: Seq[AttributeReference] = table match { + case hasMeta: SupportsMetadataColumns => + val resolve = SQLConf.get.resolver + val outputNames = outputSet.map(_.name) + def isOutputColumn(col: MetadataColumn): Boolean = { + outputNames.exists(name => resolve(col.name, name)) + } + // filter out metadata columns that have names conflicting with output columns. if the table + // has a column "line" and the table can produce a metadata column called "line", then the + // data column should be returned, not the metadata column. 
+ hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes + case _ => + Nil + } + override def name: String = table.name() override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA) @@ -78,6 +94,14 @@ case class DataSourceV2Relation( override def newInstance(): DataSourceV2Relation = { copy(output = output.map(_.newInstance())) } + + def withMetadataColumns(): DataSourceV2Relation = { + if (metadataOutput.nonEmpty) { + DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options) + } else { + this + } + } } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala index b0325600e7530..3b47271a114e2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala @@ -27,6 +27,7 @@ import scala.collection.mutable import org.scalatest.Assertions._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform} @@ -34,8 +35,9 @@ import org.apache.spark.sql.connector.read._ import org.apache.spark.sql.connector.write._ import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull} -import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.unsafe.types.UTF8String /** * A simple in-memory table. Rows are stored as a buffered group produced by each output task. 
@@ -45,7 +47,24 @@ class InMemoryTable( val schema: StructType, override val partitioning: Array[Transform], override val properties: util.Map[String, String]) - extends Table with SupportsRead with SupportsWrite with SupportsDelete { + extends Table with SupportsRead with SupportsWrite with SupportsDelete + with SupportsMetadataColumns { + + private object PartitionKeyColumn extends MetadataColumn { + override def name: String = "_partition" + override def dataType: DataType = StringType + override def comment: String = "Partition key used to store the row" + } + + private object IndexColumn extends MetadataColumn { + override def name: String = "index" + override def dataType: DataType = StringType + override def comment: String = "Metadata column used to conflict with a data column" + } + + // purposely exposes a metadata column that conflicts with a data column in some tests + override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn) + private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name) private val allowUnsupportedTransforms = properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean @@ -146,7 +165,7 @@ class InMemoryTable( val key = getKey(row) dataMap += dataMap.get(key) .map(key -> _.withRow(row)) - .getOrElse(key -> new BufferedRows().withRow(row)) + .getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row)) }) this } @@ -160,17 +179,38 @@ class InMemoryTable( TableCapability.TRUNCATE).asJava override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition])) + new InMemoryScanBuilder(schema) + } + + class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder + with SupportsPushDownRequiredColumns { + private var schema: StructType = tableSchema + + override def build: Scan = + new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema) + + override def pruneColumns(requiredSchema: StructType): Unit = { + // if metadata columns are projected, return the table schema and metadata columns + val hasMetadataColumns = requiredSchema.map(_.name).exists(metadataColumnNames.contains) + if (hasMetadataColumns) { + schema = StructType(tableSchema ++ metadataColumnNames + .flatMap(name => metadataColumns.find(_.name == name)) + .map(col => StructField(col.name, col.dataType, col.isNullable))) + } + } } - class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch { + class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch { override def readSchema(): StructType = schema override def toBatch: Batch = this override def planInputPartitions(): Array[InputPartition] = data - override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory + override def createReaderFactory(): PartitionReaderFactory = { + val metadataColumns = schema.map(_.name).filter(metadataColumnNames.contains) + new BufferedRowsReaderFactory(metadataColumns) + } } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { @@ -340,7 +380,8 @@ object InMemoryTable { } } -class BufferedRows extends WriterCommitMessage with InputPartition with Serializable { +class BufferedRows( + val key: String = "") extends WriterCommitMessage with InputPartition with Serializable { val rows = new mutable.ArrayBuffer[InternalRow]() def withRow(row: InternalRow): BufferedRows = { @@ -349,13 +390,24 @@ class BufferedRows extends 
WriterCommitMessage with InputPartition with Serializ } } -private object BufferedRowsReaderFactory extends PartitionReaderFactory { +private class BufferedRowsReaderFactory( + metadataColumns: Seq[String]) extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { - new BufferedRowsReader(partition.asInstanceOf[BufferedRows]) + new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns) } } -private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] { +private class BufferedRowsReader( + partition: BufferedRows, + metadataColumns: Seq[String]) extends PartitionReader[InternalRow] { + private def addMetadata(row: InternalRow): InternalRow = { + val metadataRow = new GenericInternalRow(metadataColumns.map { + case "index" => index + case "_partition" => UTF8String.fromString(partition.key) + }.toArray) + new JoinedRow(row, metadataRow) + } + private var index: Int = -1 override def next(): Boolean = { @@ -363,7 +415,7 @@ private class BufferedRowsReader(partition: BufferedRows) extends PartitionReade index < partition.rows.length } - override def get(): InternalRow = partition.rows(index) + override def get(): InternalRow = addMetadata(partition.rows(index)) override def close(): Unit = {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala index 81b1c81499c74..0cbcad1f48026 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema} -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog} import org.apache.spark.sql.types.StructType case class DescribeTableExec( @@ -41,6 +41,7 @@ case class DescribeTableExec( addPartitioning(rows) if (isExtended) { + addMetadataColumns(rows) addTableDetails(rows) } rows.toSeq @@ -72,6 +73,19 @@ case class DescribeTableExec( } } + private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match { + case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty => + rows += emptyRow() + rows += toCatalystRow("# Metadata Columns", "", "") + rows ++= hasMeta.metadataColumns.map { column => + toCatalystRow( + column.name, + column.dataType.simpleString, + Option(column.comment()).getOrElse("")) + } + case _ => + } + private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = { rows += emptyRow() rows += toCatalystRow("# Partitioning", "", "") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala index 7f6ae20d5cd0b..ce8edce6f08d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala @@ -96,13 +96,11 @@ object PushDownUtils extends PredicateHelper { val exprs 
= projects ++ filters val requiredColumns = AttributeSet(exprs.flatMap(_.references)) val neededOutput = relation.output.filter(requiredColumns.contains) - if (neededOutput != relation.output) { - r.pruneColumns(neededOutput.toStructType) - val scan = r.build() - scan -> toOutputAttrs(scan.readSchema(), relation) - } else { - r.build() -> relation.output - } + r.pruneColumns(neededOutput.toStructType) + val scan = r.build() + // always project, in case the relation's output has been updated and doesn't match + // the underlying table schema + scan -> toOutputAttrs(scan.readSchema(), relation) case _ => scanBuilder.build() -> relation.output } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 5f7be7c4c565b..4eaf5822e1628 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -139,6 +139,10 @@ class DataSourceV2SQLSuite Array("# Partitioning", "", ""), Array("Part 0", "id", ""), Array("", "", ""), + Array("# Metadata Columns", "", ""), + Array("index", "string", "Metadata column used to conflict with a data column"), + Array("_partition", "string", "Partition key used to store the row"), + Array("", "", ""), Array("# Detailed Table Information", "", ""), Array("Name", "testcat.table_name", ""), Array("Comment", "this is a test table", ""), @@ -2470,6 +2474,45 @@ class DataSourceV2SQLSuite } } + test("SPARK-31255: Project a metadata column") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + checkAnswer( + spark.sql(s"SELECT id, data, _partition FROM $t1"), + Seq(Row(1, "a", "3/1"), Row(2, "b", "2/2"), Row(3, "c", "2/3"))) + } + } + + test("SPARK-31255: Projects data column when metadata column has the same name") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (index bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, index), index)") + sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')") + + checkAnswer( + spark.sql(s"SELECT index, data, _partition FROM $t1"), + Seq(Row(3, "c", "2/3"), Row(2, "b", "2/2"), Row(1, "a", "3/1"))) + } + } + + test("SPARK-31255: * expansion does not include metadata columns") { + val t1 = s"${catalogAndNamespace}table" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')") + + checkAnswer( + spark.sql(s"SELECT * FROM $t1"), + Seq(Row(3, "c"), Row(2, "b"), Row(1, "a"))) + } + } + private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams")