2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -198,7 +198,7 @@ stax-api-1.0.1.jar
stream-2.9.6.jar
stringtemplate-3.2.1.jar
super-csv-2.2.0.jar
univocity-parsers-2.7.3.jar
univocity-parsers-2.8.3.jar
validation-api-2.0.1.Final.jar
xbean-asm7-shaded-4.14.jar
xercesImpl-2.9.1.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2
@@ -231,7 +231,7 @@ stream-2.9.6.jar
super-csv-2.2.0.jar
token-provider-1.0.1.jar
transaction-api-1.1.jar
univocity-parsers-2.7.3.jar
univocity-parsers-2.8.3.jar
validation-api-2.0.1.Final.jar
velocity-1.5.jar
woodstox-core-5.0.3.jar
5 changes: 5 additions & 0 deletions pom.xml
@@ -2180,6 +2180,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.8.3</version>
</dependency>
</dependencies>
</dependencyManagement>

6 changes: 3 additions & 3 deletions python/pyspark/sql/readwriter.py
@@ -360,8 +360,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
or RDD of Strings storing CSV rows.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets a single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param sep: sets a separator (one or more characters) for each field and value. If None is
set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
it uses the default value, ``UTF-8``.
:param quote: sets a single character used for escaping quoted values where the
@@ -890,7 +890,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param sep: sets a single character as a separator for each field and value. If None is
:param sep: sets a separator (one or more characters) for each field and value. If None is
set, it uses the default value, ``,``.
:param quote: sets a single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
4 changes: 2 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -596,8 +596,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets a single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param sep: sets a separator (one or more characters) for each field and value. If None is
set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
it uses the default value, ``UTF-8``.
:param quote: sets a single character used for escaping quoted values where the
1 change: 0 additions & 1 deletion sql/catalyst/pom.xml
@@ -111,7 +111,6 @@
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.7.3</version>
<type>jar</type>
</dependency>
<dependency>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.csv

import org.apache.commons.lang3.StringUtils

object CSVExprUtils {
/**
* Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
@@ -79,4 +81,48 @@ object CSVExprUtils {
throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
}
}

/**
* Helper method that converts the string representation of a character sequence into the
* actual delimiter characters. The input is processed in "chunks", and each chunk is
* converted by calling [[CSVExprUtils.toChar()]]. A chunk is either:
* <ul>
* <li>a backslash followed by another character, or</li>
* <li>a single non-backslash character</li>
* </ul>
* in that order of precedence. The result of converting all chunks is returned as
* a [[String]].
*
* <br/><br/>Examples:
* <ul><li>`\t` will result in a single tab character as the separator (same as before)
* </li><li>`|||` will result in a sequence of three pipe characters as the separator
* </li><li>`\\` will result in a single backslash as the separator (same as before)
* </li><li>`\.` will result in an error (since a dot is not a character that needs escaping)
* </li><li>`\\.` will result in a backslash, then a dot, as the separator character sequence
* </li><li>`.\t.` will result in a dot, then a tab, then a dot as the separator sequence
* </li>
* </ul>
*
* @param str the string representing the sequence of separator characters
* @return a [[String]] representing the multi-character delimiter
* @throws IllegalArgumentException if any of the individual input chunks are illegal
*/
def toDelimiterStr(str: String): String = {
var idx = 0

var delimiter = ""

while (idx < str.length()) {
// if the current character is a backslash, check it plus the next char
// in order to use existing escape logic
val readAhead = if (str(idx) == '\\') 2 else 1
Member: toChar() can handle "\u0000", which is not 2 chars long, I think. Could you check this case and write a test for that?

Member: I was going to say it's not worth it, because toChar doesn't support general unicode syntax either, and it's Java/Scala syntax anyway, and \0 is the more natural way to say it. But toChar doesn't support \0. We could at least special-case that in toChar as well instead, to support NULL as a delimiter, rather than expand the logic here.

Contributor Author: I just added a couple more tests, for both varieties of the null character. They were already being handled: \u0000 was handled because of the special case in toChar, and \0 was handled because of the normal single-character case (i.e. case Seq(c) => c).

Member: I don't think anything but Scala is handling the \u0000 case. The String is one character by the time any of this executes. I think you'd find this doesn't work if you write "\\u0000", which is what you would have to do to actually encounter the 6-character string \u0000 here. But then you'd interpret the delimiter as u0000. Same problem as the "\\" case.

To your test case below: unicode unescaping happens before everything, so """\u0000""" still yields a 1-character delimiter.

I would suggest punting on this right here, but I am kind of concerned about the "\\" case in general. I would remove the added comment above about backslash escaping, because AFAICT people should just be using the language's string literal syntax for expressing the chars, and we actually shouldn't further unescape them, but we can leave that much unchanged here.

We may find this whole loop is unnecessary as a result.

Contributor Author: Ah yes, I made an incorrect assumption about how the triple quote interpolator works. But yes, if the whole partial unescape stuff could be removed, then it would be far simpler here.
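For reference, a quick REPL sketch of the point discussed above (illustrative, not part of the diff; assumes Scala 2.12, where backslash-u escapes are processed by the scanner even inside triple-quoted strings):

scala> "\u0000".length          // normal literal: the escape yields a single char
res0: Int = 1

scala> """\u0000""".length      // still 1: the escape is processed before anything runs
res1: Int = 1

scala> "\\u0000".length         // 6: how you would actually get a literal backslash-u sequence
res2: Int = 6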

// get the chunk of 1 or 2 input characters to convert to a single delimiter char
val chunk = StringUtils.substring(str, idx, idx + readAhead)
delimiter += toChar(chunk)
// advance the counter by the length of input chunk processed
idx += chunk.length()
}

delimiter.mkString("")
}
}
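For reference, a quick sketch of what the new helper produces (illustrative, not part of the diff; assumes the catalyst module is on the classpath):

import org.apache.spark.sql.catalyst.csv.CSVExprUtils

CSVExprUtils.toDelimiterStr(", ")          // ", "   : multi-character separators pass through
CSVExprUtils.toDelimiterStr("""\t""")      // "\t"   : escaped tab, same as before
CSVExprUtils.toDelimiterStr("""_/-\\_""")  // "_/-\_": the \\ chunk unescapes to one backslash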
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -95,7 +95,7 @@ class CSVOptions(
}
}

val delimiter = CSVExprUtils.toChar(
val delimiter = CSVExprUtils.toDelimiterStr(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.csv

import org.scalatest.prop.TableDrivenPropertyChecks._

import org.apache.spark.SparkFunSuite

class CSVExprUtilsSuite extends SparkFunSuite {
@@ -58,4 +60,40 @@ class CSVExprUtilsSuite extends SparkFunSuite {
}
assert(exception.getMessage.contains("Delimiter cannot be empty string"))
}

val testCases = Table(
("input", "separatorStr", "expectedErrorMsg"),
// normal tab
("""\t""", Some("\t"), None),
// backslash, then tab
("""\\t""", Some("""\t"""), None),
// invalid special character (dot)
("""\.""", None, Some("Unsupported special character for delimiter")),
// backslash, then dot
("""\\.""", Some("""\."""), None),
// nothing special, just straight conversion
("""foo""", Some("foo"), None),
// tab in the middle of some other letters
("""ba\tr""", Some("ba\tr"), None),
// null character, expressed in Unicode literal syntax
("""\u0000""", Some("\u0000"), None),
// and specified directly
("\0", Some("\u0000"), None)
)

test("should correctly produce separator strings, or exceptions, from input") {
forAll(testCases) { (input, separatorStr, expectedErrorMsg) =>
try {
val separator = CSVExprUtils.toDelimiterStr(input)
assert(separatorStr.isDefined)
assert(expectedErrorMsg.isEmpty)
assert(separator.equals(separatorStr.get))
} catch {
case e: IllegalArgumentException =>
assert(separatorStr.isEmpty)
assert(expectedErrorMsg.isDefined)
assert(e.getMessage.contains(expectedErrorMsg.get))
}
}
}
}
1 change: 0 additions & 1 deletion sql/core/pom.xml
@@ -38,7 +38,6 @@
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.7.3</version>
<type>jar</type>
</dependency>
<dependency>
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -566,8 +566,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*
* You can set the following CSV-specific options to deal with CSV files:
* <ul>
* <li>`sep` (default `,`): sets a single character as a separator for each
* field and value.</li>
* <li>`sep` (default `,`): sets a separator for each field and value. This separator can be one
* or more characters.</li>
* <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
* type.</li>
* <li>`quote` (default `"`): sets a single character used for escaping quoted values where
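A usage sketch of the new multi-character `sep` option from the reader API, mirroring the comma-space test added below (the SparkSession setup and file path here are assumptions, not part of the diff):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("multichar-sep-demo")
  .master("local[*]")
  .getOrCreate()

// `sep` (alias `delimiter`) may now be longer than one character
val cars = spark.read
  .option("header", "true")
  .option("quote", "'")
  .option("sep", ", ")
  .csv("sql/core/src/test/resources/test-data/cars-multichar-delim.csv")

cars.show()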
sql/core/src/test/resources/test-data/cars-multichar-delim-crazy.csv
@@ -0,0 +1,4 @@
year_/-\_make_/-\_model_/-\_comment_/-\_blank
'2012'_/-\_'Tesla'_/-\_'S'_/-\_'No comment'_/-\_
1997_/-\_Ford_/-\_E350_/-\_'Go get one now they are going fast'_/-\_
2015_/-\_Chevy_/-\_Volt
sql/core/src/test/resources/test-data/cars-multichar-delim.csv
@@ -0,0 +1,4 @@
year, make, model, comment, blank
'2012', 'Tesla', 'S', No comment,
1997, Ford, E350, 'Go get one now they are going fast',
2015, Chevy, Volt
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -50,6 +50,8 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
private val carsFile8859 = "test-data/cars_iso-8859-1.csv"
private val carsTsvFile = "test-data/cars.tsv"
private val carsAltFile = "test-data/cars-alternative.csv"
private val carsMultiCharDelimitedFile = "test-data/cars-multichar-delim.csv"
private val carsMultiCharCrazyDelimitedFile = "test-data/cars-multichar-delim-crazy.csv"
private val carsUnbalancedQuotesFile = "test-data/cars-unbalanced-quotes.csv"
private val carsNullFile = "test-data/cars-null.csv"
private val carsEmptyValueFile = "test-data/cars-empty-value.csv"
@@ -188,6 +190,49 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
verifyCars(cars, withHeader = true)
}

test("test with tab delimiter and double quote") {
val cars = spark.read
.options(Map("quote" -> "\"", "delimiter" -> """\t""", "header" -> "true"))
.csv(testFile(carsTsvFile))

verifyCars(cars, numFields = 6, withHeader = true, checkHeader = false)
}

test("SPARK-24540: test with multiple character delimiter (comma space)") {
val cars = spark.read
.options(Map("quote" -> "\'", "delimiter" -> ", ", "header" -> "true"))
.csv(testFile(carsMultiCharDelimitedFile))

verifyCars(cars, withHeader = true)
}

test("SPARK-24540: test with multiple (crazy) character delimiter") {
val cars = spark.read
.options(Map("quote" -> "\'", "delimiter" -> """_/-\\_""", "header" -> "true"))
.csv(testFile(carsMultiCharCrazyDelimitedFile))

verifyCars(cars, withHeader = true)

// check all the other columns, besides year (which is covered by verifyCars)
val otherCols = cars.select("make", "model", "comment", "blank").collect()
val expectedOtherColVals = Seq(
("Tesla", "S", "No comment", null),
("Ford", "E350", "Go get one now they are going fast", null),
("Chevy", "Volt", null, null)
)

expectedOtherColVals.zipWithIndex.foreach { case (values, index) =>
val actualRow = otherCols(index)
values match {
case (make, model, comment, blank) =>
assert(make == actualRow.getString(0))
assert(model == actualRow.getString(1))
assert(comment == actualRow.getString(2))
assert(blank == actualRow.getString(3))
}
}
}

test("parse unescaped quotes with maxCharsPerColumn") {
val rows = spark.read
.format("csv")