opt(schema): Load schema and types using Stream framework (#8562)
For big datasets, we're seeing a significant slowdown because schema and types
are loaded serially using a single iterator. Switching to the Stream framework
makes this metadata loading step much faster, resulting in much faster Alpha
initialization.

(cherry picked from commit d03d5ad)
mangalaman93 authored Jan 17, 2023
1 parent 2aed52e commit f89eeef
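
The core of the change is replacing a single serial Badger iterator with Badger's Stream framework, which fans the key range out across worker goroutines. Below is a minimal sketch of that pattern, assuming a generic managed-mode *badger.DB handle; the db, prefix, and handle names are illustrative placeholders, not identifiers from this commit.

// A rough sketch of the pattern this commit switches to: instead of walking a
// prefix with one transaction iterator, the Stream framework splits the key
// range across worker goroutines. db, prefix, and handle are placeholders.
package schemaload

import (
	"context"
	"math"

	"github.com/dgraph-io/badger/v3"
	badgerpb "github.com/dgraph-io/badger/v3/pb"
)

// loadAllUnderPrefix reads every key under prefix at the latest timestamp and
// hands each key/value pair to handle, in parallel.
func loadAllUnderPrefix(ctx context.Context, db *badger.DB, prefix []byte,
	handle func(key, val []byte) error) error {
	// Requires a managed-mode DB, as Dgraph's pstore is.
	stream := db.NewStreamAt(math.MaxUint64) // read-only view at the max timestamp
	stream.Prefix = prefix                   // only keys with this prefix are visited
	stream.LogPrefix = "LoadUnderPrefix"

	// KeyToList is invoked concurrently on disjoint key ranges. Here it is used
	// only for its side effect (handle) and emits nothing downstream, which is
	// the same shape the commit uses to populate the in-memory schema/type state.
	stream.KeyToList = func(key []byte, itr *badger.Iterator) (*badgerpb.KVList, error) {
		item := itr.Item()
		err := item.Value(func(val []byte) error {
			return handle(item.KeyCopy(nil), val)
		})
		return nil, err
	}

	// Orchestrate runs the worker goroutines and blocks until the whole prefix
	// range has been processed or ctx is cancelled.
	return stream.Orchestrate(ctx)
}

In the diff below, schema.loadFromDB follows this shape: stream.Prefix selects either x.SchemaPrefix() or x.TypePrefix(), and KeyToList unmarshals each value into a pb.SchemaUpdate or pb.TypeUpdate and stores it via State().Set or State().SetType.
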
Showing 5 changed files with 73 additions and 78 deletions.
134 changes: 62 additions & 72 deletions schema/schema.go
@@ -17,23 +17,23 @@
package schema

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"sync"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/trace"

"github.com/dgraph-io/badger/v3"
badgerpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/tok"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/trace"
)

var (
@@ -190,7 +190,7 @@ func logUpdate(schema *pb.SchemaUpdate, pred string) string {
pred, typ, schema.Tokenizer, schema.Directive, schema.Count)
}

func logTypeUpdate(typ pb.TypeUpdate, typeName string) string {
func logTypeUpdate(typ *pb.TypeUpdate, typeName string) string {
return fmt.Sprintf("Setting type definition for type %s: %v\n", typeName, typ)
}

@@ -239,10 +239,10 @@ func GetIndexingPredicates() []string {

// SetType sets the type for the given predicate in memory.
// schema mutations must flow through the update function, which are synced to the db.
func (s *state) SetType(typeName string, typ pb.TypeUpdate) {
func (s *state) SetType(typeName string, typ *pb.TypeUpdate) {
s.Lock()
defer s.Unlock()
s.types[typeName] = &typ
s.types[typeName] = typ
s.elog.Printf(logTypeUpdate(typ, typeName))
}

@@ -498,83 +498,73 @@ func Load(predicate string) error {
}

// LoadFromDb reads schema information from db and stores it in memory
func LoadFromDb() error {
if err := LoadSchemaFromDb(); err != nil {
func LoadFromDb(ctx context.Context) error {
if err := loadFromDB(ctx, loadSchema); err != nil {
return err
}
return LoadTypesFromDb()
return loadFromDB(ctx, loadType)
}

// LoadSchemaFromDb iterates through the DB and loads all the stored schema updates.
func LoadSchemaFromDb() error {
prefix := x.SchemaPrefix()
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
const (
loadSchema int = iota
loadType
)

for itr.Seek(prefix); itr.Valid(); itr.Next() {
item := itr.Item()
key := item.Key()
if !bytes.HasPrefix(key, prefix) {
break
}
pk, err := x.Parse(key)
if err != nil {
glog.Errorf("Error while parsing key %s: %v", hex.Dump(key), err)
continue
}
attr := pk.Attr
var s pb.SchemaUpdate
err = item.Value(func(val []byte) error {
if len(val) == 0 {
s = pb.SchemaUpdate{Predicate: attr, ValueType: pb.Posting_DEFAULT}
}
x.Checkf(s.Unmarshal(val), "Error while loading schema from db")
State().Set(attr, &s)
return nil
})
if err != nil {
return err
}
}
return nil
}
// loadFromDb iterates through the DB and loads all the stored schema updates.
func loadFromDB(ctx context.Context, loadType int) error {
stream := pstore.NewStreamAt(math.MaxUint64)

// LoadTypesFromDb iterates through the DB and loads all the stored type updates.
func LoadTypesFromDb() error {
prefix := x.TypePrefix()
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
switch loadType {
case loadSchema:
stream.Prefix = x.SchemaPrefix()
stream.LogPrefix = "LoadFromDb Schema"
case loadType:
stream.Prefix = x.TypePrefix()
stream.LogPrefix = "LoadFromDb Type"
default:
glog.Fatalf("Invalid load type")
}

for itr.Seek(prefix); itr.Valid(); itr.Next() {
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*badgerpb.KVList, error) {
item := itr.Item()
key := item.Key()
if !bytes.HasPrefix(key, prefix) {
break
}
pk, err := x.Parse(key)
if err != nil {
glog.Errorf("Error while parsing key %s: %v", hex.Dump(key), err)
continue
return nil, nil
}
attr := pk.Attr
var t pb.TypeUpdate
err = item.Value(func(val []byte) error {
if len(val) == 0 {
t = pb.TypeUpdate{TypeName: attr}
}
x.Checkf(t.Unmarshal(val), "Error while loading types from db")
State().SetType(attr, t)
return nil
})
if err != nil {
return err
if len(pk.Attr) == 0 {
glog.Warningf("Empty Attribute: %+v for Key: %x\n", pk, key)
return nil, nil
}

switch loadType {
case loadSchema:
var s pb.SchemaUpdate
err := item.Value(func(val []byte) error {
if len(val) == 0 {
s = pb.SchemaUpdate{Predicate: pk.Attr, ValueType: pb.Posting_DEFAULT}
}
x.Checkf(s.Unmarshal(val), "Error while loading schema from db")
State().Set(pk.Attr, &s)
return nil
})
return nil, err
case loadType:
var t pb.TypeUpdate
err := item.Value(func(val []byte) error {
if len(val) == 0 {
t = pb.TypeUpdate{TypeName: pk.Attr}
}
x.Checkf(t.Unmarshal(val), "Error while loading types from db")
State().SetType(pk.Attr, &t)
return nil
})
return nil, err
}
glog.Fatalf("Invalid load type")
return nil, errors.New("shouldn't reach here")
}
return nil
return stream.Orchestrate(ctx)
}

// InitialTypes returns the type updates to insert at the beginning of
2 changes: 1 addition & 1 deletion worker/draft.go
@@ -968,7 +968,7 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
}
// Populate shard stores the streamed data directly into db, so we need to refresh
// schema for current group id
if err := schema.LoadFromDb(); err != nil {
if err := schema.LoadFromDb(closer.Ctx()); err != nil {
return errors.Wrapf(err, "while initializing schema")
}
groups().triggerMembershipSync()
8 changes: 6 additions & 2 deletions worker/groups.go
@@ -150,20 +150,24 @@ func StartRaftNodes(walStore *raftwal.DiskStorage, bindall bool) {

gr.Node = newNode(walStore, gid, raftIdx, x.WorkerConfig.MyAddr)

x.Checkf(schema.LoadFromDb(), "Error while initializing schema")
x.Checkf(schema.LoadFromDb(context.Background()), "Error while initializing schema")
glog.Infof("Load schema from DB: OK")
raftServer.UpdateNode(gr.Node.Node)
gr.Node.InitAndStartNode()
glog.Infof("Init and start Raft node: OK")

go gr.sendMembershipUpdates()
go gr.receiveMembershipUpdates()
go gr.processOracleDeltaStream()

gr.informZeroAboutTablets()
glog.Infof("Informed Zero about tablets I have: OK")
gr.applyInitialSchema()
gr.applyInitialTypes()
glog.Infof("Upserted Schema and Types: OK")

x.UpdateHealthStatus(true)
glog.Infof("Server is ready")
glog.Infof("Server is ready: OK")
}

func (g *groupi) Ctx() context.Context {
4 changes: 2 additions & 2 deletions worker/mutation.go
Expand Up @@ -340,14 +340,14 @@ func createSchema(attr string, typ types.TypeID, hint pb.Metadata_HintType, ts u

func runTypeMutation(ctx context.Context, update *pb.TypeUpdate, ts uint64) error {
current := *update
schema.State().SetType(update.TypeName, current)
schema.State().SetType(update.TypeName, &current)
return updateType(update.TypeName, *update, ts)
}

// We commit schema to disk in blocking way, should be ok because this happens
// only during schema mutations or we see a new predicate.
func updateType(typeName string, t pb.TypeUpdate, ts uint64) error {
schema.State().SetType(typeName, t)
schema.State().SetType(typeName, &t)
txn := pstore.NewTransactionAt(ts, true)
defer txn.Discard()
data, err := t.Marshal()
3 changes: 2 additions & 1 deletion worker/online_restore_ee.go
@@ -1,3 +1,4 @@
//go:build !oss
// +build !oss

/*
@@ -264,7 +265,7 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin
}

// Load schema back.
if err := schema.LoadFromDb(); err != nil {
if err := schema.LoadFromDb(ctx); err != nil {
return errors.Wrapf(err, "cannot load schema after restore")
}

