Skip to content

Commit

Permalink
Concurrent Mutations (#4892)
Browse files Browse the repository at this point in the history
* Concurrent mutation in ludicrous mode.

* Minor changes.
  • Loading branch information
animesh2049 authored Mar 24, 2020
1 parent da208cd commit 0eccc3e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 9 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (n *node) Run() {
go n.updateZeroMembershipPeriodically(closer)
go n.checkQuorum(closer)
go n.RunReadIndexLoop(closer, readStateCh)
if x.WorkerConfig.LudicrousMode {
if opts.LudicrousMode {
closer.AddRunning(1)
go x.StoreSync(n.Store, closer)
}
Expand Down
2 changes: 2 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type options struct {
peer string
w string
rebalanceInterval time.Duration
LudicrousMode bool
}

var opts options
Expand Down Expand Up @@ -180,6 +181,7 @@ func run() {
peer: Zero.Conf.GetString("peer"),
w: Zero.Conf.GetString("wal"),
rebalanceInterval: Zero.Conf.GetDuration("rebalance_interval"),
LudicrousMode: Zero.Conf.GetBool("ludicrous_mode"),
}

x.WorkerConfig = x.WorkerOptions{
Expand Down
1 change: 1 addition & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
}

if x.WorkerConfig.LudicrousMode {
// Conflict detection is not required for ludicrous mode.
return nil
}

Expand Down
119 changes: 119 additions & 0 deletions worker/background_mutation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2016-2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package worker contains code for pb.worker communication to perform
// queries and mutations.
package worker

import (
"context"
"sync"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/golang/glog"
)

type subMutation struct {
edges []*pb.DirectedEdge
ctx context.Context
startTs uint64
}

type executor struct {
sync.RWMutex
predChan map[string]chan *subMutation
}

func newExecutor() *executor {
return &executor{
predChan: make(map[string]chan *subMutation),
}
}

func (e *executor) processMutationCh(ch chan *subMutation) {
writer := posting.NewTxnWriter(pstore)
for payload := range ch {
select {
case <-ShutdownCh:
// Ignore all the unfinished mutation after shutdown signal.
glog.Infof("Ignoring further unfinished mutations")
return
default:
}
ptxn := posting.NewTxn(payload.startTs)
for _, edge := range payload.edges {
for {
err := runMutation(payload.ctx, edge, ptxn)
if err == nil {
break
} else if err != posting.ErrRetry {
glog.Errorf("Error while mutating: %v", err)
break
}
}
}
ptxn.Update()
if err := ptxn.CommitToDisk(writer, payload.startTs); err != nil {
glog.Errorf("Error while commiting to disk: %v", err)
}
// TODO(Animesh): We might not need this wait.
if err := writer.Wait(); err != nil {
glog.Errorf("Error while waiting for writes: %v", err)
}
}
}

func (e *executor) getChannel(pred string) (ch chan *subMutation) {
e.RLock()
ch, ok := e.predChan[pred]
e.RUnlock()
if ok {
return ch
}

// Create a new channel for `pred`.
e.Lock()
defer e.Unlock()
ch, ok = e.predChan[pred]
if ok {
return ch
}
ch = make(chan *subMutation, 1000)
e.predChan[pred] = ch
go e.processMutationCh(ch)
return ch
}

func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.DirectedEdge) {
payloadMap := make(map[string]*subMutation)

for _, edge := range edges {
payload, ok := payloadMap[edge.Attr]
if !ok {
payloadMap[edge.Attr] = &subMutation{
ctx: ctx,
startTs: startTs,
}
payload = payloadMap[edge.Attr]
}
payload.edges = append(payload.edges, edge)
}

for attr, payload := range payloadMap {
e.getChannel(attr) <- payload
}
}
26 changes: 18 additions & 8 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type node struct {
elog trace.EventLog

pendingSize int64

ex *executor
}

type op int
Expand Down Expand Up @@ -193,6 +195,9 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
closer: y.NewCloser(4), // Matches CLOSER:1
ops: make(map[op]*y.Closer),
}
if x.WorkerConfig.LudicrousMode {
n.ex = newExecutor()
}
return n
}

Expand Down Expand Up @@ -379,14 +384,6 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
}

m := proposal.Mutations
txn := posting.Oracle().RegisterStartTs(m.StartTs)
if txn.ShouldAbort() {
span.Annotatef(nil, "Txn %d should abort.", m.StartTs)
return zero.ErrConflict
}

// Discard the posting lists from cache to release memory at the end.
defer txn.Update()

// It is possible that the user gives us multiple versions of the same edge, one with no facets
// and another with facets. In that case, use stable sort to maintain the ordering given to us
Expand All @@ -402,6 +399,19 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return ei.GetEntity() < ej.GetEntity()
})

if x.WorkerConfig.LudicrousMode {
n.ex.addEdges(ctx, m.StartTs, m.Edges)
return nil
}

txn := posting.Oracle().RegisterStartTs(m.StartTs)
if txn.ShouldAbort() {
span.Annotatef(nil, "Txn %d should abort.", m.StartTs)
return zero.ErrConflict
}
// Discard the posting lists from cache to release memory at the end.
defer txn.Update()

process := func(edges []*pb.DirectedEdge) error {
var retries int
for _, edge := range edges {
Expand Down

0 comments on commit 0eccc3e

Please sign in to comment.