Skip to content

Commit

Permalink
Implementation for server enforcement of keepalive policy. (#1147)
Browse files Browse the repository at this point in the history
Implementation of server enforcement of keepalive policy.
Server will close connection with a client that violates this policy.
Policy parameters:
 - MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
 - If PermitWithoutStream true, server expects keepalive pings even when there are no active streams(RPCs).
  • Loading branch information
MakMukhi authored Mar 31, 2017
1 parent 7fc29d0 commit b2fae0c
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 9 deletions.
23 changes: 17 additions & 6 deletions keepalive/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,40 @@ import (
// ClientParameters is used to set keepalive parameters on the client-side.
// These configure how the client will actively probe to notice when a connection broken
// and to cause activity so intermediaries are aware the connection is still in use.
// Make sure these parameters are set in coordination with the keepalive policy on the server,
// as incompatible settings can result in closing of connection.
type ClientParameters struct {
// After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration // The current default value is infinity.
// After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
// If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
PermitWithoutStream bool // false by default.
}

// ServerParameters is used to set keepalive and max-age parameters on the server-side.
type ServerParameters struct {
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
MaxConnectionIdle time.Duration
MaxConnectionIdle time.Duration // The current default value is infinity.
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
MaxConnectionAge time.Duration
MaxConnectionAge time.Duration // The current default value is infinity.
// MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration
MaxConnectionAgeGrace time.Duration // The current default value is infinity.
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
Time time.Duration
Time time.Duration // The current default value is 2 hours.
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration
Timeout time.Duration // The current default value is 20 seconds.
}

// EnforcementPolicy is used to set keepalive enforcement policy on the server-side.
// Server will close connection with a client that violates this policy.
type EnforcementPolicy struct {
// MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
MinTime time.Duration // The current default value is 5 minutes.
// If true, server expects keepalive pings even when there are no active streams(RPCs).
PermitWithoutStream bool // false by default.
}
9 changes: 9 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type options struct {
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
}

var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
Expand All @@ -133,6 +134,13 @@ func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
}
}

// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
return func(o *options) {
o.keepalivePolicy = kep
}
}

// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
Expand Down Expand Up @@ -479,6 +487,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
defaultMaxConnectionAgeGrace = infinity
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
)

// The following defines various control items which could flow through
Expand Down Expand Up @@ -84,6 +85,8 @@ type resetStream struct {
func (*resetStream) item() {}

type goAway struct {
code http2.ErrCode
debugData []byte
}

func (*goAway) item() {}
Expand Down
3 changes: 3 additions & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,9 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}

func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
}
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
Expand Down
76 changes: 73 additions & 3 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ type http2Server struct {
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters

// Keepalive enforcement policy.
kep keepalive.EnforcementPolicy
// The time instance last ping was received.
lastPingAt time.Time
// Number of times the client has violated keepalive ping policy so far.
pingStrikes uint8
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.

mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
Expand Down Expand Up @@ -161,6 +172,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
Expand All @@ -184,6 +199,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
Expand Down Expand Up @@ -504,13 +520,50 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
t.controlBuf.put(&settings{ack: true, ss: ss})
}

const (
maxPingStrikes = 2
defaultPingTimeout = 2 * time.Hour
)

func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
}
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)

now := time.Now()
defer func() {
t.lastPingAt = now
}()
// A reset ping strikes means that we don't need to check for policy
// violation for this ping and the pingStrikes counter should be set
// to 0.
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
t.pingStrikes = 0
return
}
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if ns < 1 && !t.kep.PermitWithoutStream {
// Keepalive shouldn't be active thus, this new ping should
// have come after atleast defaultPingTimeout.
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
t.pingStrikes++
}
} else {
// Check if keepalive policy is respected.
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
t.pingStrikes++
}
}

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
}
}

func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
Expand All @@ -529,6 +582,13 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
first := true
endHeaders := false
var err error
defer func() {
if err == nil {
// Reset ping strikes when seding headers since that might cause the
// peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
Expand Down Expand Up @@ -672,7 +732,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s

// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
var writeHeaderFrame bool
s.mu.Lock()
Expand All @@ -687,6 +747,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
defer func() {
if err == nil {
// Reset ping strikes when sending data since this might cause
// the peer to send ping.
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
}()
r := bytes.NewBuffer(data)
for {
if r.Len() == 0 {
Expand Down Expand Up @@ -892,7 +959,10 @@ func (t *http2Server) controller() {
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.code == http2.ErrCodeEnhanceYourCalm {
t.Close()
}
case *flushIO:
t.framer.flushWrite()
case *ping:
Expand Down Expand Up @@ -972,7 +1042,7 @@ func (t *http2Server) RemoteAddr() net.Addr {
}

func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
}

var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
1 change: 1 addition & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ type ServerConfig struct {
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
}

// NewServerTransport creates a ServerTransport with conn or non-nil error
Expand Down
Loading

0 comments on commit b2fae0c

Please sign in to comment.