diff --git a/base/sysimg.jl b/base/sysimg.jl index 0d1cf2921708d..bf934e1a7bf78 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -261,6 +261,8 @@ include("graphics.jl") include("profile.jl") importall .Profile +include("threading.jl") + function __init__() # Base library init reinit_stdio() diff --git a/base/threading.jl b/base/threading.jl new file mode 100644 index 0000000000000..e29de9ae86790 --- /dev/null +++ b/base/threading.jl @@ -0,0 +1,104 @@ +#= +This file contains experimental threading support for Julia. The following rules +should be taken into account when using the threading functions. + +- The function Base.parapply is the best way to use multiple threads. It takes as argument +a function of the form f(r::UnitRange, args...) that operates over a certain range r. parapply +gets as second argument the full range over that f should be executed wheras internally the +range will be split into chunks that are executed in different threads. The variables f is +supposed to work on are passes as varargs after the range argument. parapply has two keyword arguments: +preapply is used to execute the function f once on the main thread. This is very useful for finding +syntax errors that would otherwise crash the Julia process. The other keyword argument is numthreads +that can be used to control the number of threads used. + +- Base.parapply does switch the garbage collector of. Thus one should not allocate more memory in the +threads than absolutely needed. It is much better to preallocate it and pass it to the threads. + +- Do not use print statements in the threading functions (Hopefully we can fix this) + +- When locking is required there is a Mutex type. + +- Running threads in an @async task is probably a very bad idea (Fixme) + +- There is a low level Thread type. When using this the main thread should directly join it so that the main +thread is waiting until the worker threads are done. 
Further one should switch the garbage collector off before +running the threads. +=# + + +### Thread type + +type Thread + handle::Ptr{Void} +end + +function join(t::Thread) + ccall(:jl_join_thread,Void,(Ptr{Void},),t.handle) + e = exception(t) + if e != nothing + throw(e) + end +end +run(t::Thread) = (ccall(:jl_run_thread,Void,(Ptr{Void},),t.handle)) +destroy(t::Thread) = (ccall(:jl_destroy_thread,Void,(Ptr{Void},),t.handle)) +exception(t::Thread) = ccall(:jl_thread_exception,Exception,(Ptr{Void},),t.handle) + +function Thread(f::Function,args...) + t = Thread(ccall(:jl_create_thread,Ptr{Void},(Any,Any),f,args)) + finalizer(t, destroy) + t +end + +### Mutex type + +type Mutex + handle::Ptr{Void} +end + +lock(m::Mutex) = (ccall(:jl_lock_mutex,Void,(Ptr{Void},),m.handle)) +unlock(m::Mutex) = (ccall(:jl_unlock_mutex,Void,(Ptr{Void},),m.handle)) +destroy(m::Mutex) = (ccall(:jl_destroy_mutex,Void,(Ptr{Void},),m.handle)) + +function Mutex() + m = Mutex(ccall(:jl_create_mutex,Ptr{Void},())) + finalizer(m, destroy) + m +end + + +### parallel apply function (scheduling in julia). + +function parapply(f::Function, r::UnitRange, args...; preapply::Bool = true, numthreads::Int = CPU_CORES) + + st = start(r) + len = length(r) + + t = Array(Base.Thread,numthreads) + + if(preapply) + f(range(st, 1), args...) + st = st + 1 + len = len - 1 + end + + chunk = ifloor(len / numthreads) + rem = len + + gc_disable() + for i=0:(numthreads-2) + t[i+1] = Base.Thread(f,range(int(st+i*chunk), chunk), args...) + rem -= chunk + end + t[numthreads] = Base.Thread(f,range(int(st+(numthreads-1)*chunk), rem), args...) 
+ + for i=1:numthreads + Base.run(t[i]) + end + + for i=1:numthreads + Base.join(t[i]) + end + + gc_enable() + #gc() +end diff --git a/src/Makefile b/src/Makefile index b7ab79cee1b7e..6e9be6934db09 100644 --- a/src/Makefile +++ b/src/Makefile @@ -8,7 +8,7 @@ override CPPFLAGS += $(JCPPFLAGS) SRCS = \ jltypes gf ast builtins module codegen interpreter \ - alloc dlload sys init task array dump toplevel jl_uv jlapi profile llvm-simdloop + alloc dlload sys init task array dump toplevel jl_uv jlapi profile llvm-simdloop threading FLAGS = \ -D_GNU_SOURCE -Iflisp -Isupport \ diff --git a/src/builtins.c b/src/builtins.c index fdb3d8d700e03..5c7df429ef43a 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -32,6 +32,8 @@ extern "C" { #endif +JL_DEFINE_MUTEX_EXT(codegen) + // exceptions ----------------------------------------------------------------- DLLEXPORT void jl_error(const char *str) @@ -369,6 +371,7 @@ extern int jl_lineno; JL_CALLABLE(jl_f_top_eval) { + JL_LOCK(codegen) jl_module_t *m; jl_value_t *ex; if (nargs == 1) { @@ -381,14 +384,17 @@ JL_CALLABLE(jl_f_top_eval) m = (jl_module_t*)args[0]; ex = args[1]; } + jl_value_t *v=NULL; if (jl_is_symbol(ex)) { - return jl_eval_global_var(m, (jl_sym_t*)ex); + v = jl_eval_global_var(m, (jl_sym_t*)ex); + JL_UNLOCK(codegen) + return v; } - jl_value_t *v=NULL; int last_lineno = jl_lineno; if (m == jl_current_module) { v = jl_toplevel_eval(ex); jl_lineno = last_lineno; + JL_UNLOCK(codegen) return v; } jl_module_t *last_m = jl_current_module; @@ -401,12 +407,14 @@ JL_CALLABLE(jl_f_top_eval) jl_lineno = last_lineno; jl_current_module = last_m; jl_current_task->current_module = task_last_m; + JL_UNLOCK(codegen) jl_rethrow(); } jl_lineno = last_lineno; jl_current_module = last_m; jl_current_task->current_module = task_last_m; assert(v); + JL_UNLOCK(codegen) return v; } @@ -745,6 +753,7 @@ int jl_eval_with_compiler_p(jl_expr_t *expr, int compileloops); JL_CALLABLE(jl_trampoline) { + JL_LOCK(codegen) assert(jl_is_func(F)); 
jl_function_t *f = (jl_function_t*)F; assert(f->linfo != NULL); @@ -769,6 +778,8 @@ JL_CALLABLE(jl_trampoline) if (jl_boot_file_loaded && jl_is_expr(f->linfo->ast)) { f->linfo->ast = jl_compress_ast(f->linfo, f->linfo->ast); } + JL_UNLOCK(codegen) + return jl_apply(f, args, nargs); } diff --git a/src/codegen.cpp b/src/codegen.cpp index c5bfa311d6a64..7ed704e1b0db2 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -479,12 +479,15 @@ static void jl_rethrow_with_add(const char *fmt, ...) jl_rethrow(); } +JL_DEFINE_MUTEX_EXT(codegen) + // --- entry point --- //static int n_emit=0; static Function *emit_function(jl_lambda_info_t *lam, bool cstyle); //static int n_compile=0; static Function *to_function(jl_lambda_info_t *li, bool cstyle) { + JL_LOCK(codegen) JL_SIGATOMIC_BEGIN(); assert(!li->inInference); BasicBlock *old = nested_compile ? builder.GetInsertBlock() : NULL; @@ -506,6 +509,7 @@ static Function *to_function(jl_lambda_info_t *li, bool cstyle) builder.SetCurrentDebugLocation(olddl); } JL_SIGATOMIC_END(); + JL_UNLOCK(codegen) jl_rethrow_with_add("error compiling %s", li->name->name); } assert(f != NULL); @@ -539,6 +543,7 @@ static Function *to_function(jl_lambda_info_t *li, bool cstyle) builder.SetCurrentDebugLocation(olddl); } JL_SIGATOMIC_END(); + JL_UNLOCK(codegen) return f; } @@ -557,6 +562,7 @@ static void jl_setup_module(Module *m, bool add) extern "C" void jl_generate_fptr(jl_function_t *f) { + JL_LOCK(codegen) // objective: assign li->fptr jl_lambda_info_t *li = f->linfo; assert(li->functionObject); @@ -575,7 +581,7 @@ extern "C" void jl_generate_fptr(jl_function_t *f) #endif Function *llvmf = (Function*)li->functionObject; - + #ifdef USE_MCJIT li->fptr = (jl_fptr_t)jl_ExecutionEngine->getFunctionAddress(llvmf->getName()); #else @@ -597,6 +603,7 @@ extern "C" void jl_generate_fptr(jl_function_t *f) } } f->fptr = li->fptr; + JL_UNLOCK(codegen) } extern "C" void jl_compile(jl_function_t *f) diff --git a/src/gc.c b/src/gc.c index 
96c9895646b41..956503ac73dd2 100644 --- a/src/gc.c +++ b/src/gc.c @@ -19,6 +19,9 @@ extern "C" { #endif +JL_DEFINE_MUTEX_EXT(gc) +extern uv_mutex_t gc_pool_mutex[N_GC_THREADS]; + typedef struct _gcpage_t { char data[GC_PAGE_SZ]; union { @@ -62,8 +65,8 @@ typedef struct _bigval_t { } bigval_t; // GC knobs and self-measurement variables -static size_t allocd_bytes = 0; -static int64_t total_allocd_bytes = 0; +static volatile size_t allocd_bytes = 0; +static volatile int64_t total_allocd_bytes = 0; static size_t freed_bytes = 0; #define default_collect_interval (3200*1024*sizeof(void*)) static size_t collect_interval = default_collect_interval; @@ -120,7 +123,7 @@ DLLEXPORT void *jl_gc_counted_malloc(size_t sz) { if (allocd_bytes > collect_interval) jl_gc_collect(); - allocd_bytes += sz; + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,sz); void *b = malloc(sz); if (b == NULL) jl_throw(jl_memory_exception); @@ -130,14 +133,14 @@ DLLEXPORT void *jl_gc_counted_malloc(size_t sz) DLLEXPORT void jl_gc_counted_free(void *p, size_t sz) { free(p); - freed_bytes += sz; + JL_ATOMIC_FETCH_AND_ADD(freed_bytes,sz); } DLLEXPORT void *jl_gc_counted_realloc(void *p, size_t sz) { if (allocd_bytes > collect_interval) jl_gc_collect(); - allocd_bytes += ((sz+1)/2); // NOTE: wild guess at growth amount + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,((sz+1)/2)); // NOTE: wild guess at growth amount void *b = realloc(p, sz); if (b == NULL) jl_throw(jl_memory_exception); @@ -149,7 +152,7 @@ DLLEXPORT void *jl_gc_counted_realloc_with_old_size(void *p, size_t old, size_t if (allocd_bytes > collect_interval) jl_gc_collect(); if (sz > old) - allocd_bytes += (sz-old); + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,sz-old); void *b = realloc(p, sz); if (b == NULL) jl_throw(jl_memory_exception); @@ -163,8 +166,10 @@ void *jl_gc_managed_malloc(size_t sz) sz = (sz+15) & -16; void *b = malloc_a16(sz); if (b == NULL) + { jl_throw(jl_memory_exception); - allocd_bytes += sz; + } + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,sz); return 
b; } @@ -192,8 +197,10 @@ void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz, int isaligned) } #endif if (b == NULL) + { jl_throw(jl_memory_exception); - allocd_bytes += sz; + } + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,sz); return b; } @@ -208,12 +215,16 @@ int jl_gc_n_preserved_values(void) void jl_gc_preserve(jl_value_t *v) { + JL_LOCK(gc) arraylist_push(&preserved_values, (void*)v); + JL_UNLOCK(gc) } void jl_gc_unpreserve(void) { + JL_LOCK(gc) (void)arraylist_pop(&preserved_values); + JL_UNLOCK(gc) } // weak references @@ -225,7 +236,9 @@ DLLEXPORT jl_weakref_t *jl_gc_new_weakref(jl_value_t *value) jl_weakref_t *wr = (jl_weakref_t*)alloc_2w(); wr->type = (jl_value_t*)jl_weakref_type; wr->value = value; + JL_LOCK(gc) arraylist_push(&weak_refs, wr); + JL_UNLOCK(gc) return wr; } @@ -320,6 +333,7 @@ void jl_gc_run_all_finalizers(void) void jl_gc_add_finalizer(jl_value_t *v, jl_function_t *f) { + JL_LOCK(gc) jl_value_t **bp = (jl_value_t**)ptrhash_bp(&finalizer_table, v); if (*bp == HT_NOTFOUND) { *bp = (jl_value_t*)f; @@ -327,6 +341,7 @@ void jl_gc_add_finalizer(jl_value_t *v, jl_function_t *f) else { *bp = (jl_value_t*)jl_tuple2((jl_value_t*)f, *bp); } + JL_UNLOCK(gc) } // big value list @@ -342,16 +357,20 @@ static void *alloc_big(size_t sz) jl_throw(jl_memory_exception); size_t allocsz = (sz+offs+15) & -16; bigval_t *v = (bigval_t*)malloc_a16(allocsz); - allocd_bytes += allocsz; + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,allocsz); if (v == NULL) + { jl_throw(jl_memory_exception); + } #ifdef MEMDEBUG //memset(v, 0xee, allocsz); #endif v->sz = sz; v->flags = 0; + JL_LOCK(gc) v->next = big_objects; big_objects = v; + JL_UNLOCK(gc) return &v->_data[0]; } @@ -389,6 +408,7 @@ static mallocarray_t *mafreelist = NULL; void jl_gc_track_malloced_array(jl_array_t *a) { + JL_LOCK(gc) mallocarray_t *ma; if (mafreelist == NULL) { ma = (mallocarray_t*)malloc(sizeof(mallocarray_t)); @@ -400,6 +420,7 @@ void jl_gc_track_malloced_array(jl_array_t *a) ma->a = a; ma->next = 
mallocarrays; mallocarrays = ma; + JL_UNLOCK(gc) } static size_t array_nbytes(jl_array_t *a) @@ -445,9 +466,9 @@ static void sweep_malloced_arrays() // pool allocation #define N_POOLS 42 -static pool_t norm_pools[N_POOLS]; -static pool_t ephe_pools[N_POOLS]; -static pool_t *pools = &norm_pools[0]; +static pool_t norm_pools[N_GC_THREADS][N_POOLS]; +static pool_t ephe_pools[N_GC_THREADS][N_POOLS]; +static pool_t *pools= &norm_pools[0][0]; static void add_page(pool_t *p) { @@ -475,7 +496,7 @@ static inline void *pool_alloc(pool_t *p) { if (allocd_bytes > collect_interval) jl_gc_collect(); - allocd_bytes += p->osize; + JL_ATOMIC_FETCH_AND_ADD(allocd_bytes,p->osize); if (p->freelist == NULL) { add_page(p); } @@ -569,10 +590,12 @@ static void gc_sweep(void) { sweep_malloced_arrays(); sweep_big(); - int i; - for(i=0; i < N_POOLS; i++) { - sweep_pool(&norm_pools[i]); - sweep_pool(&ephe_pools[i]); + int i,j; + for(j=0; j < N_GC_THREADS; j++) { + for(i=0; i < N_POOLS; i++) { + sweep_pool(&norm_pools[j][i]); + sweep_pool(&ephe_pools[j][i]); + } } jl_unmark_symbols(); } @@ -864,8 +887,8 @@ DLLEXPORT int jl_gc_is_enabled(void) { return is_gc_enabled; } DLLEXPORT int64_t jl_gc_total_bytes(void) { return total_allocd_bytes + allocd_bytes; } -void jl_gc_ephemeral_on(void) { pools = &ephe_pools[0]; } -void jl_gc_ephemeral_off(void) { pools = &norm_pools[0]; } +void jl_gc_ephemeral_on(void) { pools = &ephe_pools[0][0]; } +void jl_gc_ephemeral_off(void) { pools = &norm_pools[0][0]; } #if defined(MEMPROFILE) static void all_pool_stats(void); @@ -948,6 +971,30 @@ void jl_gc_collect(void) // allocator entry points +static inline void* thread_safe_pool_alloc(size_t szclass_) +{ + void* a; + + if(jl_main_thread_id == uv_thread_self() ) + return pool_alloc(&pools[szclass_]); + + for(int n=1; n 2048) - return alloc_big(sz); - return pool_alloc(&pools[szclass(sz)]); + { + a = alloc_big(sz); + return a; + } + + a = thread_safe_pool_alloc(szclass(sz)); + return a; } DLLEXPORT void 
*alloc_2w(void) { + void* b; #ifdef MEMDEBUG - return alloc_big(2*sizeof(void*)); + b = alloc_big(2*sizeof(void*)); + return b; #endif #ifdef _P64 - return pool_alloc(&pools[2]); + b = thread_safe_pool_alloc(2); #else - return pool_alloc(&pools[0]); + b = thread_safe_pool_alloc(0); #endif + return b; } DLLEXPORT void *alloc_3w(void) { + void* b; #ifdef MEMDEBUG - return alloc_big(3*sizeof(void*)); + b = alloc_big(3*sizeof(void*)); + return b; #endif #ifdef _P64 - return pool_alloc(&pools[4]); + b = thread_safe_pool_alloc(4); #else - return pool_alloc(&pools[1]); + b = thread_safe_pool_alloc(1); #endif + return b; } DLLEXPORT void *alloc_4w(void) { + void* b; #ifdef MEMDEBUG - return alloc_big(4*sizeof(void*)); + b = alloc_big(4*sizeof(void*)); + return b; #endif #ifdef _P64 - return pool_alloc(&pools[6]); + b = thread_safe_pool_alloc(6); #else - return pool_alloc(&pools[2]); + b = thread_safe_pool_alloc(2); #endif + return b; } #ifdef GC_FINAL_STATS @@ -1043,15 +1106,17 @@ void jl_gc_init(void) 640, 768, 896, 1024, 1536, 2048 }; - int i; - for(i=0; i < N_POOLS; i++) { - norm_pools[i].osize = szc[i]; - norm_pools[i].pages = NULL; - norm_pools[i].freelist = NULL; - - ephe_pools[i].osize = szc[i]; - ephe_pools[i].pages = NULL; - ephe_pools[i].freelist = NULL; + int i,j; + for(j=0; j < N_GC_THREADS; j++) { + for(i=0; i < N_POOLS; i++) { + norm_pools[j][i].osize = szc[i]; + norm_pools[j][i].pages = NULL; + norm_pools[j][i].freelist = NULL; + + ephe_pools[j][i].osize = szc[i]; + ephe_pools[j][i].pages = NULL; + ephe_pools[j][i].freelist = NULL; + } } htable_new(&finalizer_table, 0); @@ -1114,18 +1179,20 @@ static size_t pool_stats(pool_t *p, size_t *pwaste) static void all_pool_stats(void) { - int i; + int i,j; size_t nb=0, w, tw=0, no=0, b; - for(i=0; i < N_POOLS; i++) { - b = pool_stats(&norm_pools[i], &w); - nb += b; - no += (b/norm_pools[i].osize); - tw += w; - - b = pool_stats(&ephe_pools[i], &w); - nb += b; - no += (b/ephe_pools[i].osize); - tw += w; + for(j=0; j < 
N_GC_THREADS; j++) { + for(i=0; i < N_POOLS; i++) { + b = pool_stats(&norm_pools[j][i], &w); + nb += b; + no += (b/norm_pools[j][i].osize); + tw += w; + + b = pool_stats(&ephe_pools[j][i], &w); + nb += b; + no += (b/ephe_pools[j][i].osize); + tw += w; + } } JL_PRINTF(JL_STDOUT, "%d objects, %d total allocated, %d total fragments\n", diff --git a/src/gf.c b/src/gf.c index 010d5a2460224..e1aa4b3e9368a 100644 --- a/src/gf.c +++ b/src/gf.c @@ -368,9 +368,11 @@ extern jl_function_t *jl_typeinf_func; can be equal to "li" if not applicable. */ int jl_in_inference = 0; +JL_DEFINE_MUTEX_EXT(codegen) void jl_type_infer(jl_lambda_info_t *li, jl_tuple_t *argtypes, jl_lambda_info_t *def) { + JL_LOCK(codegen) int last_ii = jl_in_inference; jl_in_inference = 1; if (jl_typeinf_func != NULL) { @@ -397,6 +399,7 @@ void jl_type_infer(jl_lambda_info_t *li, jl_tuple_t *argtypes, li->inInference = 0; } jl_in_inference = last_ii; + JL_UNLOCK(codegen) } static jl_value_t *nth_slot_type(jl_tuple_t *sig, size_t i) @@ -457,6 +460,7 @@ static jl_function_t *cache_method(jl_methtable_t *mt, jl_tuple_t *type, jl_function_t *method, jl_tuple_t *decl, jl_tuple_t *sparams) { + JL_LOCK(codegen) size_t i; int need_guard_entries = 0; jl_value_t *temp=NULL; @@ -796,6 +800,7 @@ static jl_function_t *cache_method(jl_methtable_t *mt, jl_tuple_t *type, newmeth = jl_reinstantiate_method(method, li); (void)jl_method_cache_insert(mt, type, newmeth); JL_GC_POP(); + JL_UNLOCK(codegen) return newmeth; } else { @@ -842,6 +847,7 @@ static jl_function_t *cache_method(jl_methtable_t *mt, jl_tuple_t *type, jl_type_infer(newmeth->linfo, type, method->linfo); } JL_GC_POP(); + JL_UNLOCK(codegen) return newmeth; } @@ -1263,7 +1269,7 @@ jl_value_t *jl_no_method_error(jl_function_t *f, jl_value_t **args, size_t na) return jl_nothing; } -static jl_tuple_t *arg_type_tuple(jl_value_t **args, size_t nargs) +jl_tuple_t *arg_type_tuple(jl_value_t **args, size_t nargs) { jl_tuple_t *tt = jl_alloc_tuple(nargs); JL_GC_PUSH1(&tt); 
@@ -1355,6 +1361,7 @@ static void show_call(jl_value_t *F, jl_value_t **args, uint32_t nargs) } #endif + JL_CALLABLE(jl_apply_generic) { jl_methtable_t *mt = jl_gf_mtable(F); diff --git a/src/init.c b/src/init.c index 763cfd725ffcc..9fda1b9d52a0c 100644 --- a/src/init.c +++ b/src/init.c @@ -719,6 +719,7 @@ void julia_init(char *imageFile) } #endif + jl_init_threading(); #ifdef JL_GC_MARKSWEEP jl_gc_init(); jl_gc_disable(); diff --git a/src/julia.h b/src/julia.h index 3698e2b410638..9eb1f29691d1e 100644 --- a/src/julia.h +++ b/src/julia.h @@ -18,10 +18,12 @@ extern "C" { #ifndef _OS_WINDOWS_ # define jl_jmp_buf sigjmp_buf # define MAX_ALIGN sizeof(void*) +# define __JL_THREAD __thread #else # define jl_jmp_buf jmp_buf # include //for _resetstkoflw # define MAX_ALIGN 8 +# define __JL_THREAD __declspec(thread) #endif #ifdef _P64 @@ -1002,36 +1004,37 @@ typedef struct _jl_gcframe_t { // jl_value_t *x=NULL, *y=NULL; JL_GC_PUSH(&x, &y); // x = f(); y = g(); foo(x, y) +extern DLLEXPORT long jl_main_thread_id; extern DLLEXPORT jl_gcframe_t *jl_pgcstack; #define JL_GC_PUSH(...) 
\ void *__gc_stkf[] = {(void*)((VA_NARG(__VA_ARGS__)<<1)|1), jl_pgcstack, \ __VA_ARGS__}; \ - jl_pgcstack = (jl_gcframe_t*)__gc_stkf; + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)__gc_stkf; } #define JL_GC_PUSH1(arg1) \ void *__gc_stkf[] = {(void*)3, jl_pgcstack, arg1}; \ - jl_pgcstack = (jl_gcframe_t*)__gc_stkf; + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)__gc_stkf; } #define JL_GC_PUSH2(arg1, arg2) \ void *__gc_stkf[] = {(void*)5, jl_pgcstack, arg1, arg2}; \ - jl_pgcstack = (jl_gcframe_t*)__gc_stkf; + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)__gc_stkf; } #define JL_GC_PUSH3(arg1, arg2, arg3) \ void *__gc_stkf[] = {(void*)7, jl_pgcstack, arg1, arg2, arg3}; \ - jl_pgcstack = (jl_gcframe_t*)__gc_stkf; + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)__gc_stkf; } #define JL_GC_PUSH4(arg1, arg2, arg3, arg4) \ void *__gc_stkf[] = {(void*)9, jl_pgcstack, arg1, arg2, arg3, arg4}; \ - jl_pgcstack = (jl_gcframe_t*)__gc_stkf; + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)__gc_stkf; } #define JL_GC_PUSHARGS(rts_var,n) \ rts_var = ((jl_value_t**)alloca(((n)+2)*sizeof(jl_value_t*)))+2; \ ((void**)rts_var)[-2] = (void*)(((size_t)n)<<1); \ ((void**)rts_var)[-1] = jl_pgcstack; \ - jl_pgcstack = (jl_gcframe_t*)&(((void**)rts_var)[-2]) + if(jl_main_thread_id == uv_thread_self()) { jl_pgcstack = (jl_gcframe_t*)&(((void**)rts_var)[-2]); } -#define JL_GC_POP() (jl_pgcstack = jl_pgcstack->prev) +#define JL_GC_POP() if(jl_main_thread_id == uv_thread_self()) {jl_pgcstack = jl_pgcstack->prev;} void jl_gc_init(void); void jl_gc_setmark(jl_value_t *v); @@ -1202,6 +1205,77 @@ void jl_longjmp(jmp_buf _Buf,int _Value); for (i__ca=1, jl_eh_restore_state(&__eh); i__ca; i__ca=0) #endif +// Threads + +typedef struct { + uv_thread_t t; + uv_mutex_t m; + uv_cond_t c; + int busy; + int poolid; + jl_value_t* exception; + jl_function_t* f; + jl_tuple_t* 
targs; +} jl_thread_t; + +DLLEXPORT jl_thread_t* jl_create_thread(jl_function_t* f, jl_tuple_t* targs); +DLLEXPORT void jl_run_thread(jl_thread_t* t); +DLLEXPORT void jl_join_thread(jl_thread_t* t); +DLLEXPORT void jl_destroy_thread(jl_thread_t* t); +DLLEXPORT jl_value_t* jl_thread_exception(jl_thread_t* t); + +DLLEXPORT uv_mutex_t* jl_create_mutex(); +DLLEXPORT void jl_lock_mutex(uv_mutex_t* m); +DLLEXPORT void jl_unlock_mutex(uv_mutex_t* m); +DLLEXPORT void jl_destroy_mutex(uv_mutex_t* m); + +extern long jl_nr_running_threads; + +// TODO: Implement a fallback that uses a mutex +#if defined( __GNUC__ ) +# define JL_ATOMIC_FETCH_AND_ADD(a,b) __sync_fetch_and_add(& a, b ) +#elif defined( _WIN32 ) +# define JL_ATOMIC_FETCH_AND_ADD(a,b) _InterlockedExchangeAdd ((volatile LONG*) & a, b); +#else +# error "No atomic operations supported." +#endif + +#define JL_DEFINE_MUTEX_EXT(m) \ + extern uv_mutex_t m ## _mutex; \ + extern long m ## _thread_id; + +#define JL_DEFINE_MUTEX(m) \ + uv_mutex_t m ## _mutex; \ + long m ## _thread_id; + +// The macros JL_LOCK and JL_UNLOCK are used to prevent different threads to execute the same code +// at the same time. They are implemented in a recursive manner so that a thread will not deadlock +// when it already holds the lock. +// +// There is a special case that the main thread should not acquire locks when no further threads are +// running. If this is not done the threads will deadlock when running code from the REPL as the main +// thread holds the lock when spawning the thread. 
However, when the threads are running, the main thread +// will also acquire the lock in order to prevent race consitions + +#define JL_LOCK(m) \ + int locked = 0; \ + if( m ## _thread_id != uv_thread_self() && (jl_main_thread_id != uv_thread_self() || jl_nr_running_threads > 0) ) \ + { \ + uv_mutex_lock(& m ## _mutex); \ + locked = 1; \ + m ## _thread_id = uv_thread_self(); \ + } + +#define JL_UNLOCK(m) \ + if(locked) { \ + m ## _thread_id = -1; \ + uv_mutex_unlock(& m ## _mutex); \ + } + +// This is the thread local exception handler that is used to catch exceptions in threads and rethrow +// them in the main thread when the threads are joining. +extern __JL_THREAD jl_jmp_buf jl_thread_eh; +extern __JL_THREAD jl_value_t* jl_thread_exception_in_transit; // I/O system ----------------------------------------------------------------- diff --git a/src/julia_internal.h b/src/julia_internal.h index 7d76a2fd6e15b..2f339b36fdc15 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -89,6 +89,8 @@ void jl_init_codegen(void); void jl_init_intrinsic_functions(void); void jl_init_tasks(void *stack, size_t ssize); void jl_init_serializer(void); +void jl_init_threading(); +void jl_cleanup_threading(); void jl_dump_bitcode(char *fname); void jl_dump_objfile(char *fname, int jit_model); @@ -102,6 +104,8 @@ jl_lambda_info_t *jl_add_static_parameters(jl_lambda_info_t *l, jl_tuple_t *sp); void jl_generate_fptr(jl_function_t *f); void jl_fptr_to_llvm(void *fptr, jl_lambda_info_t *lam, int specsig); +jl_function_t *jl_get_specialization(jl_function_t *f, jl_tuple_t *types); +jl_tuple_t *arg_type_tuple(jl_value_t **args, size_t nargs); // backtraces #ifdef _OS_WINDOWS_ diff --git a/src/options.h b/src/options.h index eba5ea1e4f23b..29e294301dc4f 100644 --- a/src/options.h +++ b/src/options.h @@ -74,4 +74,11 @@ #define COPY_STACKS #endif +// threading options ---------------------------------------------------------- + +// size of the thread pool +#define N_THREAD_POOL 0 +// 
number of memory pools for lock free pool_alloc +#define N_GC_THREADS 16 + #endif diff --git a/src/task.c b/src/task.c index 974843a42c5c8..01d74abfb71d3 100644 --- a/src/task.c +++ b/src/task.c @@ -155,7 +155,6 @@ jl_gcframe_t *jl_pgcstack = NULL; #endif static void start_task(jl_task_t *t); - #ifdef COPY_STACKS jl_jmp_buf * volatile jl_jmp_target; @@ -273,6 +272,9 @@ static void ctx_switch(jl_task_t *t, jl_jmp_buf *where) extern int jl_in_gc; static jl_value_t *switchto(jl_task_t *t) { + // prevent threads to switch tasks + if( jl_main_thread_id != uv_thread_self()) + return jl_nothing; if (t->state == done_sym || t->state == failed_sym) { jl_task_arg_in_transit = (jl_value_t*)jl_null; if (t->exception != jl_nothing) @@ -730,6 +732,16 @@ DLLEXPORT void gdbbacktrace() void NORETURN throw_internal(jl_value_t *e) { assert(e != NULL); + + // Threads use a special exit here to tell the main thread that + // an exception occurred. This means that try/catch should not be + // used in threads currently. + if(jl_main_thread_id != uv_thread_self()) + { + jl_thread_exception_in_transit = e; + jl_longjmp(jl_thread_eh,1); + } + jl_exception_in_transit = e; if (jl_current_task->eh != NULL) { jl_longjmp(jl_current_task->eh->eh_ctx, 1); @@ -752,6 +764,14 @@ void NORETURN throw_internal(jl_value_t *e) DLLEXPORT void jl_throw(jl_value_t *e) { assert(e != NULL); + // Threads use a special exit here to tell the main thread that + // an exception occurred. This means that try/catch should not be + // used in threads currently. 
+ if(jl_main_thread_id != uv_thread_self()) + { + jl_thread_exception_in_transit = e; + jl_longjmp(jl_thread_eh,1); + } record_backtrace(); throw_internal(e); } diff --git a/src/threading.cpp b/src/threading.cpp new file mode 100644 index 0000000000000..f7828a1457c73 --- /dev/null +++ b/src/threading.cpp @@ -0,0 +1,233 @@ +#include "julia.h" +#include "julia_internal.h" + +#include +#include +#include "uv.h" + +extern "C" { + +long jl_main_thread_id = -1; + +__JL_THREAD jl_jmp_buf jl_thread_eh; +__JL_THREAD jl_value_t* jl_thread_exception_in_transit; + +long jl_nr_running_threads = 0; + +JL_DEFINE_MUTEX(gc) +JL_DEFINE_MUTEX(codegen) +JL_DEFINE_MUTEX(nr_running_threads) +uv_mutex_t gc_pool_mutex[N_GC_THREADS]; + +#if N_THREAD_POOL > 0 +jl_thread_t thread_pool[N_THREAD_POOL]; +int finish_thread_pool = 0; +#endif + +#if N_THREAD_POOL > 0 +void run_pool_thread(void* t_) +{ + jl_thread_t* t = (jl_thread_t*)t_; + uv_mutex_lock(&t->m); + while(true) + { + // wait for the next task + while(!t->busy) + uv_cond_wait(&t->c, &t->m); + + if(finish_thread_pool) + break; + + uv_mutex_unlock(&t->m); + + jl_value_t** args = (jl_value_t**) alloca( sizeof(jl_value_t*)*jl_tuple_len(t->targs)); + for(int l=0; ltargs); l++) + args[l] = jl_tupleref(t->targs,l); + + t->exception = jl_nothing; + + // try/catch + if(!jl_setjmp(jl_thread_eh,0)) { + jl_apply(t->f,args,jl_tuple_len(t->targs)); + } else { + t->exception = jl_thread_exception_in_transit; + } + + t->busy = 0; + uv_cond_signal(&t->c); + uv_mutex_lock(&t->m); + } + uv_mutex_unlock(&t->m); +} +#endif + +void run_standalone_thread(void* t) +{ + jl_function_t* f = ((jl_thread_t*)t)->f; + jl_tuple_t* targs = ((jl_thread_t*)t)->targs; + + jl_value_t** args = (jl_value_t**) alloca( sizeof(jl_value_t*)*jl_tuple_len(targs)); + for(int l=0; lexception = jl_thread_exception_in_transit; + } +} + +void jl_init_threading() +{ + uv_mutex_init(&gc_mutex); + uv_mutex_init(&codegen_mutex); + uv_mutex_init(&nr_running_threads_mutex); + for(int n=0; 
n 0 + for(int n=0; n 0 + finish_thread_pool = true; + for(int n=0; n 0 + for(int n=0; npoolid = -1; // This tells us that this thread is standalone + t->exception = jl_nothing; + + jl_tuple_t* argtypes = arg_type_tuple(&jl_tupleref(targs,0), nargs); + t->f = jl_get_specialization(f, argtypes); + if(t->f == NULL) + t->f = f; + jl_compile(t->f); // does this make sense here? + t->targs = targs; + return t; +} + +void jl_run_thread(jl_thread_t* t) +{ + uv_mutex_lock(&nr_running_threads_mutex); + jl_nr_running_threads++; + uv_mutex_unlock(&nr_running_threads_mutex); + + #if N_THREAD_POOL > 0 + if(t->poolid != -1) + { + t->busy = 1; + uv_cond_signal(&t->c); // notify thread that it now can proceed + uv_mutex_unlock(&t->m); + } + else + #endif + uv_thread_create(&(t->t), run_standalone_thread, t); +} + +void jl_join_thread(jl_thread_t* t) +{ + #if N_THREAD_POOL > 0 + if(t->poolid != -1) { + uv_mutex_lock(&t->m); + while(t->busy) + uv_cond_wait(&t->c, &t->m); + uv_mutex_unlock(&t->m); + } + else + #endif + uv_thread_join(&(t->t)); + + uv_mutex_lock(&nr_running_threads_mutex); + jl_nr_running_threads--; + uv_mutex_unlock(&nr_running_threads_mutex); + + +} + +void jl_destroy_thread(jl_thread_t* t) +{ + #if N_THREAD_POOL > 0 + if(t->poolid == -1) + #endif + free(t); +} + +jl_value_t* jl_thread_exception(jl_thread_t* t) +{ + return t->exception; +} + +// locks + +uv_mutex_t* jl_create_mutex() +{ + uv_mutex_t* m = (uv_mutex_t*) malloc(sizeof(uv_mutex_t)); + uv_mutex_init(m); + return m; +} + +void jl_lock_mutex(uv_mutex_t* m) +{ + uv_mutex_lock(m); +} + +void jl_unlock_mutex(uv_mutex_t* m) +{ + uv_mutex_unlock(m); +} + +void jl_destroy_mutex(uv_mutex_t* m) +{ + uv_mutex_destroy(m); + free(m); +} + +} diff --git a/test/crashing.jl b/test/crashing.jl new file mode 100644 index 0000000000000..ec4afa1c275c1 --- /dev/null +++ b/test/crashing.jl @@ -0,0 +1,25 @@ +import Base.Test.@test, Base.parapply + +### Print test + +function i_will_print(N,m) + #u="hhh"*"kkk" + for i=1:N + 
Base.lock(m) + println(i) + Base.unlock(m) + end +end + +let N=100 + gc_disable() + m = Base.Mutex() + t1 = Base.Thread(i_will_print, N,m) + t2 = Base.Thread(i_will_print, N,m) + Base.run(t1) + Base.run(t2) + Base.join(t1) + Base.join(t2) + gc_enable() +end + diff --git a/test/pimfilter.jl b/test/pimfilter.jl new file mode 100644 index 0000000000000..02b46a56974bb --- /dev/null +++ b/test/pimfilter.jl @@ -0,0 +1,68 @@ +import Base.Test.@test, Base.parapply + +using Images + +function imfilter{T}(img::Matrix{T}, filter::Matrix{T}, border::String, value::T; numthreads=0) + sf = size(filter) + fw = int(([sf...] .- 1) / 2) + A = padarray(img, fw, fw, border, value) + + m, n = sf + offsets = int(broadcast(+, [0:m-1] .- floor((m - 1) / 2), ([0:n-1]' .- floor((n - 1) / 2)) * size(A, 1)))'' + I = filter .!= 0 + kernel_values = filter[I] + kernel_offsets = offsets[I] + start_index = int(floor(m / 2) + floor(n / 2) * size(A, 1)) + if(numthreads > 0) + return _pimfilter(A[:], kernel_values, kernel_offsets, size(A, 1), size(A, 2), size(img, 1), size(img, 2), start_index, numthreads=numthreads) + else + return _imfilter(A[:], kernel_values, kernel_offsets, size(A, 1), size(A, 2), size(img, 1), size(img, 2), start_index) + end +end + + +function _imfilter{T}(A::Vector{T}, coefficients::Vector{T}, offsets::Vector{Int}, in_height::Int, in_width::Int, out_height::Int, out_width::Int, start_index::Int) + B = zeros(T, out_height, out_width) + _pimfilter_core(1:out_width,A,B,coefficients,offsets,in_height,out_height,start_index) + return B +end + +function _pimfilter{T}(A::Vector{T}, coefficients::Vector{T}, offsets::Vector{Int}, in_height::Int, in_width::Int, out_height::Int, out_width::Int, start_index::Int; numthreads=2) + B = zeros(T, out_height, out_width) + num_coefficients = length(coefficients) + parapply(_pimfilter_core, 1:out_width, A,B,coefficients,offsets,in_height,out_height,start_index, numthreads=numthreads) + return B +end + +function _pimfilter_core{T}(r, 
A::Vector{T}, B::Matrix{T}, coefficients::Vector{T}, offsets::Vector{Int}, in_height::Int, out_height::Int, start_index::Int)
+    num_coefficients = length(coefficients)
+    for n in r
+        for m = 1:out_height
+            index = m + (n - 1) * in_height + start_index;
+            sum = zero(T)
+            for k = 1:num_coefficients
+                @inbounds sum += A[index + offsets[k]] * coefficients[k]
+            end
+            @inbounds B[m, n] = sum;
+        end
+    end
+end
+
+
+
+let N = 1024
+    A = rand(N,N)
+    kern = ones(19,19)
+
+    B = imfilter(A,kern,"replicate",0.0)
+    B = imfilter(A,kern,"replicate",0.0,numthreads=1)
+    println("imfilter - serial")
+    @time B = imfilter(A,kern,"replicate",0.0)
+    println("imfilter - 1 thread")
+    @time D = imfilter(A,kern,"replicate",0.0,numthreads=1)
+    println("imfilter - 2 threads")
+    @time C = imfilter(A,kern,"replicate",0.0,numthreads=2)
+
+    @test B == C
+end
+
diff --git a/test/runtests.jl b/test/runtests.jl
index 8659c6696bbaa..c4dfcee9edcc1 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -8,7 +8,7 @@ testnames = [
     "resolve", "pollfd", "mpfr", "broadcast", "complex", "socket",
     "floatapprox", "readdlm", "regex", "float16", "combinatorics",
     "sysinfo", "rounding", "ranges", "mod2pi", "euler", "show",
-    "lineedit", "replcompletions"
+    "lineedit", "replcompletions", "threading"
 ]
 
 @unix_only push!(testnames, "unicode")
diff --git a/test/threading.jl b/test/threading.jl
new file mode 100644
index 0000000000000..0c5507082d039
--- /dev/null
+++ b/test/threading.jl
@@ -0,0 +1,257 @@
+import Base.Test.@test, Base.parapply
+
+### perfect threading
+
+function my_kernel(r)
+    z = 0.0
+    for l=1:40000*length(r)
+        z += sin(2*pi*l/40000)
+    end
+end
+
+let N=1000
+    # warmup
+    parapply(my_kernel, 1:N, numthreads=1)
+
+    # run with 1 thread (serial)
+    println("my_kernel - 1 thread")
+    @time parapply(my_kernel, 1:N, numthreads=1)
+
+    # run with 2 threads (parallel)
+    println("my_kernel - 2 threads")
+    @time parapply(my_kernel, 1:N, numthreads=2)
+end
+
+### bad threading
+
+type MyType
+    i::Int
+end
+
+function my_bad_kernel(r)
+    z = 0.0
+    for l=1:4000*length(r)
+        tmp = MyType(l)
+        for k=1:10
+            z += sin(2*pi*l/40000)
+        end
+    end
+end
+
+let N=1000
+    # warmup (must compile my_bad_kernel, not my_kernel, so the timed
+    # runs below do not include JIT compilation)
+    parapply(my_bad_kernel, 1:N, numthreads=1)
+
+    # run with 1 thread (serial)
+    println("\nmy_bad_kernel - 1 thread")
+    @time parapply(my_bad_kernel, 1:N, numthreads=1)
+
+    # run with 2 threads (parallel)
+    println("my_bad_kernel - 2 threads")
+    @time parapply(my_bad_kernel, 1:N, numthreads=2)
+end
+
+### tanh
+
+function tanh_core(r,x,y)
+    for i in r
+        @inbounds y[i] = tanh(x[i])
+    end
+end
+
+function ptanh(x; numthreads=CPU_CORES)
+    y = similar(x)
+    N = length(x)
+    parapply(tanh_core, 1:N, x, y, preapply=true, numthreads=numthreads)
+    y
+end
+
+let
+    x = rand(10000,200)
+
+    # warmup
+    tanh(x)
+    println("\ntanh - serial")
+    @time tanh(x)
+
+
+    # run with 1 thread (serial)
+    println("tanh - 1 thread")
+    ptanh(x, numthreads=1)
+    @time ptanh(x, numthreads=1)
+
+    # run with 2 threads (parallel)
+    println("tanh - 2 threads")
+    @time ptanh(x, numthreads=2)
+
+end
+
+### matrix vector multiplication
+
+function my_matmult(r,A,x,b)
+    N = length(x)
+    @inbounds begin
+        for i in r
+            b[i] = 0
+            for l=1:N
+                b[i] += A[l,i]*x[l]
+            end
+        end
+    end
+end
+
+let N=9000
+    dtype = Int128
+    x = ones(dtype,N)
+    A = ones(dtype,N,N)
+    b1 = A'*x
+    b2 = zeros(dtype,N)
+    b3 = zeros(dtype,N)
+
+    # warmup
+    parapply(my_matmult,1:N, A,x,b2, numthreads=1)
+
+    # run with 1 thread (serial)
+    println("\nmy_matmult - 1 thread")
+    @time parapply(my_matmult,1:N, A,x,b2, numthreads=1)
+
+    # run with 2 threads (parallel)
+    println("my_matmult - 2 threads")
+    @time parapply(my_matmult,1:N, A,x,b3, numthreads=2)
+
+    @test b1 == b2
+    @test b1 == b3
+end
+
+### Simple thread test
+
+function sqrt!(x)
+    for n=1:length(x)
+        x[n] = sqrt(x[n])
+    end
+end
+
+let N=1000
+    x = rand(N)
+    y = copy(x)
+
+    t = Base.Thread(sqrt!,x)
+    Base.run(t)
+    sqrt!(y)
+    Base.join(t)
+
+    @test x == y
+end
+
+### Mutex test
+
+function plus_one!(x,m)
+    for n=1:10000
+        Base.lock(m)
+        x[1] += 1
+        Base.unlock(m)
+    end
+end
+
+let
+    x = zeros(1)
+    m = Base.Mutex()
+
+    t1 = Base.Thread(plus_one!,x,m)
+    t2 = Base.Thread(plus_one!,x,m)
+    Base.run(t1)
+    Base.run(t2)
+    Base.join(t1)
+    Base.join(t2)
+
+    @test x[1] == 20000
+end
+
+### Exception test
+
+function i_will_throw()
+    error("An error in thread!")
+end
+
+let N=100
+
+    nthrows = 0
+    for l=1:N
+        try
+            t1 = Base.Thread(i_will_throw)
+            t2 = Base.Thread(i_will_throw)
+            Base.run(t1)
+            Base.run(t2)
+            Base.join(t1)
+            Base.join(t2)
+        catch
+            nthrows += 1
+        end
+    end
+
+    @test nthrows == N
+end
+
+
+### Median filter
+
+function median_filter(im::Matrix, filterSize=3)
+    N = size(im)
+    out = similar(im)
+    K = int(floor(filterSize/2))
+    for x=1:N[1], y=1:N[2]
+        x_min = max(1, x-K)
+        x_max = min(N[1], x+K)
+        y_min = max(1, y-K)
+        y_max = min(N[2], y+K)
+
+        s = im[x_min:x_max, y_min:y_max]
+        out[x,y] = median(s[:])
+    end
+    out
+end
+
+function pmedian_filter_core(r, im, out, K)
+    @inbounds begin
+        for y in r
+            N = size(im)
+            y_min = max(1, y-K)
+            y_max = min(N[2], y+K)
+
+            for x=1:N[1]
+                x_min = max(1, x-K)
+                x_max = min(N[1], x+K)
+
+                s = im[x_min:x_max, y_min:y_max]
+                out[x,y] = median(s[:])
+            end
+        end
+    end
+end
+
+
+function pmedian_filter(im::Matrix, filterSize=3; numthreads=2)
+    N = size(im)
+    out = similar(im)
+    K = int(floor(filterSize/2))
+
+    parapply(pmedian_filter_core,1:N[2], im,out,K, numthreads = numthreads)
+    out
+end
+
+
+let N = 512
+    A = rand(N,N)
+    filterSize = 3
+
+    println("\nmedian_filter - serial")
+    B = median_filter(A,filterSize)
+    @time B = median_filter(A,filterSize)
+    println("median_filter - 1 thread")
+    @time D = pmedian_filter(A, filterSize, numthreads=1)
+    println("median_filter - 2 threads")
+    @time C = pmedian_filter(A, filterSize, numthreads=2)
+
+    @test B == C
+end