diff --git a/AGENTS.md b/AGENTS.md index d81defa16f136..f4f934eec9b4b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,6 +27,20 @@ If you made changes to the runtime (any files in `src/`), you will need to rebui julia. Run `make -j` to rebuild julia. This process may take up to 10 minutes depending on your changes. +### Testing LLVM-related changes + +When making changes to LLVM passes or codegen, add `LLVM_ASSERTIONS=1` to `Make.user` to enable +LLVM assertions. This helps catch IR verification errors early: + +```bash +echo "LLVM_ASSERTIONS=1" >> Make.user +``` + +To run LLVM pass tests: +```bash +make -C test/llvmpasses .ll +``` + After making changes, run static analysis checks: - First run `make -C src install-analysis-deps` to initialize dependencies (only needed once the first time). - Run `make -C src analyze- --output-sync -j8` (replace `` with the basename of any C or C++ file you modified, excluding headers). diff --git a/Compiler/src/effects.jl b/Compiler/src/effects.jl index 3db778943f10a..915f956d824a2 100644 --- a/Compiler/src/effects.jl +++ b/Compiler/src/effects.jl @@ -11,28 +11,32 @@ The output represents the state of different effect properties in the following - `+e` (green): `ALWAYS_TRUE` - `-e` (red): `ALWAYS_FALSE` - `?e` (yellow): `EFFECT_FREE_IF_INACCESSIBLEMEMONLY` -3. `nothrow` (`n`): +3. `reset_safe` (`re`): + - `+re` (green): `ALWAYS_TRUE` + - `-re` (red): `ALWAYS_FALSE` + - `?re` (yellow): `RESET_SAFE_IF_INACCESSIBLEMEMONLY` +4. `nothrow` (`n`): - `+n` (green): `true` - `-n` (red): `false` -4. `terminates` (`t`): +5. `terminates` (`t`): - `+t` (green): `true` - `-t` (red): `false` -5. `notaskstate` (`s`): +6. `notaskstate` (`s`): - `+s` (green): `true` - `-s` (red): `false` -6. `inaccessiblememonly` (`m`): +7. `inaccessiblememonly` (`m`): - `+m` (green): `ALWAYS_TRUE` - `-m` (red): `ALWAYS_FALSE` - `?m` (yellow): `INACCESSIBLEMEM_OR_ARGMEMONLY` -7. `noub` (`u`): +8. `noub` (`u`): - `+u` (green): `true` - `-u` (red): `false` - `?u` (yellow): `NOUB_IF_NOINBOUNDS` -8. `:nonoverlayed` (`o`): +9. `:nonoverlayed` (`o`): - `+o` (green): `ALWAYS_TRUE` - `-o` (red): `ALWAYS_FALSE` - `?o` (yellow): `CONSISTENT_OVERLAY` -9. `:nortcall` (`r`): +10. `:nortcall` (`r`): - `+r` (green): `true` - `-r` (red): `false` """ @@ -62,6 +66,10 @@ following meanings: will not be refined anyway. * `EFFECT_FREE_IF_INACCESSIBLEMEMONLY`: the `:effect-free`-ness of this method can later be refined to `ALWAYS_TRUE` in a case when `:inaccessiblememonly` is proven. +- `reset_safe::UInt8` + * The execution of this function may be interrupted and reset to an earlier cancellation + point at any point in the function. The interpretation is similar to `:effect_free`, + but has different guarantees. - `nothrow::Bool`: this method is guaranteed to not throw an exception. If the execution of this method may raise `MethodError`s and similar exceptions, then the method is not considered as `:nothrow`. 
@@ -119,6 +127,7 @@ $(effects_key_string) struct Effects consistent::UInt8 effect_free::UInt8 + reset_safe::UInt8 nothrow::Bool terminates::Bool notaskstate::Bool @@ -129,6 +138,7 @@ struct Effects function Effects( consistent::UInt8, effect_free::UInt8, + reset_safe::UInt8, nothrow::Bool, terminates::Bool, notaskstate::Bool, @@ -139,6 +149,7 @@ struct Effects return new( consistent, effect_free, + reset_safe, nothrow, terminates, notaskstate, @@ -175,14 +186,18 @@ const NOUB_IF_NOINBOUNDS = 0x01 << 1 # :nonoverlayed bits const CONSISTENT_OVERLAY = 0x01 << 1 -const EFFECTS_TOTAL = Effects(ALWAYS_TRUE, ALWAYS_TRUE, true, true, true, ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, true) -const EFFECTS_THROWS = Effects(ALWAYS_TRUE, ALWAYS_TRUE, false, true, true, ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, true) -const EFFECTS_UNKNOWN = Effects(ALWAYS_FALSE, ALWAYS_FALSE, false, false, false, ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_TRUE, false) # unknown mostly, but it's not overlayed at least (e.g. it's not a call) +# :reset_safe bits +const RESET_SAFE_IF_INACCESSIBLEMEMONLY = 0x01 << 1 -function Effects(effects::Effects=Effects( - ALWAYS_FALSE, ALWAYS_FALSE, false, false, false, ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_FALSE, false); +const EFFECTS_TOTAL = Effects(ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, true, true, true, ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, true) +const EFFECTS_THROWS = Effects(ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, false, true, true, ALWAYS_TRUE, ALWAYS_TRUE, ALWAYS_TRUE, true) +const EFFECTS_UNKNOWN = Effects(ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_FALSE, false, false, false, ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_TRUE, false) # unknown mostly, but it's not overlayed at least (e.g. it's not a call) +const EFFECTS_MINIMAL = Effects(ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_FALSE, false, false, false, ALWAYS_FALSE, ALWAYS_FALSE, ALWAYS_FALSE, false) + +function Effects(effects::Effects=EFFECTS_MINIMAL; consistent::UInt8 = effects.consistent, effect_free::UInt8 = effects.effect_free, + reset_safe::UInt8 = effects.reset_safe, nothrow::Bool = effects.nothrow, terminates::Bool = effects.terminates, notaskstate::Bool = effects.notaskstate, @@ -193,6 +208,7 @@ function Effects(effects::Effects=Effects( return Effects( consistent, effect_free, + reset_safe, nothrow, terminates, notaskstate, @@ -225,6 +241,14 @@ function is_better_effects(new::Effects, old::Effects) elseif new.effect_free != old.effect_free return false end + if new.reset_safe == ALWAYS_TRUE + any_improved |= old.reset_safe != ALWAYS_TRUE + elseif new.reset_safe == RESET_SAFE_IF_INACCESSIBLEMEMONLY + old.reset_safe == ALWAYS_TRUE && return false + any_improved |= old.reset_safe != RESET_SAFE_IF_INACCESSIBLEMEMONLY + elseif new.reset_safe != old.reset_safe + return false + end if new.nothrow any_improved |= !old.nothrow elseif new.nothrow != old.nothrow @@ -276,6 +300,7 @@ function merge_effects(old::Effects, new::Effects) return Effects( merge_effectbits(old.consistent, new.consistent), merge_effectbits(old.effect_free, new.effect_free), + merge_effectbits(old.reset_safe, new.reset_safe), merge_effectbits(old.nothrow, new.nothrow), merge_effectbits(old.terminates, new.terminates), merge_effectbits(old.notaskstate, new.notaskstate), @@ -295,6 +320,7 @@ merge_effectbits(old::Bool, new::Bool) = old & new is_consistent(effects::Effects) = effects.consistent === ALWAYS_TRUE is_effect_free(effects::Effects) = effects.effect_free === ALWAYS_TRUE +is_reset_safe(effects::Effects) = effects.reset_safe === ALWAYS_TRUE is_nothrow(effects::Effects) = effects.nothrow 
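The legend above is easiest to read against real output. A hedged illustration (the printed bits depend entirely on what inference can prove for the callee; the only point is that the new `reset_safe` slot is rendered between `e` and `n`):

```julia
# Illustrative only: with this change the printed effects tuple gains a `re` slot,
# e.g. a string of the form (+c,+e,+re,+n,+t,+s,+m,+u,+o,+r).
Base.infer_effects(sin, (Float64,))
```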
is_terminates(effects::Effects) = effects.terminates is_notaskstate(effects::Effects) = effects.notaskstate @@ -331,6 +357,8 @@ is_consistent_if_inaccessiblememonly(effects::Effects) = !iszero(effects.consist is_effect_free_if_inaccessiblememonly(effects::Effects) = !iszero(effects.effect_free & EFFECT_FREE_IF_INACCESSIBLEMEMONLY) +is_reset_safe_if_inaccessiblememonly(effects::Effects) = !iszero(effects.reset_safe & RESET_SAFE_IF_INACCESSIBLEMEMONLY) + is_inaccessiblemem_or_argmemonly(effects::Effects) = effects.inaccessiblememonly === INACCESSIBLEMEM_OR_ARGMEMONLY is_consistent_overlay(effects::Effects) = effects.nonoverlayed === CONSISTENT_OVERLAY @@ -345,13 +373,15 @@ function encode_effects(e::Effects) ((e.inaccessiblememonly % UInt32) << 8) | ((e.noub % UInt32) << 10) | ((e.nonoverlayed % UInt32) << 12) | - ((e.nortcall % UInt32) << 14) + ((e.nortcall % UInt32) << 14) | + ((e.reset_safe % UInt32) << 15) end function decode_effects(e::UInt32) return Effects( UInt8((e >> 0) & 0x07), UInt8((e >> 3) & 0x03), + UInt8((e >> 15) & 0x03), Bool((e >> 5) & 0x01), Bool((e >> 6) & 0x01), Bool((e >> 7) & 0x01), diff --git a/Compiler/src/optimize.jl b/Compiler/src/optimize.jl index b5704c488d273..7d3d9a1dacdef 100644 --- a/Compiler/src/optimize.jl +++ b/Compiler/src/optimize.jl @@ -39,6 +39,9 @@ const IR_FLAG_NOUB = one(UInt32) << 10 #const IR_FLAG_CONSISTENTOVERLAY = one(UInt32) << 12 # This statement is :nortcall const IR_FLAG_NORTCALL = one(UInt32) << 13 +# This statement is proven :reset_safe +const IR_FLAG_RESET_SAFE = one(UInt32) << 14 +# Reserved: one(UInt32) << 15 used for RSIIMO below # An optimization pass has updated this statement in a way that may # have exposed information that inference did not see. Re-running # inference on this statement may be profitable. 
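The IR flags above are plain bit masks. A self-contained sketch of the arithmetic for readers tracing the new bits; the constant values are copied from this file, and `has_flag` mirrors the helper defined in the next hunk:

```julia
# Stand-alone sketch; values mirror optimize.jl in this diff.
const IR_FLAG_RESET_SAFE = one(UInt32) << 14
const IR_FLAG_RSIIMO     = one(UInt32) << 20   # :reset_safe == RESET_SAFE_IF_INACCESSIBLEMEMONLY
has_flag(curr::UInt32, flag::UInt32) = (curr & flag) == flag

flags = IR_FLAG_RESET_SAFE | IR_FLAG_RSIIMO
@assert has_flag(flags, IR_FLAG_RESET_SAFE)
@assert !has_flag(flags, one(UInt32) << 13)    # IR_FLAG_NORTCALL is not set here
```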
@@ -50,16 +53,18 @@ const IR_FLAG_UNUSED = one(UInt32) << 17 const IR_FLAG_EFIIMO = one(UInt32) << 18 # This statement is :inaccessiblememonly == INACCESSIBLEMEM_OR_ARGMEMONLY const IR_FLAG_INACCESSIBLEMEM_OR_ARGMEM = one(UInt32) << 19 +# This statement is :reset_safe == RESET_SAFE_IF_INACCESSIBLEMEMONLY +const IR_FLAG_RSIIMO = one(UInt32) << 20 const NUM_IR_FLAGS = 3 # sync with julia.h const IR_FLAGS_EFFECTS = IR_FLAG_CONSISTENT | IR_FLAG_EFFECT_FREE | IR_FLAG_NOTHROW | - IR_FLAG_TERMINATES | IR_FLAG_NOUB | IR_FLAG_NORTCALL + IR_FLAG_TERMINATES | IR_FLAG_NOUB | IR_FLAG_NORTCALL | IR_FLAG_RESET_SAFE const IR_FLAGS_REMOVABLE = IR_FLAG_EFFECT_FREE | IR_FLAG_NOTHROW | IR_FLAG_TERMINATES -const IR_FLAGS_NEEDS_EA = IR_FLAG_EFIIMO | IR_FLAG_INACCESSIBLEMEM_OR_ARGMEM +const IR_FLAGS_NEEDS_EA = IR_FLAG_EFIIMO | IR_FLAG_INACCESSIBLEMEM_OR_ARGMEM | IR_FLAG_RSIIMO has_flag(curr::UInt32, flag::UInt32) = (curr & flag) == flag @@ -79,6 +84,11 @@ function flags_for_effects(effects::Effects) elseif is_effect_free_if_inaccessiblememonly(effects) flags |= IR_FLAG_EFIIMO end + if is_reset_safe(effects) + flags |= IR_FLAG_RESET_SAFE + elseif is_reset_safe_if_inaccessiblememonly(effects) + flags |= IR_FLAG_RSIIMO + end if is_nothrow(effects) flags |= IR_FLAG_NOTHROW end diff --git a/Compiler/src/ssair/show.jl b/Compiler/src/ssair/show.jl index 2947a381be959..e8e732ec73af6 100644 --- a/Compiler/src/ssair/show.jl +++ b/Compiler/src/ssair/show.jl @@ -1062,7 +1062,7 @@ function show_ir(io::IO, compact::IncrementalCompact, config::IRShowConfig=defau finish_show_ir(io, uncompacted_cfg, config) end -function effectbits_letter(effects::Effects, name::Symbol, suffix::Char) +function effectbits_letter(effects::Effects, name::Symbol, suffix::Union{Char, String}) ft = fieldtype(Effects, name) if ft === UInt8 prefix = getfield(effects, name) === ALWAYS_TRUE ? 
'+' : @@ -1094,6 +1094,8 @@ function Base.show(io::IO, e::Effects) print(io, ',') printstyled(io, effectbits_letter(e, :effect_free, 'e'); color=effectbits_color(e, :effect_free)) print(io, ',') + printstyled(io, effectbits_letter(e, :reset_safe, "re"); color=effectbits_color(e, :reset_safe)) + print(io, ',') printstyled(io, effectbits_letter(e, :nothrow, 'n'); color=effectbits_color(e, :nothrow)) print(io, ',') printstyled(io, effectbits_letter(e, :terminates, 't'); color=effectbits_color(e, :terminates)) diff --git a/Compiler/src/tfuncs.jl b/Compiler/src/tfuncs.jl index 3c23974c88920..2656c7bb1ee1a 100644 --- a/Compiler/src/tfuncs.jl +++ b/Compiler/src/tfuncs.jl @@ -716,7 +716,7 @@ end @nospecs function pointerset_tfunc(𝕃::AbstractLattice, a, v, i, align) return a end -@nospecs function atomic_fence_tfunc(𝕃::AbstractLattice, order) +@nospecs function atomic_fence_tfunc(𝕃::AbstractLattice, order, syncscope) return Nothing end @nospecs function atomic_pointerref_tfunc(𝕃::AbstractLattice, a, order) @@ -757,7 +757,7 @@ add_tfunc(add_ptr, 2, 2, pointerarith_tfunc, 1) add_tfunc(sub_ptr, 2, 2, pointerarith_tfunc, 1) add_tfunc(pointerref, 3, 3, pointerref_tfunc, 4) add_tfunc(pointerset, 4, 4, pointerset_tfunc, 5) -add_tfunc(atomic_fence, 1, 1, atomic_fence_tfunc, 4) +add_tfunc(atomic_fence, 2, 2, atomic_fence_tfunc, 4) add_tfunc(atomic_pointerref, 2, 2, atomic_pointerref_tfunc, 4) add_tfunc(atomic_pointerset, 3, 3, atomic_pointerset_tfunc, 5) add_tfunc(atomic_pointerswap, 3, 3, atomic_pointerswap_tfunc, 5) diff --git a/NEWS.md b/NEWS.md index bd72e673728f0..571ad6f37ac80 100644 --- a/NEWS.md +++ b/NEWS.md @@ -21,6 +21,10 @@ Command-line option changes Multi-threading changes ----------------------- + - New functions `Threads.atomic_fence_heavy` and `Threads.atomic_fence_light` provide support for + asymmetric atomic fences, speeding up atomic synchronization where one side of the synchronization runs significantly less often than the other ([#60311]).
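The NEWS entry is terse, so here is a hedged sketch of the intended pairing, using the API names added in this diff (`Threads.atomic_fence_light`/`Threads.atomic_fence_heavy`) together with relaxed atomics; the hot side runs constantly and pays only the cheap fence, while the rare side pays the process-wide barrier:

```julia
# Sketch only; the visibility claims are the asymmetric-fence idea, not verified output.
mutable struct Shared
    @atomic counter::Int
    @atomic stop::Bool
end
const S = Shared(0, false)

function hot_step()                      # called in a tight loop on worker threads
    @atomic :monotonic S.counter += 1
    Threads.atomic_fence_light()         # cheap; pairs with atomic_fence_heavy below
    return @atomic :monotonic S.stop
end

function request_stop()                  # called rarely, e.g. at shutdown
    @atomic :monotonic S.stop = true
    Threads.atomic_fence_heavy()         # expensive process-wide barrier
    return @atomic :monotonic S.counter  # sees increments published before the hot side's fence
end
```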
+ Build system changes -------------------- diff --git a/base/Base.jl b/base/Base.jl index 57d9915239fdf..eeb047125ce34 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -371,6 +371,35 @@ function start_profile_listener() ccall(:jl_set_peek_cond, Cvoid, (Ptr{Cvoid},), cond.handle) end +function sigint_listener(cond::AsyncCondition) + while _trywait(cond) + # The SIGINT handler should have set a cancellation request on the roottask + cr = @atomic :acquire roottask.cancellation_request + cr === nothing && continue + cancel!(roottask, cr) + end + nothing +end + +function start_sigint_listener() + cond = AsyncCondition() + uv_unref(cond.handle) + t = errormonitor(Threads.@spawn(sigint_listener(cond))) + atexit() do + # destroy this callback when exiting + ccall(:jl_set_sigint_cond, Cvoid, (Ptr{Cvoid},), C_NULL) + # this will prompt any ongoing or pending event to flush also + close(cond) + # error-propagation is not needed, since the errormonitor will handle printing that better + t === current_task() || _wait(t) + end + finalizer(cond) do c + # if something goes south, still make sure we aren't keeping a reference in C to this + ccall(:jl_set_sigint_cond, Cvoid, (Ptr{Cvoid},), C_NULL) + end + ccall(:jl_set_sigint_cond, Cvoid, (Ptr{Cvoid},), cond.handle) +end + function __init__() # Base library init global _atexit_hooks_finished = false @@ -394,6 +423,7 @@ function __init__() # triggering a profile via signals is not implemented on windows start_profile_listener() end + start_sigint_listener() _require_world_age[] = get_world_counter() # Prevent spawned Julia process from getting stuck waiting on Tracy to connect. delete!(ENV, "JULIA_WAIT_FOR_TRACY") diff --git a/base/asyncevent.jl b/base/asyncevent.jl index a4a82b4aba120..771d89f796b58 100644 --- a/base/asyncevent.jl +++ b/base/asyncevent.jl @@ -165,7 +165,7 @@ function _trywait(t::Union{Timer, AsyncCondition}) set = t.set if set # full barrier now for AsyncCondition - t isa Timer || Core.Intrinsics.atomic_fence(:acquire_release) + t isa Timer || Core.Intrinsics.atomic_fence(:acquire_release, :system) else if !isopen(t) set = t.set @@ -183,7 +183,7 @@ function _trywait(t::Union{Timer, AsyncCondition}) set = t.set if !set && t.handle != C_NULL # wait for set or handle, but not the isopen flag iolock_end() - set = wait(t.cond) + set = wait(t.cond; waitee=t) unlock(t.cond) iolock_begin() lock(t.cond) @@ -199,8 +199,14 @@ function _trywait(t::Union{Timer, AsyncCondition}) return set end +cancel_wait!(t::Union{Timer, AsyncCondition}, @nospecialize(creq)) = false +cancel_wait!(t::Union{Timer, AsyncCondition}, task::Task, @nospecialize(creq)) = + cancel_wait!(t.cond, task, creq, false; waitee=t) + function wait(t::Union{Timer, AsyncCondition}) - _trywait(t) || throw(EOFError()) + ok = _trywait(t) + @cancel_check + ok || throw(EOFError()) nothing end diff --git a/base/atomics.jl b/base/atomics.jl index 432c9120939ac..21689e06ba1a1 100644 --- a/base/atomics.jl +++ b/base/atomics.jl @@ -10,7 +10,7 @@ export atomic_add!, atomic_sub!, atomic_and!, atomic_nand!, atomic_or!, atomic_xor!, atomic_max!, atomic_min!, - atomic_fence + atomic_fence, atomic_fence_light, atomic_fence_heavy """ Threads.Atomic{T} @@ -329,4 +329,28 @@ fences should not be necessary in most cases. For further details, see LLVM's `fence` instruction. 
""" -atomic_fence() = Core.Intrinsics.atomic_fence(:sequentially_consistent) +atomic_fence() = Core.Intrinsics.atomic_fence(:sequentially_consistent, :system) + +""" + Threads.atomic_fence_light() + +Insert the light side of an asymmetric sequential-consistency memory fence. +Asymmetric memory fences are useful in scenarios where one side of the +synchronization runs significantly less often than the other side. Use this +function on the side that runs often and [`atomic_fence_heavy`](@ref) on the +side that runs rarely. + +On supported operating systems and architectures this fence is cheaper than +`Threads.atomic_fence()`, but synchronizes only with [`atomic_fence_heavy`](@ref) +calls from other threads. +""" +atomic_fence_light() = Core.Intrinsics.atomic_fence(:sequentially_consistent, :singlethread) + +""" + Threads.atomic_fence_heavy() + +Insert the heavy side of an asymmetric sequential-consistency memory fence. +Use this function on the side that runs rarely. +See [`atomic_fence_light`](@ref) for more details. +""" +atomic_fence_heavy() = ccall(:jl_membarrier, Cvoid, ()) diff --git a/base/condition.jl b/base/condition.jl index fd771c9be346a..c3520fdf379a9 100644 --- a/base/condition.jl +++ b/base/condition.jl @@ -69,6 +69,8 @@ struct GenericCondition{L<:AbstractLock} GenericCondition(l::AbstractLock) = new{typeof(l)}(IntrusiveLinkedList{Task}(), l) end +waitqueue(c::GenericCondition) = ILLRef(c.waitq, c) + show(io::IO, c::GenericCondition) = print(io, GenericCondition, "(", c.lock, ")") assert_havelock(c::GenericCondition) = assert_havelock(c.lock) @@ -80,13 +82,13 @@ islocked(c::GenericCondition) = islocked(c.lock) lock(f, c::GenericCondition) = lock(f, c.lock) # have waiter wait for c -function _wait2(c::GenericCondition, waiter::Task, first::Bool=false) +function _wait2(c::GenericCondition, waiter::Task, waitee=c, first::Bool=false) ct = current_task() assert_havelock(c) if first - pushfirst!(c.waitq, waiter) + pushfirst!(ILLRef(waitqueue(c), waitee), waiter) else - push!(c.waitq, waiter) + push!(ILLRef(waitqueue(c), waitee), waiter) end # since _wait2 is similar to schedule, we should observe the sticky bit now if waiter.sticky && Threads.threadid(waiter) == 0 && !GC.in_finalizer() @@ -125,6 +127,13 @@ proceeding. """ function wait end +macro cancel_check() + quote + local req = Core.cancellation_point!() + req !== nothing && handle_cancellation!(req) + end +end + """ wait(c::GenericCondition; first::Bool=false) @@ -133,18 +142,57 @@ Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notif If the keyword `first` is set to `true`, the waiter will be put _first_ in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior. """ -function wait(c::GenericCondition; first::Bool=false) +function wait(c::GenericCondition; first::Bool=false, waitee=c, expected_cancellation=nothing) ct = current_task() - _wait2(c, ct, first) + _wait2(c, ct, waitee, first) + + cr = pre_sleep_cancellation_request() + if cr !== expected_cancellation + Base.list_deletefirst!(waitqueue(c), ct) + return invokelatest(cancel_wait!, waitee, cr) + end token = unlockall(c.lock) - try - return wait() + + ret = try + wait() catch - q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) - rethrow() - finally relockall(c.lock, token) + # This cleans up our entry in the waitqueue if we were resumes from an + # unexpected `throwto`. Modern code should generally avoid this pattern. 
+ q = ct.queue; q === waitee && Base.list_deletefirst!(waitqueue(c), ct) + rethrow() + end + + relockall(c.lock, token) + return ret +end + +function cancel_wait!(c::GenericCondition, creq; waitee = c) + throw(creq) +end + +function cancel_wait!(c::GenericCondition, t::Task, @nospecialize(creq); waitee = c) + lock(c) + if t.queue !== waitee + unlock(c) + return false + end + Base.list_deletefirst!(ILLRef(waitqueue(c), waitee), t) + schedule(t, conform_cancellation_request(creq), error=true) + unlock(c) + return true +end + +function cancel_wait!(c::GenericCondition, t::Task, @nospecialize(creq), @nospecialize(val); waitee=c) + lock(c) + if t.queue !== waitee + unlock(c) + return false end + Base.list_deletefirst!(ILLRef(waitqueue(c), waitee), t) + schedule(t, val) + unlock(c) + return true end """ @@ -160,8 +208,8 @@ Return the count of tasks woken up. Return 0 if no tasks are waiting on `conditi function notify(c::GenericCondition, @nospecialize(arg), all, error) assert_havelock(c) cnt = 0 - while !isempty(c.waitq) - t = popfirst!(c.waitq) + while !isempty(waitqueue(c)) + t = popfirst!(waitqueue(c)) schedule(t, arg, error=error) cnt += 1 all || break diff --git a/base/experimental.jl b/base/experimental.jl index 2deb3bc76af6c..f722baf16dc21 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -646,7 +646,7 @@ millisecond. """ function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0) ct = current_task() - Base._wait2(c, ct, first) + Base._wait2(c, ct, c, first) token = Base.unlockall(c.lock) timer::Union{Timer, Nothing} = nothing diff --git a/base/linked_list.jl b/base/linked_list.jl index c477dc56bdb2b..9df72622e7957 100644 --- a/base/linked_list.jl +++ b/base/linked_list.jl @@ -1,12 +1,19 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license mutable struct IntrusiveLinkedList{T} - # Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue >: U{ILL{T}, Nothing}` + # Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue::Any` head::Union{T, Nothing} tail::Union{T, Nothing} IntrusiveLinkedList{T}() where {T} = new{T}(nothing, nothing) end +struct ILLRef{T} + list::IntrusiveLinkedList{T} + waitee::Any # Invariant: waitqueue(waitee).list === list +end +ILLRef(ref::ILLRef, @nospecialize(waitee)) = typeof(ref)(ref.list, waitee) +waitqueue(list::IntrusiveLinkedList{T}) where {T} = ILLRef(list, list) + #const list_append!! = append! #const list_deletefirst! = delete! 
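A toy illustration of the field contract in the comment above and of the new `ILLRef` indirection: an element's `.queue` field now records the waitee passed to `ILLRef`, not the raw list, which is what lets `cancel!` dispatch `cancel_wait!` on the object actually being waited on. These are internal, unexported types, and the sketch assumes this patch:

```julia
# Toy element type satisfying the documented contract (not a Base type).
mutable struct Waiter
    name::Symbol
    next::Union{Waiter, Nothing}   # required by IntrusiveLinkedList
    queue::Any                     # ends up holding the waitee, not the list
    Waiter(name) = new(name, nothing, nothing)
end

q = Base.IntrusiveLinkedList{Waiter}()
waitee = :some_timer_or_condition              # stand-in for the object being waited on
push!(Base.ILLRef(q, waitee), Waiter(:w1))
@assert q.head.queue === waitee                # cancellation can dispatch on this handle
```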
@@ -49,9 +56,13 @@ function list_append!!(q::IntrusiveLinkedList{T}, q2::IntrusiveLinkedList{T}) wh return q end -function push!(q::IntrusiveLinkedList{T}, val::T) where T +isempty(qr::ILLRef{T}) where T = isempty(qr.list) +length(qr::ILLRef{T}) where T = length(qr.list) + +function push!(qr::ILLRef{T}, val::T) where T val.queue === nothing || error("val already in a list") - val.queue = q + val.queue = qr.waitee + q = qr.list tail = q.tail if tail === nothing q.head = q.tail = val @@ -62,9 +73,10 @@ function push!(q::IntrusiveLinkedList{T}, val::T) where T return q end -function pushfirst!(q::IntrusiveLinkedList{T}, val::T) where T +function pushfirst!(qr::ILLRef{T}, val::T) where T val.queue === nothing || error("val already in a list") - val.queue = q + val.queue = qr.waitee + q = qr.list head = q.head if head === nothing q.head = q.tail = val @@ -75,21 +87,23 @@ function pushfirst!(q::IntrusiveLinkedList{T}, val::T) where T return q end -function pop!(q::IntrusiveLinkedList{T}) where {T} - val = q.tail::T +function pop!(qr::ILLRef{T}) where {T} + val = qr.list.tail::T - list_deletefirst!(q, val) # expensive! + list_deletefirst!(qr, val) # expensive! return val end -function popfirst!(q::IntrusiveLinkedList{T}) where {T} - val = q.head::T - list_deletefirst!(q, val) # cheap +function popfirst!(qr::ILLRef{T}) where {T} + val = qr.list.head::T + list_deletefirst!(qr, val) # cheap return val end # this function assumes `val` is found in `q` -function list_deletefirst!(q::IntrusiveLinkedList{T}, val::T) where T - val.queue === q || return +function list_deletefirst!(qr::ILLRef{T}, val::T) where T +# (val.queue === qr.waitee || +# val.queue === qr.list) || throw(ConcurrencyViolationError("attempt to delete from wrong list")) + q = qr.list head = q.head::T if head === val if q.tail::T === val @@ -115,6 +129,24 @@ function list_deletefirst!(q::IntrusiveLinkedList{T}, val::T) where T return q end +function in(val::T, list::IntrusiveLinkedList{T}) where T + head = list.head + while head !== nothing + if val === head + return true + end + head = head.next + end + return false +end + +# TODO: Delete this compatibility wrapper +list_deletefirst!(q::IntrusiveLinkedList{T}, val::T) where T = list_deletefirst!(ILLRef(q, q), val) +push!(q::IntrusiveLinkedList{T}, val::T) where T = push!(ILLRef(q, q), val) +pushfirst!(q::IntrusiveLinkedList{T}, val::T) where T = pushfirst!(ILLRef(q, q), val) +pop!(q::IntrusiveLinkedList{T}) where T = pop!(ILLRef(q, q)) +popfirst!(q::IntrusiveLinkedList{T}) where T = popfirst!(ILLRef(q, q)) + #function list_deletefirst!(q::Array{T}, val::T) where T # i = findfirst(isequal(val), q) # i === nothing || deleteat!(q, i) diff --git a/base/locks-mt.jl b/base/locks-mt.jl index 237e0d9856996..af8513044c0f3 100644 --- a/base/locks-mt.jl +++ b/base/locks-mt.jl @@ -108,3 +108,5 @@ end function islocked(l::AbstractSpinLock) return (@atomic :monotonic l.owned) != 0 end + +Base.show(io::IO, l::AbstractSpinLock) = print(io, typeof(l), "(", islocked(l) ?
"locked" : "unlocked", ")") diff --git a/base/stream.jl b/base/stream.jl index 7b227458ec552..f74b900fd81c3 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -1061,28 +1061,51 @@ end uv_write(s::LibuvStream, p::Vector{UInt8}) = GC.@preserve p uv_write(s, pointer(p), UInt(sizeof(p))) # caller must have acquired the iolock -function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) - uvw = uv_write_async(s, p, n) +function uv_write_noncancel(s::LibuvStream, p::Ptr{UInt8}, n::UInt) + # Establish the wait object early so that if we get cancelled, we don't + # have to go to libuv in first place. ct = current_task() + ct.queue = s + + cr = pre_sleep_cancellation_request() + if cr !== nothing + cr.queue = nothing + return 0 + end + + uvw = uv_write_async(s, p, n) + # TODO: If the request was split above, this is wrong + ct.next = uvw + preserve_handle(ct) sigatomic_begin() uv_req_set_data(uvw, ct) iolock_end() - local status + local nwritten try sigatomic_end() # wait for the last chunk to complete (or error) # assume that any errors would be sticky, # (so we don't need to monitor the error status of the intermediate writes) - status = wait()::Cint + nwritten = wait()::Csize_t sigatomic_begin() finally # try-finally unwinds the sigatomic level, so need to repeat sigatomic_end sigatomic_end() iolock_begin() - q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct) + if ct.queue === s + # Only happens in unexpected error cases. Cancellation queues a proper + # callback, which unsets this. + ct.next = nothing + ct.queue = nothing + end if uv_req_data(uvw) != C_NULL - # uvw is still alive, + # uvw is still alive - likely because we got some unexpected throwto + # exception. We try to cancel the request to avoid spamming if this + # is something the user is looking at. Note that cancellation does + # not go through this path and instead returns the number of written + # bytes to the caller. 
+ ccall(:uv_cancel, Cint, (Ptr{Cvoid},), uvw) # Ignore errors # so make sure we won't get spurious notifications later uv_req_set_data(uvw, C_NULL) else @@ -1092,10 +1115,27 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) iolock_end() unpreserve_handle(ct) end - if status < 0 - throw(_UVError("write", status)) + return Int(nwritten) +end + +function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt) + nb = uv_write_noncancel(s, p, n) + @cancel_check + @assert nb == n + return nb +end + +function cancel_wait!(s::LibuvStream, t::Task, @nospecialize(creq)) + iolock_begin() + if t.queue !== s + iolock_end() + return false end - return Int(n) + uvw = t.next + @assert uvw !== nothing && uvw != C_NULL + ccall(:uv_cancel, Cint, (Ptr{Cvoid},), uvw) # Ignore errors + iolock_end() + return true end # helper function for uv_write that returns the uv_write_t struct for the write @@ -1111,7 +1151,7 @@ function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt) Int32, (Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}), s, p, nwrite, uvw, - @cfunction(uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint))) + @cfunction(uv_writecb_task, Cvoid, (Ptr{Cvoid}, Cint, Csize_t))) if err < 0 Libc.free(uvw) uv_error("write", err) @@ -1188,12 +1228,18 @@ function write(s::LibuvStream, b::UInt8) return write(s, Ref{UInt8}(b)) end -function uv_writecb_task(req::Ptr{Cvoid}, status::Cint) +function uv_writecb_task(req::Ptr{Cvoid}, status::Cint, nwritten::Csize_t) d = uv_req_data(req) if d != C_NULL uv_req_set_data(req, C_NULL) # let the Task know we got the writecb t = unsafe_pointer_to_objref(d)::Task - schedule(t, status) + t.next = nothing + t.queue = nothing + if status != 0 && status != UV_ECANCELED + schedule(t, _UVError("write", status); error=true) + else + schedule(t, nwritten) + end else # no owner for this req, safe to just free it Libc.free(req) diff --git a/base/task.jl b/base/task.jl index 244a8f70a768a..0d075f293f672 100644 --- a/base/task.jl +++ b/base/task.jl @@ -145,9 +145,11 @@ end # task states -const task_state_runnable = UInt8(0) -const task_state_done = UInt8(1) -const task_state_failed = UInt8(2) +const task_state_runnable = UInt8(0) +const task_state_done = UInt8(1) +const task_state_failed = UInt8(2) +# like _failed, but allows schedule to succeed +const task_state_cancelled = UInt8(3) @inline function getproperty(t::Task, field::Symbol) if field === :state @@ -159,6 +161,8 @@ const task_state_failed = UInt8(2) return :done elseif st === task_state_failed return :failed + elseif st === task_state_cancelled + return :cancelled else @assert false end @@ -250,7 +254,10 @@ true !!! compat "Julia 1.3" This function requires at least Julia 1.3. 
""" -istaskfailed(t::Task) = ((@atomic :acquire t._state) === task_state_failed) +function istaskfailed(t::Task) + state = (@atomic :acquire t._state) + return state === task_state_failed || state === task_state_cancelled +end Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) function Threads.threadpool(t::Task) @@ -302,14 +309,14 @@ function task_local_storage(body::Function, key, val) end # just wait for a task to be done, no error propagation -function _wait(t::Task) +function _wait(t::Task; expected_cancellation = nothing) t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task")) if !istaskdone(t) donenotify = t.donenotify::ThreadSynchronizer lock(donenotify) try - while !istaskdone(t) - wait(donenotify) + while !istaskdone(t) && cancellation_request() === expected_cancellation + wait(donenotify; waitee=t, expected_cancellation) end finally unlock(donenotify) @@ -318,6 +325,11 @@ function _wait(t::Task) nothing end +# We handle cancellation explicitly above - just suppress the error here +cancel_wait!(waitee::Task, @nospecialize(creq)) = nothing +cancel_wait!(waitee::Task, waiter::Task, @nospecialize(creq)) = + cancel_wait!(waitee.donenotify, waiter, creq, nothing; waitee) + # have `waiter` wait for `t` function _wait2(t::Task, waiter::Task) if !istaskdone(t) @@ -359,12 +371,28 @@ Throws a `ConcurrencyViolationError` if `t` is the currently running task, to pr """ function wait(t::Task; throw=true) _wait(t) + cr = cancellation_request_or_yield() + if cr !== nothing + propagate_cancellation!(t, cr) + end if throw && istaskfailed(t) Core.throw(TaskFailedException(t)) end nothing end +""" + wait_nocancel(t::Task) + +Like `wait`, but do not propagate cancellation of this task to the waited-on task. +""" +function wait_nocancel(t::Task; throw=true) + _wait(t) + if throw && istaskfailed(t) + Core.throw(TaskFailedException(t)) + end +end + # Wait multiple tasks """ @@ -587,6 +615,10 @@ function sync_end(c::Channel{Any}) r = take!(c) if isa(r, Task) _wait(r) + cr = cancellation_request_or_yield() + if cr !== nothing + return sync_cancel!(c, r, cr, @isdefined(c_ex) ? 
c_ex : CompositeException()) + end if istaskfailed(r) if !@isdefined(c_ex) c_ex = CompositeException() @@ -899,12 +931,13 @@ mutable struct IntrusiveLinkedListSynchronized{T} lock::Threads.SpinLock IntrusiveLinkedListSynchronized{T}() where {T} = new(IntrusiveLinkedList{T}(), Threads.SpinLock()) end -isempty(W::IntrusiveLinkedListSynchronized) = isempty(W.queue) -length(W::IntrusiveLinkedListSynchronized) = length(W.queue) +waitqueue(l::IntrusiveLinkedListSynchronized) = ILLRef(l.queue, l) +isempty(W::IntrusiveLinkedListSynchronized) = isempty(waitqueue(W)) +length(W::IntrusiveLinkedListSynchronized) = length(waitqueue(W)) function push!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try - push!(W.queue, t) + push!(waitqueue(W), t) finally unlock(W.lock) end @@ -913,7 +946,7 @@ end function pushfirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try - pushfirst!(W.queue, t) + pushfirst!(waitqueue(W), t) finally unlock(W.lock) end @@ -922,7 +955,7 @@ end function pop!(W::IntrusiveLinkedListSynchronized) lock(W.lock) try - return pop!(W.queue) + return pop!(waitqueue(W)) finally unlock(W.lock) end @@ -930,7 +963,7 @@ end function popfirst!(W::IntrusiveLinkedListSynchronized) lock(W.lock) try - return popfirst!(W.queue) + return popfirst!(waitqueue(W)) finally unlock(W.lock) end @@ -938,7 +971,7 @@ end function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T lock(W.lock) try - list_deletefirst!(W.queue, t) + list_deletefirst!(waitqueue(W), t) finally unlock(W.lock) end @@ -952,8 +985,21 @@ const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprec workqueue_for(tid::Int) = Workqueues[tid] function enq_work(t::Task) - (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable") + state = t._state + if state === task_state_cancelled + # When canelled, we allow `enq_work`, but simply transition to failed state. + # All other task cleanup is already done. + state = (@atomicreplace t._state task_state_cancelled => task_state_failed).old + # Catch double `schedule` calls on cancelled tasks. + state === task_state_cancelled && return + end + if !(state === task_state_runnable && t.queue === nothing) + error("schedule: Task not runnable") + end + _enq_work(t) +end +function _enq_work(t::Task) # Sticky tasks go into their thread's work queue. 
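Since the state machine around `task_state_cancelled` is subtle, a hedged sketch of what cancelling a not-yet-started task looks like from the outside (unexported names from this diff; results are illustrative):

```julia
t = Task(() -> sleep(10))   # created, never scheduled
Base.cancel!(t)             # defaults to CANCEL_REQUEST_SAFE
t.state                     # expected :cancelled, the new state added above
istaskfailed(t)             # true: cancelled tasks report as failed
schedule(t)                 # permitted: transitions :cancelled -> :failed instead of running
```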
if t.sticky tid = Threads.threadid(t) @@ -1042,18 +1088,19 @@ true """ function schedule(t::Task, @nospecialize(arg); error=false) # schedule a task to be (re)started with the given value or exception - t._state === task_state_runnable || Base.error("schedule: Task not runnable") - if error - q = t.queue; q === nothing || list_deletefirst!(q::IntrusiveLinkedList{Task}, t) - setfield!(t, :result, arg) - setfield!(t, :_isexception, true) - else - t.queue === nothing || Base.error("schedule: Task not runnable") - setfield!(t, :result, arg) + state = t._state + if t._state === task_state_runnable + if error + q = t.queue; q === nothing || list_deletefirst!(q::IntrusiveLinkedList{Task}, t) + setfield!(t, :result, arg) + setfield!(t, :_isexception, true) + else + t.queue === nothing || Base.error("schedule: Task not runnable") + setfield!(t, :result, arg) + end end # [task] created -scheduled-> wait_time - maybe_record_enqueued!(t) - enq_work(t) + schedule(t) return t end @@ -1272,3 +1319,305 @@ function maybe_record_enqueued!(t::Task) end return t end + +## Cancellation + +struct CancellationRequest + request::UInt8 +end + +""" + CANCEL_REQUEST_SAFE + +Request safe cancellation of the current task. If the task is waiting for any +other resources, it will request safe cancellation of any such resources and +wait for the cancellation of such resources to be completed. + +As a result, if either the task itself or any of its dependent resources are +currently unable to process cancellation, the request may hang and a more +aggressive cancellation method may be required. However, in general _SAFE +should be tried first. +""" +const CANCEL_REQUEST_SAFE = CancellationRequest(0x0) + +""" + CANCEL_REQUEST_ACK + +Set by the task itself to indicate that a (safe) cancellation request was +received and acknowledged, but that there are dependent tasks for which +cancellation is still pending. +""" +const CANCEL_REQUEST_ACK = CancellationRequest(0x1) + +""" + CANCEL_REQUEST_QUERY + +Request that the system create an asynchronous report of why the task is currently +not able to be canceled. The report will be provided in the ->cancellation_request +field of the current task (as long as this field is still CANCEL_REQUEST_QUERY). + +N.B.: Transition to CANCEL_REQUEST_QUERY is only allowed from CANCEL_REQUEST_ACK. + Once the waiting task has read the cancellation report, it may set the cancellation + request back to CANCEL_REQUEST_ACK. +""" +const CANCEL_REQUEST_QUERY = CancellationRequest(0x2) + +""" + CANCEL_REQUEST_ABANDON_EXTERNAL + +Request a cancellation that will cease waiting for any external resources (e.g. I/O objects) +without going through a safe cancellation procedure for such resources. However, the +task will wait for any internal computational tasks to complete cancellation. + +This is a middle ground between CANCEL_REQUEST_SAFE and CANCEL_REQUEST_ABANDON_ALL. As external +I/O is often engineered for robustness in case of sudden disappearance of peers, abandoning such waits is usually an acceptable tradeoff. +""" +const CANCEL_REQUEST_ABANDON_EXTERNAL = CancellationRequest(0x3) + +""" + CANCEL_REQUEST_ABANDON_ALL + +Request a cancellation that will cease waiting for all external resources and all unacknowledged +internal tasks. Such tasks will be frozen and become unschedulable in the future. + +!!! warning + If any canceled task has acquired locks or other resources that are contested, this method of + cancellation may leak such resources and create deadlocks in future code.
It is intended as a + last-resort method to recover a system, but the necessity of this operation should in general + be considered a bug (e.g. due to insufficient cancellation points in computationally-heavy code). +""" +const CANCEL_REQUEST_ABANDON_ALL = CancellationRequest(0x4) + +""" + CANCEL_REQUEST_YIELD + +Request that the task yield to the scheduler at the next cancellation point to +allow another task to run its cancellation propagation logic. The cancelled task +itself will reset to ordinary operation before yielding, but may of course be +canceled by said other task before it resumes operation. +""" +const CANCEL_REQUEST_YIELD = CancellationRequest(0x5) + + +function Base.showerror(io::IO, cr::CancellationRequest) + print(io, "CancellationRequest: ") + if cr === CANCEL_REQUEST_SAFE + print(io, "Safe Cancellation (CANCEL_REQUEST_SAFE)") + else + print(io, "Unknown ($(cr.request))") + end +end + +function conform_cancellation_request(@nospecialize(cr)) + if isa(cr, UInt8) + return CancellationRequest(cr) + end + return cr +end + +# This is the slow path of @cancel_check +@noinline function handle_cancellation!(@nospecialize(_req)) + req = conform_cancellation_request(_req) + if req === CANCEL_REQUEST_YIELD + @atomicreplace :sequentially_consistent :monotonic current_task().cancellation_request _req => nothing + yield() + req = cancellation_request() + end + req === nothing && return + throw(req) +end + +function cancellation_request_raw() + ct = current_task() + req = @atomic :monotonic ct.cancellation_request + req === nothing && return req + req = @atomic :acquire ct.cancellation_request + return req +end + +""" + cancellation_request() + +Returns the cancellation request for the current task or `nothing` if no +cancellation has been requested. If a cancellation request is present, it is +loaded with acquire semantics. +""" +function cancellation_request() + cr = cancellation_request_raw() + return conform_cancellation_request(cr) +end + +""" + cancellation_request_or_yield() + +Like [`cancellation_request`](@ref), but specifically handles CANCEL_REQUEST_YIELD +by calling yield internally and re-checking for cancellation requests. +""" +function cancellation_request_or_yield() + while true + _cr = cancellation_request_raw() + cr = conform_cancellation_request(_cr) + cr !== CANCEL_REQUEST_YIELD && return cr + @atomicreplace :sequentially_consistent :monotonic current_task().cancellation_request _cr => nothing + yield() + end +end + +""" + pre_sleep_cancellation_request() + +Like [`cancellation_request_or_yield`](@ref), but indicates the caller is about to sleep, +so yield requests can be ignored. Additionally, contains necessary synchronization to +ensure that either the cancellation request is visible, or that any potential +cancellation task will see the wait object established by the caller. + +Precondition: The caller must have established a wait object in `current_task().queue`. +""" +function pre_sleep_cancellation_request() + #@assert (@atomic :monotonic current_task().queue) !== nothing + + # Synchronize with atomic_fence_heavy in cancel! + Threads.atomic_fence_light() + + while true + _cr = cancellation_request_raw() + cr = conform_cancellation_request(_cr) + cr !== CANCEL_REQUEST_YIELD && return cr + @atomicreplace :sequentially_consistent :monotonic current_task().cancellation_request _cr => nothing + # The caller is about to sleep, so we are permitted to ignore the yield request.
+ end +end + +""" + Core.cancellation_point!() + +Like [`cancellation_request`](@ref), but additionally gives the optimizer license +to establish this point as a cancellation reset point. If safe to do, the runtime +will attempt to unwind execution to the nearest preceeding cancellation point +when a cancellation is requested. +""" +Core.cancellation_point! + +function cancel!(t::Task, crequest=CANCEL_REQUEST_SAFE) + # TODO: Raise task priority + @atomic :release t.cancellation_request = crequest + Threads.atomic_fence_heavy() + # Special case: If the task hasn't started yet at this point, we want to set + # it up to cancel any waits, but we need to be a bit careful with concurrent + # starts of the task. + if !istaskstarted(t) + t.result = crequest + t._isexception = true + if (@atomicreplace :sequentially_consistent :monotonic t._state task_state_runnable => task_state_cancelled).success + lock(t.donenotify) + notify(t.donenotify) + unlock(t.donenotify) + end + return + end + # Try to interrupt the task. The barrier above synchronizes with the establishment + # of a wait object and guarantees that either: + # 1. We have the wait object in t.queue, or + # 2. The task saw the cancellation and called (a different method of) cancel_wait! + # itself. + # Note that it is possible for both to be true, in which case the task wins + # and our call to cancel_wait! is will no-op after acquiring the waitee lock. + # + # Additionally, if there is no wait object, either + # 1. The task is suspended, but not using our wait object protocol. + # In this case, cancellation will not succeed. + # 2. The task is running. + # + # We can't tell the difference, but we unconditionally try to send the cancellation + # signal. If a reset_ctx exists, this will cause the task to be interrupted. + tid = Threads.threadid(t) + if !istaskdone(t) + waitee = t.queue + if waitee !== nothing + invokelatest(cancel_wait!, waitee, t, crequest) + elseif tid != 0 + ccall(:jl_send_cancellation_signal, Cvoid, (Int16,), (tid - 1) % Int16) + end + end + if t.sticky + # If this task is sticky, it won't be able to run if the task currently + # running on its thread is blocking. Use the cancellation mechanism to + # try and pre-empt that task. + # N.B.: This is a best-effort attempt; the task we end up with may get + # descheduled before we get around to cancelling it. However, that's + # fine - it's not a correctness issue to deschedule the task. The + # important thing is that the thread re-enter the scheduler to pick up + # our cancelled task. + # In the future, we may want to use the same mechanism for more general + # pre-emption, but this helps avoid situations where tasks that have + # cancellation points, but no yield points become uncancellable. + tid = Threads.threadid(t) + if tid != 0 + ccall(:jl_preempt_thread_task, Cvoid, (Int16,), (tid - 1) % Int16) + end + end +end + +function cancel_wait!(q::StickyWorkqueue, t::Task, @nospecialize(creq)) + # Tasks in the workqueue are runnable - we do not cancel the wait, + # but we do need to check whether it's in there + lock(q.lock) + try + return (t in q.queue) + finally + unlock(q.lock) + end +end + +""" + Base.reset_cancellation!() + +Resets the cancellation status of the current task. +This should only be used from the root task after normal operation has been +resumed (e.g. by returning control to the user). 
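To make the intended use of these primitives concrete, a minimal sketch of a compute loop with explicit cancellation points; it mirrors the `@cancel_check` macro from condition.jl and relies on the unexported names introduced in this diff:

```julia
function crunch(data)
    acc = 0.0
    for (i, x) in pairs(data)
        if i % 10_000 == 0                          # periodic, cheap cancellation check
            req = Core.cancellation_point!()        # fast path: usually returns `nothing`
            req === nothing || Base.handle_cancellation!(req)  # handles or rethrows the request
        end
        acc += sin(x)
    end
    return acc
end

# Elsewhere: t = Threads.@spawn crunch(xs); ...; Base.cancel!(t)
```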
+""" +function reset_cancellation!() + ct = current_task() + @assert ct === roottask + @atomic :release ct.cancellation_request = nothing +end + +function propagate_cancellation!(t::Task, crequest) + if crequest != CANCEL_REQUEST_SAFE + error("Not yet supported") + end + cancel!(t, crequest) + _wait(t; expected_cancellation=crequest) +end + +@noinline function sync_cancel!(c::Channel{Any}, t::Task, @nospecialize(cr), c_ex::CompositeException) + if cr !== CANCEL_REQUEST_SAFE + error("Not yet supported") + end + waitees = Any[t] + cancel!(t, cr) + while isready(c) + r = take!(c) + cancel!(r, cr) + push!(waitees, r) + end + close(c) + for r in waitees + if isa(r, Task) + _wait(r; expected_cancellation=cr) + if istaskfailed(r) + push!(c_ex, TaskFailedException(r)) + end + else + try + wait(r) + catch e + push!(c_ex, e) + end + end + end + if !isempty(c_ex) + throw(c_ex) + end + return nothing +end diff --git a/deps/libuv.version b/deps/libuv.version index f80cde8964237..57201f6a82210 100644 --- a/deps/libuv.version +++ b/deps/libuv.version @@ -5,5 +5,5 @@ LIBUV_JLL_NAME := LibUV ## source build LIBUV_VER := 2 -LIBUV_BRANCH=julia-uv2-1.48.0 -LIBUV_SHA1=b21d6d84e46f6c97ecbc8e4e8a8ea6ad98049ea8 +LIBUV_BRANCH=kf/julia-writecancel +LIBUV_SHA1=89b5f68b38698a4151f35b10ba9380363614097a diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 88dc2b7514a2a..0b7d545e54d08 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -50,6 +50,8 @@ Base.Threads.atomic_xor! Base.Threads.atomic_max! Base.Threads.atomic_min! Base.Threads.atomic_fence +Base.Threads.atomic_fence_heavy +Base.Threads.atomic_fence_light ``` ## ccall using a libuv threadpool (Experimental) diff --git a/src/Makefile b/src/Makefile index 832f0b0ae71e4..11d17718785d9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -84,6 +84,7 @@ CODEGEN_SRCS := codegen jitlayers aotcompile debuginfo disasm llvm-simdloop \ llvm-pass-helpers llvm-ptls llvm-propagate-addrspaces \ llvm-multiversioning llvm-alloc-opt llvm-alloc-helpers cgmemmgr llvm-remove-addrspaces \ llvm-remove-ni llvm-julia-licm llvm-demote-float16 llvm-cpufeatures llvm-expand-atomic-modify \ + llvm-cancellation-lowering \ pipeline llvm_api \ $(GC_CODEGEN_SRCS) FLAGS_COMMON += -I$(shell $(LLVM_CONFIG_HOST) --includedir) diff --git a/src/ast.c b/src/ast.c index d6e3893751c9f..f513ef34b01eb 100644 --- a/src/ast.c +++ b/src/ast.c @@ -319,6 +319,8 @@ void jl_init_common_symbols(void) jl_atomic_sym = jl_symbol("atomic"); jl_not_atomic_sym = jl_symbol("not_atomic"); jl_unordered_sym = jl_symbol("unordered"); + jl_singlethread_sym = jl_symbol("singlethread"); + jl_system_sym = jl_symbol("system"); jl_monotonic_sym = jl_symbol("monotonic"); jl_acquire_sym = jl_symbol("acquire"); jl_release_sym = jl_symbol("release"); diff --git a/src/builtin_proto.h b/src/builtin_proto.h index ff634149a06f6..ee0d8fca8cd8a 100644 --- a/src/builtin_proto.h +++ b/src/builtin_proto.h @@ -76,6 +76,7 @@ extern "C" { XX(tuple,"tuple") \ XX(typeassert,"typeassert") \ XX(typeof,"typeof") \ + XX(cancellation_point,"cancellation_point!") #define DECLARE_BUILTIN(cname,jlname) \ JL_CALLABLE(jl_f_##cname); diff --git a/src/builtins.c b/src/builtins.c index 7b29580c76086..80ca3f3c23a1e 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -2496,6 +2496,16 @@ JL_CALLABLE(jl_f_intrinsic_call) abort(); } +JL_CALLABLE(jl_f_cancellation_point) +{ + JL_NARGS(cancellation_point, 0, 0); + jl_task_t *ct = jl_current_task; + jl_value_t *cr = 
jl_atomic_load_relaxed(&ct->cancellation_request); + if (cr == NULL || cr == jl_nothing) + return jl_nothing; + return jl_atomic_load_acquire(&ct->cancellation_request); +} + JL_DLLEXPORT const char *jl_intrinsic_name(int f) { switch ((enum intrinsic)f) { diff --git a/src/codegen.cpp b/src/codegen.cpp index ed427f999e8e3..4a019c8412c8e 100644 --- a/src/codegen.cpp +++ b/src/codegen.cpp @@ -1197,6 +1197,17 @@ static const auto jl_write_barrier_func = new JuliaFunction<>{ }, }; +static const auto jl_cancellation_point_func = new JuliaFunction<>{ + "julia.cancellation_point", + [](LLVMContext &C) { + return FunctionType::get(getInt32Ty(C), {}, false); + }, + [](LLVMContext &C) { return AttributeList::get(C, + Attributes(C, {Attribute::ReturnsTwice}), + AttributeSet(), + None); } +}; + static const auto jlisa_func = new JuliaFunction<>{ XSTR(jl_isa), [](LLVMContext &C) { @@ -1889,6 +1900,7 @@ class jl_codectx_t { int nargs = 0; int nvargs = -1; bool is_opaque_closure = false; + ssize_t current_stmt_idx = -1; // current statement index for ssaflags lookup Value *pgcstack = NULL; Instruction *topalloca = NULL; @@ -2074,6 +2086,28 @@ static Value *emit_ptrgep(jl_codectx_t &ctx, Value *base, size_t byte_offset, co return gep; } +// Check if the current statement has the reset_safe flag set +static bool current_stmt_is_reset_safe(jl_codectx_t &ctx) +{ + if (ctx.current_stmt_idx < 0 || ctx.source == nullptr || ctx.source->ssaflags == nullptr) + return false; + size_t nstmts = jl_array_dim0(ctx.source->ssaflags); + if ((size_t)ctx.current_stmt_idx >= nstmts) + return false; + uint32_t flag = jl_array_data(ctx.source->ssaflags, uint32_t)[ctx.current_stmt_idx]; + return (flag & IR_FLAG_RESET_SAFE) != 0; +} + +// Mark a call instruction with reset_safe metadata if the current statement has the flag +static void mark_reset_safe(jl_codectx_t &ctx, CallInst *call) +{ + if (call && current_stmt_is_reset_safe(ctx)) { + LLVMContext &llvmctx = ctx.builder.getContext(); + MDNode *md = MDNode::get(llvmctx, None); + call->setMetadata("julia.reset_safe", md); + } +} + static Value *emit_ptrgep(jl_codectx_t &ctx, Value *base, Value *byte_offset, const Twine &Name="") { auto *gep = ctx.builder.CreateInBoundsGEP(getInt8Ty(ctx.builder.getContext()), base, byte_offset, Name); @@ -4943,6 +4977,51 @@ static bool emit_builtin_call(jl_codectx_t &ctx, jl_cgval_t *ret, jl_value_t *f, return true; } + else if (f == BUILTIN(cancellation_point) && nargs == 0) { + // Emit the cancellation point intrinsic call first + ctx.builder.CreateCall(prepare_call(jl_cancellation_point_func)); + + // Now do the same as the runtime version: + // 1. Load cancellation_request with relaxed ordering for fast path check + Value *ct = get_current_task(ctx); + Value *cr_ptr = emit_ptrgep(ctx, ct, offsetof(jl_task_t, cancellation_request), "cancellation_request"); + jl_aliasinfo_t ai = jl_aliasinfo_t::fromTBAA(ctx, ctx.tbaa().tbaa_gcframe); + LoadInst *cr_relaxed = ctx.builder.CreateAlignedLoad(ctx.types().T_prjlvalue, cr_ptr, ctx.types().alignof_ptr); + cr_relaxed->setOrdering(AtomicOrdering::Monotonic); + ai.decorateInst(cr_relaxed); + + // 2. 
Check if cr == NULL || cr == jl_nothing + Value *is_null = ctx.builder.CreateIsNull(cr_relaxed); + Value *nothing_val = track_pjlvalue(ctx, literal_pointer_val(ctx, jl_nothing)); + Value *is_nothing = ctx.builder.CreateICmpEQ(decay_derived(ctx, cr_relaxed), decay_derived(ctx, nothing_val)); + Value *no_cancel = ctx.builder.CreateOr(is_null, is_nothing); + + // Save current basic block before branching + BasicBlock *currBB = ctx.builder.GetInsertBlock(); + + // Create basic blocks for the branch + BasicBlock *has_cancel_bb = BasicBlock::Create(ctx.builder.getContext(), "has_cancellation", ctx.f); + BasicBlock *merge_bb = BasicBlock::Create(ctx.builder.getContext(), "cancellation_merge", ctx.f); + + ctx.builder.CreateCondBr(no_cancel, merge_bb, has_cancel_bb); + + // In the has_cancel case, do an acquire load + ctx.builder.SetInsertPoint(has_cancel_bb); + LoadInst *cr_acquire = ctx.builder.CreateAlignedLoad(ctx.types().T_prjlvalue, cr_ptr, ctx.types().alignof_ptr); + cr_acquire->setOrdering(AtomicOrdering::Acquire); + ai.decorateInst(cr_acquire); + ctx.builder.CreateBr(merge_bb); + + // Merge the results + ctx.builder.SetInsertPoint(merge_bb); + PHINode *result = ctx.builder.CreatePHI(ctx.types().T_prjlvalue, 2); + result->addIncoming(nothing_val, currBB); + result->addIncoming(cr_acquire, has_cancel_bb); + + *ret = mark_julia_type(ctx, result, /*boxed*/ true, rt); + return true; + } + return false; } @@ -4971,6 +5050,8 @@ static CallInst *emit_jlcall(jl_codectx_t &ctx, Value *theFptr, Value *theF, } CallInst *result = ctx.builder.CreateCall(TheTrampoline, theArgs); result->setAttributes(TheTrampoline->getAttributes()); + // Mark as reset_safe if the current statement has that flag + mark_reset_safe(ctx, result); // TODO: we could add readonly attributes in many cases to the args return result; } @@ -5085,6 +5166,8 @@ static jl_cgval_t emit_call_specfun_other(jl_codectx_t &ctx, bool is_opaque_clos call->setAttributes(returninfo.attrs); if (gcstack_arg && ctx.emission_context.use_swiftcc) call->setCallingConv(CallingConv::Swift); + // Mark as reset_safe if the current statement has that flag + mark_reset_safe(ctx, call); jl_cgval_t retval; switch (returninfo.cc) { @@ -9411,7 +9494,9 @@ static jl_llvm_functions_t } } else { + ctx.current_stmt_idx = cursor; emit_stmtpos(ctx, stmt, cursor); + ctx.current_stmt_idx = -1; mallocVisitStmt(nullptr, have_dbg_update); } find_next_stmt(cursor + 1); diff --git a/src/intrinsics.cpp b/src/intrinsics.cpp index ae25c3cc83ca5..4791d439e2ad0 100644 --- a/src/intrinsics.cpp +++ b/src/intrinsics.cpp @@ -915,6 +915,15 @@ static jl_cgval_t emit_pointerarith(jl_codectx_t &ctx, intrinsic f, static jl_cgval_t emit_atomicfence(jl_codectx_t &ctx, ArrayRef argv) { const jl_cgval_t &ord = argv[0]; + const jl_cgval_t &ssid_arg = argv[1]; + llvm::SyncScope::ID ssid = llvm::SyncScope::System; + if (!ssid_arg.constant || !jl_is_symbol(ssid_arg.constant) || + ((jl_sym_t*)ssid_arg.constant != jl_singlethread_sym && + (jl_sym_t*)ssid_arg.constant != jl_system_sym)) { + return emit_runtime_call(ctx, atomic_fence, argv, 2); + } + if ((jl_sym_t*)ssid_arg.constant == jl_singlethread_sym) + ssid = llvm::SyncScope::SingleThread; if (ord.constant && jl_is_symbol(ord.constant)) { enum jl_memory_order order = jl_get_atomic_order((jl_sym_t*)ord.constant, true, true); if (order == jl_memory_order_invalid) { @@ -922,10 +931,10 @@ static jl_cgval_t emit_atomicfence(jl_codectx_t &ctx, ArrayRef argv) return jl_cgval_t(); // unreachable } if (order > jl_memory_order_monotonic) - 
ctx.builder.CreateFence(get_llvm_atomic_order(order)); + ctx.builder.CreateFence(get_llvm_atomic_order(order), ssid); return ghostValue(ctx, jl_nothing_type); } - return emit_runtime_call(ctx, atomic_fence, argv, 1); + return emit_runtime_call(ctx, atomic_fence, argv, 2); } static jl_cgval_t emit_atomic_pointerref(jl_codectx_t &ctx, ArrayRef argv) @@ -1339,7 +1348,7 @@ static jl_cgval_t emit_intrinsic(jl_codectx_t &ctx, intrinsic f, jl_value_t **ar case atomic_fence: ++Emitted_atomic_fence; - assert(nargs == 1); + assert(nargs == 2); return emit_atomicfence(ctx, argv); case atomic_pointerref: ++Emitted_atomic_pointerref; diff --git a/src/intrinsics.h b/src/intrinsics.h index 5765e3e671bc6..4fd5630afa38f 100644 --- a/src/intrinsics.h +++ b/src/intrinsics.h @@ -95,7 +95,7 @@ ADD_I(pointerref, 3) \ ADD_I(pointerset, 4) \ /* pointer atomics */ \ - ADD_I(atomic_fence, 1) \ + ADD_I(atomic_fence, 2) \ ADD_I(atomic_pointerref, 2) \ ADD_I(atomic_pointerset, 3) \ ADD_I(atomic_pointerswap, 3) \ diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index d0cfb16062b7d..1a0ff14a82b55 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -506,6 +506,7 @@ XX(jl_vprintf) \ XX(jl_wakeup_thread) \ XX(jl_write_compiler_output) \ + XX(jl_membarrier) \ #define JL_RUNTIME_EXPORTED_FUNCS_WIN(XX) \ XX(jl_setjmp) \ diff --git a/src/jl_uv.c b/src/jl_uv.c index e41b896320693..5a45f9ff998d6 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -677,14 +677,14 @@ JL_DLLEXPORT int jl_fs_close(uv_os_fd_t handle) } JL_DLLEXPORT int jl_uv_write(uv_stream_t *stream, const char *data, size_t n, - uv_write_t *uvw, uv_write_cb writecb) + uv_write_t *uvw, uv_write3_cb writecb) { uv_buf_t buf[1]; buf[0].base = (char*)data; buf[0].len = n; JL_UV_LOCK(); JL_SIGATOMIC_BEGIN(); - int err = uv_write(uvw, stream, buf, 1, writecb); + int err = uv_write3(uvw, stream, buf, 1, NULL, 0, writecb); JL_UV_UNLOCK(); JL_SIGATOMIC_END(); return err; diff --git a/src/jltypes.c b/src/jltypes.c index db75be1c9db0a..e80bd6be644c8 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -3767,7 +3767,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(27, + jl_perm_symsvec(28, "next", "queue", "storage", @@ -3794,8 +3794,9 @@ void jl_init_types(void) JL_GC_DISABLED "first_enqueued_at", "last_started_running_at", "running_time_ns", - "finished_at"), - jl_svec(27, + "finished_at", + "cancellation_request"), + jl_svec(28, jl_any_type, jl_any_type, jl_any_type, @@ -3822,16 +3823,16 @@ void jl_init_types(void) JL_GC_DISABLED jl_uint64_type, jl_uint64_type, jl_uint64_type, - jl_uint64_type), + jl_uint64_type, + jl_any_type), jl_emptysvec, 0, 1, 6); XX(task); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type); - jl_svecset(jl_task_type->types, 0, listt); // Set field 20 (metrics_enabled) as const - // Set fields 8 (_state) and 24-27 (metric counters) as atomic + // Set fields 8 (_state) and 24-27 (metric counters), 28 (cancellation_request) as atomic const static uint32_t task_constfields[1] = { 0b00000000000010000000000000000000 }; - const static uint32_t task_atomicfields[1] = { 0b00000111100000000000000010000000 }; + const static uint32_t task_atomicfields[1] = { 0b00001111100000000000000010000000 }; jl_task_type->name->constfields = task_constfields; jl_task_type->name->atomicfields = task_atomicfields; diff --git a/src/julia.h b/src/julia.h index 9513dfb47941d..18d549d70d20d 100644 --- a/src/julia.h +++ b/src/julia.h @@ -2324,6 +2324,8 @@ JL_DLLEXPORT 
jl_task_t *jl_new_task(jl_value_t*, jl_value_t*, size_t); JL_DLLEXPORT void jl_switchto(jl_task_t **pt); JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT; JL_DLLEXPORT int jl_set_task_threadpoolid(jl_task_t *task, int8_t tpid) JL_NOTSAFEPOINT; +JL_DLLEXPORT void jl_preempt_thread_task(int16_t tid); +JL_DLLEXPORT void jl_send_cancellation_signal(int16_t tid); JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e JL_MAYBE_UNROOTED); JL_DLLEXPORT void JL_NORETURN jl_rethrow(void); JL_DLLEXPORT void JL_NORETURN jl_rethrow_other(jl_value_t *e JL_MAYBE_UNROOTED); diff --git a/src/julia_internal.h b/src/julia_internal.h index 94036981f950a..c2e6aa7634983 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -1689,7 +1689,7 @@ STATIC_INLINE int is_valid_intrinsic_elptr(jl_value_t *ety) JL_DLLEXPORT jl_value_t *jl_bitcast(jl_value_t *ty, jl_value_t *v); JL_DLLEXPORT jl_value_t *jl_pointerref(jl_value_t *p, jl_value_t *i, jl_value_t *align); JL_DLLEXPORT jl_value_t *jl_pointerset(jl_value_t *p, jl_value_t *x, jl_value_t *align, jl_value_t *i); -JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order); +JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order, jl_value_t *syncscope); JL_DLLEXPORT jl_value_t *jl_atomic_pointerref(jl_value_t *p, jl_value_t *order); JL_DLLEXPORT jl_value_t *jl_atomic_pointerset(jl_value_t *p, jl_value_t *x, jl_value_t *order); JL_DLLEXPORT jl_value_t *jl_atomic_pointerswap(jl_value_t *p, jl_value_t *x, jl_value_t *order); @@ -2010,6 +2010,8 @@ JL_DLLEXPORT int jl_isabspath(const char *in) JL_NOTSAFEPOINT; XX(uninferred_sym) \ XX(unordered_sym) \ XX(unused_sym) \ + XX(singlethread_sym) \ + XX(system_sym) #define XX(name) extern JL_DLLEXPORT jl_sym_t *jl_##name; JL_COMMON_SYMBOLS(XX) @@ -2085,6 +2087,7 @@ JL_DLLEXPORT uint32_t jl_crc32c(uint32_t crc, const char *buf, size_t len); // -- exports from codegen -- // #define IR_FLAG_INBOUNDS 0x01 +#define IR_FLAG_RESET_SAFE (1 << 14) JL_DLLIMPORT void jl_generate_fptr_for_unspecialized(jl_code_instance_t *unspec); JL_DLLIMPORT int jl_compile_codeinst(jl_code_instance_t *unspec); diff --git a/src/julia_locks.h b/src/julia_locks.h index a4b5fd96b8fb4..4fe75543ad6b9 100644 --- a/src/julia_locks.h +++ b/src/julia_locks.h @@ -104,6 +104,7 @@ static inline void jl_mutex_init(jl_mutex_t *lock, const char *name) JL_NOTSAFEP #define JL_LOCK(m) jl_mutex_lock(m) #define JL_UNLOCK(m) jl_mutex_unlock(m) #define JL_LOCK_NOGC(m) jl_mutex_lock_nogc(m) +#define JL_TRYLOCK_NOGC(m) jl_mutex_trylock_nogc(m) #define JL_UNLOCK_NOGC(m) jl_mutex_unlock_nogc(m) JL_DLLEXPORT void jl_lock_value(jl_mutex_t *v) JL_NOTSAFEPOINT; diff --git a/src/julia_threads.h b/src/julia_threads.h index 364931e43d2e9..df6a095baf4ec 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -225,8 +225,11 @@ typedef struct _jl_handler_t jl_handler_t; typedef struct _jl_task_t { JL_DATA_TYPE - jl_value_t *next; // invasive linked list for scheduler - jl_value_t *queue; // invasive linked list for scheduler + // This invasive linked list is used by the scheduler. The fields are protected + // by the lock of the parent object of the containing queue. + jl_value_t *next; + jl_value_t *queue; + jl_value_t *tls; jl_value_t *donenotify; jl_value_t *result; @@ -252,6 +255,9 @@ typedef struct _jl_task_t { // timestamp this task finished (i.e. entered state DONE or FAILED). _Atomic(uint64_t) finished_at; + // Cancellation request - can be an arbitary julia value, but the runtime recognizes + // CANCEL_REQUEST_ enum values. 
+ _Atomic(jl_value_t *) cancellation_request; // hidden state: // id of owning thread - does not need to be defined until the task runs @@ -280,6 +286,9 @@ typedef struct _jl_task_t { jl_handler_t *eh; // saved thread state jl_ucontext_t ctx; // pointer into stkbuf, if suspended + // current reset point for cancellation. Technically, we only need volatile + // here, but _Atomic makes the intent clearer. + volatile _jl_ucontext_t *reset_ctx; } jl_task_t; JL_DLLEXPORT void *jl_get_ptls_states(void); diff --git a/src/llvm-cancellation-lowering.cpp b/src/llvm-cancellation-lowering.cpp new file mode 100644 index 0000000000000..9d589f59045eb --- /dev/null +++ b/src/llvm-cancellation-lowering.cpp @@ -0,0 +1,311 @@ +// This file is a part of Julia. License is MIT: https://julialang.org/license + +// This pass lowers the julia.cancellation_point intrinsic to: +// 1. A stack buffer allocation for the jl_ucontext_t +// 2. A setjmp call on the uc_mcontext field +// 3. Assignment of the buffer address to task->reset_ctx (atomic release) +// +// It also walks the function to find stores/calls without julia.reset_safe +// metadata and inserts reset_ctx = NULL before them. + +#include "llvm-version.h" +#include "passes.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "llvm-codegen-shared.h" +#include "llvm-pass-helpers.h" +#include "julia.h" +#include "julia_internal.h" +#include "julia_threads.h" + +#define DEBUG_TYPE "cancellation_lowering" + +STATISTIC(CancellationPointsLowered, "Number of cancellation points lowered"); +STATISTIC(ResetCtxClearsInserted, "Number of reset_ctx clears inserted"); + +using namespace llvm; + +// Check if an instruction has the julia.reset_safe metadata +static bool hasResetSafeMetadata(Instruction *I) { + return I->getMetadata("julia.reset_safe") != nullptr; +} + +struct CancellationLowering { + Function *cancel_point_func; + Value *pgcstack; + Value *reset_ctx_ptr; // Computed once in entry block, dominates all uses + + CancellationLowering(Module &M) : cancel_point_func(nullptr), pgcstack(nullptr), reset_ctx_ptr(nullptr) { + cancel_point_func = M.getFunction("julia.cancellation_point"); + } + + bool runOnFunction(Function &F); + +private: + // Compute reset_ctx_ptr once in entry block + // If insertAfter is provided, insert after that instruction + // Otherwise insert at the beginning of the entry block (after allocas) + void computeResetCtxPtr(Function &F, Instruction *insertAfter); +}; + +void CancellationLowering::computeResetCtxPtr(Function &F, Instruction *insertAfter) { + if (!pgcstack) + return; + + LLVMContext &LLVMCtx = F.getContext(); + Type *I8Ty = Type::getInt8Ty(LLVMCtx); + Type *I64Ty = Type::getInt64Ty(LLVMCtx); + + IRBuilder<> Builder(LLVMCtx); + if (insertAfter) { + // Insert right after pgcstack call + Builder.SetInsertPoint(insertAfter->getNextNode()); + } else { + // pgcstack is an argument, insert at the start of entry block + // but after any allocas + BasicBlock &entry = F.getEntryBlock(); + BasicBlock::iterator insertPt = entry.begin(); + while (insertPt != entry.end() && isa(&*insertPt)) { + ++insertPt; + } + Builder.SetInsertPoint(&entry, insertPt); + } + + // Get the offset of gcstack in jl_task_t + size_t gcstack_offset = offsetof(jl_task_t, gcstack); + + Value *task_ptr = Builder.CreateGEP(I8Ty, pgcstack, + ConstantInt::get(I64Ty, -(int64_t)gcstack_offset), + "current_task"); + + // Get pointer to reset_ctx field in current task + size_t reset_ctx_offset = offsetof(jl_task_t, reset_ctx); + + 
reset_ctx_ptr = Builder.CreateGEP(I8Ty, task_ptr, + ConstantInt::get(I64Ty, reset_ctx_offset), + "reset_ctx_ptr"); +} + +bool CancellationLowering::runOnFunction(Function &F) { + // Skip if there's no cancellation_point function in the module + if (!cancel_point_func) + return false; + + bool Changed = false; + + // Find pgcstack - either as a call to julia.get_pgcstack or as an argument with "gcstack" attribute + pgcstack = nullptr; + reset_ctx_ptr = nullptr; + Instruction *pgcstack_inst = nullptr; // Only set if pgcstack is from a call, not an argument + Function *pgcstack_getter = F.getParent()->getFunction("julia.get_pgcstack"); + Function *adoptthread_func = F.getParent()->getFunction("julia.get_pgcstack_or_new"); + if (pgcstack_getter || adoptthread_func) { + for (auto &I : F.getEntryBlock()) { + if (CallInst *callInst = dyn_cast(&I)) { + Value *callee = callInst->getCalledOperand(); + if ((pgcstack_getter && callee == pgcstack_getter) || + (adoptthread_func && callee == adoptthread_func)) { + pgcstack = callInst; + pgcstack_inst = callInst; + break; + } + } + } + } + // If not found via call, check for argument with "gcstack" attribute + if (!pgcstack) { + for (auto &arg : F.args()) { + AttributeSet attrs = F.getAttributes().getParamAttrs(arg.getArgNo()); + if (attrs.hasAttribute("gcstack")) { + pgcstack = &arg; + break; + } + } + } + + // First, find all cancellation_point intrinsics + SmallVector CancellationPoints; + + for (auto &BB : F) { + for (auto &I : BB) { + if (auto *CI = dyn_cast(&I)) { + Value *callee = CI->getCalledOperand(); + if (callee && callee == cancel_point_func) { + CancellationPoints.push_back(CI); + } + } + } + } + + if (CancellationPoints.empty()) { + return false; + } + + // Compute reset_ctx_ptr once in entry block (dominates all uses) + // pgcstack_inst is set when pgcstack comes from a call; null when it's an argument + computeResetCtxPtr(F, pgcstack_inst); + + // Lower each cancellation point + for (CallInst *CI : CancellationPoints) { + ++CancellationPointsLowered; + Changed = true; + + IRBuilder<> Builder(CI); + LLVMContext &LLVMCtx = F.getContext(); + Type *I8Ty = Type::getInt8Ty(LLVMCtx); + Type *I32Ty = Type::getInt32Ty(LLVMCtx); + Type *PtrTy = PointerType::getUnqual(LLVMCtx); + + if (!reset_ctx_ptr) { + // Can't lower without access to the task, just remove the intrinsic + CI->replaceAllUsesWith(ConstantInt::get(I32Ty, 0)); + CI->eraseFromParent(); + continue; + } + + // Allocate a _jl_ucontext_t on the stack + const size_t UContextSize = sizeof(_jl_ucontext_t); + const size_t UContextAlign = alignof(_jl_ucontext_t); + + // Create the alloca at the start of the function + IRBuilder<> AllocaBuilder(&F.getEntryBlock().front()); + Type *UContextTy = ArrayType::get(I8Ty, UContextSize); + AllocaInst *UContextBuf = AllocaBuilder.CreateAlloca(UContextTy, nullptr, "cancel_ucontext"); + UContextBuf->setAlignment(Align(UContextAlign)); + + // Store the ucontext address to reset_ctx with atomic release ordering + StoreInst *store = Builder.CreateAlignedStore(UContextBuf, reset_ctx_ptr, Align(sizeof(void*))); + store->setOrdering(AtomicOrdering::Release); + + // Call setjmp on the uc_mcontext field (which is at offset 0 of the struct) + // Use the platform-specific setjmp function name defined in julia.h + FunctionType *SetjmpTy = FunctionType::get(I32Ty, {PtrTy}, false); + FunctionCallee SetjmpFn = F.getParent()->getOrInsertFunction(jl_setjmp_name, SetjmpTy); + + CallInst *SetjmpCall = Builder.CreateCall(SetjmpFn, {UContextBuf}); + 
SetjmpCall->addFnAttr(Attribute::ReturnsTwice); + + // Replace uses and remove the intrinsic + CI->replaceAllUsesWith(SetjmpCall); + CI->eraseFromParent(); + } + + // Now walk the function to find stores/calls without reset_safe metadata + // and insert reset_ctx = NULL before them + SmallVector UnsafePoints; + + // We need to skip instructions that are part of our setup (pgcstack, task, reset_ctx_ptr) + // since they occur before reset_ctx_ptr is available + Instruction *reset_ctx_ptr_inst = dyn_cast_or_null(reset_ctx_ptr); + + for (auto &BB : F) { + bool past_setup = (&BB != &F.getEntryBlock()); + for (auto &I : BB) { + // In the entry block, skip instructions until after reset_ctx_ptr is defined + if (!past_setup) { + if (&I == reset_ctx_ptr_inst) { + past_setup = true; + } + continue; + } + + // Check for stores (but skip the reset_ctx stores we just created) + if (auto *SI = dyn_cast(&I)) { + if (!hasResetSafeMetadata(SI)) { + // Skip atomic stores (including reset_ctx stores we just created) + if (SI->isAtomic()) + continue; + UnsafePoints.push_back(SI); + } + } + // Check for calls (but not debug intrinsics, lifetime markers, etc.) + else if (auto *CI = dyn_cast(&I)) { + // Skip debug intrinsics and other harmless intrinsics + if (isa(CI)) + continue; + if (CI->isLifetimeStartOrEnd()) + continue; + + // Check for reset_safe metadata + if (!hasResetSafeMetadata(CI)) { + // Also skip intrinsic calls that are known safe + Function *Callee = CI->getCalledFunction(); + if (Callee && Callee->isIntrinsic()) { + Intrinsic::ID ID = Callee->getIntrinsicID(); + if (ID == Intrinsic::lifetime_start || + ID == Intrinsic::lifetime_end || + ID == Intrinsic::dbg_declare || + ID == Intrinsic::dbg_value || + ID == Intrinsic::dbg_label || + ID == Intrinsic::assume || + ID == Intrinsic::expect || + ID == Intrinsic::prefetch) { + continue; + } + } + // Skip the setjmp calls we just created + if (Callee && Callee->getName() == jl_setjmp_name) + continue; + UnsafePoints.push_back(CI); + } + } + } + } + + // Insert reset_ctx = NULL before each unsafe point + for (Instruction *I : UnsafePoints) { + if (!reset_ctx_ptr) + continue; + + ++ResetCtxClearsInserted; + Changed = true; + + IRBuilder<> Builder(I); + LLVMContext &LLVMCtx = F.getContext(); + Type *PtrTy = PointerType::getUnqual(LLVMCtx); + + // Store NULL to reset_ctx with atomic release ordering + Value *null_ptr = ConstantPointerNull::get(cast(PtrTy)); + StoreInst *store = Builder.CreateAlignedStore(null_ptr, reset_ctx_ptr, Align(sizeof(void*))); + store->setOrdering(AtomicOrdering::Release); + } + + // Insert reset_ctx = NULL before all return instructions + // This is necessary because the cancel_ucontext buffer is stack-allocated, + // and becomes invalid when the function returns + if (reset_ctx_ptr) { + LLVMContext &LLVMCtx = F.getContext(); + Type *PtrTy = PointerType::getUnqual(LLVMCtx); + Value *null_ptr = ConstantPointerNull::get(cast(PtrTy)); + + for (auto &BB : F) { + if (auto *RI = dyn_cast(BB.getTerminator())) { + IRBuilder<> Builder(RI); + StoreInst *store = Builder.CreateAlignedStore(null_ptr, reset_ctx_ptr, Align(sizeof(void*))); + store->setOrdering(AtomicOrdering::Release); + ++ResetCtxClearsInserted; + } + } + } + + return Changed; +} + +PreservedAnalyses CancellationLoweringPass::run(Function &F, FunctionAnalysisManager &AM) { + CancellationLowering CL(*F.getParent()); + if (CL.runOnFunction(F)) { +#ifdef JL_VERIFY_PASSES + assert(!verifyLLVMIR(F)); +#endif + return PreservedAnalyses::allInSet(); + } + return 
PreservedAnalyses::all(); +} diff --git a/src/llvm-julia-passes.inc b/src/llvm-julia-passes.inc index bd223499f37af..6f860ead8af84 100644 --- a/src/llvm-julia-passes.inc +++ b/src/llvm-julia-passes.inc @@ -16,6 +16,7 @@ FUNCTION_PASS("AllocOpt", AllocOptPass()) FUNCTION_PASS("PropagateJuliaAddrspaces", PropagateJuliaAddrspacesPass()) FUNCTION_PASS("GCInvariantVerifier", GCInvariantVerifierPass()) FUNCTION_PASS("FinalLowerGC", FinalLowerGCPass()) +FUNCTION_PASS("CancellationLowering", CancellationLoweringPass()) FUNCTION_PASS("ExpandAtomicModify", ExpandAtomicModifyPass()) #endif diff --git a/src/llvm-pass-helpers.cpp b/src/llvm-pass-helpers.cpp index a6a16a7f4956c..ed288328ea908 100644 --- a/src/llvm-pass-helpers.cpp +++ b/src/llvm-pass-helpers.cpp @@ -32,7 +32,7 @@ JuliaPassContext::JuliaPassContext() gc_preserve_begin_func(nullptr), gc_preserve_end_func(nullptr), pointer_from_objref_func(nullptr), gc_loaded_func(nullptr), alloc_obj_func(nullptr), typeof_func(nullptr), write_barrier_func(nullptr), pop_handler_noexcept_func(nullptr), - call_func(nullptr), call2_func(nullptr), call3_func(nullptr), module(nullptr) + call_func(nullptr), call2_func(nullptr), call3_func(nullptr), cancel_point_func(nullptr), module(nullptr) { } @@ -61,6 +61,7 @@ void JuliaPassContext::initFunctions(Module &M) call_func = M.getFunction("julia.call"); call2_func = M.getFunction("julia.call2"); call3_func = M.getFunction("julia.call3"); + cancel_point_func = M.getFunction("julia.cancellation_point"); } void JuliaPassContext::initAll(Module &M) diff --git a/src/llvm-pass-helpers.h b/src/llvm-pass-helpers.h index d79470818c287..d9c96e6451595 100644 --- a/src/llvm-pass-helpers.h +++ b/src/llvm-pass-helpers.h @@ -64,6 +64,7 @@ struct JuliaPassContext { llvm::Function *call_func; llvm::Function *call2_func; llvm::Function *call3_func; + llvm::Function *cancel_point_func; // Creates a pass context. Type and function pointers // are set to `nullptr`. Metadata nodes are initialized. 
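Editorial note (not part of the patch): the `CancellationLowering` pass above rewrites each `julia.cancellation_point` into a stack-allocated context buffer, a release-ordered publish of that buffer into `ct->reset_ctx`, and a `setjmp`, then clears `reset_ctx` before any non-reset-safe store or call and before every return. A rough, hypothetical C-level sketch of the shape of the emitted code follows; `fake_task_t`, `fake_ucontext_t`, and `lowered_cancellation_point` are illustrative stand-ins and do not appear in the patch.

```c
/* Hypothetical sketch only -- not part of the patch. Illustrates the shape of
 * code that CancellationLowering emits for one julia.cancellation_point,
 * assuming the task layout used elsewhere in this patch. All names are stand-ins. */
#include <setjmp.h>
#include <stdatomic.h>
#include <stddef.h>

typedef struct {
    jmp_buf uc_mcontext;                 /* stand-in for _jl_ucontext_t */
} fake_ucontext_t;

typedef struct {
    void *gcstack;                       /* pgcstack points at this field */
    _Atomic(fake_ucontext_t *) reset_ctx;
} fake_task_t;

int lowered_cancellation_point(fake_task_t *ct)
{
    fake_ucontext_t buf;                 /* the pass allocas this in the entry block */

    /* publish the reset point (store-release in the emitted IR) */
    atomic_store_explicit(&ct->reset_ctx, &buf, memory_order_release);

    /* result of the former intrinsic: nonzero when a cancellation longjmp lands here
     * (the signal handler clears reset_ctx before taking the longjmp) */
    if (setjmp(buf.uc_mcontext) != 0)
        return 1;

    /* ... only reset-safe stores/calls may execute while reset_ctx is set ... */

    /* cleared before any unsafe operation and before returning, because buf
     * lives on this frame's stack */
    atomic_store_explicit(&ct->reset_ctx, NULL, memory_order_release);
    return 0;
}
```

The jump itself is taken from the same thread's `SIGUSR2` handler (signal request `5` in `usr2_handler`, added further down in this patch); the null store before each return exists because the jump buffer is stack-allocated and would dangle once the frame is gone.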
diff --git a/src/passes.h b/src/passes.h index 0c5a124ade952..e40538f8f88da 100644 --- a/src/passes.h +++ b/src/passes.h @@ -43,6 +43,11 @@ struct FinalLowerGCPass : PassInfoMixin { static bool isRequired() { return true; } }; +struct CancellationLoweringPass : PassInfoMixin { + PreservedAnalyses run(Function &F, FunctionAnalysisManager &AM) JL_NOTSAFEPOINT; + static bool isRequired() { return true; } +}; + struct ExpandAtomicModifyPass : PassInfoMixin { PreservedAnalyses run(Function &F, FunctionAnalysisManager &AM) JL_NOTSAFEPOINT; }; diff --git a/src/pipeline.cpp b/src/pipeline.cpp index 0481e04b8d19e..afd2ac90f9f6b 100644 --- a/src/pipeline.cpp +++ b/src/pipeline.cpp @@ -568,6 +568,7 @@ static void buildIntrinsicLoweringPipeline(ModulePassManager &MPM, PassBuilder * JULIA_PASS(MPM.addPass(RemoveNIPass())); { FunctionPassManager FPM; + JULIA_PASS(FPM.addPass(CancellationLoweringPass())); // Lower cancellation points to setjmp (before GC lowering) JULIA_PASS(FPM.addPass(LateLowerGCPass())); JULIA_PASS(FPM.addPass(FinalLowerGCPass())); JULIA_PASS(FPM.addPass(ExpandAtomicModifyPass())); // after LateLowerGCPass so that all IPO is valid diff --git a/src/runtime_intrinsics.c b/src/runtime_intrinsics.c index 31dd3e085033c..c2abb705fd7c6 100644 --- a/src/runtime_intrinsics.c +++ b/src/runtime_intrinsics.c @@ -622,10 +622,17 @@ JL_DLLEXPORT jl_value_t *jl_atomic_pointerreplace(jl_value_t *p, jl_value_t *exp return result; } -JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order_sym) +JL_DLLEXPORT jl_value_t *jl_atomic_fence(jl_value_t *order_sym, jl_value_t *syncscope_sym) { JL_TYPECHK(fence, symbol, order_sym); + JL_TYPECHK(fence, symbol, syncscope_sym); enum jl_memory_order order = jl_get_atomic_order_checked((jl_sym_t*)order_sym, 1, 1); + if ((jl_sym_t*)syncscope_sym == jl_singlethread_sym) { + asm volatile ("" : : : "memory"); + return jl_nothing; + } else if ((jl_sym_t*)syncscope_sym != jl_system_sym) { + jl_error("atomic_fence: invalid syncscope"); + } if (order > jl_memory_order_monotonic) jl_fence(); return jl_nothing; diff --git a/src/signal-handling.c b/src/signal-handling.c index 1da687654dd81..f555bd475755d 100644 --- a/src/signal-handling.c +++ b/src/signal-handling.c @@ -441,6 +441,29 @@ static void jl_check_profile_autostop(void) } } +jl_mutex_t sigint_cond_lock; +static uv_async_t *sigint_cond_loc; +JL_DLLEXPORT void jl_set_sigint_cond(uv_async_t *cond) +{ + JL_LOCK_NOGC(&sigint_cond_lock); + sigint_cond_loc = cond; + JL_UNLOCK_NOGC(&sigint_cond_lock); +} + +static void deliver_sigint_notification(void) +{ + if (JL_TRYLOCK_NOGC(&sigint_cond_lock)) { + if (sigint_cond_loc != NULL) { + uv_async_send(sigint_cond_loc); + // IO only runs on one thread, which may currently be busy - try + // to preempt it, so that the IO loop has a chance to run and deliver + // this notification. 
+ jl_preempt_thread_task(jl_atomic_load_relaxed(&io_loop_tid)); + } + JL_UNLOCK_NOGC(&sigint_cond_lock); + } +} + static void stack_overflow_warning(void) { jl_safe_printf("Warning: detected a stack overflow; program state may be corrupted, so further execution might be unreliable.\n"); diff --git a/src/signals-mach.c b/src/signals-mach.c index 1ef3e9d23094a..55a430425dce5 100644 --- a/src/signals-mach.c +++ b/src/signals-mach.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -891,3 +892,46 @@ JL_DLLEXPORT void jl_profile_stop_timer(void) profile_all_tasks = 0; uv_mutex_unlock(&bt_data_prof_lock); } + +// The mprotect implementation in signals-unix.c does not work on macOS/aarch64, as mentioned. +// This implementation comes from dotnet, but is similarly dependent on undocumented behavior of the OS. +// Copyright (c) .NET Foundation and Contributors +// MIT LICENSE +JL_DLLEXPORT void jl_membarrier(void) { + uintptr_t sp; + uintptr_t registerValues[128]; + kern_return_t machret; + + // Iterate through each of the threads in the list. + int nthreads = jl_atomic_load_acquire(&jl_n_threads); + for (int tid = 0; tid < nthreads; tid++) { + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + thread_act_t thread = pthread_mach_thread_np(ptls2->system_id); + if (__builtin_available (macOS 10.14, iOS 12, tvOS 9, *)) + { + // Request the threads pointer values to force the thread to emit a memory barrier + size_t registers = 128; + machret = thread_get_register_pointer_values(thread, &sp, ®isters, registerValues); + } + else + { + // fallback implementation for older OS versions +#if defined(_CPU_X86_64_) + x86_thread_state64_t threadState; + mach_msg_type_number_t count = x86_THREAD_STATE64_COUNT; + machret = thread_get_state(thread, x86_THREAD_STATE64, (thread_state_t)&threadState, &count); +#elif defined(_CPU_AARCH64_) + arm_thread_state64_t threadState; + mach_msg_type_number_t count = ARM_THREAD_STATE64_COUNT; + machret = thread_get_state(thread, ARM_THREAD_STATE64, (thread_state_t)&threadState, &count); +#else + #error Unexpected architecture +#endif + } + + if (machret == KERN_INSUFFICIENT_BUFFER_SIZE) + { + HANDLE_MACH_ERROR("thread_get_register_pointer_values()", machret); + } + } +} diff --git a/src/signals-unix.c b/src/signals-unix.c index 16e70ef0f764e..6f061af906352 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -418,7 +418,6 @@ JL_NO_ASAN static void segv_handler(int sig, siginfo_t *info, void *context) } else if (jl_safepoint_consume_sigint()) { jl_clear_force_sigint(); - jl_throw_in_ctx(ct, jl_interrupt_exception, sig, context); } return; } @@ -546,6 +545,26 @@ static void jl_try_deliver_sigint(void) pthread_mutex_unlock(&in_signal_lock); } +// Send a signal to the specified thread to longjmp to its reset_ctx if available. +// This is used for task cancellation to interrupt a running task at a safe point. 
+JL_DLLEXPORT void jl_send_cancellation_signal(int16_t tid) +{ + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + if (ptls2 == NULL) + return; + jl_task_t *ct = jl_atomic_load_relaxed(&ptls2->current_task); + if (ct == NULL) + return; + // Only send if the task has a reset_ctx set (i.e., is at a cancellation point) + if (ct->reset_ctx == NULL) + return; + pthread_mutex_lock(&in_signal_lock); + signals_inflight++; + jl_atomic_store_release(&ptls2->signal_request, 5); + pthread_kill(ptls2->system_id, SIGUSR2); + pthread_mutex_unlock(&in_signal_lock); +} + // Write only by signal handling thread, read only by main thread // no sync necessary. static int thread0_exit_signo = 0; @@ -584,6 +603,7 @@ static void jl_exit_thread0(int signo, jl_bt_element_t *bt_data, size_t bt_size) // is reached // 3: raise `thread0_exit_signo` and try to exit // 4: no-op +// 5: longjmp to reset_ctx if available (for task cancellation) void usr2_handler(int sig, siginfo_t *info, void *ctx) { jl_task_t *ct = jl_get_current_task(); @@ -641,6 +661,15 @@ void usr2_handler(int sig, siginfo_t *info, void *ctx) else if (request == 3) { jl_call_in_ctx(ct->ptls, jl_exit_thread0_cb, sig, ctx); } + else if (request == 5) { + // Longjmp to reset_ctx for task cancellation + volatile _jl_ucontext_t *reset_ctx = ct->reset_ctx; + if (reset_ctx != NULL) { + // Clear reset_ctx before longjmp to prevent double-longjmp + ct->reset_ctx = NULL; + jl_longjmp_in_ctx(sig, ctx, reset_ctx->uc_mcontext); + } + } errno = errno_save; } @@ -989,14 +1018,20 @@ static void *signal_listener(void *arg) #endif if (sig == SIGINT) { - if (jl_ignore_sigint()) { - continue; - } - else if (exit_on_sigint) { + if (exit_on_sigint) { critical = 1; } else { - jl_try_deliver_sigint(); + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0]; + // Set the cancellation request, then notify the sigint listener + // that we want to cancel - if the task is not currently running, + // the sigint listener will take care of safely moving us through + // the cancellation state machine. + // TODO: If there is only one thread, we may need to ask the currently + // running task to yield, so that the sigint listener can run. + jl_atomic_store_release(&ptls2->root_task->cancellation_request, + jl_box_uint8(0x00)); + deliver_sigint_notification(); continue; } } @@ -1274,3 +1309,129 @@ JL_DLLEXPORT int jl_repl_raise_sigtstp(void) { return raise(SIGTSTP); } + +#if !defined(_OS_DARWIN_) +// Implementation of the `mprotect` based membarrier fallback. +// This is a common fallback based on the observation that `mprotect` happens to +// issue the necessary memory barriers. However, there is no spec that +// guarantees this behavior, and indeed AArch64 Darwin does not (so we don't use it +// there). However, we only use it as a fallback here for older versions of +// Linux and FreeBSD where we know that it happens to work. We also use it as a +// fallback for unknown Unix systems under the assumption that it will work, +// but this is not guaranteed. 
+static pthread_mutex_t mprotect_barrier_lock = PTHREAD_MUTEX_INITIALIZER; +static _Atomic(uint64_t) *mprotect_barrier_page = NULL; +static void jl_init_mprotect_membarrier(void) +{ + int result = pthread_mutex_lock(&mprotect_barrier_lock); + assert(result == 0); + if (mprotect_barrier_page == NULL) { + size_t pagesize = jl_getpagesize(); + + mprotect_barrier_page = (_Atomic(uint64_t) *) + mmap(NULL, pagesize, PROT_NONE, + MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + if (mprotect_barrier_page == MAP_FAILED) { + jl_safe_printf("fatal: failed to allocate barrier page.\n"); + abort(); + } + result = mlock(mprotect_barrier_page, pagesize); + if (result != 0) { + jl_safe_printf("fatal: failed to mlock barrier page (try increasing RLIMIT_MEMLOCK with `ulimit -l`).\n"); + abort(); + } + } + result = pthread_mutex_unlock(&mprotect_barrier_lock); + assert(result == 0); + (void)result; +} + +static void jl_mprotect_membarrier(void) +{ + int result = pthread_mutex_lock(&mprotect_barrier_lock); + assert(result == 0); + size_t pagesize = jl_getpagesize(); + result = mprotect(mprotect_barrier_page, pagesize, PROT_READ | PROT_WRITE); + jl_atomic_fetch_add_relaxed(mprotect_barrier_page, 1); + assert(result == 0); + result = mprotect(mprotect_barrier_page, pagesize, PROT_NONE); + assert(result == 0); + result = pthread_mutex_unlock(&mprotect_barrier_lock); + assert(result == 0); + (void)result; +} +#endif + +// Linux and FreeBSD have compatible membarrier support +#if defined(_OS_LINUX_) || defined(_OS_FREEBSD_) +#if defined(_OS_LINUX_) +# include +# if defined(__NR_membarrier) +enum membarrier_cmd { + MEMBARRIER_CMD_QUERY = 0, + MEMBARRIER_CMD_PRIVATE_EXPEDITED = (1 << 3), + MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED = (1 << 4), +}; +# define membarrier(...) syscall(__NR_membarrier, __VA_ARGS__) +# else +# warning "Missing linux kernel headers for membarrier syscall, support disabled" +# define membarrier(...) (errno = ENOSYS, -1) +# endif +#elif defined(_OS_FREEBSD_) +# include +# if __FreeBSD_version >= 1401500 +# include +# else +# define MEMBARRIER_CMD_QUERY 0x00 +# define MEMBARRIER_CMD_PRIVATE_EXPEDITED 0x08 +# define MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED 0x10 +# define membarrier(...) 
(errno = ENOSYS, -1) +# endif +#endif + +// Implementation of `jl_membarrier` +enum membarrier_implementation { + MEMBARRIER_IMPLEMENTATION_UNKNOWN = 0, + MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER = 1, + MEMBARRIER_IMPLEMENTATION_MPROTECT = 2 +}; + +static _Atomic(enum membarrier_implementation) membarrier_impl = MEMBARRIER_IMPLEMENTATION_UNKNOWN; + +static enum membarrier_implementation jl_init_membarrier(void) { + int ret = membarrier(MEMBARRIER_CMD_QUERY, 0); + int needed = MEMBARRIER_CMD_PRIVATE_EXPEDITED | MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED; + if (ret > 0 && ((ret & needed) == needed)) { + // supported + if (membarrier(MEMBARRIER_CMD_REGISTER_PRIVATE_EXPEDITED, 0) == 0) { + // working + jl_atomic_store_relaxed(&membarrier_impl, MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER); + return MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER; + } + } + jl_init_mprotect_membarrier(); + jl_atomic_store_relaxed(&membarrier_impl, MEMBARRIER_IMPLEMENTATION_MPROTECT); + return MEMBARRIER_IMPLEMENTATION_MPROTECT; +} + +JL_DLLEXPORT void jl_membarrier(void) { + enum membarrier_implementation impl = jl_atomic_load_relaxed(&membarrier_impl); + if (impl == MEMBARRIER_IMPLEMENTATION_UNKNOWN) { + impl = jl_init_membarrier(); + } + if (impl == MEMBARRIER_IMPLEMENTATION_SYS_MEMBARRIER) { + int ret = membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0); + assert(ret == 0); + (void)ret; + } else { + assert(impl == MEMBARRIER_IMPLEMENTATION_MPROTECT); + jl_mprotect_membarrier(); + } +} +#elif !defined(_OS_DARWIN_) +JL_DLLEXPORT void jl_membarrier(void) { + if (!mprotect_barrier_page) + jl_init_mprotect_membarrier(); + jl_mprotect_membarrier(); +} +#endif diff --git a/src/signals-win.c b/src/signals-win.c index 1702bc338b16f..16b5734ab9c0d 100644 --- a/src/signals-win.c +++ b/src/signals-win.c @@ -664,3 +664,16 @@ void jl_install_thread_signal_handler(jl_ptls_t ptls) have_backtrace_fiber = 1; } } + +JL_DLLEXPORT void jl_membarrier(void) { + FlushProcessWriteBuffers(); +} + +// Send a signal to the specified thread to longjmp to its reset_ctx if available. +// This is used for task cancellation to interrupt a running task at a safe point. +// TODO: Implement Windows support using SuspendThread/GetThreadContext/SetThreadContext +JL_DLLEXPORT void jl_send_cancellation_signal(int16_t tid) +{ + // Not yet implemented on Windows + (void)tid; +} diff --git a/src/task.c b/src/task.c index 18d21b2343053..94057e550e3a1 100644 --- a/src/task.c +++ b/src/task.c @@ -1151,6 +1151,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_value_t *start, jl_value_t *completion_fu jl_atomic_store_relaxed(&t->running_time_ns, 0); jl_atomic_store_relaxed(&t->finished_at, 0); jl_timing_task_init(t); + jl_atomic_store_relaxed(&t->cancellation_request, jl_nothing); if (t->ctx.copy_stack) t->ctx.copy_ctx = NULL; @@ -1209,6 +1210,17 @@ void jl_init_tasks(void) JL_GC_DISABLED static void NOINLINE JL_NORETURN _start_task(void); #endif +static void NOINLINE _handle_start_task_cancellation(jl_value_t *creq) +{ + jl_value_t *cancel_handler = jl_get_global(jl_base_module, jl_symbol("handle_cancellation!")); + if (!cancel_handler) { + jl_safe_printf("Task cancellation requested but Base.handle_cancellation! 
is not defined\n"); + jl_exit(1); + } + jl_value_t *fargs[2] = { cancel_handler, creq }; + jl_apply(fargs, 2); +} + static void NOINLINE JL_NORETURN JL_NO_ASAN start_task(void) { CFI_NORETURN @@ -1269,6 +1281,12 @@ CFI_NORETURN jl_sigint_safepoint(ptls); } JL_TIMING(ROOT, ROOT); + for (;;) { + jl_value_t *creq = jl_atomic_load_relaxed(&ct->cancellation_request); + if (creq == jl_nothing) + break; + _handle_start_task_cancellation(creq); + } res = jl_apply(&ct->start, 1); } JL_CATCH { @@ -1603,6 +1621,7 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) jl_atomic_store_relaxed(&ct->first_enqueued_at, 0); jl_atomic_store_relaxed(&ct->last_started_running_at, 0); } + jl_atomic_store_relaxed(&ct->cancellation_request, jl_nothing); ptls->root_task = ct; jl_atomic_store_relaxed(&ptls->current_task, ct); JL_GC_PROMISE_ROOTED(ct); @@ -1661,6 +1680,14 @@ JL_DLLEXPORT int8_t jl_get_task_threadpoolid(jl_task_t *t) return t->threadpoolid; } +JL_DLLEXPORT void jl_preempt_thread_task(int16_t tid) +{ + jl_task_t *task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task); + jl_value_t *expected = jl_nothing; + // If the task is already being cancelled, that's good enough for preemption + jl_atomic_cmpswap(&task->cancellation_request, &expected, jl_box_uint8(0x5)); + jl_send_cancellation_signal(tid); +} #ifdef _OS_WINDOWS_ #if defined(_CPU_X86_) diff --git a/stdlib/REPL/src/REPL.jl b/stdlib/REPL/src/REPL.jl index 0c31315e9bea1..2eb8168b47d04 100644 --- a/stdlib/REPL/src/REPL.jl +++ b/stdlib/REPL/src/REPL.jl @@ -451,6 +451,10 @@ function repl_backend_loop(backend::REPLBackend, get_module::Function) while true tls = task_local_storage() tls[:SOURCE_PATH] = nothing + # TODO: Support cancellation scopes for non-root tasks + if current_task() === Base.roottask + Base.reset_cancellation!() + end ast_or_func, show_value = take!(backend.repl_channel) if show_value == -1 # exit flag diff --git a/test/cancellation.jl b/test/cancellation.jl new file mode 100644 index 0000000000000..6a673e3ebb4d3 --- /dev/null +++ b/test/cancellation.jl @@ -0,0 +1,78 @@ +const collatz_code = quote + collatz(n) = (n & 1) == 1 ? 
(3n + 1) : (n ÷ 2)
+    function find_collatz_counterexample()
+        i = 1
+        while true
+            j = i
+            while true
+                @Base.cancel_check
+                j = collatz(j)
+                j == 1 && break
+                j == i && error("$j is a collatz counterexample")
+            end
+            i += 1
+        end
+    end
+    @noinline function find_collatz_counterexample_inner()
+        i = 1
+        while true
+            j = i
+            while true
+                j = collatz(j)
+                j == 1 && break
+                j == i && return j
+            end
+            i += 1
+        end
+    end
+    function find_collatz_counterexample2()
+        @Base.cancel_check
+        return find_collatz_counterexample_inner()
+    end
+end
+eval(collatz_code)
+
+@testset "cancellation" begin
+    # Test cancellation point for a task that has never started running
+    @test_throws Base.CancellationRequest wait(@task nothing)
+
+
+    # Simple cancellation of `sleep`
+    t = @async sleep(10000)
+    yield(); yield(); yield() # Give the task a chance to start
+    Base.cancel!(t)
+    @test_throws Base.CancellationRequest wait(t)
+
+    # Simple cancellation of compute-bound function
+    t = @Threads.spawn(find_collatz_counterexample())
+    yield(); yield(); yield() # Give the task a chance to start
+    Base.cancel!(t)
+    @test_throws Base.CancellationRequest wait(t)
+
+    # Test cancellation of @sync
+    t = @async @sync begin
+        @async sleep(10000)
+        @async sleep(10000)
+    end
+    yield(); yield(); yield() # Give the task a chance to start
+    Base.cancel!(t)
+    @test_throws Base.CompositeException wait(t)
+end
+
+@testset "^C" begin
+    # This exercises ^C needing to preempt a compute-bound task
+    test_code = :(try;
+        @sync ((@async sleep(10000)); @async find_collatz_counterexample())
+    catch e
+        println(typeof(e))
+    end)
+    @test read(`$(Base.julia_cmd()) -e '$(string(collatz_code)); $test_code'`, String) == "CompositeException"
+
+    # Make sure that preemption doesn't cause problems if all tasks are blocked
+    test_code = :(try;
+        @sync ((@async sleep(10000)); @async sleep(10000))
+    catch e
+        println(typeof(e))
+    end)
+    @test read(`$(Base.julia_cmd()) -e '$(string(collatz_code)); $test_code'`, String) == "CompositeException"
+end
diff --git a/test/intrinsics.jl b/test/intrinsics.jl
index 5e18c1fb3672a..03ba86f85676e 100644
--- a/test/intrinsics.jl
+++ b/test/intrinsics.jl
@@ -395,13 +395,13 @@ end
 end
 
 using Base.Experimental: @force_compile
-@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(:u)) === nothing
-@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(Symbol("u", "x"))) === nothing
-@test_throws ConcurrencyViolationError("invalid atomic ordering") Core.Intrinsics.atomic_fence(Symbol("u", "x")) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(:u, :system)) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") (@force_compile; Core.Intrinsics.atomic_fence(Symbol("u", "x"), :system)) === nothing
+@test_throws ConcurrencyViolationError("invalid atomic ordering") Core.Intrinsics.atomic_fence(Symbol("u", "x"), :system) === nothing
 for order in (:not_atomic, :monotonic, :acquire, :release, :acquire_release, :sequentially_consistent)
-    @test Core.Intrinsics.atomic_fence(order) === nothing
-    @test (order -> Core.Intrinsics.atomic_fence(order))(order) === nothing
-    @test Base.invokelatest(@eval () -> Core.Intrinsics.atomic_fence($(QuoteNode(order)))) === nothing
+    @test Core.Intrinsics.atomic_fence(order, :system) === nothing
+    @test (order -> Core.Intrinsics.atomic_fence(order, :system))(order) === nothing
+    @test
Base.invokelatest(@eval () -> Core.Intrinsics.atomic_fence($(QuoteNode(order)), :system)) === nothing end @test Core.Intrinsics.atomic_pointerref(C_NULL, :sequentially_consistent) === nothing @test (@force_compile; Core.Intrinsics.atomic_pointerref(C_NULL, :sequentially_consistent)) === nothing diff --git a/test/llvmpasses/cancellation-lowering-codegen.jl b/test/llvmpasses/cancellation-lowering-codegen.jl new file mode 100644 index 0000000000000..b9395baa0408c --- /dev/null +++ b/test/llvmpasses/cancellation-lowering-codegen.jl @@ -0,0 +1,26 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +# RUN: julia --startup-file=no -O2 %s %t -O && llvm-link -S %t/* | FileCheck %s + +include(joinpath("..", "testhelpers", "llvmpasses.jl")) + +# Test that @cancel_check generates the expected cancellation lowering IR: +# - A jl_setjmp call with returns_twice attribute +# - A reset_ctx_ptr getelementptr +# - Atomic store of ucontext buffer to reset_ctx +# - Atomic store of null to reset_ctx before return (since ucontext is stack-allocated) + +# CHECK-LABEL: @julia_test_cancel_check +# CHECK: %cancel_ucontext = alloca +# CHECK: %reset_ctx_ptr = getelementptr +# CHECK: store atomic ptr {{.*}}, ptr %reset_ctx_ptr release +# CHECK: call i32 @{{.*}}setjmp{{.*}}(ptr {{.*}}) #[[ATTR:[0-9]+]] +# CHECK: store atomic ptr null, ptr %reset_ctx_ptr release +# CHECK: ret +# CHECK: attributes #[[ATTR]] = {{{.*}}returns_twice{{.*}}} +function test_cancel_check() + Base.@cancel_check + return 1 +end + +emit(test_cancel_check) diff --git a/test/llvmpasses/cancellation-lowering.ll b/test/llvmpasses/cancellation-lowering.ll new file mode 100644 index 0000000000000..22245b64288c3 --- /dev/null +++ b/test/llvmpasses/cancellation-lowering.ll @@ -0,0 +1,82 @@ +; This file is a part of Julia. 
License is MIT: https://julialang.org/license + +; RUN: opt --load-pass-plugin=libjulia-codegen%shlibext -passes='CancellationLowering' -S %s | FileCheck %s + +declare i32 @julia.cancellation_point() +declare ptr @julia.get_pgcstack() +declare void @some_unsafe_call() +declare void @some_safe_call() + +; Test basic cancellation point lowering with reset_ctx cleared before return +define i32 @test_cancellation_point() { +entry: +; CHECK-LABEL: @test_cancellation_point +; CHECK: %cancel_ucontext = alloca +; CHECK: %pgcstack = call ptr @julia.get_pgcstack() +; CHECK: %current_task = getelementptr i8, ptr %pgcstack +; CHECK: %reset_ctx_ptr = getelementptr i8, ptr %current_task +; CHECK: store atomic ptr %cancel_ucontext, ptr %reset_ctx_ptr release +; CHECK: %{{.*}} = call i32 @{{.*}}setjmp{{.*}}(ptr %cancel_ucontext) +; CHECK-NOT: call i32 @julia.cancellation_point() +; CHECK: store atomic ptr null, ptr %reset_ctx_ptr release +; CHECK-NEXT: ret i32 + %pgcstack = call ptr @julia.get_pgcstack() + %result = call i32 @julia.cancellation_point() + ret i32 %result +} + +; Test that unsafe calls get reset_ctx = NULL inserted before them +define void @test_unsafe_call() { +entry: +; CHECK-LABEL: @test_unsafe_call +; CHECK: %cancel_ucontext = alloca +; CHECK: %pgcstack = call ptr @julia.get_pgcstack() +; CHECK: %current_task = getelementptr i8, ptr %pgcstack +; CHECK: %reset_ctx_ptr = getelementptr i8, ptr %current_task +; CHECK: store atomic ptr %cancel_ucontext, ptr %reset_ctx_ptr release +; CHECK: call i32 @{{.*}}setjmp +; The unsafe call should have reset_ctx = NULL before it +; CHECK: store atomic ptr null, ptr %reset_ctx_ptr release +; CHECK-NEXT: call void @some_unsafe_call() +; Also reset_ctx = NULL before the return +; CHECK: store atomic ptr null, ptr %reset_ctx_ptr release +; CHECK-NEXT: ret void + %pgcstack = call ptr @julia.get_pgcstack() + %result = call i32 @julia.cancellation_point() + call void @some_unsafe_call() + ret void +} + +; Test that calls with reset_safe metadata don't get reset_ctx = NULL before them +; but still get reset_ctx = NULL before return +define void @test_safe_call() { +entry: +; CHECK-LABEL: @test_safe_call +; CHECK: %cancel_ucontext = alloca +; CHECK: %pgcstack = call ptr @julia.get_pgcstack() +; CHECK: %current_task = getelementptr i8, ptr %pgcstack +; CHECK: %reset_ctx_ptr = getelementptr i8, ptr %current_task +; CHECK: call i32 @{{.*}}setjmp +; The safe call should NOT have reset_ctx = NULL before it +; CHECK: call void @some_safe_call(), !julia.reset_safe +; But reset_ctx = NULL should be before the return +; CHECK: store atomic ptr null, ptr %reset_ctx_ptr release +; CHECK-NEXT: ret void + %pgcstack = call ptr @julia.get_pgcstack() + %result = call i32 @julia.cancellation_point() + call void @some_safe_call(), !julia.reset_safe !0 + ret void +} + +; Test function without cancellation points is unchanged +define void @test_no_cancellation_point() { +entry: +; CHECK-LABEL: @test_no_cancellation_point +; CHECK-NOT: setjmp +; CHECK-NOT: reset_ctx +; CHECK: call void @some_unsafe_call() + call void @some_unsafe_call() + ret void +} + +!0 = !{} diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 2780888546964..6fb4a9f2b7b89 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -464,6 +464,56 @@ function test_fence() end test_fence() +# Test asymmetric thread fences +struct AsymmetricFenceTestData + n::Int + x::AtomicMemory{Int} + y::AtomicMemory{Int} + read_x::AtomicMemory{Int} + read_y::AtomicMemory{Int} +end +function 
test_asymmetric_fence(data::AsymmetricFenceTestData, cond1, cond2, threadid, it) + if (threadid % 2) == 0 + @atomic :monotonic data.x[it] = 1 + Threads.atomic_fence_heavy() + @atomic :monotonic data.read_y[it] = @atomic :monotonic data.y[it] + wait(cond1) + notify(cond2) + else + @atomic :monotonic data.y[it] = 1 + Threads.atomic_fence_light() + @atomic :monotonic data.read_x[it] = @atomic :monotonic data.x[it] + notify(cond1) + wait(cond2) + end +end +function test_asymmetric_fence(data::AsymmetricFenceTestData, cond1, cond2, threadid) + for i = 1:data.n + test_asymmetric_fence(data, cond1, cond2, threadid, i) + end +end +function test_asymmetric_fence() + asymmetric_test_count = 200_000 + cond1 = Threads.Event(true) + cond2 = Threads.Event(true) + data = AsymmetricFenceTestData(asymmetric_test_count, + AtomicMemory{Int}(undef, asymmetric_test_count), + AtomicMemory{Int}(undef, asymmetric_test_count), + AtomicMemory{Int}(undef, asymmetric_test_count), + AtomicMemory{Int}(undef, asymmetric_test_count)) + for i = 1:asymmetric_test_count + @atomic :monotonic data.x[i] = 0 + @atomic :monotonic data.y[i] = 0 + @atomic :monotonic data.read_x[i] = typemax(Int) + @atomic :monotonic data.read_y[i] = typemax(Int) + end + t1 = @Threads.spawn test_asymmetric_fence(data, cond1, cond2, 1) + t2 = @Threads.spawn test_asymmetric_fence(data, cond1, cond2, 2) + wait(t1); wait(t2) + @test !any((data.read_x .== 0) .& (data.read_y .== 0)) +end +test_asymmetric_fence() + # Test load / store with various types let atomictypes = (Int8, Int16, Int32, Int64, Int128, UInt8, UInt16, UInt32, UInt64, UInt128,