ledger: rearrange blockqueue start/stop #4964
Changes from all commits: b508744, def171b, d434750, 65eee4c, 7be0c91, 8156ca6, d27b3a5, 063f406, b7bd2b8
@@ -52,10 +52,29 @@ type blockQueue struct {
 	closed chan struct{}
 }
 
-func bqInit(l *Ledger) (*blockQueue, error) {
+func newBlockQueue(l *Ledger) (*blockQueue, error) {
 	bq := &blockQueue{}
 	bq.cond = sync.NewCond(&bq.mu)
 	bq.l = l
+	return bq, nil
+}
+
+func (bq *blockQueue) start() error {
+	bq.mu.Lock()
+	defer bq.mu.Unlock()
+
+	if bq.running {
+		// this should be harmless, but it should also be impossible
+		bq.l.log.Warn("blockQueue.start() already started")
+		return nil
+	}
+	if bq.closed != nil {
+		// a previous close() is still waiting on a previous syncer() to finish
+		oldsyncer := bq.closed
+		bq.mu.Unlock()
+		<-oldsyncer
+		bq.mu.Lock()
+	}
 	bq.running = true
 	bq.closed = make(chan struct{})
 	ledgerBlockqInitCount.Inc(nil)

@@ -67,40 +86,41 @@ func bqInit(l *Ledger) (*blockQueue, error) {
 	})
 	ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	go bq.syncer()
-	return bq, nil
+	return nil
 }
 
-func (bq *blockQueue) close() {
+func (bq *blockQueue) stop() {
 	bq.mu.Lock()
-	defer func() {
-		bq.mu.Unlock()
-		// we want to block here until the sync go routine is done.
-		// it's not (just) for the sake of a complete cleanup, but rather
-		// to ensure that the sync goroutine isn't busy in a notifyCommit
-		// call which might be blocked inside one of the trackers.
-		<-bq.closed
-	}()
+	closechan := bq.closed
 	if bq.running {
 		bq.running = false
 		bq.cond.Broadcast()
 	}
+	bq.mu.Unlock()
+
+	// we want to block here until the sync go routine is done.
+	// it's not (just) for the sake of a complete cleanup, but rather
+	// to ensure that the sync goroutine isn't busy in a notifyCommit
+	// call which might be blocked inside one of the trackers.
+	if closechan != nil {
+		<-closechan
+	}
 }
 
 func (bq *blockQueue) syncer() {
-	defer close(bq.closed)
 	bq.mu.Lock()
 	for {
 		for bq.running && len(bq.q) == 0 {
 			bq.cond.Wait()
 		}
 
 		if !bq.running {
+			close(bq.closed)
+			bq.closed = nil
Contributor:
Is setting this to nil necessary? Keep it closed, and replace it with a new one when restarted.

Contributor (Author):
Receiving from nil blocks forever, and creating a chan and immediately closing it feels silly, so I like the nil checking.

Contributor:
Based on my suggestion, I expect that none of what you are arguing should happen.
 			bq.mu.Unlock()
 			return
 		}
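For background on the nil-check exchange in the review thread above, here is a minimal standalone sketch of the ordinary Go channel semantics being argued about (illustration only, not code from this PR): a receive from a closed channel returns immediately, while a receive from a nil channel blocks forever, which is why stop() only waits on its snapshot of bq.closed when that snapshot is non-nil.

```go
package main

import (
	"fmt"
	"time"
)

// receiveWithTimeout reports whether a receive on ch completed within 100ms.
func receiveWithTimeout(ch chan struct{}) string {
	select {
	case <-ch:
		return "receive returned"
	case <-time.After(100 * time.Millisecond):
		return "receive blocked (gave up after timeout)"
	}
}

func main() {
	closedChan := make(chan struct{})
	close(closedChan)
	// Receiving from a closed channel never blocks.
	fmt.Println("closed channel:", receiveWithTimeout(closedChan))

	var nilChan chan struct{}
	// Receiving from a nil channel blocks forever; without the nil check,
	// a stop() before start() (or after syncer already exited) would hang.
	fmt.Println("nil channel:   ", receiveWithTimeout(nilChan))
}
```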
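Since the diff only shows the changed hunks, a self-contained toy analogue may help to see the rearranged lifecycle in one place. This is a simplified sketch, not the go-algorand types (the worker/loop names and the absence of ledger I/O are invented for illustration): the constructor only allocates, start() spawns the goroutine and waits out a previous stop() that is still draining, and stop() signals the goroutine and blocks until it has exited, so the queue can be stopped and restarted safely.

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running bool
	closed  chan struct{} // closed by the worker goroutine as it exits
}

func newWorker() *worker {
	w := &worker{}
	w.cond = sync.NewCond(&w.mu)
	return w // allocate only; nothing is running yet
}

func (w *worker) start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.running {
		return // already started; harmless
	}
	if w.closed != nil {
		// a previous stop() may still be waiting on the previous goroutine
		old := w.closed
		w.mu.Unlock()
		<-old
		w.mu.Lock()
	}
	w.running = true
	w.closed = make(chan struct{})
	go w.loop()
}

func (w *worker) stop() {
	w.mu.Lock()
	closechan := w.closed
	if w.running {
		w.running = false
		w.cond.Broadcast()
	}
	w.mu.Unlock()
	// wait for loop() to exit, but only if it was ever started
	if closechan != nil {
		<-closechan
	}
}

func (w *worker) loop() {
	w.mu.Lock()
	for w.running {
		// a real syncer would drain queued work here; the toy just waits
		w.cond.Wait()
	}
	// let a pending stop() or start() know this goroutine is done
	close(w.closed)
	w.closed = nil
	w.mu.Unlock()
}

func main() {
	w := newWorker()
	w.start()
	w.stop()
	w.start() // restarting is safe: start() waits for the old goroutine if needed
	w.stop()
	fmt.Println("stopped and restarted cleanly")
}
```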