@@ -110,7 +110,7 @@ private[mllib] object Loader {
assert(loadedFields.contains(field.name), s"Unable to parse model data." +
s" Expected field with name ${field.name} was missing in loaded schema:" +
s" ${loadedFields.mkString(", ")}")
assert(loadedFields(field.name) == field.dataType,
assert(DataType.equalsIgnoreNullability(loadedFields(field.name), field.dataType),
Contributor Author: @mengxr Is it OK to ignore the nullability of ArrayType (containsNull field) and MapType (valueContainsNull field) in this check?

s"Unable to parse model data. Expected field $field but found field" +
s" with different type: ${loadedFields(field.name)}")
}
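For readers skimming the diff: a minimal sketch of what the relaxed check accepts, using only the DataType helpers touched by this PR. Since equalsIgnoreNullability is private[spark], the snippet is assumed to live inside Spark's own source tree (for example a suite under org.apache.spark.sql.types), and the concrete types are invented for illustration.

  import org.apache.spark.sql.types._

  // Field type as declared when the model was saved.
  val expected = ArrayType(DoubleType, containsNull = false)
  // The same field as it comes back from the loaded schema, where
  // containsNull may flip to true.
  val loaded = ArrayType(DoubleType, containsNull = true)

  assert(expected != loaded)                                  // strict equality fails on the flag alone
  assert(DataType.equalsIgnoreNullability(loaded, expected))  // the new check still passes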
@@ -120,7 +120,8 @@ case class InsertIntoTable(
override def output = child.output

override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
case (childAttr, tableAttr) => childAttr.dataType == tableAttr.dataType
case (childAttr, tableAttr) =>
DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
}
}
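A hedged illustration of the direction this check now allows (the types are hypothetical; the helper is defined later in this PR and is private[spark]): inserting data whose nested types never contain nulls into a table whose nested types do is accepted, while the reverse would silently drop a nullability guarantee and is still rejected.

  import org.apache.spark.sql.types._

  val childType = ArrayType(IntegerType, containsNull = false)  // what the query produces
  val tableType = ArrayType(IntegerType, containsNull = true)   // what the table declares

  DataType.equalsIgnoreCompatibleNullability(childType, tableType)  // true: safe to insert
  DataType.equalsIgnoreCompatibleNullability(tableType, childType)  // false: rejected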

@@ -181,7 +181,7 @@ object DataType {
/**
* Compares two types, ignoring nullability of ArrayType, MapType, StructType.
*/
private[sql] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
private[spark] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
(left, right) match {
case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
equalsIgnoreNullability(leftElementType, rightElementType)
@@ -198,6 +198,57 @@ object DataType {
case (left, right) => left == right
}
}

/**
* Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
*
* Compatible nullability is defined as follows:
* - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
* if and only if `from.containsNull` is false, or both of `from.containsNull` and
* `to.containsNull` are true.
Contributor: Might be better to reword this definition to make it consistent with the code:

    If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to` if and only if `to.containsNull` is true, or both `to.containsNull` and `from.containsNull` are false.

Contributor: Same applies to MapType and StructType below.

* - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
* if and only if `from.valueContainsNull` is false, or both of `from.valueContainsNull` and
* `to.valueContainsNull` are true.
* - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
* if and only if for every pair of fields, `fromField.nullable` is false, or both
* of `fromField.nullable` and `toField.nullable` are true.
*/
private[spark] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
(from, to) match {
case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
(tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)

case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
(tn || !fn) &&
equalsIgnoreCompatibleNullability(fromKey, toKey) &&
equalsIgnoreCompatibleNullability(fromValue, toValue)

case (StructType(fromFields), StructType(toFields)) =>
fromFields.size == toFields.size &&
fromFields.zip(toFields).forall {
case (fromField, toField) =>
fromField.name == toField.name &&
(toField.nullable || !fromField.nullable) &&
equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
}

case (fromDataType, toDataType) => fromDataType == toDataType
}
}

/** Sets all nullable/containsNull/valueContainsNull to true. */
private[spark] def alwaysNullable(dataType: DataType): DataType = dataType match {
case ArrayType(elementType, _) =>
ArrayType(alwaysNullable(elementType), containsNull = true)
case MapType(keyType, valueType, _) =>
MapType(alwaysNullable(keyType), alwaysNullable(valueType), true)
Contributor: Nit: Use named parameter for valueContainsNull.

case StructType(fields) =>
val newFields = fields.map { field =>
StructField(field.name, alwaysNullable(field.dataType), nullable = true)
}
StructType(newFields)
case other => other
}
}
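A quick sketch of what alwaysNullable does to a nested schema (the schema contents are invented; like the other helpers it is private[spark], so this only compiles inside Spark's own source tree):

  import org.apache.spark.sql.types._

  val schema = StructType(
    StructField("id", IntegerType, nullable = false) ::
    StructField("tags", ArrayType(StringType, containsNull = false), nullable = false) :: Nil)

  DataType.alwaysNullable(schema)
  // Every nullable/containsNull flag is rewritten to true:
  // StructType(StructField("id", IntegerType, nullable = true) ::
  //            StructField("tags", ArrayType(StringType, containsNull = true), nullable = true) :: Nil)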


@@ -115,4 +115,84 @@ class DataTypeSuite extends FunSuite {
checkDefaultSize(MapType(IntegerType, StringType, true), 410000)
checkDefaultSize(MapType(IntegerType, ArrayType(DoubleType), false), 80400)
checkDefaultSize(structType, 812)

def checkEqualsIgnoreCompatibleNullability(
from: DataType,
to: DataType,
expected: Boolean): Unit = {
val testName =
s"equalsIgnoreCompatibleNullability: (from: ${from}, to: ${to})"
test(testName) {
assert(DataType.equalsIgnoreCompatibleNullability(from, to) === expected)
}
}

checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = true),
to = ArrayType(DoubleType, containsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(DoubleType, containsNull = false),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(DoubleType, containsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = true),
to = ArrayType(DoubleType, containsNull = false),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = ArrayType(DoubleType, containsNull = false),
to = ArrayType(StringType, containsNull = false),
expected = false)

checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = true),
to = MapType(StringType, DoubleType, valueContainsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = false),
to = MapType(StringType, DoubleType, valueContainsNull = false),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = false),
to = MapType(StringType, DoubleType, valueContainsNull = true),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, DoubleType, valueContainsNull = true),
to = MapType(StringType, DoubleType, valueContainsNull = false),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = MapType(StringType, ArrayType(IntegerType, true), valueContainsNull = true),
to = MapType(StringType, ArrayType(IntegerType, false), valueContainsNull = true),
expected = false)
Contributor: Would be good to add another test case to show the nested case:

  checkEqualsIgnoreCompatibleNullability(
    from = MapType(StringType, ArrayType(IntegerType, false), valueContainsNull = true),
    to = MapType(StringType, ArrayType(IntegerType, true), valueContainsNull = true),
    expected = true)

Contributor Author: Done (added a test after this one).


checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = true) :: Nil),
to = StructType(StructField("a", StringType, nullable = true) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = false) :: Nil),
to = StructType(StructField("a", StringType, nullable = false) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = false) :: Nil),
to = StructType(StructField("a", StringType, nullable = true) :: Nil),
expected = true)
checkEqualsIgnoreCompatibleNullability(
from = StructType(StructField("a", StringType, nullable = true) :: Nil),
to = StructType(StructField("a", StringType, nullable = false) :: Nil),
expected = false)
checkEqualsIgnoreCompatibleNullability(
from = StructType(
StructField("a", StringType, nullable = false) ::
StructField("b", StringType, nullable = true) :: Nil),
to = StructType(
StructField("a", StringType, nullable = false) ::
StructField("b", StringType, nullable = false) :: Nil),
expected = false)


}
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{DataType, StructType}


private[sql] class DefaultSource
@@ -131,7 +131,7 @@ private[sql] case class JSONRelation(

override def equals(other: Any): Boolean = other match {
case that: JSONRelation =>
(this.path == that.path) && (this.schema == that.schema)
(this.path == that.path) && (DataType.equalsIgnoreNullability(this.schema, that.schema))
Contributor: Should also have this in ParquetRelation2.equals.

Contributor Author: Since nullability affects how Parquet encodes and decodes data, I think it is better to leave ParquetRelation2.equals unchanged for now.

Contributor Author: Please ignore my comment. Will change it.

case _ => false
}
}
@@ -23,6 +23,7 @@ import java.util.logging.Level
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.spark.sql.types.{StructType, DataType}
import parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
@@ -172,9 +173,14 @@ private[sql] object ParquetRelation {
sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
.name())
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
// This is a hack. We always set nullable/containsNull/valueContainsNull to true
// for the schema of a parquet data.
val schema =
DataType.alwaysNullable(StructType.fromAttributes(attributes)).asInstanceOf[StructType]
Contributor: Should also make ParquetRelation2.schema always nullable.

Contributor Author: I am not sure we want to do this. For createEmpty (this change), we are creating a new Parquet dataset, but for ParquetRelation2 the schema may describe an existing dataset. Everywhere we create a new Parquet dataset, we will call alwaysNullable.

val newAttributes = schema.toAttributes
ParquetTypesConverter.writeMetaData(newAttributes, path, conf)
new ParquetRelation(path.toString, Some(conf), sqlContext) {
override val output = attributes
override val output = newAttributes
Contributor Author: @liancheng @marmbrus I am also changing the nullability for our old Parquet write path to make the behavior consistent with our new write path. Let me know if there is any potential compatibility issue that would mean we should revert this change.

Contributor: Should we also make the data types of ParquetRelation.output always nullable?

Contributor: Never mind; since we always write nullable data, it should be OK to leave ParquetRelation.output untouched.

Contributor: Verified that, when merging schemas, the official Parquet implementation handles nullability (repetition level) properly. So our change should be safe for interoperation with other systems that support Parquet schema evolution.

}
}

@@ -278,7 +278,11 @@ private[sql] case class InsertIntoParquetTable(
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)

val conf = ContextUtil.getConfiguration(job)
RowWriteSupport.setSchema(relation.output, conf)
// This is a hack. We always set nullable/containsNull/valueContainsNull to true
// for the schema of a parquet data.
val schema =
DataType.alwaysNullable(StructType.fromAttributes(relation.output)).asInstanceOf[StructType]
RowWriteSupport.setSchema(schema.toAttributes, conf)

val fspath = new Path(relation.path)
val fs = fspath.getFileSystem(conf)
@@ -115,9 +115,15 @@ private[sql] class DefaultSource
}

val relation = if (doInsertion) {
// This is a hack. We always set nullable/containsNull/valueContainsNull to true
// for the schema of a parquet data.
val df =
sqlContext.createDataFrame(
data.queryExecution.toRdd,
DataType.alwaysNullable(data.schema).asInstanceOf[StructType])
val createdRelation =
createRelation(sqlContext, parameters, data.schema).asInstanceOf[ParquetRelation2]
createdRelation.insert(data, overwrite = mode == SaveMode.Overwrite)
createRelation(sqlContext, parameters, df.schema).asInstanceOf[ParquetRelation2]
createdRelation.insert(df, overwrite = mode == SaveMode.Overwrite)
createdRelation
} else {
// If the save mode is Ignore, we will just create the relation based on existing data.
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{LogicalRDD, RunnableCommand}
import org.apache.spark.sql.execution.RunnableCommand

private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -29,7 +29,10 @@ private[sql] case class InsertIntoDataSource(

override def run(sqlContext: SQLContext) = {
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
relation.insert(DataFrame(sqlContext, query), overwrite)
val data = DataFrame(sqlContext, query)
// Apply the schema of the existing table to the new data.
val df = sqlContext.createDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
relation.insert(df, overwrite)

// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(logicalRelation)
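A self-contained sketch of the trick used here, with made-up data and schemas (the real code goes through queryExecution.toRdd; the public DataFrame.rdd is used below to keep the example runnable): re-creating the DataFrame against the existing table's schema makes the insert adopt the table's nullability instead of whatever the incoming query reports.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.types._

  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("nullability-sketch"))
  val sqlContext = new SQLContext(sc)

  val rows = sc.parallelize(Seq(Row("a"), Row("b")))
  val incomingSchema = StructType(StructField("s", StringType, nullable = false) :: Nil)
  val tableSchema    = StructType(StructField("s", StringType, nullable = true) :: Nil)

  val incoming = sqlContext.createDataFrame(rows, incomingSchema)
  // Apply the schema of the existing table to the new data.
  val aligned = sqlContext.createDataFrame(incoming.rdd, tableSchema)
  aligned.schema  // now reports nullable = true, matching the table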
@@ -169,6 +169,7 @@ case class CreateMetastoreDataSourceAsSelect(
options
}

var existingSchema = None: Option[StructType]
if (sqlContext.catalog.tableExists(Seq(tableName))) {
// Check if we need to throw an exception or just return.
mode match {
@@ -188,22 +189,7 @@
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
case l @ LogicalRelation(i: InsertableRelation) =>
if (l.schema != createdRelation.schema) {
val errorDescription =
s"Cannot append to table $tableName because the schema of this " +
s"DataFrame does not match the schema of table $tableName."
val errorMessage =
s"""
|$errorDescription
|== Schemas ==
|${sideBySide(
s"== Expected Schema ==" +:
l.schema.treeString.split("\\\n"),
s"== Actual Schema ==" +:
createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
""".stripMargin
throw new AnalysisException(errorMessage)
} else if (i != createdRelation.relation) {
if (i != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
s"match the existing relation of $tableName. " +
@@ -221,6 +207,7 @@
""".stripMargin
throw new AnalysisException(errorMessage)
}
existingSchema = Some(l.schema)
case o =>
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
}
@@ -234,15 +221,23 @@
createMetastoreTable = true
}

val df = DataFrame(hiveContext, query)
val data = DataFrame(hiveContext, query)
val df = existingSchema match {
// If we are inserting into an existing table, just use the existing schema.
case Some(schema) => sqlContext.createDataFrame(data.queryExecution.toRdd, schema)
case None => data
}

// Create the relation based on the data of df.
ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)

if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
hiveContext.catalog.createDataSourceTable(
tableName,
Some(df.schema),
Some(resolved.relation.schema),
provider,
optionsWithPath,
isExternal)