Skip to content

Commit

Permalink
cloud: fix(lease): prevent ID lease overflow (#7802)
Browse files Browse the repository at this point in the history
* fix(lease): prevent ID lease overflow (#7724)

(cherry picked from commit d4c6b7c)

* fix(live): make live loader progress on a cluster with very high maxUid  (#7743)

This PR fixes the following case:
If we have a cluster that cannot lease out new UIDs because it has already leased upto its
max limit. Now, we try to live load the data with the given UIDs and the AssignIds complains
that the limit has reached. Hence, update the xidmap's maxSeenUid and make progress.

(cherry picked from commit ad64c01)
  • Loading branch information
NamanJain8 authored May 11, 2021
1 parent 5e45f13 commit 1235438
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 24 deletions.
42 changes: 22 additions & 20 deletions dgraph/cmd/zero/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,38 +106,40 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
// We couldn't service it. So, let's request an extra timestamp for
// readonly transactions, if needed.
}

// If we're asking for more ids than the standard lease bandwidth, then we
// should set howMany generously, so we can service future requests from
// memory, without asking for another lease. Only used if we need to renew
// our lease.
howMany := leaseBandwidth
if num.Val > leaseBandwidth {
howMany = num.Val + leaseBandwidth
}

if s.nextLease[pb.Num_UID] == 0 || s.nextLease[pb.Num_TXN_TS] == 0 ||
s.nextLease[pb.Num_NS_ID] == 0 {
return nil, errors.New("Server not initialized")
}

var proposal pb.ZeroProposal

// Calculate how many ids do we have available in memory, before we need to
// renew our lease.
maxLease := s.maxLease(typ)
available := maxLease - s.nextLease[typ] + 1
switch typ {
case pb.Num_TXN_TS:
proposal.MaxTxnTs = maxLease + howMany
case pb.Num_UID:
proposal.MaxUID = maxLease + howMany
case pb.Num_NS_ID:
proposal.MaxNsID = maxLease + howMany
}

// If we have less available than what we need, we need to renew our lease.
if available < num.Val+1 { // +1 for a potential readonly ts.
// If we're asking for more ids than the standard lease bandwidth, then we
// should set howMany generously, so we can service future requests from
// memory, without asking for another lease. Only used if we need to renew
// our lease.
howMany := leaseBandwidth
if num.Val > leaseBandwidth {
howMany = num.Val + leaseBandwidth
}
if howMany < num.Val || maxLease+howMany < maxLease { // check for overflow.
return &emptyAssignedIds, errors.Errorf("Cannot lease %s as the limit has reached."+
" currMax:%d", typ, s.nextLease[typ]-1)
}

var proposal pb.ZeroProposal
switch typ {
case pb.Num_TXN_TS:
proposal.MaxTxnTs = maxLease + howMany
case pb.Num_UID:
proposal.MaxUID = maxLease + howMany
case pb.Num_NS_ID:
proposal.MaxNsID = maxLease + howMany
}
// Blocking propose to get more ids or timestamps.
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions dgraph/cmd/zero/zero_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package zero

import (
"context"
"math"
"testing"

"github.com/dgraph-io/dgraph/protos/pb"
Expand All @@ -38,6 +39,13 @@ func TestRemoveNode(t *testing.T) {
require.Error(t, err)
}

func TestIdLeaseOverflow(t *testing.T) {
require.NoError(t, testutil.AssignUids(100))
err := testutil.AssignUids(math.MaxUint64 - 10)
require.Error(t, err)
require.Contains(t, err.Error(), "limit has reached")
}

func TestIdBump(t *testing.T) {
dialOpts := []grpc.DialOption{
grpc.WithBlock(),
Expand Down
24 changes: 23 additions & 1 deletion testutil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,29 @@ top:

// AssignUids talks to zero to assign the given number of uids.
func AssignUids(num uint64) error {
_, err := http.Get(fmt.Sprintf("http://"+SockAddrZeroHttp+"/assign?what=uids&num=%d", num))
resp, err := http.Get(fmt.Sprintf("http://"+SockAddrZeroHttp+"/assign?what=uids&num=%d", num))
type assignResp struct {
Errors []struct {
Message string
Code string
}
}
var data assignResp
if err == nil && resp != nil && resp.Body != nil {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if err := resp.Body.Close(); err != nil {
return err
}
if err := json.Unmarshal(body, &data); err != nil {
return err
}
if len(data.Errors) > 0 {
return errors.New(data.Errors[0].Message)
}
}
return err
}

Expand Down
30 changes: 27 additions & 3 deletions xidmap/xidmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"encoding/binary"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -36,6 +39,8 @@ import (
"github.com/golang/glog"
)

var maxLeaseRegex = regexp.MustCompile(`currMax:([0-9]+)`)

// XidMapOptions specifies the options for creating a new xidmap.
type XidMapOptions struct {
UidAssigner *grpc.ClientConn
Expand Down Expand Up @@ -290,12 +295,30 @@ func (m *XidMap) updateMaxSeen(max uint64) {
// BumpTo can be used to make Zero allocate UIDs up to this given number. Attempts are made to
// ensure all future allocations of UIDs be higher than this one, but results are not guaranteed.
func (m *XidMap) BumpTo(uid uint64) {
curMax := atomic.LoadUint64(&m.maxUidSeen)
if uid <= curMax {
return
// If we have a cluster that cannot lease out new UIDs because it has already leased upto its
// max limit. Now, we try to live load the data with the given UIDs and the AssignIds complains
// that the limit has reached. Hence, update the xidmap's maxSeenUid and make progress.
updateLease := func(msg string) {
if !strings.Contains(msg, "limit has reached. currMax:") {
return
}
matches := maxLeaseRegex.FindAllStringSubmatch(msg, 1)
if len(matches) == 0 {
return
}
maxUidLeased, err := strconv.ParseUint(matches[0][1], 10, 64)
if err != nil {
glog.Errorf("While parsing currMax %+v", err)
return
}
m.updateMaxSeen(maxUidLeased)
}

for {
curMax := atomic.LoadUint64(&m.maxUidSeen)
if uid <= curMax {
return
}
glog.V(1).Infof("Bumping up to %v", uid)
num := x.Max(uid-curMax, 1e4)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
Expand All @@ -307,6 +330,7 @@ func (m *XidMap) BumpTo(uid uint64) {
m.updateMaxSeen(assigned.EndId)
return
}
updateLease(err.Error())
glog.Errorf("While requesting AssignUids(%d): %v", num, err)
if x.IsJwtExpired(err) {
if err := m.relogin(); err != nil {
Expand Down

0 comments on commit 1235438

Please sign in to comment.