diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index c3657acb7d86..5b6b6352262a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -53,7 +53,8 @@ class UnivocityParser(
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    UTF8String.fromString(
+      Option(tokenizer.getContext.currentParsedContent()).map(_.stripLineEnd).orNull)
   }
 
   // This parser first picks some tokens from the input tokens, according to the required schema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 352dba79a4c0..250aa9ac228a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1174,4 +1174,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-20978: Set null for malformed column when the number of tokens is less than schema") {
+    val df = spark.read
+      .schema("a string, b string, unparsed string")
+      .option("columnNameOfCorruptRecord", "unparsed")
+      .csv(Seq("a").toDS())
+    checkAnswer(df, Row("a", null, null))
+  }
 }