Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,10 @@ def _proto_to_string(self, p: google.protobuf.message.Message) -> str:
-------
Single line string of the serialized proto message.
"""
return text_format.MessageToString(p, as_one_line=True)
try:
return text_format.MessageToString(p, as_one_line=True)
except RecursionError:
return "<Truncated message due to recursion error>"

def schema(self, plan: pb2.Plan) -> StructType:
"""
Expand Down
13 changes: 13 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,19 @@ def spark_connect_clean_up_test_data(cls):


class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_recursion_handling_for_plan_logging(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @grundprinzip and @HyukjinKwon .

This test case seems to fail with Python 3.11. SPARK-45987 is filed for Python 3.11 failure.

"""SPARK-45852 - Test that we can handle recursion in plan logging."""
cdf = self.connect.range(1)
for x in range(400):
cdf = cdf.withColumn(f"col_{x}", CF.lit(x))

# Calling schema will trigger logging the message that will in turn trigger the message
# conversion into protobuf that will then trigger the recursion error.
self.assertIsNotNone(cdf.schema)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failure happens here.

ERROR [3.874s]: test_recursion_handling_for_plan_logging (pyspark.sql.tests.connect.test_connect_basic.SparkConnectBasicTests.test_recursion_handling_for_plan_logging)
SPARK-45852 - Test that we can handle recursion in plan logging.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_basic.py", line 171, in test_recursion_handling_for_plan_logging
    self.assertIsNotNone(cdf.schema)
                         ^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1735, in schema
    return self._session.client.schema(query)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 924, in schema
    schema = self._analyze(method="schema", plan=plan).schema
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1110, in _analyze
    self._handle_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1499, in _handle_error
    self._handle_rpc_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1570, in _handle_rpc_error
    raise SparkConnectGrpcException(str(rpc_error)) from None
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Exception serializing request!"
	debug_error_string = "None"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!!


result = self.connect._client._proto_to_string(cdf._plan.to_proto(self.connect._client))
self.assertIn("recursion", result)

def test_df_getattr_behavior(self):
cdf = self.connect.range(10)
sdf = self.spark.range(10)
Expand Down