Skip to content

Commit

Permalink
fix: #283, immudb crash on dump of empty db
Browse files Browse the repository at this point in the history
  • Loading branch information
Gjergj authored and vchaindz committed Jun 2, 2020
1 parent 52a191f commit 6c6e1a5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 9 deletions.
19 changes: 15 additions & 4 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,14 @@ func (t *Store) Dump(kvChan chan *pb.KVList) (err error) {
t.tree.Lock()
t.tree.flush()

var emptyCaches = true
for _, c := range t.tree.caches {
tail := c.Tail()
if tail == 0 {
continue
}
emptyCaches = false
}
stream := t.db.NewStreamAt(t.tree.w)
stream.NumGo = 16
stream.LogPrefix = "Badger.Streaming"
Expand All @@ -481,10 +489,13 @@ func (t *Store) Dump(kvChan chan *pb.KVList) (err error) {
kvChan <- list
return nil
}

// Run the stream
if err = stream.Orchestrate(context.Background()); err != nil {
return err
//workaround possible badger bug
//ReadTs should not be retrieved for managed DB
if !emptyCaches {
// Run the stream
if err = stream.Orchestrate(context.Background()); err != nil {
return err
}
}
close(kvChan)
return err
Expand Down
18 changes: 13 additions & 5 deletions pkg/store/treestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,29 +302,37 @@ func (t *treeStore) worker() {
func (t *treeStore) flush() {
t.log.Infof("Flushing tree caches at index %d", t.w-1)
var cancel bool
var emptyCaches bool = true
wb := t.db.NewWriteBatchAt(t.w)
defer func() {
if cancel {
wb.Cancel()
return
}
if err := wb.Flush(); err != nil {
t.log.Errorf("Tree flush error: %s", err)
} else {
advance := func() {
// advance cache indexes iff flushing has succeeded
for l, c := range t.caches {
t.cPos[l] = c.Tail()
}
t.lastFlushed = t.w
}
//workaround possible badger bug
//Commit cannot be called with managedDB=true. Use CommitAt.
if !emptyCaches {
err := wb.Flush()
if err != nil {
t.log.Errorf("Tree flush error: %s", err)
return
}
advance()
}
}()

for l, c := range t.caches {
tail := c.Tail()
if tail == 0 {
continue
}

emptyCaches = false
// fmt.Printf("Flushing [l=%d, head=%d, tail=%d] from %d to (%d-1)\n", l, c.Head(), c.Tail(), t.cPos[l], tail)
for i := t.cPos[l]; i < tail; i++ {
if h := c.Get(i); h != nil {
Expand Down

0 comments on commit 6c6e1a5

Please sign in to comment.