Fix synchronization issue on the GC scheduler (JuliaLang#53355)
This aims to slightly simplify the synchronization by making
`n_threads_marking` the sole memory location of relevance for it. It
also removes the fast path, because being protected by the lock is
quite important: it ensures that the observed GC state arrays are valid.

Fixes: JuliaLang#53350
Fixes: JuliaLang#52757
Maybe fixes: JuliaLang#53026
Co-authored-by: Jameson Nash <[email protected]>
2 people authored and mkitti committed Mar 7, 2024
1 parent 8897ee0 commit 609d131
Showing 7 changed files with 37 additions and 57 deletions.
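Before reading the diff, here is a minimal, runnable sketch of the lock-protected observer pattern the commit adopts. It is a hypothetical standalone demo, not Julia source: `marker`, `should_mark`, `n_threads_marking`, and `queue_observer_lock` only mirror `gc_mark_loop_parallel`, `gc_should_mark`, `gc_n_threads_marking`, and `gc_queue_observer_lock` from src/gc.c below.

// demo_observer.c: hypothetical demo; build with cc -std=c11 -pthread demo_observer.c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

static atomic_int n_threads_marking;
static pthread_mutex_t queue_observer_lock = PTHREAD_MUTEX_INITIALIZER;

// A marker drains its queue, then announces termination with a seq-cst
// decrement, like the fetch_add(-1) in gc_mark_loop_parallel.
static void *marker(void *arg)
{
    (void)arg;
    usleep(1000); // pretend to drain a mark queue
    atomic_fetch_sub(&n_threads_marking, 1);
    return NULL;
}

// Both the zero-check and the increment happen under the lock. Once an
// observer reads zero here, no thread can increment the counter again
// (incrementing is only legal inside the lock), so the termination
// decision is final; this is why the commit removes the unlocked fast path.
static int should_mark(void)
{
    int should = 0;
    pthread_mutex_lock(&queue_observer_lock);
    if (atomic_load(&n_threads_marking) != 0) {
        atomic_fetch_add(&n_threads_marking, 1); // join the mark loop
        should = 1;
    }
    pthread_mutex_unlock(&queue_observer_lock);
    return should;
}

int main(void)
{
    pthread_t t;
    atomic_store(&n_threads_marking, 1); // one marker is running
    pthread_create(&t, NULL, marker, NULL);
    while (should_mark())
        atomic_fetch_sub(&n_threads_marking, 1); // helped, then exited
    pthread_join(t, NULL);
    printf("terminated, n_threads_marking = %d\n",
           atomic_load(&n_threads_marking));
    return 0;
}

The point of the demo: because the zero-check and the increment share one lock, "the counter reached zero" is a stable, final observation, which is exactly the property an unlocked fast path cannot give.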
63 changes: 22 additions & 41 deletions src/gc.c
@@ -3,6 +3,7 @@
 #include "gc.h"
 #include "gc-page-profiler.h"
 #include "julia.h"
+#include "julia_atomics.h"
 #include "julia_gcext.h"
 #include "julia_assert.h"
 #ifdef __GLIBC__
@@ -540,7 +541,7 @@ void jl_gc_run_all_finalizers(jl_task_t *ct)
 
 void jl_gc_add_finalizer_(jl_ptls_t ptls, void *v, void *f) JL_NOTSAFEPOINT
 {
-    assert(jl_atomic_load_relaxed(&ptls->gc_state) == 0);
+    assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_STATE_UNSAFE);
     arraylist_t *a = &ptls->finalizers;
     // This acquire load and the release store at the end are used to
     // synchronize with `finalize_object` on another thread. Apart from the GC,
@@ -1676,7 +1677,7 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_
 void gc_sweep_wait_for_all(void)
 {
     jl_atomic_store(&gc_allocd_scratch, NULL);
-    while (jl_atomic_load_relaxed(&gc_n_threads_sweeping) != 0) {
+    while (jl_atomic_load_acquire(&gc_n_threads_sweeping) != 0) {
         jl_cpu_pause();
     }
 }
@@ -2966,9 +2967,7 @@ void gc_mark_and_steal(jl_ptls_t ptls)
     jl_gc_markqueue_t *mq = &ptls->mark_queue;
     jl_gc_markqueue_t *mq_master = NULL;
     int master_tid = jl_atomic_load(&gc_master_tid);
-    if (master_tid == -1) {
-        return;
-    }
+    assert(master_tid != -1);
     mq_master = &gc_all_tls_states[master_tid]->mark_queue;
     void *new_obj;
     jl_gc_chunk_t c;
@@ -3060,54 +3059,37 @@ size_t gc_count_work_in_queue(jl_ptls_t ptls) JL_NOTSAFEPOINT
  * Correctness argument for the mark-loop termination protocol.
  *
  * Safety properties:
- * - No work items shall be in any thread's queues when `gc_mark_loop_barrier` observes
+ * - No work items shall be in any thread's queues when `gc_should_mark` observes
  *   that `gc_n_threads_marking` is zero.
  *
  * - No work item shall be stolen from the master thread (i.e. mutator thread which started
  *   GC and which helped the `jl_n_markthreads` - 1 threads to mark) after
- *   `gc_mark_loop_barrier` observes that `gc_n_threads_marking` is zero. This property is
+ *   `gc_should_mark` observes that `gc_n_threads_marking` is zero. This property is
  *   necessary because we call `gc_mark_loop_serial` after marking the finalizer list in
  *   `_jl_gc_collect`, and want to ensure that we have the serial mark-loop semantics there,
  *   and that no work is stolen from us at that point.
  *
  * Proof:
- * - Suppose the master thread observes that `gc_n_threads_marking` is zero in
- *   `gc_mark_loop_barrier` and there is a work item left in one thread's queue at that point.
- *   Since threads try to steal from all threads' queues, this implies that all threads must
- *   have tried to steal from the queue which still has a work item left, but failed to do so,
- *   which violates the semantics of Chase-Lev's work-stealing queue.
- *
- * - Let E1 be the event "master thread writes -1 to gc_master_tid" and E2 be the event
- *   "master thread observes that `gc_n_threads_marking` is zero". Since we're using
- *   sequentially consistent atomics, E1 => E2. Now suppose one thread which is spinning in
- *   `gc_should_mark` tries to enter the mark-loop after E2. In order to do so, it must
- *   increment `gc_n_threads_marking` to 1 in an event E3, and then read `gc_master_tid` in an
- *   event E4. Since we're using sequentially consistent atomics, E3 => E4. Since we observed
- *   `gc_n_threads_marking` as zero in E2, then E2 => E3, and we conclude E1 => E4, so that
- *   the thread which is spinning in `gc_should_mark` must observe that `gc_master_tid` is -1
- *   and therefore won't enter the mark-loop.
+ * - If a thread observes that `gc_n_threads_marking` is zero inside `gc_should_mark`, that
+ *   means that no thread has work on their queue, this is guaranteed because a thread may only exit
+ *   `gc_mark_and_steal` when its own queue is empty, this information is synchronized by the
+ *   seq-cst fetch_add to a thread that is in `gc_should_mark`. `gc_queue_observer_lock`
+ *   guarantees that once `gc_n_threads_marking` reaches zero, no thread will increment it again,
+ *   because incrementing is only legal from inside the lock. Therefore, no thread will reenter
+ *   the mark-loop after `gc_n_threads_marking` reaches zero.
  */
 
 int gc_should_mark(void)
 {
     int should_mark = 0;
-    int n_threads_marking = jl_atomic_load(&gc_n_threads_marking);
-    // fast path
-    if (n_threads_marking == 0) {
-        return 0;
-    }
     uv_mutex_lock(&gc_queue_observer_lock);
     while (1) {
-        int tid = jl_atomic_load(&gc_master_tid);
-        // fast path
-        if (tid == -1) {
-            break;
-        }
-        n_threads_marking = jl_atomic_load(&gc_n_threads_marking);
-        // fast path
+        int n_threads_marking = jl_atomic_load(&gc_n_threads_marking);
         if (n_threads_marking == 0) {
             break;
         }
+        int tid = jl_atomic_load_relaxed(&gc_master_tid);
+        assert(tid != -1);
         size_t work = gc_count_work_in_queue(gc_all_tls_states[tid]);
         for (tid = gc_first_tid; tid < gc_first_tid + jl_n_markthreads; tid++) {
             jl_ptls_t ptls2 = gc_all_tls_states[tid];
@@ -3118,7 +3100,8 @@ int gc_should_mark(void)
         }
         // if there is a lot of work left, enter the mark loop
         if (work >= 16 * n_threads_marking) {
-            jl_atomic_fetch_add(&gc_n_threads_marking, 1);
+            jl_atomic_fetch_add(&gc_n_threads_marking, 1); // A possibility would be to allow a thread that found lots
+                                                           // of work to increment this
             should_mark = 1;
             break;
         }
@@ -3130,16 +3113,16 @@
 
 void gc_wake_all_for_marking(jl_ptls_t ptls)
 {
-    jl_atomic_store(&gc_master_tid, ptls->tid);
     uv_mutex_lock(&gc_threads_lock);
-    jl_atomic_fetch_add(&gc_n_threads_marking, 1);
     uv_cond_broadcast(&gc_threads_cond);
     uv_mutex_unlock(&gc_threads_lock);
 }
 
 void gc_mark_loop_parallel(jl_ptls_t ptls, int master)
 {
     if (master) {
+        jl_atomic_store(&gc_master_tid, ptls->tid);
+        jl_atomic_fetch_add(&gc_n_threads_marking, 1);
         gc_wake_all_for_marking(ptls);
         gc_mark_and_steal(ptls);
         jl_atomic_fetch_add(&gc_n_threads_marking, -1);
@@ -3166,10 +3149,8 @@ void gc_mark_loop(jl_ptls_t ptls)
 
 void gc_mark_loop_barrier(void)
 {
-    jl_atomic_store(&gc_master_tid, -1);
-    while (jl_atomic_load(&gc_n_threads_marking) != 0) {
-        jl_cpu_pause();
-    }
+    assert(jl_atomic_load_relaxed(&gc_n_threads_marking) == 0);
+    jl_atomic_store_relaxed(&gc_master_tid, -1);
 }
 
 void gc_mark_clean_reclaim_sets(void)
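A note on the two lines moved into `gc_mark_loop_parallel` above: the master now publishes `gc_master_tid` and makes `gc_n_threads_marking` nonzero before `gc_wake_all_for_marking` broadcasts, which is what allows `gc_mark_and_steal` and `gc_should_mark` to downgrade their `tid == -1` checks to asserts. A hedged sketch of that ordering, with simplified types and a hypothetical function name:

#include <pthread.h>
#include <stdatomic.h>

static atomic_int gc_master_tid = -1;
static atomic_int gc_n_threads_marking;
static pthread_mutex_t gc_threads_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gc_threads_cond = PTHREAD_COND_INITIALIZER;

// Hypothetical condensation of the master branch of gc_mark_loop_parallel.
static void master_prologue(int my_tid)
{
    atomic_store(&gc_master_tid, my_tid);       // 1. publish the master tid
    atomic_fetch_add(&gc_n_threads_marking, 1); // 2. counter becomes nonzero
    pthread_mutex_lock(&gc_threads_lock);       // 3. only now wake the GC
    pthread_cond_broadcast(&gc_threads_cond);   //    worker threads
    pthread_mutex_unlock(&gc_threads_lock);
}

int main(void)
{
    master_prologue(0);
    return 0;
}

A worker that later observes a nonzero counter under `gc_queue_observer_lock` therefore cannot see a stale `gc_master_tid` of -1, matching the new asserts.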
11 changes: 6 additions & 5 deletions src/julia_threads.h
@@ -192,6 +192,9 @@ typedef struct _jl_tls_states_t {
     _Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread
     _Atomic(int8_t) sleep_check_state; // read/write from foreign threads
     // Whether it is safe to execute GC at the same time.
+#define JL_GC_STATE_UNSAFE 0
+    // gc_state = 0 means the thread is running Julia code and is not
+    // safe to run concurrently to the GC
 #define JL_GC_STATE_WAITING 1
     // gc_state = 1 means the thread is doing GC or is waiting for the GC to
     // finish.
@@ -330,9 +333,7 @@ STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state,
                                      int8_t old_state)
 {
     jl_atomic_store_release(&ptls->gc_state, state);
-    if (state == JL_GC_STATE_SAFE && old_state == 0)
-        jl_gc_safepoint_(ptls);
-    if (state == 0 && old_state == JL_GC_STATE_SAFE)
+    if (state == JL_GC_STATE_UNSAFE || old_state == JL_GC_STATE_UNSAFE)
         jl_gc_safepoint_(ptls);
     return old_state;
 }
@@ -347,8 +348,8 @@ void jl_gc_unsafe_leave(jl_ptls_t ptls, int8_t state) JL_NOTSAFEPOINT JL_NOTSAFE
 int8_t jl_gc_safe_enter(jl_ptls_t ptls) JL_NOTSAFEPOINT JL_NOTSAFEPOINT_ENTER;
 void jl_gc_safe_leave(jl_ptls_t ptls, int8_t state) JL_NOTSAFEPOINT_LEAVE; // this might not be a safepoint, but we have to assume it could be (statically)
 #else
-#define jl_gc_unsafe_enter(ptls) jl_gc_state_save_and_set(ptls, 0)
-#define jl_gc_unsafe_leave(ptls, state) ((void)jl_gc_state_set(ptls, (state), 0))
+#define jl_gc_unsafe_enter(ptls) jl_gc_state_save_and_set(ptls, JL_GC_STATE_UNSAFE)
+#define jl_gc_unsafe_leave(ptls, state) ((void)jl_gc_state_set(ptls, (state), JL_GC_STATE_UNSAFE))
 #define jl_gc_safe_enter(ptls) jl_gc_state_save_and_set(ptls, JL_GC_STATE_SAFE)
 #define jl_gc_safe_leave(ptls, state) ((void)jl_gc_state_set(ptls, (state), JL_GC_STATE_SAFE))
 #endif
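The rewritten condition in `jl_gc_state_set` above collapses the two special cases into one rule: run the safepoint on every transition that enters or leaves `JL_GC_STATE_UNSAFE`. In particular, UNSAFE<->WAITING transitions now also hit the safepoint, where the old code only covered UNSAFE<->SAFE. A small runnable sketch that prints the resulting transition table (state values copied from this header; the table printer itself is illustrative only):

#include <stdio.h>

#define JL_GC_STATE_UNSAFE  0
#define JL_GC_STATE_WAITING 1
#define JL_GC_STATE_SAFE    2

// Mirrors the new condition in jl_gc_state_set.
static int hits_safepoint(int state, int old_state)
{
    return state == JL_GC_STATE_UNSAFE || old_state == JL_GC_STATE_UNSAFE;
}

int main(void)
{
    const char *name[] = { "UNSAFE", "WAITING", "SAFE" };
    for (int old_state = 0; old_state <= 2; old_state++)
        for (int state = 0; state <= 2; state++)
            printf("%-7s -> %-7s : %s\n", name[old_state], name[state],
                   hits_safepoint(state, old_state) ? "safepoint" : "no safepoint");
    return 0;
}

Only the four transitions among WAITING and SAFE (including self-transitions) skip the safepoint.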
2 changes: 1 addition & 1 deletion src/llvm-codegen-shared.h
@@ -325,7 +325,7 @@ static inline llvm::Value *emit_gc_unsafe_enter(llvm::IRBuilder<> &builder, llvm
 static inline llvm::Value *emit_gc_unsafe_leave(llvm::IRBuilder<> &builder, llvm::Type *T_size, llvm::Value *ptls, llvm::Value *state, bool final)
 {
     using namespace llvm;
-    Value *old_state = builder.getInt8(0);
+    Value *old_state = builder.getInt8(JL_GC_STATE_UNSAFE);
     return emit_gc_state_set(builder, T_size, ptls, state, old_state, final);
 }

4 changes: 2 additions & 2 deletions src/safepoint.c
@@ -234,7 +234,7 @@ void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT
     jl_task_t *ct = jl_current_task; (void)ct;
     JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct);
     // The thread should have set this is already
-    assert(jl_atomic_load_relaxed(&ct->ptls->gc_state) != 0);
+    assert(jl_atomic_load_relaxed(&ct->ptls->gc_state) != JL_GC_STATE_UNSAFE);
     // Use normal volatile load in the loop for speed until GC finishes.
     // Then use an acquire load to make sure the GC result is visible on this thread.
     while (jl_atomic_load_relaxed(&jl_gc_running) || jl_atomic_load_acquire(&jl_gc_running)) {
@@ -309,7 +309,7 @@ int jl_safepoint_suspend_thread(int tid, int waitstate)
     }
     while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) {
         int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state);
-        if (waitstate <= 2 && state2 != 0)
+        if (waitstate <= 2 && state2 != JL_GC_STATE_UNSAFE)
             break;
         if (waitstate == 3 && state2 == JL_GC_STATE_WAITING)
             break;
10 changes: 4 additions & 6 deletions src/scheduler.c
@@ -131,7 +131,7 @@ void jl_parallel_gc_threadfun(void *arg)
     jl_ptls_t ptls = jl_init_threadtls(targ->tid);
 
     // wait for all threads
-    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
+    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, JL_GC_STATE_UNSAFE);
     uv_barrier_wait(targ->barrier);
 
     // free the thread argument here
@@ -143,9 +143,7 @@ void jl_parallel_gc_threadfun(void *arg)
             uv_cond_wait(&gc_threads_cond, &gc_threads_lock);
         }
         uv_mutex_unlock(&gc_threads_lock);
-        if (may_mark()) {
-            gc_mark_loop_parallel(ptls, 0);
-        }
+        gc_mark_loop_parallel(ptls, 0);
         if (may_sweep(ptls)) { // not an else!
             gc_sweep_pool_parallel(ptls);
             jl_atomic_fetch_add(&ptls->gc_sweeps_requested, -1);
@@ -162,7 +160,7 @@ void jl_concurrent_gc_threadfun(void *arg)
     jl_ptls_t ptls = jl_init_threadtls(targ->tid);
 
     // wait for all threads
-    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
+    jl_gc_state_set(ptls, JL_GC_STATE_WAITING, JL_GC_STATE_UNSAFE);
     uv_barrier_wait(targ->barrier);
 
     // free the thread argument here
@@ -190,7 +188,7 @@ void jl_threadfun(void *arg)
     JL_GC_PROMISE_ROOTED(ct);
 
     // wait for all threads
-    jl_gc_state_set(ptls, JL_GC_STATE_SAFE, 0);
+    jl_gc_state_set(ptls, JL_GC_STATE_SAFE, JL_GC_STATE_UNSAFE);
     uv_barrier_wait(targ->barrier);
 
     // free the thread argument here
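On the removal of the `may_mark()` gate above: the unlocked read it performed duplicated, without synchronization, the decision that `gc_should_mark` already makes under `gc_queue_observer_lock` inside `gc_mark_loop_parallel`. A hedged, self-contained sketch of the contrast; this assumes `may_mark()` was an unlocked read of the marking counter, and none of it is the actual source:

#include <pthread.h>
#include <stdatomic.h>

static atomic_int gc_n_threads_marking;
static pthread_mutex_t gc_queue_observer_lock = PTHREAD_MUTEX_INITIALIZER;

// Old-style gate (assumed shape): a racy, unlocked read. A nonzero result
// is not stable; the counter can hit zero, and the GC state arrays can be
// invalidated, before the caller acts on it.
static int may_mark_unlocked(void)
{
    return atomic_load(&gc_n_threads_marking) > 0;
}

// New-style single source of truth: check and join under the lock, where
// "zero" is final because increments are only legal while holding it.
static int should_mark_locked(void)
{
    pthread_mutex_lock(&gc_queue_observer_lock);
    int joining = atomic_load(&gc_n_threads_marking) != 0;
    if (joining)
        atomic_fetch_add(&gc_n_threads_marking, 1);
    pthread_mutex_unlock(&gc_queue_observer_lock);
    return joining;
}

int main(void)
{
    (void)may_mark_unlocked; // kept only for contrast
    return should_mark_locked();
}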
2 changes: 1 addition & 1 deletion src/signal-handling.c
@@ -440,7 +440,7 @@ void jl_task_frame_noreturn(jl_task_t *ct) JL_NOTSAFEPOINT
     ct->ptls->in_finalizer = 0;
     ct->ptls->defer_signal = 0;
     // forcibly exit GC (if we were in it) or safe into unsafe, without the mandatory safepoint
-    jl_atomic_store_release(&ct->ptls->gc_state, 0);
+    jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_UNSAFE);
     // allow continuing to use a Task that should have already died--unsafe necromancy!
     jl_atomic_store_relaxed(&ct->_state, JL_TASK_STATE_RUNNABLE);
 }
2 changes: 1 addition & 1 deletion src/threading.c
@@ -364,7 +364,7 @@ jl_ptls_t jl_init_threadtls(int16_t tid)
         }
     }
 #endif
-    jl_atomic_store_relaxed(&ptls->gc_state, 0); // GC unsafe
+    jl_atomic_store_relaxed(&ptls->gc_state, JL_GC_STATE_UNSAFE); // GC unsafe
     // Conditionally initialize the safepoint address. See comment in
     // `safepoint.c`
     if (tid == 0) {
