diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 409c33d1727a..7495569ecfda 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -163,14 +163,26 @@ def handle_sigterm(*args): # Initialization complete try: + poller = None + if os.name == "posix": + # select.select has a known limit on the number of file descriptors + # it can handle. We use select.poll instead to avoid this limit. + poller = select.poll() + fd_reverse_map = {0: 0, listen_sock.fileno(): listen_sock} + poller.register(0, select.POLLIN) + poller.register(listen_sock, select.POLLIN) + while True: - try: - ready_fds = select.select([0, listen_sock], [], [], 1)[0] - except select.error as ex: - if ex[0] == EINTR: - continue - else: - raise + if poller is not None: + ready_fds = [fd_reverse_map[fd] for fd, _ in poller.poll(1000)] + else: + try: + ready_fds = select.select([0, listen_sock], [], [], 1)[0] + except select.error as ex: + if ex[0] == EINTR: + continue + else: + raise if 0 in ready_fds: try: @@ -208,6 +220,9 @@ def handle_sigterm(*args): if pid == 0: # in child process + if poller is not None: + poller.unregister(0) + poller.unregister(listen_sock) listen_sock.close() # It should close the standard input in the child process so that @@ -256,6 +271,9 @@ def handle_sigterm(*args): sock.close() finally: + if poller is not None: + poller.unregister(0) + poller.unregister(listen_sock) shutdown(1)