diff --git a/base/Base.jl b/base/Base.jl index 221ab90d8d2a9..caa48179755a2 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -627,7 +627,7 @@ function start_profile_listener() # this will prompt any ongoing or pending event to flush also close(cond) # error-propagation is not needed, since the errormonitor will handle printing that better - _wait(t) + t === current_task() || _wait(t) end finalizer(cond) do c # if something goes south, still make sure we aren't keeping a reference in C to this diff --git a/base/condition.jl b/base/condition.jl index 52781f348eb0d..bc14b17b3ac6b 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -138,7 +138,7 @@ function wait(c::GenericCondition; first::Bool=false) try return wait() catch - ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() finally relockall(c.lock, token) diff --git a/base/initdefs.jl b/base/initdefs.jl index aa2ea67528da9..707c96a2444d6 100644 --- a/base/initdefs.jl +++ b/base/initdefs.jl @@ -438,6 +438,11 @@ function atexit(f::Function) end function _atexit(exitcode::Cint) + # this current task shouldn't be scheduled anywhere, but if it was (because + # this exit came from a signal for example), then try to clear that state + # to minimize scheduler issues later + ct = current_task() + q = ct.queue; q === nothing || list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) # Don't hold the lock around the iteration, just in case any other thread executing in # parallel tries to register a new atexit hook while this is running. We don't want to # block that thread from proceeding, and we can allow it to register its hook which we diff --git a/base/stream.jl b/base/stream.jl index a45307b883da8..93aeead79eb9c 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -462,7 +462,7 @@ function closewrite(s::LibuvStream) # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end sigatomic_end() iolock_begin() - ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) if uv_req_data(req) != C_NULL # req is still alive, # so make sure we won't get spurious notifications later @@ -1076,7 +1076,7 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end sigatomic_end() iolock_begin() - ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) if uv_req_data(uvw) != C_NULL # uvw is still alive, # so make sure we won't get spurious notifications later diff --git a/base/task.jl b/base/task.jl index ae99a71585c85..cf7368ebfef42 100644 --- a/base/task.jl +++ b/base/task.jl @@ -320,6 +320,7 @@ end # just wait for a task to be done, no error propagation function _wait(t::Task) + t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task")) if !istaskdone(t) donenotify = t.donenotify::ThreadSynchronizer lock(donenotify) @@ -374,7 +375,6 @@ in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed ta Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks. """ function wait(t::Task; throw=true) - t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task")) _wait(t) if throw && istaskfailed(t) Core.throw(TaskFailedException(t)) @@ -813,12 +813,15 @@ macro sync_add(expr) end end -throwto_repl_task(@nospecialize val) = throwto(getfield(active_repl_backend, :backend_task)::Task, val) - -function is_repl_running() - return isdefined(Base, :active_repl_backend) && - (getfield(active_repl_backend, :backend_task)::Task)._state === task_state_runnable && - getfield(active_repl_backend, :in_eval) +function repl_backend_task() + @isdefined(active_repl_backend) || return + backend = active_repl_backend + isdefined(backend, :backend_task) || return + backend_task = getfield(active_repl_backend, :backend_task)::Task + if backend_task._state === task_state_runnable && getfield(backend, :in_eval) + return backend_task + end + return end # runtime system hook called when a task finishes @@ -842,8 +845,9 @@ function task_done_hook(t::Task) end if err && !handled && Threads.threadid() == 1 - if isa(result, InterruptException) && isempty(Workqueue) && is_repl_running() - throwto_repl_task(result) + if isa(result, InterruptException) && isempty(Workqueue) + backend = repl_backend_task() + backend isa Task && throwto(backend, e) end end # Clear sigatomic before waiting @@ -854,11 +858,11 @@ function task_done_hook(t::Task) # If an InterruptException happens while blocked in the event loop, try handing # the exception to the REPL task since the current task is done. # issue #19467 - if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue) && is_repl_running() - throwto_repl_task(e) - else - rethrow() + if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue) + backend = repl_backend_task() + backend isa Task && throwto(backend, e) end + rethrow() # this will terminate the program end end @@ -1032,7 +1036,7 @@ function schedule(t::Task, @nospecialize(arg); error=false) # schedule a task to be (re)started with the given value or exception t._state === task_state_runnable || Base.error("schedule: Task not runnable") if error - t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t) + q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t) setfield!(t, :result, arg) setfield!(t, :_isexception, true) else @@ -1056,7 +1060,7 @@ function yield() try wait() catch - ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) rethrow() end end diff --git a/src/scheduler.c b/src/scheduler.c index 3505e935afcf6..3cf97ba108873 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -199,6 +199,21 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT return 0; } +void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT +{ + // equivalent to wake_thread, without the assert on wasrunning + int8_t state = jl_atomic_load_relaxed(&ptls->sleep_check_state); + if (state == sleeping) { + if (jl_atomic_cmpswap_relaxed(&ptls->sleep_check_state, &state, not_sleeping)) { + // this notification will never be consumed, so we may have now + // introduced some inaccuracy into the count, but that is + // unavoidable with any asynchronous interruption + jl_atomic_fetch_add_relaxed(&n_threads_running, 1); + } + } +} + + static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT { if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) { diff --git a/src/signal-handling.c b/src/signal-handling.c index febf05b653662..6835f5fa364c5 100644 --- a/src/signal-handling.c +++ b/src/signal-handling.c @@ -427,6 +427,8 @@ void jl_show_sigill(void *_ctx) #endif } +void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT; + // make it invalid for a task to return from this point to its stack // this is generally quite an foolish operation, but does free you up to do // arbitrary things on this stack now without worrying about corrupt state that @@ -439,15 +441,17 @@ void jl_task_frame_noreturn(jl_task_t *ct) JL_NOTSAFEPOINT ct->eh = NULL; ct->world_age = 1; // Force all locks to drop. Is this a good idea? Of course not. But the alternative would probably deadlock instead of crashing. - small_arraylist_t *locks = &ct->ptls->locks; + jl_ptls_t ptls = ct->ptls; + small_arraylist_t *locks = &ptls->locks; for (size_t i = locks->len; i > 0; i--) jl_mutex_unlock_nogc((jl_mutex_t*)locks->items[i - 1]); locks->len = 0; - ct->ptls->in_pure_callback = 0; - ct->ptls->in_finalizer = 0; - ct->ptls->defer_signal = 0; + ptls->in_pure_callback = 0; + ptls->in_finalizer = 0; + ptls->defer_signal = 0; // forcibly exit GC (if we were in it) or safe into unsafe, without the mandatory safepoint - jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_UNSAFE); + jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_UNSAFE); + surprise_wakeup(ptls); // allow continuing to use a Task that should have already died--unsafe necromancy! jl_atomic_store_relaxed(&ct->_state, JL_TASK_STATE_RUNNABLE); } @@ -461,6 +465,7 @@ void jl_critical_error(int sig, int si_code, bt_context_t *context, jl_task_t *c size_t i, n = ct ? *bt_size : 0; if (sig) { // kill this task, so that we cannot get back to it accidentally (via an untimely ^C or jlbacktrace in jl_exit) + // and also resets the state of ct and ptls so that some code can run on this task again jl_task_frame_noreturn(ct); #ifndef _OS_WINDOWS_ sigset_t sset; diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl index 5baf8826cc883..3c30b214305fb 100644 --- a/stdlib/Sockets/src/Sockets.jl +++ b/stdlib/Sockets/src/Sockets.jl @@ -450,7 +450,7 @@ function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg) finally Base.sigatomic_end() iolock_begin() - ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) if uv_req_data(uvw) != C_NULL # uvw is still alive, # so make sure we won't get spurious notifications later diff --git a/stdlib/Sockets/src/addrinfo.jl b/stdlib/Sockets/src/addrinfo.jl index 4ee9e07a58430..866a1684c85a1 100644 --- a/stdlib/Sockets/src/addrinfo.jl +++ b/stdlib/Sockets/src/addrinfo.jl @@ -90,7 +90,7 @@ function getalladdrinfo(host::String) finally Base.sigatomic_end() iolock_begin() - ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) if uv_req_data(req) != C_NULL # req is still alive, # so make sure we don't get spurious notifications later @@ -223,7 +223,7 @@ function getnameinfo(address::Union{IPv4, IPv6}) finally Base.sigatomic_end() iolock_begin() - ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct) + q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) if uv_req_data(req) != C_NULL # req is still alive, # so make sure we don't get spurious notifications later