Skip to content

Commit

Permalink
started gc threads
Browse files Browse the repository at this point in the history
  • Loading branch information
d-netto committed Feb 9, 2023
1 parent 578c432 commit 6d76135
Show file tree
Hide file tree
Showing 13 changed files with 247 additions and 74 deletions.
1 change: 1 addition & 0 deletions base/options.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ struct JLOptions
cpu_target::Ptr{UInt8}
nthreadpools::Int16
nthreads::Int16
ngcthreads::Int8
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
Expand Down
4 changes: 0 additions & 4 deletions contrib/generate_precompile.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

if Threads.maxthreadid() != 1
@warn "Running this file with multiple Julia threads may lead to a build error" Base.maxthreadid()
end

if Base.isempty(Base.ARGS) || Base.ARGS[1] !== "0"
Sys.__init_build()
# Prevent this from being put into the Main namespace
Expand Down
102 changes: 102 additions & 0 deletions src/chase-lev-deque.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// This file is a part of Julia. License is MIT: https://julialang.org/license

#ifndef CHASE_LEV_DEQUE_H
#define CHASE_LEV_DEQUE_H

#include "julia_atomics.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct {
void **buffer;
int64_t capacity;
} ws_array_t;

STATIC_INLINE ws_array_t *create_ws_array(size_t capacity) JL_NOTSAFEPOINT
{
ws_array_t *a = (ws_array_t *)malloc_s(sizeof(ws_array_t));
a->buffer = (void **)malloc_s(capacity * sizeof(void *));
a->capacity = capacity;
return a;
}

// =======
// Chase and Lev's work-stealing queue, optimized for
// weak memory models by Le et al.
//
// * Chase D., Lev Y. Dynamic Circular Work-Stealing queue
// * Le N. M. et al. Correct and Efficient Work-Stealing for
// Weak Memory Models
// =======

typedef struct {
_Atomic(int64_t) top;
_Atomic(int64_t) bottom;
_Atomic(ws_array_t *) array;
} ws_queue_t;

STATIC_INLINE void ws_queue_push(ws_queue_t *q, void *v) JL_NOTSAFEPOINT
{
int64_t b = jl_atomic_load_relaxed(&q->bottom);
int64_t t = jl_atomic_load_acquire(&q->top);
ws_array_t *a = jl_atomic_load_relaxed(&q->array);
if (__unlikely(b - t > a->capacity - 1))
abort();
jl_atomic_store_relaxed((_Atomic(void *) *)&a->buffer[b % a->capacity], v);
jl_fence_release();
jl_atomic_store_relaxed(&q->bottom, b + 1);
}

STATIC_INLINE void *ws_queue_pop(ws_queue_t *q) JL_NOTSAFEPOINT
{
int64_t b = jl_atomic_load_relaxed(&q->bottom) - 1;
ws_array_t *a = jl_atomic_load_relaxed(&q->array);
jl_atomic_store_relaxed(&q->bottom, b);
#if defined(_CPU_X86_64_)
__asm__ volatile ("lock orq $0, (%rsp)");
#else
jl_fence();
#endif
int64_t t = jl_atomic_load_relaxed(&q->top);
void *v;
if (__likely(t <= b)) {
v = jl_atomic_load_relaxed((_Atomic(void *) *)&a->buffer[b % a->capacity]);
if (t == b) {
if (!jl_atomic_cmpswap(&q->top, &t, t + 1))
v = NULL;
jl_atomic_store_relaxed(&q->bottom, b + 1);
}
}
else {
v = NULL;
jl_atomic_store_relaxed(&q->bottom, b + 1);
}
return v;
}

STATIC_INLINE void *ws_queue_steal_from(ws_queue_t *q) JL_NOTSAFEPOINT
{
int64_t t = jl_atomic_load_acquire(&q->top);
#if defined(_CPU_X86_64_)
__asm__ volatile ("lock orq $0, (%rsp)");
#else
jl_fence();
#endif
int64_t b = jl_atomic_load_acquire(&q->bottom);
void *v = NULL;
if (t < b) {
ws_array_t *a = jl_atomic_load_relaxed(&q->array);
v = jl_atomic_load_relaxed((_Atomic(void *) *)&a->buffer[t % a->capacity]);
if (!jl_atomic_cmpswap(&q->top, &t, t + 1))
return NULL;
}
return v;
}

#ifdef __cplusplus
}
#endif

#endif
24 changes: 0 additions & 24 deletions src/gc-debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -1268,30 +1268,6 @@ int gc_slot_to_arrayidx(void *obj, void *_slot) JL_NOTSAFEPOINT
return (slot - start) / elsize;
}

// Print a backtrace from the `mq->start` of the mark queue up to `mq->current`
// `offset` will be added to `mq->current` for convenience in the debugger.
NOINLINE void gc_mark_loop_unwind(jl_ptls_t ptls, jl_gc_markqueue_t *mq, int offset)
{
jl_jmp_buf *old_buf = jl_get_safe_restore();
jl_jmp_buf buf;
jl_set_safe_restore(&buf);
if (jl_setjmp(buf, 0) != 0) {
jl_safe_printf("\n!!! ERROR when unwinding gc mark loop -- ABORTING !!!\n");
jl_set_safe_restore(old_buf);
return;
}
jl_value_t **start = mq->start;
jl_value_t **end = mq->current + offset;
for (; start < end; start++) {
jl_value_t *obj = *start;
jl_taggedvalue_t *o = jl_astaggedvalue(obj);
jl_safe_printf("Queued object: %p :: (tag: %zu) (bits: %zu)\n", obj,
(uintptr_t)o->header, ((uintptr_t)o->header & 3));
jl_((void*)(jl_datatype_t *)(o->header & ~(uintptr_t)0xf));
}
jl_set_safe_restore(old_buf);
}

static int gc_logging_enabled = 0;

JL_DLLEXPORT void jl_enable_gc_logging(int enable) {
Expand Down
104 changes: 74 additions & 30 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
extern "C" {
#endif

int jl_n_gcthreads;
uv_mutex_t gc_threads_lock;
uv_cond_t gc_threads_cond;
_Atomic(uint8_t) jl_gc_marking;
_Atomic(uint8_t) gc_threads_entered_marking;
_Atomic(uint8_t) gc_n_threads_marking;

// Linked list of callback functions

typedef void (*jl_gc_cb_func_t)(void);
Expand Down Expand Up @@ -1757,7 +1764,6 @@ JL_NORETURN NOINLINE void gc_assert_datatype_fail(jl_ptls_t ptls, jl_datatype_t
jl_gc_debug_print_status();
jl_(vt);
jl_gc_debug_critical_error();
gc_mark_loop_unwind(ptls, mq, 0);
abort();
}

Expand All @@ -1780,35 +1786,22 @@ STATIC_INLINE void gc_mark_push_remset(jl_ptls_t ptls, jl_value_t *obj,
}
}

// Double the mark queue
static NOINLINE void gc_markqueue_resize(jl_gc_markqueue_t *mq) JL_NOTSAFEPOINT
{
jl_value_t **old_start = mq->start;
size_t old_queue_size = (mq->end - mq->start);
size_t offset = (mq->current - old_start);
mq->start = (jl_value_t **)realloc_s(old_start, 2 * old_queue_size * sizeof(jl_value_t *));
mq->current = (mq->start + offset);
mq->end = (mq->start + 2 * old_queue_size);
}

// Push a work item to the queue
STATIC_INLINE void gc_markqueue_push(jl_gc_markqueue_t *mq, jl_value_t *obj) JL_NOTSAFEPOINT
{
if (__unlikely(mq->current == mq->end))
gc_markqueue_resize(mq);
*mq->current = obj;
mq->current++;
ws_queue_push(&mq->q, obj);
}

// Pop from the mark queue
STATIC_INLINE jl_value_t *gc_markqueue_pop(jl_gc_markqueue_t *mq)
{
jl_value_t *obj = NULL;
if (mq->current != mq->start) {
mq->current--;
obj = *mq->current;
}
return obj;
return ws_queue_pop(&mq->q);
}

// Steal from `mq2`
STATIC_INLINE jl_value_t *gc_markqueue_steal_from(jl_gc_markqueue_t *mq2)
{
return ws_queue_steal_from(&mq2->q);
}

// Double the chunk queue
Expand Down Expand Up @@ -2571,6 +2564,55 @@ JL_EXTENSION NOINLINE void gc_mark_loop(jl_ptls_t ptls)
gc_drain_own_chunkqueue(ptls, &ptls->mark_queue);
}

void gc_mark_loop_worker(jl_ptls_t ptls)
{
jl_atomic_fetch_add(&gc_n_threads_marking, 1);
jl_atomic_store(&gc_threads_entered_marking, 1);
jl_gc_markqueue_t *mq = &ptls->mark_queue;
void *new_obj;
pop : {
new_obj = gc_markqueue_pop(mq);
// Couldn't get object from own queue: try to
// steal from someone else
if (new_obj == NULL)
goto steal;
}
mark : {
gc_mark_outrefs(ptls, mq, new_obj, 0);
goto pop;
}
steal : {
// Steal from a random victim
for (int i = 0; i < 2 * gc_n_threads; i++) {
uint32_t v = cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % gc_n_threads;
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
new_obj = gc_markqueue_steal_from(mq2);
if (new_obj != NULL)
goto mark;
}
}
gc_drain_own_chunkqueue(ptls, mq);
jl_atomic_fetch_add(&gc_n_threads_marking, -1);
}

void gc_mark_loop_master(jl_ptls_t ptls)
{
if (__likely(jl_n_gcthreads) == 0) {
gc_mark_loop(ptls);
}
else {
jl_atomic_store(&gc_threads_entered_marking, 0);
jl_atomic_store(&jl_gc_marking, 1);
uv_mutex_lock(&gc_threads_lock);
uv_cond_broadcast(&gc_threads_cond);
uv_mutex_unlock(&gc_threads_lock);
// Spin while gc threads are marking
while (!jl_atomic_load(&gc_threads_entered_marking) || jl_atomic_load(&gc_n_threads_marking) > 0)
jl_cpu_pause();
jl_atomic_store(&jl_gc_marking, 0);
}
}

static void gc_premark(jl_ptls_t ptls2)
{
arraylist_t *remset = ptls2->heap.remset;
Expand Down Expand Up @@ -2849,7 +2891,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
gc_invoke_callbacks(jl_gc_cb_root_scanner_t,
gc_cblist_root_scanner, (collection));
}
gc_mark_loop(ptls);
gc_mark_loop_master(ptls);

// 4. check for objects to finalize
clear_weak_refs();
Expand Down Expand Up @@ -3211,14 +3253,14 @@ void jl_init_thread_heap(jl_ptls_t ptls)
gc_cache->nbig_obj = 0;

// Initialize GC mark-queue
size_t init_size = (1 << 18);
jl_gc_markqueue_t *mq = &ptls->mark_queue;
mq->start = (jl_value_t **)malloc_s(init_size * sizeof(jl_value_t *));
mq->current = mq->start;
mq->end = mq->start + init_size;
size_t cq_init_size = (1 << 14);
mq->current_chunk = mq->chunk_start = (jl_gc_chunk_t *)malloc_s(cq_init_size * sizeof(jl_gc_chunk_t));
mq->chunk_end = mq->chunk_start + cq_init_size;
ws_queue_t *q = &mq->q;
jl_atomic_store_relaxed(&q->top, 0);
jl_atomic_store_relaxed(&q->bottom, 0);
ws_array_t *a = create_ws_array(1 << 18);
jl_atomic_store_relaxed(&q->array, a);
mq->current_chunk = mq->chunk_start = (jl_gc_chunk_t *)malloc_s((1 << 14) * sizeof(jl_gc_chunk_t));
mq->chunk_end = mq->chunk_start + (1 << 14);

memset(&ptls->gc_num, 0, sizeof(ptls->gc_num));
jl_atomic_store_relaxed(&ptls->gc_num.allocd, -(int64_t)gc_num.interval);
Expand All @@ -3234,6 +3276,8 @@ void jl_gc_init(void)
JL_MUTEX_INIT(&finalizers_lock);
uv_mutex_init(&gc_cache_lock);
uv_mutex_init(&gc_perm_lock);
uv_mutex_init(&gc_threads_lock);
uv_cond_init(&gc_threads_cond);

jl_gc_init_page();
jl_gc_debug_init();
Expand Down
6 changes: 1 addition & 5 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -849,11 +849,7 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
jl_load(jl_core_module, "boot.jl");
post_boot_hooks();
}

if (jl_base_module == NULL) {
// nthreads > 1 requires code in Base
jl_atomic_store_relaxed(&jl_n_threads, 1);
}

jl_start_threads();

jl_gc_enable(1);
Expand Down
12 changes: 12 additions & 0 deletions src/jloptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ JL_DLLEXPORT void jl_init_options(void)
NULL, // cpu_target ("native", "core2", etc...)
0, // nthreadpools
0, // nthreads
0, // ngcthreads
NULL, // nthreads_per_pool
0, // nprocs
NULL, // machine_file
Expand Down Expand Up @@ -128,6 +129,7 @@ static const char opts[] =
" interface if supported (Linux and Windows) or to the number of CPU\n"
" threads if not supported (MacOS) or if process affinity is not\n"
" configured, and sets M to 1.\n"
" --gc=N Enable N gc threads\n"
" -p, --procs {N|auto} Integer value N launches N additional local worker processes\n"
" \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n"
" --machine-file <file> Run processes on hosts listed in <file>\n\n"
Expand Down Expand Up @@ -251,6 +253,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
opt_strip_metadata,
opt_strip_ir,
opt_heap_size_hint,
opt_gc_threads,
};
static const char* const shortopts = "+vhqH:e:E:L:J:C:it:p:O:g:";
static const struct option longopts[] = {
Expand All @@ -275,6 +278,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
{ "cpu-target", required_argument, 0, 'C' },
{ "procs", required_argument, 0, 'p' },
{ "threads", required_argument, 0, 't' },
{ "gc", required_argument, 0, opt_gc_threads },
{ "machine-file", required_argument, 0, opt_machine_file },
{ "project", optional_argument, 0, opt_project },
{ "color", required_argument, 0, opt_color },
Expand Down Expand Up @@ -812,6 +816,14 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
if (jl_options.heap_size_hint == 0)
jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified");

break;
case opt_gc_threads:
if (optarg != NULL) {
jl_options.ngcthreads = atoi(optarg);
}
if (jl_options.ngcthreads == 0)
jl_errorf("julia: invalid argument to --gc without number of threads specified");

break;
default:
jl_errorf("julia: unhandled option -- %c\n"
Expand Down
1 change: 1 addition & 0 deletions src/jloptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ typedef struct {
const char *cpu_target;
int8_t nthreadpools;
int16_t nthreads;
int8_t ngcthreads;
const int16_t *nthreads_per_pool;
int32_t nprocs;
const char *machine_file;
Expand Down
1 change: 1 addition & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,7 @@ JL_DLLEXPORT jl_sym_t *jl_get_ARCH(void) JL_NOTSAFEPOINT;
JL_DLLEXPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT;
extern JL_DLLIMPORT int jl_n_threadpools;
extern JL_DLLIMPORT _Atomic(int) jl_n_threads;
extern JL_DLLIMPORT int jl_n_gcthreads;
extern JL_DLLIMPORT int *jl_n_threads_per_pool;

// environment entries
Expand Down
Loading

0 comments on commit 6d76135

Please sign in to comment.