Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallel sweeping of stack pools #55643

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions base/timing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ struct GC_Num
total_time_to_safepoint ::Int64
sweep_time ::Int64
mark_time ::Int64
stack_pool_sweep_time ::Int64
total_sweep_time ::Int64
total_mark_time ::Int64
total_stack_pool_sweep_time::Int64
last_full_sweep ::Int64
last_incremental_sweep ::Int64
end
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ $(BUILDDIR)/debuginfo.o $(BUILDDIR)/debuginfo.dbg.obj: $(addprefix $(SRCDIR)/,de
$(BUILDDIR)/disasm.o $(BUILDDIR)/disasm.dbg.obj: $(SRCDIR)/debuginfo.h $(SRCDIR)/processor.h
$(BUILDDIR)/gc-debug.o $(BUILDDIR)/gc-debug.dbg.obj: $(SRCDIR)/gc-common.h $(SRCDIR)/gc-stock.h
$(BUILDDIR)/gc-pages.o $(BUILDDIR)/gc-pages.dbg.obj: $(SRCDIR)/gc-common.h $(SRCDIR)/gc-stock.h
$(BUILDDIR)/gc-stacks.o $(BUILDDIR)/gc-stacks.dbg.obj: $(SRCDIR)/gc-common.h $(SRCDIR)/gc-stock.h
$(BUILDDIR)/gc-stock.o $(BUILDDIR)/gc.dbg.obj: $(SRCDIR)/gc-common.h $(SRCDIR)/gc-stock.h $(SRCDIR)/gc-heap-snapshot.h $(SRCDIR)/gc-alloc-profiler.h $(SRCDIR)/gc-page-profiler.h
$(BUILDDIR)/gc-heap-snapshot.o $(BUILDDIR)/gc-heap-snapshot.dbg.obj: $(SRCDIR)/gc-heap-snapshot.h
$(BUILDDIR)/gc-alloc-profiler.o $(BUILDDIR)/gc-alloc-profiler.dbg.obj: $(SRCDIR)/gc-alloc-profiler.h
Expand Down
2 changes: 2 additions & 0 deletions src/gc-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ typedef struct {
uint64_t total_time_to_safepoint;
uint64_t sweep_time;
uint64_t mark_time;
uint64_t stack_pool_sweep_time;
uint64_t total_sweep_time;
uint64_t total_mark_time;
uint64_t total_stack_pool_sweep_time;
uint64_t last_full_sweep;
uint64_t last_incremental_sweep;
} jl_gc_num_t;
Expand Down
55 changes: 31 additions & 24 deletions src/gc-stacks.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// This file is a part of Julia. License is MIT: https://julialang.org/license

#include "gc-common.h"
#include "gc-stock.h"
#include "threading.h"
#ifndef _OS_WINDOWS_
# include <sys/resource.h>
Expand Down Expand Up @@ -202,7 +203,7 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
return stk;
}

void sweep_stack_pools(void) JL_NOTSAFEPOINT
void sweep_stack_pool_loop(void) JL_NOTSAFEPOINT
{
// Stack sweeping algorithm:
// // deallocate stacks if we have too many sitting around unused
Expand All @@ -215,33 +216,38 @@ void sweep_stack_pools(void) JL_NOTSAFEPOINT
// bufsz = t->bufsz
// if (stkbuf)
// push(free_stacks[sz], stkbuf)
assert(gc_n_threads);
d-netto marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < gc_n_threads; i++) {
jl_atomic_fetch_add(&gc_n_threads_sweeping_stacks, 1);
while (1) {
int i = jl_atomic_fetch_add_relaxed(&gc_ptls_sweep_idx, -1);
if (i < 0)
break;
jl_ptls_t ptls2 = gc_all_tls_states[i];
if (ptls2 == NULL)
continue;

assert(gc_n_threads);
// free half of stacks that remain unused since last sweep
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
small_arraylist_t *al = &ptls2->gc_tls.heap.free_stacks[p];
size_t n_to_free;
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
n_to_free = al->len; // not alive yet or dead, so it does not need these anymore
}
else if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
n_to_free = al->len / 2;
if (n_to_free > (al->len - MIN_STACK_MAPPINGS_PER_POOL))
n_to_free = al->len - MIN_STACK_MAPPINGS_PER_POOL;
}
else {
n_to_free = 0;
}
for (int n = 0; n < n_to_free; n++) {
void *stk = small_arraylist_pop(al);
free_stack(stk, pool_sizes[p]);
}
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
small_arraylist_free(al);
if (i == jl_atomic_load_relaxed(&gc_stack_free_idx)) {
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
small_arraylist_t *al = &ptls2->gc_tls.heap.free_stacks[p];
size_t n_to_free;
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
n_to_free = al->len; // not alive yet or dead, so it does not need these anymore
}
else if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
n_to_free = al->len / 2;
if (n_to_free > (al->len - MIN_STACK_MAPPINGS_PER_POOL))
n_to_free = al->len - MIN_STACK_MAPPINGS_PER_POOL;
}
else {
n_to_free = 0;
}
for (int n = 0; n < n_to_free; n++) {
void *stk = small_arraylist_pop(al);
free_stack(stk, pool_sizes[p]);
}
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
small_arraylist_free(al);
}
}
}
if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) {
gbaraldi marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -287,6 +293,7 @@ void sweep_stack_pools(void) JL_NOTSAFEPOINT
}
live_tasks->len -= ndel;
}
jl_atomic_fetch_add(&gc_n_threads_sweeping_stacks, -1);
}

JL_DLLEXPORT jl_array_t *jl_live_tasks(void)
Expand Down
77 changes: 67 additions & 10 deletions src/gc-stock.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ int jl_n_sweepthreads;
// Number of threads currently running the GC mark-loop
_Atomic(int) gc_n_threads_marking;
// Number of threads sweeping
_Atomic(int) gc_n_threads_sweeping;
_Atomic(int) gc_n_threads_sweeping_pools;
// Number of threads sweeping stacks
_Atomic(int) gc_n_threads_sweeping_stacks;
// Temporary for the `ptls->gc_tls.page_metadata_allocd` used during parallel sweeping (padded to avoid false sharing)
_Atomic(jl_gc_padded_page_stack_t *) gc_allocd_scratch;
// `tid` of mutator thread that triggered GC
_Atomic(int) gc_master_tid;
// counter for sharing work when sweeping stacks
_Atomic(int) gc_ptls_sweep_idx;
// counter for round robin of giving back stack pages to the OS
_Atomic(int) gc_stack_free_idx = 0;
// `tid` of first GC thread
int gc_first_tid;
// Mutex/cond used to synchronize wakeup of GC threads on parallel marking
Expand Down Expand Up @@ -994,13 +1000,50 @@ STATIC_INLINE void gc_sweep_pool_page(gc_page_profiler_serializer_t *s, jl_gc_pa
// sweep over all memory that is being used and not in a pool
static void gc_sweep_other(jl_ptls_t ptls, int sweep_full) JL_NOTSAFEPOINT
{
sweep_stack_pools();
gc_sweep_foreign_objs();
sweep_malloced_memory();
sweep_big(ptls);
jl_engine_sweep(gc_all_tls_states);
}

// wake up all threads to sweep the stacks
// Request a stack-pool sweep from every parallel GC collector thread and wake
// them all. The per-thread `gc_stack_sweep_requested` counters are incremented
// while holding `gc_threads_lock`, so a thread about to block on
// `gc_threads_cond` cannot miss the broadcast.
void gc_sweep_wake_all_stacks(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
    uv_mutex_lock(&gc_threads_lock);
    int first_tid = gc_first_parallel_collector_thread_id();
    int last_tid = gc_last_parallel_collector_thread_id();
    for (int tid = first_tid; tid <= last_tid; tid++) {
        jl_ptls_t target = gc_all_tls_states[tid];
        gc_check_ptls_of_parallel_collector_thread(target);
        jl_atomic_fetch_add(&target->gc_tls.gc_stack_sweep_requested, 1);
    }
    uv_cond_broadcast(&gc_threads_cond);
    uv_mutex_unlock(&gc_threads_lock);
}

// Spin until the parallel stack-pool sweep is fully finished: all work items
// have been claimed (`gc_ptls_sweep_idx` has gone negative) and every thread
// that claimed one has retired (`gc_n_threads_sweeping_stacks` is back to 0).
void gc_sweep_wait_for_all_stacks(void) JL_NOTSAFEPOINT
{
    for (;;) {
        int work_remaining = jl_atomic_load_acquire(&gc_ptls_sweep_idx) >= 0;
        int sweepers_active = jl_atomic_load_acquire(&gc_n_threads_sweeping_stacks) != 0;
        if (!work_remaining && !sweepers_active)
            break;
        jl_cpu_pause();
    }
}

// Drive a parallel sweep of the per-thread stack pools: advance the
// round-robin index selecting which thread's pool releases stacks back to the
// OS this cycle, publish the shared work counter, wake the parallel GC
// threads, participate in the sweep ourselves, then wait for completion.
void sweep_stack_pools(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
    assert(gc_n_threads);
    // bump gc_stack_free_idx, wrapping around at gc_n_threads
    int free_idx = jl_atomic_load_relaxed(&gc_stack_free_idx);
    int next_free_idx = (free_idx + 1 == gc_n_threads) ? 0 : free_idx + 1;
    jl_atomic_store_relaxed(&gc_stack_free_idx, next_free_idx);
    // publish one work item per thread; sweepers count this index down.
    // Release pairs with the acquire load in gc_sweep_wait_for_all_stacks.
    jl_atomic_store_release(&gc_ptls_sweep_idx, gc_n_threads - 1);
    gc_sweep_wake_all_stacks(ptls);
    sweep_stack_pool_loop();
    gc_sweep_wait_for_all_stacks();
}

static void gc_pool_sync_nfree(jl_gc_pagemeta_t *pg, jl_taggedvalue_t *last) JL_NOTSAFEPOINT
{
assert(pg->fl_begin_offset != UINT16_MAX);
Expand Down Expand Up @@ -1076,7 +1119,7 @@ int gc_sweep_prescan(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_sc
}

// wake up all threads to sweep the pages
void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_scratch)
void gc_sweep_wake_all_pages(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_scratch)
{
int parallel_sweep_worthwhile = gc_sweep_prescan(ptls, new_gc_allocd_scratch);
if (parallel_sweep_worthwhile && !page_profile_enabled) {
Expand Down Expand Up @@ -1112,18 +1155,18 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_
}

// wait for all threads to finish sweeping
void gc_sweep_wait_for_all(void)
void gc_sweep_wait_for_all_pages(void)
{
jl_atomic_store(&gc_allocd_scratch, NULL);
while (jl_atomic_load_acquire(&gc_n_threads_sweeping) != 0) {
while (jl_atomic_load_acquire(&gc_n_threads_sweeping_pools) != 0) {
jl_cpu_pause();
}
}

// sweep all pools
void gc_sweep_pool_parallel(jl_ptls_t ptls)
{
jl_atomic_fetch_add(&gc_n_threads_sweeping, 1);
jl_atomic_fetch_add(&gc_n_threads_sweeping_pools, 1);
jl_gc_padded_page_stack_t *allocd_scratch = jl_atomic_load(&gc_allocd_scratch);
if (allocd_scratch != NULL) {
gc_page_profiler_serializer_t serializer = gc_page_serializer_create();
Expand Down Expand Up @@ -1168,7 +1211,7 @@ void gc_sweep_pool_parallel(jl_ptls_t ptls)
}
gc_page_serializer_destroy(&serializer);
}
jl_atomic_fetch_add(&gc_n_threads_sweeping, -1);
jl_atomic_fetch_add(&gc_n_threads_sweeping_pools, -1);
}

// free all pages (i.e. through `madvise` on Linux) that were lazily freed
Expand Down Expand Up @@ -1258,9 +1301,9 @@ static void gc_sweep_pool(void)
// the actual sweeping
jl_gc_padded_page_stack_t *new_gc_allocd_scratch = (jl_gc_padded_page_stack_t *) calloc_s(n_threads * sizeof(jl_gc_padded_page_stack_t));
jl_ptls_t ptls = jl_current_task->ptls;
gc_sweep_wake_all(ptls, new_gc_allocd_scratch);
gc_sweep_wake_all_pages(ptls, new_gc_allocd_scratch);
gc_sweep_pool_parallel(ptls);
gc_sweep_wait_for_all();
gc_sweep_wait_for_all_pages();

// reset half-pages pointers
for (int t_i = 0; t_i < n_threads; t_i++) {
Expand Down Expand Up @@ -3069,6 +3112,11 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
#endif
current_sweep_full = sweep_full;
sweep_weak_refs();
uint64_t stack_pool_time = jl_hrtime();
sweep_stack_pools(ptls);
stack_pool_time = jl_hrtime() - stack_pool_time;
gc_num.total_stack_pool_sweep_time += stack_pool_time;
gc_num.stack_pool_sweep_time = stack_pool_time;
gc_sweep_other(ptls, sweep_full);
gc_scrub();
gc_verify_tags();
Expand Down Expand Up @@ -3480,6 +3528,10 @@ STATIC_INLINE int may_sweep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return (jl_atomic_load(&ptls->gc_tls.gc_sweeps_requested) > 0);
}

// Nonzero when this parallel GC thread has a pending request to help sweep
// the stack pools (counter is bumped by gc_sweep_wake_all_stacks).
STATIC_INLINE int may_sweep_stack(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
    // counter is an unsigned size_t, so "!= 0" is equivalent to "> 0"
    return jl_atomic_load(&ptls->gc_tls.gc_stack_sweep_requested) != 0;
}
// parallel gc thread function
void jl_parallel_gc_threadfun(void *arg)
{
Expand All @@ -3502,12 +3554,17 @@ void jl_parallel_gc_threadfun(void *arg)

while (1) {
uv_mutex_lock(&gc_threads_lock);
while (!may_mark() && !may_sweep(ptls)) {
while (!may_mark() && !may_sweep(ptls) && !may_sweep_stack(ptls)) {
uv_cond_wait(&gc_threads_cond, &gc_threads_lock);
}
uv_mutex_unlock(&gc_threads_lock);
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
gc_mark_loop_parallel(ptls, 0);
if (may_sweep_stack(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
sweep_stack_pool_loop();
jl_atomic_fetch_add(&ptls->gc_tls.gc_stack_sweep_requested, -1);
}
gbaraldi marked this conversation as resolved.
Show resolved Hide resolved
d-netto marked this conversation as resolved.
Show resolved Hide resolved
if (may_sweep(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD);
gc_sweep_pool_parallel(ptls);
Expand Down
7 changes: 5 additions & 2 deletions src/gc-stock.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ extern uv_mutex_t gc_threads_lock;
extern uv_cond_t gc_threads_cond;
extern uv_sem_t gc_sweep_assists_needed;
extern _Atomic(int) gc_n_threads_marking;
extern _Atomic(int) gc_n_threads_sweeping;
extern _Atomic(int) gc_n_threads_sweeping_pools;
extern _Atomic(int) gc_n_threads_sweeping_stacks;
extern _Atomic(int) gc_ptls_sweep_idx;
extern _Atomic(int) gc_stack_free_idx;
extern _Atomic(int) n_threads_running;
extern uv_barrier_t thread_init_done;
void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq);
Expand All @@ -521,7 +524,7 @@ void gc_mark_loop_serial(jl_ptls_t ptls);
void gc_mark_loop_parallel(jl_ptls_t ptls, int master);
void gc_sweep_pool_parallel(jl_ptls_t ptls);
void gc_free_pages(void);
void sweep_stack_pools(void) JL_NOTSAFEPOINT;
void sweep_stack_pool_loop(void) JL_NOTSAFEPOINT;
void jl_gc_debug_init(void);

// GC pages
Expand Down
1 change: 1 addition & 0 deletions src/gc-tls.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ typedef struct {
jl_gc_markqueue_t mark_queue;
jl_gc_mark_cache_t gc_cache;
_Atomic(size_t) gc_sweeps_requested;
_Atomic(size_t) gc_stack_sweep_requested;
arraylist_t sweep_objs;
} jl_gc_tls_states_t;

Expand Down