Skip to content

Commit

Permalink
fix(conn): JoinCluster loop should use latest conn (#7950) (#7954)
Browse files Browse the repository at this point in the history
JoinCluster loop was getting the connection from pool upfront, and then looping over it. This opened up a bug because in #7918 , we close the connection in case it becomes unhealthy.

This PR gets the latest connection available in the loop. This was the only place in the codebase where I found this issue.

(cherry picked from commit 7531e95)
(cherry picked from commit c4098ed)

Co-authored-by: Manish R Jain <[email protected]>
  • Loading branch information
danielmai and manishrjain authored Jul 16, 2021
1 parent d03d5ad commit 07452e8
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 12 deletions.
3 changes: 1 addition & 2 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,9 @@ func (n *node) initAndStartNode() error {
return errors.Errorf("Unhealthy connection to %v", opts.peer)
}

gconn := p.Get()
c := pb.NewRaftClient(gconn)
timeout := 8 * time.Second
for {
c := pb.NewRaftClient(p.Get())
ctx, cancel := context.WithTimeout(n.ctx, timeout)
// JoinCluster can block indefinitely, raft ignores conf change proposal
// if it has pending configuration.
Expand Down
3 changes: 1 addition & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,8 +1628,7 @@ func (n *node) joinPeers() error {
return err
}

gconn := pl.Get()
c := pb.NewRaftClient(gconn)
c := pb.NewRaftClient(pl.Get())
glog.Infof("Calling JoinCluster via leader: %s", pl.Addr)
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
return errors.Wrapf(err, "error while joining cluster")
Expand Down
3 changes: 1 addition & 2 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,9 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re
chr <- res
return
}
con := pl.Get()

var tc *api.TxnContext
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())

ch := make(chan error, 1)
go func() {
Expand Down
3 changes: 1 addition & 2 deletions worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest,
ch <- resultErr{err: conn.ErrNoConnection}
return
}
conn := pl.Get()
c := pb.NewWorkerClient(conn)
c := pb.NewWorkerClient(pl.Get())
schema, e := c.Schema(ctx, s)
ch <- resultErr{result: schema, err: e}
}
Expand Down
3 changes: 1 addition & 2 deletions worker/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ type badgerWriter interface {

// populateSnapshot gets data for a shard from the leader and writes it to BadgerDB on the follower.
func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
con := pl.Get()
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())

// We should absolutely cancel the context when we return from this function, that way, the
// leader who is sending the snapshot would stop sending.
Expand Down
3 changes: 1 addition & 2 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ func invokeNetworkRequest(ctx context.Context, addr string,
return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
}

con := pl.Get()
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
}
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())
return f(ctx, c)
}

Expand Down

0 comments on commit 07452e8

Please sign in to comment.