Skip to content

Commit

Permalink
Merge pull request #81 from libp2p/feat/rw-close
Browse files Browse the repository at this point in the history
Implement new CloseWrite/CloseRead interface
  • Loading branch information
Stebalien authored Sep 2, 2020
2 parents 6ee3b24 + 7bfef51 commit 67680fb
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 192 deletions.
6 changes: 3 additions & 3 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestSmallPackets(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if slowdown > 0.15 {
if slowdown > 0.15 && !raceEnabled {
t.Fatalf("Slowdown from mplex was >15%%: %f", slowdown)
}
}
Expand Down Expand Up @@ -90,6 +90,7 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
wg.Add(1)
go func() {
defer wg.Done()
defer localB.Close()
receiveBuf := make([]byte, 2048)

for {
Expand All @@ -103,7 +104,7 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
atomic.AddUint64(&receivedBytes, uint64(n))
}
}()

defer localA.Close()
i := 0
for {
n, err := localA.Write(msgs[i])
Expand All @@ -116,7 +117,6 @@ func testSmallPackets(b *testing.B, n1, n2 net.Conn) {
break
}
}
localA.Close()
})
b.StopTimer()
wg.Wait()
Expand Down
17 changes: 17 additions & 0 deletions deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (d *pipeDeadline) set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

// deadline closed
if d.cancel == nil {
return
}

if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
Expand Down Expand Up @@ -70,6 +75,18 @@ func (d *pipeDeadline) wait() chan struct{} {
return d.cancel
}

// close closes, the deadline. Any future calls to `set` will do nothing.
func (d *pipeDeadline) close() {
d.mu.Lock()
defer d.mu.Unlock()

if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
d.cancel = nil
}

func isClosedChan(c <-chan struct{}) bool {
select {
case <-c:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8
github.com/multiformats/go-varint v0.0.6
github.com/opentracing/opentracing-go v1.2.0 // indirect
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0 // indirect
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 // indirect
google.golang.org/grpc v1.28.1
Expand Down
128 changes: 44 additions & 84 deletions multiplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,15 @@ func NewMultiplex(con net.Conn, initiator bool) *Multiplex {

func (mp *Multiplex) newStream(id streamID, name string) (s *Stream) {
s = &Stream{
id: id,
name: name,
dataIn: make(chan []byte, 8),
reset: make(chan struct{}),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
id: id,
name: name,
dataIn: make(chan []byte, 8),
rDeadline: makePipeDeadline(),
wDeadline: makePipeDeadline(),
mp: mp,
writeCancel: make(chan struct{}),
readCancel: make(chan struct{}),
}

s.closedLocal, s.doCloseLocal = context.WithCancel(context.Background())
return
}

Expand Down Expand Up @@ -168,7 +167,7 @@ func (mp *Multiplex) IsClosed() bool {
}
}

func (mp *Multiplex) sendMsg(done <-chan struct{}, header uint64, data []byte) error {
func (mp *Multiplex) sendMsg(timeout, cancel <-chan struct{}, header uint64, data []byte) error {
buf := pool.Get(len(data) + 20)

n := 0
Expand All @@ -181,8 +180,10 @@ func (mp *Multiplex) sendMsg(done <-chan struct{}, header uint64, data []byte) e
return nil
case <-mp.shutdown:
return ErrShutdown
case <-done:
case <-timeout:
return errTimeout
case <-cancel:
return ErrStreamClosed
}
}

Expand Down Expand Up @@ -321,7 +322,7 @@ func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {
ctx, cancel := context.WithTimeout(context.Background(), NewStreamTimeout)
defer cancel()

err := mp.sendMsg(ctx.Done(), header, []byte(name))
err := mp.sendMsg(ctx.Done(), nil, header, []byte(name))
if err != nil {
return nil, err
}
Expand All @@ -331,23 +332,20 @@ func (mp *Multiplex) NewNamedStream(name string) (*Stream, error) {

func (mp *Multiplex) cleanup() {
mp.closeNoWait()

// Take the channels.
mp.chLock.Lock()
defer mp.chLock.Unlock()
for _, msch := range mp.channels {
msch.clLock.Lock()
if !msch.closedRemote {
msch.closedRemote = true
// Cancel readers
close(msch.reset)
}
channels := mp.channels
mp.channels = nil
mp.chLock.Unlock()

msch.doCloseLocal()
msch.clLock.Unlock()
// Cancel any reads/writes
for _, msch := range channels {
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
}
// Don't remove this nil assignment. We check if this is nil to check if
// the connection is closed when we already have the lock (faster than
// checking if the stream is closed).
mp.channels = nil

// And... shutdown!
if mp.shutdownErr == nil {
mp.shutdownErr = ErrShutdown
}
Expand Down Expand Up @@ -421,81 +419,43 @@ func (mp *Multiplex) handleIncoming() {
// This is *ok*. We forget the stream on reset.
continue
}
msch.clLock.Lock()

isClosed := msch.isClosed()

if !msch.closedRemote {
close(msch.reset)
msch.closedRemote = true
}

if !isClosed {
msch.doCloseLocal()
}

msch.clLock.Unlock()

msch.cancelDeadlines()

mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
// Cancel any ongoing reads/writes.
msch.cancelRead(ErrStreamReset)
msch.cancelWrite(ErrStreamReset)
case closeTag:
if !ok {
// may have canceled our reads already.
continue
}

msch.clLock.Lock()

if msch.closedRemote {
msch.clLock.Unlock()
// Technically a bug on the other side. We
// should consider killing the connection.
continue
}
// unregister and throw away future data.
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()

// close data channel, there will be no more data.
close(msch.dataIn)
msch.closedRemote = true

cleanup := msch.isClosed()

msch.clLock.Unlock()

if cleanup {
msch.cancelDeadlines()
mp.chLock.Lock()
delete(mp.channels, ch)
mp.chLock.Unlock()
}
// We intentionally don't cancel any deadlines, cancel reads, cancel
// writes, etc. We just deliver the EOF by closing the
// data channel, and unregister the channel so we don't
// receive any more data. The user still needs to call
// `Close()` or `Reset()`.
case messageTag:
if !ok {
// reset stream, return b
pool.Put(b)

// This is a perfectly valid case when we reset
// and forget about the stream.
log.Debugf("message for non-existant stream, dropping data: %d", ch)
// go mp.sendResetMsg(ch.header(resetTag), false)
continue
}

msch.clLock.Lock()
remoteClosed := msch.closedRemote
msch.clLock.Unlock()
if remoteClosed {
// closed stream, return b
// We're not accepting data on this stream, for
// some reason. It's likely that we reset it, or
// simply canceled reads (e.g., called Close).
pool.Put(b)

log.Warnf("Received data from remote after stream was closed by them. (len = %d)", len(b))
// go mp.sendResetMsg(msch.id.header(resetTag), false)
continue
}

recvTimeout.Reset(ReceiveTimeout)
select {
case msch.dataIn <- b:
case <-msch.reset:
case <-msch.readCancel:
// the user has canceled reading. walk away.
pool.Put(b)
case <-recvTimeout.C:
pool.Put(b)
Expand Down Expand Up @@ -534,7 +494,7 @@ func (mp *Multiplex) sendResetMsg(header uint64, hard bool) {
ctx, cancel := context.WithTimeout(context.Background(), ResetStreamTimeout)
defer cancel()

err := mp.sendMsg(ctx.Done(), header, nil)
err := mp.sendMsg(ctx.Done(), nil, header, nil)
if err != nil && !mp.isShutdown() {
if hard {
log.Warnf("error sending reset message: %s; killing connection", err.Error())
Expand Down
Loading

0 comments on commit 67680fb

Please sign in to comment.