[SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data #16928
Changes from all commits: 74e1fc5, 763601d, 873a383, 4eed4a4, 448e6fe, 619094a, 80c3775, c86febe, 8d9386a, 512fb42, 3d514e5, a58ff1f
File: python/pyspark/sql/readwriter.py

@@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record and puts the malformed string into a new field configured by \
-                  ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
-                  ``null`` for extra fields.
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+                  record, and puts the malformed string into a field configured by \
+                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+                  a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
+                  schema. If a schema does not have the field, it drops corrupt records during \
+                  parsing. When inferring a schema, it implicitly adds a \
+                  ``columnNameOfCorruptRecord`` field in an output schema.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.

@@ -304,7 +307,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
             ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
             negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
-            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
+            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
+            columnNameOfCorruptRecord=None):
Member
Doh, it seems we should add this in

Member (Author)
okay, I'll check soon
        """Loads a CSV file and returns the result as a :class:`DataFrame`.

        This function will go through the input once to determine the input schema if

@@ -366,11 +370,22 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
                          If None is set, it uses the default value, session local timezone.
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
-                    When a schema is set by user, it sets ``null`` for extra fields.
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+                  record, and puts the malformed string into a field configured by \
+                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
+                  a string type field named ``columnNameOfCorruptRecord`` in an \
+                  user-defined schema. If a schema does not have the field, it drops corrupt \
+                  records during parsing. When a length of parsed CSV tokens is shorter than \
+                  an expected length of a schema, it sets `null` for extra fields.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.

+        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+                                          created by ``PERMISSIVE`` mode. This overrides
+                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+                                          it uses the value specified in
+                                          ``spark.sql.columnNameOfCorruptRecord``.

        >>> df = spark.read.csv('python/test_support/sql/ages.csv')
        >>> df.dtypes
        [('_c0', 'string'), ('_c1', 'string')]

@@ -382,7 +397,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
             dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
             maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
+            columnNameOfCorruptRecord=columnNameOfCorruptRecord)
        if isinstance(path, basestring):
            path = [path]
        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
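To see the documented behaviour end to end, here is a hedged Scala sketch (it is not part of this PR's diff; the file path, schema, and the column name `_malformed` are hypothetical) that keeps corrupt records by declaring a nullable string field for them and pointing `columnNameOfCorruptRecord` at it:

```scala
// Hedged sketch: reading CSV in PERMISSIVE mode while keeping malformed lines.
// The input path and column names are assumptions; only the option names come from this PR.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object PermissiveCsvExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("permissive-csv").getOrCreate()

    // A user-defined schema that includes a nullable string field for corrupt records.
    val schema = new StructType()
      .add("age", IntegerType, nullable = true)
      .add("name", StringType, nullable = true)
      .add("_malformed", StringType, nullable = true)

    val df = spark.read
      .schema(schema)
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_malformed") // overrides spark.sql.columnNameOfCorruptRecord
      .csv("/tmp/people.csv") // hypothetical path

    df.show(truncate = false)
    spark.stop()
  }
}
```

Rows that parse cleanly leave `_malformed` as null; rows whose tokens cannot be parsed keep the raw line there, matching the ``PERMISSIVE`` description above.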
File: CSVFileFormat.scala

@@ -27,9 +27,9 @@ import org.apache.hadoop.mapreduce._

 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.sources._

@@ -96,31 +96,44 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
     CSVUtils.verifySchema(dataSchema)
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

+    val parsedOptions = new CSVOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
Member
(It seems

Member (Author)
Fixed
+    // Check a field requirement for corrupt records here to throw an exception in a driver side
+    dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex =>
+      val f = dataSchema(corruptFieldIndex)
+      if (f.dataType != StringType || !f.nullable) {
+        throw new AnalysisException(
+          "The field for corrupt records must be string type and nullable")
+      }
+    }

     (file: PartitionedFile) => {
       val lines = {
         val conf = broadcastedHadoopConf.value.value
         val linesReader = new HadoopFileLinesReader(file, conf)
         Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
         linesReader.map { line =>
-          new String(line.getBytes, 0, line.getLength, csvOptions.charset)
+          new String(line.getBytes, 0, line.getLength, parsedOptions.charset)
         }
       }

-      val linesWithoutHeader = if (csvOptions.headerFlag && file.start == 0) {
+      val linesWithoutHeader = if (parsedOptions.headerFlag && file.start == 0) {
         // Note that if there are only comments in the first block, the header would probably
         // be not dropped.
-        CSVUtils.dropHeaderLine(lines, csvOptions)
+        CSVUtils.dropHeaderLine(lines, parsedOptions)
       } else {
         lines
       }

-      val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, csvOptions)
-      val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions)
+      val filteredLines = CSVUtils.filterCommentAndEmpty(linesWithoutHeader, parsedOptions)
+      val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions)
       filteredLines.flatMap(parser.parse)
     }
   }
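The driver-side check added above only accepts a nullable string field for corrupt records. A hedged, standalone sketch of the same requirement (the schemas and the `IllegalArgumentException` are assumptions for illustration; the PR itself throws `AnalysisException`, whose constructor is not public):

```scala
// Hedged sketch of the requirement enforced above: the corrupt-record field in a
// user-supplied schema must be a nullable StringType, otherwise fail on the driver
// before any file is read.
import org.apache.spark.sql.types._

object CorruptRecordFieldCheck {
  def validate(dataSchema: StructType, corruptFieldName: String): Unit = {
    val idx = dataSchema.fields.indexWhere(_.name == corruptFieldName)
    if (idx >= 0) {
      val f = dataSchema.fields(idx)
      if (f.dataType != StringType || !f.nullable) {
        throw new IllegalArgumentException(
          "The field for corrupt records must be string type and nullable")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = new StructType()
      .add("a", IntegerType)
      .add("_corrupt_record", StringType, nullable = true)
    validate(ok, "_corrupt_record") // passes: nullable string field

    val bad = new StructType()
      .add("a", IntegerType)
      .add("_corrupt_record", IntegerType, nullable = true)
    try validate(bad, "_corrupt_record") // fails: not a string field
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```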
File: CSVOptions.scala

@@ -27,11 +27,20 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}

 private[csv] class CSVOptions(
-    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
+    @transient private val parameters: CaseInsensitiveMap[String],
+    defaultTimeZoneId: String,
+    defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {

-  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
-    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
+  def this(
+      parameters: Map[String, String],
+      defaultTimeZoneId: String,
+      defaultColumnNameOfCorruptRecord: String = "") = {
+    this(
+      CaseInsensitiveMap(parameters),
+      defaultTimeZoneId,
+      defaultColumnNameOfCorruptRecord)
+  }

   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)

@@ -95,6 +104,9 @@ private[csv] class CSVOptions(
   val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
   val permissive = ParseModes.isPermissiveMode(parseMode)

+  val columnNameOfCorruptRecord =
+    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
Member
Maybe, we should add this in

Member (Author)
Added doc descriptions in
   val nullValue = parameters.getOrElse("nullValue", "")

   val nanValue = parameters.getOrElse("nanValue", "NaN")
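The new option is resolved with a simple precedence: a per-read `columnNameOfCorruptRecord` option wins over the default passed in from the session conf. A hedged, standalone sketch of that resolution order (a plain `Map` stands in for the `CaseInsensitiveMap` used by `CSVOptions`, and the names here are assumptions):

```scala
// Hedged sketch of the precedence implemented by the getOrElse above: the reader option
// "columnNameOfCorruptRecord" overrides the session-level default, which is handed in as
// defaultColumnNameOfCorruptRecord (spark.sql.columnNameOfCorruptRecord).
object CorruptRecordNameResolution {
  def resolve(options: Map[String, String], sessionDefault: String): String =
    options.getOrElse("columnNameOfCorruptRecord", sessionDefault)

  def main(args: Array[String]): Unit = {
    // No per-read option: fall back to the session default.
    assert(resolve(Map.empty, "_corrupt_record") == "_corrupt_record")
    // Per-read option set: it wins over the session default.
    assert(resolve(Map("columnNameOfCorruptRecord" -> "_bad_line"), "_corrupt_record") == "_bad_line")
    println("resolution order as expected")
  }
}
```

Note that the real `CSVOptions` uses a `CaseInsensitiveMap`, so the option key is matched case-insensitively there; the sketch uses a plain `Map` for brevity.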
@maropu For JSON, we implicitly add the `columnNameOfCorruptRecord` field during schema inference when the mode is `PERMISSIVE`. What is the reason we are not doing the same thing for CSV schema inference?
(Sorry for interrupting) Yea, it should be consistent and we probably should change it. We should probably also consider records with fewer or more tokens than the schema as malformed records in `PERMISSIVE` mode, rather than filling some of them in. @cloud-fan raised this issue before, and I had a talk with some data analysts; some agreed and others did not, so I decided not to change the current behaviour for now.
To cut it short, the reason (I assume) is that I could not imagine a simple, common case where CSV fails to parse (as opposed to failing during conversion) under the current implementation. If there is one, we should match the behaviour.
I am currently outside and this is my phone. I will double-check this when I get to my computer, but this should be correct unless I have missed some changes in this code path.
In CSV, records with fewer or more tokens than the schema are already viewed as malformed records in (at least) 2.2. I did not check the previous versions.
I think we need to implicitly add the `columnNameOfCorruptRecord` column during schema inference too.
There is more than one issue here. The default of `columnNameOfCorruptRecord` does not respect the session conf `spark.sql.columnNameOfCorruptRecord`.
Will submit a PR soon for fixing both issues.
Now, users have to manually add the `columnNameOfCorruptRecord` column to see these malformed records.
@gatorsmile, I just got to my laptop.
I checked that when the length of the tokens is more than the schema, it fills the malformed column, with the data below:
(BTW, it looks like `spark.sql.columnNameOfCorruptRecord` is respected?)
And I found another bug (when the length is less than the schema): with data
it prints ...
It looks like `getCurrentInput` produces `null` as the input is all parsed.
Another thing I would like to leave here (just to note the difference, so none of us forgets): JSON produces `null` in the columns and puts the contents into the malformed column. With the input:
{"a": 1, "b": "a"}
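The inline snippets from this comment did not survive here; purely as a hypothetical illustration of the JSON behaviour being described (the schema, the integer type for `b`, and the `_corrupt_record` column are assumptions, and it presumes a Spark version where `DataFrameReader.json` accepts a `Dataset[String]`):

```scala
// Hypothetical reconstruction, not the reviewer's original snippet. It illustrates the
// JSON PERMISSIVE behaviour described above: when a field fails to convert, the data
// columns come back as null and the whole raw line is kept in the corrupt-record column.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object JsonCorruptRecordSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("json-corrupt-record").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("a", IntegerType, nullable = true)
      .add("b", IntegerType, nullable = true) // "a" is not a valid integer, so the record is malformed
      .add("_corrupt_record", StringType, nullable = true)

    val input = Seq("""{"a": 1, "b": "a"}""").toDS()
    val df = spark.read.schema(schema).option("mode", "PERMISSIVE").json(input)
    // Per the discussion: a = null, b = null, _corrupt_record = the raw JSON line.
    df.show(truncate = false)
    spark.stop()
  }
}
```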
Oh.. I was writing the comments before seeing your comments ... Yes, I agree with your comments.
Let me give a shot at fixing the bug I found above (`NullPointerException`). I think it can be fixed easily (though I am pretty sure the resulting behaviour could be arguable). I will open a PR and cc you to show what it looks like.
Sorry for my late response. Yea, I also think these behaviours should be the same. I tried in this PR, but I couldn't, because (as you both already noticed) we cannot easily add a new column in the CSV code path. So, I think we probably need some refactoring of `DataSource` to make this behaviour consistent.