Concurrency violation on interplay between Distributed and Base.Threads #73
The error persists on version 1.5.2 and here is the output for
which is the current version of
Indeed, if I funnel all the communication through thread 1, it works!
# ...
p = D.@spawnat w @sync begin
    funnel = Channel()
    T.@spawn stillepost(i, c1, funnel)
    stillepost(i+0.5, funnel, c2)
end
# ...
... but is this something we could hope for to be fixed before the next LTS is released? Why is thread 1 so special? I am not at all familiar with the code in either of the two libraries, but if you could give me some pointers, I can give it a go. 🙂
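The shape of the funnel workaround above can be tried without any worker processes. The following is only a sketch under stated assumptions: `stillepost` here is a hypothetical stand-in that simply forwards messages (the real function is not shown in this thread), `run_funnel` is an invented helper name, and local `Channel`s stand in for the `RemoteChannel`s `c1` and `c2`. It does not reproduce the concurrency violation; it only illustrates how one spawned stage hands messages to the calling task through a local channel.

```julia
using Base.Threads

# Hypothetical stand-in for `stillepost`: forward every message from `src`
# to `dst`, appending the stage id, then close `dst` once `src` is drained.
function stillepost(id, src, dst)
    for msg in src
        put!(dst, (msg..., id))
    end
    close(dst)
end

# Assumed helper (not from the thread): run two stages joined by a funnel.
function run_funnel(inputs)
    c1 = Channel{Tuple}(Inf)    # stands in for the incoming RemoteChannel
    c2 = Channel{Tuple}(Inf)    # stands in for the outgoing RemoteChannel
    funnel = Channel{Tuple}(0)  # unbuffered local hand-off between stages
    foreach(x -> put!(c1, (x,)), inputs)
    close(c1)
    @sync begin
        # The first stage may be scheduled on any available thread ...
        Threads.@spawn stillepost(1, c1, funnel)
        # ... while the second stage runs in the calling task.
        stillepost(1.5, funnel, c2)
    end
    return collect(c2)
end
```

Calling `run_funnel([10, 20])` passes each input through both stages in order, so every returned tuple carries the ids `1` and `1.5` after the original value.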
Thread 1 isn't special, this is just something that hasn't been updated for threads yet. The |
TL;DR: There is a different issue. Above example is fixed by using I tried to write some tests that send and receive from all combinations of threads, processes and where the
using Test
using Distributed, Base.Threads
using Base.Iterators: product
exeflags = ("--startup-file=no",
            "--check-bounds=yes",
            "--depwarn=error",
            "--threads=2")
function call_on(f, wid, tid)
    remotecall(wid) do
        t = Task(f)
        ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
        schedule(t)
        @assert threadid(t) == tid
        t
    end
end
# Run function on process holding the data to only serialize the result of f.
# This becomes useful for things that cannot be serialized (e.g. running tasks)
# or that would be unnecessarily big if serialized.
fetch_from_owner(f, rr) = remotecall_fetch(f∘fetch, rr.where, rr)
isdone(rr) = fetch_from_owner(istaskdone, rr)
isfailed(rr) = fetch_from_owner(istaskfailed, rr)
@testset "RemoteChannel is threadsafe" begin
    ws = ts = product(1:2, 1:2)
    timeout = 10.0
    @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
        @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
            procs_added = addprocs(2; exeflags)
            @everywhere procs_added using Base.Threads
            p1 = procs_added[w1]
            p2 = procs_added[w2]
            chan_id = first(procs_added)
            chan = RemoteChannel(chan_id)
            send = call_on(p1, t1) do
                put!(chan, nothing)
            end
            recv = call_on(p2, t2) do
                take!(chan)
            end
            timedwait(() -> isdone(send) && isdone(recv), timeout)
            @test isdone(send)
            @test isdone(recv)
            @test !isfailed(send)
            @test !isfailed(recv)
            rmprocs(procs_added)
        end
    end
end
Above tests fail consistently in these cases -- all errors being concurrency violations:
Comparing the (nearly) complete outputs of those tests, I was lucky to see that some workers were not available:
$ julia-1.5 threads.jl > out1.txt
$ julia-1.5 threads.jl > out2.txt
$ diff out1.txt out2.txt
124,125c124,126
< ProcessExitedException(29)
< worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1074
---
> no process with id 29 exists
> error(::String) at ./error.jl:33
> worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1079
So I tried adding the workers using
Latest attempt is JuliaLang/julia#38405 |
I hit on what is probably the same issue - using channels and futures to communicate between threads in a single process. The gist https://gist.github.com/orenbenkiki/ac71f348d4915b394805656b142b33fe contains small sample code and error traces, in case this isn't exactly the same issue (specifically, there are no worker processes involved here at all - the problem is purely lack of thread safety). I found it surprising that channels/futures are not thread safe - I think it warrants an explicit warning in the documentation until this is fixed. |
Try again now that JuliaLang/julia#38405 is merged? |
Is that in the latest Julia 1.7? It isn't listed under https://github.com/JuliaLang/julia/blob/v1.7.0-beta2/NEWS.md#multi-threading-changes Edit: It seems this was merged 30m ago? Wow, that's some timing. I suppose this means I'd have to download Julia from GitHub and build it from source - I never tried doing that before... Is there somewhere one can download the compiled bleeding-edge GitHub version (for Linux)? Edit2: Ah, nightly builds. I'll give it a day or two so the merged version will get there and then give it a try. Thanks!
It just got merged so it isn't in 1.7 (yet). |
With JuliaLang/julia#38405, 3 threads, and 3 workers, the second test (#73) passes for me, while the first test (#73) hangs with no CPU activity. |
Note I see non-deterministic results. If you run it multiple times, sometimes it passes, most often it deadlocks, and sometimes it crashes, even with the same number of threads/processes. Running it for longer (a larger iteration count) increases the chance of a deadlock (unsurprisingly). This was all on Julia 1.6, mind you - I haven't tried it on the latest version yet, since the potential fix was merged less than two hours ago - waiting for the nightly build to pick it up.
JuliaLang/julia#38405 also only fixes a limited set of interactions, and more work is needed to ensure that Distributed.jl is fully thread-safe.
JuliaLang/julia#41722 reverted the first fix again. |
Moving this off the 1.7 milestone. Making Distributed.jl thread-safe will be more work; I am hopeful that we can make progress on this for 1.8.
@vchuravy JuliaLang/julia#42239 seems stalled, so moving this off the v1.8 milestone |
I just watched the State of Julia 2022 which claimed that Distributed would now be thread-safe. However, the snippet above (#73) remains broken on 1.8.5 as well as 1.9.1 both with the following summary.
Note that |
A pipeline object holds references to the channels and tasks involved in the execution. It further holds the configurations of each stage of the pipeline. Addresses #18 Opens #1 See https://github.com/JuliaLang/julia/issues/37706
I’m working on a distributed pipeline algorithm that uses several stages per worker process. IIRC tasks cannot hop between threads once they’ve been scheduled. Since I want my stages to potentially run in parallel, I tried to create non-sticky tasks by chaining each D.@spawnat with a T.@spawn. However, this setup keeps failing/crashing and I don’t understand why.
I boiled it down to a minimal example:
Player 2 fails with a concurrency violation:
Am I holding it wrong?
See also this post on discourse. foobar_lv2 suggested this might be a bug in Distributed.