From 01be2bf4bb6c3651613f28d526099e0ce4e11d0b Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Wed, 3 Aug 2016 17:50:11 -0700 Subject: [PATCH] agent: remove picker usage Instead, call grpc.Dial and (*grpc.ClientConn).Close() directly. Picker has pretty bad problems when Reset is called concurrently - it can block for a long time and then suddenly close a connection which was just created. So, in this particular case - less magic is better. There are some other usages of a picker in the ca package, but they seem harmless because there are no calls to Reset now. Signed-off-by: Alexander Morozov --- agent/agent.go | 6 +--- agent/agent_test.go | 32 ++++++------------ agent/config.go | 18 ++++------ agent/node.go | 15 ++------- agent/session.go | 37 +++++++++++++++++---- manager/testcluster/manager_cluster_test.go | 20 +++-------- 6 files changed, 54 insertions(+), 74 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 65b3a99397..3705b89a32 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -197,11 +197,6 @@ func (a *Agent) run(ctx context.Context) { sessionq = nil // if we're here before <-registered, do nothing for that event registered = nil - - // Bounce the connection. 
- if a.config.Picker != nil { - a.config.Picker.Reset() - } case <-session.closed: log.G(ctx).Debugf("agent: rebuild session") @@ -218,6 +213,7 @@ func (a *Agent) run(ctx context.Context) { if a.err == nil { a.err = ctx.Err() } + session.close() return } diff --git a/agent/agent_test.go b/agent/agent_test.go index 6ddde6938b..1012cb115a 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,6 +1,9 @@ package agent import ( + "testing" + "time" + "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" @@ -8,9 +11,6 @@ import ( "github.com/docker/swarmkit/picker" "github.com/stretchr/testify/assert" "golang.org/x/net/context" - "google.golang.org/grpc" - "testing" - "time" ) // NoopExecutor is a dummy executor that implements enough to get the agent started. @@ -67,19 +67,14 @@ func TestAgentStartStop(t *testing.T) { addr := "localhost:4949" remotes := picker.NewRemotes(api.Peer{Addr: addr}) - conn, err := grpc.Dial(addr, - grpc.WithPicker(picker.NewPicker(remotes, addr)), - grpc.WithTransportCredentials(agentSecurityConfig.ClientTLSCreds)) - assert.NoError(t, err) - db, cleanup := storageTestEnv(t) defer cleanup() agent, err := New(&Config{ - Executor: &NoopExecutor{}, - Managers: remotes, - Conn: conn, - DB: db, + Executor: &NoopExecutor{}, + Managers: remotes, + Credentials: agentSecurityConfig.ClientTLSCreds, + DB: db, }) assert.NoError(t, err) assert.NotNil(t, agent) @@ -146,19 +141,14 @@ func agentTestEnv(t *testing.T) (*Agent, func()) { addr := "localhost:4949" remotes := picker.NewRemotes(api.Peer{Addr: addr}) - conn, err := grpc.Dial(addr, - grpc.WithPicker(picker.NewPicker(remotes, addr)), - grpc.WithTransportCredentials(agentSecurityConfig.ClientTLSCreds)) - assert.NoError(t, err) - db, cleanupStorage := storageTestEnv(t) cleanup = append(cleanup, func() { cleanupStorage() }) agent, err := New(&Config{ - Executor: &NoopExecutor{}, - Managers: remotes, - Conn: conn, - DB: db, + Executor: 
&NoopExecutor{}, + Managers: remotes, + Credentials: agentSecurityConfig.ClientTLSCreds, + DB: db, }) return agent, func() { for i := len(cleanup) - 1; i > 0; i-- { diff --git a/agent/config.go b/agent/config.go index 74da752219..1f26f7797e 100644 --- a/agent/config.go +++ b/agent/config.go @@ -7,7 +7,7 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/picker" - "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // Config provides values for an Agent. @@ -19,15 +19,6 @@ type Config struct { // updated with managers weights as observed by the agent. Managers picker.Remotes - // Conn specifies the client connection Agent will use. - Conn *grpc.ClientConn - - // Picker is the picker used by Conn. - // TODO(aaronl): This is only part of the config to allow resetting the - // GRPC connection. This should be refactored to address the coupling - // between Conn and Picker. - Picker *picker.Picker - // Executor specifies the executor to use for the agent. Executor exec.Executor @@ -36,11 +27,14 @@ type Config struct { // NotifyRoleChange channel receives new roles from session messages. NotifyRoleChange chan<- api.NodeRole + + // Credentials is credentials for grpc connection to manager. 
+ Credentials credentials.TransportAuthenticator } func (c *Config) validate() error { - if c.Conn == nil { - return fmt.Errorf("agent: Connection is required") + if c.Credentials == nil { + return fmt.Errorf("agent: Credentials is required") } if c.Executor == nil { diff --git a/agent/node.go b/agent/node.go index 2178be8002..d231313959 100644 --- a/agent/node.go +++ b/agent/node.go @@ -361,32 +361,21 @@ func (n *Node) Err(ctx context.Context) error { } func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error { - var manager api.Peer select { case <-ctx.Done(): - case manager = <-n.remotes.WaitSelect(ctx): + case <-n.remotes.WaitSelect(ctx): } if ctx.Err() != nil { return ctx.Err() } - picker := picker.NewPicker(n.remotes, manager.Addr) - conn, err := grpc.Dial(manager.Addr, - grpc.WithPicker(picker), - grpc.WithTransportCredentials(creds), - grpc.WithBackoffMaxDelay(maxSessionFailureBackoff)) - if err != nil { - return err - } - defer conn.Close() agent, err := New(&Config{ Hostname: n.config.Hostname, Managers: n.remotes, Executor: n.config.Executor, DB: db, - Conn: conn, - Picker: picker, NotifyRoleChange: n.roleChangeReq, + Credentials: creds, }) if err != nil { return err diff --git a/agent/session.go b/agent/session.go index 0375e1c5bb..d091a56078 100644 --- a/agent/session.go +++ b/agent/session.go @@ -6,6 +6,7 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/picker" "github.com/docker/swarmkit/protobuf/ptypes" "golang.org/x/net/context" "google.golang.org/grpc" @@ -27,6 +28,9 @@ var ( // flow into the agent, such as task assignment, are called back into the // agent through errs, messages and tasks. 
type session struct { + conn *grpc.ClientConn + addr string + agent *Agent sessionID string session api.Dispatcher_SessionClient @@ -41,12 +45,27 @@ type session struct { func newSession(ctx context.Context, agent *Agent, delay time.Duration) *session { s := &session{ agent: agent, - errs: make(chan error), + errs: make(chan error, 1), messages: make(chan *api.SessionMessage), tasks: make(chan *api.TasksMessage), registered: make(chan struct{}), closed: make(chan struct{}), } + peer, err := agent.config.Managers.Select() + if err != nil { + s.errs <- err + return s + } + cc, err := grpc.Dial(peer.Addr, + grpc.WithTransportCredentials(agent.config.Credentials), + grpc.WithTimeout(dispatcherRPCTimeout), + ) + if err != nil { + s.errs <- err + return s + } + s.addr = peer.Addr + s.conn = cc go s.run(ctx, delay) return s @@ -77,8 +96,6 @@ func (s *session) run(ctx context.Context, delay time.Duration) { func (s *session) start(ctx context.Context) error { log.G(ctx).Debugf("(*session).start") - client := api.NewDispatcherClient(s.agent.config.Conn) - description, err := s.agent.config.Executor.Describe(ctx) if err != nil { log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor). @@ -103,6 +120,8 @@ func (s *session) start(ctx context.Context) error { // Need to run Session in a goroutine since there's no way to set a // timeout for an individual Recv call in a stream. 
go func() { + client := api.NewDispatcherClient(s.conn) + stream, err = client.Session(sessionCtx, &api.SessionRequest{ Description: description, }) @@ -133,7 +152,7 @@ func (s *session) start(ctx context.Context) error { func (s *session) heartbeat(ctx context.Context) error { log.G(ctx).Debugf("(*session).heartbeat") - client := api.NewDispatcherClient(s.agent.config.Conn) + client := api.NewDispatcherClient(s.conn) heartbeat := time.NewTimer(1) // send out a heartbeat right away defer heartbeat.Stop() @@ -195,7 +214,7 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess func (s *session) watch(ctx context.Context) error { log.G(ctx).Debugf("(*session).watch") - client := api.NewDispatcherClient(s.agent.config.Conn) + client := api.NewDispatcherClient(s.conn) watch, err := client.Tasks(ctx, &api.TasksRequest{ SessionID: s.sessionID}) if err != nil { @@ -221,7 +240,7 @@ func (s *session) watch(ctx context.Context) error { // sendTaskStatus uses the current session to send the status of a single task. 
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error { - client := api.NewDispatcherClient(s.agent.config.Conn) + client := api.NewDispatcherClient(s.conn) if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{ SessionID: s.sessionID, Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{ @@ -262,7 +281,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa return updates, ctx.Err() } - client := api.NewDispatcherClient(s.agent.config.Conn) + client := api.NewDispatcherClient(s.conn) n := batchSize if len(updates) < n { @@ -285,6 +304,10 @@ func (s *session) close() error { case <-s.closed: return errSessionClosed default: + if s.conn != nil { + s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight) + s.conn.Close() + } close(s.closed) return nil } diff --git a/manager/testcluster/manager_cluster_test.go b/manager/testcluster/manager_cluster_test.go index 8bcb66bf13..7243771ae9 100644 --- a/manager/testcluster/manager_cluster_test.go +++ b/manager/testcluster/manager_cluster_test.go @@ -24,7 +24,6 @@ import ( "github.com/docker/swarmkit/picker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) func init() { @@ -70,23 +69,12 @@ func (mc *managersCluster) addAgents(count int) error { } managers := picker.NewRemotes(addrs...) - peer, err := managers.Select() - if err != nil { - return err - } - conn, err := grpc.Dial(peer.Addr, - grpc.WithPicker(picker.NewPicker(managers)), - grpc.WithTransportCredentials(asConfig.ClientTLSCreds)) - if err != nil { - return err - } - id := strconv.Itoa(rand.Int()) a, err := agent.New(&agent.Config{ - Hostname: "hostname_" + id, - Managers: managers, - Executor: &NoopExecutor{}, - Conn: conn, + Hostname: "hostname_" + id, + Managers: managers, + Executor: &NoopExecutor{}, + Credentials: asConfig.ClientTLSCreds, }) if err != nil { return err