docs/sql-data-sources-csv.md (12 changes: 9 additions & 3 deletions)
@@ -9,17 +9,17 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into a Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. The `option()` function can be used to customize the behavior of reading or writing, such as controlling the behavior of the header, delimiter character, character set, and so on.
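For example, a minimal sketch in Scala (the paths and options here are placeholders, not part of the documented example set):

```scala
// Read a CSV file with a header row, letting Spark infer column types.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("path/to/people.csv")

// Write the result back out with a custom delimiter.
df.write
  .option("header", "true")
  .option("sep", ";")
  .csv("path/to/output")
```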

<div class="codetabs">

@@ -162,6 +162,12 @@ Data source options of CSV can be set via:
<td>Sets the string that indicates a timestamp format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>timestampNTZFormat</code></td>
<td>yyyy-MM-dd'T'HH:mm:ss[.SSS]</td>
<td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to the timestamp without timezone type. Note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>maxColumns</code></td>
<td>20480</td>
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)

private val timestampNTZFormatter = TimestampFormatter(
options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)
> **Member:** this part I'd defer to @MaxGekk to review.
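As a rough illustration of what a zone-less pattern accepts, here is a sketch using java.time directly (Spark's internal TimestampFormatter is only analogous to this, not identical):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// The NTZ pattern has no zone section: fractional seconds are optional,
// but an offset such as "+08:00" has no place in the pattern at all.
val ntzPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS]")
val parsed = LocalDateTime.parse("2021-11-22T10:54:27.123", ntzPattern)
println(parsed) // 2021-11-22T10:54:27.123
```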


private val decimalParser = if (options.locale == Locale.US) {
// Special handling the default locale for backward compatibility
s: String => new java.math.BigDecimal(s)
@@ -109,6 +117,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
@@ -160,6 +169,17 @@
private def tryParseDouble(field: String): DataType = {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
} else {
tryParseTimestampNTZ(field)
> **Member:** When I supported timestamp inference in JSON (#23201), I had to introduce a new option in #23455, and disable the inference by default in #28966 because such inference slowed down user queries. If you are sure that is not the case here, please extend the CSV benchmarks to prove the performance doesn't degrade.
>
> **@sadikovi (author), Nov 24, 2021:** I am not working on JSON just yet, this is the CSV data source.
>
> **Member:** I showed JSON as an example because I guess CSV has a similar issue.
}
}

private def tryParseTimestampNTZ(field: String): DataType = {
// We can only parse the value as TimestampNTZType if it does not have a zone-offset or
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with a time zone.
if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) {
> **Member:** Maybe we should skip the parsing if SQLConf.get.timestampType is set to TIMESTAMP_LTZ, since parsing is a non-trivial op?
>
> **Author:** Could you elaborate a bit more? Thanks. My understanding was that the config indicated whether the output of parsing should be treated as TimestampNTZ or TimestampLTZ.
SQLConf.get.timestampType
} else {
tryParseTimestamp(field)
}
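A sketch of the observable effect, assuming a SparkSession `spark` (per the code above, the conf value decides which type a successful zone-less parse is reported as):

```scala
import spark.implicits._

// With TIMESTAMP_NTZ as the session timestamp type, a zone-less string
// column is inferred as timestamp_ntz; a zoned string falls through to
// tryParseTimestamp and is inferred as a regular timestamp instead.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val zoneless = Seq("2021-11-22T10:54:27").toDS()
spark.read.option("inferSchema", "true").csv(zoneless).printSchema()
// expected: _c0: timestamp_ntz
```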
@@ -225,6 +245,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
} else {
Some(DecimalType(range + scale, scale))
}

case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
Some(TimestampType)

case _ => None
}
}
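A sketch of how this merge rule surfaces, continuing the `spark.sql.timestampType = TIMESTAMP_NTZ` setting from the previous sketch (values are hypothetical):

```scala
import spark.implicits._

// One column mixes a zone-less value (inferred as NTZ) with a zoned value
// (inferred as LTZ); compatibleType widens the column to plain TIMESTAMP.
val mixed = Seq(
  "2021-11-22T10:54:27",
  "2021-11-22T10:54:27+08:00"
).toDS()
spark.read.option("inferSchema", "true").csv(mixed).printSchema()
// expected: _c0: timestamp
```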
@@ -164,6 +164,10 @@ class CSVOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
> **Member:** Do timestampNTZFormatInRead and timestampNTZFormatInWrite have different behavior?
>
> **Author:** How do you mean? Actually, it is the same config, but reads require an Option[String] and writes require a String (similar to timestampFormatInRead and timestampFormatInWrite). I can update to keep only one property, timestampNTZFormat, instead. Would that work?
>
> **Member:** Since we are not going to respect the conf SQLConf.get.legacyTimeParserPolicy here, I think we can make it one property.
>
> **Author:** Updated!
>
> **Author:** @gengliangwang I had to revert to using timestampNTZFormatInRead and timestampNTZFormatInWrite: one of the tests verifies that timestampNTZ values are returned in a certain format. You can review 043edb6.

val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
> **Member:** I wonder what's the reason to have the optional field [.SSS] in write. How should the CSV writer decide whether to write milliseconds or not? Another question: why is the precision in milliseconds and not in microseconds?
>
> **Author:** It is the same reason as for timestampFormat above; I just copy-pasted it for timestampNTZFormat and removed the zone component.
>
> **Member:** The option was added when Spark's timestamp type had millisecond precision. Now the precision is microsecond, and I don't see any reason to lose info in write by default.
>
> **Contributor:** I think we can change it for both LTZ and NTZ, if the precision loss here is a problem.
>
> **Author:** I would rather change it separately; it sounds more like a general problem with timestamps.


val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

val maxColumns = getInt("maxColumns", 20480)
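A sketch of how these options surface through the public API (assumes a SparkSession `spark`, a DataFrame `df` with a TIMESTAMP_NTZ column `ts`, and DDL support for TIMESTAMP_NTZ in schema strings; the pattern and paths are illustrative):

```scala
// Write NTZ values with an explicit microsecond pattern, sidestepping the
// [.SSS] default discussed in the thread above.
df.write
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("/tmp/ntz_out")

// Read them back with the same pattern; the single option name feeds both
// timestampNTZFormatInRead and timestampNTZFormatInWrite internally.
val back = spark.read
  .schema("ts TIMESTAMP_NTZ")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("/tmp/ntz_out")
```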
@@ -49,7 +49,7 @@ class UnivocityGenerator(
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
private val timestampNTZFormatter = TimestampFormatter(
- options.timestampFormatInWrite,
+ options.timestampNTZFormatInWrite,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false,
@@ -94,7 +94,7 @@ class UnivocityParser(
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
- options.timestampFormatInRead,
+ options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
@@ -204,7 +204,7 @@

case _: TimestampNTZType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
- timestampNTZFormatter.parseWithoutTimeZone(datum)
+ timestampNTZFormatter.parseWithoutTimeZone(datum, true)
}

case _: DateType => (d: String) =>
@@ -442,17 +442,22 @@ object DateTimeUtils {

/**
* Trims and parses a given UTF8 string to a corresponding [[Long]] value representing the
- * number of microseconds since the epoch. The result is independent of time zones,
- * which means that zone ID in the input string will be ignored.
+ * number of microseconds since the epoch. The result will be independent of time zones.
+ *
+ * If the input string contains a component associated with time zone, the method will return
+ * `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method
+ * will simply discard the time zone component. Enable the check to detect situations like parsing
+ * a timestamp with time zone as TimestampNTZType.
+ *
* The return type is [[Option]] in order to distinguish between 0L and null. Please
* refer to `parseTimestampString` for the allowed formats.
*/
- def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
+ def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = {
> **Contributor:** The name failOnError is misleading, as this method never fails. How about allowTimeZone?
>
> **Author:** Respectfully, I have changed this exact code and variable 3 times by now. Yes, I will update 😞.

try {
- val (segments, _, justTime) = parseTimestampString(s)
- // If the input string can't be parsed as a timestamp, or it contains only the time part of a
- // timestamp and we can't determine its date, return None.
- if (segments.isEmpty || justTime) {
+ val (segments, zoneIdOpt, justTime) = parseTimestampString(s)
+ // If the input string can't be parsed as a timestamp without time zone, or it contains only
+ // the time part of a timestamp and we can't determine its date, return None.
+ if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) {
return None
}
val nanoseconds = MICROSECONDS.toNanos(segments(6))
@@ -465,8 +470,19 @@
}
}

/**
* Trims and parses a given UTF8 string to a corresponding [[Long]] value representing the
* number of microseconds since the epoch. The result is independent of time zones. The zone ID
* component will be discarded and ignored.
* The return type is [[Option]] in order to distinguish between 0L and null. Please
* refer to `parseTimestampString` for the allowed formats.
*/
def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
stringToTimestampWithoutTimeZone(s, false)
> **@sadikovi (author), Nov 28, 2021:** I used false to preserve the original behaviour in this method and the one below.

}

def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = {
- stringToTimestampWithoutTimeZone(s).getOrElse {
+ stringToTimestampWithoutTimeZone(s, false).getOrElse {
throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType)
}
}
@@ -31,9 +31,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
- import org.apache.spark.sql.types.Decimal
+ import org.apache.spark.sql.types.{Decimal, TimestampNTZType}
import org.apache.spark.unsafe.types.UTF8String

sealed trait TimestampFormatter extends Serializable {
@@ -55,6 +56,7 @@ sealed trait TimestampFormatter extends Serializable {
* Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
*
* @param s - string with timestamp to parse
* @param failOnError - indicates strict parsing of timezone
* @return microseconds since epoch.
* @throws ParseException can be thrown by legacy parser
* @throws DateTimeParseException can be thrown by new parser
@@ -66,10 +68,23 @@
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
- def parseWithoutTimeZone(s: String): Long =
+ def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
> **Contributor:** ditto, allowTimeZone?
>
> **Contributor:** Another question: instead of adding a new parameter, can the caller side pick one of parse and parseWithoutTimeZone?
>
> **@sadikovi (author), Dec 2, 2021:** I am not sure we can do that, because the parseWithoutTimeZone method can be used with different options depending on the context, and it is, in fact. Unfortunately, this is an artifact of the problem that we discussed earlier: some functions are not supposed to fail while parsing the value and others are.

throw new IllegalStateException(
s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " +
"of timestamp without time zone")
s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " +
"implemented in the formatter of timestamp without time zone")

/**
* Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
* Zone-id and zone-offset components are ignored.
*/
@throws(classOf[ParseException])
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
final def parseWithoutTimeZone(s: String): Long =
// This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we
// did not fail if timestamp contained zone-id or zone-offset component and instead ignored it.
parseWithoutTimeZone(s, false)

def format(us: Long): String
def format(ts: Timestamp): String
@@ -118,9 +133,12 @@ class Iso8601TimestampFormatter(
} catch checkParsedDiff(s, legacyFormatter.parse)
}

- override def parseWithoutTimeZone(s: String): Long = {
+ override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
try {
val parsed = formatter.parse(s)
if (failOnError && parsed.query(TemporalQueries.zone()) != null) {
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType)
}
val localDate = toLocalDate(parsed)
val localTime = toLocalTime(parsed)
DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(localDate, localTime))
@@ -186,9 +204,13 @@ class DefaultTimestampFormatter(
} catch checkParsedDiff(s, legacyFormatter.parse)
}

- override def parseWithoutTimeZone(s: String): Long = {
+ override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
try {
- DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
+ val utf8Value = UTF8String.fromString(s)
+ DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse {
+ throw QueryExecutionErrors.cannotParseStringAsDataTypeError(
+ TimestampFormatter.defaultPattern(), s, TimestampNTZType)
+ }
} catch checkParsedDiff(s, legacyFormatter.parse)
}
}
@@ -1034,6 +1034,13 @@ object QueryExecutionErrors {
s"[$token] as target spark data type [$dataType].")
}

def cannotParseStringAsDataTypeError(pattern: String, value: String, dataType: DataType)
: Throwable = {
new RuntimeException(
s"Cannot parse field value ${value} for pattern ${pattern} " +
s"as target spark data type [$dataType].")
}

def failToParseEmptyStringForDataTypeError(dataType: DataType): Throwable = {
new RuntimeException(
s"Failed to parse an empty string for data type ${dataType.catalogString}")
@@ -1890,4 +1897,3 @@
new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported")
}
}

@@ -357,6 +357,18 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None)
}

test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") {
assert(
stringToTimestampWithoutTimeZone(
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27))))

assert(
stringToTimestampWithoutTimeZone(
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
None)
}

test("SPARK-15379: special invalid date string") {
// Test stringToDate
assert(toDate("2015-02-29 00:00:00").isEmpty)
@@ -368,4 +368,15 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
.selectExpr("value.a")
checkAnswer(fromCsvDF, Row(localDT))
}

test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in from_csv") {
val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
.select(
from_csv(
$"csv",
StructType(StructField("a", TimestampNTZType) :: Nil),
Map.empty[String, String]) as "value")
.selectExpr("value.a")
checkAnswer(fromCsvDF, Row(null))
> **Member:** How about a positive test for the functions from_csv/to_csv and schema_of_csv?
>
> **Author:** There is a test right above that verifies the happy path for those functions. I only added a new test because it was missing from the original patch.

}
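For reference, a hypothetical sketch of the positive counterpart discussed in the thread above (not part of the patch; assumes the same test fixtures as the suite):

```scala
// Hypothetical: a well-formed zone-less value should round-trip into a
// java.time.LocalDateTime, the external type for TIMESTAMP_NTZ.
test("from_csv with a valid timestamp_ntz value (sketch)") {
  val df = Seq("2021-08-12T15:16:23").toDF("csv")
    .select(
      from_csv(
        $"csv",
        StructType(StructField("a", TimestampNTZType) :: Nil),
        Map.empty[String, String]) as "value")
    .selectExpr("value.a")
  checkAnswer(df, Row(java.time.LocalDateTime.of(2021, 8, 12, 15, 16, 23)))
}
```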
}