diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index 5ab27bce2582..711e39de4723 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -224,6 +224,56 @@ def dump(id: int) -> None:
             for id in sorted(code_map.keys()):
                 dump(id)
 
+    def clear_perf_profiles(self, id: Optional[int] = None) -> None:
+        """
+        Clear the perf profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        """
+        with self._lock:
+            if id is not None:
+                if id in self._profile_results:
+                    perf, mem, *_ = self._profile_results[id]
+                    self._profile_results[id] = (None, mem, *_)
+                    if mem is None:
+                        self._profile_results.pop(id, None)
+            else:
+                for id, (perf, mem, *_) in list(self._profile_results.items()):
+                    self._profile_results[id] = (None, mem, *_)
+                    if mem is None:
+                        self._profile_results.pop(id, None)
+
+    def clear_memory_profiles(self, id: Optional[int] = None) -> None:
+        """
+        Clear the memory profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        """
+        with self._lock:
+            if id is not None:
+                if id in self._profile_results:
+                    perf, mem, *_ = self._profile_results[id]
+                    self._profile_results[id] = (perf, None, *_)
+                    if perf is None:
+                        self._profile_results.pop(id, None)
+            else:
+                for id, (perf, mem, *_) in list(self._profile_results.items()):
+                    self._profile_results[id] = (perf, None, *_)
+                    if perf is None:
+                        self._profile_results.pop(id, None)
+
 
 class AccumulatorProfilerCollector(ProfilerCollector):
     def __init__(self) -> None:
@@ -309,3 +359,32 @@ def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = Non
                     "allowed_values": str(["perf", "memory"]),
                 },
             )
+
+    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
+        """
+        Clear the profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        type : str, optional
+            The profiler type to clear results for, which can be either "perf" or "memory".
+        """
+        if type == "memory":
+            self.profiler_collector.clear_memory_profiles(id)
+        elif type == "perf" or type is None:
+            self.profiler_collector.clear_perf_profiles(id)
+            if type is None:  # Clear both perf and memory profiles
+                self.profiler_collector.clear_memory_profiles(id)
+        else:
+            raise PySparkValueError(
+                error_class="VALUE_NOT_ALLOWED",
+                message_parameters={
+                    "arg_name": "type",
+                    "allowed_values": str(["perf", "memory"]),
+                },
+            )
diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index b95e9de9e3f3..5f102d770c6a 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -531,6 +531,33 @@ def test_dump_invalid_type(self):
             },
         )
 
+    def test_clear_memory_type(self):
+        self.profile.clear(type="memory")
+        self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_perf_profiles.assert_not_called()
+
+    def test_clear_perf_type(self):
+        self.profile.clear(type="perf")
+        self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_memory_profiles.assert_not_called()
+
+    def test_clear_no_type(self):
+        self.profile.clear()
+        self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+
+    def test_clear_invalid_type(self):
+        with self.assertRaises(PySparkValueError) as e:
+            self.profile.clear(type="invalid")
+        self.check_error(
+            exception=e.exception,
+            error_class="VALUE_NOT_ALLOWED",
+            message_parameters={
+                "arg_name": "type",
+                "allowed_values": str(["perf", "memory"]),
+            },
+        )
+
 
 class SparkExtensionsTest(unittest.TestCase):
     # These tests are separate because it uses 'spark.sql.extensions' which is
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py
index 557b4daa8550..a66503bc0213 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -521,6 +521,32 @@ def summarize(left, right):
             io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
         )
 
+    def test_perf_profiler_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        for id in self.profile_results:
+            self.spark.profile.clear(id)
+            self.assertNotIn(id, self.profile_results)
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        self.spark.profile.clear()
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
 
 class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py
index f0abdd03e243..046dd3621c42 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -221,6 +221,10 @@ def trap_stdout(self):
     def profile_results(self):
         return self.spark._profiler_collector._memory_profile_results
 
+    @property
+    def perf_profile_results(self):
+        return self.spark._profiler_collector._perf_profile_results
+
     def test_memory_profiler_udf(self):
         _do_computation(self.spark)
 
@@ -571,6 +575,61 @@ def summarize(left, right):
             io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
         )
 
+    def test_memory_profiler_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        for id in list(self.profile_results.keys()):
+            self.spark.profile.clear(id)
+            self.assertNotIn(id, self.profile_results)
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        self.spark.profile.clear()
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+    def test_profilers_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        # clear a specific memory profile
+        some_id = next(iter(self.profile_results))
+        self.spark.profile.clear(some_id, type="memory")
+        self.assertEqual(2, len(self.profile_results), str(list(self.profile_results)))
+        self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
+        # clear a specific perf profile
+        some_id = next(iter(self.perf_profile_results))
+        self.spark.profile.clear(some_id, type="perf")
+        self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+        self.assertEqual(2, len(self.profile_results), str(list(self.profile_results)))
+
+        # clear all memory profiles
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+        self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
+        # clear all perf profiles
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(0, len(self.perf_profile_results), str(list(self.perf_profile_results)))
+
 
 class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None: