From 88e18d41cf5a919ddb28fb66feefc138effbd070 Mon Sep 17 00:00:00 2001
From: Wojciech Szlachta
Date: Wed, 30 Apr 2025 16:03:53 +0100
Subject: [PATCH 1/5] [SPARK-51966][PYTHON] replace select.select() with
 select.poll() on posix

On glibc-based Linux systems select() can monitor only file descriptor
numbers that are less than FD_SETSIZE (1024). This is an unreasonably
low limit for many modern applications.
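
An illustrative reproducer, as a sketch rather than part of this change
(assumes a POSIX system whose hard fd limit allows at least 2048 open
files; /dev/null is used only to pad the fd table):

    import os
    import resource
    import select

    # Raise the soft fd limit so the process can exceed FD_SETSIZE (1024).
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    resource.setrlimit(resource.RLIMIT_NOFILE, (2048, hard))

    # Pad the fd table past 1024, then create a pipe with a high-numbered
    # read end.
    pad = [os.open("/dev/null", os.O_RDONLY) for _ in range(1100)]
    r, w = os.pipe()

    try:
        select.select([r], [], [], 0)
    except ValueError as e:
        print("select() failed:", e)  # "filedescriptor out of range in select()"

    # poll() has no FD_SETSIZE limit and handles the same fd without error.
    poller = select.poll()
    poller.register(r, select.POLLIN)
    print("poll() result:", poller.poll(0))  # [] -- no data, but no error

    for fd in pad + [r, w]:
        os.close(fd)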
---
 python/pyspark/accumulators.py | 28 +++++++++++++++++++++++-----
 python/pyspark/daemon.py       | 20 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 59f7856688ee..77c761db4b31 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -266,11 +266,29 @@ def handle(self) -> None:
         auth_token = self.server.auth_token  # type: ignore[attr-defined]
 
         def poll(func: Callable[[], bool]) -> None:
-            while not self.server.server_shutdown:  # type: ignore[attr-defined]
-                # Poll every 1 second for new data -- don't block in case of shutdown.
-                r, _, _ = select.select([self.rfile], [], [], 1)
-                if self.rfile in r and func():
-                    break
+            rlist = [self.rfile.fileno()]
+            poller = None
+            try:
+                if os.name == "posix":
+                    # On posix systems use poll to avoid problems with file descriptor numbers
+                    # above 1024.
+                    poller = select.poll()
+                    for fd in rlist:
+                        poller.register(fd, select.POLLIN)
+
+                while not self.server.server_shutdown:  # type: ignore[attr-defined]
+                    # Poll every 1 second for new data -- don't block in case of shutdown.
+                    if poller is not None:
+                        r = [fd for fd, event in poller.poll(1) if event & select.POLLIN]
+                    else:
+                        # If poll is not available, use select.
+                        r, _, _ = select.select(rlist, [], [], 1)
+                    if self.rfile.fileno() in r and func():
+                        break
+            finally:
+                if poller is not None:
+                    for fd in rlist:
+                        poller.unregister(fd)
 
         def accum_updates() -> bool:
             num_updates = read_int(self.rfile)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 409c33d1727a..4d0b1cb54a52 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -162,10 +162,22 @@ def handle_sigterm(*args):
     reuse = os.environ.get("SPARK_REUSE_WORKER")
 
     # Initialization complete
+    rlist = [0, listen_sock.fileno()]
+    poller = None
     try:
+        if os.name == "posix":
+            # On posix systems use poll to avoid problems with file descriptor numbers above 1024.
+            poller = select.poll()
+            for fd in rlist:
+                poller.register(fd, select.POLLIN)
+
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
+                if poller is not None:
+                    ready_fds = [fd for fd, event in poller.poll(1)]
+                else:
+                    # If poll is not available, use select.
+                    ready_fds = select.select(rlist, [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
@@ -183,7 +195,7 @@ def handle_sigterm(*args):
                 except OSError:
                     pass  # process already died
 
-            if listen_sock in ready_fds:
+            if listen_sock.fileno() in ready_fds:
                 try:
                     sock, _ = listen_sock.accept()
                 except OSError as e:
@@ -256,6 +268,10 @@ def handle_sigterm(*args):
                     sock.close()
 
     finally:
+        if poller is not None:
+            for fd in rlist:
+                poller.unregister(fd)
+
         shutdown(1)
 
 

From e4d08b0ec451733d1e43a20e92f00cb500298ae0 Mon Sep 17 00:00:00 2001
From: Wojciech Szlachta
Date: Thu, 4 Dec 2025 15:23:25 +0000
Subject: [PATCH 2/5] [SPARK-51966][PYTHON] add env var PYSPARK_FORCE_SELECT
 to fall back to select.select()
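
Illustrative usage, as a sketch (where the variable has to be set depends
on the deployment: the accumulator server runs in the driver, while
daemon.py runs on the executors, which receive spark.executorEnv.*
variables):

    import os
    from pyspark.sql import SparkSession

    # Force the select() code path in the driver-side accumulator server.
    os.environ["PYSPARK_FORCE_SELECT"] = "true"

    # Force it for the executor-side daemon as well.
    spark = (
        SparkSession.builder
        .config("spark.executorEnv.PYSPARK_FORCE_SELECT", "true")
        .getOrCreate()
    )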
---
 python/pyspark/accumulators.py | 6 +++---
 python/pyspark/daemon.py       | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 77c761db4b31..2b8632aa9494 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -269,9 +269,9 @@ def poll(func: Callable[[], bool]) -> None:
             rlist = [self.rfile.fileno()]
             poller = None
             try:
-                if os.name == "posix":
-                    # On posix systems use poll to avoid problems with file descriptor numbers
-                    # above 1024.
+                if os.name == "posix" and os.getenv("PYSPARK_FORCE_SELECT", "false").lower() != "true":
+                    # On posix systems use poll to avoid problems with file descriptor numbers above 1024
+                    # (unless we force select by setting the PYSPARK_FORCE_SELECT environment variable to true).
                     poller = select.poll()
                     for fd in rlist:
                         poller.register(fd, select.POLLIN)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 4d0b1cb54a52..07d8a5df11d4 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -165,8 +165,9 @@ def handle_sigterm(*args):
     rlist = [0, listen_sock.fileno()]
     poller = None
     try:
-        if os.name == "posix":
-            # On posix systems use poll to avoid problems with file descriptor numbers above 1024.
+        if os.name == "posix" and os.getenv("PYSPARK_FORCE_SELECT", "false").lower() != "true":
+            # On posix systems use poll to avoid problems with file descriptor numbers above 1024
+            # (unless we force select by setting the PYSPARK_FORCE_SELECT environment variable to true).
             poller = select.poll()
             for fd in rlist:
                 poller.register(fd, select.POLLIN)

From 4ff153885e03257c08c4b3d9cb0fd2ed45127c23 Mon Sep 17 00:00:00 2001
From: Wojciech Szlachta
Date: Tue, 9 Dec 2025 18:25:02 +0000
Subject: [PATCH 3/5] [SPARK-51966][PYTHON] reflect changes in
 python/pyspark/daemon.py

---
 python/pyspark/accumulators.py | 39 ++++++++++++++++------------------
 1 file changed, 18 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 2b8632aa9494..e3fa3e89cf32 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -266,29 +266,26 @@ def handle(self) -> None:
         auth_token = self.server.auth_token  # type: ignore[attr-defined]
 
         def poll(func: Callable[[], bool]) -> None:
-            rlist = [self.rfile.fileno()]
             poller = None
-            try:
-                if os.name == "posix" and os.getenv("PYSPARK_FORCE_SELECT", "false").lower() != "true":
-                    # On posix systems use poll to avoid problems with file descriptor numbers above 1024
-                    # (unless we force select by setting the PYSPARK_FORCE_SELECT environment variable to true).
-                    poller = select.poll()
-                    for fd in rlist:
-                        poller.register(fd, select.POLLIN)
-
-                while not self.server.server_shutdown:  # type: ignore[attr-defined]
-                    # Poll every 1 second for new data -- don't block in case of shutdown.
-                    if poller is not None:
-                        r = [fd for fd, event in poller.poll(1) if event & select.POLLIN]
-                    else:
-                        # If poll is not available, use select.
-                        r, _, _ = select.select(rlist, [], [], 1)
-                    if self.rfile.fileno() in r and func():
-                        break
-            finally:
+            if os.name == "posix":
+                # On posix systems use poll to avoid problems with file descriptor
+                # numbers above 1024.
+                poller = select.poll()
+                poller.register(self.rfile, select.POLLIN)
+
+            while not self.server.server_shutdown:  # type: ignore[attr-defined]
+                # Poll every 1 second for new data -- don't block in case of shutdown.
                 if poller is not None:
-                    for fd in rlist:
-                        poller.unregister(fd)
+                    # Unlike select, poll timeout is in millis. Rule out error events.
+                    r = [fd for fd, event in poller.poll(1000) if event & select.POLLIN]
+                else:
+                    # If poll is not available, use select.
+                    r, _, _ = select.select([self.rfile.fileno()], [], [], 1)
+                if self.rfile.fileno() in r and func():
+                    break
+
+            if poller is not None:
+                poller.unregister(self.rfile)
 
         def accum_updates() -> bool:
             num_updates = read_int(self.rfile)

From d04b73ab32c2d579feacd90a11f388c0f7e71dfb Mon Sep 17 00:00:00 2001
From: Wojciech Szlachta
Date: Wed, 10 Dec 2025 12:49:43 +0000
Subject: [PATCH 4/5] [SPARK-51966][PYTHON] handle POLLHUP, POLLERR and
 POLLNVAL when using poll()
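
Why POLLHUP has to count as readable, shown as a sketch assuming Linux
pipe semantics (select() simply marks such an fd readable and the
subsequent read returns 0 bytes):

    import os
    import select

    r, w = os.pipe()
    os.close(w)  # the peer "hangs up"

    poller = select.poll()
    poller.register(r, select.POLLIN)
    # Linux reports the closed writer as POLLHUP rather than POLLIN.
    print(poller.poll(0))  # e.g. [(4, 16)] where 16 == select.POLLHUP
    print(os.read(r, 1))   # b'' -- the 0-byte read described above
    os.close(r)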
---
 python/pyspark/accumulators.py | 13 +++++++++++--
 python/pyspark/daemon.py       | 22 ++++++++++++++--------
 2 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index e3fa3e89cf32..3042d339dc03 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -276,8 +276,17 @@ def poll(func: Callable[[], bool]) -> None:
             while not self.server.server_shutdown:  # type: ignore[attr-defined]
                 # Poll every 1 second for new data -- don't block in case of shutdown.
                 if poller is not None:
-                    # Unlike select, poll timeout is in millis. Rule out error events.
-                    r = [fd for fd, event in poller.poll(1000) if event & select.POLLIN]
+                    r = set()
+                    # Unlike select, poll timeout is in millis.
+                    for fd, event in poller.poll(1000):
+                        if event & (select.POLLIN | select.POLLHUP):
+                            # Data can be read (for POLLHUP peer hang up, so reads will return
+                            # 0 bytes, in which case we want to break out - this is consistent
+                            # with how select behaves).
+                            r.add(fd)
+                        else:
+                            # Could be POLLERR or POLLNVAL (select would raise in this case).
+                            raise PySparkRuntimeError(f"Polling error - event {event} on fd {fd}")
                 else:
                     # If poll is not available, use select.
                     r, _, _ = select.select([self.rfile.fileno()], [], [], 1)
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 7495569ecfda..a5711c5699f9 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -30,6 +30,7 @@
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
 
 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
+from pyspark.errors import PySparkRuntimeError
 
 
 def compute_real_exit_code(exit_code):
@@ -174,15 +175,20 @@ def handle_sigterm(*args):
 
     while True:
         if poller is not None:
-            ready_fds = [fd_reverse_map[fd] for fd, _ in poller.poll(1000)]
-        else:
-            try:
-                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
-            except select.error as ex:
-                if ex[0] == EINTR:
-                    continue
+            ready_fds = set()
+            # Unlike select, poll timeout is in millis.
+            for fd, event in poller.poll(1000):
+                if event & (select.POLLIN | select.POLLHUP):
+                    # Data can be read (for POLLHUP peer hang up, so reads will return
+                    # 0 bytes, in which case we want to break out - this is consistent
+                    # with how select behaves).
+                    ready_fds.add(fd_reverse_map[fd])
                 else:
-                    raise
+                    # Could be POLLERR or POLLNVAL (select would raise in this case).
+                    raise PySparkRuntimeError(f"Polling error - event {event} on fd {fd}")
+        else:
+            # If poll is not available, use select.
+            ready_fds = select.select([0, listen_sock], [], [], 1)[0]
 
         if 0 in ready_fds:
             try:

From 30505f9a424d362eb5d8cf5f2119aae6e5b777cd Mon Sep 17 00:00:00 2001
From: Wojciech Szlachta
Date: Wed, 10 Dec 2025 15:36:03 +0000
Subject: [PATCH 5/5] [SPARK-51966][PYTHON] add type hints for mypy

---
 python/pyspark/accumulators.py | 1 +
 python/pyspark/daemon.py       | 1 +
 2 files changed, 2 insertions(+)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 3042d339dc03..5d3f8bda04c0 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -275,6 +275,7 @@ def poll(func: Callable[[], bool]) -> None:
 
             while not self.server.server_shutdown:  # type: ignore[attr-defined]
                 # Poll every 1 second for new data -- don't block in case of shutdown.
+                r: set | list
                 if poller is not None:
                     r = set()
                     # Unlike select, poll timeout is in millis.
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index a5711c5699f9..5744fcefa436 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -174,6 +174,7 @@ def handle_sigterm(*args):
         poller.register(listen_sock, select.POLLIN)
 
     while True:
+        ready_fds: set | list
         if poller is not None:
             ready_fds = set()
             # Unlike select, poll timeout is in millis.