From d3b4611595dfdf9b4a1aaaaa50a36576c47d16de Mon Sep 17 00:00:00 2001 From: auxten Date: Tue, 4 Dec 2018 12:14:14 +0800 Subject: [PATCH 1/3] Expose Stream.die to Stream.Die for canceling running golang RPC func --- stream.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/stream.go b/stream.go index 2ce00d2..f64b000 100644 --- a/stream.go +++ b/stream.go @@ -20,7 +20,7 @@ type Stream struct { bufferLock sync.Mutex frameSize int chReadEvent chan struct{} // notify a read event - die chan struct{} // flag the stream has closed + Die chan struct{} // flag the stream has closed dieLock sync.Mutex readDeadline atomic.Value writeDeadline atomic.Value @@ -33,7 +33,7 @@ func newStream(id uint32, frameSize int, sess *Session) *Stream { s.chReadEvent = make(chan struct{}, 1) s.frameSize = frameSize s.sess = sess - s.die = make(chan struct{}) + s.Die = make(chan struct{}) return s } @@ -46,7 +46,7 @@ func (s *Stream) ID() uint32 { func (s *Stream) Read(b []byte) (n int, err error) { if len(b) == 0 { select { - case <-s.die: + case <-s.Die: return 0, errors.New(errBrokenPipe) default: return 0, nil @@ -78,7 +78,7 @@ READ: goto READ case <-deadline: return n, errTimeout - case <-s.die: + case <-s.Die: return 0, errors.New(errBrokenPipe) } } @@ -93,7 +93,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { } select { - case <-s.die: + case <-s.Die: return 0, errors.New(errBrokenPipe) default: } @@ -108,7 +108,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { select { case s.sess.writes <- req: - case <-s.die: + case <-s.Die: return sent, errors.New(errBrokenPipe) case <-deadline: return sent, errTimeout @@ -120,7 +120,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { if result.err != nil { return sent, result.err } - case <-s.die: + case <-s.Die: return sent, errors.New(errBrokenPipe) case <-deadline: return sent, errTimeout @@ -134,11 +134,11 @@ func (s *Stream) Close() error { s.dieLock.Lock() select { - case <-s.die: + case <-s.Die: s.dieLock.Unlock() return errors.New(errBrokenPipe) default: - close(s.die) + close(s.Die) s.dieLock.Unlock() s.sess.streamClosed(s.id) _, err := s.sess.writeFrame(newFrame(cmdFIN, s.id)) @@ -181,9 +181,9 @@ func (s *Stream) sessionClose() { defer s.dieLock.Unlock() select { - case <-s.die: + case <-s.Die: default: - close(s.die) + close(s.Die) } } From adfc81dd78346133628bd13ba49b35ebca8e26dc Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 5 Dec 2018 11:24:20 +0800 Subject: [PATCH 2/3] Add Stream.GetDieCh which returns a readonly chan to get notified when the stream is to be closed --- session_test.go | 16 ++++++++++++++++ stream.go | 28 +++++++++++++++++----------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/session_test.go b/session_test.go index 580935c..b02dbdd 100644 --- a/session_test.go +++ b/session_test.go @@ -86,6 +86,22 @@ func TestEcho(t *testing.T) { session.Close() } +func TestGetDieCh(t *testing.T) { + cs, ss, err := getSmuxStreamPair() + if err != nil { + t.Fatal(err) + } + dieCh := ss.GetDieCh() + go func() { + select { + case <-dieCh: + case <-time.Tick(time.Second): + t.Fatal("wait die chan timeout") + } + }() + cs.Close() +} + func TestSpeed(t *testing.T) { _, stop, cli, err := setupServer(t) if err != nil { diff --git a/stream.go b/stream.go index f64b000..2a2b82f 100644 --- a/stream.go +++ b/stream.go @@ -20,7 +20,7 @@ type Stream struct { bufferLock sync.Mutex frameSize int chReadEvent chan struct{} // notify a read event - Die chan struct{} // flag the stream has closed + die chan struct{} // flag the stream has closed dieLock sync.Mutex readDeadline atomic.Value writeDeadline atomic.Value @@ -33,7 +33,7 @@ func newStream(id uint32, frameSize int, sess *Session) *Stream { s.chReadEvent = make(chan struct{}, 1) s.frameSize = frameSize s.sess = sess - s.Die = make(chan struct{}) + s.die = make(chan struct{}) return s } @@ -46,7 +46,7 @@ func (s *Stream) ID() uint32 { func (s *Stream) Read(b []byte) (n int, err error) { if len(b) == 0 { select { - case <-s.Die: + case <-s.die: return 0, errors.New(errBrokenPipe) default: return 0, nil @@ -78,7 +78,7 @@ READ: goto READ case <-deadline: return n, errTimeout - case <-s.Die: + case <-s.die: return 0, errors.New(errBrokenPipe) } } @@ -93,7 +93,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { } select { - case <-s.Die: + case <-s.die: return 0, errors.New(errBrokenPipe) default: } @@ -108,7 +108,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { select { case s.sess.writes <- req: - case <-s.Die: + case <-s.die: return sent, errors.New(errBrokenPipe) case <-deadline: return sent, errTimeout @@ -120,7 +120,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { if result.err != nil { return sent, result.err } - case <-s.Die: + case <-s.die: return sent, errors.New(errBrokenPipe) case <-deadline: return sent, errTimeout @@ -134,11 +134,11 @@ func (s *Stream) Close() error { s.dieLock.Lock() select { - case <-s.Die: + case <-s.die: s.dieLock.Unlock() return errors.New(errBrokenPipe) default: - close(s.Die) + close(s.die) s.dieLock.Unlock() s.sess.streamClosed(s.id) _, err := s.sess.writeFrame(newFrame(cmdFIN, s.id)) @@ -146,6 +146,12 @@ func (s *Stream) Close() error { } } +// GetDieCh returns a readonly chan which can be readable +// when the stream is to be closed. +func (s *Stream) GetDieCh() <-chan struct{} { + return s.die +} + // SetReadDeadline sets the read deadline as defined by // net.Conn.SetReadDeadline. // A zero time value disables the deadline. @@ -181,9 +187,9 @@ func (s *Stream) sessionClose() { defer s.dieLock.Unlock() select { - case <-s.Die: + case <-s.die: default: - close(s.Die) + close(s.die) } } From 9a596215cd16be6aaa415f929af9a1b8898948ff Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 5 Dec 2018 11:34:04 +0800 Subject: [PATCH 3/3] Fix forget to close server session --- session_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/session_test.go b/session_test.go index b02dbdd..9aca780 100644 --- a/session_test.go +++ b/session_test.go @@ -91,6 +91,7 @@ func TestGetDieCh(t *testing.T) { if err != nil { t.Fatal(err) } + defer ss.Close() dieCh := ss.GetDieCh() go func() { select {