From 52ccdde45369abec448bf083b6636689cd5b982e Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 4 Mar 2024 16:50:08 -0800 Subject: [PATCH 1/7] clear --- python/pyspark/sql/profiler.py | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 5ab27bce2582..eee5faa0c398 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -224,6 +224,42 @@ def dump(id: int) -> None: for id in sorted(code_map.keys()): dump(id) + def clear_perf_profiles(self, id: Optional[int] = None) -> None: + """ + Clear the perf profile results. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. + """ + with self._lock: + if id is not None: + self._perf_profile_results.pop(id, None) + else: + self._perf_profile_results.clear() + + def clear_memory_profiles(self, id: Optional[int] = None) -> None: + """ + Clear the memory profile results. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. + """ + with self._lock: + if id is not None: + self._memory_profile_results.pop(id, None) + else: + self._memory_profile_results.clear() + class AccumulatorProfilerCollector(ProfilerCollector): def __init__(self) -> None: @@ -309,3 +345,32 @@ def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = Non "allowed_values": str(["perf", "memory"]), }, ) + + def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None: + """ + Clear the profile results. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. + type : str, optional + The profiler type to clear results for, which can be either "perf" or "memory". + """ + if type == "memory": + self.profiler_collector.clear_memory_profiles(id) + elif type == "perf" or type is None: + self.profiler_collector.clear_perf_profiles(id) + if type is None: # Clear both perf and memory profiles + self.profiler_collector.clear_memory_profiles(id) + else: + raise PySparkValueError( + error_class="VALUE_NOT_ALLOWED", + message_parameters={ + "arg_name": "type", + "allowed_values": str(["perf", "memory"]), + }, + ) From 2e05db853e7d1207c45df48fb2887d283c018ebb Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 4 Mar 2024 16:50:27 -0800 Subject: [PATCH 2/7] mock test --- python/pyspark/sql/tests/test_session.py | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py index b95e9de9e3f3..5f102d770c6a 100644 --- a/python/pyspark/sql/tests/test_session.py +++ b/python/pyspark/sql/tests/test_session.py @@ -531,6 +531,33 @@ def test_dump_invalid_type(self): }, ) + def test_clear_memory_type(self): + self.profile.clear(type="memory") + self.profiler_collector_mock.clear_memory_profiles.assert_called_once() + self.profiler_collector_mock.clear_perf_profiles.assert_not_called() + + def test_clear_perf_type(self): + self.profile.clear(type="perf") + self.profiler_collector_mock.clear_perf_profiles.assert_called_once() + self.profiler_collector_mock.clear_memory_profiles.assert_not_called() + + def test_clear_no_type(self): + self.profile.clear() + self.profiler_collector_mock.clear_perf_profiles.assert_called_once() + self.profiler_collector_mock.clear_memory_profiles.assert_called_once() + + def test_clear_invalid_type(self): + with self.assertRaises(PySparkValueError) as e: + self.profile.clear(type="invalid") + self.check_error( + exception=e.exception, + error_class="VALUE_NOT_ALLOWED", + message_parameters={ + "arg_name": "type", + "allowed_values": str(["perf", "memory"]), + }, + ) + class SparkExtensionsTest(unittest.TestCase): # These tests are separate because it uses 'spark.sql.extensions' which is From 4b441ae35a994c4477666ea942cd10bdb496729c Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 5 Mar 2024 11:43:27 -0800 Subject: [PATCH 3/7] fix clear --- python/pyspark/sql/profiler.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index eee5faa0c398..71a49e7e377f 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -236,11 +236,18 @@ def clear_perf_profiles(self, id: Optional[int] = None) -> None: The UDF ID whose profiling results should be cleared. If not specified, all the results will be cleared. """ + ids_to_remove = [ + result_id + for result_id, (perf, _, *_) in self._profile_results.items() + if perf is not None + ] with self._lock: if id is not None: - self._perf_profile_results.pop(id, None) + if id in ids_to_remove: + self._profile_results.pop(id, None) else: - self._perf_profile_results.clear() + for id_to_remove in ids_to_remove: + self._profile_results.pop(id_to_remove, None) def clear_memory_profiles(self, id: Optional[int] = None) -> None: """ @@ -255,10 +262,15 @@ def clear_memory_profiles(self, id: Optional[int] = None) -> None: If not specified, all the results will be cleared. """ with self._lock: + ids_to_remove = [ + id for id, (_, mem, *_) in self._profile_results.items() if mem is not None + ] if id is not None: - self._memory_profile_results.pop(id, None) + if id in ids_to_remove: + self._profile_results.pop(id, None) else: - self._memory_profile_results.clear() + for id_to_remove in ids_to_remove: + self._profile_results.pop(id_to_remove, None) class AccumulatorProfilerCollector(ProfilerCollector): From c3980bbe5641f4006abc2b7dca1905d31dfdb936 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 5 Mar 2024 11:43:47 -0800 Subject: [PATCH 4/7] test --- python/pyspark/sql/tests/test_udf_profiler.py | 26 +++++++++++++++++++ python/pyspark/tests/test_memory_profiler.py | 26 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 557b4daa8550..a66503bc0213 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -521,6 +521,32 @@ def summarize(left, right): io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}" ) + def test_perf_profiler_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + for id in self.profile_results: + self.spark.profile.clear(id) + self.assertNotIn(id, self.profile_results) + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear(type="memory") + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + self.spark.profile.clear(type="perf") + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear() + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index f0abdd03e243..25d5bd1a2f38 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -571,6 +571,32 @@ def summarize(left, right): io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}" ) + def test_memory_profiler_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + for id in list(self.profile_results.keys()): + self.spark.profile.clear(id) + self.assertNotIn(id, self.profile_results) + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear(type="perf") + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + self.spark.profile.clear(type="memory") + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear() + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: From 5881718b801d6281aba4ebf3cc1b3bff60027ff1 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 6 Mar 2024 14:00:04 -0800 Subject: [PATCH 5/7] fix + test --- python/pyspark/sql/profiler.py | 42 ++++++++++++-------- python/pyspark/tests/test_memory_profiler.py | 31 +++++++++++++++ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 71a49e7e377f..ece3b630901f 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -236,18 +236,22 @@ def clear_perf_profiles(self, id: Optional[int] = None) -> None: The UDF ID whose profiling results should be cleared. If not specified, all the results will be cleared. """ - ids_to_remove = [ - result_id - for result_id, (perf, _, *_) in self._profile_results.items() - if perf is not None - ] with self._lock: if id is not None: - if id in ids_to_remove: - self._profile_results.pop(id, None) + if id in self._profile_results: + perf, mem, *rest = self._profile_results[id] + self._profile_results[id] = (None, mem, *rest) + if mem is None: + self._profile_results.pop(id, None) else: - for id_to_remove in ids_to_remove: - self._profile_results.pop(id_to_remove, None) + ids_to_remove = [] + for id, (perf, mem, *rest) in list(self._profile_results.items()): + self._profile_results[id] = (None, mem, *rest) + if mem is None: + ids_to_remove.append(id) + + for id in ids_to_remove: + self._profile_results.pop(id, None) def clear_memory_profiles(self, id: Optional[int] = None) -> None: """ @@ -262,15 +266,21 @@ def clear_memory_profiles(self, id: Optional[int] = None) -> None: If not specified, all the results will be cleared. """ with self._lock: - ids_to_remove = [ - id for id, (_, mem, *_) in self._profile_results.items() if mem is not None - ] if id is not None: - if id in ids_to_remove: - self._profile_results.pop(id, None) + if id in self._profile_results: + perf, mem, *rest = self._profile_results[id] + self._profile_results[id] = (perf, None, *rest) + if perf is None: + self._profile_results.pop(id, None) else: - for id_to_remove in ids_to_remove: - self._profile_results.pop(id_to_remove, None) + ids_to_remove = [] + for id, (perf, mem, *rest) in list(self._profile_results.items()): + self._profile_results[id] = (perf, None, *rest) + if perf is None: + ids_to_remove.append(id) + + for id in ids_to_remove: + self._profile_results.pop(id, None) class AccumulatorProfilerCollector(ProfilerCollector): diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index 25d5bd1a2f38..03897a5f2645 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -221,6 +221,10 @@ def trap_stdout(self): def profile_results(self): return self.spark._profiler_collector._memory_profile_results + @property + def perf_profile_results(self): + return self.spark._profiler_collector._perf_profile_results + def test_memory_profiler_udf(self): _do_computation(self.spark) @@ -597,6 +601,33 @@ def test_memory_profiler_clear(self): self.spark.profile.clear() self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + def test_profilers_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + some_id = next(iter(self.profile_results[0])) + + # clear a specific memory profile + self.spark.profile.clear(some_id, type="memory") + self.assertEqual(2, len(self.profile_results), str(list(self.profile_results))) + self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results))) + + # clear a specific perf profile + self.spark.profile.clear(some_id, type="perf") + self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results))) + + # clear all memory profiles + self.spark.profile.clear(type="memory") + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results))) + + # clear all perf profiles + self.spark.profile.clear(type="perf") + self.assertEqual(0, len(self.perf_profile_results), str(list(self.perf_profile_results))) + class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: From dc5c6ca575178fecddb6a125bcb5fbead60d2bcd Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 6 Mar 2024 17:29:25 -0800 Subject: [PATCH 6/7] fix --- python/pyspark/sql/profiler.py | 28 +++++++------------- python/pyspark/tests/test_memory_profiler.py | 2 +- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index ece3b630901f..9af4135b75cf 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -239,19 +239,15 @@ def clear_perf_profiles(self, id: Optional[int] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *rest = self._profile_results[id] - self._profile_results[id] = (None, mem, *rest) + perf, mem, *_ = self._profile_results[id] # type: ignore + self._profile_results[id] = (None, mem, *_) if mem is None: self._profile_results.pop(id, None) else: - ids_to_remove = [] - for id, (perf, mem, *rest) in list(self._profile_results.items()): - self._profile_results[id] = (None, mem, *rest) + for id, (perf, mem, *_) in list(self._profile_results.items()): + self._profile_results[id] = (None, mem, *_) if mem is None: - ids_to_remove.append(id) - - for id in ids_to_remove: - self._profile_results.pop(id, None) + self._profile_results.pop(id, None) def clear_memory_profiles(self, id: Optional[int] = None) -> None: """ @@ -268,19 +264,15 @@ def clear_memory_profiles(self, id: Optional[int] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *rest = self._profile_results[id] - self._profile_results[id] = (perf, None, *rest) + perf, mem, *_ = self._profile_results[id] # type: ignore + self._profile_results[id] = (perf, None, *_) if perf is None: self._profile_results.pop(id, None) else: - ids_to_remove = [] - for id, (perf, mem, *rest) in list(self._profile_results.items()): - self._profile_results[id] = (perf, None, *rest) + for id, (perf, mem, *_) in list(self._profile_results.items()): + self._profile_results[id] = (perf, None, *_) if perf is None: - ids_to_remove.append(id) - - for id in ids_to_remove: - self._profile_results.pop(id, None) + self._profile_results.pop(id, None) class AccumulatorProfilerCollector(ProfilerCollector): diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index 03897a5f2645..6bb0385fc0b9 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -608,7 +608,7 @@ def test_profilers_clear(self): _do_computation(self.spark) self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) - some_id = next(iter(self.profile_results[0])) + some_id = next(iter(self.profile_results)) # clear a specific memory profile self.spark.profile.clear(some_id, type="memory") From 3de5afbb4fa7d6f1793b65c47983c70c52b8c9d2 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 7 Mar 2024 11:49:08 -0800 Subject: [PATCH 7/7] fix --- python/pyspark/sql/profiler.py | 4 ++-- python/pyspark/tests/test_memory_profiler.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 9af4135b75cf..711e39de4723 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -239,7 +239,7 @@ def clear_perf_profiles(self, id: Optional[int] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *_ = self._profile_results[id] # type: ignore + perf, mem, *_ = self._profile_results[id] self._profile_results[id] = (None, mem, *_) if mem is None: self._profile_results.pop(id, None) @@ -264,7 +264,7 @@ def clear_memory_profiles(self, id: Optional[int] = None) -> None: with self._lock: if id is not None: if id in self._profile_results: - perf, mem, *_ = self._profile_results[id] # type: ignore + perf, mem, *_ = self._profile_results[id] self._profile_results[id] = (perf, None, *_) if perf is None: self._profile_results.pop(id, None) diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index 6bb0385fc0b9..046dd3621c42 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -608,16 +608,18 @@ def test_profilers_clear(self): _do_computation(self.spark) self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) - some_id = next(iter(self.profile_results)) # clear a specific memory profile + some_id = next(iter(self.profile_results)) self.spark.profile.clear(some_id, type="memory") self.assertEqual(2, len(self.profile_results), str(list(self.profile_results))) self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results))) # clear a specific perf profile + some_id = next(iter(self.perf_profile_results)) self.spark.profile.clear(some_id, type="perf") self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results))) + self.assertEqual(2, len(self.profile_results), str(list(self.profile_results))) # clear all memory profiles self.spark.profile.clear(type="memory")