Skip to content

Commit b8b51d2

Browse files
ahsanbarkatimangalaman93
authored and committed
cherry-pick PR #7753
This commit is a major rewrite of the online restore code. It used to use Badger's KVLoader; it now uses StreamWriter, which is much faster for writes in the case of restore. The following commits are cherry-picked (in reverse order): * fix(backup): Free the UidPack after use (#7786) * fix(export-backup): Fix double free in export backup (#7780) (#7783) * fix(lsbackup): Fix profiler in lsBackup (#7729) * Bring back "perf(Backup): Improve backup performance (#7601)" * Opt(Backup): Make backups faster (#7680) * Fix s3 backup copy (#7669) * [BREAKING] Opt(Restore): Optimize Restore's new map-reduce based design (#7666) * Perf(restore): Implement map-reduce based restore (#7664) * feat(backup): Merge backup refactoring * Revert "perf(Backup): Improve backup performance (#7601)" * fix(ee): GetKeys should return an error (#7713) (#7797) * fix(restore): Bump uid and namespace after restore (#7790) (#7800) * fix: fixing graphql schema update when the data is restored + skipping /probe/graphql from audit (#7925) * Don't ban namespace in export_backup
1 parent 501325c commit b8b51d2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+3766
-3349
lines changed

codec/codec.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,8 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
405405

406406
// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
407407
// calloc'ed and SHOULD be freed up by calling buffer.Release().
408-
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
408+
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) *z.Buffer {
409+
409410
var last uint64
410411
tmp := make([]byte, 16)
411412
dec := Decoder{Pack: pack}
@@ -416,6 +417,7 @@ func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
416417
last = u
417418
}
418419
}
420+
return buf
419421
}
420422

421423
func match32MSB(num1, num2 uint64) bool {

codec/codec_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ func TestBufferUidPack(t *testing.T) {
7575
// Some edge case tests.
7676
pack := Encode([]uint64{}, 128)
7777
FreePack(pack)
78-
7978
buf := z.NewBuffer(10<<10, "TestBufferUidPack")
8079
defer buf.Release()
8180
DecodeToBuffer(buf, &pb.UidPack{})

dgraph/cmd/bulk/reduce.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,8 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
481481
partitionKeys = append(partitionKeys, nil)
482482

483483
for i := 0; i < len(partitionKeys); i++ {
484+
pkey := partitionKeys[i]
484485
for _, itr := range mapItrs {
485-
pkey := partitionKeys[i]
486486
itr.Next(cbuf, pkey)
487487
}
488488
if cbuf.LenNoPadding() < 256<<20 {

dgraph/cmd/decrypt/decrypt.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
)
3131

3232
type options struct {
33-
// keyfile comes from the encryption or Vault flags
33+
// keyfile comes from the encryption_key_file or Vault flags
3434
keyfile x.Sensitive
3535
file string
3636
output string

dgraph/cmd/increment/increment.go

+27-17
Original file line numberDiff line numberDiff line change
@@ -199,40 +199,50 @@ func run(conf *viper.Viper) {
199199
dg = dgTmp
200200
}
201201

202-
// Run things serially first.
203-
for i := 0; i < conc; i++ {
204-
_, err := process(dg, conf)
205-
x.Check(err)
206-
num--
202+
addOne := func(i int) error {
203+
txnStart := time.Now() // Start time of transaction
204+
cnt, err := process(dg, conf)
205+
now := time.Now().UTC().Format(format)
206+
if err != nil {
207+
return err
208+
}
209+
serverLat := cnt.qLatency + cnt.mLatency
210+
clientLat := time.Since(txnStart).Round(time.Millisecond)
211+
fmt.Printf(
212+
"[w%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
213+
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
214+
serverLat, clientLat, clientLat-serverLat)
215+
return nil
216+
}
217+
218+
// Run things serially first, if conc > 1.
219+
if conc > 1 {
220+
for i := 0; i < conc; i++ {
221+
err := addOne(0)
222+
x.Check(err)
223+
num--
224+
}
207225
}
208226

209227
var wg sync.WaitGroup
210-
f := func(i int) {
228+
f := func(worker int) {
211229
defer wg.Done()
212230
count := 0
213231
for count < num {
214-
txnStart := time.Now() // Start time of transaction
215-
cnt, err := process(dg, conf)
216-
now := time.Now().UTC().Format(format)
217-
if err != nil {
232+
if err := addOne(worker); err != nil {
233+
now := time.Now().UTC().Format(format)
218234
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
219235
time.Sleep(time.Second)
220236
continue
221237
}
222-
serverLat := cnt.qLatency + cnt.mLatency
223-
clientLat := time.Since(txnStart).Round(time.Millisecond)
224-
fmt.Printf(
225-
"[%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
226-
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
227-
serverLat, clientLat, clientLat-serverLat)
228238
time.Sleep(waitDur)
229239
count++
230240
}
231241
}
232242

233243
for i := 0; i < conc; i++ {
234244
wg.Add(1)
235-
go f(i)
245+
go f(i + 1)
236246
}
237247
wg.Wait()
238248
}

dgraph/cmd/root_ee.go

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build !oss
12
// +build !oss
23

34
/*

dgraph/cmd/zero/assign.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
175175
}
176176

177177
// AssignIds is used to assign new ids (UIDs, NsIDs) by communicating with the leader of the
178-
// RAFT group responsible for handing out ids.
178+
// RAFT group responsible for handing out ids. If bump is set to true in the request then the
179+
// lease for the given id type is bumped to num.Val and {startId, endId} of the newly leased ids
180+
// in the process of the bump are returned.
179181
func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
180182
if ctx.Err() != nil {
181183
return &emptyAssignedIds, ctx.Err()
@@ -246,6 +248,25 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
246248
return err
247249
}
248250

251+
// If this is a bump request and the current node is the leader then we create a normal lease
252+
// request based on the number of required ids to reach the asked bump value. If the current
253+
// node is not the leader then the bump request will be forwarded to the leader by lease().
254+
if num.GetBump() && s.Node.AmLeader() {
255+
s.leaseLock.Lock()
256+
cur := s.nextLease[num.GetType()] - 1
257+
s.leaseLock.Unlock()
258+
259+
// We need to lease more ids only if the bump request is more than the current max lease.
260+
req := num.GetVal()
261+
if cur >= req {
262+
return &emptyAssignedIds, errors.Errorf("Nothing to be leased")
263+
}
264+
num.Val = req - cur
265+
266+
// Set bump to false because we want to lease the required ids in the following request.
267+
num.Bump = false
268+
}
269+
249270
c := make(chan error, 1)
250271
go func() {
251272
c <- lease()

dgraph/cmd/zero/zero_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/dgraph-io/dgraph/protos/pb"
2525
"github.com/dgraph-io/dgraph/testutil"
2626
"github.com/stretchr/testify/require"
27+
"google.golang.org/grpc"
2728
)
2829

2930
func TestRemoveNode(t *testing.T) {
@@ -44,3 +45,41 @@ func TestIdLeaseOverflow(t *testing.T) {
4445
require.Error(t, err)
4546
require.Contains(t, err.Error(), "limit has reached")
4647
}
48+
49+
func TestIdBump(t *testing.T) {
50+
dialOpts := []grpc.DialOption{
51+
grpc.WithBlock(),
52+
grpc.WithInsecure(),
53+
}
54+
ctx := context.Background()
55+
con, err := grpc.DialContext(ctx, testutil.SockAddrZero, dialOpts...)
56+
require.NoError(t, err)
57+
58+
zc := pb.NewZeroClient(con)
59+
60+
res, err := zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
61+
require.NoError(t, err)
62+
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)
63+
64+
// Next assignment's startId should be greater than 10.
65+
res, err = zc.AssignIds(ctx, &pb.Num{Val: 50, Type: pb.Num_UID})
66+
require.NoError(t, err)
67+
require.Greater(t, res.GetStartId(), uint64(10))
68+
require.Equal(t, uint64(50), res.GetEndId()-res.GetStartId()+1)
69+
70+
bumpTo := res.GetEndId() + 100000
71+
72+
// Bump the lease to (last result + 100000).
73+
res, err = zc.AssignIds(ctx, &pb.Num{Val: bumpTo, Type: pb.Num_UID, Bump: true})
74+
require.NoError(t, err)
75+
76+
// Next assignemnt's startId should be greater than bumpTo.
77+
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
78+
require.NoError(t, err)
79+
require.Greater(t, res.GetStartId(), bumpTo)
80+
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)
81+
82+
// If the bump request is not greater than the current max lease, nothing is leased and an
// error ("Nothing to be leased") is returned.
83+
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID, Bump: true})
84+
require.Contains(t, err.Error(), "Nothing to be leased")
85+
}

dgraph/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func main() {
5757
for range ticker.C {
5858
// Read Jemalloc stats first. Print if there's a big difference.
5959
z.ReadMemStats(&js)
60-
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 256<<20 {
60+
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
6161
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
6262
" Resident: %s Retained: %s\n",
6363
humanize.IBytes(uint64(z.NumAllocBytes())),

ee/acl/acl_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/dgraph-io/dgo/v210/protos/api"
2929
"github.com/dgraph-io/dgraph/testutil"
3030
"github.com/dgraph-io/dgraph/x"
31+
3132
"github.com/golang/glog"
3233
"github.com/stretchr/testify/require"
3334
)
@@ -2858,7 +2859,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
28582859
queryName: "listBackups",
28592860
respIsArray: true,
28602861
testGuardianAccess: true,
2861-
guardianErr: "The path \"\" does not exist or it is inaccessible.",
2862+
guardianErr: `The uri path: "" doesn't exist`,
28622863
guardianData: `{"listBackups": []}`,
28632864
},
28642865
{
@@ -2939,7 +2940,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
29392940
}`,
29402941
queryName: "restore",
29412942
testGuardianAccess: true,
2942-
guardianErr: "The path \"\" does not exist or it is inaccessible.",
2943+
guardianErr: `The uri path: "" doesn't exist`,
29432944
guardianData: `{"restore": {"code": "Failure"}}`,
29442945
},
29452946
{

ee/audit/interceptor_ee.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ var skipApis = map[string]bool{
6363

6464
var skipEPs = map[string]bool{
6565
// list of endpoints that need to be skipped
66-
"/health": true,
67-
"/state": true,
66+
"/health": true,
67+
"/state": true,
68+
"/probe/graphql": true,
6869
}
6970

7071
func AuditRequestGRPC(ctx context.Context, req interface{},

0 commit comments

Comments
 (0)