From 61c58af42ac7b412fdc0f80cc43604c621ee4503 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Mon, 30 Aug 2021 21:46:22 +0530 Subject: [PATCH 1/3] Incremental proposal key for zero --- dgraph/cmd/zero/raft.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index bda04d7d427..7407098f92c 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/dgraph-io/dgraph/conn" @@ -47,6 +48,8 @@ const ( raftDefaults = "idx=1; learner=false;" ) +var proposalKey uint64 + type node struct { *conn.Node server *Server @@ -80,8 +83,14 @@ func (n *node) AmLeader() bool { return time.Since(n.lastQuorum) <= 5*time.Second } +// {2 bytes Node ID} {4 bytes for random} {2 bytes zero} +func (n *node) initProposalKey(id uint64) { + x.AssertTrue(id != 0) + proposalKey = uint64(n.Id)<<48 | uint64(z.FastRand())<<16 +} + func (n *node) uniqueKey() uint64 { - return uint64(n.Id)<<32 | uint64(n.Rand.Uint32()) + return atomic.AddUint64(&proposalKey, 1) } var errInternalRetry = errors.New("Retry Raft proposal internally") @@ -597,6 +606,7 @@ func (n *node) checkForCIDInEntries() (bool, error) { } func (n *node) initAndStartNode() error { + n.initProposalKey(n.Id) _, restart, err := n.PastLife() x.Check(err) From 474bda8b2c1eb30f0a02dfc1d78d0e8f3022fb08 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Thu, 2 Sep 2021 21:57:33 +0530 Subject: [PATCH 2/3] Use crypto rand --- dgraph/cmd/zero/raft.go | 13 ++++++++++--- worker/draft.go | 2 +- worker/proposal.go | 12 +++++++++--- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 7407098f92c..13a857d325e 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -18,6 +18,7 @@ package zero import ( "context" + "crypto/rand" "encoding/binary" "fmt" "log" @@ -84,9 +85,15 @@ func (n *node) AmLeader() bool { } // {2 bytes Node ID} {4 bytes for random} {2 bytes zero} -func (n *node) initProposalKey(id uint64) { +func (n *node) initProposalKey(id uint64) error { x.AssertTrue(id != 0) - proposalKey = uint64(n.Id)<<48 | uint64(z.FastRand())<<16 + randBytes := make([]byte, 8) + if _, err := rand.Read(randBytes); err != nil { + return err + } + randNum := binary.BigEndian.Uint64(randBytes) + proposalKey = uint64(n.Id)<<48 | uint64(randNum)<<16 + return nil } func (n *node) uniqueKey() uint64 { @@ -606,7 +613,7 @@ func (n *node) checkForCIDInEntries() (bool, error) { } func (n *node) initAndStartNode() error { - n.initProposalKey(n.Id) + x.Check(n.initProposalKey(n.Id)) _, restart, err := n.PastLife() x.Check(err) diff --git a/worker/draft.go b/worker/draft.go index f795fe72904..f15c5a0d2cc 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -2120,7 +2120,7 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) { // InitAndStartNode gets called after having at least one membership sync with the cluster. func (n *node) InitAndStartNode() { - initProposalKey(n.Id) + x.Check(initProposalKey(n.Id)) _, restart, err := n.PastLife() x.Check(err) diff --git a/worker/proposal.go b/worker/proposal.go index 8365fe3e459..e8464050829 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -18,6 +18,7 @@ package worker import ( "context" + "crypto/rand" "encoding/binary" "sync" "sync/atomic" @@ -27,7 +28,6 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" - "github.com/dgraph-io/ristretto/z" ostats "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -112,9 +112,15 @@ func (rl *rateLimiter) decr(retry int) { var proposalKey uint64 // {2 bytes Node ID} {4 bytes for random} {2 bytes zero} -func initProposalKey(id uint64) { +func initProposalKey(id uint64) error { x.AssertTrue(id != 0) - proposalKey = uint64(groups().Node.Id)<<48 | uint64(z.FastRand())<<16 + randBytes := make([]byte, 8) + if _, err := rand.Read(randBytes); err != nil { + return err + } + randNum := binary.BigEndian.Uint64(randBytes) + proposalKey = uint64(groups().Node.Id)<<48 | uint64(randNum)<<16 + return nil } // uniqueKey is meant to be unique across all the replicas. From 70b2ba981078af35188fc50719c22e76f49983a5 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Thu, 2 Sep 2021 22:03:19 +0530 Subject: [PATCH 3/3] Refactor --- dgraph/cmd/zero/raft.go | 7 +++---- worker/proposal.go | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 13a857d325e..0c2d98364c8 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -87,12 +87,11 @@ func (n *node) AmLeader() bool { // {2 bytes Node ID} {4 bytes for random} {2 bytes zero} func (n *node) initProposalKey(id uint64) error { x.AssertTrue(id != 0) - randBytes := make([]byte, 8) - if _, err := rand.Read(randBytes); err != nil { + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { return err } - randNum := binary.BigEndian.Uint64(randBytes) - proposalKey = uint64(n.Id)<<48 | uint64(randNum)<<16 + proposalKey = n.Id<<48 | binary.BigEndian.Uint64(b)<<16 return nil } diff --git a/worker/proposal.go b/worker/proposal.go index e8464050829..858eaf7bc9a 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -114,12 +114,11 @@ var proposalKey uint64 // {2 bytes Node ID} {4 bytes for random} {2 bytes zero} func initProposalKey(id uint64) error { x.AssertTrue(id != 0) - randBytes := make([]byte, 8) - if _, err := rand.Read(randBytes); err != nil { + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { return err } - randNum := binary.BigEndian.Uint64(randBytes) - proposalKey = uint64(groups().Node.Id)<<48 | uint64(randNum)<<16 + proposalKey = groups().Node.Id<<48 | binary.BigEndian.Uint64(b)<<16 return nil }