From 767801364b4cc12aaa1bb641c9f7f02a97aea40c Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 30 Mar 2018 17:34:16 +0200
Subject: [PATCH 1/3] Test for hex format of lineSep

---
 .../sql/execution/datasources/json/JsonSuite.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0b954eb24b7df..d5f0593f523b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2073,9 +2073,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val fileName = "json-tests/utf16WithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
-      // This option will be replaced by .option("lineSep", "x00 0a")
-      // as soon as lineSep allows to specify sequence of bytes in hexadecimal format.
-      .option("mode", "DROPMALFORMED")
+      .option("recordDelimiter", "x0d 00 0a 00")
       .json(testFile(fileName))
 
     checkAnswer(jsonDF, Seq(
@@ -2225,9 +2223,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val savedDf = spark
       .read
       .schema(ds.schema)
-      // This option will be replaced by .option("lineSep", "x00 0a")
-      // as soon as lineSep allows to specify sequence of bytes in hexadecimal format.
-      .option("mode", "DROPMALFORMED")
+      .option("lineSep", "x00 0a")
       .json(path.getCanonicalPath)
 
     checkAnswer(savedDf.toDF(), ds.toDF())
@@ -2291,7 +2287,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       ("\u000d\u000a", "encoding", "UTF-32BE", false),
       ("\u000a\u000d", "charset", "UTF-8", true),
       ("===", "encoding", "UTF-16", false),
-      ("$^+", "charset", "UTF-32LE", true)
+      ("$^+", "charset", "UTF-32LE", true),
+      ("x00 0a 00 0d", "charset", "UTF-16BE", false),
+      ("x0a.00.00.00 0d.00.00.00", "encoding", "UTF-32LE", true)
     ).zipWithIndex.foreach{case ((d, o, c, s), i) => checkReadJson(d, o, c, s, i)}
     // scalastyle:on nonascii
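
The hex spec passed to "recordDelimiter" above is simply the delimiter's byte
sequence in the file's encoding: "\r\n" in UTF-16LE. A minimal sketch in plain
Scala (no Spark required) of how such a spec decodes, assuming the parsing rule
introduced in the next patch (drop every non-hex character, then read the rest
two hex digits per byte):

    import java.nio.charset.StandardCharsets

    // "\r\n" encoded in UTF-16LE is exactly 0x0d 0x00 0x0a 0x00.
    val expected: Array[Byte] = "\r\n".getBytes(StandardCharsets.UTF_16LE)

    // Drop everything that is not a hex digit (including the leading 'x'),
    // then parse the remainder two characters (one byte) at a time.
    val parsed: Array[Byte] = "x0d 00 0a 00"
      .replaceAll("[^0-9A-Fa-f]", "")
      .sliding(2, 2)
      .map(Integer.parseInt(_, 16).toByte)
      .toArray

    assert(parsed.sameElements(expected))
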
From 5632aa5a98851b2a13215cd18e163bac9d57ce68 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 30 Mar 2018 19:56:05 +0200
Subject: [PATCH 2/3] Support flexible format of lineSep

---
 .../spark/sql/catalyst/json/JSONOptions.scala | 62 +++++++++++++------
 .../sql/catalyst/json/JacksonGenerator.scala  | 10 ++-
 .../datasources/text/TextFileFormat.scala     | 10 ++-
 .../datasources/text/TextOptions.scala        | 35 ++++++++---
 .../datasources/json/JsonSuite.scala          |  2 +-
 5 files changed, 85 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e79a98fb1efb1..2809d1bfd2110 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -87,30 +87,51 @@ private[sql] class JSONOptions(
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   /**
-   * A sequence of bytes between two consecutive json records.
+   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
+   * If charset is not specified (None), it will be detected automatically.
    */
-  val lineSeparator: Option[String] = parameters.get("lineSep")
+  val charset: Option[String] = parameters.get("encoding").orElse(parameters.get("charset"))
 
   /**
-   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
-   * If charset is not specified (None), it will be detected automatically.
+   * A sequence of bytes between two consecutive json objects.
+   * The format of the option is:
+   *   selector (1 char) + separator spec (any length) | sequence of chars
+   *
+   * Currently the following selectors are supported:
+   *  - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
+   *    Hex pairs can be separated by any characters other than 0-9, A-F and a-f.
+   *  - '\' - reserved for a sequence of control chars like "\r\n"
+   *    and Unicode escapes like "\u000D\u000A".
+   *  - 'r' and '/' - reserved for future use.
    */
-  val charset: Option[String] = parameters.get("charset")
-    .orElse(parameters.get("encoding")).map { cs =>
-      if (multiLine == false && cs != "UTF-8" && lineSeparator.isEmpty) {
-        throw new IllegalArgumentException(
-          s"""Please, set the 'lineSep' option for the given charset $cs.
-             |Example: .option("lineSep", "|^|")
-             |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin
-        )
-      }
-      cs
+  val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
+      throw new NotImplementedError(s"The $reserved selector is not supported yet")
+    case "" => throw new IllegalArgumentException("lineSep cannot be an empty string")
+    case lineSep => lineSep.getBytes(charset.getOrElse("UTF-8"))
+  }.orElse {
+    if (!multiLine && charset.isDefined && charset != Some("UTF-8")) {
+      throw new IllegalArgumentException(
+        s"""Please set the 'lineSep' option for the given charset ${charset.get}.
+           |Example: .option("lineSep", "|^|")
+           |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin
+      )
+    }
+    None
   }
 
-  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
-    lineSep.getBytes(charset.getOrElse("UTF-8"))
-  }
-  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
+  /**
+   * A sequence of bytes between two consecutive json objects used by the JSON reader
+   * to split the input stream/text.
+   */
+  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator
+  /**
+   * The JSON writer puts this separator between json objects in the output stream/text.
+   */
+  val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
@@ -125,6 +146,7 @@ private[sql] class JSONOptions(
   }
 
   def getTextOptions: Map[String, String] = {
-    Map[String, String]() ++ charset.map("charset" -> _) ++ lineSeparator.map("lineSep" -> _)
+    Map[String, String]() ++ charset.map("charset" -> _) ++
+      lineSeparator.map("lineSep" -> _.map("x%02x".format(_)).mkString)
   }
 }
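
For reference, here is the selector dispatch above pulled out as a standalone
function. The name parseLineSep is hypothetical; in the patch the logic is
inlined into the lineSeparator val:

    // Hypothetical standalone version of the 'lineSep' parsing above.
    def parseLineSep(spec: String, charset: String = "UTF-8"): Array[Byte] = spec match {
      // 'x' selector: bytes in hex, e.g. "x0a 0d" or "x0a.00.00.00"
      case hexs if hexs.startsWith("x") =>
        hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
          .map(Integer.parseInt(_, 16).toByte)
      // 'r' and '/' are reserved selectors
      case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
        throw new NotImplementedError(s"The $reserved selector is not supported yet")
      case "" => throw new IllegalArgumentException("lineSep cannot be an empty string")
      // no selector: a plain sequence of chars, encoded with the configured charset
      case lineSep => lineSep.getBytes(charset)
    }

    parseLineSep("x0d 00 0a 00")  // Array(0x0d, 0x00, 0x0a, 0x00)
    parseLineSep("|^|")           // the UTF-8 bytes of "|^|"

One consequence of this scheme worth noting: a plain separator that happens to
start with 'x', 'r' or '/' cannot be expressed without a selector; "xyz", for
instance, would be read as a hex spec and decode to an empty byte array.
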
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 9c413de752a8c..5c71f3939aea3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.json
 
 import java.io.Writer
-import java.nio.charset.StandardCharsets
+import java.nio.charset.Charset
 
 import com.fasterxml.jackson.core._
 
@@ -75,7 +75,12 @@ private[sql] class JacksonGenerator(
 
   private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
-  private val lineSeparator: String = options.lineSeparatorInWrite
+  private val lineSeparator: String = {
+    new String(
+      options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte)),
+      Charset.forName(options.charset.getOrElse("UTF-8"))
+    )
+  }
 
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
@@ -255,7 +260,6 @@ private[sql] class JacksonGenerator(
   }
 
   def writeLineEnding(): Unit = {
-    // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
     gen.writeRaw(lineSeparator)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 9647f09867643..c2af2fc29766f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.nio.charset.Charset
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-
 import org.apache.spark.TaskContext
+
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -86,7 +88,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
+    new TextOutputWriter(path, dataSchema, textOptions, context)
   }
 
   override def getFileExtension(context: TaskAttemptContext): String = {
@@ -149,10 +151,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
 class TextOutputWriter(
     path: String,
     dataSchema: StructType,
-    lineSeparator: Array[Byte],
+    options: TextOptions,
     context: TaskAttemptContext)
   extends OutputWriter {
 
+  private val lineSeparator = options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte))
+
   private val writer = CodecStreams.createOutputStream(context, new Path(path))
 
   override def write(row: InternalRow): Unit = {
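
Both writers now fall back to a single 0x0A byte when no separator is given;
the JSON generator additionally decodes the bytes with the configured charset
because Jackson's writeRaw takes a String. A sketch of that defaulting (the
helper name separatorForWrite is illustrative):

    import java.nio.charset.Charset

    // Illustrative: with no explicit lineSep, default to one 0x0A byte,
    // decoded in the configured charset (UTF-8 if none was set).
    def separatorForWrite(
        lineSeparatorInWrite: Option[Array[Byte]],
        charset: Option[String]): String = {
      new String(
        lineSeparatorInWrite.getOrElse(Array(0x0A.toByte)),
        Charset.forName(charset.getOrElse("UTF-8")))
    }

    separatorForWrite(None, None)                       // "\n"
    separatorForWrite(Some(Array(0x7c.toByte)), None)   // "|"

A caveat this encodes: a lone 0x0A is not a complete code unit in, say, UTF-16,
so the fallback implicitly assumes a charset in which LF is the single byte 0x0A.
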
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index a523f5aee63ed..360e15640621f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -43,16 +43,37 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
 
   val charset: Option[String] = Some("UTF-8")
 
-  val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").map { lineSep =>
-    require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
-    lineSep.getBytes(charset.getOrElse(
-      throw new IllegalArgumentException("Please, set the charset option for the delimiter")))
+  /**
+   * A sequence of bytes between two consecutive lines in a text.
+   * The format of the option is:
+   *   selector (1 char) + separator spec (any length) | sequence of chars
+   *
+   * Currently the following selectors are supported:
+   *  - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
+   *    Hex pairs can be separated by any characters other than 0-9, A-F and a-f.
+   *  - '\' - reserved for a sequence of control chars like "\r\n"
+   *    and Unicode escapes like "\u000D\u000A".
+   *  - 'r' and '/' - reserved for future use.
+   */
+  val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").collect {
+    case hexs if hexs.startsWith("x") =>
+      hexs.replaceAll("[^0-9A-Fa-f]", "").sliding(2, 2).toArray
+        .map(Integer.parseInt(_, 16).toByte)
+    case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
+      throw new NotImplementedError(s"The $reserved selector is not supported yet")
+    case "" => throw new IllegalArgumentException("lineSep cannot be an empty string")
+    case lineSep => lineSep.getBytes(charset.getOrElse("UTF-8"))
   }
 
-  // Note that the option 'lineSep' uses a different default value in read and write.
+  /**
+   * A sequence of bytes between two consecutive lines used by the text reader to
+   * split the input stream/text.
+   */
   val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator
-  val lineSeparatorInWrite: Array[Byte] =
-    lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
+  /**
+   * The text writer puts this separator between lines in the output stream/text.
+   */
+  val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator
 }
 
 private[datasources] object TextOptions {
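
The two option classes are designed to round-trip: JSONOptions.getTextOptions
re-serializes the parsed separator bytes as "x%02x" pairs, and the TextOptions
parsing above turns that string back into the identical byte array (the letter
'x' is not a hex digit, so it is stripped like any other pair separator). A
quick self-contained check:

    // The separator bytes for "\r\n" in UTF-16LE.
    val sep: Array[Byte] = Array(0x0d, 0x00, 0x0a, 0x00).map(_.toByte)

    // How JSONOptions.getTextOptions serializes them: "x0dx00x0ax00".
    val hexForm: String = sep.map("x%02x".format(_)).mkString

    // How TextOptions parses the hex form back.
    val roundTripped: Array[Byte] = hexForm
      .replaceAll("[^0-9A-Fa-f]", "")
      .sliding(2, 2)
      .map(Integer.parseInt(_, 16).toByte)
      .toArray

    assert(roundTripped.sameElements(sep))
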
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d5f0593f523b3..069d763da6760 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2073,9 +2073,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val fileName = "json-tests/utf16WithBOM.json"
     val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
     val jsonDF = spark.read.schema(schema)
-      .option("recordDelimiter", "x0d 00 0a 00")
+      .option("lineSep", "x0d 00 0a 00")
       .json(testFile(fileName))
 
     checkAnswer(jsonDF, Seq(
       Row("Chris", "Baird"), Row("Doug", "Rood")
     ))

From f94d846b39ade89da24ef3e85f9721fb34e48154 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 30 Mar 2018 20:27:51 +0200
Subject: [PATCH 3/3] Making the doc generator happy

---
 .../spark/sql/execution/datasources/text/TextFileFormat.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index c2af2fc29766f..9e67abac7ec20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import java.nio.charset.Charset
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.spark.TaskContext
 
+import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
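
Taken together, the series allows reads like the following sketch, modeled on
the tests above (the path is a placeholder and spark is an existing SparkSession):

    import org.apache.spark.sql.types.{StringType, StructType}

    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)

    // UTF-16LE input whose records are delimited by "\r\n",
    // spelled out as hex bytes: 0x0d 0x00 0x0a 0x00.
    val df = spark.read
      .schema(schema)
      .option("encoding", "UTF-16LE")
      .option("lineSep", "x0d 00 0a 00")
      .json("/path/to/utf16le.json")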