diff --git a/embed/config.go b/embed/config.go index 8d429cb0a5c..3554de1ea24 100644 --- a/embed/config.go +++ b/embed/config.go @@ -39,11 +39,14 @@ const ( ClusterStateFlagNew = "new" ClusterStateFlagExisting = "existing" - DefaultName = "default" - DefaultMaxSnapshots = 5 - DefaultMaxWALs = 5 - DefaultMaxTxnOps = uint(128) - DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultName = "default" + DefaultMaxSnapshots = 5 + DefaultMaxWALs = 5 + DefaultMaxTxnOps = uint(128) + DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultGRPCKeepAliveMinTime = 5 * time.Second + DefaultGRPCKeepAliveInterval = 2 * time.Hour + DefaultGRPCKeepAliveTimeout = 20 * time.Second DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -93,6 +96,23 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // gRPC server options + + // GRPCKeepAliveMinTime is the minimum interval that a client should + // wait before pinging server. When client pings "too fast", server + // sends goaway and closes the connection (errors: too_many_pings, + // http2.ErrCodeEnhanceYourCalm). When too slow, nothing happens. + // Server expects client pings only when there is any active streams + // (PermitWithoutStream is set false). + GRPCKeepAliveMinTime time.Duration `json:"grpc-keepalive-min-time"` + // GRPCKeepAliveInterval is the frequency of server-to-client ping + // to check if a connection is alive. Close a non-responsive connection + // after an additional duration of Timeout. 0 to disable. + GRPCKeepAliveInterval time.Duration `json:"grpc-keepalive-interval"` + // GRPCKeepAliveTimeout is the additional duration of wait + // before closing a non-responsive connection. 0 to disable. + GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` + // clustering APUrls, ACUrls []url.URL @@ -181,25 +201,28 @@ func NewConfig() *Config { lcurl, _ := url.Parse(DefaultListenClientURLs) acurl, _ := url.Parse(DefaultAdvertiseClientURLs) cfg := &Config{ - CorsInfo: &cors.CORSInfo{}, - MaxSnapFiles: DefaultMaxSnapshots, - MaxWalFiles: DefaultMaxWALs, - Name: DefaultName, - SnapCount: etcdserver.DefaultSnapCount, - MaxTxnOps: DefaultMaxTxnOps, - MaxRequestBytes: DefaultMaxRequestBytes, - TickMs: 100, - ElectionMs: 1000, - LPUrls: []url.URL{*lpurl}, - LCUrls: []url.URL{*lcurl}, - APUrls: []url.URL{*apurl}, - ACUrls: []url.URL{*acurl}, - ClusterState: ClusterStateFlagNew, - InitialClusterToken: "etcd-cluster", - StrictReconfigCheck: true, - Metrics: "basic", - EnableV2: true, - AuthToken: "simple", + CorsInfo: &cors.CORSInfo{}, + MaxSnapFiles: DefaultMaxSnapshots, + MaxWalFiles: DefaultMaxWALs, + Name: DefaultName, + SnapCount: etcdserver.DefaultSnapCount, + MaxTxnOps: DefaultMaxTxnOps, + MaxRequestBytes: DefaultMaxRequestBytes, + GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime, + GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval, + GRPCKeepAliveTimeout: DefaultGRPCKeepAliveTimeout, + TickMs: 100, + ElectionMs: 1000, + LPUrls: []url.URL{*lpurl}, + LCUrls: []url.URL{*lcurl}, + APUrls: []url.URL{*apurl}, + ACUrls: []url.URL{*acurl}, + ClusterState: ClusterStateFlagNew, + InitialClusterToken: "etcd-cluster", + StrictReconfigCheck: true, + Metrics: "basic", + EnableV2: true, + AuthToken: "simple", } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg diff --git a/embed/etcd.go b/embed/etcd.go index 6ceb55b7930..b08349fffa9 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -42,6 +42,7 @@ import ( "github.com/soheilhy/cmux" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed") @@ -424,9 +425,23 @@ func (e *Etcd) serve() (err error) { } h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo}) + gopts := []grpc.ServerOption{} + if e.cfg.GRPCKeepAliveMinTime > time.Duration(0) { + gopts = append(gopts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: e.cfg.GRPCKeepAliveMinTime, + PermitWithoutStream: false, + })) + } + if e.cfg.GRPCKeepAliveInterval > time.Duration(0) && + e.cfg.GRPCKeepAliveTimeout > time.Duration(0) { + gopts = append(gopts, grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: e.cfg.GRPCKeepAliveInterval, + Timeout: e.cfg.GRPCKeepAliveTimeout, + })) + } for _, sctx := range e.sctxs { go func(s *serveCtx) { - e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler)) + e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, h, e.errHandler, gopts...)) }(sctx) } diff --git a/embed/serve.go b/embed/serve.go index 501d5f25363..53add9b0085 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -66,7 +66,12 @@ func newServeCtx() *serveCtx { // serve accepts incoming connections on the listener l, // creating a new service goroutine for each. The service goroutines // read requests and then call handler to reply to them. -func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo, handler http.Handler, errHandler func(error)) error { +func (sctx *serveCtx) serve( + s *etcdserver.EtcdServer, + tlsinfo *transport.TLSInfo, + handler http.Handler, + errHandler func(error), + gopts ...grpc.ServerOption) error { logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) <-s.ReadyNotify() plog.Info("ready to serve client requests") @@ -77,7 +82,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo servLock := v3lock.NewLockServer(v3c) if sctx.insecure { - gs := v3rpc.Server(s, nil) + gs := v3rpc.Server(s, nil, gopts...) sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) @@ -111,7 +116,7 @@ func (sctx *serveCtx) serve(s *etcdserver.EtcdServer, tlsinfo *transport.TLSInfo if tlsErr != nil { return tlsErr } - gs := v3rpc.Server(s, tlscfg) + gs := v3rpc.Server(s, tlscfg, gopts...) sctx.grpcServerC <- gs v3electionpb.RegisterElectionServer(gs, servElection) v3lockpb.RegisterLockServer(gs, servLock) diff --git a/etcdmain/config.go b/etcdmain/config.go index 61411292363..27a44b2eeb2 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -143,6 +143,9 @@ func newConfig() *config { fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.") fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", cfg.MaxRequestBytes, "Maximum client request size in bytes the server will accept.") + fs.DurationVar(&cfg.GRPCKeepAliveMinTime, "grpc-keepalive-min-time", cfg.Config.GRPCKeepAliveMinTime, "Minimum interval duration that a client should wait before pinging server.") + fs.DurationVar(&cfg.GRPCKeepAliveInterval, "grpc-keepalive-interval", cfg.Config.GRPCKeepAliveInterval, "Frequency duration of server-to-client ping to check if a connection is alive (0 to disable).") + fs.DurationVar(&cfg.GRPCKeepAliveTimeout, "grpc-keepalive-timeout", cfg.Config.GRPCKeepAliveTimeout, "Additional duration of wait before closing a non-responsive connection (0 to disable).") // clustering fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.") diff --git a/etcdmain/help.go b/etcdmain/help.go index 37a670abdd5..e6d55191f0d 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -70,6 +70,12 @@ member flags: maximum number of operations permitted in a transaction. --max-request-bytes '1572864' maximum client request size in bytes the server will accept. + --grpc-keepalive-min-time '5s' + minimum duration interval that a client should wait before pinging server. + --grpc-keepalive-interval '2h' + frequency duration of server-to-client ping to check if a connection is alive (0 to disable). + --grpc-keepalive-timeout '20s' + additional duration of wait before closing a non-responsive connection (0 to disable). clustering flags: diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 862a7dd3eed..452a9a3967c 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -39,7 +39,7 @@ func init() { grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) } -func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { +func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server { var opts []grpc.ServerOption opts = append(opts, grpc.CustomCodec(&codec{})) if tls != nil { @@ -50,7 +50,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) - grpcServer := grpc.NewServer(opts...) + grpcServer := grpc.NewServer(append(opts, gopts...)...) pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) pb.RegisterWatchServer(grpcServer, NewWatchServer(s))