Skip to content

Commit 8b51800

Browse files
committed
Configurable empty values when reading/writing CSV files
1 parent c17a8ff commit 8b51800

File tree

8 files changed

+102
-21
lines changed

8 files changed

+102
-21
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -345,11 +345,11 @@ def text(self, paths, wholetext=False, lineSep=None):
345345
@since(2.0)
346346
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
347347
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
348-
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
349-
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
350-
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
351-
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
352-
samplingRatio=None, enforceSchema=None):
348+
ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None,
349+
positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None,
350+
maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None,
351+
mode=None, columnNameOfCorruptRecord=None, multiLine=None,
352+
charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None):
353353
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
354354
355355
This function will go through the input once to determine the input schema if
@@ -395,6 +395,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
395395
:param nullValue: sets the string representation of a null value. If None is set, it uses
396396
the default value, empty string. Since 2.0.1, this ``nullValue`` param
397397
applies to all supported types including the string type.
398+
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
399+
the default value, empty string.
398400
:param nanValue: sets the string representation of a non-number value. If None is set, it
399401
uses the default value, ``NaN``.
400402
:param positiveInf: sets the string representation of a positive infinity value. If None
@@ -457,9 +459,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
457459
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
458460
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
459461
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
460-
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
461-
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
462-
maxCharsPerColumn=maxCharsPerColumn,
462+
emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf,
463+
negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat,
464+
maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
463465
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
464466
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
465467
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
@@ -857,9 +859,9 @@ def text(self, path, compression=None, lineSep=None):
857859

858860
@since(2.0)
859861
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
860-
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
861-
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
862-
charToEscapeQuoteEscaping=None, encoding=None):
862+
header=None, nullValue=None, emptyValue=None, escapeQuotes=None, quoteAll=None,
863+
dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None,
864+
ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None):
863865
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
864866
865867
:param path: the path in any Hadoop supported file system
@@ -891,6 +893,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
891893
the default value, ``false``.
892894
:param nullValue: sets the string representation of a null value. If None is set, it uses
893895
the default value, empty string.
896+
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
897+
the default value, ``""``.
894898
:param dateFormat: sets the string that indicates a date format. Custom date formats
895899
follow the formats at ``java.text.SimpleDateFormat``. This
896900
applies to date type. If None is set, it uses the
@@ -916,8 +920,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
916920
"""
917921
self.mode(mode)
918922
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
919-
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
920-
dateFormat=dateFormat, timestampFormat=timestampFormat,
923+
nullValue=nullValue, emptyValue=emptyValue, escapeQuotes=escapeQuotes,
924+
quoteAll=quoteAll, dateFormat=dateFormat, timestampFormat=timestampFormat,
921925
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
922926
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
923927
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,

python/pyspark/sql/streaming.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -560,9 +560,9 @@ def text(self, path, wholetext=False, lineSep=None):
560560
@since(2.0)
561561
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
562562
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
563-
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
564-
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
565-
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
563+
ignoreTrailingWhiteSpace=None, nullValue=None, emptyValue=None, nanValue=None,
564+
positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None,
565+
maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
566566
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
567567
enforceSchema=None):
568568
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
@@ -611,6 +611,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
611611
:param nullValue: sets the string representation of a null value. If None is set, it uses
612612
the default value, empty string. Since 2.0.1, this ``nullValue`` param
613613
applies to all supported types including the string type.
614+
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
615+
the default value, empty string.
614616
:param nanValue: sets the string representation of a non-number value. If None is set, it
615617
uses the default value, ``NaN``.
616618
:param positiveInf: sets the string representation of a positive infinity value. If None
@@ -669,9 +671,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
669671
schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment,
670672
header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
671673
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue,
672-
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
673-
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
674-
maxCharsPerColumn=maxCharsPerColumn,
674+
emptyValue=emptyValue, nanValue=nanValue, positiveInf=positiveInf,
675+
negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat,
676+
maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn,
675677
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
676678
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
677679
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
571571
* whitespaces from values being read should be skipped.</li>
572572
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
573573
* 2.0.1, this applies to all supported types including the string type.</li>
574+
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
574575
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
575576
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
576577
* value.</li>

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
635635
* enclosed in quotes. Default is to only escape values containing a quote character.</li>
636636
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
637637
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
638+
* <li>`emptyValue` (default `""`): sets the string representation of an empty value.</li>
638639
* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
639640
* files. If it is not set, the UTF-8 charset will be used.</li>
640641
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ class CSVOptions(
117117

118118
val nullValue = parameters.getOrElse("nullValue", "")
119119

120+
val emptyValueInRead = parameters.getOrElse("emptyValue", "")
121+
val emptyValueInWrite = parameters.getOrElse("emptyValue", "\"\"")
122+
120123
val nanValue = parameters.getOrElse("nanValue", "NaN")
121124

122125
val positiveInf = parameters.getOrElse("positiveInf", "Inf")
@@ -173,7 +176,7 @@ class CSVOptions(
173176
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
174177
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
175178
writerSettings.setNullValue(nullValue)
176-
writerSettings.setEmptyValue("\"\"")
179+
writerSettings.setEmptyValue(emptyValueInWrite)
177180
writerSettings.setSkipEmptyLines(true)
178181
writerSettings.setQuoteAllFields(quoteAll)
179182
writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -194,7 +197,7 @@ class CSVOptions(
194197
settings.setInputBufferSize(inputBufferSize)
195198
settings.setMaxColumns(maxColumns)
196199
settings.setNullValue(nullValue)
197-
settings.setEmptyValue("")
200+
settings.setEmptyValue(emptyValueInRead)
198201
settings.setMaxCharsPerColumn(maxCharsPerColumn)
199202
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
200203
settings

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
327327
* whitespaces from values being read should be skipped.</li>
328328
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
329329
* 2.0.1, this applies to all supported types including the string type.</li>
330+
* <li>`emptyValue` (default empty string): sets the string representation of an empty value.</li>
330331
* <li>`nanValue` (default `NaN`): sets the string representation of a non-number value.</li>
331332
* <li>`positiveInf` (default `Inf`): sets the string representation of a positive infinity
332333
* value.</li>
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
year,make,model,comment,blank
2+
"2012","Tesla","S","",""
3+
1997,Ford,E350,"Go get one now they are going fast",
4+
2015,Chevy,Volt,,""

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
5050
private val carsAltFile = "test-data/cars-alternative.csv"
5151
private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv"
5252
private val carsNullFile = "test-data/cars-null.csv"
53+
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
5354
private val carsBlankColName = "test-data/cars-blank-column-name.csv"
5455
private val emptyFile = "test-data/empty.csv"
5556
private val commentsFile = "test-data/comments.csv"
@@ -668,6 +669,70 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
668669
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
669670
}
670671

672+
test("empty fields with user defined empty values") {
673+
674+
// year,make,model,comment,blank
675+
val dataSchema = StructType(List(
676+
StructField("year", IntegerType, nullable = true),
677+
StructField("make", StringType, nullable = false),
678+
StructField("model", StringType, nullable = false),
679+
StructField("comment", StringType, nullable = true),
680+
StructField("blank", StringType, nullable = true)))
681+
val cars = spark.read
682+
.format("csv")
683+
.schema(dataSchema)
684+
.option("header", "true")
685+
.option("emptyValue", "empty")
686+
.load(testFile(carsEmptyValueFile))
687+
688+
verifyCars(cars, withHeader = true, checkValues = false)
689+
val results = cars.collect()
690+
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
691+
assert(results(1).toSeq ===
692+
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
693+
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
694+
}
695+
696+
test("save csv with empty fields with user defined empty values") {
697+
withTempDir { dir =>
698+
val csvDir = new File(dir, "csv").getCanonicalPath
699+
700+
// year,make,model,comment,blank
701+
val dataSchema = StructType(List(
702+
StructField("year", IntegerType, nullable = true),
703+
StructField("make", StringType, nullable = false),
704+
StructField("model", StringType, nullable = false),
705+
StructField("comment", StringType, nullable = true),
706+
StructField("blank", StringType, nullable = true)))
707+
val cars = spark.read
708+
.format("csv")
709+
.schema(dataSchema)
710+
.option("header", "true")
711+
.option("nullValue", "NULL")
712+
.load(testFile(carsEmptyValueFile))
713+
714+
cars.coalesce(1).write
715+
.format("csv")
716+
.option("header", "true")
717+
.option("emptyValue", "empty")
718+
.option("nullValue", null)
719+
.save(csvDir)
720+
721+
val carsCopy = spark.read
722+
.format("csv")
723+
.schema(dataSchema)
724+
.option("header", "true")
725+
.load(csvDir)
726+
727+
verifyCars(carsCopy, withHeader = true, checkValues = false)
728+
val results = carsCopy.collect()
729+
assert(results(0).toSeq === Array(2012, "Tesla", "S", "empty", "empty"))
730+
assert(results(1).toSeq ===
731+
Array(1997, "Ford", "E350", "Go get one now they are going fast", null))
732+
assert(results(2).toSeq === Array(2015, "Chevy", "Volt", null, "empty"))
733+
}
734+
}
735+
671736
test("save csv with compression codec option") {
672737
withTempDir { dir =>
673738
val csvDir = new File(dir, "csv").getCanonicalPath

0 commit comments

Comments
 (0)