Skip to content

Commit 5366148

Browse files
vtjnashJeffBezanson
authored andcommitted
threads: fix scheduler variable confusion (#32551)
gotta keep system vs runtime and global vs local straight! fix #32511
1 parent ba03920 commit 5366148

File tree

6 files changed

+52
-41
lines changed

6 files changed

+52
-41
lines changed

src/julia_threads.h

+3-5
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,12 @@ struct _jl_tls_states_t {
190190
// this is limited to the few places we do synchronous IO
191191
// we can make this more general (similar to defer_signal) if necessary
192192
volatile sig_atomic_t io_wait;
193-
#ifndef _OS_WINDOWS_
194-
// These are only used on unix now
195-
pthread_t system_id;
196-
void *signal_stack;
197-
#endif
198193
#ifdef _OS_WINDOWS_
199194
int needs_resetstkoflw;
195+
#else
196+
void *signal_stack;
200197
#endif
198+
unsigned long system_id;
201199
// execution of certain certain impure
202200
// statements is prohibited from certain
203201
// callbacks (such as generated functions)

src/partr.c

+28-27
Original file line numberDiff line numberDiff line change
@@ -311,16 +311,14 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
311311
}
312312

313313

314-
static void wake_thread(int16_t self, int16_t tid)
314+
static void wake_thread(int16_t tid)
315315
{
316-
if (self != tid) {
317-
jl_ptls_t other = jl_all_tls_states[tid];
318-
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
319-
if (state == sleeping) {
320-
uv_mutex_lock(&other->sleep_lock);
321-
uv_cond_signal(&other->wake_signal);
322-
uv_mutex_unlock(&other->sleep_lock);
323-
}
316+
jl_ptls_t other = jl_all_tls_states[tid];
317+
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
318+
if (state == sleeping) {
319+
uv_mutex_lock(&other->sleep_lock);
320+
uv_cond_signal(&other->wake_signal);
321+
uv_mutex_unlock(&other->sleep_lock);
324322
}
325323
}
326324

@@ -345,34 +343,37 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
345343
{
346344
jl_ptls_t ptls = jl_get_ptls_states();
347345
int16_t self = ptls->tid;
346+
unsigned long system_self = jl_all_tls_states[self]->system_id;
348347
int16_t uvlock = jl_atomic_load_acquire(&jl_uv_mutex.owner);
349-
if (tid == self) {
348+
if (tid == self || tid == -1) {
350349
// we're already awake, but make sure we'll exit uv_run
351350
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
352-
if (uvlock == self)
351+
if (uvlock == system_self)
353352
uv_stop(jl_global_event_loop());
354353
}
355354
#ifdef JULIA_ENABLE_THREADING
356355
else {
356+
// something added to the sticky-queue: notify that thread
357+
wake_thread(tid);
358+
// check if we need to notify uv_run too
359+
if (uvlock != system_self)
360+
jl_wake_libuv();
361+
}
362+
if (tid == -1) {
357363
// check if the other threads might be sleeping
358364
if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) {
359-
if (tid == -1) {
360-
// something added to the multi-queue: notify all threads
361-
// in the future, we might want to instead wake some fraction of threads,
362-
// and let each of those wake additional threads if they find work
363-
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
364-
if (state == sleeping) {
365-
for (tid = 0; tid < jl_n_threads; tid++)
366-
wake_thread(self, tid);
367-
}
368-
}
369-
else {
370-
// something added to the sticky-queue: notify that thread
371-
wake_thread(self, tid);
365+
// something added to the multi-queue: notify all threads
366+
// in the future, we might want to instead wake some fraction of threads,
367+
// and let each of those wake additional threads if they find work
368+
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
369+
if (state == sleeping) {
370+
for (tid = 0; tid < jl_n_threads; tid++)
371+
if (tid != self)
372+
wake_thread(tid);
373+
// check if we need to notify uv_run too
374+
if (uvlock != system_self)
375+
jl_wake_libuv();
372376
}
373-
// check if we need to notify uv_run too
374-
if (uvlock != self)
375-
jl_wake_libuv();
376377
}
377378
}
378379
#endif

src/signals-mach.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ void jl_mach_gc_end(void)
3131
int8_t gc_state = (int8_t)(item >> 8);
3232
jl_ptls_t ptls2 = jl_all_tls_states[tid];
3333
jl_atomic_store_release(&ptls2->gc_state, gc_state);
34-
thread_resume(pthread_mach_thread_np(ptls2->system_id));
34+
thread_resume(pthread_mach_thread_np((pthread_t)ptls2->system_id));
3535
}
3636
suspended_threads.len = 0;
3737
}
@@ -101,7 +101,7 @@ static void allocate_segv_handler()
101101
}
102102
pthread_attr_destroy(&attr);
103103
for (int16_t tid = 0;tid < jl_n_threads;tid++) {
104-
attach_exception_port(pthread_mach_thread_np(jl_all_tls_states[tid]->system_id), 0);
104+
attach_exception_port(pthread_mach_thread_np((pthread_t)jl_all_tls_states[tid]->system_id), 0);
105105
}
106106
}
107107

@@ -178,7 +178,7 @@ kern_return_t catch_exception_raise(mach_port_t exception_port,
178178
jl_ptls_t ptls2 = NULL;
179179
for (tid = 0;tid < jl_n_threads;tid++) {
180180
jl_ptls_t _ptls2 = jl_all_tls_states[tid];
181-
if (pthread_mach_thread_np(_ptls2->system_id) == thread) {
181+
if (pthread_mach_thread_np((pthread_t)_ptls2->system_id) == thread) {
182182
ptls2 = _ptls2;
183183
break;
184184
}
@@ -269,7 +269,7 @@ static void attach_exception_port(thread_port_t thread, int segv_only)
269269
static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx)
270270
{
271271
jl_ptls_t ptls2 = jl_all_tls_states[tid];
272-
mach_port_t tid_port = pthread_mach_thread_np(ptls2->system_id);
272+
mach_port_t tid_port = pthread_mach_thread_np((pthread_t)ptls2->system_id);
273273

274274
kern_return_t ret = thread_suspend(tid_port);
275275
HANDLE_MACH_ERROR("thread_suspend", ret);
@@ -289,7 +289,7 @@ static void jl_thread_suspend_and_get_state(int tid, unw_context_t **ctx)
289289
static void jl_thread_resume(int tid, int sig)
290290
{
291291
jl_ptls_t ptls2 = jl_all_tls_states[tid];
292-
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
292+
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);
293293
kern_return_t ret = thread_resume(thread);
294294
HANDLE_MACH_ERROR("thread_resume", ret);
295295
}
@@ -299,7 +299,7 @@ static void jl_thread_resume(int tid, int sig)
299299
static void jl_try_deliver_sigint(void)
300300
{
301301
jl_ptls_t ptls2 = jl_all_tls_states[0];
302-
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
302+
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);
303303

304304
kern_return_t ret = thread_suspend(thread);
305305
HANDLE_MACH_ERROR("thread_suspend", ret);
@@ -328,7 +328,7 @@ static void jl_try_deliver_sigint(void)
328328
static void jl_exit_thread0(int exitstate)
329329
{
330330
jl_ptls_t ptls2 = jl_all_tls_states[0];
331-
mach_port_t thread = pthread_mach_thread_np(ptls2->system_id);
331+
mach_port_t thread = pthread_mach_thread_np((pthread_t)ptls2->system_id);
332332
kern_return_t ret = thread_suspend(thread);
333333
HANDLE_MACH_ERROR("thread_suspend", ret);
334334

src/threading.c

+1-2
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ JL_DLLEXPORT int16_t jl_threadid(void)
241241
void jl_init_threadtls(int16_t tid)
242242
{
243243
jl_ptls_t ptls = jl_get_ptls_states();
244+
ptls->system_id = jl_thread_self();
244245
seed_cong(&ptls->rngseed);
245246
#ifdef _OS_WINDOWS_
246247
if (tid == 0) {
@@ -251,8 +252,6 @@ void jl_init_threadtls(int16_t tid)
251252
hMainThread = INVALID_HANDLE_VALUE;
252253
}
253254
}
254-
#else
255-
ptls->system_id = pthread_self();
256255
#endif
257256
assert(ptls->world_age == 0);
258257
ptls->world_age = 1; // OK to run Julia code on this thread

test/runtests.jl

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ end
5050
# Base.compilecache only works from node 1, so precompile test is handled specially
5151
move_to_node1("precompile")
5252
move_to_node1("SharedArrays")
53+
move_to_node1("threads")
5354
# Ensure things like consuming all kernel pipe memory doesn't interfere with other tests
5455
move_to_node1("stress")
5556

test/threads_exec.jl

+12
Original file line numberDiff line numberDiff line change
@@ -653,3 +653,15 @@ function pfib(n::Int)
653653
return pfib(n-1) + fetch(t)::Int
654654
end
655655
@test pfib(20) == 6765
656+
657+
658+
# scheduling wake/sleep test (#32511)
659+
let timeout = 300 # this test should take about 1-10 seconds
660+
t = Timer(timeout) do t
661+
ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM)
662+
end # set up a watchdog alarm
663+
for _ = 1:10^5
664+
@threads for idx in 1:1024; #=nothing=# end
665+
end
666+
close(t) # stop the watchdog
667+
end

0 commit comments

Comments
 (0)