remove keep-alives
Keeping a connection alive is a responsibility of the underlying
transport, not of the stream multiplexer.
marten-seemann committed Feb 16, 2021
1 parent f213fd7 commit 14ee6f4
Showing 6 changed files with 6 additions and 168 deletions.
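Note: with mux-level keepalives gone, liveness checking moves to the layer below. A minimal sketch of what that can look like for a TCP-backed session — the import path and the 30-second probe period (mirroring the old yamux default) are assumptions, not part of this commit:

package main

import (
	"net"
	"time"

	"github.com/libp2p/go-yamux" // import path assumed; match your module
)

// dialWithTransportKeepAlive enables TCP keepalive probes on the raw
// connection before wrapping it in a yamux session, so the transport
// detects dead peers instead of the multiplexer.
func dialWithTransportKeepAlive(addr string) (*yamux.Session, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	if tcpConn, ok := conn.(*net.TCPConn); ok {
		if err := tcpConn.SetKeepAlive(true); err != nil {
			return nil, err
		}
		if err := tcpConn.SetKeepAlivePeriod(30 * time.Second); err != nil {
			return nil, err
		}
	}
	return yamux.Client(conn, yamux.DefaultConfig())
}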
2 changes: 0 additions & 2 deletions README.md
@@ -15,8 +15,6 @@ Yamux features include:
 * Flow control
   * Avoid starvation
   * Back-pressure to prevent overwhelming a receiver
-* Keep Alives
-  * Enables persistent connections over a load balancer
 * Efficient
   * Enables thousands of logical streams with low overhead

3 changes: 0 additions & 3 deletions const.go
@@ -65,9 +65,6 @@ var (
 	// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
 	// timeout writing to the underlying stream connection.
 	ErrConnectionWriteTimeout = &Error{msg: "connection write timeout", timeout: true}
-
-	// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
-	ErrKeepAliveTimeout = &Error{msg: "keepalive timeout", timeout: true}
 )
 
 const (
12 changes: 0 additions & 12 deletions mux.go
@@ -17,13 +17,6 @@ type Config struct {
 	// PingBacklog is used to limit how many ping acks we can queue.
 	PingBacklog int
-
-	// EnableKeepalive is used to do a period keep alive
-	// messages using a ping.
-	EnableKeepAlive bool
-
-	// KeepAliveInterval is how often to perform the keep alive
-	KeepAliveInterval time.Duration
 
 	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
 	// we which will suspect a problem with the underlying connection and
 	// close it. This is only applied to writes, where's there's generally
@@ -57,8 +50,6 @@ func DefaultConfig() *Config {
 	return &Config{
 		AcceptBacklog:          256,
 		PingBacklog:            32,
-		EnableKeepAlive:        true,
-		KeepAliveInterval:      30 * time.Second,
 		ConnectionWriteTimeout: 10 * time.Second,
 		MaxStreamWindowSize:    initialStreamWindow,
 		LogOutput:              os.Stderr,
@@ -73,9 +64,6 @@ func VerifyConfig(config *Config) error {
 	if config.AcceptBacklog <= 0 {
 		return fmt.Errorf("backlog must be positive")
 	}
-	if config.KeepAliveInterval == 0 {
-		return fmt.Errorf("keep-alive interval must be positive")
-	}
 	if config.MaxStreamWindowSize < initialStreamWindow {
 		return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
 	}
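Note: code that set the removed fields will no longer compile against this version. A minimal before/after sketch of building a Config — the override values are illustrative:

	cfg := yamux.DefaultConfig()
	// cfg.EnableKeepAlive = true               // removed by this commit
	// cfg.KeepAliveInterval = 30 * time.Second // removed by this commit
	cfg.ConnectionWriteTimeout = 10 * time.Second // still supported
	if err := yamux.VerifyConfig(cfg); err != nil {
		panic(err)
	}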
72 changes: 0 additions & 72 deletions session.go
@@ -88,12 +88,6 @@ type Session struct {
 	shutdownErr  error
 	shutdownCh   chan struct{}
 	shutdownLock sync.Mutex
-
-	// keepaliveTimer is a periodic timer for keepalive messages. It's nil
-	// when keepalives are disabled.
-	keepaliveLock   sync.Mutex
-	keepaliveTimer  *time.Timer
-	keepaliveActive bool
 }
 
 // newSession is used to construct a new session
@@ -124,9 +118,6 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Session {
 	} else {
 		s.nextStreamID = 2
 	}
-	if config.EnableKeepAlive {
-		s.startKeepalive()
-	}
 	go s.recv()
 	go s.send()
 	return s
@@ -255,7 +246,6 @@ func (s *Session) Close() error {
 	}
 	close(s.shutdownCh)
 	s.conn.Close()
-	s.stopKeepalive()
 	<-s.recvDoneCh
 	<-s.sendDoneCh
 
@@ -350,62 +340,6 @@ func (s *Session) Ping() (dur time.Duration, err error) {
 	return time.Since(start), nil
 }
 
-// startKeepalive starts the keepalive process.
-func (s *Session) startKeepalive() {
-	s.keepaliveLock.Lock()
-	defer s.keepaliveLock.Unlock()
-	s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() {
-		s.keepaliveLock.Lock()
-		if s.keepaliveTimer == nil || s.keepaliveActive {
-			// keepalives have been stopped or a keepalive is active.
-			s.keepaliveLock.Unlock()
-			return
-		}
-		s.keepaliveActive = true
-		s.keepaliveLock.Unlock()
-
-		_, err := s.Ping()
-
-		s.keepaliveLock.Lock()
-		s.keepaliveActive = false
-		if s.keepaliveTimer != nil {
-			s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
-		}
-		s.keepaliveLock.Unlock()
-
-		if err != nil {
-			s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
-			s.exitErr(ErrKeepAliveTimeout)
-		}
-	})
-}
-
-// stopKeepalive stops the keepalive process.
-func (s *Session) stopKeepalive() {
-	s.keepaliveLock.Lock()
-	defer s.keepaliveLock.Unlock()
-	if s.keepaliveTimer != nil {
-		s.keepaliveTimer.Stop()
-		s.keepaliveTimer = nil
-	}
-}
-
-func (s *Session) extendKeepalive() {
-	s.keepaliveLock.Lock()
-	if s.keepaliveTimer != nil && !s.keepaliveActive {
-		// Don't stop the timer and drain the channel. This is an
-		// AfterFunc, not a normal timer, and any attempts to drain the
-		// channel will block forever.
-		//
-		// Go will stop the timer for us internally anyways. The docs
-		// say one must stop the timer before calling reset but that's
-		// to ensure that the timer doesn't end up firing immediately
-		// after calling Reset.
-		s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
-	}
-	s.keepaliveLock.Unlock()
-}
-
 // send sends the header and body.
 func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
 	select {
@@ -571,12 +505,6 @@ func (s *Session) recvLoop() error {
 			return err
 		}
-
-		// Reset the keepalive timer every time we receive data.
-		// There's no reason to keepalive if we're active. Worse, if the
-		// peer is busy sending us stuff, the pong might get stuck
-		// behind a bunch of data.
-		s.extendKeepalive()
 
 		// Verify the version
 		if hdr.Version() != protoVersion {
 			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
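Note: Session.Ping survives this commit, so callers that still want mux-level liveness checks can drive it themselves. One possible sketch, not library API — the interval and the choice to Close on a failed ping are assumptions:

// keepAlive pings the session at a fixed interval and tears it down on
// failure, roughly reproducing the removed startKeepalive loop in user code.
func keepAlive(s *yamux.Session, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-s.CloseChan(): // session already closed elsewhere
			return
		case <-ticker.C:
			if _, err := s.Ping(); err != nil {
				s.Close() // a failed ping means the peer is unresponsive
				return
			}
		}
	}
}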
2 changes: 1 addition & 1 deletion session_norace_test.go
@@ -13,7 +13,7 @@ import (
 )
 
 func TestSession_PingOfDeath(t *testing.T) {
-	conf := testConfNoKeepAlive()
+	conf := testConf()
 	// This test is slow and can easily time out on writes on CI.
 	//
 	// In the future, we might want to prioritize ping-replies over even
83 changes: 5 additions & 78 deletions session_test.go
@@ -97,17 +97,10 @@ func testConn() (conn1, conn2 net.Conn) {
 func testConf() *Config {
 	conf := DefaultConfig()
 	conf.AcceptBacklog = 64
-	conf.KeepAliveInterval = 100 * time.Millisecond
 	conf.ConnectionWriteTimeout = 350 * time.Millisecond
 	return conf
 }
-
-func testConfNoKeepAlive() *Config {
-	conf := testConf()
-	conf.EnableKeepAlive = false
-	return conf
-}
 
 func testClientServer() (*Session, *Session) {
 	return testClientServerConfig(testConf())
 }
@@ -993,71 +986,6 @@ func TestBacklogExceeded(t *testing.T) {
 	}
 }
 
-func TestKeepAlive(t *testing.T) {
-	client, server := testClientServer()
-	defer client.Close()
-	defer server.Close()
-
-	time.Sleep(200 * time.Millisecond)
-
-	// Ping value should increase
-	client.pingLock.Lock()
-	defer client.pingLock.Unlock()
-	if client.pingID == 0 {
-		t.Fatalf("should ping")
-	}
-
-	server.pingLock.Lock()
-	defer server.pingLock.Unlock()
-	if server.pingID == 0 {
-		t.Fatalf("should ping")
-	}
-}
-
-func TestKeepAlive_Timeout(t *testing.T) {
-	conn1, conn2 := testConn()
-
-	clientConf := testConf()
-	clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
-	clientConf.EnableKeepAlive = false            // Just test one direction, so it's deterministic who hangs up on whom
-	client, _ := Client(conn1, clientConf)
-	defer client.Close()
-
-	serverLogs := new(logCapture)
-	serverConf := testConf()
-	serverConf.LogOutput = serverLogs
-
-	server, _ := Server(conn2, serverConf)
-	defer server.Close()
-
-	errCh := make(chan error, 1)
-	go func() {
-		_, err := server.Accept() // Wait until server closes
-		errCh <- err
-	}()
-
-	// Prevent the client from responding
-	clientConn := client.conn.(*pipeConn)
-	clientConn.BlockWrites()
-
-	select {
-	case err := <-errCh:
-		if err != ErrKeepAliveTimeout {
-			t.Fatalf("unexpected error: %v", err)
-		}
-	case <-time.After(1 * time.Second):
-		t.Fatalf("timeout waiting for timeout")
-	}
-
-	if !server.IsClosed() {
-		t.Fatalf("server should have closed")
-	}
-
-	if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
-		t.Fatalf("server log incorect: %v", serverLogs.logs())
-	}
-}
-
 type UnlimitedReader struct{}
 
 func (u *UnlimitedReader) Read(p []byte) (int, error) {
Expand Down Expand Up @@ -1100,7 +1028,7 @@ func TestBacklogExceeded_Accept(t *testing.T) {
}

func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

@@ -1163,7 +1091,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 }
 
 func TestSession_PartialReadWindowUpdate(t *testing.T) {
-	client, server := testClientServerConfig(testConfNoKeepAlive())
+	client, server := testClientServerConfig(testConf())
 	defer client.Close()
 	defer server.Close()
 
@@ -1238,7 +1166,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
 }
 
 func TestSession_sendMsg_Timeout(t *testing.T) {
-	client, server := testClientServerConfig(testConfNoKeepAlive())
+	client, server := testClientServerConfig(testConf())
 	defer client.Close()
 	defer server.Close()
 
@@ -1265,7 +1193,7 @@ func TestWindowOverflow(t *testing.T) {
 	// 2. We unlock after resetting the stream.
 	for i := uint32(1); i < 100; i += 2 {
 		func() {
-			client, server := testClientServerConfig(testConfNoKeepAlive())
+			client, server := testClientServerConfig(testConf())
 			defer client.Close()
 			defer server.Close()
 
@@ -1287,7 +1215,7 @@ func TestWindowOverflow(t *testing.T) {
 	}
 }
 
 func TestSession_ConnectionWriteTimeout(t *testing.T) {
-	client, server := testClientServerConfig(testConfNoKeepAlive())
+	client, server := testClientServerConfig(testConf())
 	defer client.Close()
 	defer server.Close()
@@ -1463,7 +1391,6 @@ func TestStreamResetRead(t *testing.T) {
 
 func TestLotsOfWritesWithStreamDeadline(t *testing.T) {
 	config := testConf()
-	config.EnableKeepAlive = false
 
 	client, server := testClientServerConfig(config)
 	defer client.Close()
