diff --git a/ledger/eval.go b/ledger/eval.go index cc94c17318..b98465e651 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -514,6 +514,11 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, ad transacti var err error if eval.validate { + err = txn.Txn.Alive(eval.block) + if err != nil { + return err + } + // Transaction already in the ledger? txid := txn.ID() dup, err := cow.isDup(txn.Txn.First(), txn.Txn.Last(), txid, txlease{sender: txn.Txn.Sender, lease: txn.Txn.Lease}) @@ -717,6 +722,17 @@ func validateTransaction(txn transactions.SignedTxn, block bookkeeping.Block, pr // AddBlock: eval(context.Background(), blk, false, nil, nil) // tracker: eval(context.Background(), blk, false, nil, nil) func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, txcache VerifiedTxnCache, executionPool execpool.BacklogPool) (StateDelta, error) { + eval, err := startEvaluator(l, blk.BlockHeader, validate, false) + if err != nil { + return StateDelta{}, err + } + + // Next, transactions + paysetgroups, err := blk.DecodePaysetGroups() + if err != nil { + return StateDelta{}, err + } + var txvalidator evalTxValidator ctx, cf := context.WithCancel(ctx) defer cf() @@ -732,20 +748,10 @@ func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, txvalidator.ctx = ctx txvalidator.cf = cf - txvalidator.txgroups = make(chan []transactions.SignedTxnWithAD, 10) + txvalidator.txgroups = make(chan []transactions.SignedTxnWithAD, len(paysetgroups)) txvalidator.done = make(chan error, 1) go txvalidator.run() } - eval, err := startEvaluator(l, blk.BlockHeader, validate, false) - if err != nil { - return StateDelta{}, err - } - - // Next, transactions - paysetgroups, err := blk.DecodePaysetGroups() - if err != nil { - return StateDelta{}, err - } for _, txgroup := range paysetgroups { select { diff --git a/util/execpool/backlog.go b/util/execpool/backlog.go index e346de7343..8353cbdee2 100644 --- a/util/execpool/backlog.go +++ b/util/execpool/backlog.go @@ -19,15 +19,12 @@ package execpool import ( "context" "sync" - - "github.com/algorand/go-deadlock" ) // A backlog for an execution pool. The typical usage of this is to // create non-blocking queue which would get executed once the execution pool is ready to accept new // tasks. type backlog struct { - mu deadlock.Mutex pool ExecutionPool wg sync.WaitGroup buffer chan backlogItemTask @@ -35,7 +32,6 @@ type backlog struct { ctxCancel context.CancelFunc owner interface{} priority Priority - quit bool } type backlogItemTask struct { @@ -46,7 +42,6 @@ type backlogItemTask struct { // BacklogPool supports all the ExecutionPool functions plus few more that tests the pending tasks. type BacklogPool interface { ExecutionPool - IsFull() bool EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error } @@ -80,27 +75,8 @@ func (b *backlog) GetParallelism() int { return b.pool.GetParallelism() } -// IsFull test to see if the input buffer is full. -func (b *backlog) IsFull() bool { - b.mu.Lock() - defer b.mu.Unlock() - return len(b.buffer) == cap(b.buffer) -} - // Enqueue enqueues a single task into the backlog func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, priority Priority, out chan interface{}) error { - b.mu.Lock() - defer b.mu.Unlock() - if b.quit { - select { - case <-enqueueCtx.Done(): - return enqueueCtx.Err() - case <-b.ctx.Done(): - return b.ctx.Err() - default: - return nil - } - } select { case b.buffer <- backlogItemTask{ enqueuedTask: enqueuedTask{ @@ -120,18 +96,6 @@ func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{ // Enqueue enqueues a single task into the backlog func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error { - b.mu.Lock() - defer b.mu.Unlock() - if b.quit { - select { - case <-enqueueCtx.Done(): - return enqueueCtx.Err() - case <-b.ctx.Done(): - return b.ctx.Err() - default: - return nil - } - } select { case b.buffer <- backlogItemTask{ enqueuedTask: enqueuedTask{ @@ -151,11 +115,8 @@ func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg int // Shutdown shuts down the backlog. func (b *backlog) Shutdown() { - b.mu.Lock() - defer b.mu.Unlock() - b.quit = true b.ctxCancel() - close(b.buffer) + // NOTE: Do not close(b.buffer) because there's no good way to ensure Enqueue*() won't write to it and panic. Just let it be garbage collected. b.wg.Wait() if b.pool.GetOwner() == b { b.pool.Shutdown()