Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.13.0-beta2
1.13.0-beta3
58 changes: 14 additions & 44 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1161,32 +1161,6 @@ function throwto(t::Task, @nospecialize exc)
return try_yieldto(identity)
end

function wait_forever()
while true
try
while true
wait()
end
catch e
local errs = stderr
# try to display the failure atomically
errio = IOContext(PipeBuffer(), errs::IO)
emphasize(errio, "Internal Task ")
display_error(errio, current_exceptions())
write(errs, errio)
# victimize another random Task also
if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue)
backend = repl_backend_task()
backend isa Task && throwto(backend, e)
end
end
end
end

const get_sched_task = OncePerThread{Task}() do
Task(wait_forever)
end

function ensure_rescheduled(othertask::Task)
ct = current_task()
W = workqueue_for(Threads.threadid())
Expand Down Expand Up @@ -1223,30 +1197,26 @@ end

checktaskempty = Partr.multiq_check_empty

@noinline function poptask(W::StickyWorkqueue)
task = trypoptask(W)
if !(task isa Task)
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
end
set_next_task(task)
nothing
end

function wait()
ct = current_task()
# [task] user_time -yield-or-done-> wait_time
record_running_time!(ct)
# let GC run
GC.safepoint()
# check for libuv events
process_events()

# get the next task to run
W = workqueue_for(Threads.threadid())
task = trypoptask(W)
if task === nothing
# No tasks to run; switch to the scheduler task to run the
# thread sleep logic.
sched_task = get_sched_task()
if ct !== sched_task
istaskdone(sched_task) && (sched_task = @task wait())
return yieldto(sched_task)
end
task = ccall(:jl_task_get_next, Ref{Task}, (Any, Any, Any), trypoptask, W, checktaskempty)
end
set_next_task(task)
return try_yieldto(ensure_rescheduled)
poptask(W)
result = try_yieldto(ensure_rescheduled)
process_events()
# return when we come out of the queue
return result
end

if Sys.iswindows()
Expand Down
1 change: 0 additions & 1 deletion contrib/juliac/juliac-buildscript.jl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ end
#entrypoint(join, (Base.GenericIOBuffer{Memory{UInt8}}, Array{String, 1}, Char))
entrypoint(Base.task_done_hook, (Task,))
entrypoint(Base.wait, ())
entrypoint(Base.wait_forever, ())
entrypoint(Base.trypoptask, (Base.StickyWorkqueue,))
entrypoint(Base.checktaskempty, ())

Expand Down
1 change: 0 additions & 1 deletion contrib/juliac/juliac-trim-base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ end
depwarn(msg, funcsym; force::Bool=false) = nothing
_assert_tostring(msg) = ""
reinit_stdio() = nothing
wait_forever() = while true; wait(); end
JuliaSyntax.enable_in_core!() = nothing
init_active_project() = ACTIVE_PROJECT[] = nothing
set_active_project(projfile::Union{AbstractString,Nothing}) = ACTIVE_PROJECT[] = projfile
Expand Down
4 changes: 3 additions & 1 deletion stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -556,12 +556,14 @@ end
fetch(r)
end

let addr = Sockets.InetAddr(ip"192.0.2.5", 4444)
let addr = Sockets.InetAddr(ip"127.0.0.1", 4444)
srv = listen(addr)
s = Sockets.TCPSocket()
Sockets.connect!(s, addr)
r = @async close(s)
@test_throws Base._UVError("connect", Base.UV_ECANCELED) Sockets.wait_connected(s)
fetch(r)
close(srv)
end
end

Expand Down
6 changes: 4 additions & 2 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,16 @@ end
cb = first(async.cond.waitq)
@test isopen(async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
W = Base.workqueue_for(Threads.threadid())
@test isempty(W)
Base.process_events() # schedule event
Sys.iswindows() && Base.process_events() # schedule event (windows?)
@test length(W) == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
@test tc[] == 0
yield() # consume event
@test tc[] == 1
ccall(:uv_async_send, Cvoid, (Ptr{Cvoid},), async)
Base.process_events()
Sys.iswindows() && Base.process_events() # schedule event (windows?)
yield() # consume event
@test tc[] == 2
Expand Down
Loading