correctness fixes for the autobatch blockstore
1. Simplify shutdown and make it idempotent by using a context.
2. Make sure `Flush` actually _fully_ flushes if the previous flush failed.
3. Don't clear the flush batch if flushing fails.
Stebalien committed Jan 12, 2022
1 parent b161f56 commit c7246e1
Showing 1 changed file with 54 additions and 37 deletions.
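
The first of these fixes replaces the shutdownCh/flushWorkerDone handshake with a cancellable context plus a done channel that the flush worker closes on exit; cancelling a context after the first call is a no-op, which is what makes Shutdown idempotent. A minimal, self-contained sketch of that pattern, independent of the lotus code (all names here are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// worker owns a cancel func and a done channel; Shutdown may be called any
// number of times, from any goroutine, because cancellation is idempotent.
type worker struct {
	shutdown context.CancelFunc
	doneCh   chan struct{}
}

func newWorker(ctx context.Context) *worker {
	ctx, cancel := context.WithCancel(ctx)
	w := &worker{shutdown: cancel, doneCh: make(chan struct{})}
	go w.run(ctx)
	return w
}

func (w *worker) run(ctx context.Context) {
	defer close(w.doneCh) // signals that the worker has fully exited
	for {
		select {
		case <-time.After(50 * time.Millisecond):
			// periodic work (e.g. flushing a batch) would go here
		case <-ctx.Done():
			return
		}
	}
}

// Shutdown cancels the worker and waits for it to exit or for ctx to expire.
func (w *worker) Shutdown(ctx context.Context) error {
	w.shutdown() // calling the cancel func repeatedly is safe
	select {
	case <-w.doneCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	w := newWorker(context.Background())
	fmt.Println(w.Shutdown(context.Background())) // <nil>
	fmt.Println(w.Shutdown(context.Background())) // still <nil>: idempotent
}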
91 changes: 54 additions & 37 deletions blockstore/autobatch.go
@@ -26,18 +26,17 @@ type AutobatchBlockstore struct {
 	addedCids map[cid.Cid]struct{}
 
 	stateLock     sync.Mutex
+	doFlushLock   sync.Mutex
 	bufferedBatch blockBatch
 
-	flushingBatch   blockBatch
-	flushErr        error
-	flushWorkerDone bool
+	flushingBatch blockBatch
+	flushErr      error
 
 	flushCh chan struct{}
 
-	doFlushLock     sync.Mutex
 	flushRetryDelay time.Duration
-	flushCtx        context.Context
-	shutdownCh      chan struct{}
+	doneCh          chan struct{}
+	shutdown        context.CancelFunc
 
 	backingBs Blockstore
 
@@ -46,21 +45,20 @@ type AutobatchBlockstore struct {
 }
 
 func NewAutobatch(ctx context.Context, backingBs Blockstore, bufferCapacity int) *AutobatchBlockstore {
+	ctx, cancel := context.WithCancel(ctx)
 	bs := &AutobatchBlockstore{
 		addedCids:      make(map[cid.Cid]struct{}),
 		backingBs:      backingBs,
 		bufferCapacity: bufferCapacity,
-		flushCtx:       ctx,
 		flushCh:        make(chan struct{}, 1),
-		shutdownCh:     make(chan struct{}),
 		// could be made configable
 		flushRetryDelay: time.Millisecond * 100,
-		flushWorkerDone: false,
+		shutdown:        cancel,
	}
 
 	bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block)
 
-	go bs.flushWorker()
+	go bs.flushWorker(ctx)
 
 	return bs
 }
@@ -88,66 +86,85 @@ func (bs *AutobatchBlockstore) Put(ctx context.Context, blk block.Block) error {
 	return nil
 }
 
-func (bs *AutobatchBlockstore) flushWorker() {
-	defer func() {
-		bs.stateLock.Lock()
-		bs.flushWorkerDone = true
-		bs.stateLock.Unlock()
-	}()
+func (bs *AutobatchBlockstore) flushWorker(ctx context.Context) {
+	defer close(bs.doneCh)
 	for {
 		select {
 		case <-bs.flushCh:
-			putErr := bs.doFlush(bs.flushCtx)
+			// TODO: check if we _should_ actually flush. We could get a spurious wakeup
+			// here.
+			putErr := bs.doFlush(ctx, false)
 			for putErr != nil {
 				select {
-				case <-bs.shutdownCh:
+				case <-ctx.Done():
 					return
 				case <-time.After(bs.flushRetryDelay):
 					autolog.Errorf("FLUSH ERRORED: %w, retrying after %v", putErr, bs.flushRetryDelay)
-					putErr = bs.doFlush(bs.flushCtx)
+					putErr = bs.doFlush(ctx, true)
 				}
 			}
-		case <-bs.shutdownCh:
+		case <-ctx.Done():
+			// Do one last flush.
+			_ = bs.doFlush(ctx, false)
 			return
 		}
 	}
 }
 
 // caller must NOT hold stateLock
-func (bs *AutobatchBlockstore) doFlush(ctx context.Context) error {
+// set retryOnly to true to only retry a failed flush and not flush anything new.
+func (bs *AutobatchBlockstore) doFlush(ctx context.Context, retryOnly bool) error {
 	bs.doFlushLock.Lock()
 	defer bs.doFlushLock.Unlock()
-	if bs.flushErr == nil {
-		bs.stateLock.Lock()
-		// We do NOT clear addedCids here, because its purpose is to expedite Puts
-		bs.flushingBatch = bs.bufferedBatch
-		bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
-		bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
-		bs.stateLock.Unlock()
+
+	// If we failed to flush last time, try flushing again.
+	if bs.flushErr != nil {
+		bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
 	}
 
-	bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
+	// If we failed, or we're _only_ retrying, bail.
+	if retryOnly || bs.flushErr != nil {
+		return bs.flushErr
+	}
+
+	// Then take the current batch...
 	bs.stateLock.Lock()
-	bs.flushingBatch = blockBatch{}
+	// We do NOT clear addedCids here, because its purpose is to expedite Puts
+	bs.flushingBatch = bs.bufferedBatch
+	bs.bufferedBatch.blockList = make([]block.Block, 0, len(bs.flushingBatch.blockList))
+	bs.bufferedBatch.blockMap = make(map[cid.Cid]block.Block, len(bs.flushingBatch.blockMap))
 	bs.stateLock.Unlock()
 
+	// And try to flush it.
+	bs.flushErr = bs.backingBs.PutMany(ctx, bs.flushingBatch.blockList)
+
+	// If we succeeded, reset the batch. Otherwise, we'll try again next time.
+	if bs.flushErr == nil {
+		bs.stateLock.Lock()
+		bs.flushingBatch = blockBatch{}
+		bs.stateLock.Unlock()
+	}
 
 	return bs.flushErr
 }

 // caller must NOT hold stateLock
 func (bs *AutobatchBlockstore) Flush(ctx context.Context) error {
-	return bs.doFlush(ctx)
+	return bs.doFlush(ctx, false)
 }
 
 func (bs *AutobatchBlockstore) Shutdown(ctx context.Context) error {
-	bs.stateLock.Lock()
-	flushDone := bs.flushWorkerDone
-	bs.stateLock.Unlock()
-	if !flushDone {
-		// may racily block forever if Shutdown is called in parallel
-		bs.shutdownCh <- struct{}{}
+	// TODO: Prevent puts after we call this to avoid losing data.
+	bs.shutdown()
+	select {
+	case <-bs.doneCh:
+	case <-ctx.Done():
+		return ctx.Err()
 	}
 
+	bs.doFlushLock.Lock()
+	defer bs.doFlushLock.Unlock()
+
 	return bs.flushErr
 }
 
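For callers, the intent of fixes 2 and 3 is that Flush returns only after everything buffered at the time of the call has reached the backing store (retrying a previously failed batch first), and that a failed batch is kept for retry instead of being dropped. A hypothetical caller-side sketch follows; the import paths, the in-memory backing store, and the buffer capacity are assumptions for illustration, not part of this diff:

package main

import (
	"context"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore"
)

func writeAndClose(ctx context.Context) error {
	// Buffer writes in front of an (assumed) in-memory backing blockstore.
	bs := blockstore.NewAutobatch(ctx, blockstore.NewMemory(), 1<<20)

	// Puts are buffered and flushed to the backing store in the background.
	if err := bs.Put(ctx, blocks.NewBlock([]byte("hello"))); err != nil {
		return err
	}

	// After this commit, Flush is meant to return only once everything buffered
	// so far has reached the backing store, retrying a previously failed batch first.
	if err := bs.Flush(ctx); err != nil {
		return err
	}

	// Shutdown cancels the flush worker, waits for it to exit (or for ctx to
	// expire), and reports the final flush error, if any.
	return bs.Shutdown(ctx)
}

func main() {
	if err := writeAndClose(context.Background()); err != nil {
		panic(err)
	}
}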
