33 changes: 19 additions & 14 deletions python/pyspark/sql/readwriter.py
@@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
schema. If a schema does not have the field, it drops corrupt records during \
parsing. When inferring a schema, it implicitly adds a \
``columnNameOfCorruptRecord`` field in an output schema.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, a user can set a string type \
field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
Member: I think we should say it implicitly adds ... *if a corrupted record is found* while we are here? I think it only adds `columnNameOfCorruptRecord` when it meets a corrupted record during schema inference.

Member Author: When users set a string type field named `columnNameOfCorruptRecord` in a user-defined schema, I think the field is still added even when there is no corrupted record. Or did I misread this sentence?

Member: Ah, I thought this:

When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` field in an output schema.

describes schema inference, because it adds the `columnNameOfCorruptRecord` column if a malformed record was found during schema inference. I mean:

scala> spark.read.json(Seq("""{"a": 1}""", """{"a":""").toDS).printSchema()
root
 |-- _corrupt_record: string (nullable = true)
 |-- a: long (nullable = true)


scala> spark.read.json(Seq("""{"a": 1}""").toDS).printSchema()
root
 |-- a: long (nullable = true)

but yes, I think I misread it. Here we are already describing things mainly about malformed records.

field in an output schema. It does not support partial results. Even if just \
Member: It's trivial, but how about we avoid an abbreviation like *doesn't*? It's usually what I do for docs, although I am not sure if it actually matters.

Member Author: Ok.

one field cannot be correctly parsed, all fields except for the field \
configured by ``columnNameOfCorruptRecord`` will be set to ``null``.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
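
To make the ``PERMISSIVE`` semantics above concrete, here is a minimal PySpark sketch (illustrative only, not part of this diff; the sample records and the active ``spark`` session are assumptions):

from pyspark.sql.types import LongType, StringType, StructField, StructType

# One valid record and one malformed record.
records = spark.sparkContext.parallelize(['{"a": 1}', '{"a":'])

# Keep corrupt records by declaring the corrupt-record field explicitly;
# `_corrupt_record` is the default for `columnNameOfCorruptRecord`.
schema = StructType([
    StructField("a", LongType(), True),
    StructField("_corrupt_record", StringType(), True),
])

df = spark.read.schema(schema).option("mode", "PERMISSIVE").json(records)
df.show(truncate=False)
# For the malformed record, `a` is null and `_corrupt_record` holds the
# raw string: JSON parsing yields no partial results.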

@@ -393,13 +395,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an \
user-defined schema. If a schema does not have the field, it drops corrupt \
records during parsing. When a length of parsed CSV tokens is shorter than \
an expected length of a schema, it sets `null` for extra fields.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, a user can set a string type \
Contributor: We can't say *and sets other fields to null*, as it's not the case for CSV.

Member Author: I think this is talking about a corrupted record, not a record with fewer or more tokens. If the CSV parser fails to parse a record, all other fields are set to null.

Contributor: Ah, I think we need to explain that, for CSV, a record with fewer or more tokens is not a malformed record.

Member Author: Ok. Added.

field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
A record with fewer or more tokens than the schema is not a corrupted \
record for CSV, and partial results are returned for it. \
Member: How about *a malformed record whose parsed tokens is* -> *a malformed record having the length of parsed tokens shorter than the length of a schema*?

Member Author: Ok.

When it meets a record having the length of parsed tokens shorter than the \
length of the schema, it sets ``null`` for the extra fields. When the length \
of parsed tokens is longer than the length of the schema, it drops the extra \
tokens.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
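
Similarly, a hedged PySpark sketch of the CSV token-count behavior above (again illustrative; the sample rows are assumptions and the ``spark`` session is reused from the previous sketch):

from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
    StructField("c0", StringType(), True),
    StructField("c1", StringType(), True),
])

# Token counts that disagree with the schema do not mark a record as
# corrupted; they produce partial results instead.
rows = spark.sparkContext.parallelize(["a", "a,b,c"])
df = spark.read.schema(schema).option("mode", "PERMISSIVE").csv(rows)
df.show()
# "a"     -> Row(c0='a', c1=None)   the missing token becomes null
# "a,b,c" -> Row(c0='a', c1='b')    the extra token is dropped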

33 changes: 19 additions & 14 deletions python/pyspark/sql/streaming.py
@@ -442,13 +442,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
schema. If a schema does not have the field, it drops corrupt records during \
parsing. When inferring a schema, it implicitly adds a \
``columnNameOfCorruptRecord`` field in an output schema.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, a user can set a string type \
field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
field in an output schema. It does not support partial results. Even if \
just one field cannot be correctly parsed, all fields except for the field \
configured by ``columnNameOfCorruptRecord`` will be set to ``null``.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.

@@ -621,13 +623,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an \
user-defined schema. If a schema does not have the field, it drops corrupt \
records during parsing. When a length of parsed CSV tokens is shorter than \
an expected length of a schema, it sets `null` for extra fields.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, a user can set a string type \
field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
A record with fewer or more tokens than the schema is not a corrupted \
record for CSV, and partial results are returned for it. When it meets a \
record having the length of parsed tokens shorter than the length of the \
schema, it sets ``null`` for the extra fields. When the length of parsed \
tokens is longer than the length of the schema, it drops the extra tokens.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
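
On the streaming side, a sketch under the assumption of a hypothetical input directory; file stream sources require a user-defined schema up front, which is also the natural place to declare the corrupt-record field:

from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([
    StructField("c0", StringType(), True),
    StructField("c1", StringType(), True),
    # Declared so that truly malformed rows are kept rather than dropped.
    StructField("_corrupt_record", StringType(), True),
])

stream = (spark.readStream
    .schema(schema)  # required for file stream sources
    .option("mode", "PERMISSIVE")
    .csv("/tmp/csv-input"))  # hypothetical directory

query = stream.writeStream.format("console").start()

As in the batch case, rows whose token count merely disagrees with the schema come through as partial results rather than landing in ``_corrupt_record``.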

@@ -357,6 +357,9 @@ class JacksonParser(
}
} catch {
case e @ (_: RuntimeException | _: JsonProcessingException) =>
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
}
}
18 changes: 11 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,12 +345,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
* field in an output schema.</li>
* field in an output schema. It does not support partial results. Even if just one field
* cannot be correctly parsed, all fields except for the field configured by
* `columnNameOfCorruptRecord` will be set to `null`.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -550,12 +552,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When a length of parsed CSV tokens is shorter than an expected length
* of a schema, it sets `null` for extra fields.</li>
* during parsing. A record with fewer or more tokens than the schema is not a corrupted
Member: I think there are the same instances to update in DataStreamReader, readwriter.py and streaming.py too.

Member Author: Yes. Will update accordingly.

* record for CSV, and partial results are returned for it. When it meets a record having
* the length of parsed tokens shorter than the length of the schema, it sets `null` for
* the extra fields. When the length of parsed tokens is longer than the length of the
* schema, it drops the extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -203,6 +203,8 @@ class UnivocityParser(
case _: BadRecordException => None
}
}
// For records with less or more tokens than the schema, tries to return partial results
// if possible.
throw BadRecordException(
() => getCurrentInput,
() => getPartialResult(),
@@ -218,6 +220,9 @@
row
} catch {
case NonFatal(e) =>
// For corrupted records with the number of tokens same as the schema,
// CSV reader doesn't support partial results. All fields other than the field
// configured by `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => getCurrentInput, () => None, e)
}
}
@@ -236,12 +236,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
* field in an output schema.</li>
* field in an output schema. It does not support partial results. Even if just one field
* cannot be correctly parsed, all fields except for the field configured by
* `columnNameOfCorruptRecord` will be set to `null`.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -316,12 +318,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When a length of parsed CSV tokens is shorter than an expected length
* of a schema, it sets `null` for extra fields.</li>
* during parsing. A record with fewer or more tokens than the schema is not a corrupted
* record for CSV, and partial results are returned for it. When it meets a record having
* the length of parsed tokens shorter than the length of the schema, it sets `null` for
* the extra fields. When the length of parsed tokens is longer than the length of the
* schema, it drops the extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>