-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent Mutations #4892
Concurrent Mutations #4892
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 3 files reviewed, 7 unresolved discussions (waiting on @animesh2049 and @manishrjain)
worker/draft.go, line 322 at r2 (raw file):
if !ok { n.predChanMutex.Lock() ch, ok = n.predChan[edge.Attr]
This sort of logic belongs to a separate function.
worker/draft.go, line 330 at r2 (raw file):
go func(ch chan *runMutPayload) { writer := posting.NewTxnWriter(pstore) for payload := range ch {
Payload shouldn't just have one edge. Channels are not that performant. You want to batch up all edges from one mutation for that pred and then send across channel.
worker/draft.go, line 332 at r2 (raw file):
for payload := range ch { for { err := runMutation(payload.ctx, payload.edge, payload.txn)
This should have its own txn. Not use a shared txn.
worker/draft.go, line 343 at r2 (raw file):
key := string(x.DataKey(edge.Attr, edge.Entity)) payload.txn.CommitKeyToDisk(writer, key, payload.txn.StartTs)
we update many keys, not just one key. This logic won't work. You have to use a separate txn.
worker/draft.go, line 352 at r2 (raw file):
edge: edge, ctx: ctx, txn: txn,
You don't want to share a txn across goroutines.
worker/draft.go, line 417 at r2 (raw file):
} if x.WorkerConfig.LudicrousMode { // We have already commited changes to disk. We only need to advance the oracle.
You don't need to advance the Oracle. OracleUpdates would do that automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 1 files reviewed, 9 unresolved discussions (waiting on @animesh2049, @golangcibot, and @manishrjain)
worker/draft.go, line 343 at r1 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
Error return value of
payload.txn.CommitKeyToDisk
is not checked (fromerrcheck
)
Done.
worker/draft.go, line 322 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
This sort of logic belongs to a separate function.
Done.
worker/draft.go, line 330 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Payload shouldn't just have one edge. Channels are not that performant. You want to batch up all edges from one mutation for that pred and then send across channel.
Done.
worker/draft.go, line 332 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
This should have its own txn. Not use a shared txn.
Done.
worker/draft.go, line 343 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
we update many keys, not just one key. This logic won't work. You have to use a separate txn.
Done.
worker/draft.go, line 352 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
You don't want to share a txn across goroutines.
Done.
worker/draft.go, line 417 at r2 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
You don't need to advance the Oracle. OracleUpdates would do that automatically.
Done.
worker/draft.go, line 338 at r3 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
Error return value of
ptxn.CommitToDisk
is not checked (fromerrcheck
)
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @golangcibot from a discussion.
Reviewable status: 0 of 1 files reviewed, 8 unresolved discussions (waiting on @golangcibot and @manishrjain)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the logic to a separate class. Clean up the code. Get it reviewed. The numbers seem promising.
Reviewable status: 0 of 1 files reviewed, 9 unresolved discussions (waiting on @animesh2049, @golangcibot, and @manishrjain)
worker/draft.go, line 322 at r2 (raw file):
Previously, animesh2049 (Animesh Chandra Pathak) wrote…
Done.
Create a small executor struct or something, and move all logic in there. And maybe that can share a writer across multiple payloads.
Would contain the preds, channels, this func, other funcs, etc.
worker/draft.go, line 76 at r4 (raw file):
pendingSize int64 predChan map[string]chan *runMutPayload
This shouldn't belong to node.
worker/draft.go, line 326 at r4 (raw file):
if err := ptxn.CommitToDisk(writer, payload.startTs); err != nil { glog.Errorf("Error while commiting to disk: %+v", err) }
Don't you need to wait for writer? May or may not work.
worker/draft.go, line 330 at r4 (raw file):
} checkAndAddPredChannel := func(pred string) {
getPredChan
worker/draft.go, line 362 at r4 (raw file):
} for attr, payload := range payloadMap {
Get the channel here then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @golangcibot from 3 discussions.
Reviewable status: 0 of 2 files reviewed, 8 unresolved discussions (waiting on @animesh2049, @golangcibot, and @manishrjain)
worker/background_mutation.go, line 20 at r5 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
File is not
gofmt
-ed with-s
(fromgofmt
)predChan map[string]chan *runMutPayload
Done.
worker/draft.go, line 69 at r5 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
File is not
gofmt
-ed with-s
(fromgofmt
)pendingSize int64
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 2 files reviewed, 8 unresolved discussions (waiting on @animesh2049, @golangcibot, and @manishrjain)
worker/draft.go, line 76 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
This shouldn't belong to node.
Done.
worker/draft.go, line 330 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
getPredChan
Done.
worker/draft.go, line 362 at r4 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Get the channel here then.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the wait. And also add a TODO to consider later if we need the wait.
Reviewed 2 of 2 files at r6.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @animesh2049, @golangcibot, @mangalaman93, and @pawanrawal)
worker/background_mutation.go, line 1 at r6 (raw file):
package worker
Add the license.
worker/background_mutation.go, line 12 at r6 (raw file):
) type runMutPayload struct {
subMutation
worker/background_mutation.go, line 48 at r6 (raw file):
if err := ptxn.CommitToDisk(writer, payload.startTs); err != nil { glog.Errorf("Error while commiting to disk: %+v", err) }
Add the wait here.
worker/background_mutation.go, line 74 at r6 (raw file):
} func (e *Executor) addEdges(ctx context.Context, StartTs uint64, edges []*pb.DirectedEdge) {
startTs, not capital S.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also test with upserts.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @animesh2049, @golangcibot, @mangalaman93, and @pawanrawal)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 10 files reviewed, 5 unresolved discussions (waiting on @animesh2049, @golangcibot, @mangalaman93, @manishrjain, @martinmr, and @pawanrawal)
worker/background_mutation.go, line 1 at r6 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Add the license.
Done.
worker/background_mutation.go, line 12 at r6 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
subMutation
Done.
worker/background_mutation.go, line 74 at r6 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
startTs, not capital S.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 10 files reviewed, 5 unresolved discussions (waiting on @golangcibot, @mangalaman93, @manishrjain, @martinmr, and @pawanrawal)
worker/background_mutation.go, line 48 at r6 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Add the wait here.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 8 of 9 files at r7, 2 of 2 files at r8.
Reviewable status: all files reviewed, 21 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, @martinmr, and @pawanrawal)
dgraph/cmd/zero/raft.go, line 639 at r8 (raw file):
go n.checkQuorum(closer) go n.RunReadIndexLoop(closer, readStateCh) if x.WorkerConfig.LudicrousMode {
Could use Zero.Conf.Get
here directly.
dgraph/cmd/zero/run.go, line 42 at r8 (raw file):
"github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog"
Move these imports up
dgraph/cmd/zero/run.go, line 174 at r8 (raw file):
} x.WorkerConfig = x.WorkerOptions{
Don't need this.
edgraph/server.go, line 842 at r8 (raw file):
return resp, ctx.Err() } if x.WorkerConfig.LudicrousMode {
We should keep the logic for timestamp assignment together. Move it below with the if qc.req.StartTs == 0 {
posting/list.go, line 401 at r8 (raw file):
l.updateMutationLayer(mpost) if x.WorkerConfig.LudicrousMode {
Add a comment that we do not perform conflict check
worker/background_mutation.go, line 2 at r8 (raw file):
/* * Copyright 2016-2018 Dgraph Labs, Inc. and Contributors
2020
worker/background_mutation.go, line 31 at r8 (raw file):
type subMutation struct { edges []*pb.DirectedEdge
See if we could use []pb.DirectedEdge
instead. Reduces GC pressure.
worker/background_mutation.go, line 36 at r8 (raw file):
} type Executor struct {
don't think we need to export this.
worker/background_mutation.go, line 41 at r8 (raw file):
} func newExecutor() *Executor {
not needed
worker/background_mutation.go, line 56 at r8 (raw file):
return default:
remove vertical space
worker/background_mutation.go, line 61 at r8 (raw file):
for _, edge := range payload.edges { for { err := runMutation(payload.ctx, edge, ptxn)
use switch instead
worker/background_mutation.go, line 77 at r8 (raw file):
// TODO(Animesh): We might not need this wait. if err := writer.Wait(); err != nil { glog.Errorf("Error while waiting for writes: %+v", err)
%v
worker/background_mutation.go, line 91 at r8 (raw file):
// Create a new channel for `pred`. e.Lock()
use defer here
worker/draft.go, line 276 at r8 (raw file):
m := proposal.Mutations var txn *posting.Txn if !x.WorkerConfig.LudicrousMode {
Can move this code below the return for ludicrous mode!
worker/draft.go, line 304 at r8 (raw file):
if x.WorkerConfig.LudicrousMode { n.ex.addEdges(ctx, m.StartTs, m.Edges)
Is it okay to use the same context here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @manishrjain from a discussion.
Reviewable status: all files reviewed, 20 unresolved discussions (waiting on @animesh2049, @golangcibot, @manishrjain, @martinmr, and @pawanrawal)
worker/background_mutation.go, line 68 at r7 (raw file):
Previously, golangcibot (Bot from GolangCI) wrote…
Error return value of
writer.Wait
is not checked (fromerrcheck
)
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @golangcibot from 2 discussions.
Reviewable status: all files reviewed, 18 unresolved discussions (waiting on @animesh2049, @manishrjain, @martinmr, and @pawanrawal)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 5 of 10 files reviewed, 18 unresolved discussions (waiting on @animesh2049, @mangalaman93, @manishrjain, @martinmr, and @pawanrawal)
dgraph/cmd/zero/raft.go, line 639 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Could use
Zero.Conf.Get
here directly.
It didn't look nice to use "ludicrous_mode"
inside here to get the bool value, so moved it to opts.
dgraph/cmd/zero/run.go, line 42 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Move these imports up
Done.
dgraph/cmd/zero/run.go, line 174 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Don't need this.
Done.
posting/list.go, line 401 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Add a comment that we do not perform conflict check
Done.
worker/background_mutation.go, line 2 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
2020
Done.
worker/background_mutation.go, line 36 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
don't think we need to export this.
Done.
worker/background_mutation.go, line 56 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
remove vertical space
Done.
worker/background_mutation.go, line 61 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
use switch instead
Done.
worker/background_mutation.go, line 77 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
%v
Done.
worker/background_mutation.go, line 91 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
use defer here
Done.
worker/draft.go, line 276 at r8 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Can move this code below the return for ludicrous mode!
Done.
break | ||
case err != posting.ErrRetry: | ||
glog.Errorf("Error while mutating: %v", err) | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S1023: redundant break statement (from gosimple
)
1c83402
to
3afce13
Compare
3afce13
to
ec75c0b
Compare
* Concurrent mutation in ludicrous mode. * Minor changes.
This change is