diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
index 7fff09fae6a3e..c44a12b174f4c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -84,5 +84,10 @@ public enum TableCapability {
    *
   * See {@code org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
   */
-  OVERWRITE_DYNAMIC
+  OVERWRITE_DYNAMIC,
+
+  /**
+   * Signals that the table accepts input of any schema in a write operation.
+   */
+  ACCEPT_ANY_SCHEMA
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala
index ad201f947b671..56b8d84441c95 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NamedRelation.scala
@@ -21,4 +21,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 trait NamedRelation extends LogicalPlan {
   def name: String
+
+  // When false, the schema of the input data must match the schema of this relation during writes.
+  def skipSchemaResolution: Boolean = false
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 2b98132f188f5..1925d45591902 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -393,14 +393,17 @@ trait V2WriteCommand extends Command {
   override lazy val resolved: Boolean = outputResolved
 
   def outputResolved: Boolean = {
-    table.resolved && query.resolved && query.output.size == table.output.size &&
+    // If the table doesn't require a schema match, we don't need to resolve the output columns.
+    table.skipSchemaResolution || {
+      table.resolved && query.resolved && query.output.size == table.output.size &&
       query.output.zip(table.output).forall {
         case (inAttr, outAttr) =>
           // names and types must match, nullability must be compatible
           inAttr.name == outAttr.name &&
-          DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
-          (outAttr.nullable || !inAttr.nullable)
+            DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+            (outAttr.nullable || !inAttr.nullable)
       }
+    }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 0c48548614266..48b43fcccacef 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.analysis
 import java.util.Locale
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, LessThanOrEqual, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project}
-import org.apache.spark.sql.types.{DoubleType, FloatType, StructField, StructType}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._
 
 class V2AppendDataAnalysisSuite extends DataSourceV2AnalysisSuite {
   override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = {
@@ -104,6 +104,12 @@ case class TestRelation(output: Seq[AttributeReference]) extends LeafNode with N
   override def name: String = "table-name"
 }
 
+case class TestRelationAcceptAnySchema(output: Seq[AttributeReference])
+  extends LeafNode with NamedRelation {
+  override def name: String = "test-name"
+  override def skipSchemaResolution: Boolean = true
+}
+
 abstract class DataSourceV2AnalysisSuite extends AnalysisTest {
   val table = TestRelation(StructType(Seq(
     StructField("x", FloatType),
@@ -446,6 +452,27 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot safely cast", "'x'", "DoubleType to FloatType"))
   }
 
+  test("bypass output column resolution") {
+    val table = TestRelationAcceptAnySchema(StructType(Seq(
+      StructField("a", FloatType, nullable = false),
+      StructField("b", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("s", StringType))).toAttributes)
+
+    withClue("byName") {
+      val parsedPlan = byName(table, query)
+      assertResolved(parsedPlan)
+      checkAnalysis(parsedPlan, parsedPlan)
+    }
+
+    withClue("byPosition") {
+      val parsedPlan = byPosition(table, query)
+      assertResolved(parsedPlan)
+      checkAnalysis(parsedPlan, parsedPlan)
+    }
+  }
+
   def assertNotResolved(logicalPlan: LogicalPlan): Unit = {
     assert(!logicalPlan.resolved, s"Plan should not be resolved: $logicalPlan")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index e91e2b48db48d..6b4efaf303c66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -44,7 +44,10 @@ private[noop] object NoopTable extends Table with SupportsWrite {
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+    Set(
+      TableCapability.BATCH_WRITE,
+      TableCapability.STREAMING_WRITE,
+      TableCapability.ACCEPT_ANY_SCHEMA).asJava
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index e7e0be0dddf4f..27875b3f7923d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -44,6 +44,8 @@ case class DataSourceV2Relation(
 
   override def name: String = table.name()
 
+  override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
+
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
   }
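
Note, not part of the patch: a minimal sketch of how a third-party source could opt in to the new capability, modeled on NoopTable above. The object name AnySchemaSink is a hypothetical illustration; Table, SupportsWrite, TableCapability, WriteBuilder, and CaseInsensitiveStringMap are the existing sources.v2 APIs this patch touches.

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical sink: declaring ACCEPT_ANY_SCHEMA makes the analyzer skip
// output column resolution, so writes succeed even though schema() is empty.
object AnySchemaSink extends Table with SupportsWrite {
  override def name(): String = "any-schema-sink"

  override def schema(): StructType = new StructType()

  override def capabilities(): util.Set[TableCapability] = {
    Set(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA).asJava
  }

  // Write-path plumbing elided; see NoopWriteBuilder in NoopDataSource.scala
  // for a complete WriteBuilder.
  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = ???
}

The capability is threaded through DataSourceV2Relation.skipSchemaResolution, which V2WriteCommand.outputResolved consults before comparing the query's output columns against the table schema. User-visible effect, assuming an active SparkSession named spark:

// Analyzes successfully because NoopTable now declares ACCEPT_ANY_SCHEMA,
// even though the DataFrame's schema bears no relation to NoopTable's
// empty schema.
spark.range(10).selectExpr("id", "id * 2 AS doubled")
  .write.format("noop").mode("overwrite").save()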