Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2959,6 +2959,11 @@
"Unsupported dtype: <invalidValue>. Valid values: float64, float32."
]
},
"EXTENSION" : {
"message" : [
"Invalid extension: <invalidValue>. Extension is limited to exactly 3 letters (e.g. csv, tsv, etc...)"
]
},
"INTEGER" : {
"message" : [
"expects an integer literal, but got <invalidValue>."
Expand Down
6 changes: 6 additions & 0 deletions docs/sql-data-sources-csv.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ Data source options of CSV can be set via:
<td>Sets a separator for each field and value. This separator can be one or more characters.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>extension</code></td>
<td>csv</td>
<td>Sets the file extension for the output files. Limited to letters. Length must equal 3.</td>
<td>write</td>
</tr>
<tr>
<td><code>encoding</code><br><code>charset</code></td>
<td>UTF-8</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ class CSVOptions(

val delimiter = CSVExprUtils.toDelimiterStr(
parameters.getOrElse(SEP, parameters.getOrElse(DELIMITER, ",")))

val extension = {
val ext = parameters.getOrElse(EXTENSION, "csv")
if (ext.size != 3 && !ext.forall(_.isLetter)) {
throw QueryExecutionErrors.invalidFileExtensionError(EXTENSION, ext)
}

ext
}

val parseMode: ParseMode =
parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
val charset = parameters.get(ENCODING).orElse(parameters.get(CHARSET))
Expand Down Expand Up @@ -385,6 +395,7 @@ object CSVOptions extends DataSourceOptions {
val NEGATIVE_INF = newOption("negativeInf")
val TIME_ZONE = newOption("timeZone")
val UNESCAPED_QUOTE_HANDLING = newOption("unescapedQuoteHandling")
val EXTENSION = newOption("extension")
// Options with alternative
val ENCODING = "encoding"
val CHARSET = "charset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
Map.empty
)

def invalidFileExtensionError(functionName: String, extension: String): RuntimeException = {
new SparkIllegalArgumentException(
errorClass = "INVALID_PARAMETER_VALUE.EXTENSION",
messageParameters = Map(
"functionName" -> toSQLId(functionName),
"parameter" -> toSQLId("extension"),
"fileExtension" -> toSQLId(extension),
"acceptable" -> "Extension is limited to exactly 3 letters (e.g. csv, tsv, etc...)"))
}

def invalidCharsetError(functionName: String, charset: String): RuntimeException = {
new SparkIllegalArgumentException(
errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

override def getFileExtension(context: TaskAttemptContext): String = {
".csv" + CodecStreams.getCompressionExtension(context)
"." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class CSVWrite(
}

override def getFileExtension(context: TaskAttemptContext): String = {
".csv" + CodecStreams.getCompressionExtension(context)
"." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,23 @@ abstract class CSVSuite
}
}

test("SPARK-50616: We can write with a tsv file extension") {
withTempPath { path =>
val input = Seq(
"1423-11-12T23:41:00",
"1765-03-28",
"2016-01-28T20:00:00"
).toDF().repartition(1)
input.write.option("extension", "tsv").csv(path.getAbsolutePath)

val files = Files.list(path.toPath)
.iterator().asScala.map(x => x.getFileName.toString)
.toList.filter(x => x.takeRight(3).equals("tsv"))

assert(files.size == 1)
}
}

test("SPARK-39904: Parse incorrect timestamp values") {
withTempPath { path =>
Seq(
Expand Down Expand Up @@ -3308,7 +3325,7 @@ abstract class CSVSuite
}

test("SPARK-40667: validate CSV Options") {
assert(CSVOptions.getAllOptions.size == 39)
assert(CSVOptions.getAllOptions.size == 40)
// Please add validation on any new CSV options here
assert(CSVOptions.isValidOption("header"))
assert(CSVOptions.isValidOption("inferSchema"))
Expand Down Expand Up @@ -3347,6 +3364,7 @@ abstract class CSVSuite
assert(CSVOptions.isValidOption("compression"))
assert(CSVOptions.isValidOption("codec"))
assert(CSVOptions.isValidOption("sep"))
assert(CSVOptions.isValidOption("extension"))
assert(CSVOptions.isValidOption("delimiter"))
assert(CSVOptions.isValidOption("columnPruning"))
// Please add validation on any new parquet options with alternative here
Expand Down
Loading