Design: thread pools #6

Closed
njsmith opened this issue Jan 22, 2017 · 7 comments · Fixed by #1545

Comments

@njsmith
Member

njsmith commented Jan 22, 2017

Right now, run_in_worker_thread just always spawns a new thread for the operation, and then kills it after. This might sound ridiculous, but it's not so obviously wrong as it looks! There's a large comment in trio._threads talking about some of the issues.

Questions:

  • Should there be any global control over worker thread spawning?
  • If there is, should it be a strict limit on the number of threads, or something more subtle like a limiter on the rate at which they spawn?
  • How do administrators configure this stuff? Or instrument it?
  • What should the API to interact with it look like? E.g., do we need a way for a specific run_in_worker_thread call to say that it shouldn't block waiting for a free thread, because the work it's submitting might be what unblocks another thread?

Prior art: https://twistedmatrix.com/trac/ticket/5298

Interacting with the products at Rackspace which use Twisted, I've seen problems caused by thread-pool maximum sizes with some annoying regularity. The basic problem is this: if you have a hard limit on the number of threads, it is not possible to write a correct program which may require starting a new thread to un-block a blocked pool thread - glyph
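
To make glyph's point concrete, here's a minimal sketch (mine, using the stdlib's concurrent.futures rather than anything Twisted- or trio-specific) of how a hard cap deadlocks as soon as a pooled job has to wait for another pooled job:

from concurrent.futures import ThreadPoolExecutor, TimeoutError

pool = ThreadPoolExecutor(max_workers=1)  # hard limit on threads

def outer():
    # This job can only finish once another job runs in the pool...
    inner = pool.submit(lambda: "done")
    # ...but the only thread is busy running *us*, so without the timeout
    # this wait would never return.
    return inner.result(timeout=1)

try:
    print(pool.submit(outer).result())
except TimeoutError:
    print("deadlock: the pool can't start the thread that would unblock us")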

@njsmith
Member Author

njsmith commented Jan 28, 2017

The JDK docs have surprisingly good notes on these trade-offs: java.util.concurrent.ThreadPoolExecutor

@njsmith
Member Author

njsmith commented May 24, 2017

<benbangert> has anyone made anything to throttle/restrict the amount of tasks to defer to thread from a single protocol instance?
<runciter> benbangert: you might be interested in https://github.com/terrycojones/txrdq
<runciter> though you can probably also use a cooperator
<benbangert> ie, I'd like to process multiple messages at once from a websocket client, but if multiple ones need a thread blocking op, I want them queued to run rather than deferToThread where its possible for a single client to saturate the thread-pool
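
What benbangert asks for is essentially a per-client limiter in front of the shared pool. Here's a hedged sketch of how that looks with the CapacityLimiter/limiter= machinery that shows up later in this thread (handle_client and blocking_op are made-up names):

import trio

async def handle_client(messages, blocking_op):
    # At most 2 of this client's messages may occupy worker threads at once;
    # the rest wait on the limiter instead of saturating the shared pool.
    per_client_limit = trio.CapacityLimiter(2)
    async with trio.open_nursery() as nursery:
        for msg in messages:
            async def run_one(msg=msg):
                await trio.to_thread.run_sync(
                    blocking_op, msg, limiter=per_client_limit
                )
            nursery.start_soon(run_one)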

@njsmith
Member Author

njsmith commented Jun 5, 2017

@arigo points out something disturbing about Python threads (on both CPython and PyPy): thread.join() doesn't actually call pthread_join (or I assume the equivalent on windows, but I haven't checked). It just waits for the thread to say "hey I'm about to exit". (And all threads are marked as "detached" from pthread's point of view.) So in theory there is a race condition where we could end up with arbitrarily many threads in the last stages of exiting, that are taking up an arbitrary amount of address space. (And then of course run_in_worker_thread also recreates this problem; it doesn't call join at all, just assumes that the worker thread will exit between when the thread schedules the callback to the trio thread and when the callback releases the limiter. But if join worked we could fix this by calling join from the callback, just before releasing the limiter.)

I think the practical impact of this problem is very low. In particular, 64-bit systems have the address space to handle an almost arbitrary number of threads, and even on 32-bit systems, if our default thread limit is on the order of "dozens", then it would take a lot of threads simultaneously stalled in the last bit of exiting to cause a problem. Unless the heap was taking up most of the address space, I guess. It makes me a bit nervous that it's the sort of thing that someone will find a way to hit under high load though.

One response to this would be to use a thread pool. We would want an unbounded thread pool, I think, i.e. one where if a job comes in and all threads are busy, we spawn a new thread (and if a thread is idle for too long, it goes away). (So that's an interesting control problem to avoid too much churn.) So long as worker threads mark themselves as "soon to be available" before signaling back to the trio thread, this would avoid the race condition in the steady state case by making sure that we never have more threads running than were allowed at some point by the trio-side limiting within the last N seconds. In principle, though, it would still be possible to hit the race condition by having exactly the wrong sort of cyclic workload, where we spin up N threads, then they all sit idle long enough to start running their cleanup logic, and while they're still doing that we suddenly have to spin up another N threads to replace them, so we end up with 2*N threads running at once.

To solve this really-properly-for-sure, we would need to stop using Python's threading module to spawn and join threads. Unfortunately, it's not quite as simple as using cffi to wrap pthread_create/pthread_join or the Windows equivalents, because to properly spawn a thread on CPython you have to take care of subinterpreters, which means passing through the threadstate from the parent to the child. I guess potentially we could try whining at Armin or someone to fix it so that ABI-mode callbacks can remember a particular tstate and then restore it when they're invoked.

Given our unbounded thread pool semantics, it makes sense to use a single thread pool for the whole process (shared across subinterpreters and threads), which might simplify things, or make them more complicated. It's not mandatory.

njsmith added a commit to njsmith/trio that referenced this issue Jun 18, 2017
- New synchronization primitive: CapacityLimiter. Like a Semaphore but
  more specialized. See python-triogh-182 for rationale.

- Add limiter= argument to run_in_worker_thread, that allows one to
  correctly (modulo
  python-trio#6 (comment))
  control the number of active threads.

- Added new function current_default_worker_thread_limiter(), which
  creates or returns a run-local CapacityLimiter, and made
  run_in_worker_thread use it by default when no other limiter= is
  given.

Closes: python-triogh-10, python-triogh-57, python-triogh-156
@njsmith
Member Author

njsmith commented Jun 23, 2017

So with #181 we've pretty much settled on our general architecture for handling worker threads: a lower-level unbounded "thread cache" (similar to the JDK's "cached thread pool"), plus an extensible policy layer on top that runs in the trio thread.

So the remaining issue is: currently our "thread cache"'s replacement policy is "always", i.e., we don't actually have a cache, we just start a new thread every time. Maybe it would be worthwhile to re-use threads. This is a non-trivial increase in complexity, and it's primarily an optimization, so maybe it should wait until we have some real programs whose behavior we can measure.
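
For reference, the trivial "replacement policy is always" scheme amounts to something like this (a sketch of the current behavior in spirit, not trio's actual code; fn runs in the thread and deliver hands the result back):

import threading
import outcome

def spawn_always(fn, deliver):
    # "Always replace": every job gets a brand-new daemon thread, which
    # exits as soon as it has delivered its result.
    def worker():
        deliver(outcome.capture(fn))
    threading.Thread(target=worker, daemon=True).start()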

If/when we do this, we'll need to figure out the API for interacting with the cache. At the least, we'll need a trio.hazmat API for grabbing a cached thread and running some code on it. There are also some configuration options you can imagine setting on a thread cache, like how long an idle thread should wait before exiting, or the thread stack size. (Not that Python lets us usefully control the latter right now.) Should we make these configurable by users? Should we allow for the creation of multiple thread cache objects with different settings? If so, then how does the user specify which one they want? (For run_in_worker_thread it'd just be a matter of passing in the desired thread cache, but what about getaddrinfo?)

We'll also want to re-use this "thread cache" for other miscellaneous threads that can't quite use the standard run_in_worker_thread machinery, like calling WaitForSingleObjectEx or waitpid – see #233 #4. Possibly also for some kinds of I/O on Windows if we decide to use CancelSynchronousIo – see #174 (comment). The distinguishing features of these cases are: we want an unbounded admission policy, and we need special handling for cancellation. For WaitForSingleObjectEx and CancelSynchronousIo there's a special API for cancelling work, and for waitpid we'll want to spawn a thread at the same time we spawn the process and let it run in the background without being tied to a specific trio task.

@njsmith
Member Author

njsmith commented Oct 7, 2019

For some reason a plausible algorithm for this popped into my head today:

import threading
import outcome
import queue

try:
    # SimpleQueue is faster, but only available on python 3.7+
    from queue import SimpleQueue
except ImportError:
    from queue import Queue as SimpleQueue

# How long a thread will idle waiting for new work before it exits. I don't
# think it should matter too much, though it should be substantially longer
# than the cost of creating a thread, which is on the order of 10-100 µs
IDLE_TIMEOUT = 10  # seconds

class ThreadCache:
    def __init__(self):
        self._idle_workers = 0
        self._total_workers = 0
        self._lock = threading.Lock()
        self._q = SimpleQueue()

    def _worker(self):
        while True:
            try:
                job = self._q.get(timeout=IDLE_TIMEOUT)
            except queue.Empty:
                with self._lock:
                    if self._idle_workers == 0:
                        # We were *just* assigned some work, so loop back
                        # around to get it
                        continue
                    else:
                        self._idle_workers -= 1
                        self._total_workers -= 1
                        return

            fn, deliver = job
            result = outcome.capture(fn)

            with self._lock:
                self._idle_workers += 1

            deliver(result)

    def submit(self, fn, deliver):
        with self._lock:
            if self._idle_workers == 0:
                # Spawn a new worker.
                threading.Thread(target=self._worker, daemon=True).start()
                self._total_workers += 1
            else:
                self._idle_workers -= 1
            self._q.put((fn, deliver))

I think that's correct. It's deceptively simple.

This is designed to work as a process-global singleton, so if you have multiple trio.runs happening in sequence or in parallel, they can still share a thread cache. I had a draft that included a close method, but that's not really useful for a global singleton, esp. since we already know that we sometimes want to intentionally leak threads during process shutdown. (When someone cancels a getaddrinfo, we intentionally let the thread keep running silently in the background until it exits or the process shuts down, since that's better than any of the alternatives.)

Tracking self._total_workers isn't strictly necessary, but I feel like it might be useful to see how many getaddrinfo-style threads we have outstanding. Within a normal process that does a single call to trio.run it doesn't matter, but maybe pytest-trio should check how many leaked threads are still running after each test, and if it starts getting out of hand it might want to sleep a bit to let some of the leaked threads finish up before continuing. The reason this doesn't matter during a regular call to trio.run is that the global CapacityLimiter for threads does still count the leaked threads until they exit, to keep from accidentally overwhelming the system – but we don't have any way to share that accounting between multiple calls to trio.run.
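
A rough sketch (mine, not anything pytest-trio actually does) of that "sleep a bit if leaked threads pile up" idea, using only stdlib thread accounting:

import threading
import time

def settle_leaked_threads(baseline, limit=50, deadline=1.0):
    # After a test: if there are many more live threads than when we started,
    # give the leaked ones a brief chance to finish before moving on.
    start = time.monotonic()
    while threading.active_count() - baseline > limit:
        if time.monotonic() - start > deadline:
            break
        time.sleep(0.01)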

This kind of interaction is also related to why we have the fn/deliver split. A simpler design would be to have a single fn that runs in the thread, and let it take care of delivering the result directly. But let's say you have a CapacityLimiter of 10 on thread spawns. Intuitively, you'd expect that to mean that you never have more than 10 threads running. But in the simpler design, that's not guaranteed: you could have the following sequence:

  1. in thread number 10, fn reports back to the main thread, and then the thread is suspended by the OS
  2. the main thread decrements the CapacityLimiter, which allows another trio.to_thread.run_sync to continue
  3. the thread cache sees that no threads are idle, so it spawns thread number 11
  4. thread number 10 gets to run again, and marks itself as idle and ready to accept the next job – but that's already gone to thread number 11

In the design above, the thread marks itself as idle before reporting back to the main thread, so if deliver triggers a call to ThreadCache.submit, it'll get assigned back to the same thread. That way we don't spawn more than 10 threads.
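
For context, here's roughly how the fn/deliver split could plug into trio on the other side (my sketch, written with modern trio.lowlevel names; deliver runs in the worker thread and just bounces the outcome back to the trio thread):

import trio

async def run_on_cache(thread_cache, fn):
    token = trio.lowlevel.current_trio_token()
    task = trio.lowlevel.current_task()

    def deliver(result):
        # Called in the worker thread: hop back into the trio thread and
        # wake the waiting task with the captured outcome.
        token.run_sync_soon(trio.lowlevel.reschedule, task, result)

    thread_cache.submit(fn, deliver)
    # No cancellation support in this sketch: once submitted, we just wait.
    return await trio.lowlevel.wait_task_rescheduled(
        lambda _: trio.lowlevel.Abort.FAILED
    )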

There is still a tiny race condition where we can briefly end up with more than 10 threads: in the moment between when an idle thread decides to give up and quit, and when it actually does so, an 11th thread could be spawned. That situation only persists for a tiny fraction of a second before correcting itself, though, while the deliver race condition could potentially let you exceed the threshold for quite some time.

Hmm, in fact if you're unlucky, it could let you exceed the threshold forever... our simple thread cleanup scheme above isn't actually guaranteed to converge on the right number of threads. Imagine you have 1 job submitted per second, and it completes ~instantaneously. So you really only need 1 thread to handle all the jobs. But let's say for whatever reason, you have 10 threads, and they're all waiting on the same queue for jobs to be assigned. (Maybe you briefly needed 10 threads a while ago, but don't anymore.) If you're unlucky, the jobs might be assigned to the threads round-robin style, so thread 1 handles the first job, thread 2 handles the second job, etc. This means that every thread ends up handling 1 request every 10 seconds. So if your idle timeout is 10 seconds... no thread is ever idle that long, and they all stay alive, even though 9 of them are superfluous.

Some ideas for solving this:

  • Don't worry about it. KISS.
  • Somehow arrange for queue.get to wake up threads in LIFO order – whichever thread went to sleep most recently gets priority for waking up. (This might also have some cache locality benefits.) Probably would require implementing a custom thread-safe queue though.
  • Put a timer in the ThreadCache itself, tracking how long it's been since the last time all threads were busy. If it exceeds IDLE_TIMEOUT, then explicitly tell a thread to exit. I think this could be done without any extra timer thread, by only updating and checking the timer opportunistically inside ThreadCache.submit. And if no job is submitted for IDLE_TIMEOUT, then by definition all the threads can exit, and they can detect that on their own without help from the main thread.

(You can also get much more fancy with controller design to adjust thread cache size. For example, see Optimizing Concurrency Levels in the .NET ThreadPool: A Case Study of Controller Design and Implementation. One obvious addition would be to add some hysteresis, to smooth out the thread pool size instead of letting all the threads exit at once.)

@njsmith
Member Author

njsmith commented Oct 18, 2019

Here's another version that's about 2x faster than the one above. It assigns work to threads in LIFO order, so idle timeouts work properly, and it has better cache behavior:

import threading
import sys
# TODO: also use dict on pypy
# Note: we need an ordered dict that's thread-safe (assignment, del, and
# popitem should all be atomic wrt each other). Fortunately, dict is always
# thread-safe and on py35+, OrderedDict is also thread-safe (protected by the
# GIL).
if sys.version_info >= (3, 7):
    odict = dict
else:
    from collections import OrderedDict as odict
import outcome

# How long a thread will idle waiting for new work before it exits. I don't
# think it should matter too much, though it should be substantially longer
# than the cost of creating a thread, which is on the order of 10-100 µs
IDLE_TIMEOUT = 10  # seconds

class WorkerThread:
    def __init__(self, thread_cache):
        self._job = None
        self._thread_cache = thread_cache
        # Weird convention for this lock: "unlocked" means we've been assigned a job
        # Initially we have no job, so it starts out in locked state.
        self._worker_lock = threading.Lock()
        self._worker_lock.acquire()
        thread = threading.Thread(target=self._work, daemon=True)
        thread.start()

    def _work(self):
        while True:
            if self._worker_lock.acquire(timeout=IDLE_TIMEOUT):
                # We got a job
                fn, deliver = self._job
                self._job = None
                result = outcome.capture(fn)
                self._thread_cache._idle_workers[self] = None
                deliver(result)
            else:
                # Timeout acquiring lock, so we can probably exit
                try:
                    del self._thread_cache._idle_workers[self]
                except KeyError:
                    # We're being assigned a job, so we can't exit yet
                    continue
                else:
                    # We successfully removed ourselves from the idle
                    # worker queue, so we can exit
                    return


class ThreadCache:
    def __init__(self):
        self._idle_workers = odict()
        self._cache_lock = threading.Lock()

    def submit(self, fn, deliver):
        try:
            worker, _ = self._idle_workers.popitem()
        except KeyError:
            worker = WorkerThread(self)
        worker._job = (fn, deliver)
        worker._worker_lock.release()
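
A minimal usage sketch of the ThreadCache above (the class and method names are from the code; the wiring with a plain stdlib queue is mine): deliver runs in the worker thread, so here it just drops the outcome onto a queue for the caller to pick up.

import queue

results = queue.Queue()
tc = ThreadCache()
tc.submit(lambda: 2 + 2, results.put)   # deliver = results.put
print(results.get().unwrap())           # prints 4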

On my Linux laptop with CPython 3.7.3, I get:

In [58]: a, b = socket.socketpair()                                             

In [59]: %timeit tc.submit(lambda: None, lambda _: a.send(b"x")); b.recv(1)     
8.48 µs ± 312 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [60]: %timeit outcome.capture(lambda: None); (lambda _: a.send(b"x"))(None); b.recv(1)
2.63 µs ± 67.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [63]: %timeit threading.Thread(target=lambda: a.send(b"x")).start(); b.recv(1)
79.5 µs ± 11 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

So pushing a job into a warm thread pool adds about 6 µs of overhead versus running it in the main thread, and is ~10x faster than spawning a thread (like we do now).

I also ran py-spy over it, and it said the worker thread is spending ~97% of its time in the self._worker_lock.acquire line, specifically in do_futex_wait + __pthread_cond_timedwait. And the main thread is spending ~72% of its time in recv, plus 17% in submit. And of the time in submit, 96% is spent in sem_post@@GLIBC_2.2.5. So I think this is pretty close to optimal – I can't see any way a thread pool system could avoid a single wait in the worker and a single notification from the main thread, and those are basically the only things this code spends time on. And of course we can't avoid the socket send/recv, so that also puts a lower bound on how fast this can go.

njsmith added a commit to njsmith/trio that referenced this issue May 23, 2020
On my Linux laptop, this makes 'await trio.to_thread.run_sync(lambda:
None)' about twice as fast, from ~150 µs to ~75 µs.

Closes: python-triogh-6

Test program:

    import trio
    import time

    COUNT = 10000

    async def main():
        while True:
            start = time.monotonic()
            for _ in range(COUNT):
                await trio.to_thread.run_sync(lambda: None)
            end = time.monotonic()
            print("{:.2f} µs/job".format((end - start) / COUNT * 1e6))

    trio.run(main)