Incremental Rollup and Tablet Size Calculation #4972
…into mrjn/incremental
// If we were doing any other operation, let's restart rollups.
if id != opRollup {
	time.Sleep(10 * time.Second) // Wait for 10s to start rollup operation.
	n.startTask(opRollup)
Error return value of n.startTask is not checked (from errcheck)
go n.processApplyCh()
go n.BatchAndSendMessages()
n.startTask(opRollup)
Error return value of n.startTask is not checked (from errcheck)
Reviewable status: 0 of 4 files reviewed, 3 unresolved discussions (waiting on @mangalaman93, @manishrjain, @martinmr, and @parasssh)
worker/mutation.go, line 153 at r6 (raw file):
if !schema.State().IndexingInProgress() {
	if atomic.CompareAndSwapUint32(&done, 0, 1) {
		// Q: Shouldn't this done be called via defer in this func?
@mangalaman93 : Can you respond to this question?
// We use the following block of code to trigger incremental rollup on this key.
deltaCount := 0
defer func() {
	if deltaCount > 0 {
Didn't we want to do this if deltaCount >= 2?
return nil, errors.Errorf("another operation is already running, ops:%v", n.ops)
for otherId, otherCloser := range n.ops {
	if otherId == opRollup {
		otherCloser.SignalAndWait()
Shouldn't we pause rollup only when we know for sure that there is no other operation? Say n.ops has opRollup and opIndexing. Then opRollup will be stopped, but since there is also an opIndexing, we will return from line 126 having paused opRollup.
Comments below.
Reviewable status: 0 of 4 files reviewed, 5 unresolved discussions (waiting on @mangalaman93, @martinmr, and @parasssh)
Reviewable status: 0 of 4 files reviewed, 5 unresolved discussions (waiting on @mangalaman93, @martinmr, and @parasssh)
posting/mvcc.go, line 270 at r7 (raw file):
Previously, parasssh wrote…
Didn't we want to do this if deltaCount >= 2?
I think considering we have other means to not overwhelm the system, we can even send keys which have even one delta posting. If it becomes too taxing, we can reconsider later.
worker/draft.go, line 124 at r7 (raw file):
Previously, parasssh wrote…
Shouldn't we pause rollup only when we know for sure that there is no other operation? Say n.ops has opRollup and opIndexing. Then opRollup will be stopped, but since there is also an opIndexing, we will return from line 126 having paused opRollup.
Rollup is not supposed to run alongside any other operation, so we won't have opRollup and opIndexing happening at the same time to begin with. This piece of code is just a simpler, more robust way to catch all that.
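For readers following this thread, here is a minimal sketch of the task-tracking idea under discussion. It is not the PR's code: the `closer` type is a hypothetical stand-in for `y.Closer`, the task body is a placeholder, and the two-pass ordering (check for conflicting operations first, pause rollup only afterwards) is just one way to guarantee that rollup is never paused by a call that then returns an error.

```go
package main

import (
	"fmt"
	"sync"
)

type op int

const (
	opRollup op = iota + 1
	opIndexing
)

// closer is a hypothetical stand-in for a closer type: stop asks the task to
// finish, done is closed by the task once it has finished.
type closer struct {
	stop chan struct{}
	done chan struct{}
}

func newCloser() *closer {
	return &closer{stop: make(chan struct{}), done: make(chan struct{})}
}

// SignalAndWait asks the task to stop and blocks until it has.
func (c *closer) SignalAndWait() {
	close(c.stop)
	<-c.done
}

type node struct {
	mu  sync.Mutex
	ops map[op]*closer
}

// startTask registers id as running. It refuses to start while any non-rollup
// operation is running, and pauses a running rollup only once it is certain
// that the new task will actually start.
func (n *node) startTask(id op) (*closer, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	// First pass: bail out before touching rollup if something else is running.
	for otherID := range n.ops {
		if otherID != opRollup {
			return nil, fmt.Errorf("another operation is already running: %v", otherID)
		}
	}
	// Second pass: nothing else is running, so pausing rollup is safe.
	if c, ok := n.ops[opRollup]; ok {
		c.SignalAndWait()
		delete(n.ops, opRollup)
	}

	c := newCloser()
	n.ops[id] = c
	go func() {
		<-c.stop // Placeholder for real work: run until asked to stop.
		close(c.done)
	}()
	return c, nil
}

func main() {
	n := &node{ops: map[op]*closer{}}
	_, err := n.startTask(opRollup)
	fmt.Println("start rollup:", err)
	_, err = n.startTask(opIndexing)
	fmt.Println("start indexing:", err)
	_, err = n.startTask(opRollup)
	fmt.Println("start rollup while indexing:", err)
}
```

With this ordering, a failed `startTask` leaves the set of running operations exactly as it found it.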
Reviewable status: 0 of 4 files reviewed, 10 unresolved discussions (waiting on @mangalaman93, @manishrjain, @martinmr, and @parasssh)
posting/mvcc.go, line 270 at r7 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
I think considering we have other means to not overwhelm the system, we can even send keys which have even one delta posting. If it becomes too taxing, we can reconsider later.
There seems to be a loop here. ReadPostingList -> addKeyToBatch -> Rollup -> ReadPostingList
This would be an infinite loop. To avoid the infinite loop, we should use deltaCount > 1 at least.
worker/draft.go, line 108 at r9 (raw file):
// If we were doing any other operation, let's restart rollups.
if id != opRollup {
	time.Sleep(10 * time.Second) // Wait for 10s to start rollup operation.
Why wait here?
worker/draft.go, line 109 at r9 (raw file):
if id != opRollup {
	time.Sleep(10 * time.Second) // Wait for 10s to start rollup operation.
	x.Check2(n.startTask(opRollup))
startTask can fail here if another operation starts before rollup. I think we should handle the error gracefully instead of just crashing.
worker/draft.go, line 136 at r9 (raw file):
n.ops[id] = closer
glog.Infof("Operation started with id: %s", id)
go func(id op, closer *y.Closer) {
This seemed like a little bit more work than directly calling stopTask. Why do you think calling closer.Done() + goroutine is better than just calling stopTask?
worker/draft.go, line 1548 at r9 (raw file):
go n.processApplyCh()
go n.BatchAndSendMessages()
x.Check2(n.startTask(opRollup))
Same here: startTask(opRollup) can return a genuine error that should be handled.
worker/draft.go, line 1549 at r9 (raw file):
go n.BatchAndSendMessages()
x.Check2(n.startTask(opRollup))
go n.stopAllTasks()
Why are we calling stopAllTasks in the beginning? Shouldn't this be called when Alpha is shutting down?
worker/mutation.go, line 153 at r6 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
@mangalaman93 : Can you respond to this question?
No, because we may have background routines running. This function will return, but the routines will still be running, computing indexes in the background.
Reviewed 1 of 4 files at r2, 1 of 4 files at r4, 1 of 2 files at r7, 1 of 1 files at r9.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @manishrjain, @martinmr, and @parasssh)
Reviewed 1 of 4 files at r2, 1 of 4 files at r4, 1 of 2 files at r7, 1 of 1 files at r9.
Reviewable status: 2 of 4 files reviewed, 9 unresolved discussions (waiting on @mangalaman93, @manishrjain, @martinmr, and @parasssh)
posting/mvcc.go, line 270 at r7 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
There seems to be a loop here.
ReadPostingList -> addKeyToBatch -> Rollup -> ReadPostingList
This would be an infinite loop. To avoid the infinite loop, we should use deltaCount > 1 at least.
The loop exists for any value of minDeltaCount. We have a mechanism which avoids the same key getting rolled up for at least 10s. That would catch this scenario.
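As an illustration of the kind of guard described here, the sketch below suppresses repeat rollup requests for the same key within a minimum gap. Everything in it is hypothetical (the `rollupLimiter` type, `shouldQueue`, the string keys); the thread only states that such a mechanism exists and uses a window of at least 10s, not how it is implemented.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// rollupLimiter remembers when each key was last queued for rollup.
// Hypothetical helper, not taken from the PR.
type rollupLimiter struct {
	mu       sync.Mutex
	lastSeen map[string]time.Time
	minGap   time.Duration
}

func newRollupLimiter(minGap time.Duration) *rollupLimiter {
	return &rollupLimiter{lastSeen: make(map[string]time.Time), minGap: minGap}
}

// shouldQueue reports whether key may be queued at time now. If it may, the
// time is recorded so that later requests inside minGap are rejected.
func (l *rollupLimiter) shouldQueue(key string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if last, ok := l.lastSeen[key]; ok && now.Sub(last) < l.minGap {
		return false
	}
	l.lastSeen[key] = now
	return true
}

// demo replays the read -> queue -> rollup -> read cycle for a single key.
func demo() [3]bool {
	l := newRollupLimiter(10 * time.Second)
	t0 := time.Now()
	return [3]bool{
		l.shouldQueue("key", t0),                     // first read queues the key
		l.shouldQueue("key", t0.Add(3*time.Second)),  // the rollup's own read is ignored
		l.shouldQueue("key", t0.Add(11*time.Second)), // allowed again after the gap
	}
}

func main() {
	fmt.Println(demo())
}
```

Because the read issued by the rollup itself falls inside the gap, the cycle ReadPostingList -> addKeyToBatch -> Rollup -> ReadPostingList ends after one pass, whatever the delta threshold is.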
worker/draft.go, line 108 at r9 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Why wait here?
When Dgraph starts, it runs a bunch of indexing operations -- this avoids rollup jumping in between them and cycling through starting and stopping. Also, gives other operations a chance to run first.
worker/draft.go, line 109 at r9 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
startTask can fail here if another operation starts before rollup. I think we should handle the error gracefully instead of just crashing.
Yeah. My PR didn't have those checks. Paras added them due to CI complaining. I've reverted those changes and added a comment.
worker/draft.go, line 136 at r9 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
This seemed like a little bit more work than directly calling stopTask. Why do you think calling closer.Done() + goroutine is better than just calling stopTask?
The operation struct can't be passed to the posting package, but a closer can. That determined this change.
worker/draft.go, line 1548 at r9 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
same here, startTask(opRollup) can return a genuine error that should be handled.
Removed the check failure. I don't think there's a need to handle this error, if any.
worker/draft.go, line 1549 at r9 (raw file):
Previously, mangalaman93 (Aman Mangal) wrote…
Why are we calling stopAllTasks in the beginning? Shouldn't this be called when Alpha is shutting down?
stopAllTasks blocks until it gets the n.Closer.HasBeenClosed signal, and only then shuts down the tasks.
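The pattern described in this reply can be sketched as follows: a goroutine is started early, parks on a shutdown channel for the lifetime of the process, and signals every registered task only once that channel is closed. The `node` type, its fields, and the task names are hypothetical; in the PR the shutdown signal comes from n.Closer.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical, stripped-down stand-in for the real node type.
type node struct {
	mu       sync.Mutex
	shutdown chan struct{}            // closed once, when the process is shutting down
	tasks    map[string]chan struct{} // per-task stop signals
}

// startTask registers a task and returns the channel it should watch.
func (n *node) startTask(name string) <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	stop := make(chan struct{})
	n.tasks[name] = stop
	return stop
}

// stopAllTasks can be launched at startup: it blocks until shutdown is
// closed, then signals every task that is registered at that moment.
// It returns how many tasks were signaled.
func (n *node) stopAllTasks() int {
	<-n.shutdown
	n.mu.Lock()
	defer n.mu.Unlock()
	stopped := 0
	for name, stop := range n.tasks {
		close(stop)
		delete(n.tasks, name)
		stopped++
	}
	return stopped
}

func demo() int {
	n := &node{shutdown: make(chan struct{}), tasks: make(map[string]chan struct{})}
	result := make(chan int)
	go func() { result <- n.stopAllTasks() }() // started first, like in the PR

	n.startTask("rollup")
	n.startTask("indexing")

	close(n.shutdown) // e.g. on Ctrl+C
	return <-result
}

func main() {
	fmt.Println("tasks stopped:", demo())
}
```

Starting the goroutine at the beginning is harmless because it does nothing until the shutdown channel is closed, and it still sees tasks that were registered later.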
Reviewable status: 2 of 4 files reviewed, 6 unresolved discussions (waiting on @mangalaman93, @manishrjain, @martinmr, and @parasssh)
worker/draft.go, line 108 at r9 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
When Dgraph starts, it runs a bunch of indexing operations -- this avoids rollup jumping in between them and cycling through starting and stopping. Also, gives other operations a chance to run first.
We should add this information in the comments.
worker/draft.go, line 109 at r9 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
Yeah. My PR didn't have those checks. Paras added them due to CI complaining. I've reverted those changes and added a comment.
But now if this returns an error, we will not have rollups running at all, right?
worker/draft.go, line 136 at r9 (raw file):
Previously, manishrjain (Manish R Jain) wrote…
operation struct can't be sent over to posting package. But, closer can. That determined this change.
I meant, we could have avoided creating a goroutine here altogether.
worker/mutation.go, line 153 at r4 (raw file):
if !schema.State().IndexingInProgress() {
	if atomic.CompareAndSwapUint32(&done, 0, 1) {
		// Q: Shouldn't this done be called via defer in this func?
We should remove this comment.
Reviewable status: 2 of 4 files reviewed, 6 unresolved discussions (waiting on @mangalaman93, @manishrjain, @martinmr, and @parasssh)
worker/mutation.go, line 151 at r10 (raw file):
var done uint32
stopIndexing := func(closer *y.Closer) {
	// runSchemaMutation can return. stopIndexing could be called by goroutines.
Sometimes no background tasks are spawned. In that case, we call stopIndexing from runSchemaMutation too. FYI.
(cherry picked from commit dd0728f)
Instead of doing a rollup over the entire DB on every snapshot, this PR does a live, incremental rollup over keys as they get read. It also switches tablet size calculation from iterating over the entire DB to using the DB.Tables() function in Badger. This PR also makes some changes to simplify task tracking and ensure that all background tasks get signaled and finished on Ctrl+C.
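To make the tablet size change concrete, here is a rough sketch of estimating per-predicate sizes from table metadata rather than from a full key scan. The `tableInfo` struct, the `attr|suffix` key layout, and the rule of skipping tables that span more than one predicate are all assumptions made for illustration; they are not the actual Badger or Dgraph types.

```go
package main

import (
	"fmt"
	"strings"
)

// tableInfo is a hypothetical, simplified view of per-table metadata.
type tableInfo struct {
	Left      string // smallest key stored in the table
	Right     string // largest key stored in the table
	SizeBytes uint64 // estimated size of the table
}

// attrOf extracts the predicate from a key, assuming a made-up
// "attr|suffix" key layout.
func attrOf(key string) string {
	return strings.SplitN(key, "|", 2)[0]
}

// tabletSizes sums table sizes per predicate. Tables whose key range spans
// more than one predicate are skipped, so the result is an estimate.
func tabletSizes(tables []tableInfo) map[string]uint64 {
	sizes := make(map[string]uint64)
	for _, t := range tables {
		left, right := attrOf(t.Left), attrOf(t.Right)
		if left != right {
			continue // cannot attribute this table to a single predicate
		}
		sizes[left] += t.SizeBytes
	}
	return sizes
}

func main() {
	tables := []tableInfo{
		{Left: "age|0001", Right: "age|0800", SizeBytes: 16 << 20},
		{Left: "age|0801", Right: "name|0100", SizeBytes: 8 << 20}, // spans two predicates
		{Left: "name|0101", Right: "name|0900", SizeBytes: 64 << 20},
		{Left: "name|0901", Right: "name|1500", SizeBytes: 32 << 20},
	}
	for attr, size := range tabletSizes(tables) {
		fmt.Printf("%s: %d MiB\n", attr, size>>20)
	}
}
```

The cost of this approach is proportional to the number of tables rather than the number of keys, which is the motivation the description gives for moving away from iterating over the entire DB.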