Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type TransactionPool struct {
// stateproofOverflowed indicates that a stateproof transaction was allowed to
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
shutdown bool
}

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
Expand All @@ -113,6 +118,8 @@ type VotingAccountSupplier interface {
VotingAccountsForRound(basics.Round) []basics.Address
}

var errPoolShutdown = errors.New("transaction pool is shutting down")

// MakeTransactionPool makes a transaction pool.
func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Logger, vac VotingAccountSupplier) *TransactionPool {
if cfg.TxPoolExponentialIncreaseFactor < 1 {
Expand Down Expand Up @@ -430,6 +437,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
return ErrNoPendingBlockEvaluator
}

if pool.shutdown {
Comment thread
gmalouf marked this conversation as resolved.
return errPoolShutdown
}

if !params.recomputing {
// Make sure that the latest block has been processed by OnNewBlock().
// If not, we might be in a race, so wait a little bit for OnNewBlock()
Expand All @@ -441,6 +452,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
if pool.pendingBlockEvaluator == nil {
return ErrNoPendingBlockEvaluator
}
// recheck if the pool is shutting down since TimedWait above releases the lock
if pool.shutdown {
return errPoolShutdown
}
}

err := pool.checkSufficientFee(txgroup)
Expand Down Expand Up @@ -529,6 +544,10 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.shutdown {
Comment thread
cce marked this conversation as resolved.
return
}

defer pool.cond.Broadcast()
if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() {
// Adjust the pool fee threshold. The rules are:
Expand Down Expand Up @@ -1010,3 +1029,13 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Unfin
assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime))
return
}

// Shutdown stops the transaction pool from accepting new transactions and blocks.
// It takes the pool.mu lock in order to ensure there is no pending remember or block operations in flight
// and sets the shutdown flag to true.
func (pool *TransactionPool) Shutdown() {
Comment thread
gmalouf marked this conversation as resolved.
pool.mu.Lock()
defer pool.mu.Unlock()

pool.shutdown = true
}
2 changes: 2 additions & 0 deletions ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (bn *blockNotifier) worker() {

func (bn *blockNotifier) close() {
bn.mu.Lock()
bn.pendingBlocks = nil
bn.listeners = nil
if bn.running {
bn.running = false
bn.cond.Broadcast()
Expand Down
18 changes: 14 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type AlgorandFullNode struct {
tracer messagetracer.MessageTracer

stateProofWorker *stateproof.Worker
partHandles []db.Accessor
}

// TxnWithStatus represents information about a single transaction,
Expand Down Expand Up @@ -418,6 +419,12 @@ func (node *AlgorandFullNode) Stop() {
defer func() {
node.mu.Unlock()
node.waitMonitoringRoutines()

// oldKeyDeletionThread uses accountManager registry so must be stopped before accountManager is closed
node.accountManager.Registry().Close()
for h := range node.partHandles {
node.partHandles[h].Close()
}
}()

node.net.ClearHandlers()
Expand All @@ -430,6 +437,7 @@ func (node *AlgorandFullNode) Stop() {
node.stateProofWorker.Stop()
node.txHandler.Stop()
node.agreementService.Shutdown()
node.agreementService.Accessor.Close()
Comment thread
gmalouf marked this conversation as resolved.
node.catchupService.Stop()
node.txPoolSyncerService.Stop()
node.blockService.Stop()
Expand All @@ -441,7 +449,9 @@ func (node *AlgorandFullNode) Stop() {
node.lowPriorityCryptoVerificationPool.Shutdown()
node.cryptoPool.Shutdown()
node.log.Debug("crypto worker pools have stopped")
node.transactionPool.Shutdown()
Comment thread
cce marked this conversation as resolved.
node.cancelCtx()
node.ledger.Close()
}

// note: unlike the other two functions, this accepts a whole filename
Expand Down Expand Up @@ -987,12 +997,12 @@ func (node *AlgorandFullNode) loadParticipationKeys() error {
// These files are not ephemeral and must be deleted eventually since
// this function is called to load files located in the node on startup
added := node.accountManager.AddParticipation(part, false)
if added {
node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name())
} else {
if !added {
part.Close()
continue
}
node.log.Infof("Loaded participation keys from storage: %s %s", part.Address(), info.Name())
node.partHandles = append(node.partHandles, handle)
err = insertStateProofToRegistry(part, node)
if err != nil {
return err
Expand Down Expand Up @@ -1024,7 +1034,7 @@ func (node *AlgorandFullNode) txPoolGaugeThread(done <-chan struct{}) {
defer node.monitoringRoutinesWaitGroup.Done()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for true {
for {
select {
case <-ticker.C:
txPoolGauge.Set(uint64(node.transactionPool.PendingCount()))
Expand Down