Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/pyspark/sql/streaming/stateful_processor_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ def normalize_value(v: Any) -> Any:
# Convert NumPy types to Python primitive types.
if isinstance(v, np.generic):
return v.tolist()
# Named tuples (collections.namedtuple or typing.NamedTuple) have a
# _fields attribute. Spark Row has __fields__. Both require positional
# arguments and cannot be instantiated with a generator expression.
if hasattr(v, '_fields') or hasattr(v, '__fields__'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think checking the type is much safer than checking an attribute, especially considering that _fields is a not a rare attribute name. If we know what type we are targeting, we should just check type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good! You are so fast. Did not get a chance to add a UT yet 😛 just convert to draft.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @gaogaotiantian, I use

                if (
                    isinstance(v, Row) or
                    (isinstance(v, tuple) and hasattr(v, "_fields"))
                ):

instead. isinstance(v, NamedTuple) won’t work because typing.NamedTuple is a class factory, not a runtime parent of instances. Checking isinstance(v, tuple) and _fields is the correct way. Please take another look. Thanks!

return type(v)(*[normalize_value(e) for e in v])
# List / tuple: recursively normalize each element
if isinstance(v, (list, tuple)):
return type(v)(normalize_value(e) for e in v)
Expand Down