3 changes: 3 additions & 0 deletions python/pyspark/sql/types.py
@@ -116,6 +116,9 @@ class NullType(DataType):

__metaclass__ = DataTypeSingleton

def simpleString(self):
return 'unknown'


class AtomicType(DataType):
"""An internal type used to represent everything that is not
@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
cols.foreach(c => failCharType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
a.dataType.foreach(failCharType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
CreateV2Table(
catalog.asTableCatalog,
@@ -157,6 +161,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -172,6 +179,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
assertNoCharTypeInSchema(c.tableSchema)
ReplaceTable(
catalog.asTableCatalog,
@@ -184,6 +192,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)

case c @ ReplaceTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -2211,6 +2211,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
DecimalType(precision.getText.toInt, 0)
case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
DecimalType(precision.getText.toInt, scale.getText.toInt)
case ("void", Nil) => NullType
case ("interval", Nil) => CalendarIntervalType
case (dt, params) =>
val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
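For reference, the new spelling and the renamed simpleString (see the NullType change later in this diff) can be observed through the Catalyst parser; a minimal sketch, assuming a build that includes this change:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.NullType

// "void" (Hive's spelling) now parses to Catalyst's NullType...
val dt = CatalystSqlParser.parseDataType("void")
assert(dt == NullType)

// ...and NullType now renders as "unknown" instead of "null" in schema strings.
assert(dt.simpleString == "unknown")
```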
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.AlterTable
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -346,4 +346,23 @@ private[sql] object CatalogV2Util {
}
}
}

def failNullType(dt: DataType): Unit = {
def containsNullType(dt: DataType): Boolean = dt match {
case ArrayType(et, _) => containsNullType(et)
case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
case _ => dt.isInstanceOf[NullType]
}
if (containsNullType(dt)) {
throw new AnalysisException(
"Cannot create tables with unknown type.")
}
}

def assertNoNullTypeInSchema(schema: StructType): Unit = {
schema.foreach { f =>
failNullType(f.dataType)
}
}
}
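As an illustration of what the recursive check rejects, here is a standalone re-implementation of the same logic (a sketch written for this description, not code from the PR):

```scala
import org.apache.spark.sql.types._

// Mirrors failNullType above: NullType is detected even when nested
// inside arrays, maps, or structs.
def containsNullType(dt: DataType): Boolean = dt match {
  case ArrayType(et, _)   => containsNullType(et)
  case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
  case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
  case _                  => dt.isInstanceOf[NullType]
}

val ok    = new StructType().add("id", IntegerType).add("name", StringType)
val notOk = new StructType().add("id", IntegerType).add("tags", ArrayType(NullType))

assert(!ok.exists(f => containsNullType(f.dataType)))    // passes the new check
assert(notOk.exists(f => containsNullType(f.dataType)))  // would trigger the AnalysisException
```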
@@ -32,6 +32,10 @@ class NullType private() extends DataType {
override def defaultSize: Int = 1

private[spark] override def asNullable: NullType = this

// "null" is mainly used to represent a literal in Spark,
// it's better to avoid using it for data types.
override def simpleString: String = "unknown"
}

/**
@@ -61,6 +61,7 @@ class DataTypeParserSuite extends SparkFunSuite {
checkDataType("varchAr(20)", StringType)
checkDataType("cHaR(27)", StringType)
checkDataType("BINARY", BinaryType)
checkDataType("void", NullType)
checkDataType("interval", CalendarIntervalType)

checkDataType("array<doublE>", ArrayType(DoubleType, true))
@@ -48,6 +48,7 @@ class ResolveSessionCatalog(
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case AlterTableAddColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -76,6 +77,7 @@

case AlterTableReplaceColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(_: V1Table) =>
throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
@@ -100,6 +102,7 @@

case a @ AlterTableAlterColumnStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -268,6 +271,7 @@
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
Member:
It would be great if we could add a legacy flag for such a behavior change in the future. This changes the behavior for both v1 and v2 catalogs in order to fix a compatibility issue with the Hive Metastore, but the Hive Metastore is not the only catalog Spark supports now that the Catalog APIs are open in DSv2.
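A legacy flag of that kind would normally live in SQLConf. The sketch below is purely hypothetical: the config name, description, and default are invented for illustration, and this PR does not add such a flag.

```scala
// Hypothetical entry, roughly as it would appear inside object SQLConf.
val LEGACY_ALLOW_UNKNOWN_TYPE_IN_SCHEMA =
  buildConf("spark.sql.legacy.allowUnknownTypeInTableSchema")
    .internal()
    .doc("When true, CREATE/REPLACE TABLE (AS SELECT) accepts columns of " +
      "null (unknown) type instead of failing analysis.")
    .booleanConf
    .createWithDefault(false)
```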

Contributor:
I don't know of any database that supports creating tables with a null/void type column, so this change is not for Hive compatibility but for reasonable SQL semantics.

I agree this is a breaking change that should at least be noted in the migration guide. A legacy config could also be added, but I can't find a reasonable use case for a null type column.

Contributor:
> I don't know of any database that supports creating tables with a null/void type column, so this change is not for Hive compatibility but for reasonable SQL semantics.
>
> I agree this is a breaking change that should at least be noted in the migration guide. A legacy config could also be added, but I can't find a reasonable use case for a null type column.

I think the main reason you would want to support it is when people use tables / views / temp tables to structure existing workloads. We support NullType in CTEs, but when people want to reuse the same CTE in multiple queries (i.e., multi-output workloads), they have no choice but to use views or temporary tables. (With DataFrames they would still be able to reuse the same DataFrame for multiple outputs, but in SQL that doesn't work.)

One typical use case where you use CTEs to structure your code is when you have multiple sources with different structures that you then UNION ALL together into a single dataset. It is not uncommon for each source to have certain columns that don't apply, and then you write explicit NULLs there. It would be pretty annoying if you had to write explicit casts of those NULLs to the right type in all of those cases; a sketch of the pattern follows below.
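To make that pattern concrete, here is a hypothetical example (the table and column names are invented). Only the bare-NULL column needs a cast; once one branch supplies a concrete type, the usual set-operation coercion takes over.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Neither source has a value for "note", so both branches write NULL.
// Without the CAST, the unioned column would stay NullType ("unknown")
// and the CTAS would fail analysis under this change.
spark.sql("""
  CREATE TABLE unioned_sources USING parquet AS
  SELECT id, amount, CAST(NULL AS STRING) AS note FROM src_a
  UNION ALL
  SELECT id, amount, NULL AS note FROM src_b
""")
```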

Contributor:
@bart-samwel this makes sense; shall we also support CREATE TABLE t(c VOID)? Your case seems to be CTAS only.

Contributor:
> @bart-samwel this makes sense; shall we also support CREATE TABLE t(c VOID)? Your case seems to be CTAS only.

I think the CREATE TABLE case with explicit types is not very useful, but it could be useful if there were tools that fetch a table's schema and then try to recreate it, e.g. for mocking purposes. Probably best to be orthogonal here.

Contributor:
@LantaoJin do you have time to fix it? I think we can simply remove the null type check and add a few tests with both the in-memory and Hive catalogs.

val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
if (!DDLUtils.isHiveTable(Some(provider))) {
Expand All @@ -292,6 +296,9 @@ class ResolveSessionCatalog(

case c @ CreateTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
Expand Down Expand Up @@ -319,6 +326,7 @@ class ResolveSessionCatalog(
// session catalog and the table provider is not v2.
case c @ ReplaceTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
Expand All @@ -336,6 +344,9 @@ class ResolveSessionCatalog(

case c @ ReplaceTableAsSelectStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
if (c.asSelect.resolved) {
assertNoNullTypeInSchema(c.asSelect.schema)
}
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
Expand Down Expand Up @@ -292,6 +293,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
"in the table definition of " + table.identifier,
sparkSession.sessionState.conf.caseSensitiveAnalysis)

assertNoNullTypeInSchema(schema)
Contributor:
Is this needed? I think the changes in ResolveCatalogs and ResolveSessionCatalog should cover all the commands.

Contributor Author (@LantaoJin, Jul 1, 2020):
Without this, CREATE TABLE t1 USING PARQUET AS SELECT null AS null_col in Spark will throw "Parquet data source does not support null data type." instead of "Cannot create tables with VOID type".

Compared with the error message from Hive, "SemanticException [Error 10305]: CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: col", that is confusing, so it's better to keep this check.

Contributor Author (@LantaoJin, Jul 1, 2020):
@cloud-fan

> Without this, CREATE TABLE t1 USING PARQUET AS SELECT null AS null_col in Spark will throw "Parquet data source does not support null data type." instead of "Cannot create tables with VOID type".

Sorry, the description above is incorrect. Without this, CTAS for a Hive table (CREATE TABLE t2 AS SELECT null AS null_col) will pass and no exception is thrown. It seems Hive tables (non-Parquet/ORC formats) don't go through ResolveSessionCatalog; the two statements are sketched below.
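For concreteness, a hypothetical reproduction of the two statements being compared (assuming a Hive-enabled session; the table names are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Datasource CTAS: resolved through ResolveSessionCatalog, where the new
// assertNoNullTypeInSchema call rejects the NullType column.
spark.sql("CREATE TABLE t1 USING PARQUET AS SELECT null AS null_col")

// Hive-serde CTAS: does not hit that rule, so without the extra check added
// in PreprocessTableCreation it would succeed and create a void-typed column.
spark.sql("CREATE TABLE t2 AS SELECT null AS null_col")
```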


val normalizedPartCols = normalizePartitionColumns(schema, table)
val normalizedBucketSpec = normalizeBucketSpec(schema, table)

@@ -34,7 +34,7 @@
| org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> |
| org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> |
| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> |
| org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> |
| org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> |
@@ -5,7 +5,7 @@
-- !query
select null, Null, nUll
-- !query schema
struct<NULL:null,NULL:null,NULL:null>
struct<NULL:unknown,NULL:unknown,NULL:unknown>
-- !query output
NULL NULL NULL

@@ -49,7 +49,7 @@ two 2
-- !query
select * from values ("one", null), ("two", null) as data(a, b)
-- !query schema
struct<a:string,b:null>
struct<a:string,b:unknown>
-- !query output
one NULL
two NULL
@@ -5,7 +5,7 @@
-- !query
select null, Null, nUll
-- !query schema
struct<NULL:null,NULL:null,NULL:null>
struct<NULL:unknown,NULL:unknown,NULL:unknown>
-- !query output
NULL NULL NULL

@@ -7,7 +7,7 @@ select typeof(null)
-- !query schema
struct<typeof(NULL):string>
-- !query output
null
unknown


-- !query
@@ -308,15 +308,15 @@ struct<1:int>
-- !query
select foo.* from (select null) as foo
-- !query schema
struct<NULL:null>
struct<NULL:unknown>
-- !query output
NULL


-- !query
select foo.* from (select 'xyzzy',1,null) as foo
-- !query schema
struct<xyzzy:string,1:int,NULL:null>
struct<xyzzy:string,1:int,NULL:unknown>
-- !query output
xyzzy 1 NULL

@@ -5,7 +5,7 @@
-- !query
SELECT ifnull(null, 'x'), ifnull('y', 'x'), ifnull(null, null)
-- !query schema
struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):null>
struct<ifnull(NULL, x):string,ifnull(y, x):string,ifnull(NULL, NULL):unknown>
-- !query output
x y NULL

@@ -21,15 +21,15 @@ NULL x
-- !query
SELECT nvl(null, 'x'), nvl('y', 'x'), nvl(null, null)
-- !query schema
struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):null>
struct<nvl(NULL, x):string,nvl(y, x):string,nvl(NULL, NULL):unknown>
-- !query output
x y NULL


-- !query
SELECT nvl2(null, 'x', 'y'), nvl2('n', 'x', 'y'), nvl2(null, null, null)
-- !query schema
struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):null>
struct<nvl2(NULL, x, y):string,nvl2(n, x, y):string,nvl2(NULL, NULL, NULL):unknown>
-- !query output
y x NULL

@@ -49,7 +49,7 @@ two 2
-- !query
select udf(a), b from values ("one", null), ("two", null) as data(a, b)
-- !query schema
struct<CAST(udf(cast(a as string)) AS STRING):string,b:null>
struct<CAST(udf(cast(a as string)) AS STRING):string,b:unknown>
-- !query output
one NULL
two NULL
@@ -403,7 +403,7 @@ class FileBasedDataSourceSuite extends QueryTest
""
}
def errorMessage(format: String): String = {
s"$format data source does not support null data type."
s"$format data source does not support unknown data type."
}
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
withTempDir { dir =>
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -225,6 +226,8 @@ case class RelationConversions(
isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
// This is for CREATE TABLE .. STORED AS PARQUET/ORC AS SELECT null
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}