6 changes: 4 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -308,7 +308,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
columnNameOfCorruptRecord=None):
columnNameOfCorruptRecord=None, wholeFile=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -385,6 +385,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse records, which may span multiple lines. If None is
Member
The same here.

set, it uses the default value, ``false``.

>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -398,7 +400,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
columnNameOfCorruptRecord=columnNameOfCorruptRecord)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
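
For reference (not part of the diff), a minimal PySpark sketch of how the new wholeFile option is used with the batch CSV reader; the fixture path is the ages_newlines.csv file added later in this PR:

# Quoted CSV fields may contain newlines; with wholeFile=True they stay inside
# a single record instead of being split across rows.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("python/test_support/sql/ages_newlines.csv", wholeFile=True)
df.show(truncate=False)  # three rows, two of them with embedded newlines in _c2
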
6 changes: 4 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -562,7 +562,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
columnNameOfCorruptRecord=None):
columnNameOfCorruptRecord=None, wholeFile=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -637,6 +637,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the value specified in
``spark.sql.columnNameOfCorruptRecord``.
:param wholeFile: parse one record, which may span multiple lines. If None is
Member
The same here.

set, it uses the default value, ``false``.

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -652,7 +654,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
columnNameOfCorruptRecord=columnNameOfCorruptRecord)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
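
Likewise, a hedged sketch for the streaming reader (streaming CSV sources require an explicit schema; the input directory and column names below are illustrative, not from this PR):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# wholeFile=True lets quoted fields span multiple physical lines, as in the
# batch reader; the schema and path here are placeholders.
schema = StructType([
    StructField("name", StringType()),
    StructField("age", StringType()),
    StructField("note", StringType()),
])
csv_sdf = spark.readStream.csv("/tmp/csv-input", schema=schema, wholeFile=True)
csv_sdf.isStreaming  # True
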
9 changes: 8 additions & 1 deletion python/pyspark/sql/tests.py
@@ -437,12 +437,19 @@ def test_udf_with_order_by_and_limit(self):
self.assertEqual(res.collect(), [Row(id=0, copy=0)])

def test_wholefile_json(self):
from pyspark.sql.types import StringType
people1 = self.spark.read.json("python/test_support/sql/people.json")
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
wholeFile=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_wholefile_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", wholeFile=True)
expected = [Row(_c0=u'Joe', _c1=u'20', _c2=u'Hi,\nI am Jeo'),
Row(_c0=u'Tom', _c1=u'30', _c2=u'My name is Tom'),
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
self.assertEqual(ages_newlines.collect(), expected)

def test_udf_with_input_file_name(self):
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
6 changes: 6 additions & 0 deletions python/test_support/sql/ages_newlines.csv
@@ -0,0 +1,6 @@
Joe,20,"Hi,
I am Jeo"
Tom,30,"My name is Tom"
Hyukjin,25,"I am Hyukjin
Contributor
wow you are only 25?

Member Author
I was... last year :).


I love Spark!"
@@ -463,6 +463,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`columnNameOfCorruptRecord` (default is the value specified in
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
Member
@gatorsmile gatorsmile Jun 5, 2017

multiple lines. -> multiple lines, per file.

Member Author
@HyukjinKwon HyukjinKwon Jun 5, 2017

No, they are different. JSON reads the whole file as one record (basically; if it is an array, each element becomes an individual record), whereas CSV reads each record even when it spans multiple lines in a column.

Member

Still unable to get your point. Could you give an example?

Member Author
@HyukjinKwon HyukjinKwon Jun 5, 2017

Definitely. I wanted to emphasize that multiple lines does not mean per file.

For example, CSV reads multiple records (multiple lines) per file (newlines are replaced with \n manually for readability).

"I am
Hyukjin Kwon"
"Hyukjin Kwon
I love Spark!"
scala> spark.read.option("wholeFile", true).csv("test.csv").show()
+---------------------+
|                  _c0|
+---------------------+
|   I am\nHyukjin Kwon|
|Hyukjin Kwon\nI lo...|
+---------------------+

Whereas JSON reads one record per file. I am pretty sure the object-root case is the primary use.

{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
}
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
|  HyukjinKwon|       I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
+-------------+-----------+

but note that it could, in terms of input/output, work similarly to CSV when the input is a JSON array.

[{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
},{
  "I am": "HyukjinKwon",
  "HyukjinKwon": "I love Spark!"
}]
scala> spark.read.option("wholeFile", true).json("test.json").show()
+-------------+-----------+
|  HyukjinKwon|       I am|
+-------------+-----------+
|I love Spark!|HyukjinKwon|
|I love Spark!|HyukjinKwon|
+-------------+-----------+

Comparing the array case with CSV, they still work differently. JSON, to my knowledge, parses the whole file and then produces each record (in the case of an array), whereas CSV parses record by record from the stream.
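
A hedged PySpark rendering of the same comparison (the test.csv and test.json names and contents are illustrative, mirroring the Scala snippets above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# test.csv: two quoted records, each spanning two physical lines.
spark.read.csv("test.csv", wholeFile=True).show()
# -> two rows, each _c0 value containing an embedded newline

# test.json: one JSON object spanning multiple lines.
spark.read.json("test.json", wholeFile=True).show()
# -> exactly one row; if the file instead holds a top-level array,
#    one row per array element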

Member

Then the option wholeFile is misleading to end users for CSV. We should rename it to multiLine or something else.

Member Author

Yea, they are similar and different. I wouldn't mind opening a JIRA for this.

Member
@gatorsmile gatorsmile Jun 5, 2017

In JSON, when wholeFile is on, we parse one and only one record (i.e., table rows) per file. The semantics are different. Let us fix the option name for CSV in 2.2.

* </ul>
* @since 2.0.0
*/
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark.TaskContext

object CodecStreams {
private def getDecompressionCodec(config: Configuration, file: Path): Option[CompressionCodec] = {
val compressionCodecs = new CompressionCodecFactory(config)
@@ -42,6 +44,16 @@ object CodecStreams {
.getOrElse(inputStream)
}

/**
* Creates an input stream from the string path and add a closure for the input stream to be
* closed on task completion.
*/
def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = {
val inputStream = createInputStream(config, new Path(path))
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close()))
inputStream
}

private def getCompressionCodec(
context: JobContext,
file: Option[Path] = None): Option[CompressionCodec] = {
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv

import java.io.InputStream
import java.nio.charset.{Charset, StandardCharsets}

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.spark.TaskContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType

/**
* Common functions for parsing CSV files
*/
abstract class CSVDataSource extends Serializable {
def isSplitable: Boolean

/**
* Parse a [[PartitionedFile]] into [[InternalRow]] instances.
*/
def readFile(
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow]

/**
* Infers the schema from `inputPaths` files.
*/
def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType]


Both subclasses return Some(...), so it would be clearer to change this to def infer(...): StructType and do the option wrapping only in CSVFileFormat.inferSchema

Member Author

This was copied from JsonDataSource.infer. I would like to keep this for when we deal with #16976 (comment), and maybe introduce another parent, or at least for consistency.


Sounds good to me. Reducing code duplication between the JSON and CSV parsers would be great.


/**
* Generates a header from the given row which is null-safe and duplicate-safe.
*/
protected def makeSafeHeader(
Member Author

Yup, just copied and pasted. Just checked that they are the same line by line.

row: Array[String],
caseSensitive: Boolean,
options: CSVOptions): Array[String] = {
if (options.headerFlag) {
val duplicates = {
val headerNames = row.filter(_ != null)
.map(name => if (caseSensitive) name else name.toLowerCase)
headerNames.diff(headerNames.distinct).distinct
}

row.zipWithIndex.map { case (value, index) =>
if (value == null || value.isEmpty || value == options.nullValue) {
// When there are empty strings or the values set in `nullValue`, put the
// index as the suffix.
s"_c$index"
} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
// When there are case-insensitive duplicates, put the index as the suffix.
s"$value$index"
} else if (duplicates.contains(value)) {
// When there are duplicates, put the index as the suffix.
s"$value$index"
} else {
value
}
}
} else {
row.zipWithIndex.map { case (_, index) =>
// Uses default column names, "_c#" where # is its position of fields
// when header option is disabled.
s"_c$index"
}
}
}
}

object CSVDataSource {
def apply(options: CSVOptions): CSVDataSource = {
if (options.wholeFile) {
WholeFileCSVDataSource
} else {
TextInputCSVDataSource
}
}
}

object TextInputCSVDataSource extends CSVDataSource {
override val isSplitable: Boolean = true

override def readFile(
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
linesReader.map { line =>
new String(line.getBytes, 0, line.getLength, parsedOptions.charset)
}
}

val shouldDropHeader = parsedOptions.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser)
}

override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType] = {
val csv: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, parsedOptions).first()


This should also be tested on empty files, if it's not already.

Member Author

Yea, indeed it is an issue. #16976 (comment)

val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.rdd.mapPartitions { iter =>
val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
}

Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
}

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
options: CSVOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
val charset = options.charset
val rdd = sparkSession.sparkContext
.hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
.mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
sparkSession.createDataset(rdd)(Encoders.STRING)
}
}
}

object WholeFileCSVDataSource extends CSVDataSource {
override val isSplitable: Boolean = false

override def readFile(
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow] = {
UnivocityParser.parseStream(
CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
parsedOptions.headerFlag,
parser)
}

override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): Option[StructType] = {
val csv: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
val maybeFirstRow: Option[Array[String]] = csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
false,
new CsvParser(parsedOptions.asParserSettings))
}.take(1).headOption

if (maybeFirstRow.isDefined) {
Member Author

I just used isDefined and get here rather than map and getOrElse because it looks easier to read to me.

val firstRow = maybeFirstRow.get
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
val tokenRDD = csv.flatMap { lines =>
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
parsedOptions.headerFlag,
new CsvParser(parsedOptions.asParserSettings))
}
Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
} else {
// If the first row could not be read, just return the empty schema.
Some(StructType(Nil))
}
}

private def createBaseRdd(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
options: CSVOptions): RDD[PortableDataStream] = {
val paths = inputPaths.map(_.getPath)
val name = paths.mkString(",")
val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
FileInputFormat.setInputPaths(job, paths: _*)
val conf = job.getConfiguration

val rdd = new BinaryFileRDD(
sparkSession.sparkContext,
classOf[StreamInputFormat],
classOf[String],
classOf[PortableDataStream],
conf,
sparkSession.sparkContext.defaultMinPartitions)

// Only returns `PortableDataStream`s without paths.
rdd.setName(s"CSVFile: $name").values
}
}