Skip to content

Commit

Permalink
Restore schema states on error, and fixing flaky test (#2730)
Browse files Browse the repository at this point in the history
fixes #2654 fixes #2717
  • Loading branch information
Lucas Wang authored Nov 8, 2018
1 parent 7a59e8d commit e9d1a6b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 24 deletions.
36 changes: 28 additions & 8 deletions systest/mutations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,27 +234,47 @@ func NQuadMutationTest(t *testing.T, c *dgo.Dgraph) {
func DeleteAllReverseIndex(t *testing.T, c *dgo.Dgraph) {
ctx := context.Background()
require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "link: uid @reverse ."}))
_, err := c.NewTxn().Mutate(ctx, &api.Mutation{
assignedIds, err := c.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte("<0x1> <link> <0x2> ."),
SetNquads: []byte("_:a <link> _:b ."),
})
require.NoError(t, err)
aId := assignedIds.Uids["a"]
bId := assignedIds.Uids["b"]

/**
we must run a query first before the next delete transaction, the
reason is that a mutation does not wait for the previous mutation
to finish completely with a commitTs from zero. If we run the
deletion directly, and the previous mutation has not received a
commitTs from zero yet, then the deletion will skip the link, and
essentially be treated as a no-op. As a result, when we query it
again following the reverse link, the link would still exist, and
the test would fail.
Running a query would make sure that the previous mutation for
creating the link has completed with a commitTs from zero, and the
subsequent deletion is done *AFTER* the link creation.
*/
c.NewReadOnlyTxn().Query(ctx, fmt.Sprintf("{ q(func: uid(%s)) { link { uid } }}", aId))

_, err = c.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
DelNquads: []byte("<0x1> <link> * ."),
DelNquads: []byte(fmt.Sprintf("<%s> <link> * .", aId)),
})
resp, err := c.NewTxn().Query(ctx, "{ q(func: uid(0x2)) { ~link { uid } }}")
resp, err := c.NewTxn().Query(ctx, fmt.Sprintf("{ q(func: uid(%s)) { ~link { uid } }}", bId))
require.NoError(t, err)
CompareJSON(t, `{"q":[]}`, string(resp.Json))

_, err = c.NewTxn().Mutate(ctx, &api.Mutation{
assignedIds, err = c.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte("<0x1> <link> <0x3> ."),
SetNquads: []byte(fmt.Sprintf("<%s> <link> _:c .", aId)),
})
resp, err = c.NewTxn().Query(ctx, "{ q(func: uid(0x3)) { ~link { uid } }}")
cId := assignedIds.Uids["c"]

resp, err = c.NewTxn().Query(ctx, fmt.Sprintf("{ q(func: uid(%s)) { ~link { uid } }}", cId))
require.NoError(t, err)
CompareJSON(t, `{"q":[{"~link": [{"uid": "0x1"}]}]}`, string(resp.Json))
CompareJSON(t, fmt.Sprintf(`{"q":[{"~link": [{"uid": "%s"}]}]}`, aId), string(resp.Json))
}

func ExpandAllReversePredicatesTest(t *testing.T, c *dgo.Dgraph) {
Expand Down
23 changes: 7 additions & 16 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,10 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
if txn == nil {
return
}
var err error
for retry := Config.MaxRetries; retry != 0; retry-- {
err = txn.CommitToDisk(writer, commit)
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
err := x.RetryUntilSuccess(Config.MaxRetries, 10 * time.Millisecond, func() error {
return txn.CommitToDisk(writer, commit)
})

if err != nil {
glog.Errorf("Error while applying txn status to disk (%d -> %d): %v",
start, commit, err)
Expand All @@ -481,14 +477,9 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
if txn == nil {
return
}
var err error
for retry := Config.MaxRetries; retry != 0; retry-- {
err = txn.CommitToMemory(commit)
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
err := x.RetryUntilSuccess(Config.MaxRetries, 10 * time.Millisecond, func() error {
return txn.CommitToMemory(commit)
})
if err != nil {
glog.Errorf("Error while applying txn status to memory (%d -> %d): %v",
start, commit, err)
Expand Down
9 changes: 9 additions & 0 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e
// and further mutations are blocked until this is done.
func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error {
if err := runSchemaMutationHelper(ctx, update, startTs); err != nil {
// on error, we restore the memory state to be the same as the disk
maxRetries := 10
loadErr := x.RetryUntilSuccess(maxRetries, 10 * time.Millisecond, func() error {
return schema.Load(update.Predicate)
})

if loadErr != nil {
glog.Fatalf("failed to load schema after %d retries: %v", maxRetries, loadErr)
}
return err
}

Expand Down
14 changes: 14 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ func Max(a, b uint64) uint64 {
return b
}

func RetryUntilSuccess(maxRetries int, sleepDurationOnFailure time.Duration,
f func() error) error {
var err error
for retry := maxRetries; retry != 0; retry-- {
if err = f(); err == nil {
return nil
}
if sleepDurationOnFailure > 0 {
time.Sleep(sleepDurationOnFailure)
}
}
return err
}

func HasString(a []string, b string) bool {
for _, k := range a {
if k == b {
Expand Down

0 comments on commit e9d1a6b

Please sign in to comment.