diff --git a/docs/source/conf.py b/docs/source/conf.py index 9dddc93400..6c7fb02455 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -50,6 +50,7 @@ ("py:class", "math.inf"), ("py:exc", "Anything else"), ("py:class", "async function"), + ("py:class", "sync function"), ] autodoc_inherit_docstrings = False default_role = "obj" diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 53e5cf8a8c..b196cd0b43 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -270,6 +270,12 @@ Trio tokens .. autofunction:: current_trio_token +Spawning threads +================ + +.. autofunction:: start_thread_soon + + Safer KeyboardInterrupt handling ================================ diff --git a/newsfragments/6.feature.rst b/newsfragments/6.feature.rst new file mode 100644 index 0000000000..8baa3a27d9 --- /dev/null +++ b/newsfragments/6.feature.rst @@ -0,0 +1,6 @@ +To speed up `trio.to_thread.run_sync`, Trio now caches and re-uses +worker threads. + +And in case you have some exotic use case where you need to spawn +threads manually, but want to take advantage of Trio's cache, you can +do that using the new `trio.lowlevel.start_thread_soon`. diff --git a/notes-to-self/tiny-thread-pool.py b/notes-to-self/tiny-thread-pool.py deleted file mode 100644 index b5de1c37b3..0000000000 --- a/notes-to-self/tiny-thread-pool.py +++ /dev/null @@ -1,143 +0,0 @@ -# This is some very messy notes on how we might implement a thread cache - -import threading -import Queue - -# idea: -# -# unbounded thread pool; tracks how many threads are "available" and how much -# work there is to do; if work > available threads, spawn a new thread -# -# if a thread sits idle for >N ms, exit -# -# we don't need to support job cancellation -# -# we do need to mark a thread as "available" just before it -# signals back to Trio that it's done, to maintain the invariant that all -# unavailable threads are inside the limiter= protection -# -# maintaining this invariant while exiting can be a bit tricky -# -# maybe a simple target should be to always have 1 idle thread - -# XX we can't use a single shared dispatch queue, because we need LIFO -# scheduling, or else the idle-thread timeout won't work! -# -# instead, keep a list/deque/OrderedDict/something of idle threads, and -# dispatch by popping one off; put things back by pushing them on the end -# maybe one shared dispatch Lock, plus a Condition for each thread -# dispatch by dropping the job into the place where the thread can see it and -# then signalling its Condition? 
or could have separate locks
-
-@attr.s(frozen=True)
-class Job:
-    main = attr.ib()
-    main_args = attr.ib()
-    finish = attr.ib()
-    finish_args = attr.ib()
-
-class EXIT:
-    pass
-
-class ThreadCache:
-    def __init__(self):
-        self._lock = threading.Lock()
-        self._idle_workers = deque()
-        self._closed = False
-
-    def close(self):
-        self._closed = True
-        with self._lock:
-            while self._idle_workers:
-                self._idle_workers.pop().submit(None)
-
-    def submit(self, job):
-        with self._lock:
-            if not self._idle_workers:
-                WorkerThread(self, self._lock, job)
-            else:
-                worker = self._idle_workers.pop()
-                worker.submit(job)
-
-    # Called from another thread
-    # Must be called with the lock held
-    def remove_idle_worker(self, worker):
-        self._idle_workers.remove(worker)
-
-    # Called from another thread
-    # Lock is *not* held
-    def add_idle_worker(self, worker):
-        if self._closed:
-            with self._lock:
-                worker.submit
-        self._idle_workers.append(worker)
-
-# XX thread name
-
-IDLE_TIMEOUT = 1.0
-
-class WorkerThread:
-    def __init__(self, cache, lock, initial_job):
-        self._cache = cache
-        self._condition = threading.Condition(lock)
-        self._job = None
-        self._thread = threading.Thread(
-            target=self._loop, args=(initial_job,), daemon=True)
-        self._thread.start()
-
-    # Must be called with the lock held
-    def submit(self, job):
-        assert self._job is None
-        self._job = job
-        self._condition.notify()
-
-    def _loop(self, initial_job):
-        self._run_job(initial_job)
-        while True:
-            with self._condition:
-                self._condition.wait(IDLE_TIMEOUT):
-                job = self._job
-                self._job = None
-            if job is None:
-                self._cache.remove_idle_worker(self)
-                return
-            # Dropped the lock, and have a job to do
-            self._run_job(job)
-
-    def _run_job(self, job):
-        job.main(*job.main_args)
-        self._cache.add_idle_worker(self)
-        job.finish(*job.finish_args)
-
-
-# Probably the interface should be: trio.lowlevel.call_soon_in_worker_thread?
-
-# Enqueueing work:
-#   put into unbounded queue
-#   with lock:
-#     if idle_threads:
-#       idle_threads -= 1
-#     else:
-#       spawn a new thread (it starts out non-idle)
-#
-# Thread shutdown:
-#   with lock:
-#     idle_threads -= 1
-#   check for work one last time, and then either exit or do it
-#
-# Thread startup:
-#
-#   check for work
-#   while True:
-#     mark self as idle
-#     check for work (with timeout)
-#     either do work or shutdown
-
-# if we want to support QueueUserAPC cancellation, we need a way to get back
-# the thread id... maybe that just works like
-#
-# def WaitForSingleObjectEx_thread_fn(...):
-#     with lock:
-#         check if already cancelled
-#         put our thread id where main thread can find it
-#     WaitForSingleObjectEx(...)
diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py
index c28b7f4078..136bfe6b98 100644
--- a/trio/_core/__init__.py
+++ b/trio/_core/__init__.py
@@ -68,6 +68,8 @@
 from ._local import RunVar
 
+from ._thread_cache import start_thread_soon
+
 # Kqueue imports
 try:
     from ._run import current_kqueue, monitor_kevent, wait_kevent
diff --git a/trio/_core/_thread_cache.py b/trio/_core/_thread_cache.py
new file mode 100644
index 0000000000..1ecd4bdfec
--- /dev/null
+++ b/trio/_core/_thread_cache.py
@@ -0,0 +1,168 @@
+from threading import Thread, Lock
+import sys
+import outcome
+from itertools import count
+
+# The "thread cache" is a simple unbounded thread pool, i.e., it automatically
+# spawns as many threads as needed to handle all the requests it's given. Its
+# only purpose is to cache worker threads so that they don't have to be
+# started from scratch every time we want to delegate some work to a thread.
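+#
+# Conceptually, ignoring all the caching machinery below, start_thread_soon
+# behaves like this sketch (spawn a fresh daemon thread per job):
+#
+#     def start_thread_soon(deliver, fn):
+#         Thread(
+#             target=lambda: deliver(outcome.capture(fn)), daemon=True
+#         ).start()
+#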
+# It's expected that some higher-level code will track how many threads are in
+# use to avoid overwhelming the system (e.g. the limiter= argument to
+# trio.to_thread.run_sync).
+#
+# To maximize sharing, there's only one thread cache per process, even if you
+# have multiple calls to trio.run.
+#
+# Guarantees:
+#
+# It's safe to call start_thread_soon simultaneously from
+# multiple threads.
+#
+# Idle threads are chosen in LIFO order, i.e. we *don't* spread work evenly
+# over all threads. Instead we try to let some threads do most of the work
+# while others sit idle as much as possible. Compared to FIFO, this has better
+# memory cache behavior, and it makes it easier to detect when we have too
+# many threads, so idle ones can exit.
+#
+# This code assumes that 'dict' has the following properties:
+#
+# - __setitem__, __delitem__, and popitem are all thread-safe and atomic with
+#   respect to each other. This is guaranteed by the GIL.
+#
+# - popitem returns the most-recently-added item (i.e., __setitem__ + popitem
+#   give you a LIFO queue). This relies on dicts being insertion-ordered, as
+#   they are in Python 3.6+.
+
+# How long a thread will idle, waiting for new work, before it gives up and
+# exits. This value is pretty arbitrary; I don't think it matters too much.
+IDLE_TIMEOUT = 10  # seconds
+
+name_counter = count()
+
+
+class WorkerThread:
+    def __init__(self, thread_cache):
+        self._job = None
+        self._thread_cache = thread_cache
+        # This Lock is used in an unconventional way.
+        #
+        # "Unlocked" means we have a pending job that's been assigned to us;
+        # "locked" means that we don't.
+        #
+        # Initially we have no job, so it starts out in the locked state.
+        self._worker_lock = Lock()
+        self._worker_lock.acquire()
+        thread = Thread(target=self._work, daemon=True)
+        thread.name = f"Trio worker thread {next(name_counter)}"
+        thread.start()
+
+    def _work(self):
+        while True:
+            if self._worker_lock.acquire(timeout=IDLE_TIMEOUT):
+                # We got a job
+                fn, deliver = self._job
+                self._job = None
+                result = outcome.capture(fn)
+                # Tell the cache that we're available to be assigned a new
+                # job. We do this *before* calling 'deliver', so that if
+                # 'deliver' triggers a new job, it can be assigned to us
+                # instead of spawning a new thread.
+                self._thread_cache._idle_workers[self] = None
+                deliver(result)
+            else:
+                # We timed out acquiring the lock, so we can probably exit.
+                # But there's a race condition: we might be assigned a job
+                # *just* as we're about to exit. So we have to check.
+                try:
+                    del self._thread_cache._idle_workers[self]
+                except KeyError:
+                    # Someone else removed us from the idle worker queue, so
+                    # they must be in the process of assigning us a job; loop
+                    # around and wait for it.
+                    continue
+                else:
+                    # We successfully removed ourselves from the idle
+                    # worker queue, so no more jobs are incoming; it's safe to
+                    # exit.
+                    return
+
+
+class ThreadCache:
+    def __init__(self):
+        self._idle_workers = {}
+        self._cache_lock = Lock()
+
+    def start_thread_soon(self, deliver, fn):
+        try:
+            worker, _ = self._idle_workers.popitem()
+        except KeyError:
+            worker = WorkerThread(self)
+        worker._job = (fn, deliver)
+        worker._worker_lock.release()
+
+
+THREAD_CACHE = ThreadCache()
+
+
+def start_thread_soon(deliver, fn):
+    """Runs ``deliver(outcome.capture(fn))`` in a worker thread.
+
+    Generally ``fn`` does some blocking work, and ``deliver`` delivers the
+    result back to whoever is interested.
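+
+    For example, here's a rough sketch of shipping a result back to the main
+    thread through a plain `queue.Queue` (``load_data`` is just a stand-in
+    for whatever blocking function you want to run)::
+
+        import queue
+        from trio.lowlevel import start_thread_soon
+
+        result_q = queue.Queue()
+
+        def load_data():
+            ...  # some blocking work
+            return "data"
+
+        # Queue.put never blocks on an unbounded queue, so it's safe to use
+        # directly as 'deliver'.
+        start_thread_soon(result_q.put, load_data)
+
+        print(result_q.get().unwrap())  # prints "data"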
+ + This is a low-level, no-frills interface, very similar to using + `threading.Thread` to spawn a thread directly. The main difference is + that this function tries to re-use threads when possible, so it can be + a bit faster than `threading.Thread`. + + Worker threads have the `~threading.Thread.daemon` flag set, which means + that if your main thread exits, worker threads will automatically be + killed. If you want to make sure that your ``fn`` runs to completion, then + you should make sure that the main thread remains alive until ``deliver`` + is called. + + It is safe to call this function simultaneously from multiple threads. + + Args: + + deliver (sync function): Takes the `outcome.Outcome` of ``fn``, and + delivers it. *Must not block.* + + fn (sync function): Performs arbitrary blocking work. + + Because worker threads are cached and reused for multiple calls, neither + function should mutate thread-level state, like `threading.local` objects + – or if they do, they should be careful to revert their changes before + returning. + + Note: + + The split between ``fn`` and ``deliver`` serves two purposes. First, + it's convenient, since most callers need something like this anyway. + + Second, it avoids a small race condition that could cause too many + threads to be spawned. Consider a program that wants to run several + jobs sequentially on a thread, so the main thread submits a job, waits + for it to finish, submits another job, etc. In theory, this program + should only need one worker thread. But what could happen is: + + 1. Worker thread: First job finishes, and calls ``deliver``. + + 2. Main thread: receives notification that the job finished, and calls + ``start_thread_soon``. + + 3. Main thread: sees that no worker threads are marked idle, so spawns + a second worker thread. + + 4. Original worker thread: marks itself as idle. + + To avoid this, threads mark themselves as idle *before* calling + ``deliver``. + + Is this potential extra thread a major problem? Maybe not, but it's + easy enough to avoid, and we figure that if the user is trying to + limit how many threads they're using then it's polite to respect that. + + """ + THREAD_CACHE.start_thread_soon(deliver, fn) diff --git a/trio/_core/tests/test_thread_cache.py b/trio/_core/tests/test_thread_cache.py new file mode 100644 index 0000000000..6c6fc3a104 --- /dev/null +++ b/trio/_core/tests/test_thread_cache.py @@ -0,0 +1,120 @@ +import pytest +import threading +from queue import Queue +import time + +from .tutil import slow +from .. import _thread_cache +from .._thread_cache import start_thread_soon, ThreadCache + + +def test_thread_cache_basics(): + q = Queue() + + def fn(): + raise RuntimeError("hi") + + def deliver(outcome): + q.put(outcome) + + start_thread_soon(deliver, fn) + + outcome = q.get() + with pytest.raises(RuntimeError, match="hi"): + outcome.unwrap() + + +@slow +def test_spawning_new_thread_from_deliver_reuses_starting_thread(): + # We know that no-one else is using the thread cache, so if we keep + # submitting new jobs the instant the previous one is finished, we should + # keep getting the same thread over and over. This tests both that the + # thread cache is LIFO, and that threads can be assigned new work *before* + # deliver exits. + + # Make sure there are a few threads running, so if we weren't LIFO then we + # could grab the wrong one. 
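+    # (The five sleeping jobs below all run at once, so the cache is forced
+    # to occupy five distinct worker threads.)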
+    q = Queue()
+    COUNT = 5
+    for _ in range(COUNT):
+        start_thread_soon(lambda result: q.put(result), lambda: time.sleep(1))
+    for _ in range(COUNT):
+        q.get().unwrap()
+
+    seen_threads = set()
+    done = threading.Event()
+
+    def deliver(n, _):
+        print(n)
+        seen_threads.add(threading.current_thread())
+        if n == 0:
+            done.set()
+        else:
+            start_thread_soon(lambda _: deliver(n - 1, _), lambda: None)
+
+    start_thread_soon(lambda _: deliver(5, _), lambda: None)
+
+    done.wait()
+
+    assert len(seen_threads) == 1
+
+
+@slow
+def test_idle_threads_exit(monkeypatch):
+    # Temporarily set the idle timeout to something tiny, to speed up the
+    # test. (But non-zero, so that the worker loop will at least yield the
+    # CPU.)
+    monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001)
+
+    q = Queue()
+    start_thread_soon(lambda _: q.put(threading.current_thread()), lambda: None)
+    seen_thread = q.get()
+    # Since the idle timeout is so short, after sleeping for a full second the
+    # thread should have given up and exited.
+    time.sleep(1)
+    assert not seen_thread.is_alive()
+
+
+def test_race_between_idle_exit_and_job_assignment(monkeypatch):
+    # This is a lock where the first few times you try to acquire it with a
+    # timeout, it waits until the lock is available and then pretends to time
+    # out. Using this in our thread cache implementation causes the following
+    # sequence:
+    #
+    # 1. start_thread_soon grabs the worker thread, assigns it a job, and
+    #    releases its lock.
+    # 2. The worker thread wakes up (because the lock has been released), but
+    #    the JankyLock lies to it and tells it that the lock timed out. So the
+    #    worker thread tries to exit.
+    # 3. The worker thread checks for the race between exiting and being
+    #    assigned a job, and discovers that it *is* in the process of being
+    #    assigned a job, so it loops around and tries to acquire the lock
+    #    again.
+    # 4. Eventually the JankyLock admits that the lock is available, and
+    #    everything proceeds as normal.

+    class JankyLock:
+        def __init__(self):
+            self._lock = threading.Lock()
+            self._counter = 3
+
+        def acquire(self, timeout=None):
+            self._lock.acquire()
+            if timeout is None:
+                return True
+            else:
+                if self._counter > 0:
+                    self._counter -= 1
+                    self._lock.release()
+                    return False
+                return True
+
+        def release(self):
+            self._lock.release()
+
+    monkeypatch.setattr(_thread_cache, "Lock", JankyLock)
+
+    tc = ThreadCache()
+    done = threading.Event()
+    tc.start_thread_soon(lambda _: done.set(), lambda: None)
+    done.wait()
diff --git a/trio/_threads.py b/trio/_threads.py
index 92e2b5dc0f..f441952b55 100644
--- a/trio/_threads.py
+++ b/trio/_threads.py
@@ -14,6 +14,7 @@
     disable_ki_protection,
     RunVar,
     TrioToken,
+    start_thread_soon,
 )
 from ._util import coroutine_or_error
@@ -253,7 +254,7 @@ async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None):
     # for the result – or None if this function was cancelled and we should
     # discard the result.
task_register = [trio.lowlevel.current_task()] - name = "trio-worker-{}".format(next(_thread_counter)) + name = f"trio.to_thread.run_sync-{next(_thread_counter)}" placeholder = ThreadPlaceholder(name) # This function gets scheduled into the Trio run loop to deliver the @@ -273,32 +274,26 @@ def do_release_then_return_result(): if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], result) - # This is the function that runs in the worker thread to do the actual - # work and then schedule the call to report_back_in_trio_thread_fn - # Since this is spawned in a new thread, the trio token needs to be passed - # explicitly to it so it can inject it into thread local storage - def worker_thread_fn(trio_token): - TOKEN_LOCAL.token = trio_token + current_trio_token = trio.lowlevel.current_trio_token() + + def worker_fn(): + TOKEN_LOCAL.token = current_trio_token try: - result = outcome.capture(sync_fn, *args) - try: - trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) - except trio.RunFinishedError: - # The entire run finished, so our particular task is certainly - # long gone -- it must have cancelled. - pass + return sync_fn(*args) finally: del TOKEN_LOCAL.token + def deliver_worker_fn_result(result): + try: + current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) + except trio.RunFinishedError: + # The entire run finished, so our particular task is certainly + # long gone -- it must have been cancelled and abandoned us. + pass + await limiter.acquire_on_behalf_of(placeholder) try: - # daemon=True because it might get left behind if we cancel, and in - # this case shouldn't block process exit. - current_trio_token = trio.lowlevel.current_trio_token() - thread = threading.Thread( - target=worker_thread_fn, args=(current_trio_token,), name=name, daemon=True, - ) - thread.start() + start_thread_soon(deliver_worker_fn_result, worker_fn) except: limiter.release_on_behalf_of(placeholder) raise diff --git a/trio/lowlevel.py b/trio/lowlevel.py index 21ec0597df..3ce3e741ba 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -41,6 +41,7 @@ wait_readable, wait_writable, notify_closing, + start_thread_soon, ) # Unix-specific symbols diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index b4acae8b58..632ce13656 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -239,7 +239,9 @@ async def child(q, cancellable): # Make sure that if trio.run exits, and then the thread finishes, then that's # handled gracefully. (Requires that the thread result machinery be prepared # for call_soon to raise RunFinishedError.) -def test_run_in_worker_thread_abandoned(capfd): +def test_run_in_worker_thread_abandoned(capfd, monkeypatch): + monkeypatch.setattr(_core._thread_cache, "IDLE_TIMEOUT", 0.01) + q1 = stdlib_queue.Queue() q2 = stdlib_queue.Queue() @@ -426,10 +428,10 @@ def release_on_behalf_of(self, borrower): async def test_run_in_worker_thread_fail_to_spawn(monkeypatch): # Test the unlikely but possible case where trying to spawn a thread fails - def bad_start(self): + def bad_start(self, *args): raise RuntimeError("the engines canna take it captain") - monkeypatch.setattr(threading.Thread, "start", bad_start) + monkeypatch.setattr(_core._thread_cache.ThreadCache, "start_thread_soon", bad_start) limiter = current_default_thread_limiter() assert limiter.borrowed_tokens == 0