Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix println performance regression #10679

Merged
merged 1 commit into from
Apr 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ Library improvements

* `readavailable` returns a byte vector instead of a string.

* `lock` and `unlock` which operate on `ReentrantLock`. Useful to lock a stream during
concurrent writes from multiple tasks


Deprecated or removed
---------------------

Expand Down
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1000,10 +1000,13 @@ export
current_task,
istaskstarted,
istaskdone,
lock,
notify,
produce,
ReentrantLock,
schedule,
task_local_storage,
unlock,
yield,

# time
Expand Down
37 changes: 37 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Advisory reentrant lock
type ReentrantLock
locked_by::Nullable{Task}
cond_wait::Condition
reentrancy_cnt::Int

ReentrantLock() = new(nothing, Condition(), 0)
end

function lock(rl::ReentrantLock)
t = current_task()
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == get(rl.locked_by)
rl.reentrancy_cnt += 1
return
end
wait(rl.cond_wait)
end
end

unlock(o::Any) = unlock(o.lock)

function unlock(rl::ReentrantLock)
rl.reentrancy_cnt = rl.reentrancy_cnt - 1
if rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
elseif rl.reentrancy_cnt < 0
AssertionError("unlock count must match lock count")
end
rl
end

5 changes: 4 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ end
function send_msg_(w::Worker, kind, args, now::Bool)
#println("Sending msg $kind")
io = w.w_stream
lock(io) do io
lock(io.lock)
try
serialize(io, kind)
for arg in args
serialize(io, arg)
Expand All @@ -178,6 +179,8 @@ function send_msg_(w::Worker, kind, args, now::Bool)
else
flush(io)
end
finally
unlock(io.lock)
end
end

Expand Down
4 changes: 3 additions & 1 deletion base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type TCPSocket <: Socket
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock

TCPSocket(handle) = new(
handle,
Expand All @@ -268,7 +269,8 @@ type TCPSocket <: Socket
false, Condition(),
false, Condition(),
false, Condition(),
nothing
nothing,
ReentrantLock()
)
end
function TCPSocket()
Expand Down
20 changes: 17 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Pipe <: AsyncStream
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock

Pipe(handle) = new(
handle,
StatusUninit,
Expand All @@ -109,7 +111,7 @@ type Pipe <: AsyncStream
false,Condition(),
false,Condition(),
false,Condition(),
nothing)
nothing, ReentrantLock())
end
function Pipe()
handle = Libc.malloc(_sizeof_uv_named_pipe)
Expand Down Expand Up @@ -177,6 +179,7 @@ type TTY <: AsyncStream
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock
@windows_only ispty::Bool
function TTY(handle)
tty = new(
Expand All @@ -186,7 +189,7 @@ type TTY <: AsyncStream
PipeBuffer(),
false,Condition(),
false,Condition(),
nothing)
nothing, ReentrantLock())
@windows_only tty.ispty = Bool(ccall(:jl_ispty, Cint, (Ptr{Void},), handle))
tty
end
Expand Down Expand Up @@ -217,6 +220,16 @@ nb_available(stream::UVStream) = nb_available(stream.buffer)
show(io::IO,stream::TTY) = print(io,"TTY(",uv_status_string(stream),", ",
nb_available(stream.buffer)," bytes waiting)")

function println(io::AsyncStream, xs...)
lock(io.lock)
try
invoke(println, tuple(IO, typeof(xs)...), io, xs...)
finally
unlock(io.lock)
end
end


uvtype(::AsyncStream) = UV_STREAM
uvhandle(stream::AsyncStream) = stream.handle

Expand Down Expand Up @@ -957,8 +970,9 @@ type BufferStream <: AsyncStream
close_c::Condition
is_open::Bool
buffer_writes::Bool
lock::ReentrantLock

BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false)
BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock())
end

isopen(s::BufferStream) = s.is_open
Expand Down
7 changes: 2 additions & 5 deletions base/string.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
## core text I/O ##

print(io::IO, x) = show(io, x)
function print(io::IO, xs...)
lock(io) do io
for x in xs print(io, x) end
end
end
print(io::IO, xs...) = for x in xs print(io, x) end

println(io::IO, xs...) = print(io, xs..., '\n')

print(xs...) = print(STDOUT, xs...)
Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ include("nullable.jl")

# I/O
include("task.jl")
include("lock.jl")
include("show.jl")
include("stream.jl")
include("socket.jl")
Expand Down
57 changes: 0 additions & 57 deletions base/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -268,60 +268,3 @@ end

julia_exename() = ccall(:jl_is_debugbuild,Cint,())==0 ? "julia" : "julia-debug"

# Advisory reentrant lock
type ReentrantLock
locked_by::Nullable{Task}
cond_wait::Condition
reentrancy_cnt::Int

ReentrantLock() = new(nothing, Condition(), 0)
end

# Lock object during function execution. Recursive calls by the same task is OK.
const adv_locks_map = WeakKeyDict{Any, ReentrantLock}()
function lock(f::Function, o::Any)
rl = get(adv_locks_map, o, nothing)
if rl == nothing
rl = ReentrantLock()
adv_locks_map[o] = rl
end
lock(rl)

try
f(o)
finally
unlock(o)
end
end

function lock(rl::ReentrantLock)
t = current_task()
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == get(rl.locked_by)
rl.reentrancy_cnt += 1
return
end
wait(rl.cond_wait)
end
end

function unlock(o::Any)
rl = adv_locks_map[o]
unlock(rl)
end

function unlock(rl::ReentrantLock)
rl.reentrancy_cnt = rl.reentrancy_cnt - 1
if rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
elseif rl.reentrancy_cnt < 0
AssertionError("unlock count must match lock count")
end
rl
end

44 changes: 44 additions & 0 deletions doc/manual/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,50 @@ potential performance optimizations that can be achieved by other
means (e.g., using explicit loops), operators like ``+=`` and ``*=``
work by rebinding new values.

Asynchronous IO and concurrent synchronous writes
-------------------------------------------------

Why do concurrent writes to the same stream result in inter-mixed output?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

While the streaming I/O API is synchronous, the underlying implementation
is fully asynchronous.

The following::

@sync for i in 1:3
@async print(i, " Foo ", " Bar ")
end

results in::
123 Foo Foo Foo Bar Bar Bar

This is happening because, while ``print(i, " Foo ", " Bar ")`` is synchronous,
internally, the writing of each argument yields to other tasks while waiting for
that part of the I/O to complete.

``println`` to asynchronous streams like STDOUT, TcpSockets, "locks" the stream
during a call. Consequently changing ``print`` to ``println`` in the above example
results in::

1 Foo Bar
2 Foo Bar
3 Foo Bar

For other functions and streams, etc, you could lock your writes with a ``ReentrantLock``
like this::

l = ReentrantLock()
@sync for i in 1:3
@async begin
lock(l)
try
print(i, " Foo ", " Bar ")
finally
unlock(l)
end
end


Julia Releases
----------------
Expand Down
16 changes: 16 additions & 0 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ Tasks
Block the current task for a specified number of seconds. The minimum sleep
time is 1 millisecond or input of ``0.001``.

.. function:: ReentrantLock()

Creates a reentrant lock. The same task can acquire the lock as many times
as required. Each lock must be matched with an unlock.

.. function:: lock(l::ReentrantLock)

Associates ``l`` with the current task. If ``l`` is already locked by a different
task, waits for it to become available. The same task can acquire the lock multiple
times. Each "lock" must be matched by an "unlock"

.. function:: unlock(l::ReentrantLock)

Releases ownership of the lock by the current task. If the lock had been acquired before,
it just decrements an internal counter and returns immediately.


General Parallel Computing Support
----------------------------------
Expand Down