V2WriteCommand.scala
@@ -45,7 +45,7 @@ trait V2WriteCommand extends Command {
        case (inAttr, outAttr) =>
          // names and types must match, nullability must be compatible
          inAttr.name == outAttr.name &&
-           DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+           DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&

@dongjoon-hyun (Member) commented on Oct 14, 2020:
Hi, @HeartSaVioR. The original code looks the same as in branch-2.4, but the issue is reported against 3.0.0+. Could you confirm whether this is a 3.0.0-only issue or not?

  override lazy val resolved: Boolean = {
    table.resolved && query.resolved && query.output.size == table.output.size &&
        query.output.zip(table.output).forall {
          case (inAttr, outAttr) =>
            // names and types must match, nullability must be compatible
            inAttr.name == outAttr.name &&
                DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
                (outAttr.nullable || !inAttr.nullable)
        }
  }

@HeartSaVioR (Contributor, Author) replied:
Ah, thanks for noticing. Nice finding. I found it in V2WriteCommand, so I thought it was added later. I will check the code path in branch-2.4 and test it.

@HeartSaVioR (Contributor, Author) replied on Oct 14, 2020:
The usage of AppendData was reverted in b6e4aca for branch-2.4 and shipped in Spark 2.4.0. So while the code in AppendData for branch-2.4 is broken as well, it is dead code.

We seem to have three options: 1) revert the remaining part of AppendData in branch-2.4, 2) fix the code but leave it as dead code, 3) leave it as it is. What's our preference?

cc. @cloud-fan @HyukjinKwon

A member replied:
I prefer (2) because there is a downstream project using it in their fork.

            (outAttr.nullable || !inAttr.nullable)
      }
  }
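
For orientation, the sketch below is an illustration added here rather than code from the PR: it spells out the per-attribute check that outputResolved performs with the corrected argument order. The names inType, outType, inAttr and outAttr are invented for the example, and since equalsIgnoreCompatibleNullability is private[sql], a snippet like this only compiles inside the org.apache.spark.sql package tree.

  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

  // inAttr models what the query produces, outAttr what the table declares.
  val inType = StructType(Array(StructField("s", StringType, nullable = false)))
  val outType = StructType(Array(StructField("s", StringType, nullable = true)))
  val inAttr = AttributeReference("col", inType, nullable = false)()
  val outAttr = AttributeReference("col", outType, nullable = true)()

  val compatible =
    inAttr.name == outAttr.name &&
      // fixed order: from = query output type, to = table column type
      DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) &&
      (outAttr.nullable || !inAttr.nullable)
  // compatible is true here; with the pre-fix (swapped) argument order the DataType call
  // returns false, so this safe write was treated as unresolved.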

DataFrameWriterV2Suite.scala
@@ -23,16 +23,15 @@ import scala.collection.JavaConverters._

import org.scalatest.BeforeAndAfter

-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamedRelation, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, V2WriteCommand}
import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -101,6 +100,86 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter
    assert(v2.catalog.exists(_ == catalogPlugin))
  }

  case class FakeV2WriteCommand(table: NamedRelation, query: LogicalPlan) extends V2WriteCommand

  test("SPARK-33136 output resolved on complex types for V2 write commands") {

A Contributor commented:
Does this bug only affect complex types?

@HeartSaVioR (Contributor, Author) replied:
Yes, according to the implementation of equalsIgnoreCompatibleNullability.

  private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
    (from, to) match {
      case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
        (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)

      case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
        (tn || !fn) &&
          equalsIgnoreCompatibleNullability(fromKey, toKey) &&
          equalsIgnoreCompatibleNullability(fromValue, toValue)

      case (StructType(fromFields), StructType(toFields)) =>
        fromFields.length == toFields.length &&
          fromFields.zip(toFields).forall { case (fromField, toField) =>
            fromField.name == toField.name &&
              (toField.nullable || !fromField.nullable) &&
              equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
          }

      case (fromDataType, toDataType) => fromDataType == toDataType
    }
  }

For primitive types the argument order doesn't affect the result. outputResolved itself does the right comparison; it just passes the parameters in the swapped order.
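
A small sketch of that difference, added here for illustration and not part of the PR (the variable names are invented; the same private[sql] access caveat applies):

  import org.apache.spark.sql.types.{ArrayType, DataType, LongType}

  // Primitive type: the final `fromDataType == toDataType` case is symmetric,
  // so argument order is irrelevant.
  DataType.equalsIgnoreCompatibleNullability(LongType, LongType)           // true either way

  // Complex type: non-nullable elements may be written into a nullable-element column...
  val nonNullElems = ArrayType(LongType, containsNull = false)
  val nullableElems = ArrayType(LongType, containsNull = true)
  DataType.equalsIgnoreCompatibleNullability(nonNullElems, nullableElems)  // true
  // ...but the swapped call, which the pre-fix outputResolved effectively made, rejects it.
  DataType.equalsIgnoreCompatibleNullability(nullableElems, nonNullElems)  // false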

    val tableCatalog = catalog("testcat")

    def assertTypeCompatibility(name: String, fromType: DataType, toType: DataType): Unit = {
      val fromTableName = s"from_table_$name"
      tableCatalog.createTable(
        Identifier.of(Array(), fromTableName),
        StructType(Array(StructField("col", fromType))),
        Array.empty,
        new java.util.HashMap[String, String]())

      val toTable = tableCatalog.createTable(
        Identifier.of(Array(), s"to_table_$name"),
        StructType(Array(StructField("col", toType))),
        Array.empty,
        new java.util.HashMap[String, String]())

      val df = spark.table(s"testcat.$fromTableName")

      val relation = DataSourceV2Relation.create(toTable, Some(tableCatalog), None)
      val writeCommand = FakeV2WriteCommand(relation, df.queryExecution.analyzed)

      assert(writeCommand.outputResolved, s"Unable to write from type $fromType to type $toType.")
    }

    // The major difference between `from` and `to` is that `from` is a complex type
    // containing non-nullable fields or elements, whereas `to` is the same type with
    // the nullability flipped to nullable.

    // nested struct type
    val fromStructType = StructType(Array(
      StructField("s", StringType),
      StructField("i_nonnull", IntegerType, nullable = false),
      StructField("st", StructType(Array(
        StructField("l", LongType),
        StructField("s_nonnull", StringType, nullable = false))))))

    val toStructType = StructType(Array(
      StructField("s", StringType),
      StructField("i_nonnull", IntegerType),
      StructField("st", StructType(Array(
        StructField("l", LongType),
        StructField("s_nonnull", StringType))))))

    assertTypeCompatibility("struct", fromStructType, toStructType)

    // array type
    assertTypeCompatibility("array", ArrayType(LongType, containsNull = false),
      ArrayType(LongType, containsNull = true))

    // array type with struct type
    val fromArrayWithStructType = ArrayType(
      StructType(Array(StructField("s", StringType, nullable = false))),
      containsNull = false)

    val toArrayWithStructType = ArrayType(
      StructType(Array(StructField("s", StringType))),
      containsNull = true)

    assertTypeCompatibility("array_struct", fromArrayWithStructType, toArrayWithStructType)

    // map type
    assertTypeCompatibility("map", MapType(IntegerType, StringType, valueContainsNull = false),
      MapType(IntegerType, StringType, valueContainsNull = true))

    // map type with struct type
    val fromMapWithStructType = MapType(
      IntegerType,
      StructType(Array(StructField("s", StringType, nullable = false))),
      valueContainsNull = false)

    val toMapWithStructType = MapType(
      IntegerType,
      StructType(Array(StructField("s", StringType))),
      valueContainsNull = true)

    assertTypeCompatibility("map_struct", fromMapWithStructType, toMapWithStructType)
  }

test("Append: basic append") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
