Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Multithreading Experiment #6741

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
5d205dd
some threading experiments
tknopp May 2, 2014
6884214
various improvements
tknopp May 4, 2014
c93180f
little improvement to thread type
tknopp May 4, 2014
3a4f58c
add some locks
tknopp May 4, 2014
84c9703
port to libuv threads and mutex
tknopp May 5, 2014
f65313d
remove stdlib=libc++ from Makefile
tknopp May 5, 2014
c6bdc3a
introduce several new locks
tknopp May 6, 2014
87b9824
fix some locks
tknopp May 7, 2014
c702245
do not gc root parameters in threads (as gc is disabled)
tknopp May 7, 2014
edbc1d7
more improvements
tknopp May 8, 2014
541a08c
crashing.jl does not crash anymore
tknopp May 8, 2014
d8e5eb4
revert ast.c
tknopp May 8, 2014
b37b9c9
cleanup
tknopp May 8, 2014
a514766
macrofication
tknopp May 8, 2014
6b1d3ea
update crashing.jl and fix the new crash
tknopp May 8, 2014
7154a27
no locking in main thread (workaround)
tknopp May 8, 2014
3332208
add median filter example which is slower on two threads
tknopp May 10, 2014
edd2db5
a little better locking in gc. introduce parallel imfilter test
tknopp May 13, 2014
3408688
cleanup threading.jl test file and introduce an example where type in…
tknopp May 13, 2014
e14fcb8
implement thread aware memory pool
tknopp May 14, 2014
0bf242e
small cleanup
tknopp May 14, 2014
560bf1e
improve API of parapply (now taking a range)
tknopp May 14, 2014
fd13544
cleanup and implement a very simple thread pool
tknopp May 15, 2014
6667856
cleanup and remove the C implementation of par apply
tknopp May 16, 2014
57c85fe
Merge remote-tracking branch 'upstream/master' into jlthreading
tknopp May 20, 2014
151b633
make par apply work on ranges
tknopp May 21, 2014
d5899df
Add some documentation to threading.jl
tknopp May 22, 2014
66eae20
Make exception throwing work in threads
tknopp May 23, 2014
7afa445
fix crashes when running threads from the REPL
tknopp May 24, 2014
221d6e3
println in a thread now works.
tknopp May 24, 2014
9f4bbb5
use atomics in gc
tknopp May 24, 2014
e1b102d
cleanup and add support for atomic operations on windows
tknopp May 25, 2014
676b7d6
one fix in pool alloc
tknopp May 25, 2014
7d0ba16
Add some documentation to julia.h
tknopp May 26, 2014
f2bfb74
rethrow the actual exception in the main thread
tknopp May 26, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ include("graphics.jl")
include("profile.jl")
importall .Profile

include("threading.jl")

function __init__()
# Base library init
reinit_stdio()
Expand Down
104 changes: 104 additions & 0 deletions base/threading.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#=
This file contains experimental threading support for Julia. The following rules
should be taken into account when using the threading functions.

- The function Base.parapply is the best way to use multiple threads. It takes as argument
a function of the form f(r::UnitRange, args...) that operates over a certain range r. parapply
gets as second argument the full range over which f should be executed, whereas internally the
range will be split into chunks that are executed in different threads. The variables that f is
supposed to work on are passed as varargs after the range argument. parapply has two keyword arguments:
preapply is used to execute the function f once on the main thread. This is very useful for finding
syntax errors that would otherwise crash the Julia process. The other keyword argument is numthreads
that can be used to control the number of threads used.

- Base.parapply switches the garbage collector off. Thus one should not allocate more memory in the
threads than absolutely needed. It is much better to preallocate it and pass it to the threads.

- Do not use print statements in the threading functions (Hopefully we can fix this)

- When locking is required there is a Mutex type.

- Running threads in an @async task is probably a very bad idea (Fixme)

- There is a low level Thread type. When using this the main thread should directly join it so that the main
thread is waiting until the worker threads are done. Further one should switch the garbage collector off before
running the threads.
=#


### Thread type

# Low-level handle to a native thread created by the C runtime
# (via jl_create_thread); freed by destroy(), which is installed
# as a finalizer in the Thread(f, args...) constructor below.
type Thread
    handle::Ptr{Void}  # opaque pointer owned by the runtime
end

# Block until thread `t` finishes. If the thread raised an exception,
# rethrow it in the calling (main) thread.
function join(t::Thread)
    ccall(:jl_join_thread, Void, (Ptr{Void},), t.handle)
    exc = exception(t)
    exc === nothing || throw(exc)
    return
end
# Start executing the thread's function.
run(t::Thread) = (ccall(:jl_run_thread,Void,(Ptr{Void},),t.handle))
# Release the C-side thread object (installed as a finalizer on Thread).
destroy(t::Thread) = (ccall(:jl_destroy_thread,Void,(Ptr{Void},),t.handle))
# Exception raised inside the thread, or nothing if it completed cleanly.
exception(t::Thread) = ccall(:jl_thread_exception,Exception,(Ptr{Void},),t.handle)

# Create (but do not start) a thread that will run f(args...).
# A finalizer releases the underlying C-side thread object.
function Thread(f::Function, args...)
    handle = ccall(:jl_create_thread, Ptr{Void}, (Any,Any), f, args)
    th = Thread(handle)
    finalizer(th, destroy)
    return th
end

### Mutex type

# Handle to a native mutex created by the C runtime (jl_create_mutex);
# freed by destroy(), installed as a finalizer in Mutex() below.
type Mutex
    handle::Ptr{Void}  # opaque pointer owned by the runtime
end

# Acquire the mutex (blocks until available).
lock(m::Mutex) = (ccall(:jl_lock_mutex,Void,(Ptr{Void},),m.handle))
# Release the mutex.
unlock(m::Mutex) = (ccall(:jl_unlock_mutex,Void,(Ptr{Void},),m.handle))
# Release the C-side mutex object (installed as a finalizer on Mutex).
destroy(m::Mutex) = (ccall(:jl_destroy_mutex,Void,(Ptr{Void},),m.handle))

# Create a new native mutex; a finalizer releases the C-side object.
function Mutex()
    handle = ccall(:jl_create_mutex, Ptr{Void}, ())
    mtx = Mutex(handle)
    finalizer(mtx, destroy)
    return mtx
end


### parallel apply function (scheduling in julia).

# Apply `f(chunk::UnitRange, args...)` over `r` in parallel: `r` is split into
# `numthreads` contiguous chunks, each executed in its own thread.
#
# Keyword arguments:
#   preapply   - run f once on the first element in the main thread first, so
#                syntax/method errors surface here instead of crashing a worker.
#   numthreads - number of worker threads to create (default CPU_CORES).
#
# Note: the GC is disabled while the worker threads run, so f should avoid
# allocating; preallocate buffers and pass them via `args`.
function parapply(f::Function, r::UnitRange, args...; preapply::Bool = true, numthreads::Int = CPU_CORES)

    st = start(r)
    len = length(r)

    t = Array(Base.Thread,numthreads)

    if(preapply)
        f(range(st, 1), args...)
        st = st + 1
        len = len - 1
    end

    # Each of the first (numthreads-1) threads gets `chunk` elements; the last
    # thread takes the remainder, so lengths not divisible by numthreads work.
    chunk = ifloor(len / numthreads)
    rem = len

    gc_disable()
    try
        for i=0:(numthreads-2)
            t[i+1] = Base.Thread(f,range(int(st+i*chunk), chunk), args...)
            rem -= chunk
        end
        t[numthreads] = Base.Thread(f,range(int(st+(numthreads-1)*chunk), rem), args...)

        for i=1:numthreads
            Base.run(t[i])
        end

        for i=1:numthreads
            Base.join(t[i])
        end
    finally
        # join() rethrows worker exceptions; re-enable the GC even on that
        # path, otherwise it would stay disabled for the rest of the session.
        gc_enable()
    end
    nothing
end
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ override CPPFLAGS += $(JCPPFLAGS)

SRCS = \
jltypes gf ast builtins module codegen interpreter \
alloc dlload sys init task array dump toplevel jl_uv jlapi profile llvm-simdloop
alloc dlload sys init task array dump toplevel jl_uv jlapi profile llvm-simdloop threading

FLAGS = \
-D_GNU_SOURCE -Iflisp -Isupport \
Expand Down
15 changes: 13 additions & 2 deletions src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
extern "C" {
#endif

JL_DEFINE_MUTEX_EXT(codegen)

// exceptions -----------------------------------------------------------------

DLLEXPORT void jl_error(const char *str)
Expand Down Expand Up @@ -369,6 +371,7 @@ extern int jl_lineno;

JL_CALLABLE(jl_f_top_eval)
{
JL_LOCK(codegen)
jl_module_t *m;
jl_value_t *ex;
if (nargs == 1) {
Expand All @@ -381,14 +384,17 @@ JL_CALLABLE(jl_f_top_eval)
m = (jl_module_t*)args[0];
ex = args[1];
}
jl_value_t *v=NULL;
if (jl_is_symbol(ex)) {
return jl_eval_global_var(m, (jl_sym_t*)ex);
v = jl_eval_global_var(m, (jl_sym_t*)ex);
JL_UNLOCK(codegen)
return v;
}
jl_value_t *v=NULL;
int last_lineno = jl_lineno;
if (m == jl_current_module) {
v = jl_toplevel_eval(ex);
jl_lineno = last_lineno;
JL_UNLOCK(codegen)
return v;
}
jl_module_t *last_m = jl_current_module;
Expand All @@ -401,12 +407,14 @@ JL_CALLABLE(jl_f_top_eval)
jl_lineno = last_lineno;
jl_current_module = last_m;
jl_current_task->current_module = task_last_m;
JL_UNLOCK(codegen)
jl_rethrow();
}
jl_lineno = last_lineno;
jl_current_module = last_m;
jl_current_task->current_module = task_last_m;
assert(v);
JL_UNLOCK(codegen)
return v;
}

Expand Down Expand Up @@ -745,6 +753,7 @@ int jl_eval_with_compiler_p(jl_expr_t *expr, int compileloops);

JL_CALLABLE(jl_trampoline)
{
JL_LOCK(codegen)
assert(jl_is_func(F));
jl_function_t *f = (jl_function_t*)F;
assert(f->linfo != NULL);
Expand All @@ -769,6 +778,8 @@ JL_CALLABLE(jl_trampoline)
if (jl_boot_file_loaded && jl_is_expr(f->linfo->ast)) {
f->linfo->ast = jl_compress_ast(f->linfo, f->linfo->ast);
}
JL_UNLOCK(codegen)

return jl_apply(f, args, nargs);
}

Expand Down
9 changes: 8 additions & 1 deletion src/codegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,15 @@ static void jl_rethrow_with_add(const char *fmt, ...)
jl_rethrow();
}

JL_DEFINE_MUTEX_EXT(codegen)

// --- entry point ---
//static int n_emit=0;
static Function *emit_function(jl_lambda_info_t *lam, bool cstyle);
//static int n_compile=0;
static Function *to_function(jl_lambda_info_t *li, bool cstyle)
{
JL_LOCK(codegen)
JL_SIGATOMIC_BEGIN();
assert(!li->inInference);
BasicBlock *old = nested_compile ? builder.GetInsertBlock() : NULL;
Expand All @@ -506,6 +509,7 @@ static Function *to_function(jl_lambda_info_t *li, bool cstyle)
builder.SetCurrentDebugLocation(olddl);
}
JL_SIGATOMIC_END();
JL_UNLOCK(codegen)
jl_rethrow_with_add("error compiling %s", li->name->name);
}
assert(f != NULL);
Expand Down Expand Up @@ -539,6 +543,7 @@ static Function *to_function(jl_lambda_info_t *li, bool cstyle)
builder.SetCurrentDebugLocation(olddl);
}
JL_SIGATOMIC_END();
JL_UNLOCK(codegen)
return f;
}

Expand All @@ -557,6 +562,7 @@ static void jl_setup_module(Module *m, bool add)

extern "C" void jl_generate_fptr(jl_function_t *f)
{
JL_LOCK(codegen)
// objective: assign li->fptr
jl_lambda_info_t *li = f->linfo;
assert(li->functionObject);
Expand All @@ -575,7 +581,7 @@ extern "C" void jl_generate_fptr(jl_function_t *f)
#endif

Function *llvmf = (Function*)li->functionObject;

#ifdef USE_MCJIT
li->fptr = (jl_fptr_t)jl_ExecutionEngine->getFunctionAddress(llvmf->getName());
#else
Expand All @@ -597,6 +603,7 @@ extern "C" void jl_generate_fptr(jl_function_t *f)
}
}
f->fptr = li->fptr;
JL_UNLOCK(codegen)
}

extern "C" void jl_compile(jl_function_t *f)
Expand Down
Loading