Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,10 @@ type BlockChain struct {
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing

quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
quit chan struct{} // blockchain quit channel
wg sync.WaitGroup // chain processing wait group for shutting down
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
Expand Down Expand Up @@ -239,7 +238,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bc.processor = NewStateProcessor(chainConfig, bc, engine)

var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,10 +331,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
return bc, nil
}

func (bc *BlockChain) getProcInterrupt() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
}

// GetVMConfig returns the block chain VM config.
func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
Expand Down Expand Up @@ -882,8 +877,7 @@ func (bc *BlockChain) Stop() {
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)

bc.StopInsert()
bc.wg.Wait()

// Ensure that the entirety of the state snapshot is journalled to disk.
Expand Down Expand Up @@ -928,6 +922,18 @@ func (bc *BlockChain) Stop() {
log.Info("Blockchain stopped")
}

// StopInsert interrupts all insertion methods, causing them to return
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (bc *BlockChain) StopInsert() {
atomic.StoreInt32(&bc.procInterrupt, 1)
}

// insertStopped returns true after StopInsert has been called.
func (bc *BlockChain) insertStopped() bool {
return atomic.LoadInt32(&bc.procInterrupt) == 1
}

func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
Expand Down Expand Up @@ -1113,7 +1119,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
Expand Down Expand Up @@ -1260,7 +1266,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
batch := bc.db.NewBatch()
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit if the owner header is unknown
Expand Down Expand Up @@ -1708,8 +1714,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// No validation errors for the first block (or chain prefix skipped)
for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
if bc.insertStopped() {
log.Debug("Abort during block processing")
break
}
// If the header is a banned one, straight out abort
Expand Down Expand Up @@ -1996,8 +2002,8 @@ func (bc *BlockChain) insertSideChain(block *types.Block, it *insertIterator) (i
blocks, memory = blocks[:0], 0

// If the chain is terminating, stop processing blocks
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
if bc.insertStopped() {
log.Debug("Abort during blocks processing")
return 0, nil
}
}
Expand Down
8 changes: 6 additions & 2 deletions eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (cs *chainSyncer) loop() {
cs.pm.txFetcher.Start()
defer cs.pm.blockFetcher.Stop()
defer cs.pm.txFetcher.Stop()
defer cs.pm.downloader.Terminate()

// The force timer lowers the peer count threshold down to one when it fires.
// This ensures we'll always start sync even if there aren't enough peers.
Expand All @@ -222,8 +221,13 @@ func (cs *chainSyncer) loop() {
cs.forced = true

case <-cs.pm.quitSync:
// Disable all insertion on the blockchain. This needs to happen before
// terminating the downloader because the downloader waits for blockchain
// inserts, and these can take a long time to finish.
cs.pm.blockchain.StopInsert()
cs.pm.downloader.Terminate()
if cs.doneCh != nil {
cs.pm.downloader.Terminate() // Double term is fine, Cancel would block until queue is emptied
// Wait for the current sync to end.
<-cs.doneCh
}
return
Expand Down
12 changes: 9 additions & 3 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,16 @@ func (lc *LightChain) Stop() {
return
}
close(lc.quit)
atomic.StoreInt32(&lc.procInterrupt, 1)

lc.StopInsert()
lc.wg.Wait()
log.Info("Blockchain manager stopped")
log.Info("Blockchain stopped")
}

// StopInsert interrupts all insertion methods, causing them to return
// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after
// calling this method.
func (lc *LightChain) StopInsert() {
atomic.StoreInt32(&lc.procInterrupt, 1)
}

// Rollback is designed to remove a chain of links from the database that aren't
Expand Down