27 changes: 19 additions & 8 deletions python/pyspark/sql/dataframe.py
@@ -1362,8 +1362,8 @@ def replace(self, to_replace, value=None, subset=None):
"""Returns a new :class:`DataFrame` replacing a value with another value.
:func:`DataFrame.replace` and :func:`DataFrameNaFunctions.replace` are
aliases of each other.
Values to_replace and value should contain either all numerics, all booleans,
or all strings. When replacing, the new value will be cast
Values to_replace and value must have the same type and can only be numerics, booleans,
or strings. Value can have None. When replacing, the new value will be cast
to the type of the existing column.
For numeric replacements all values to be replaced should have unique
floating point representation. In case of conflicts (for example with `{42: -1, 42.0: 1}`)
@@ -1373,8 +1373,8 @@ def replace(self, to_replace, value=None, subset=None):
Value to be replaced.
If the value is a dict, then `value` is ignored and `to_replace` must be a
mapping between a value and a replacement.
:param value: int, long, float, string, or list.
The replacement value must be an int, long, float, or string. If `value` is a
:param value: int, long, float, string, list or None.
Review comment (Member): It's not related to this PR, but we should add bool to this type list here and in the other descriptions?

The replacement value must be an int, long, float, string or None. If `value` is a
list, `value` should be of the same length and type as `to_replace`.
If `value` is a scalar and `to_replace` is a sequence, then `value` is
used as a replacement for each item in `to_replace`.
Expand All @@ -1393,6 +1393,16 @@ def replace(self, to_replace, value=None, subset=None):
|null| null| null|
+----+------+-----+

>>> df4.na.replace('Alice', None).show()
Review comment (Member): Looks like we now allow something like df4.na.replace('Alice').show(). We'd better add it here.

Review comment (Member): Actually, I think this is something to be fixed in DataFrameNaFunctions.replace in this file...

Review comment (Member): ...and I was thinking of not doing this here, since strictly it should be a follow-up for SPARK-19454. I am fine with doing it here too while we are at it.

Review comment (Member): This change allows us to do df4.na.replace('Alice'). I think SPARK-19454 doesn't?

Review comment (HyukjinKwon, Aug 7, 2017): I guess it is .na.replace vs .replace. I think both should be the same, though. I just built against this PR and double-checked as below:

>>> df = spark.createDataFrame([('Alice', 10, 80.0)])
>>> df.replace("Alice").first()
Row(_1=None, _2=10, _3=80.0)
>>> df.na.replace("Alice").first()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: replace() takes at least 3 arguments (2 given)

Review comment (Member): I hadn't noticed that. Why do we test dataframe.na.replace in the doctest of dataframe.replace? We should test dataframe.replace here.

Review comment (HyukjinKwon, Aug 7, 2017): I assume we added an alias for dataframe.replace to promote use of dataframe.na.replace? The doc says they are aliases anyway. I don't know, but I tend to agree with pairing doctests, and this looks renamed in ff26767.

Let's leave this as is for now. I don't want to make this PR complicated.

Review comment (Member): OK. I'm fine with this.

Review comment (Member): I filed a JIRA for the mismatching default value between replace and na.replace: SPARK-21658.

+----+------+----+
| age|height|name|
+----+------+----+
| 10| 80|null|
| 5| null| Bob|
|null| null| Tom|
|null| null|null|
+----+------+----+

>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
@@ -1428,9 +1438,9 @@ def all_of_(xs):
"to_replace should be a float, int, long, string, list, tuple, or dict. "
"Got {0}".format(type(to_replace)))

if not isinstance(value, valid_types) and not isinstance(to_replace, dict):
if not isinstance(value, valid_types + (type(None), )) and not isinstance(to_replace, dict):
Review comment (Member): I would check for None by value is None.

raise ValueError("If to_replace is not a dict, value should be "
"a float, int, long, string, list, or tuple. "
"a float, int, long, string, list, tuple or None. "
"Got {0}".format(type(value)))

if isinstance(to_replace, (list, tuple)) and isinstance(value, (list, tuple)):
@@ -1446,7 +1456,7 @@ def all_of_(xs):
if isinstance(to_replace, (float, int, long, basestring)):
Review comment (Member): bool?

to_replace = [to_replace]

if isinstance(value, (float, int, long, basestring)):
if isinstance(value, (float, int, long, basestring, type(None))):
value = [value for _ in range(len(to_replace))]
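Together, these two isinstance branches normalize scalar arguments: a lone to_replace becomes a one-element list, and a scalar value (now including None) is broadcast across it. A self-contained sketch of that normalization (the function name is illustrative, not the real pyspark internals):

```python
def normalize_args(to_replace, value=None):
    # A scalar to_replace becomes a one-element list.
    if isinstance(to_replace, (float, int, str)):
        to_replace = [to_replace]
    # A scalar value, including the default None, is repeated once per target.
    if isinstance(value, (float, int, str, type(None))):
        value = [value for _ in range(len(to_replace))]
    return to_replace, value

print(normalize_args(['Alice', 'Bob']))  # (['Alice', 'Bob'], [None, None])
print(normalize_args(10, 20))            # ([10], [20])
```

This is also why calling replace with only to_replace now works: the omitted value defaults to None and is broadcast like any other scalar.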

if isinstance(to_replace, dict):
@@ -1460,7 +1470,8 @@ def all_of_(xs):
subset = [subset]

# Verify we were not passed in mixed type generics."
Review comment (HyukjinKwon, Aug 7, 2017): While we are here, let's remove this " at the end, which looks like a typo.

if not any(all_of_type(rep_dict.keys()) and all_of_type(rep_dict.values())
if not any(all_of_type(rep_dict.keys())
and all_of_type(x for x in rep_dict.values() if x is not None)
for all_of_type in [all_of_bool, all_of_str, all_of_numeric]):
raise ValueError("Mixed type replacements are not supported")
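The any(...) guard above accepts a replacement dict only when all keys, and all non-None values, fall into a single type family. A standalone sketch of the same idea (the helper names are illustrative):

```python
def all_of(types):
    def inner(xs):
        return all(isinstance(x, types) for x in xs)
    return inner

all_of_bool = all_of(bool)
all_of_str = all_of(str)
all_of_numeric = all_of((float, int))  # note: bools also pass, since bool subclasses int

def is_consistent(rep_dict):
    # None values are filtered out before the type check, so {'Alice': None} is fine.
    return any(
        all_of_type(rep_dict.keys())
        and all_of_type(x for x in rep_dict.values() if x is not None)
        for all_of_type in [all_of_bool, all_of_str, all_of_numeric]
    )

print(is_consistent({'Alice': None, 'Bob': 'B'}))  # True: all-string, None ignored
print(is_consistent({10: 'A'}))                    # False: mixed numeric/string
```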

10 changes: 10 additions & 0 deletions python/pyspark/sql/tests.py
@@ -1964,6 +1964,16 @@ def test_replace(self):
.replace(False, True).first())
self.assertTupleEqual(row, (True, True))

# replace with None
row = self.spark.createDataFrame(
[(u'Alice', 10, 80.0)], schema).replace(u'Alice', None).first()
self.assertTupleEqual(row, (None, 10, 80.0))

# replace with numerics and None
row = self.spark.createDataFrame(
[(u'Alice', 10, 80.0)], schema).replace([10, 80], [20, None]).first()
self.assertTupleEqual(row, (u'Alice', 20, None))
Review comment (Member): Can you add a test where to_replace is a list and value is not given (so the default value None is used)? Previously this would raise a ValueError, but now it is valid usage. We'd better add a test explicitly for it.


# should fail if subset is not list, tuple or None
with self.assertRaises(ValueError):
self.spark.createDataFrame(
@@ -145,8 +145,8 @@ class DataTypeSuite extends SparkFunSuite {
val message = intercept[SparkException] {
left.merge(right)
}.getMessage
assert(message.equals("Failed to merge fields 'b' and 'b'. " +
"Failed to merge incompatible data types FloatType and LongType"))
assert(message === "Failed to merge fields 'b' and 'b'. " +
Review comment (Member): Nit: not related to this PR. Please revert it back.

Review comment (Contributor, author): Is this change a valid improvement? I forgot about === when I pushed that commit. I can revert this, but do I need to create another PR? With or without a JIRA?

Review comment (HyukjinKwon, Aug 8, 2017): It does look like a valid improvement, but it makes backporting harder sometimes. Let's revert this one if that's fine. We could mention fixing it when someone (or you) happens to touch the code around here. I guess it is too trivial for its own PR.

"Failed to merge incompatible data types FloatType and LongType")
}

test("existsRecursively") {
@@ -262,6 +262,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* Replaces values matching keys in `replacement` map with the corresponding values.
* Key and value of `replacement` map must have the same type, and
* can only be doubles, strings or booleans.
* `replacement` map value can have null.
* If `col` is "*", then the replacement is applied on all string columns or numeric columns.
*
* {{{
@@ -290,6 +291,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* Replaces values matching keys in `replacement` map with the corresponding values.
* Key and value of `replacement` map must have the same type, and
* can only be doubles, strings or booleans.
* `replacement` map value can have null.
*
* {{{
* import com.google.common.collect.ImmutableMap;
@@ -314,6 +316,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* (Scala-specific) Replaces values matching keys in `replacement` map.
* Key and value of `replacement` map must have the same type, and
* can only be doubles, strings or booleans.
* `replacement` map value can have null.
Review comment (Member): Do not put it here. It should be put in @param.

Review comment (Contributor, author): The original comments put many @param descriptions up here. I will fix those as well.

* If `col` is "*",
* then the replacement is applied on all string columns , numeric columns or boolean columns.
*
@@ -344,7 +347,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
/**
* (Scala-specific) Replaces values matching keys in `replacement` map.
* Key and value of `replacement` map must have the same type, and
* can only be doubles , strings or booleans.
* can only be doubles, strings or booleans.
* `replacement` map value can have null.
*
* {{{
* // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight".
@@ -366,11 +370,15 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
return df
}

// replacementMap is either Map[String, String] or Map[Double, Double] or Map[Boolean,Boolean]
val replacementMap: Map[_, _] = replacement.head._2 match {
case v: String => replacement
case v: Boolean => replacement
case _ => replacement.map { case (k, v) => (convertToDouble(k), convertToDouble(v)) }
// replacementMap is either Map[String, String], Map[Double, Double], Map[Boolean,Boolean]
// while value can have null
Review comment (Member): If the types are not these three types, what is the behavior? Could you explain it here? Also, please add negative examples too. Thanks~

Review comment (Contributor, author): If replacement is of type Map[Any, Any], the replacementMap will not be confined to these three types. We tell users to only use doubles, strings, and booleans in the replacement map in the method doc, but a user can still write df.na.replace("*", Map(10 -> 20, "Alpha" -> "Bravo")). The result is that only fields with the same type as the first key in the replacement map are replaced. This is due to the implementation of targetColumnType a few lines below. I'll modify the comments here. But for the negative examples (like the one I mentioned in this comment), do I need to explain them to users in the method doc?

val replacementMap: Map[_, _] = replacement.map {
case (k, v: String) => (k, v)
case (k, v: Boolean) => (k, v)
case (k: String, null) => (k, null)
case (k: Boolean, null) => (k, null)
case (k, null) => (convertToDouble(k), null)
case _ @(k, v) => (convertToDouble(k), convertToDouble(v))
Review comment (Member): Could we use case (k, v) => instead of case _ @(k, v) =>?

}

// targetColumnType is either DoubleType or StringType or BooleanType
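The Scala match above keeps string and boolean pairs as-is, passes null values through, and coerces everything else to Double via convertToDouble. A hedged Python mirror of that dispatch, for illustration only (not Spark's actual code):

```python
def normalize_map(replacement):
    """Mirror of the Scala match on replacement pairs."""
    out = {}
    for k, v in replacement.items():
        if isinstance(v, (str, bool)):
            out[k] = v                # string/boolean values: keep the pair as-is
        elif v is None and isinstance(k, (str, bool)):
            out[k] = None             # null value with a string/boolean key
        elif v is None:
            out[float(k)] = None      # null value with a numeric key: coerce the key
        else:
            out[float(k)] = float(v)  # numeric pair: coerce both to double
    return out

print(normalize_map({16: 61, 60: None}))  # {16.0: 61.0, 60.0: None}
```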
@@ -248,16 +248,16 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
assert(out(4) === Row("Amy", null, null))
assert(out(5) === Row(null, null, null))

// Replace only the age column
val out1 = input.na.replace("age", Map(
// Replace only the age column and with null
val out1 = input.na.replace("age", Map[Any, Any](
Review comment (Member): How about adding a separate test instead, and leaving the existing test as is?

16 -> 61,
60 -> 6,
60 -> null,
164.3 -> 461.3 // Alice is really tall
)).collect()

assert(out1(0) === Row("Bob", 61, 176.5))
assert(out1(1) === Row("Alice", null, 164.3))
assert(out1(2) === Row("David", 6, null))
assert(out1(2) === Row("David", null, null))
assert(out1(3).get(2).asInstanceOf[Double].isNaN)
assert(out1(4) === Row("Amy", null, null))
assert(out1(5) === Row(null, null, null))