Skip to content

Commit

Permalink
Merge pull request #1312 from LK4D4/no_agent_picker
Browse files Browse the repository at this point in the history
agent: remove picker usage
  • Loading branch information
LK4D4 committed Aug 5, 2016
2 parents a30b34d + 01be2bf commit 9b0742e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 74 deletions.
6 changes: 1 addition & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,6 @@ func (a *Agent) run(ctx context.Context) {
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil

// Bounce the connection.
if a.config.Picker != nil {
a.config.Picker.Reset()
}
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")

Expand All @@ -218,6 +213,7 @@ func (a *Agent) run(ctx context.Context) {
if a.err == nil {
a.err = ctx.Err()
}
session.close()

return
}
Expand Down
32 changes: 11 additions & 21 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package agent

import (
"testing"
"time"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/picker"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/grpc"
"testing"
"time"
)

// NoopExecutor is a dummy executor that implements enough to get the agent started.
Expand Down Expand Up @@ -67,19 +67,14 @@ func TestAgentStartStop(t *testing.T) {
addr := "localhost:4949"
remotes := picker.NewRemotes(api.Peer{Addr: addr})

conn, err := grpc.Dial(addr,
grpc.WithPicker(picker.NewPicker(remotes, addr)),
grpc.WithTransportCredentials(agentSecurityConfig.ClientTLSCreds))
assert.NoError(t, err)

db, cleanup := storageTestEnv(t)
defer cleanup()

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
Conn: conn,
DB: db,
Executor: &NoopExecutor{},
Managers: remotes,
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
assert.NoError(t, err)
assert.NotNil(t, agent)
Expand Down Expand Up @@ -146,19 +141,14 @@ func agentTestEnv(t *testing.T) (*Agent, func()) {
addr := "localhost:4949"
remotes := picker.NewRemotes(api.Peer{Addr: addr})

conn, err := grpc.Dial(addr,
grpc.WithPicker(picker.NewPicker(remotes, addr)),
grpc.WithTransportCredentials(agentSecurityConfig.ClientTLSCreds))
assert.NoError(t, err)

db, cleanupStorage := storageTestEnv(t)
cleanup = append(cleanup, func() { cleanupStorage() })

agent, err := New(&Config{
Executor: &NoopExecutor{},
Managers: remotes,
Conn: conn,
DB: db,
Executor: &NoopExecutor{},
Managers: remotes,
Credentials: agentSecurityConfig.ClientTLSCreds,
DB: db,
})
return agent, func() {
for i := len(cleanup) - 1; i > 0; i-- {
Expand Down
18 changes: 6 additions & 12 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/picker"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// Config provides values for an Agent.
Expand All @@ -19,15 +19,6 @@ type Config struct {
// updated with managers weights as observed by the agent.
Managers picker.Remotes

// Conn specifies the client connection Agent will use.
Conn *grpc.ClientConn

// Picker is the picker used by Conn.
// TODO(aaronl): This is only part of the config to allow resetting the
// GRPC connection. This should be refactored to address the coupling
// between Conn and Picker.
Picker *picker.Picker

// Executor specifies the executor to use for the agent.
Executor exec.Executor

Expand All @@ -36,11 +27,14 @@ type Config struct {

// NotifyRoleChange channel receives new roles from session messages.
NotifyRoleChange chan<- api.NodeRole

// Credentials is credentials for grpc connection to manager.
Credentials credentials.TransportAuthenticator
}

func (c *Config) validate() error {
if c.Conn == nil {
return fmt.Errorf("agent: Connection is required")
if c.Credentials == nil {
return fmt.Errorf("agent: Credentials is required")
}

if c.Executor == nil {
Expand Down
15 changes: 2 additions & 13 deletions agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,32 +361,21 @@ func (n *Node) Err(ctx context.Context) error {
}

func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
var manager api.Peer
select {
case <-ctx.Done():
case manager = <-n.remotes.WaitSelect(ctx):
case <-n.remotes.WaitSelect(ctx):
}
if ctx.Err() != nil {
return ctx.Err()
}
picker := picker.NewPicker(n.remotes, manager.Addr)
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
if err != nil {
return err
}
defer conn.Close()

agent, err := New(&Config{
Hostname: n.config.Hostname,
Managers: n.remotes,
Executor: n.config.Executor,
DB: db,
Conn: conn,
Picker: picker,
NotifyRoleChange: n.roleChangeReq,
Credentials: creds,
})
if err != nil {
return err
Expand Down
37 changes: 30 additions & 7 deletions agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/picker"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -27,6 +28,9 @@ var (
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
conn *grpc.ClientConn
addr string

agent *Agent
sessionID string
session api.Dispatcher_SessionClient
Expand All @@ -41,12 +45,27 @@ type session struct {
func newSession(ctx context.Context, agent *Agent, delay time.Duration) *session {
s := &session{
agent: agent,
errs: make(chan error),
errs: make(chan error, 1),
messages: make(chan *api.SessionMessage),
tasks: make(chan *api.TasksMessage),
registered: make(chan struct{}),
closed: make(chan struct{}),
}
peer, err := agent.config.Managers.Select()
if err != nil {
s.errs <- err
return s
}
cc, err := grpc.Dial(peer.Addr,
grpc.WithTransportCredentials(agent.config.Credentials),
grpc.WithTimeout(dispatcherRPCTimeout),
)
if err != nil {
s.errs <- err
return s
}
s.addr = peer.Addr
s.conn = cc

go s.run(ctx, delay)
return s
Expand Down Expand Up @@ -77,8 +96,6 @@ func (s *session) run(ctx context.Context, delay time.Duration) {
func (s *session) start(ctx context.Context) error {
log.G(ctx).Debugf("(*session).start")

client := api.NewDispatcherClient(s.agent.config.Conn)

description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Expand All @@ -103,6 +120,8 @@ func (s *session) start(ctx context.Context) error {
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
client := api.NewDispatcherClient(s.conn)

stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
Expand Down Expand Up @@ -133,7 +152,7 @@ func (s *session) start(ctx context.Context) error {

func (s *session) heartbeat(ctx context.Context) error {
log.G(ctx).Debugf("(*session).heartbeat")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
heartbeat := time.NewTimer(1) // send out a heartbeat right away
defer heartbeat.Stop()

Expand Down Expand Up @@ -195,7 +214,7 @@ func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMess

func (s *session) watch(ctx context.Context) error {
log.G(ctx).Debugf("(*session).watch")
client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
watch, err := client.Tasks(ctx, &api.TasksRequest{
SessionID: s.sessionID})
if err != nil {
Expand All @@ -221,7 +240,7 @@ func (s *session) watch(ctx context.Context) error {
// sendTaskStatus uses the current session to send the status of a single task.
func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {

client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
SessionID: s.sessionID,
Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
Expand Down Expand Up @@ -262,7 +281,7 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa
return updates, ctx.Err()
}

client := api.NewDispatcherClient(s.agent.config.Conn)
client := api.NewDispatcherClient(s.conn)
n := batchSize

if len(updates) < n {
Expand All @@ -285,6 +304,10 @@ func (s *session) close() error {
case <-s.closed:
return errSessionClosed
default:
if s.conn != nil {
s.agent.config.Managers.ObserveIfExists(api.Peer{Addr: s.addr}, -picker.DefaultObservationWeight)
s.conn.Close()
}
close(s.closed)
return nil
}
Expand Down
20 changes: 4 additions & 16 deletions manager/testcluster/manager_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/docker/swarmkit/picker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func init() {
Expand Down Expand Up @@ -70,23 +69,12 @@ func (mc *managersCluster) addAgents(count int) error {
}

managers := picker.NewRemotes(addrs...)
peer, err := managers.Select()
if err != nil {
return err
}
conn, err := grpc.Dial(peer.Addr,
grpc.WithPicker(picker.NewPicker(managers)),
grpc.WithTransportCredentials(asConfig.ClientTLSCreds))
if err != nil {
return err
}

id := strconv.Itoa(rand.Int())
a, err := agent.New(&agent.Config{
Hostname: "hostname_" + id,
Managers: managers,
Executor: &NoopExecutor{},
Conn: conn,
Hostname: "hostname_" + id,
Managers: managers,
Executor: &NoopExecutor{},
Credentials: asConfig.ClientTLSCreds,
})
if err != nil {
return err
Expand Down

0 comments on commit 9b0742e

Please sign in to comment.