@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)

private val timestampNTZFormatter = TimestampFormatter(
options.timestampFormatInRead,

Contributor: I think we should fail earlier if the pattern string contains a timezone. It doesn't make sense to have a timezone in an NTZ string.

Contributor: Moreover, maybe we should add a new CSV option to define the pattern for timestamp NTZ.

Contributor Author: Updated.

options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)

Member: this part I'd defer to @MaxGekk to review.

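The thread above asks for a dedicated pattern option for TIMESTAMP_NTZ and for failing early when the pattern itself carries a zone. A minimal usage sketch, assuming the new option ends up being exposed as timestampNTZFormat (the option name, patterns, and path below are illustrative, not confirmed by this diff):

// Hypothetical read path with separate patterns for zoned and zone-less timestamps.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")  // may carry an offset
  .option("timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss[.SSS]")    // assumed option name; no zone letters
  .load("/path/to/data.csv")                                      // placeholder path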

private val decimalParser = if (options.locale == Locale.US) {
// Special handling the default locale for backward compatibility
s: String => new java.math.BigDecimal(s)
@@ -109,6 +117,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
@@ -160,6 +169,15 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private def tryParseDouble(field: String): DataType = {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
} else {
tryParseTimestampNTZ(field)

Member: When I supported timestamp inference in JSON (#23201), I had to introduce a new option in #23455 and disable the inference by default in #28966, because such inference slowed down user queries. If you are sure that is not the case here, please extend the CSV benchmarks to prove the performance doesn't degrade.

Contributor Author (@sadikovi, Nov 24, 2021): I am not working on JSON just yet; this is the CSV data source.

Member: I showed JSON as an example because I guess CSV has a similar issue.

}
}

private def tryParseTimestampNTZ(field: String): DataType = {
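// Infer the NTZ type only if the value carries no time zone and parses as a zone-less timestamp.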
if ((allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) &&

Member: nit: let's add a one-line comment here.

Contributor Author: Added, thanks.

(allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) {
SQLConf.get.timestampType
} else {
tryParseTimestamp(field)
}
@@ -225,6 +243,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
} else {
Some(DecimalType(range + scale, scale))
}

case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
Some(TimestampType)

case _ => None
}
}
@@ -71,6 +71,19 @@ sealed trait TimestampFormatter extends Serializable {
s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " +
"of timestamp without time zone")

/**
* Returns true if the parsed timestamp contains the time zone component, false otherwise.
* Used to determine if the timestamp can be inferred as timestamp without time zone.
*
* @param s - string with timestamp to inspect
* @return whether the timestamp string has the time zone component defined.
*/
@throws(classOf[IllegalStateException])
def isTimeZoneSet(s: String): Boolean =

Contributor: This API looks a bit weird. How about we throw an exception in parseWithoutTimeZone if the input string has a timezone?

BTW, is it possible to statically fail the NTZ formatter creation if the pattern string contains a timezone? Then we can fail earlier, before processing data.

Contributor Author: I can update.

IMHO, it is tricky to check this statically, as there does not seem to be a way of checking pattern components; we need an actual value to validate whether or not it has a zone offset.

throw new IllegalStateException(
s"The method `isTimeZoneSet(s: String)` should be implemented in the formatter " +
"of timestamp without time zone")

def format(us: Long): String
def format(ts: Timestamp): String
def format(instant: Instant): String
@@ -127,6 +140,14 @@ class Iso8601TimestampFormatter(
} catch checkParsedDiff(s, legacyFormatter.parse)
}

override def isTimeZoneSet(s: String): Boolean = {
try {
val parsed = formatter.parse(s)
val parsedZoneId = parsed.query(TemporalQueries.zone())
parsedZoneId != null
} catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)

Member: ditto

}
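
For reference, the zone check above leans on plain java.time behavior: a TemporalAccessor returned by DateTimeFormatter.parse answers TemporalQueries.zone() with a non-null value only when the pattern actually captured a zone or offset. A small standalone sketch, independent of Spark's formatter classes:

import java.time.format.DateTimeFormatter
import java.time.temporal.TemporalQueries

// The optional [XXX] section lets the same pattern accept zoned and zone-less strings.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[XXX]")

def hasZone(s: String): Boolean =
  fmt.parse(s).query(TemporalQueries.zone()) != null

hasZone("2020-12-12T12:12:12")        // false -> candidate for TIMESTAMP_NTZ
hasZone("2020-12-12T12:12:12+05:00")  // true  -> stays TIMESTAMP_LTZ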

override def format(instant: Instant): String = {
try {
formatter.withZone(zoneId).format(instant)
@@ -144,7 +165,13 @@
}

override def format(localDateTime: LocalDateTime): String = {
localDateTime.format(formatter)
// If the legacy time parser policy is selected, we can only write timestamps with a timezone,
// so we will use the default time zone for it.
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
format(toJavaTimestamp(instantToMicros(localDateTime.atZone(zoneId).toInstant)))
} else {
localDateTime.format(formatter)
}
}
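
In other words, under the legacy policy a zone-less LocalDateTime is first pinned to the formatter's zone and written as an instant. A rough java.time illustration (the zone below is only an example; the formatter uses its configured zoneId):

import java.time.{LocalDateTime, ZoneId}

val ldt  = LocalDateTime.of(2020, 12, 12, 12, 12, 12)
val zone = ZoneId.of("America/Los_Angeles") // example only

// The local value is interpreted in that zone, so the written text is
// effectively a TIMESTAMP_LTZ rendering of the NTZ value.
val instant = ldt.atZone(zone).toInstant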

override def validatePatternString(checkLegacy: Boolean): Unit = {
@@ -191,6 +218,13 @@ class DefaultTimestampFormatter(
DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
} catch checkParsedDiff(s, legacyFormatter.parse)
}

override def isTimeZoneSet(s: String): Boolean = {
try {
val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s))
zoneIdOpt.isDefined
} catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)

Member: ditto

}
}

/**
@@ -1012,6 +1012,130 @@ abstract class CSVSuite
}
}

test("SPARK-37326: Write TIMESTAMP_NTZ in legacy time parser policy") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"

Member: Just write directly to dir.

Contributor Author: Oh, I could not, because the directory already exists by the time the method is called. withTempDir creates a directory, but it needs to be non-existent for Spark to write into.

Member: There is withTempPath which doesn't create any dirs:

/**
* Generates a temporary path without creating the actual file/directory, then pass it to `f`. If
* a file/directory is created there by `f`, it will be delete after `f` returns.
*/
protected def withTempPath(f: File => Unit): Unit = {

Contributor: +1 to use withTempPath

Contributor Author: It seems there is a mixture of withTempDir and withTempPath in this file, arguably used for the same purpose. I will open a PR to fix the tests I added, but I am not going to update other occurrences.


val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col1").coalesce(1)

withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
exp.write.format("csv")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("header", "true")
.save(path)
}

val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("header", "true")
.load(path)

checkAnswer(res, exp)
}
}
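
Given the withTempPath discussion above, a sketch of the same test body using that helper instead of appending a suffix to a withTempDir directory (assuming withTempPath is available to this suite, as quoted above):

withTempPath { file =>
  val path = file.getCanonicalPath // the path does not exist yet, so Spark can create it on write
  val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col1").coalesce(1)

  withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
    exp.write.format("csv")
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("header", "true")
      .save(path)
  }

  val res = spark.read.format("csv")
    .option("inferSchema", "true")
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
    .option("header", "true")
    .load(path)

  checkAnswer(res, exp)
}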

test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"

val exp = spark.sql("""
select
timestamp_ntz'2020-12-12 12:12:12' as col1,
timestamp_ltz'2020-12-12 12:12:12' as col2
""")

exp.write.format("csv").option("header", "true").save(path)

val res = spark.read
.format("csv")
.schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
.option("header", "true")
.load(path)

checkAnswer(res, exp)
}
}

test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"

val exp = spark.sql("""
select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
select timestamp_ntz'2020-12-12 12:12:12' as col0
""")

exp.write.format("csv").option("header", "true").save(path)

val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

for (timestampType <- timestampTypes) {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)

if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString &&
spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) != "legacy") {
checkAnswer(res, exp)
} else {
// Timestamps are written as timestamp with timezone in the legacy mode.
checkAnswer(
res,
spark.sql("""
select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
select timestamp_ltz'2020-12-12 12:12:12' as col0
""")
)
}
}
}
}
}

test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"

Seq(
"col0",
"2020-12-12T12:12:12.000",
"2020-12-12T17:12:12.000Z",
"2020-12-12T17:12:12.000+05:00",
"2020-12-12T12:12:12.000"
).toDF("data")
.coalesce(1)
.write.text(path)

val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)

if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {

Member: It seems that we need a loop to set the config.

// Timestamps without timezone are parsed as strings, so the col0 type would be StringType
// which is similar to reading without schema inference.
val exp = spark.read.format("csv").option("header", "true").load(path)
checkAnswer(res, exp)
} else {
val exp = spark.sql("""
select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
select timestamp_ltz'2020-12-12T12:12:12.000' as col0
""")
checkAnswer(res, exp)
}
}
}
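
Per the reviewer note above about looping over the config, a sketch of how this test could set the parser policy explicitly rather than branching on the current session value ("legacy" and "corrected" are standard values of the config; path refers to the directory written in the test above):

for (policy <- Seq("legacy", "corrected")) {
  withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
    val res = spark.read
      .format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(path)

    if (policy == "legacy") {
      // Zone-less timestamps are inferred as strings in legacy mode.
      val exp = spark.read.format("csv").option("header", "true").load(path)
      checkAnswer(res, exp)
    } else {
      checkAnswer(res, spark.sql("""
        select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
        select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
        select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
        select timestamp_ltz'2020-12-12T12:12:12.000' as col0
      """))
    }
  }
}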

test("Write dates correctly with dateFormat option") {
val customSchema = new StructType(Array(StructField("date", DateType, true)))
withTempDir { dir =>