Changes from 14 commits
12 changes: 8 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -353,7 +353,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None):
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -453,6 +453,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2.

>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -472,7 +474,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale)
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -868,7 +870,7 @@ def text(self, path, compression=None, lineSep=None):
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None):
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None):
r"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -922,6 +924,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
the default UTF-8 charset will be used.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, ``""``.
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``. Maximum length is 2.

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -932,7 +936,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
encoding=encoding, emptyValue=emptyValue)
encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
self._jwrite.csv(path)

@since(1.5)
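For context, a minimal sketch of how the new option is used from PySpark once this patch is applied; the path and data are hypothetical, not part of the patch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Write records terminated by a custom separator (the default is "\n").
df = spark.createDataFrame([("a", 1), ("b", 2)], ["k", "v"])
df.write.option("lineSep", "|").mode("overwrite").csv("/tmp/linesep_demo")

# Read them back with the same separator; without the option the reader
# only splits on \r, \r\n and \n, so the file would parse as one record.
back = spark.read.option("lineSep", "|").csv("/tmp/linesep_demo")
back.show()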
6 changes: 4 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -576,7 +576,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None):
enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
Expand Down Expand Up @@ -675,6 +675,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``. Maximum length is 2.

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -692,7 +694,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale)
emptyValue=emptyValue, locale=locale, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
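The streaming reader takes the same option; a short sketch under the same caveats, reusing the session from the sketch above (a schema is mandatory for streaming sources, and the input directory is hypothetical):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("value", StringType())])

# Treat "\r" alone as the record separator; by default the source would
# also split on "\r\n" and "\n".
csv_sdf = (spark.readStream
    .schema(schema)
    .option("lineSep", "\r")
    .csv("/tmp/stream_in"))
assert csv_sdf.isStreaming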
CSVOptions.scala
@@ -192,6 +192,20 @@ class CSVOptions(
*/
val emptyValueInWrite = emptyValue.getOrElse("\"\"")

/**
* A string between two consecutive CSV records.
*/
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
require(sep.length <= 2, "'lineSep' can contain 1 or 2 characters.")
sep
}

Review thread on the length check:

HyukjinKwon (Member): @MaxGekk, might not be a super big deal but I believe this should be counted after converting it into UTF-8.

HyukjinKwon (Member), Nov 19, 2018: We could say the line separator should be 1 or 2 bytes (UTF-8) in the read path, specifically when multiLine is enabled.

MaxGekk (Member, Author): [comment not captured in this view]

HyukjinKwon (Member): Hm, I see.

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(charset)
}
val lineSeparatorInWrite: Option[String] = lineSeparator

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -200,6 +214,8 @@ class CSVOptions(
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
lineSeparatorInWrite.foreach(format.setLineSeparator)

writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
@@ -216,8 +232,13 @@ class CSVOptions(
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
lineSeparator.foreach { sep =>
format.setLineSeparator(sep)
format.setNormalizedNewline(0x00.toChar)
}

Review thread on setNormalizedNewline:

HyukjinKwon (Member): I know we have some problems here with setting newlines of more than one character, because setNormalizedNewline only supports one character. This is related to #18581 (comment) and uniVocity/univocity-parsers#170. That's why I thought we could only support a single character for now.

MaxGekk (Member, Author), quoting the point above: ok, I will restrict line separators to one character.
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)

settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
settings.setReadInputOnSeparateThread(false)
@@ -227,7 +248,10 @@ class CSVOptions(
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setLineSeparatorDetectionEnabled(multiLine == true)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
}

HyukjinKwon (Member): nice!

settings
}
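The first review thread above turns on characters versus encoded bytes: the require checks count Java String characters, while lineSeparatorInRead converts the separator with getBytes(charset). A plain-Python illustration of why the two measures diverge:

# "\r\n" is two characters, but its encoded size depends on the charset.
assert len("\r\n".encode("utf-8")) == 2       # 2 bytes
assert len("\r\n".encode("utf-16-le")) == 4   # 2 bytes per character

# A single character can already be multi-byte in UTF-8.
assert len("아".encode("utf-8")) == 3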
DataFrameReader.scala
@@ -608,6 +608,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing. Maximum length is 2.</li>
* </ul>
*
* @since 2.0.0
DataFrameWriter.scala
@@ -658,6 +658,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* whitespaces from values being written should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating whether or not
* trailing whitespaces from values being written should be skipped.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.
* Maximum length is 2.</li>
* </ul>
*
* @since 2.0.0
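One behavior worth noting, visible in the write test near the end of this diff (readBack === s"a${lineSep}b${lineSep}c${lineSep}"): every record, including the last, is terminated by the separator. A hedged PySpark sketch with a hypothetical path:

(spark.createDataFrame([("a",), ("b",), ("c",)], ["v"])
    .coalesce(1)
    .write.option("lineSep", "|").mode("overwrite").csv("/tmp/w"))
# The single part file then contains exactly: a|b|c|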
CSVDataSource.scala
@@ -95,7 +95,7 @@ object TextInputCSVDataSource extends CSVDataSource {
headerChecker: CSVHeaderChecker,
requiredSchema: StructType): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
linesReader.map { line =>
new String(line.getBytes, 0, line.getLength, parser.options.charset)
DataStreamReader.scala
@@ -377,6 +377,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing. Maximum length is 2.</li>
* </ul>
*
* @since 2.0.0

HyukjinKwon (Member): I'm sorry, can you fix "Maximum length is 2" as well? Should be good to go.
CSVSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv

import java.io.File
import java.nio.charset.{Charset, UnsupportedCharsetException}
import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
@@ -1880,4 +1880,112 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with TestCsvData {
}
}
}

test("""Support line separator - default value \r, \r\n and \n""") {
val data = "\"a\",1\r\"c\",2\r\n\"d\",3\n"

withTempPath { path =>
Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
val df = spark.read.option("inferSchema", true).csv(path.getAbsolutePath)
val expectedSchema =
StructType(StructField("_c0", StringType) :: StructField("_c1", IntegerType) :: Nil)
checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
assert(df.schema === expectedSchema)
}
}

def testLineSeparator(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = {
test(s"Support line separator in ${encoding} #${id}") {
// Read
val data =
s""""a",1$lineSep
|c,2$lineSep"
|d",3""".stripMargin
val dataWithTrailingLineSep = s"$data$lineSep"

Seq(data, dataWithTrailingLineSep).foreach { lines =>
withTempPath { path =>
Files.write(path.toPath, lines.getBytes(encoding))
val schema = StructType(StructField("_c0", StringType)
:: StructField("_c1", LongType) :: Nil)

val expected = Seq(("a", 1), ("\nc", 2), ("\nd", 3))
.toDF("_c0", "_c1")
Seq(false, true).foreach { multiLine =>
val reader = spark
.read
.option("lineSep", lineSep)
.option("multiLine", multiLine)
.option("encoding", encoding)
val df = if (inferSchema) {
reader.option("inferSchema", true).csv(path.getAbsolutePath)
} else {
reader.schema(schema).csv(path.getAbsolutePath)
}
checkAnswer(df, expected)
}
}
}

// Write
withTempPath { path =>
Seq("a", "b", "c").toDF("value").coalesce(1)
.write
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
val readBack = new String(Files.readAllBytes(partFile.toPath), encoding)
assert(
readBack === s"a${lineSep}b${lineSep}c${lineSep}")
}

// Roundtrip
withTempPath { path =>
val df = Seq("a", "b", "c").toDF()
df.write
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
val readBack = spark
.read
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
checkAnswer(df, readBack)
}
}
}

// scalastyle:off nonascii
List(
(0, "|", "UTF-8", false),
(1, "^", "UTF-16BE", true),
(2, "::", "ISO-8859-1", true),
(3, "!!", "UTF-32LE", false),
(4, 0x1E.toChar.toString, "UTF-8", true),
(5, "아", "UTF-32BE", false),
(6, "ку", "CP1251", true),
(8, "\r\n", "UTF-16LE", true),
(9, "\r\n", "utf-16be", false),
(10, "\u000d\u000a", "UTF-32BE", false),
(11, "\u000a\u000d", "UTF-8", true),
(12, "==", "US-ASCII", false),
(13, "$^", "utf-32le", true)
).foreach { case (testNum, sep, encoding, inferSchema) =>
testLineSeparator(sep, encoding, inferSchema, testNum)
}
// scalastyle:on nonascii

test("lineSep restrictions") {
val errMsg1 = intercept[IllegalArgumentException] {
spark.read.option("lineSep", "").csv(testFile(carsFile)).collect
}.getMessage
assert(errMsg1.contains("'lineSep' cannot be an empty string"))

val errMsg2 = intercept[IllegalArgumentException] {
spark.read.option("lineSep", "123").csv(testFile(carsFile)).collect
}.getMessage
assert(errMsg2.contains("'lineSep' can contain 1 or 2 characters"))
}
}