Skip to content

Commit

Permalink
fix(proposals): Incremental proposal key for zero proposals (#8005) (#…
Browse files Browse the repository at this point in the history
…8567)

Change the proposal's unique key to an atomic counter instead of using a
randomly generated key.

Proposal key initialisation has a bug where we want to reserve first 2 bytes for node id but do not do because we read random bytes in all 8 bytes and do a logical OR. This PR adds a test case and fixes the logic of initialising the proposal key.

(cherry picked from commit a515d0d)
(cherry picked from commit 2aa3d3e)
  • Loading branch information
all-seeing-code authored Feb 8, 2023
1 parent 41d4b99 commit e4ff3a0
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 5 deletions.
17 changes: 16 additions & 1 deletion dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

farm "github.com/dgryski/go-farm"
Expand All @@ -48,6 +49,8 @@ const (
raftDefaults = "idx=1; learner=false;"
)

var proposalKey uint64

type node struct {
*conn.Node
server *Server
Expand Down Expand Up @@ -81,8 +84,19 @@ func (n *node) AmLeader() bool {
return time.Since(n.lastQuorum) <= 5*time.Second
}

// {2 bytes Node ID} {4 bytes for random} {2 bytes zero}
func (n *node) initProposalKey(id uint64) error {
x.AssertTrue(id != 0)
var err error
proposalKey, err = x.ProposalKey(n.Id)
if err != nil {
return err
}
return nil
}

func (n *node) uniqueKey() uint64 {
return uint64(n.Id)<<32 | uint64(n.Rand.Uint32())
return atomic.AddUint64(&proposalKey, 1)
}

var errInternalRetry = errors.New("Retry Raft proposal internally")
Expand Down Expand Up @@ -626,6 +640,7 @@ func (n *node) checkForCIDInEntries() (bool, error) {
}

func (n *node) initAndStartNode() error {
x.Check(n.initProposalKey(n.Id))
_, restart, err := n.PastLife()
x.Check(err)

Expand Down
26 changes: 26 additions & 0 deletions dgraph/cmd/zero/zero_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/ristretto/z"
)

func TestRemoveNode(t *testing.T) {
Expand Down Expand Up @@ -84,3 +86,27 @@ func TestIdBump(t *testing.T) {
_, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID, Bump: true})
require.Contains(t, err.Error(), "Nothing to be leased")
}

func TestProposalKey(t *testing.T) {

id := uint64(2)
node := &node{Node: &conn.Node{Id: id}, ctx: context.Background(), closer: z.NewCloser(1)}
node.initProposalKey(node.Id)

pkey := proposalKey
nodeIdFromKey := proposalKey >> 48
require.Equal(t, id, nodeIdFromKey, "id extracted from proposal key is not equal to initial value")

valueOf48thBit := int(pkey & (1 << 48))
require.Equal(t, 0, valueOf48thBit, "48th bit is not set to zero on initialisation")

node.uniqueKey()
require.Equal(t, pkey+1, proposalKey, "proposal key should increment by 1 at each call of unique key")

uniqueKeys := make(map[uint64]struct{})
for i := 0; i < 10; i++ {
node.uniqueKey()
uniqueKeys[proposalKey] = struct{}{}
}
require.Equal(t, len(uniqueKeys), 10, "each iteration should create unique key")
}
2 changes: 1 addition & 1 deletion worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ func (n *node) retryUntilSuccess(fn func() error, pause time.Duration) {

// InitAndStartNode gets called after having at least one membership sync with the cluster.
func (n *node) InitAndStartNode() {
initProposalKey(n.Id)
x.Check(initProposalKey(n.Id))
_, restart, err := n.PastLife()
x.Check(err)

Expand Down
10 changes: 7 additions & 3 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
)

const baseTimeout time.Duration = 4 * time.Second
Expand Down Expand Up @@ -111,9 +110,14 @@ func (rl *rateLimiter) decr(retry int) {
var proposalKey uint64

// {2 bytes Node ID} {4 bytes for random} {2 bytes zero}
func initProposalKey(id uint64) {
func initProposalKey(id uint64) error {
x.AssertTrue(id != 0)
proposalKey = uint64(groups().Node.Id)<<48 | uint64(z.FastRand())<<16
var err error
proposalKey, err = x.ProposalKey(groups().Node.Id)
if err != nil {
return err
}
return nil
}

// uniqueKey is meant to be unique across all the replicas.
Expand Down
17 changes: 17 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"bytes"
builtinGzip "compress/gzip"
"context"
cr "crypto/rand"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -644,6 +646,21 @@ func RetryUntilSuccess(maxRetries int, waitAfterFailure time.Duration,
return err
}

// {2 bytes Node ID} {4 bytes for random} {2 bytes zero}
func ProposalKey(id uint64) (uint64, error) {
random4Bytes := make([]byte, 4)
if _, err := cr.Read(random4Bytes); err != nil {
return 0, err
}
proposalKey := id<<48 | uint64(binary.BigEndian.Uint32(random4Bytes))<<16
// We want to avoid spillage to node id in case of overflow. For instance, if the
// random bytes end up being [xx,xx, 255, 255, 255, 255, 0 , 0] (xx, xx being the node id)
// we would spill to node id after 65535 calls to unique key.
// So by setting 48th bit to 0 we ensure that we never spill out to node ids.
proposalKey &= ^(uint64(1) << 47)
return proposalKey, nil
}

// HasString returns whether the slice contains the given string.
func HasString(a []string, b string) bool {
for _, k := range a {
Expand Down

0 comments on commit e4ff3a0

Please sign in to comment.