Implement new CloseWrite/CloseRead interface
Close now closes the stream in both directions, while CloseRead discards inbound
bytes and CloseWrite sends an EOF. This matches the user's expectation that
Close actually closes the stream.

part of libp2p/go-libp2p-core#166
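For context, a sketch of how a caller might use the new semantics. Illustrative only: it assumes a *Stream obtained elsewhere (package imported as mplex) and trims error handling down to the essentials.

    package halfclose

    import (
        "io/ioutil"

        mplex "github.com/libp2p/go-mplex"
    )

    // request sends req, half-closes the write side, then reads the full reply.
    func request(s *mplex.Stream, req []byte) ([]byte, error) {
        if _, err := s.Write(req); err != nil {
            s.Reset()
            return nil, err
        }
        // CloseWrite sends an EOF to the peer but keeps our read side open.
        if err := s.CloseWrite(); err != nil {
            s.Reset()
            return nil, err
        }
        defer s.Close() // with this commit, Close tears down both directions
        return ioutil.ReadAll(s)
    }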
Stebalien committed Aug 28, 2020
1 parent 30712bb commit 510b0c1
Showing 6 changed files with 380 additions and 191 deletions.
4 changes: 2 additions & 2 deletions benchmarks_test.go
@@ -90,6 +90,7 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
wg.Add(1)
go func() {
defer wg.Done()
defer localB.Close()
receiveBuf := make([]byte, 2048)

for {
@@ -103,7 +104,7 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
atomic.AddUint64(&receivedBytes, uint64(n))
}
}()

defer localA.Close()
i := 0
for {
n, err := localA.Write(msgs[i])
@@ -116,7 +117,6 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
break
}
}
localA.Close()
})
b.StopTimer()
wg.Wait()
17 changes: 17 additions & 0 deletions deadline.go
@@ -32,6 +32,11 @@ func (d *pipeDeadline) set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

// deadline closed
if d.cancel == nil {
return
}

if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
@@ -70,6 +75,18 @@ func (d *pipeDeadline) wait() chan struct{} {
return d.cancel
}

// close closes the deadline. Any future calls to `set` will do nothing.
func (d *pipeDeadline) close() {
d.mu.Lock()
defer d.mu.Unlock()

if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
d.cancel = nil
}

func isClosedChan(c <-chan struct{}) bool {
select {
case <-c:
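Aside: the `!d.timer.Stop()` check used by both set and close is the standard way to synchronize with a time.Timer whose callback closes a channel. A minimal, standalone illustration of the pattern (not code from this repo):

    package main

    import "time"

    func main() {
        cancel := make(chan struct{})
        timer := time.AfterFunc(time.Millisecond, func() { close(cancel) })

        // Stop reports false once the callback has started, so wait for it
        // to finish (it closes cancel) before touching shared state. This is
        // exactly the dance pipeDeadline.set and pipeDeadline.close perform.
        if !timer.Stop() {
            <-cancel
        }
    }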
1 change: 1 addition & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8
github.com/multiformats/go-varint v0.0.6
github.com/opentracing/opentracing-go v1.2.0 // indirect
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0 // indirect
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect
google.golang.org/grpc v1.28.1
128 changes: 44 additions & 84 deletions multiplex.go
@@ -111,16 +111,15 @@ func NewMultiplex(con net.Conn, initiator bool) *Multiplex {

func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) {
s = &Stream{
id: id,
name: name,
dataIn: make(chan []byte, 8),
reset: make(chan struct{}),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
id: id,
name: name,
dataIn: make(chan []byte, 8),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
writeCancel: make(chan struct{}),
readCancel: make(chan struct{}),
}

s.closedLocal, s.doCloseLocal = context.WithCancel(context.Background())
return
}
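The writeCancel and readCancel channels initialized above replace the old reset channel and the closedLocal context. Their likely shape in the Stream struct, inferred from this hunk (the real definition lives in stream.go, which this page does not render):

    // Inferred sketch, not the verbatim stream.go definition.
    type Stream struct {
        // ...
        readCancel  chan struct{} // closed by CloseRead/Reset; unblocks pending reads
        writeCancel chan struct{} // closed by CloseWrite/Reset; unblocks pending writes
    }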

@@ -168,7 +167,7 @@ func (mp *Multiplex) IsClosed() bool {
}
}

func (mp *Multiplex) sendMsg(done <-chan struct{}, header uint64, data []byte) error {
func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error {
buf := pool.Get(len(data) + 20)

n := 0
@@ -181,8 +180,10 @@ func (mp *Multiplex) sendMsg(done <-chan struct{}, header uint64, data []byte) error {
return nil
case <-mp.shutdown:
return ErrShutdown
case <-done:
case <-timeout:
return errTimeout
case <-cancel:
return ErrStreamClosed
}
}
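Splitting the old done channel into timeout and cancel lets sendMsg report errTimeout for an expired deadline but ErrStreamClosed for a locally closed stream. A hypothetical call from the stream's write path, assuming the field names sketched earlier:

    // Hypothetical: deadline channel and per-stream cancel channel, side by side.
    err := s.mp.sendMsg(s.wDeadline.wait(), s.writeCancel, s.id.header(messageTag), data)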

@@ -321,7 +322,7 @@ func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {
ctx, cancel := context.WithTimeout(context.Background(), NewStreamTimeout)
defer cancel()

err := mp.sendMsg(ctx.Done(), header, []byte(name))
err := mp.sendMsg(ctx.Done(), nil, header, []byte(name))
if err != nil {
return nil, err
}
@@ -331,23 +332,20 @@ func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {

func (mp *Multiplex) cleanup() {
mp.closeNoWait()

// Take the channels.
mp.chLock.Lock()
defer mp.chLock.Unlock()
for _, msch := range mp.channels {
msch.clLock.Lock()
if !msch.closedRemote {
msch.closedRemote = true
// Cancel readers
close(msch.reset)
}
channels := mp.channels
mp.channels = nil
mp.chLock.Unlock()

msch.doCloseLocal()
msch.clLock.Unlock()
// Cancel any reads/writes
for _, msch := range channels {
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
}
// Don't remove this nil assignment. We check if this is nil to check if
// the connection is closed when we already have the lock (faster than
// checking if the stream is closed).
mp.channels = nil

// And... shutdown!
if mp.shutdownErr == nil {
mp.shutdownErr = ErrShutdown
}
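The nil assignment that the comment defends enables a lock-only fast path for detecting a dead connection. Roughly (illustrative, not a verbatim call site):

    mp.chLock.Lock()
    defer mp.chLock.Unlock()
    if mp.channels == nil {
        return ErrShutdown // cleanup() already ran; the connection is closed
    }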
@@ -421,81 +419,43 @@ func (mp *Multiplex) handleIncoming() {
// This is *ok*. We forget the stream on reset.
continue
}
msch.clLock.Lock()

isClosed := msch.isClosed()

if !msch.closedRemote {
close(msch.reset)
msch.closedRemote = true
}

if !isClosed {
msch.doCloseLocal()
}

msch.clLock.Unlock()

msch.cancelDeadlines()

mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
// Cancel any ongoing reads/writes.
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
case closeTag:
if !ok {
// may have canceled our reads already.
continue
}

msch.clLock.Lock()

if msch.closedRemote {
msch.clLock.Unlock()
// Technically a bug on the other side. We
// should consider killing the connection.
continue
}
// unregister and throw away future data.
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()

// close data channel, there will be no more data.
close(msch.dataIn)
msch.closedRemote = true

cleanup := msch.isClosed()

msch.clLock.Unlock()

if cleanup {
msch.cancelDeadlines()
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
}
// We intentionally don't cancel any deadlines, cancel reads, cancel
// writes, etc. We just deliver the EOF by closing the
// data channel, and unregister the channel so we don't
// receive any more data. The user still needs to call
// `Close()` or `Reset()`.
case messageTag:
if !ok {
// reset stream, return b
pool.Put(b)

// This is a perfectly valid case when we reset
// and forget about the stream.
log.Debugf("message for non-existant stream, dropping data: %d", ch)
// go mp.sendResetMsg(ch.header(resetTag), false)
continue
}

msch.clLock.Lock()
remoteClosed := msch.closedRemote
msch.clLock.Unlock()
if remoteClosed {
// closed stream, return b
// We're not accepting data on this stream, for
// some reason. It's likely that we reset it, or
// simply canceled reads (e.g., called Close).
pool.Put(b)

log.Warnf("Received data from remote after stream was closed by them. (len = %d)", len(b))
// go mp.sendResetMsg(msch.id.header(resetTag), false)
continue
}

recvTimeout.Reset(ReceiveTimeout)
select {
case msch.dataIn <- b:
case <-msch.reset:
case <-msch.readCancel:
// the user has canceled reading. walk away.
pool.Put(b)
case <-recvTimeout.C:
pool.Put(b)
@@ -534,7 +494,7 @@ func (mp *Multiplex) sendResetMsg(header uint64, hard bool) {
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()

err := mp.sendMsg(ctx.Done(), header, nil)
err := mp.sendMsg(ctx.Done(), nil, header, nil)
if err != nil && !mp.isShutdown() {
if hard {
log.Warnf("error sending reset message: %s; killing connection", err.Error())
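Tying the closeTag comment above to the read side: closing dataIn is what becomes io.EOF for the reader once buffered frames are drained. A rough sketch of the receiving end (hypothetical; stream.go is not part of the rendered diff):

    // A receive from the closed dataIn yields ok == false, which Read
    // surfaces as io.EOF after any already-buffered bytes are consumed.
    b, ok := <-s.dataIn
    if !ok {
        return 0, io.EOF
    }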
[2 more changed files not shown]
