Changes from 5 commits
7 changes: 5 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -845,7 +845,7 @@ def text(self, path, compression=None, lineSep=None):
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None):
charToEscapeQuoteEscaping=None, encoding=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -895,6 +895,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
the quote character. If None is set, the default value is
escape character when escape and quote characters are
different, ``\0`` otherwise.
:param encoding: sets the encoding (charset) to be used on the csv file. If None is set, it
uses the default value, ``UTF-8``.
Member

Likewise, let's match the doc to JSON's.


>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -904,7 +906,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
dateFormat=dateFormat, timestampFormat=timestampFormat,
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
encoding=encoding)
self._jwrite.csv(path)

@since(1.5)
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -625,6 +625,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* enclosed in quotes. Default is to only escape values containing a quote character.</li>
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
* <li>`encoding` (default `UTF-8`): encoding to use when saving to file.</li>
Member

I think we should match the doc with JSON's:

* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved json
* files. If it is not set, the UTF-8 charset will be used. </li>

* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
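
For illustration, the new option plugs into the existing option-based writer API; a minimal usage sketch (data and output path are hypothetical, mirroring the test added below):

import spark.implicits._

// Save a DataFrame as CSV files encoded with a non-default charset.
Seq("µß áâä ÁÂÄ").toDF()
  .write
  .option("encoding", "ISO-8859-1") // charset used for the saved files
  .csv("/tmp/csv-iso-8859-1")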
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.csv

import java.nio.charset.Charset

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
@@ -146,7 +148,12 @@ private[csv] class CsvOutputWriter(
context: TaskAttemptContext,
params: CSVOptions) extends OutputWriter with Logging {

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
private val charset = Charset.forName(params.charset)

private val writer = CodecStreams.createOutputStreamWriter(
context,
Member (@HyukjinKwon, Jul 18, 2018)

tiny nit:

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)

new Path(path),
charset)

private val gen = new UnivocityGenerator(dataSchema, writer, params)
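
The writer change amounts to binding the output stream to an explicit Charset. A minimal sketch of the underlying JVM mechanism, using plain java.io rather than Spark's internal CodecStreams:

import java.io.{ByteArrayOutputStream, OutputStreamWriter}
import java.nio.charset.Charset

val out = new ByteArrayOutputStream()
// An OutputStreamWriter bound to a Charset encodes every string written
// through it, which is what threading `charset` into the writer achieves.
val writer = new OutputStreamWriter(out, Charset.forName("UTF-16"))
writer.write("µß áâä ÁÂÄ")
writer.close()
assert(out.toByteArray.sameElements("µß áâä ÁÂÄ".getBytes("UTF-16")))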

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.datasources.csv

import java.io.File
import java.nio.charset.UnsupportedCharsetException
import java.nio.charset.{Charset, UnsupportedCharsetException}
import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.Locale
@@ -512,6 +513,43 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}
}

test("SPARK-19018: Save csv with custom charset") {

// scalastyle:off nonascii
val content = "µß áâä ÁÂÄ"
// scalastyle:on nonascii

Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding =>
withTempDir { dir =>
Member

withTempDir -> withTempPath

val csvDir = new File(dir, "csv")

val originalDF = Seq(content).toDF("_c0").repartition(1)
Member

toDF("_c0") -> toDF()

originalDF.write
.option("encoding", encoding)
Member

I think our CSV read encoding option is incomplete for now; there are many discussions about this. I am going to fix the read path soon. Let me revisit this after fixing it.

Member

Now it's fine. I think we decided to support encoding in CSV/JSON datasources. Ignore the comment above. We can proceed separately.

.csv(csvDir.getCanonicalPath)

csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
Member

h({ => h {

Contributor Author

What do you mean?

val readback = Files.readAllBytes(csvFile.toPath)
val expected = (content + "\n").getBytes(Charset.forName(encoding))
Member

Currently, the newline is dependent on Univocity, so this test is going to break on Windows. Let's use the platform's newline.

Contributor Author

Good point!

assert(readback === expected)
})
}
}
}
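
Following the newline discussion above, a platform-independent expectation could look like this (a sketch per the review suggestion, not the committed fix):

val expected = (content + System.lineSeparator()).getBytes(Charset.forName(encoding))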

test("SPARK-19018: error handling for unsupported charsets") {
val exception = intercept[SparkException] {
withTempDir { dir =>
Member

withTempPath

val csvDir = new File(dir, "csv").getCanonicalPath
Seq("a,A,c,A,b,B").toDF().write
.option("encoding", "1-9588-osi")
.csv(csvDir)
Member

nit: you could use path.getCanonicalPath directly

}
}

assert(exception.getCause.getMessage.contains("1-9588-osi"))
}
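
For reference, the unsupported name ("iso-8859-1" reversed) fails inside the JDK's charset lookup, which a standalone snippet reproduces (illustrative only):

import java.nio.charset.{Charset, UnsupportedCharsetException}

try {
  Charset.forName("1-9588-osi") // syntactically legal but unregistered name
} catch {
  case e: UnsupportedCharsetException =>
    assert(e.getMessage.contains("1-9588-osi")) // message carries the bad name
}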

test("commented lines in CSV data") {
Seq("false", "true").foreach { multiLine =>
