Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,9 @@ object TypeCoercion {
case (NullType, StringType) => Some(StringType)

// Cast to TimestampType when we compare DateType with TimestampType
// if conf.compareDateTimestampInTimestamp is true
// i.e. TimeStamp('2017-03-01 00:00:00') eq Date('2017-03-01') = true
case (TimestampType, DateType)
=> if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
case (DateType, TimestampType)
=> if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType)
case (TimestampType, DateType) => Some(TimestampType)
case (DateType, TimestampType) => Some(TimestampType)

// There is no proper decimal type we can pick,
// using double type is the best we can do.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ class SessionCatalog(

def validateTableLocation(table: CatalogTable): Unit = {
// SPARK-19724: the default location of a managed table should be non-existent or empty.
if (table.tableType == CatalogTableType.MANAGED &&
!conf.allowCreatingManagedTableUsingNonemptyLocation) {
if (table.tableType == CatalogTableType.MANAGED) {
val tableLocation =
new Path(table.storage.locationUri.getOrElse(defaultTablePath(table.identifier)))
val fs = tableLocation.getFileSystem(hadoopConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,10 @@ case class JsonToStructs(
timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {

val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)

// The JSON input data might be missing certain fields. We force the nullability
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
// can generate incorrect files if values are missing in columns declared as non-nullable.
val nullableSchema = if (forceNullableSchema) schema.asNullable else schema
val nullableSchema = schema.asNullable

override def nullable: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,14 +720,6 @@ object SQLConf {
.stringConf
.createWithDefault("_corrupt_record")

val FROM_JSON_FORCE_NULLABLE_SCHEMA = buildConf("spark.sql.fromJsonForceNullableSchema")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw an exception if users try to set the removed conf to a value that is different from the default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we could throw an exception for the 3 configs. I am just wondering why we silently ignore non-existent SQL configs:

scala> spark.conf.set("spark.sql.abc", 1)

How about throwing an AnalysisException for non-existent SQL configs that have the spark.sql prefix but are not present in

private[sql] val sqlConfEntries = java.util.Collections.synchronizedMap(
?

Or are there SQL configs that we have to bypass for some reason?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the PR #27057

.internal()
.doc("When true, force the output schema of the from_json() function to be nullable " +
"(including all the fields). Otherwise, the schema might not be compatible with" +
"actual data, which leads to corruptions. This config will be removed in Spark 3.0.")
.booleanConf
.createWithDefault(true)

val BROADCAST_TIMEOUT = buildConf("spark.sql.broadcastTimeout")
.doc("Timeout in seconds for the broadcast wait time in broadcast joins.")
.timeConf(TimeUnit.SECONDS)
Expand Down Expand Up @@ -1687,14 +1679,6 @@ object SQLConf {
"the SQL parser.")
.fallbackConf(ANSI_ENABLED)

val ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this explicitly slated to be removed in 3.0? The doc doesn't say so, but it may have been documented elsewhere or be well understood.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is mentioned in the SQL migration guide:

- Since Spark 2.4, creating a managed table with nonempty location is not allowed. An exception is thrown when attempting to create a managed table with nonempty location. Setting `spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation` to `true` restores the previous behavior. This option will be removed in Spark 3.0.

buildConf("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation")
.internal()
.doc("When this option is set to true, creating managed tables with nonempty location " +
"is allowed. Otherwise, an analysis exception is thrown. ")
.booleanConf
.createWithDefault(false)

val VALIDATE_PARTITION_COLUMNS =
buildConf("spark.sql.sources.validatePartitionColumns")
.internal()
Expand Down Expand Up @@ -1913,16 +1897,6 @@ object SQLConf {
.checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
.createWithDefault(Deflater.DEFAULT_COMPRESSION)

val COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP =
buildConf("spark.sql.legacy.compareDateTimestampInTimestamp")
.internal()
.doc("When true (default), compare Date with Timestamp after converting both sides to " +
"Timestamp. This behavior is compatible with Hive 2.2 or later. See HIVE-15236. " +
"When false, restore the behavior prior to Spark 2.4. Compare Date with Timestamp after " +
"converting both sides to string. This config will be removed in Spark 3.0.")
.booleanConf
.createWithDefault(true)

val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
.doc("If it is set to true, size of null returns -1. This behavior was inherited from Hive. " +
"The size function returns null for null input if the flag is disabled.")
Expand Down Expand Up @@ -2236,8 +2210,6 @@ class SQLConf extends Serializable with Logging {
def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value =
HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE))

def compareDateTimestampInTimestamp : Boolean = getConf(COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP)

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
Expand Down Expand Up @@ -2516,9 +2488,6 @@ class SQLConf extends Serializable with Logging {

def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)

def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)

def validatePartitionColumns: Boolean = getConf(VALIDATE_PARTITION_COLUMNS)

def partitionOverwriteMode: PartitionOverwriteMode.Value =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1526,26 +1526,15 @@ class TypeCoercionSuite extends AnalysisTest {
GreaterThan(Literal("1.5"), Literal(BigDecimal("0.5"))),
GreaterThan(Cast(Literal("1.5"), DoubleType), Cast(Literal(BigDecimal("0.5")),
DoubleType)))
Seq(true, false).foreach { convertToTS =>
withSQLConf(
SQLConf.COMPARE_DATE_TIMESTAMP_IN_TIMESTAMP.key -> convertToTS.toString) {
val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
if (convertToTS) {
// `Date` should be treated as timestamp at 00:00:00 See SPARK-23549
ruleTest(rule, EqualTo(date0301, timestamp0301000000),
EqualTo(Cast(date0301, TimestampType), timestamp0301000000))
ruleTest(rule, LessThan(date0301, timestamp0301000001),
LessThan(Cast(date0301, TimestampType), timestamp0301000001))
} else {
ruleTest(rule, LessThan(date0301, timestamp0301000000),
LessThan(Cast(date0301, StringType), Cast(timestamp0301000000, StringType)))
ruleTest(rule, LessThan(date0301, timestamp0301000001),
LessThan(Cast(date0301, StringType), Cast(timestamp0301000001, StringType)))
}
}
}
// Checks that dates/timestamps are not promoted to strings
val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
val timestamp0301000000 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
val timestamp0301000001 = Literal(Timestamp.valueOf("2017-03-01 00:00:01"))
// `Date` should be treated as timestamp at 00:00:00 See SPARK-23549
ruleTest(rule, EqualTo(date0301, timestamp0301000000),
EqualTo(Cast(date0301, TimestampType), timestamp0301000000))
ruleTest(rule, LessThan(date0301, timestamp0301000001),
LessThan(Cast(date0301, TimestampType), timestamp0301000001))
}

test("cast WindowFrame boundaries to the type they operate upon") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,26 +702,22 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
}

test("from_json missing fields") {
for (forceJsonNullableSchema <- Seq(false, true)) {
withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = !forceJsonNullableSchema)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
val expr = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
checkEvaluation(expr, output)
val schema = expr.dataType
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
assert(schemaToCompare == schema)
}
}
val input =
"""{
| "a": 1,
| "c": "foo"
|}
|""".stripMargin
val jsonSchema = new StructType()
.add("a", LongType, nullable = false)
.add("b", StringType, nullable = false)
.add("c", StringType, nullable = false)
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
val expr = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
checkEvaluation(expr, output)
val schema = expr.dataType
val schemaToCompare = jsonSchema.asNullable
assert(schemaToCompare == schema)
}

test("SPARK-24709: infer schema of json strings") {
Expand Down