Skip to content

Commit

Permalink
feat: improve ping accuracy
Browse files Browse the repository at this point in the history
This patch sends pings on their own (unbuffered) channel. This means:

- keepalives never get stuck behind pending writes.
- because this channel is unbuffered, RTT is more accurately measured.
  • Loading branch information
Stebalien committed Nov 6, 2020
1 parent 10c9119 commit c5baab6
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type Session struct {

// sendCh is used to send messages
sendCh chan []byte
// pingCh is used to send pongs (responses to pings)
pongCh chan uint32

// pingCh and pingCh are used to send pings and pongs
pongCh, pingCh chan uint32

// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
Expand Down Expand Up @@ -112,6 +113,7 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, 64),
pongCh: make(chan uint32, config.PingBacklog),
pingCh: make(chan uint32),
recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -312,16 +314,27 @@ func (s *Session) Ping() (dur time.Duration, err error) {
s.pingLock.Unlock()
}()

// Send the ping request
hdr := encode(typePing, flagSYN, 0, activePing.id)
if err := s.sendMsg(hdr, nil, nil); err != nil {
return 0, err
// Send the ping request, waiting at most one connection write timeout
// to flush it.
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
defer timer.Stop()
select {
case s.pingCh <- activePing.id:
case <-timer.C:
return 0, ErrTimeout
case <-s.shutdownCh:
return 0, s.shutdownErr
}

// Wait for a response
// The "time" starts once we've actually sent the ping. Otherwise, we'll
// measure the time it takes to flush the queue as well.
start := time.Now()
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
defer timer.Stop()

// Wait for a response, again waiting at most one write timeout.
if !timer.Stop() {
<-timer.C
}
timer.Reset(s.config.ConnectionWriteTimeout)
select {
case <-activePing.pingResponse:
case <-timer.C:
Expand Down Expand Up @@ -473,6 +486,10 @@ func (s *Session) sendLoop() error {
var buf []byte
select {
case buf = <-s.sendCh:
case pingID := <-s.pingCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagSYN, 0, pingID)
copy(buf, hdr[:])
case pingID := <-s.pongCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagACK, 0, pingID)
Expand Down

0 comments on commit c5baab6

Please sign in to comment.