Skip to content

Commit

Permalink
fix: use slot node id to detect node re-configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Sep 29, 2021
1 parent 5072031 commit 9bea59d
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,19 @@ func (opt *ClusterOptions) clientOptions() *Options {
//------------------------------------------------------------------------------

type clusterNode struct {
id string
Client *Client

latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
func newClusterNode(clOpt *ClusterOptions, id, addr string) *clusterNode {
opt := clOpt.clientOptions()
opt.Addr = addr
node := clusterNode{
id: id,
Client: clOpt.NewClient(opt),
}

Expand Down Expand Up @@ -352,12 +354,16 @@ func (c *clusterNodes) GC(generation uint32) {
}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
return c.GetOrCreateWithID(addr, "")
}

func (c *clusterNodes) GetOrCreateWithID(addr, id string) (*clusterNode, error) {
node, err := c.get(addr)
if err != nil {
return nil, err
}
if node != nil {
if node != nil && (id == "" || node.id == id) {
return node, nil
}

Expand All @@ -370,10 +376,13 @@ func (c *clusterNodes) Get(addr string) (*clusterNode, error) {

node, ok := c.nodes[addr]
if ok {
return node, nil
if id == "" || node.id == id {
return node, nil
}
delete(c.nodes, addr)
}

node = newClusterNode(c.opt, addr)
node = newClusterNode(c.opt, id, addr)

c.addrs = appendIfNotExists(c.addrs, addr)
c.nodes[addr] = node
Expand Down Expand Up @@ -416,7 +425,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
}

n := rand.Intn(len(addrs))
return c.Get(addrs[n])
return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -474,7 +483,7 @@ func newClusterState(
addr = replaceLoopbackHost(addr, originHost)
}

node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreateWithID(addr, slotNode.ID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -824,8 +833,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
var addr string
moved, ask, addr = isMovedError(lastErr)
if moved || ask {
c.state.LazyReload()

var err error
node, err = c.nodes.Get(addr)
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
Expand Down Expand Up @@ -1022,7 +1033,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
for _, idx := range rand.Perm(len(addrs)) {
addr := addrs[idx]

node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
Expand Down Expand Up @@ -1236,7 +1247,7 @@ func (c *ClusterClient) checkMovedErr(
return false
}

node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return false
}
Expand Down Expand Up @@ -1422,7 +1433,7 @@ func (c *ClusterClient) cmdsMoved(
addr string,
failedCmds *cmdsMap,
) error {
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
Expand Down Expand Up @@ -1477,7 +1488,7 @@ func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...s

moved, ask, addr := isMovedError(err)
if moved || ask {
node, err = c.nodes.Get(addr)
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
Expand Down Expand Up @@ -1589,7 +1600,7 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
for _, idx := range perm {
addr := addrs[idx]

node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
if firstErr == nil {
firstErr = err
Expand Down

0 comments on commit 9bea59d

Please sign in to comment.