Skip to content

Commit

Permalink
Merge branch 'master' into ibrahim/iterator-log-error
Browse files Browse the repository at this point in the history
  • Loading branch information
Ibrahim Jarif committed Nov 25, 2020
2 parents ad2cf99 + 925e15b commit 524fec5
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (db *DB) IsClosed() bool {

func (db *DB) close() (err error) {
db.opt.Debugf("Closing database")
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs))
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs)))

atomic.StoreInt32(&db.blockWrites, 1)

Expand Down
4 changes: 2 additions & 2 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ import (

type levelsController struct {
nextFileID uint64 // Atomic
l0stallsMs int64 // Atomic

// The following are initialized once and const.
levels []*levelHandler
kv *DB

cstatus compactStatus
l0stallsMs int64
cstatus compactStatus
}

// revertToManifest checks that all necessary table files exist and removes all table files not
Expand Down
10 changes: 8 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ outer:
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
for _, a := range st.allocators {
// Using AllocatorFrom is better because if the allocator is already freed up, it would
Expand All @@ -421,7 +423,7 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
// Picks up ranges from Badger, and sends them to rangeCh.
go st.produceRanges(ctx)

errCh := make(chan error, 1) // Stores error by consumeKeys.
errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
var wg sync.WaitGroup
for i := 0; i < st.NumGo; i++ {
wg.Add(1)
Expand All @@ -442,7 +444,11 @@ func (st *Stream) Orchestrate(ctx context.Context) error {
kvErr := make(chan error, 1)
go func() {
// Picks up KV lists from kvChan, and sends them to Output.
kvErr <- st.streamKVs(ctx)
err := st.streamKVs(ctx)
if err != nil {
cancel() // Stop all the go routines.
}
kvErr <- err
}()
wg.Wait() // Wait for produceKVs to be over.
close(st.kvChan) // Now we can close kvChan.
Expand Down

0 comments on commit 524fec5

Please sign in to comment.