@wjszlachta-man

What changes were proposed in this pull request?

On glibc-based Linux systems, select() can only monitor file descriptors numbered below FD_SETSIZE (1024).

This is an unreasonably low limit for many modern applications.

This PR replaces select.select() with select.poll() when running on a POSIX OS.
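For illustration only (this is not code from the patch), here is a minimal sketch of the limit on a POSIX system. `FakeHighFd` is a hypothetical stand-in for a socket whose descriptor number is above FD_SETSIZE, and 5000 is an arbitrary number chosen under the assumption that FD_SETSIZE is 1024. CPython checks the descriptor number before calling select(), so no real descriptor needs to be open to see the error.

```python
import select


class FakeHighFd:
    """Hypothetical stand-in for a socket with a descriptor number >= FD_SETSIZE."""

    def fileno(self):
        return 5000  # arbitrary value, assumed to be above FD_SETSIZE (1024)


def select_error_for(fileobj):
    """Return the ValueError message select.select() raises, or None if it succeeds."""
    try:
        select.select([fileobj], [], [], 0)
    except ValueError as exc:
        return str(exc)
    return None


def poll_events_for(fileobj):
    """Register the same object with poll(); the descriptor number is not rejected."""
    poller = select.poll()
    poller.register(fileobj, select.POLLIN)
    # The descriptor is not actually open here, so poll() will typically report
    # POLLNVAL for it, but it does not raise because of the number itself.
    return poller.poll(0)
```

With a real socket the only difference is that the descriptor is open, so poll() reports readiness instead of POLLNVAL.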

Why are the changes needed?

When running via pyspark we frequently observe:

Exception occurred during processing of request from ('127.0.0.1', 46334)
Traceback (most recent call last):
  File "/usr/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/lib/python3.11/site-packages/pyspark/accumulators.py", line 293, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/python3.11/site-packages/pyspark/accumulators.py", line 266, in poll
    r, _, _ = select.select([self.rfile], [], [], 1)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: filedescriptor out of range in select()

On POSIX systems poll() should be used instead of select().

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests + we have been running this change (combined with py4j/py4j#560) on our YARN cluster (Linux) since April 2025.

Was this patch authored or co-authored using generative AI tooling?

No

@wjszlachta-man
Author

This is identical to #50774 (which was never reviewed and was closed by the bot), but rebased against current master.

@wjszlachta-man
Author

@HyukjinKwon is that something you could maybe review (in combination with py4j/py4j#560)?

We needed to implement this change to be able to run 1000+ executors without hitting the "filedescriptor out of range in select()" error.

@HyukjinKwon
Member

Can we have an environment variable to fallback?

@wjszlachta-man
Author

@HyukjinKwon you can now set PYSPARK_FORCE_SELECT to fall back to select.select().

@wjszlachta-man
Author

@HyukjinKwon @gaogaotiantian - I see you now merged #53388

Can you merge a similar change to python/pyspark/accumulators.py as per this PR?

@wjszlachta-man
Author

wjszlachta-man commented Dec 9, 2025

See traceback:

Exception occurred during processing of request from ('127.0.0.1', 46334)
Traceback (most recent call last):
  File "/usr/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/lib/python3.11/site-packages/pyspark/accumulators.py", line 293, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/python3.11/site-packages/pyspark/accumulators.py", line 266, in poll
    r, _, _ = select.select([self.rfile], [], [], 1)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: filedescriptor out of range in select()

@gaogaotiantian
Contributor

Could you make sure the CI passes? I think it's a linter issue. Also, it has some conflicts now.

@wjszlachta-man
Author

Sure - let me rebase... (although conflict is because of #53388 - not sure why duplicate?)

> Can we have an environment variable to fallback?

Do you still want an environment variable as a fallback? I see you don't have one in #53388.

@gaogaotiantian
Contributor

> Sure - let me rebase... (although conflict is because of #53388 - not sure why duplicate?)

That would be my fault. I did not realize this PR existed when I fixed the worker. Whether we should have an envvar fallback is a decision for @HyukjinKwon. Personally I think it's okay to just replace the old mechanism. We would need a whole config path for the fallback to work. Since CI exercises this code heavily, I think we can validate this locally scoped change pretty well.

@wjszlachta-man
Author

Just updated the branch - this should fix the conflict, and overall it is similar to your changes in python/pyspark/daemon.py.

Removed the fallback - if we have one, it should cover both daemon.py and accumulators.py (personally I think it's redundant and we should always use poll() if available).

As per your PR, I updated the poll() timeout to 1000 - I missed that it is in milliseconds (unlike select(), which takes seconds) in my original commit 👍
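To make the unit difference concrete, here is a rough sketch (not the code in this PR) of a helper that takes its timeout in seconds and converts it for poll(). The name `wait_readable` and its parameters are hypothetical.

```python
import select


def wait_readable(sock, timeout_s=1.0):
    """Return True if `sock` reports an event within `timeout_s` seconds."""
    if hasattr(select, "poll"):
        poller = select.poll()
        poller.register(sock, select.POLLIN)
        # poll() expects milliseconds: passing 1 here would wait 1 ms, not 1 s.
        return bool(poller.poll(int(timeout_s * 1000)))
    # select() expects seconds, so the value is passed through unchanged.
    readable, _, _ = select.select([sock], [], [], timeout_s)
    return bool(readable)
```

This sketch deliberately ignores which event was reported; error events still need separate handling.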

@HyukjinKwon
Member

Does this LGTM, @gaogaotiantian ?

if self.rfile in r and func():
if poller is not None:
# Unlike select, poll timeout is in millis. Rule out error events.
r = [fd for fd, event in poller.poll(1000) if event & select.POLLIN]
Contributor

Okay, I took some time on this, and I think my implementation lacks error checking as well, which I will fix later. However, we need to deal with errors here. If the socket has some issue, we will busy-loop here forever, because poller.poll() does not raise an exception.

We should break on POLLHUP and raise an error on POLLERR and POLLNVAL. We also need to confirm that there's no other event set.

Author

Ok - you are right - so we want to handle these:

  • POLLIN and POLLHUP: we want to break - with select() both are reported as "ready for reading", and if the peer hangs up, reads return 0 bytes
  • POLLERR and POLLNVAL: we want to raise - select() already raises on error
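A rough sketch of that handling, for discussion only (the actual commit may differ). `classify_poll_event` and `poll_once` are hypothetical names, and raising OSError is an assumption about which exception type fits.

```python
import select


def classify_poll_event(event):
    """Map a poll() event mask to 'error', 'ready', or 'ignore'."""
    if event & (select.POLLERR | select.POLLNVAL):
        return "error"
    if event & (select.POLLIN | select.POLLHUP):
        # After a hangup, a read returns 0 bytes, which the caller treats as EOF.
        return "ready"
    return "ignore"


def poll_once(poller, timeout_ms=1000):
    """Return descriptors that are ready to read; raise on error events."""
    ready = []
    for fd, event in poller.poll(timeout_ms):
        kind = classify_poll_event(event)
        if kind == "error":
            # Without this, a broken descriptor would be reported on every
            # call and the surrounding loop would spin forever.
            raise OSError(f"poll() reported error event {event:#x} on fd {fd}")
        if kind == "ready":
            ready.append(fd)
    return ready
```

Checking the error bits first means a mask that carries both POLLIN and POLLERR is treated as an error rather than as readable.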

@wjszlachta-man
Author

wjszlachta-man commented Dec 10, 2025

@gaogaotiantian let me know what you think re. latest commit - it will check for errors in both accumulators.py and daemon.py.

I also removed the try/except around:

try:
    ready_fds = select.select([0, listen_sock], [], [], 1)[0]
except select.error as ex:
    if ex[0] == EINTR:
        continue
    else:
        raise

in daemon.py as this is old code and only needed for Python <3.5 (see: https://peps.python.org/pep-0475/ - from Python 3.5 onward select.select() will automatically retry system calls on EINTR).

Considering python_requires=">=3.9" as of Spark>=4, it should be safe to remove.
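A small sketch of the PEP 475 behaviour on Python 3.5+, assuming a POSIX system and that it runs in the main thread (signal handlers can only be installed there). The function name and the timings are hypothetical.

```python
import select
import signal
import socket
import time


def select_through_signal(timeout_s=0.3, alarm_after_s=0.05):
    """Interrupt select() with SIGALRM and report how the call behaves."""
    handler_calls = []
    previous = signal.signal(
        signal.SIGALRM, lambda signum, frame: handler_calls.append(signum)
    )
    a, b = socket.socketpair()
    try:
        # Deliver SIGALRM while select() is blocked on a socket with no data.
        signal.setitimer(signal.ITIMER_REAL, alarm_after_s)
        start = time.monotonic()
        ready, _, _ = select.select([a], [], [], timeout_s)
        elapsed = time.monotonic() - start
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)
        a.close()
        b.close()
    return ready, len(handler_calls), elapsed
```

Because the handler returns normally, select() is retried with the remaining timeout instead of raising, so no EINTR check is needed in the caller.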
