diff --git a/base/stream.jl b/base/stream.jl index 7b227458ec552..c688232164040 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -665,75 +665,55 @@ end function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) stream_unknown_type = @handle_as handle LibuvStream nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf) + function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt) lock(stream.cond) - if nread < 0 - if nread == UV_ENOBUFS && nrequested == 0 - # remind the client that stream.buffer is full - notify(stream.cond) - elseif nread == UV_EOF # libuv called uv_stop_reading already - if stream.status != StatusClosing - stream.status = StatusEOF + try + if nread < 0 + if nread == UV_ENOBUFS && nrequested == 0 + # remind the client that stream.buffer is full notify(stream.cond) - if stream isa TTY - # stream can still be used by reseteof (or possibly write) - elseif !(stream isa PipeEndpoint) && ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0 - # stream can still be used by write - else - # underlying stream is no longer useful: begin finalization - ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - stream.status = StatusClosing + elseif nread == UV_EOF + if stream.status != StatusClosing + stream.status = StatusEOF + notify(stream.cond) + + if stream isa TTY + # still usable + elseif !(stream isa PipeEndpoint) && + ccall(:uv_is_writable, Cint, (Ptr{Cvoid},), stream.handle) != 0 + # still writable + else + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) + stream.status = StatusClosing + end end + else + stream.readerror = _UVError("read", nread) + notify(stream.cond) + ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) + stream.status = StatusClosing end else - stream.readerror = _UVError("read", nread) + notify_filled(stream.buffer, nread) notify(stream.cond) - # This is a fatal connection error - ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) - stream.status = StatusClosing end - else - notify_filled(stream.buffer, nread) - notify(stream.cond) + finally + unlock(stream.cond) end - unlock(stream.cond) - # Stop background reading when - # 1) there's nobody paying attention to the data we are reading - # 2) we have accumulated a lot of unread data OR - # 3) we have an alternate buffer that has reached its limit. if stream.status == StatusPaused || (stream.status == StatusActive && ((bytesavailable(stream.buffer) >= stream.throttle) || (bytesavailable(stream.buffer) >= stream.buffer.maxsize))) - # save cycles by stopping kernel notifications from arriving ccall(:uv_read_stop, Cint, (Ptr{Cvoid},), stream) stream.status = StatusOpen end - nothing - end - readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested)) - nothing -end -function reseteof(x::TTY) - iolock_begin() - if x.status == StatusEOF - x.status = StatusOpen + nothing end - iolock_end() - nothing -end -function _uv_hook_close(uv::Union{LibuvStream, LibuvServer}) - lock(uv.cond) - try - uv.status = StatusClosed - # notify any listeners that exist on this libuv stream type - notify(uv.cond) - finally - unlock(uv.cond) - end + readcb_specialized(stream_unknown_type, Int(nread), UInt(nrequested)) nothing end diff --git a/stdlib/Sockets/test/runtests.jl b/stdlib/Sockets/test/runtests.jl index 2565dc180eba7..b2d27228dd645 100644 --- a/stdlib/Sockets/test/runtests.jl +++ b/stdlib/Sockets/test/runtests.jl @@ -175,7 +175,7 @@ defaultport = rand(2000:4000) wait(tsk) end - mktempdir() do tmpdir + mktempdir() do tmpdir socketname = Sys.iswindows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket") local nconn = 0 srv = listen(socketname) @@ -192,8 +192,38 @@ defaultport = rand(2000:4000) wait(t) @test read(conn, String) == "" end + + @static if !Sys.iswindows() + @testset "Unix domain socket half-close preserves writeability" begin + dir = mktempdir() + sock = joinpath(dir, "halfclose.sock") + + server = listen(sock) + + t = Threads.@spawn begin + c = accept(server) + read(c, String) # peer shutdown(SHUT_WR) + write(c, "pong\n") # MUST still succeed + flush(c) + close(c) + close(server) + :ok + end + + out = read(pipeline( + `socat -t 1 - UNIX-CONNECT:$sock`, + stdin = IOBuffer("ping\n"), + ), String) + + srv = fetch(t) + + @test out == "pong\n" + @test srv == :ok + end + end end + @testset "getsockname errors" begin sock = TCPSocket() serv = Sockets.TCPServer()