Skip to content

Commit 2d2e99a

Browse files
martinmrdna2github
authored and committed
Stream the full set of predicates and types during a snapshot. (hypermodeinc#5444)
Type and predicate keys always have a timestamp of 1 so they are not being picked up by every snapshot. This PR adds logic to send the entire schema in the snapshot and to delete predicates and types not present in the snapshot (since they were deleted in between snapshots).
1 parent 081cb56 commit 2d2e99a

File tree

8 files changed

+562
-288
lines changed

8 files changed

+562
-288
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ require (
1616
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
1717
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
1818
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
19-
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0
19+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592
2020
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453
2121
github.com/dgraph-io/ristretto v0.0.2
2222
github.com/dgrijalva/jwt-go v3.2.0+incompatible

go.sum

+6-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
7676
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7777
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
7878
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
79-
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0 h1:4VBIyLibX6qFfz6wSbEvp4RBfoKETvHfIln18ROLiHI=
80-
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200506081535-536fed1846d0/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
79+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515084818-8ee988574454 h1:xxDt2YtzZ2tVCWVk4tcYyKtZVtWRcScAcsbNhHcE/Qg=
80+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515084818-8ee988574454/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
81+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515182826-fafb627b4ce4 h1:0hhuliB6NlxxmqQdrl0MkZi+qjibu69uPKtckRn8E0I=
82+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515182826-fafb627b4ce4/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
83+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ=
84+
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
8185
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453 h1:DTgOrw91nMIukDm/WEvdobPLl0LgeDd/JE66+24jBks=
8286
github.com/dgraph-io/dgo/v200 v200.0.0-20200401175452-e463f9234453/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig=
8387
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=

protos/pb.proto

+8-3
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,14 @@ message Proposal {
295295
}
296296

297297
message KVS {
298-
repeated badgerpb2.KV kv = 1;
299-
// done used to indicate if the stream of KVS is over.
300-
bool done = 2;
298+
repeated badgerpb2.KV kv = 1;
299+
300+
// done used to indicate if the stream of KVS is over.
301+
bool done = 2;
302+
// predicates is the list of predicates known by the leader at the time of the snapshot.
303+
repeated string predicates = 3;
304+
// types is the list of types known by the leader at the time of the snapshot.
305+
repeated string types = 4;
301306
}
302307

303308
// Posting messages.

protos/pb/pb.pb.go

+391-278
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

schema/schema.go

+12
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func (s *state) Delete(attr string) error {
117117

118118
// DeleteType updates the schema in memory and disk
119119
func (s *state) DeleteType(typeName string) error {
120+
if s == nil {
121+
return nil
122+
}
123+
120124
s.Lock()
121125
defer s.Unlock()
122126

@@ -265,6 +269,10 @@ func (s *state) IsIndexed(ctx context.Context, pred string) bool {
265269

266270
// Predicates returns the list of predicates for given group
267271
func (s *state) Predicates() []string {
272+
if s == nil {
273+
return nil
274+
}
275+
268276
s.RLock()
269277
defer s.RUnlock()
270278
var out []string
@@ -276,6 +284,10 @@ func (s *state) Predicates() []string {
276284

277285
// Types returns the list of types.
278286
func (s *state) Types() []string {
287+
if s == nil {
288+
return nil
289+
}
290+
279291
s.RLock()
280292
defer s.RUnlock()
281293
var out []string

testutil/client.go

+17
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,23 @@ func RetryQuery(dg *dgo.Dgraph, q string) (*api.Response, error) {
173173
}
174174
}
175175

176+
// RetryBadQuery will retry a query until it failse with a non-retryable error.
177+
func RetryBadQuery(dg *dgo.Dgraph, q string) (*api.Response, error) {
178+
for {
179+
txn := dg.NewTxn()
180+
ctx := context.Background()
181+
resp, err := txn.Query(ctx, q)
182+
if err == nil || strings.Contains(err.Error(), "Please retry") {
183+
time.Sleep(10 * time.Millisecond)
184+
txn.Discard(ctx)
185+
continue
186+
}
187+
188+
txn.Discard(ctx)
189+
return resp, err
190+
}
191+
}
192+
176193
// RetryMutation will retry a mutation until it succeeds or a non-retryable error is received.
177194
// The mutation should have CommitNow set to true.
178195
func RetryMutation(dg *dgo.Dgraph, mu *api.Mutation) error {

worker/snapshot.go

+88-3
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@
1717
package worker
1818

1919
import (
20+
"context"
2021
"sync/atomic"
22+
"time"
2123

2224
bpb "github.com/dgraph-io/badger/v2/pb"
2325
"github.com/golang/glog"
26+
"github.com/pkg/errors"
2427
"go.etcd.io/etcd/raft"
2528

2629
"github.com/dgraph-io/badger/v2"
2730
"github.com/dgraph-io/dgraph/conn"
2831
"github.com/dgraph-io/dgraph/posting"
2932
"github.com/dgraph-io/dgraph/protos/pb"
33+
"github.com/dgraph-io/dgraph/schema"
34+
"github.com/dgraph-io/dgraph/x"
3035
)
3136

3237
const (
@@ -70,12 +75,14 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
7075

7176
// We can use count to check the number of posting lists returned in tests.
7277
count := 0
78+
var done *pb.KVS
7379
for {
7480
kvs, err := stream.Recv()
7581
if err != nil {
7682
return count, err
7783
}
7884
if kvs.Done {
85+
done = kvs
7986
glog.V(1).Infoln("All key-values have been received.")
8087
break
8188
}
@@ -95,6 +102,10 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
95102
return 0, err
96103
}
97104

105+
if err := deleteStalePreds(ctx, done); err != nil {
106+
return count, err
107+
}
108+
98109
glog.Infof("Snapshot writes DONE. Sending ACK")
99110
// Send an acknowledgement back to the leader.
100111
if err := stream.Send(&pb.Snapshot{Done: true}); err != nil {
@@ -104,6 +115,60 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) (int, error) {
104115
return count, nil
105116
}
106117

118+
func deleteStalePreds(ctx context.Context, kvs *pb.KVS) error {
119+
if kvs == nil {
120+
return nil
121+
}
122+
123+
// Look for predicates present in the receiver but not in the list sent by the leader.
124+
// These predicates were deleted in between snapshots and need to be deleted from the
125+
// receiver to keep the schema in sync.
126+
currPredicates := schema.State().Predicates()
127+
snapshotPreds := make(map[string]struct{})
128+
for _, pred := range kvs.Predicates {
129+
snapshotPreds[pred] = struct{}{}
130+
}
131+
for _, pred := range currPredicates {
132+
if _, ok := snapshotPreds[pred]; !ok {
133+
LOOP:
134+
for {
135+
err := posting.DeletePredicate(ctx, pred)
136+
switch err {
137+
case badger.ErrBlockedWrites:
138+
time.Sleep(1 * time.Second)
139+
case nil:
140+
break LOOP
141+
default:
142+
glog.Warningf(
143+
"Cannot delete removed predicate %s after streaming snapshot: %v",
144+
pred, err)
145+
return errors.Wrapf(err,
146+
"cannot delete removed predicate %s after streaming snapshot", pred)
147+
}
148+
}
149+
}
150+
}
151+
152+
// Look for types present in the receiver but not in the list sent by the leader.
153+
// These types were deleted in between snapshots and need to be deleted from the
154+
// receiver to keep the schema in sync.
155+
currTypes := schema.State().Types()
156+
snapshotTypes := make(map[string]struct{})
157+
for _, typ := range kvs.Types {
158+
snapshotTypes[typ] = struct{}{}
159+
}
160+
for _, typ := range currTypes {
161+
if _, ok := snapshotTypes[typ]; !ok {
162+
if err := schema.State().DeleteType(typ); err != nil {
163+
return errors.Wrapf(err, "cannot delete removed type %s after streaming snapshot",
164+
typ)
165+
}
166+
}
167+
}
168+
169+
return nil
170+
}
171+
107172
func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) error {
108173
// We choose not to try and match the requested snapshot from the latest snapshot at the leader.
109174
// This is the job of the Raft library. At the leader end, we service whatever is asked of us.
@@ -138,15 +203,35 @@ func doStreamSnapshot(snap *pb.Snapshot, out pb.Worker_StreamSnapshotServer) err
138203
return out.Send(kvs)
139204
}
140205
stream.ChooseKey = func(item *badger.Item) bool {
141-
return item.Version() >= snap.SinceTs
206+
if item.Version() >= snap.SinceTs {
207+
return true
208+
}
209+
210+
if item.Version() != 1 {
211+
return false
212+
}
213+
214+
// Type and Schema keys always have a timestamp of 1. They all need to be sent
215+
// with the snapshot.
216+
pk, err := x.Parse(item.Key())
217+
if err != nil {
218+
return false
219+
}
220+
return pk.IsSchema() || pk.IsType()
142221
}
143222

144223
if err := stream.Orchestrate(out.Context()); err != nil {
145224
return err
146225
}
147226

148-
// Indicate that sending is done.
149-
if err := out.Send(&pb.KVS{Done: true}); err != nil {
227+
// Indicate that sending is done. Send a list of all the predicates and types at the
228+
// time of the snapshot so that the receiver can delete predicates and types that no longer exist.
229+
done := &pb.KVS{
230+
Done: true,
231+
Predicates: schema.State().Predicates(),
232+
Types: schema.State().Types(),
233+
}
234+
if err := out.Send(done); err != nil {
150235
return err
151236
}
152237

worker/snapshot_test.go

+39-1
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,26 @@ func TestSnapshot(t *testing.T) {
4444
DropOp: api.Operation_ALL,
4545
}))
4646
require.NoError(t, dg1.Alter(context.Background(), &api.Operation{
47-
Schema: "value: int .",
47+
Schema: `
48+
value: int .
49+
name: string .
50+
address: string @index(term) .`,
4851
}))
4952

5053
err = testutil.DockerStop("alpha2")
5154
require.NoError(t, err)
5255

56+
// Update the name predicate to include an index.
57+
require.NoError(t, dg1.Alter(context.Background(), &api.Operation{
58+
Schema: `name: string @index(term) .`,
59+
}))
60+
61+
// Delete the address predicate.
62+
require.NoError(t, dg1.Alter(context.Background(), &api.Operation{
63+
DropOp: api.Operation_ATTR,
64+
DropValue: "address",
65+
}))
66+
5367
for i := 1; i <= 200; i++ {
5468
err := testutil.RetryMutation(dg1, &api.Mutation{
5569
SetNquads: []byte(fmt.Sprintf(`_:node <value> "%d" .`, i)),
@@ -112,6 +126,30 @@ func verifySnapshot(t *testing.T, dg *dgo.Dgraph, num int) {
112126
sum += item["value"]
113127
}
114128
require.Equal(t, expectedSum, sum)
129+
130+
// Perform a query using the updated index in the schema.
131+
q2 := `
132+
{
133+
names(func: anyofterms(name, Mike)) {
134+
name
135+
}
136+
}`
137+
resMap = make(map[string][]map[string]int)
138+
resp, err = testutil.RetryQuery(dg, q2)
139+
require.NoError(t, err)
140+
141+
// Trying to perform a query using the address index should not work since that
142+
// predicate was deleted.
143+
q3 := `
144+
{
145+
addresses(func: anyofterms(address, Mike)) {
146+
address
147+
}
148+
}`
149+
resMap = make(map[string][]map[string]int)
150+
resp, err = testutil.RetryBadQuery(dg, q3)
151+
require.Error(t, err)
152+
require.Contains(t, err.Error(), "Attribute address is not indexed")
115153
}
116154

117155
func waitForSnapshot(t *testing.T, prevSnapTs uint64) uint64 {

0 commit comments

Comments
 (0)