Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions ledger/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,11 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, ad transacti
var err error

if eval.validate {
err = txn.Txn.Alive(eval.block)
if err != nil {
return err
}

// Transaction already in the ledger?
txid := txn.ID()
dup, err := cow.isDup(txn.Txn.First(), txn.Txn.Last(), txid, txlease{sender: txn.Txn.Sender, lease: txn.Txn.Lease})
Expand Down Expand Up @@ -717,6 +722,17 @@ func validateTransaction(txn transactions.SignedTxn, block bookkeeping.Block, pr
// AddBlock: eval(context.Background(), blk, false, nil, nil)
// tracker: eval(context.Background(), blk, false, nil, nil)
func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, txcache VerifiedTxnCache, executionPool execpool.BacklogPool) (StateDelta, error) {
eval, err := startEvaluator(l, blk.BlockHeader, validate, false)
if err != nil {
return StateDelta{}, err
}

// Next, transactions
paysetgroups, err := blk.DecodePaysetGroups()
if err != nil {
return StateDelta{}, err
}

var txvalidator evalTxValidator
ctx, cf := context.WithCancel(ctx)
defer cf()
Expand All @@ -732,20 +748,10 @@ func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool,

txvalidator.ctx = ctx
txvalidator.cf = cf
txvalidator.txgroups = make(chan []transactions.SignedTxnWithAD, 10)
txvalidator.txgroups = make(chan []transactions.SignedTxnWithAD, len(paysetgroups))
txvalidator.done = make(chan error, 1)
go txvalidator.run()
}
eval, err := startEvaluator(l, blk.BlockHeader, validate, false)
if err != nil {
return StateDelta{}, err
}

// Next, transactions
paysetgroups, err := blk.DecodePaysetGroups()
if err != nil {
return StateDelta{}, err
}

for _, txgroup := range paysetgroups {
select {
Expand Down
41 changes: 1 addition & 40 deletions util/execpool/backlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@ package execpool
import (
"context"
"sync"

"github.com/algorand/go-deadlock"
)

// A backlog for an execution pool. The typical usage of this is to
// create non-blocking queue which would get executed once the execution pool is ready to accept new
// tasks.
type backlog struct {
mu deadlock.Mutex
pool ExecutionPool
wg sync.WaitGroup
buffer chan backlogItemTask
ctx context.Context
ctxCancel context.CancelFunc
owner interface{}
priority Priority
quit bool
}

type backlogItemTask struct {
Expand All @@ -46,7 +42,6 @@ type backlogItemTask struct {
// BacklogPool supports all the ExecutionPool functions plus few more that tests the pending tasks.
type BacklogPool interface {
ExecutionPool
IsFull() bool
EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error
}

Expand Down Expand Up @@ -80,27 +75,8 @@ func (b *backlog) GetParallelism() int {
return b.pool.GetParallelism()
}

// IsFull test to see if the input buffer is full.
func (b *backlog) IsFull() bool {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.buffer) == cap(b.buffer)
}

// Enqueue enqueues a single task into the backlog
func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, priority Priority, out chan interface{}) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.quit {
select {
case <-enqueueCtx.Done():
return enqueueCtx.Err()
case <-b.ctx.Done():
return b.ctx.Err()
default:
return nil
}
}
select {
case b.buffer <- backlogItemTask{
enqueuedTask: enqueuedTask{
Expand All @@ -120,18 +96,6 @@ func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{

// Enqueue enqueues a single task into the backlog
func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.quit {
select {
case <-enqueueCtx.Done():
return enqueueCtx.Err()
case <-b.ctx.Done():
return b.ctx.Err()
default:
return nil
}
}
select {
case b.buffer <- backlogItemTask{
enqueuedTask: enqueuedTask{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github won't let me comment on lines not in this PR, but it looks like this doesn't totally undo the changes done to this file in #700 b/c of the case <-b.ctx.Done() case

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(in both Enqueue and EnqueueBacklog)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may actually be the minimum change needed to fix things. I think without this Enqueue() and EnqueueBacklog() will panic if Shutdown() has been called because they try to send on a closed b.buffer . The internal Context b.ctx is the only thing that can signal that Enqueue shuoldn't actually try to append to b.buffer

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a race between context cancellation and closing the buffer?

Expand All @@ -151,11 +115,8 @@ func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg int

// Shutdown shuts down the backlog.
func (b *backlog) Shutdown() {
b.mu.Lock()
defer b.mu.Unlock()
b.quit = true
b.ctxCancel()
close(b.buffer)
// NOTE: Do not close(b.buffer) because there's no good way to ensure Enqueue*() won't write to it and panic. Just let it be garbage collected.
b.wg.Wait()
if b.pool.GetOwner() == b {
b.pool.Shutdown()
Expand Down