Skip to content

Commit

Permalink
fix(proposals): Incremental proposal key for zero proposals (#8005)
Browse files Browse the repository at this point in the history
Change the proposal's unique key to an atomic counter instead of using a randomly generated key.

(cherry picked from commit a515d0d)
(cherry picked from commit 2aa3d3e)
  • Loading branch information
ahsanbarkati authored and all-seeing-code committed Jan 24, 2023
1 parent 2212e3f commit b3b4042
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
18 changes: 17 additions & 1 deletion dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package zero

import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"log"
"math"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

farm "github.com/dgryski/go-farm"
Expand All @@ -48,6 +50,8 @@ const (
raftDefaults = "idx=1; learner=false;"
)

var proposalKey uint64

type node struct {
*conn.Node
server *Server
Expand Down Expand Up @@ -81,8 +85,19 @@ func (n *node) AmLeader() bool {
return time.Since(n.lastQuorum) <= 5*time.Second
}

// {2 bytes Node ID} {4 bytes for random} {2 bytes zero}
func (n *node) initProposalKey(id uint64) error {
x.AssertTrue(id != 0)
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
return err
}
proposalKey = n.Id<<48 | binary.BigEndian.Uint64(b)<<16
return nil
}

func (n *node) uniqueKey() uint64 {
return uint64(n.Id)<<32 | uint64(n.Rand.Uint32())
return atomic.AddUint64(&proposalKey, 1)
}

var errInternalRetry = errors.New("Retry Raft proposal internally")
Expand Down Expand Up @@ -626,6 +641,7 @@ func (n *node) checkForCIDInEntries() (bool, error) {
}

func (n *node) initAndStartNode() error {
x.Check(n.initProposalKey(n.Id))
_, restart, err := n.PastLife()
x.Check(err)

Expand Down
2 changes: 1 addition & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {

// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode() {
initProposalKey(n.Id)
x.Check(initProposalKey(n.Id))
_, restart, err := n.PastLife()
x.Check(err)

Expand Down
11 changes: 8 additions & 3 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package worker

import (
"context"
"crypto/rand"
"encoding/binary"
"sync"
"sync/atomic"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

const baseTimeout time.Duration = 4 * time.Second
Expand Down Expand Up @@ -111,9 +111,14 @@ func (rl *rateLimiter) decr(retry int) {
var proposalKey uint64

// {2 bytes Node ID} {4 bytes for random} {2 bytes zero}
func initProposalKey(id uint64) {
func initProposalKey(id uint64) error {
x.AssertTrue(id != 0)
proposalKey = uint64(groups().Node.Id)<<48 | uint64(z.FastRand())<<16
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
return err
}
proposalKey = groups().Node.Id<<48 | binary.BigEndian.Uint64(b)<<16
return nil
}

// uniqueKey is meant to be unique across all the replicas.
Expand Down

0 comments on commit b3b4042

Please sign in to comment.