Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track operations and cancel when needed #4916

Merged
merged 12 commits into from
Mar 16, 2020
101 changes: 95 additions & 6 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
Expand All @@ -45,10 +48,6 @@ import (
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"

"github.com/golang/glog"
"golang.org/x/net/trace"
)

type node struct {
Expand All @@ -63,12 +62,81 @@ type node struct {

streaming int32 // Used to avoid calculating snapshot

// Used to track the ops going on in the system.
ops map[int]*operation
opsLock sync.Mutex

canCampaign bool
elog trace.EventLog

pendingSize int64
}

type operation struct {
// id of the operation.
id int
// closer is used to signal and wait for the operation to finish/cancel.
closer *y.Closer
}

const (
opRollup = iota + 1
opSnapshot
opIndexing
)

// startTask is used to check whether an op is already going on.
// If rollup is going on, we cancel and wait for rollup to complete
// before we return. If the same task is already going, we return error.
func (n *node) startTask(id int) (*operation, error) {
n.opsLock.Lock()
defer n.opsLock.Unlock()

switch id {
case opRollup:
if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
}
case opSnapshot, opIndexing:
if op, has := n.ops[opRollup]; has {
glog.Info("Found a rollup going on. Cancelling rollup!")
op.closer.SignalAndWait()
glog.Info("Rollup cancelled.")
} else if len(n.ops) > 0 {
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
}
default:
glog.Errorf("Got an unhandled operation %d. Ignoring...", id)
return nil, nil
}

op := &operation{id: id, closer: y.NewCloser(1)}
n.ops[id] = op
glog.Infof("Operation started with id: %d", id)
return op, nil
}

// stopTask will delete the entry from the map that keep tracks of the ops
// and then signal that tasks has been cancelled/completed for waiting task.
func (n *node) stopTask(op *operation) {
op.closer.Done()

n.opsLock.Lock()
delete(n.ops, op.id)
n.opsLock.Unlock()
glog.Infof("Operation completed with id: %d", op.id)
}

func (n *node) waitForTask(id int) {
n.opsLock.Lock()
op, ok := n.ops[id]
n.opsLock.Unlock()
if !ok {
return
}
op.closer.Wait()
}

// Now that we apply txn updates via Raft, waiting based on Txn timestamps is
// sufficient. We don't need to wait for proposals to be applied.

Expand All @@ -93,6 +161,7 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
rollupCh: make(chan uint64, 3),
elog: trace.NewEventLog("Dgraph", "ApplyCh"),
closer: y.NewCloser(3), // Matches CLOSER:1
ops: make(map[int]*operation),
}
return n
}
Expand Down Expand Up @@ -625,6 +694,12 @@ func (n *node) Snapshot() (*pb.Snapshot, error) {
}

func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
op, err := n.startTask(opSnapshot)
if err != nil {
return err
}
defer n.stopTask(op)

// In some edge cases, the Zero leader might not have been able to update
// the status of Alpha leader. So, instead of blocking forever on waiting
// for Zero to send us the updates info about the leader, we can just use
Expand Down Expand Up @@ -1067,7 +1142,11 @@ func listWrap(kv *bpb.KV) *bpb.KVList {
// rollupLists would consolidate all the deltas that constitute one posting
// list, and write back a complete posting list.
func (n *node) rollupLists(readTs uint64) error {
writer := posting.NewTxnWriter(pstore)
op, err := n.startTask(opRollup)
if err != nil {
return err
}
defer n.stopTask(op)

// We're doing rollups. We should use this opportunity to calculate the tablet sizes.
amLeader := n.AmLeader()
Expand Down Expand Up @@ -1098,6 +1177,7 @@ func (n *node) rollupLists(readTs uint64) error {
atomic.AddInt64(size, delta)
}

writer := posting.NewTxnWriter(pstore)
stream := pstore.NewStreamAt(readTs)
stream.LogPrefix = "Rolling up"
stream.ChooseKey = func(item *badger.Item) bool {
Expand Down Expand Up @@ -1132,7 +1212,16 @@ func (n *node) rollupLists(readTs uint64) error {
stream.Send = func(list *bpb.KVList) error {
return writer.Write(list)
}
if err := stream.Orchestrate(context.Background()); err != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-ctx.Done():
case <-op.closer.HasBeenClosed():
cancel()
}
}()
if err := stream.Orchestrate(ctx); err != nil {
return err
}
if err := writer.Flush(); err != nil {
Expand Down
34 changes: 24 additions & 10 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"

"github.com/golang/glog"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
)

var (
Expand Down Expand Up @@ -131,15 +131,29 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
// not indexing, it would accept and propose the request.
// It is possible that a receiver R of the proposal is still indexing. In that case, R would
// block here and wait for indexing to be finished.
for {
gr.Node.waitForTask(opIndexing)

// done is used to ensure that we only stop the indexing task once.
var done uint32
stopIndexing := func(op *operation) {
if !schema.State().IndexingInProgress() {
break
if atomic.CompareAndSwapUint32(&done, 0, 1) {
gr.Node.stopTask(op)
}
}
glog.Infoln("waiting for indexing to complete")
time.Sleep(time.Second * 2)
}

// Ensure that rollup is not running.
op, err := gr.Node.startTask(opIndexing)
if err != nil {
return err
}
defer stopIndexing(op)

buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error {
// in case background indexing is running, we should call it here again.
defer stopIndexing(op)

wrtCtx := schema.GetWriteContext(context.Background())
if err := rebuild.BuildIndexes(wrtCtx); err != nil {
return err
Expand All @@ -153,7 +167,7 @@ func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs
}

// This wg allows waiting until setup for all the predicates is complete
// befor running buildIndexes for any of those predicates.
// before running buildIndexes for any of those predicates.
var wg sync.WaitGroup
wg.Add(1)
defer wg.Done()
Expand Down
4 changes: 2 additions & 2 deletions worker/proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestLimiterDeadlock(t *testing.T) {
"Total proposals: ", atomic.LoadInt64(&currentCount),
"Pending proposal: ", atomic.LoadInt64(&pending),
"Completed Proposals: ", atomic.LoadInt64(&completed),
"Aboted Proposals: ", atomic.LoadInt64(&aborted),
"Aborted Proposals: ", atomic.LoadInt64(&aborted),
"IOU: ", l.iou)
}
}()
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestLimiterDeadlock(t *testing.T) {

// After trying all the proposals, (completed + aborted) should be equal to tried proposal.
require.True(t, toTry == completed+aborted,
fmt.Sprintf("Tried: %d, Compteted: %d, Aboted: %d", toTry, completed, aborted))
fmt.Sprintf("Tried: %d, Compteted: %d, Aborted: %d", toTry, completed, aborted))
}

func BenchmarkRateLimiter(b *testing.B) {
Expand Down