Changes from all commits (22 commits)
a794988: Adding the delimiter option encoded in base64 (MaxGekk, Feb 24, 2018)
dccdaa2: Separator encoded as a sequence of bytes in hex (MaxGekk, Feb 24, 2018)
d0abab7: Refactoring: removed unused imports and renaming a parameter (MaxGekk, Feb 24, 2018)
6741796: The sep option is renamed to recordSeparator. The supported format is… (MaxGekk, Mar 4, 2018)
e4faae1: Renaming recordSeparator to recordDelimiter (MaxGekk, Mar 18, 2018)
01f4ef5: Comments for the recordDelimiter option (MaxGekk, Mar 18, 2018)
24cedb9: Support other formats of recordDelimiter (MaxGekk, Mar 18, 2018)
d40dda2: Checking different charsets and record delimiters (MaxGekk, Mar 18, 2018)
ad6496c: Renaming test's method to make it more readable (MaxGekk, Mar 18, 2018)
358863d: Test of reading json in different charsets and delimiters (MaxGekk, Mar 18, 2018)
7e5be5e: Fix inferring of csv schema for any charsets (MaxGekk, Mar 18, 2018)
d138d2d: Fix errors of scalastyle check (MaxGekk, Mar 18, 2018)
c26ef5d: Reserving format for regular expressions and concatenated json (MaxGekk, Mar 22, 2018)
5f0b069: Fix recordDelimiter tests (MaxGekk, Mar 22, 2018)
ef8248f: Additional cases are added to the delimiter test (MaxGekk, Mar 22, 2018)
2efac08: Renaming recordDelimiter to lineSeparator (MaxGekk, Mar 22, 2018)
b2020fa: Adding HyukjinKwon changes (MaxGekk, Mar 22, 2018)
f99c1e1: Revert lineSepInWrite back to lineSep (MaxGekk, Mar 22, 2018)
6d13d00: Merge remote-tracking branch 'origin/master' into json-line-sep (MaxGekk, Mar 22, 2018)
77112ef: Fix passing of the lineSeparator to HadoopFileLinesReader (MaxGekk, Mar 22, 2018)
d632706: Fix python style checking (MaxGekk, Mar 23, 2018)
bbff402: Fix text source tests and javadoc comments (MaxGekk, Mar 23, 2018)
14 changes: 10 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None):
multiLine=None, allowUnquotedControlChars=None, lineSep=None):
[Member] rename it to recordDelimiter

"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -254,7 +256,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars)
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -746,7 +748,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
self._jwrite.saveAsTable(name)

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
lineSep=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
@@ -770,12 +773,15 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``.
[Contributor] it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

[Member Author] It is a method of DataFrameWriter. It writes exactly '\n'.

>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
lineSep=lineSep)
self._jwrite.json(path)

@since(1.4)
6 changes: 4 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -407,7 +407,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None):
multiLine=None, allowUnquotedControlChars=None, lineSep=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.

@@ -470,6 +470,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -484,7 +486,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars)
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
17 changes: 17 additions & 0 deletions python/pyspark/sql/tests.py
@@ -676,6 +676,23 @@ def test_multiline_json(self):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_linesep_json(self):
df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
expected = [Row(_corrupt_record=None, name=u'Michael'),
Row(_corrupt_record=u' "age":30}\n{"name":"Justin"', name=None),
Row(_corrupt_record=u' "age":19}\n', name=None)]
self.assertEqual(df.collect(), expected)

tpath = tempfile.mkdtemp()
shutil.rmtree(tpath)
try:
df = self.spark.read.json("python/test_support/sql/people.json")
df.write.json(tpath, lineSep="!!")
readback = self.spark.read.json(tpath, lineSep="!!")
self.assertEqual(readback.collect(), df.collect())
finally:
shutil.rmtree(tpath)

def test_multiline_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", multiLine=True)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

private[sql] object CreateJacksonParser extends Serializable {
@@ -46,4 +47,15 @@ private[sql] object CreateJacksonParser extends Serializable {
def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
jsonFactory.createParser(record)
}

def internalRow(
jsonFactory: JsonFactory,
row: InternalRow,
charset: Option[String] = None
): JsonParser = {
require(charset == Some("UTF-8"))
val is = new ByteArrayInputStream(row.getBinary(0))

inputStream(jsonFactory, is)
}
}
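
A minimal usage sketch of the new internalRow factory method (not part of the diff; the JSON literal and variable names are illustrative, and since the object is private[sql] the call has to live inside Spark's sql packages):

import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.CreateJacksonParser

// Wrap one UTF-8 encoded JSON record into a single-column binary row and
// obtain a Jackson parser over its bytes; any charset other than UTF-8
// fails the require above until other encodings are reviewed.
val factory = new JsonFactory()
val row = InternalRow("""{"name":"Michael"}""".getBytes("UTF-8"))
val parser = CreateJacksonParser.internalRow(factory, row, Some("UTF-8"))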
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -85,6 +85,38 @@ private[sql] class JSONOptions(

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

val charset: Option[String] = Some("UTF-8")
[Member] It sounds like we need to review #20849 first

/**
* A sequence of bytes between two consecutive json records. Format of the option is:
* selector (1 char) + delimiter body (any length) | sequence of chars
[Contributor] I'm afraid of defining our own rule here, is there any standard we can follow?

* The following selectors are supported:
* - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
* Hex pairs can be separated by any chars different from 0-9,A-F,a-f
* - '\' - reserved for a sequence of control chars like "\r\n"
* and unicode escape like "\u000D\u000A"
* - 'r' and '/' - reserved for future use
*
* Note: the option defines a delimiter for the json reader only, the json writer
* uses '\n' as the delimiter of output records (it is converted to sequence of
* bytes according to charset)
*/
val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect {
case hexs if hexs.startsWith("x") =>
hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
.map(Integer.parseInt(_, 16).toByte)
case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
throw new NotImplementedError(s"the $reserved selector is not supported yet")
case delim => delim.getBytes(charset.getOrElse(
throw new IllegalArgumentException("Please set the charset option for the delimiter")))
}
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator

// Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
val lineSeparatorInWrite: String = {
lineSeparator.map(new String(_, charset.getOrElse("UTF-8"))).getOrElse("\n")
}

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
@@ -96,4 +128,10 @@ private[sql] class JSONOptions(
allowBackslashEscapingAnyCharacter)
factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
}

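/**
 * Forwards the read-side record separator to the underlying text datasource,
 * re-encoded in the 'x' hex selector form (e.g. "x0a" for '\n') so that
 * arbitrary byte sequences survive the string-typed options map.
 */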
def getTextOptions: Map[String, String] = {
lineSeparatorInRead.map{ bytes =>
"lineSep" -> bytes.map("x%02x".format(_)).mkString
}.toMap
}
}
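
A hypothetical usage sketch of the option format documented above (assuming a SparkSession named spark; the paths and separators are illustrative, not from the PR):

// CRLF ("\r\n") spelled as hex bytes via the 'x' selector; hex pairs may be
// separated by any non-hex character.
val dfCrlf = spark.read.option("lineSep", "x0d 0a").json("/tmp/json-crlf")

// A plain string separator is encoded with the charset (currently fixed to UTF-8).
val dfBang = spark.read.option("lineSep", "!!").json("/tmp/json-bangs")

// On write the separator is always emitted as UTF-8 and defaults to "\n".
dfBang.write.option("lineSep", "!!").json("/tmp/json-out")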
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -74,6 +74,8 @@ private[sql] class JacksonGenerator(

private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

private val lineSeparator: String = options.lineSeparatorInWrite

private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
@@ -251,5 +253,8 @@ private[sql] class JacksonGenerator(
mapType = dataType.asInstanceOf[MapType]))
}

def writeLineEnding(): Unit = gen.writeRaw('\n')
def writeLineEnding(): Unit = {
// Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
gen.writeRaw(lineSeparator)
}
}
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -366,6 +366,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @since 2.0.0
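
A short sketch of the documented read-side default (assuming a SparkSession named spark; paths are illustrative): when lineSep is not set, \r, \r\n and \n all terminate records, so files with mixed line endings parse without any option.

// Mixed line endings need no option.
val people = spark.read.json("/tmp/people-mixed-endings")

// An explicit separator narrows parsing to that single byte sequence.
val custom = spark.read.option("lineSep", "!!").json("/tmp/people-bangs")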
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -518,6 +518,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* </ul>
*
* @since 1.4.0
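
A write-side sketch mirroring the Python round-trip test above (assuming an existing DataFrame df; the path is illustrative): the writer ends every record with the separator, defaulting to \n, and reading the result back requires the same separator.

df.write.option("lineSep", "!!").json("/tmp/json-roundtrip")
val back = spark.read.option("lineSep", "!!").json("/tmp/json-roundtrip")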
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,6 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
@@ -92,25 +93,33 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
inferFromDataset(json, parsedOptions)
}

def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd

JsonInferSchema.infer[InternalRow](
rdd,
parsedOptions,
CreateJacksonParser.internalRow(_, _, parsedOptions.charset)
)
}

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus]): Dataset[String] = {
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions
): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName
className = classOf[TextFileFormat].getName,
options = parsedOptions.getTextOptions
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
@@ -120,7 +129,7 @@ object TextInputJsonDataSource extends JsonDataSource {
file: PartitionedFile,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file, conf)
val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val safeParser = new FailureSafeParser[Text](
input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -41,13 +41,20 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
*/
val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean

private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
sep
val charset: Option[String] = Some("UTF-8")

val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect {
case hexs if hexs.startsWith("x") =>
hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
.map(Integer.parseInt(_, 16).toByte)
case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
throw new NotImplementedError(s"the $reserved selector is not supported yet")
case delim => delim.getBytes(charset.getOrElse(
throw new IllegalArgumentException("Please set the charset option for the delimiter")))
}

// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] =
lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}
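
Because the text source now shares the same selector syntax, byte-level separators apply to plain text reads too; a sketch under the same assumptions (SparkSession named spark, illustrative path):

// Records separated by a NUL byte, spelled via the 'x' selector.
val lines = spark.read.option("lineSep", "x00").text("/tmp/nul-separated")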
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -268,6 +268,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
*
* @since 2.0.0
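
A streaming sketch (assuming a SparkSession named spark; the schema and path are illustrative): streaming JSON needs a user-supplied schema, and lineSep is passed the same way as in the batch reader.

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType().add("name", StringType).add("age", IntegerType)
val stream = spark.readStream
  .schema(schema)
  .option("lineSep", ",")
  .json("/tmp/stream-in")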