Skip to content

Commit 4badedf

Browse files
author
Ibrahim Jarif
committed
Merge branch 'release/v1.2' into ibrahim/r1.2-b-update
2 parents e6e92bb + a64ecd7 commit 4badedf

File tree

9 files changed

+127
-37
lines changed

9 files changed

+127
-37
lines changed

dgraph/cmd/alpha/run.go

+29-3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,15 @@ they form a Raft group and provide synchronous replication.
196196

197197
// flag.Bool("graphql_introspection", true, "Set to false for no GraphQL schema introspection")
198198

199+
// Cache flags
200+
flag.Int64("cache_mb", 0, "Total size of cache (in MB) to be used in alpha.")
201+
// TODO(Naman): The PostingListCache is a no-op for now. Once the posting list cache is
202+
// enabled in release branch, use it.
203+
flag.String("cache_percentage", "0,65,25,0,10",
204+
`Cache percentages summing up to 100 for various caches (FORMAT:
205+
PostingListCache,PstoreBlockCache,PstoreIndexCache,WstoreBlockCache,WstoreIndexCache).
206+
PostingListCache should be 0 for now and is a no-op.
207+
`)
199208
}
200209

201210
func setupCustomTokenizers() {
@@ -515,12 +524,29 @@ var shutdownCh chan struct{}
515524
func run() {
516525
bindall = Alpha.Conf.GetBool("bindall")
517526

527+
totalCache := int64(Alpha.Conf.GetInt("cache_mb"))
528+
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
529+
530+
cachePercentage := Alpha.Conf.GetString("cache_percentage")
531+
cachePercent, err := x.GetCachePercentages(cachePercentage, 5)
532+
x.Check(err)
533+
// TODO(Naman): PostingListCache doesn't exist now.
534+
PostingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100
535+
x.AssertTruef(PostingListCacheSize == 0, "ERROR: PostingListCacheSize should be 0.")
536+
pstoreBlockCacheSize := (cachePercent[1] * (totalCache << 20)) / 100
537+
pstoreIndexCacheSize := (cachePercent[2] * (totalCache << 20)) / 100
538+
wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100
539+
wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100
540+
518541
opts := worker.Options{
519542
BadgerKeyFile: Alpha.Conf.GetString("encryption_key_file"),
520543
BadgerCompressionLevel: Alpha.Conf.GetInt("badger.compression_level"),
521-
522-
PostingDir: Alpha.Conf.GetString("postings"),
523-
WALDir: Alpha.Conf.GetString("wal"),
544+
PostingDir: Alpha.Conf.GetString("postings"),
545+
WALDir: Alpha.Conf.GetString("wal"),
546+
PBlockCacheSize: pstoreBlockCacheSize,
547+
PIndexCacheSize: pstoreIndexCacheSize,
548+
WBlockCacheSize: wstoreBlockCacheSize,
549+
WIndexCacheSize: wstoreIndexCacheSize,
524550

525551
MutationsMode: worker.AllowMutations,
526552
AuthToken: Alpha.Conf.GetString("auth_token"),

dgraph/cmd/bulk/reduce.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (r *reducer) createBadger(i int) *badger.DB {
103103

104104
opt := badger.DefaultOptions(r.opt.shardOutputDirs[i]).WithSyncWrites(false).
105105
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
106-
WithLogger(nil).WithMaxCacheSize(1 << 20).
106+
WithLogger(nil).WithBlockCacheSize(1 << 20).
107107
WithEncryptionKey(enc.ReadEncryptionKeyFile(r.opt.BadgerKeyFile))
108108

109109
// TODO(Ibrahim): Remove this once badger is updated.

dgraph/cmd/zero/run.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ type options struct {
5252
peer string
5353
w string
5454
rebalanceInterval time.Duration
55+
56+
totalCache int64
57+
cachePercentage string
5558
}
5659

5760
var opts options
@@ -95,6 +98,11 @@ instances to achieve high-availability.
9598
// about the status of supporting annotation logs through the datadog exporter
9699
flag.String("datadog.collector", "", "Send opencensus traces to Datadog. As of now, the trace"+
97100
" exporter does not support annotation logs and would discard them.")
101+
102+
// Cache flags
103+
flag.Int64("cache_mb", 0, "Total size of cache (in MB) to be used in zero.")
104+
flag.String("cache_percentage", "100,0",
105+
"Cache percentages summing up to 100 for various caches (FORMAT: blockCache,indexCache).")
98106
}
99107

100108
func setupListener(addr string, port int, kind string) (listener net.Listener, err error) {
@@ -168,6 +176,8 @@ func run() {
168176
peer: Zero.Conf.GetString("peer"),
169177
w: Zero.Conf.GetString("wal"),
170178
rebalanceInterval: Zero.Conf.GetDuration("rebalance_interval"),
179+
totalCache: int64(Zero.Conf.GetInt("cache_mb")),
180+
cachePercentage: Zero.Conf.GetString("cache_percentage"),
171181
}
172182

173183
if opts.nodeId == 0 {
@@ -211,10 +221,22 @@ func run() {
211221
log.Fatal(err)
212222
}
213223

224+
x.AssertTruef(opts.totalCache >= 0, "ERROR: Cache size must be non-negative")
225+
226+
cachePercent, err := x.GetCachePercentages(opts.cachePercentage, 2)
227+
x.Check(err)
228+
blockCacheSz := (cachePercent[0] * (opts.totalCache << 20)) / 100
229+
indexCacheSz := (cachePercent[1] * (opts.totalCache << 20)) / 100
230+
214231
// Open raft write-ahead log and initialize raft node.
215232
x.Checkf(os.MkdirAll(opts.w, 0700), "Error while creating WAL dir.")
216-
kvOpt := badger.LSMOnlyOptions(opts.w).WithSyncWrites(false).WithTruncate(true).
217-
WithValueLogFileSize(64 << 20).WithMaxCacheSize(10 << 20).WithLoadBloomsOnOpen(false)
233+
kvOpt := badger.LSMOnlyOptions(opts.w).
234+
WithSyncWrites(false).
235+
WithTruncate(true).
236+
WithValueLogFileSize(64 << 20).
237+
WithBlockCacheSize(blockCacheSz).
238+
WithIndexCacheSize(indexCacheSz).
239+
WithLoadBloomsOnOpen(false)
218240

219241
kvOpt.ZSTDCompressionLevel = 3
220242

posting/index.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
533533
WithNumVersionsToKeep(math.MaxInt64).
534534
WithCompression(options.None).
535535
WithLogRotatesToFlush(10).
536-
WithMaxCacheSize(50) // TODO(Aman): Disable cache altogether
536+
WithBlockCacheSize(50) // TODO(Aman): Disable cache altogether
537537

538538
// TODO(Ibrahim): Remove this once badger is updated.
539539
dbOpts.ZSTDCompressionLevel = 1

posting/list.go

+15-25
Original file line numberDiff line numberDiff line change
@@ -1362,6 +1362,13 @@ func shouldSplit(plist *pb.PostingList) bool {
13621362
return plist.Size() >= maxListSize && len(plist.Pack.Blocks) > 1
13631363
}
13641364

1365+
func (out *rollupOutput) updateSplits() {
1366+
if out.plist == nil {
1367+
out.plist = &pb.PostingList{}
1368+
}
1369+
out.plist.Splits = out.splits()
1370+
}
1371+
13651372
func (out *rollupOutput) recursiveSplit() {
13661373
// Call splitUpList. Otherwise the map of startUids to parts won't be initialized.
13671374
out.splitUpList()
@@ -1388,7 +1395,7 @@ func (out *rollupOutput) splitUpList() {
13881395
var lists []*pb.PostingList
13891396

13901397
// If list is not split yet, insert the main list.
1391-
if len(out.plist.Splits) == 0 {
1398+
if len(out.parts) == 0 {
13921399
lists = append(lists, out.plist)
13931400
}
13941401

@@ -1398,13 +1405,10 @@ func (out *rollupOutput) splitUpList() {
13981405
lists = append(lists, part)
13991406
}
14001407

1401-
// List of startUids for each list part after the splitting process is complete.
1402-
var newSplits []uint64
1403-
14041408
for i, list := range lists {
14051409
startUid := uint64(1)
14061410
// If the list is split, select the right startUid for this list.
1407-
if len(out.plist.Splits) > 0 {
1411+
if len(out.parts) > 0 {
14081412
startUid = out.plist.Splits[i]
14091413
}
14101414

@@ -1414,23 +1418,11 @@ func (out *rollupOutput) splitUpList() {
14141418
startUids, pls := binSplit(startUid, list)
14151419
for i, startUid := range startUids {
14161420
out.parts[startUid] = pls[i]
1417-
newSplits = append(newSplits, startUid)
14181421
}
1419-
} else {
1420-
// No need to split the list. Add the startUid to the array of new splits.
1421-
newSplits = append(newSplits, startUid)
14221422
}
14231423
}
14241424

1425-
// No new lists were created so there's no need to update the list of splits.
1426-
if len(newSplits) == len(lists) {
1427-
return
1428-
}
1429-
1430-
// The splits changed so update them.
1431-
out.plist = &pb.PostingList{
1432-
Splits: newSplits,
1433-
}
1425+
out.updateSplits()
14341426
}
14351427

14361428
// binSplit takes the given plist and returns two new plists, each with
@@ -1468,28 +1460,26 @@ func binSplit(lowUid uint64, plist *pb.PostingList) ([]uint64, []*pb.PostingList
14681460

14691461
// removeEmptySplits updates the split list by removing empty posting lists' startUids.
14701462
func (out *rollupOutput) removeEmptySplits() {
1471-
var splits []uint64
14721463
for startUid, plist := range out.parts {
14731464
// Do not remove the first split for now, as every multi-part list should always
14741465
// have a split starting with UID 1.
14751466
if startUid == 1 {
1476-
splits = append(splits, startUid)
14771467
continue
14781468
}
14791469

1480-
if !isPlistEmpty(plist) {
1481-
splits = append(splits, startUid)
1470+
if isPlistEmpty(plist) {
1471+
delete(out.parts, startUid)
14821472
}
14831473
}
1484-
out.plist.Splits = splits
1485-
sortSplits(splits)
1474+
out.updateSplits()
14861475

1487-
if len(out.plist.Splits) == 1 {
1476+
if len(out.parts) == 1 && isPlistEmpty(out.parts[1]) {
14881477
// Only the first split remains. If it's also empty, remove it as well.
14891478
// This should mark the entire list for deletion. Please note that the
14901479
// startUid of the first part is always one because a node can never have
14911480
// its uid set to zero.
14921481
if isPlistEmpty(out.parts[1]) {
1482+
delete(out.parts, 1)
14931483
out.plist.Splits = []uint64{}
14941484
}
14951485
}

posting/list_test.go

+12
Original file line numberDiff line numberDiff line change
@@ -894,6 +894,16 @@ func TestAfterUIDCountWithCommit(t *testing.T) {
894894
require.EqualValues(t, 0, ol.Length(txn.StartTs, 300))
895895
}
896896

897+
func verifySplits(t *testing.T, splits []uint64) {
898+
require.Equal(t, uint64(1), splits[0])
899+
for i, uid := range splits {
900+
if i == 0 {
901+
continue
902+
}
903+
require.Greater(t, uid, splits[i-1])
904+
}
905+
}
906+
897907
func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
898908
// For testing, set the max list size to a lower threshold.
899909
maxListSize = 5000
@@ -932,6 +942,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
932942
ol, err = getNew(key, ps)
933943
require.NoError(t, err)
934944
require.True(t, len(ol.plist.Splits) > 0)
945+
verifySplits(t, ol.plist.Splits)
935946

936947
return ol, commits
937948
}
@@ -965,6 +976,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
965976
commits++
966977
}
967978
require.True(t, len(ol.plist.Splits) > 0)
979+
verifySplits(t, ol.plist.Splits)
968980

969981
// Delete all the previously inserted entries from the list.
970982
baseStartTs := uint64(size) + 1

worker/config.go

+9
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ type Options struct {
6262
// AllottedMemory is the estimated size taken by the LRU cache.
6363
AllottedMemory float64
6464

65+
// PBlockCacheSize is the size of block cache for pstore
66+
PBlockCacheSize int64
67+
// PIndexCacheSize is the size of index cache for pstore
68+
PIndexCacheSize int64
69+
// WBlockCacheSize is the size of block cache for wstore
70+
WBlockCacheSize int64
71+
// WIndexCacheSize is the size of index cache for wstore
72+
WIndexCacheSize int64
73+
6574
// HmacSecret stores the secret used to sign JSON Web Tokens (JWT).
6675
HmacSecret x.SensitiveByteSlice
6776
// AccessJwtTtl is the TTL for the access JWT.

worker/server_state.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ func (s *ServerState) initStorage() {
145145
opt := badger.LSMOnlyOptions(Config.WALDir)
146146
opt = setBadgerOptions(opt, true)
147147
opt.ValueLogMaxEntries = 10000 // Allow for easy space reclamation.
148-
opt.MaxCacheSize = 10 << 20 // 10 mb of cache size for WAL.
148+
opt.BlockCacheSize = Config.WBlockCacheSize
149+
opt.IndexCacheSize = Config.WIndexCacheSize
149150

150151
// Print the options w/o exposing key.
151152
// TODO: Build a stringify interface in Badger options, which is used to print nicely here.
@@ -165,10 +166,8 @@ func (s *ServerState) initStorage() {
165166
opt := badger.DefaultOptions(Config.PostingDir).
166167
WithValueThreshold(1 << 10 /* 1KB */).
167168
WithNumVersionsToKeep(math.MaxInt32).
168-
WithMaxCacheSize(1 << 30).
169-
WithKeepBlockIndicesInCache(true).
170-
WithKeepBlocksInCache(true).
171-
WithMaxBfCacheSize(500 << 20) // 500 MB of bloom filter cache.
169+
WithBlockCacheSize(Config.PBlockCacheSize).
170+
WithIndexCacheSize(Config.PIndexCacheSize)
172171
opt = setBadgerOptions(opt, false)
173172

174173
// Print the options w/o exposing key.

x/x.go

+32
Original file line numberDiff line numberDiff line change
@@ -856,3 +856,35 @@ func RunVlogGC(store *badger.DB, closer *y.Closer) {
856856
}
857857
}
858858
}
859+
860+
// GetCachePercentages returns the slice of cache percentages given the "," (comma) separated
861+
// cache percentages(integers) string and expected number of caches.
862+
func GetCachePercentages(cpString string, numExpected int) ([]int64, error) {
863+
cp := strings.Split(cpString, ",")
864+
// Sanity checks
865+
if len(cp) != numExpected {
866+
return nil, errors.Errorf("ERROR: expected %d cache percentages, got %d",
867+
numExpected, len(cp))
868+
}
869+
870+
var cachePercent []int64
871+
percentSum := 0
872+
for _, percent := range cp {
873+
x, err := strconv.Atoi(percent)
874+
if err != nil {
875+
return nil, errors.Errorf("ERROR: unable to parse cache percentage(%s)", percent)
876+
}
877+
if x < 0 {
878+
return nil, errors.Errorf("ERROR: cache percentage(%s) cannot be negative", percent)
879+
}
880+
cachePercent = append(cachePercent, int64(x))
881+
percentSum += x
882+
}
883+
884+
if percentSum != 100 {
885+
return nil, errors.Errorf("ERROR: cache percentages (%s) does not sum up to 100",
886+
strings.Join(cp, "+"))
887+
}
888+
889+
return cachePercent, nil
890+
}

0 commit comments

Comments
 (0)