python/pyspark/sql/profiler.py (79 additions, 0 deletions)

@@ -224,6 +224,56 @@ def dump(id: int) -> None:
for id in sorted(code_map.keys()):
dump(id)

def clear_perf_profiles(self, id: Optional[int] = None) -> None:
"""
Clear the perf profile results.

.. versionadded:: 4.0.0
[Review thread on the `.. versionadded:: 4.0.0` directive]

Member: Is this a user-facing API? If not, we don't need this version directive.

Member Author: It is a user-facing API, along with profile.show and profile.dump. We will also add it to the API doc.

Member: Actually, this is not; the clear in Profile should be the user-facing API.


Parameters
----------
id : int, optional
The UDF ID whose profiling results should be cleared.
If not specified, all the results will be cleared.
"""
with self._lock:
if id is not None:
if id in self._profile_results:
perf, mem, *_ = self._profile_results[id] # type: ignore
self._profile_results[id] = (None, mem, *_)
if mem is None:
self._profile_results.pop(id, None)
else:
for id, (perf, mem, *_) in list(self._profile_results.items()):
self._profile_results[id] = (None, mem, *_)
if mem is None:
self._profile_results.pop(id, None)

def clear_memory_profiles(self, id: Optional[int] = None) -> None:
"""
Clear the memory profile results.

.. versionadded:: 4.0.0

Parameters
----------
id : int, optional
The UDF ID whose profiling results should be cleared.
If not specified, all the results will be cleared.
"""
with self._lock:
if id is not None:
if id in self._profile_results:
perf, mem, *_ = self._profile_results[id] # type: ignore
self._profile_results[id] = (perf, None, *_)
if perf is None:
self._profile_results.pop(id, None)
else:
for id, (perf, mem, *_) in list(self._profile_results.items()):
self._profile_results[id] = (perf, None, *_)
if perf is None:
self._profile_results.pop(id, None)
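
Both clear methods follow the same slot-clearing pattern: each `_profile_results` entry is a tuple whose first two positions hold the perf and memory results, one slot is nulled out, and the entry is dropped entirely once both slots are `None`. A minimal standalone sketch of that pattern (the store and helper names here are hypothetical, not the collector's actual internals):

```python
from typing import Dict, Optional, Tuple

# Hypothetical store mirroring the (perf, mem, ...) tuples above.
results: Dict[int, Tuple[Optional[str], Optional[str]]] = {
    1: ("perf-1", "mem-1"),
    2: ("perf-2", None),
}

def clear_slot(store: Dict[int, tuple], result_id: int, slot: int) -> None:
    """Null out one slot; drop the entry once every slot is None."""
    if result_id in store:
        values = list(store[result_id])
        values[slot] = None
        if all(v is None for v in values):
            store.pop(result_id, None)
        else:
            store[result_id] = tuple(values)

clear_slot(results, 1, 0)  # perf cleared, memory result kept
clear_slot(results, 2, 0)  # both slots now None, entry dropped
assert results == {1: (None, "mem-1")}
```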


class AccumulatorProfilerCollector(ProfilerCollector):
def __init__(self) -> None:
@@ -309,3 +359,32 @@ def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
"allowed_values": str(["perf", "memory"]),
},
)

def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
"""
Clear the profile results.

.. versionadded:: 4.0.0

Parameters
----------
id : int, optional
The UDF ID whose profiling results should be cleared.
If not specified, all the results will be cleared.
type : str, optional
The profiler type to clear results for, which can be either "perf" or "memory".
"""
if type == "memory":
self.profiler_collector.clear_memory_profiles(id)
elif type == "perf" or type is None:
self.profiler_collector.clear_perf_profiles(id)
if type is None: # Clear both perf and memory profiles
self.profiler_collector.clear_memory_profiles(id)
else:
raise PySparkValueError(
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)
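
An illustrative usage sketch of the resulting user-facing API, mirroring what the tests below exercise (the UDF ID 42 is hypothetical; real IDs come from the collector at runtime):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
# ... run some Python UDFs so the profiler collects results ...

spark.profile.clear(42)             # clear results for one UDF ID (hypothetical ID)
spark.profile.clear(type="memory")  # clear all memory-profile results
spark.profile.clear(type="perf")    # clear all perf-profile results
spark.profile.clear()               # clear both kinds for all UDFs
```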
python/pyspark/sql/tests/test_session.py (27 additions, 0 deletions)

@@ -531,6 +531,33 @@ def test_dump_invalid_type(self):
},
)

def test_clear_memory_type(self):
[Review thread on `test_clear_memory_type`]

Contributor: Nit: it seems we don't have a parity test for test_session. Does it make sense to move SparkSessionProfileTests out of test_session and add a parity test for it?

Member Author (@xinrong-meng, Mar 7, 2024): Good idea! For now, all the logic tested by SparkSessionProfileTests is directly imported in Spark Connect with no modification. But I do agree that separating it later will improve readability and ensure future parity. I'll refactor later. Thanks!

self.profile.clear(type="memory")
self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
self.profiler_collector_mock.clear_perf_profiles.assert_not_called()

def test_clear_perf_type(self):
self.profile.clear(type="perf")
self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
self.profiler_collector_mock.clear_memory_profiles.assert_not_called()

def test_clear_no_type(self):
self.profile.clear()
self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
self.profiler_collector_mock.clear_memory_profiles.assert_called_once()

def test_clear_invalid_type(self):
with self.assertRaises(PySparkValueError) as e:
self.profile.clear(type="invalid")
self.check_error(
exception=e.exception,
error_class="VALUE_NOT_ALLOWED",
message_parameters={
"arg_name": "type",
"allowed_values": str(["perf", "memory"]),
},
)
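
These session-level tests only verify dispatch, so the mixin presumably wires a mock collector into `Profile`; the actual setUp is not part of this diff. A minimal sketch of such a fixture, assuming the `Profile` facade takes the collector as its constructor argument:

```python
import unittest
from unittest.mock import MagicMock

from pyspark.sql.profiler import Profile  # assumed import path

class ProfileClearDispatchTest(unittest.TestCase):
    def setUp(self) -> None:
        # Stand-in for the real ProfilerCollector; only dispatch is checked.
        self.profiler_collector_mock = MagicMock()
        self.profile = Profile(self.profiler_collector_mock)

    def test_clear_no_type(self) -> None:
        self.profile.clear()
        self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
        self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
```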


class SparkExtensionsTest(unittest.TestCase):
# These tests are separate because it uses 'spark.sql.extensions' which is
python/pyspark/sql/tests/test_udf_profiler.py (26 additions, 0 deletions)

@@ -521,6 +521,32 @@ def summarize(left, right):
io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
)

def test_perf_profiler_clear(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

for id in self.profile_results:
self.spark.profile.clear(id)
self.assertNotIn(id, self.profile_results)
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

self.spark.profile.clear(type="memory")
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
self.spark.profile.clear(type="perf")
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

self.spark.profile.clear()
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
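
The repeated `assertEqual(3, ...)` checks assume `_do_computation` runs three profiled Python UDFs, one `profile_results` entry per UDF ID. The real helper lives in the test module and is not shown in this diff; a rough, hypothetical stand-in:

```python
from pyspark.sql.functions import col, udf

def _do_computation(spark):
    # Hypothetical sketch: three profiled UDFs -> three profile entries.
    @udf("long")
    def add1(x):
        return x + 1

    @udf("long")
    def add2(x):
        return x + 2

    @udf("long")
    def add3(x):
        return x + 3

    spark.range(10).select(add1("id"), add2("id"), add3(col("id"))).collect()
```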


class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None:
python/pyspark/tests/test_memory_profiler.py (57 additions, 0 deletions)

@@ -221,6 +221,10 @@ def trap_stdout(self):
def profile_results(self):
return self.spark._profiler_collector._memory_profile_results

@property
def perf_profile_results(self):
return self.spark._profiler_collector._perf_profile_results
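
This new property gives the mixin a perf-results view alongside the memory view above. Presumably the collector's per-type views project the shared `(perf, mem, ...)` tuples and skip `None` slots, which is why clearing one type removes entries from that view only; a self-contained sketch of that assumed projection:

```python
import threading
from typing import Any, Dict, Optional, Tuple

class CollectorSketch:
    """Assumed shape of the per-type views on the collector (sketch only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # result_id -> (perf, mem), mirroring the collector's tuples.
        self._profile_results: Dict[int, Tuple[Optional[Any], Optional[Any]]] = {}

    @property
    def _perf_profile_results(self) -> Dict[int, Any]:
        with self._lock:
            # Entries whose perf slot was cleared drop out of this view.
            return {
                result_id: perf
                for result_id, (perf, _mem) in self._profile_results.items()
                if perf is not None
            }

    @property
    def _memory_profile_results(self) -> Dict[int, Any]:
        with self._lock:
            # Entries whose memory slot was cleared drop out of this view.
            return {
                result_id: mem
                for result_id, (_perf, mem) in self._profile_results.items()
                if mem is not None
            }
```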

def test_memory_profiler_udf(self):
_do_computation(self.spark)

@@ -571,6 +575,59 @@ def summarize(left, right):
io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
)

def test_memory_profiler_clear(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

for id in list(self.profile_results.keys()):
self.spark.profile.clear(id)
self.assertNotIn(id, self.profile_results)
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

self.spark.profile.clear(type="perf")
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
self.spark.profile.clear(type="memory")
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
_do_computation(self.spark)
self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))

self.spark.profile.clear()
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))

def test_profilers_clear(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
_do_computation(self.spark)
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)

self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
some_id = next(iter(self.profile_results))

# clear a specific memory profile
self.spark.profile.clear(some_id, type="memory")
self.assertEqual(2, len(self.profile_results), str(list(self.profile_results)))
self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results)))

# clear a specific perf profile
self.spark.profile.clear(some_id, type="perf")
self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))

# clear all memory profiles
self.spark.profile.clear(type="memory")
self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))

# clear all perf profiles
self.spark.profile.clear(type="perf")
self.assertEqual(0, len(self.perf_profile_results), str(list(self.perf_profile_results)))


class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
def setUp(self) -> None: