diff --git a/data/ledger_test.go b/data/ledger_test.go index 76a3c902d1..4d79d7dd70 100644 --- a/data/ledger_test.go +++ b/data/ledger_test.go @@ -34,7 +34,6 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util/execpool" ) func incaddr(user *basics.Address) { @@ -70,9 +69,6 @@ func BenchmarkAssemblePayset(b *testing.B) { secrets := make([]*crypto.SignatureSecrets, numUsers) addresses := make([]basics.Address, numUsers) - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - genesis := make(map[basics.Address]basics.AccountData) for i := 0; i < numUsers; i++ { secret := keypair() @@ -164,7 +160,7 @@ func BenchmarkAssemblePayset(b *testing.B) { } b.StartTimer() newEmptyBlk := bookkeeping.MakeBlock(prev) - eval, err := l.StartEvaluator(newEmptyBlk.BlockHeader, tp, backlogPool) + eval, err := l.StartEvaluator(newEmptyBlk.BlockHeader) if err != nil { b.Errorf("could not make proposals at round %d: could not start evaluator: %v", next, err) return diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index fe108e448b..42daf3c917 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -459,16 +459,6 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledger.St } } -// alwaysVerifiedPool implements ledger.VerifiedTxnCache and returns every -// transaction as verified. -type alwaysVerifiedPool struct { - pool *TransactionPool -} - -func (*alwaysVerifiedPool) Verified(txn transactions.SignedTxn, params verify.Params) bool { - return true -} - func (pool *TransactionPool) addToPendingBlockEvaluatorOnce(txgroup []transactions.SignedTxn) error { r := pool.pendingBlockEvaluator.Round() + pool.numPendingWholeBlocks for _, tx := range txgroup { @@ -529,7 +519,7 @@ func (pool *TransactionPool) recomputeBlockEvaluator(committedTxIds map[transact next := bookkeeping.MakeBlock(prev) pool.numPendingWholeBlocks = 0 - pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader, &alwaysVerifiedPool{pool}, nil) + pool.pendingBlockEvaluator, err = pool.ledger.StartEvaluator(next.BlockHeader) if err != nil { logging.Base().Warnf("TransactionPool.recomputeBlockEvaluator: cannot start evaluator: %v", err) return diff --git a/data/pools/transactionPool_test.go b/data/pools/transactionPool_test.go index 757b14b0fe..9cbd86d10a 100644 --- a/data/pools/transactionPool_test.go +++ b/data/pools/transactionPool_test.go @@ -102,7 +102,7 @@ func newBlockEvaluator(t TestingT, l *ledger.Ledger) *ledger.BlockEvaluator { require.NoError(t, err) next := bookkeeping.MakeBlock(prev) - eval, err := l.StartEvaluator(next.BlockHeader, &alwaysVerifiedPool{}, nil) + eval, err := l.StartEvaluator(next.BlockHeader) require.NoError(t, err) return eval diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go index db6babcda0..ed8c3add92 100644 --- a/ledger/accountdb_test.go +++ b/ledger/accountdb_test.go @@ -160,6 +160,7 @@ func TestAccountDBInit(t *testing.T) { proto := config.Consensus[protocol.ConsensusCurrentVersion] dbs := dbOpenTest(t) + setDbLogging(t, dbs) defer dbs.close() tx, err := dbs.wdb.Handle.Begin() @@ -180,6 +181,7 @@ func TestAccountDBRound(t *testing.T) { proto := config.Consensus[protocol.ConsensusCurrentVersion] dbs := dbOpenTest(t) + setDbLogging(t, dbs) defer dbs.close() tx, err := dbs.wdb.Handle.Begin() diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index e1dbe741d0..ad2bc1741e 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -34,11 +34,15 @@ import ( type mockLedgerForTracker struct { dbs dbPair blocks []blockEntry + log logging.Logger } func makeMockLedgerForTracker(t *testing.T) *mockLedgerForTracker { dbs := dbOpenTest(t) - return &mockLedgerForTracker{dbs: dbs} + dblogger := logging.TestingLog(t) + dbs.rdb.SetLogger(dblogger) + dbs.wdb.SetLogger(dblogger) + return &mockLedgerForTracker{dbs: dbs, log: dblogger} } func (ml *mockLedgerForTracker) close() { @@ -77,7 +81,7 @@ func (ml *mockLedgerForTracker) trackerDB() dbPair { } func (ml *mockLedgerForTracker) trackerLog() logging.Logger { - return logging.Base() + return ml.log } func checkAcctUpdates(t *testing.T, au *accountUpdates, base basics.Round, latestRnd basics.Round, accts []map[basics.Address]basics.AccountData, rewards []uint64, proto config.ConsensusParams) { diff --git a/ledger/archival_test.go b/ledger/archival_test.go index 95a1fc278d..b7ad33b635 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -104,7 +104,8 @@ func TestArchival(t *testing.T) { genesisInitState := getInitState() const inMem = true const archival = true - l, err := OpenLedger(logging.Base(), dbName, inMem, genesisInitState, archival) + log := logging.TestingLog(t) + l, err := OpenLedger(log, dbName, inMem, genesisInitState, archival) require.NoError(t, err) defer l.Close() wl := &wrappedLedger{ @@ -495,7 +496,8 @@ func TestArchivalFromNonArchival(t *testing.T) { const inMem = false // use persistent storage archival := false - l, err := OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival) + log := logging.TestingLog(t) + l, err := OpenLedger(log, dbPrefix, inMem, genesisInitState, archival) require.NoError(t, err) blk := genesisInitState.Block @@ -524,7 +526,7 @@ func TestArchivalFromNonArchival(t *testing.T) { l.Close() archival = true - l, err = OpenLedger(logging.Base(), dbPrefix, inMem, genesisInitState, archival) + l, err = OpenLedger(log, dbPrefix, inMem, genesisInitState, archival) require.NoError(t, err) defer l.Close() diff --git a/ledger/blockdb_test.go b/ledger/blockdb_test.go index 67376b0c6c..3a3550cead 100644 --- a/ledger/blockdb_test.go +++ b/ledger/blockdb_test.go @@ -26,6 +26,7 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" ) @@ -100,8 +101,15 @@ func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) { require.Error(t, err) } +func setDbLogging(t *testing.T, dbs dbPair) { + dblogger := logging.TestingLog(t) + dbs.rdb.SetLogger(dblogger) + dbs.wdb.SetLogger(dblogger) +} + func TestBlockDBEmpty(t *testing.T) { dbs := dbOpenTest(t) + setDbLogging(t, dbs) defer dbs.close() tx, err := dbs.wdb.Handle.Begin() @@ -115,6 +123,7 @@ func TestBlockDBEmpty(t *testing.T) { func TestBlockDBInit(t *testing.T) { dbs := dbOpenTest(t) + setDbLogging(t, dbs) defer dbs.close() tx, err := dbs.wdb.Handle.Begin() @@ -134,6 +143,7 @@ func TestBlockDBInit(t *testing.T) { func TestBlockDBAppend(t *testing.T) { dbs := dbOpenTest(t) + setDbLogging(t, dbs) defer dbs.close() tx, err := dbs.wdb.Handle.Begin() diff --git a/ledger/eval.go b/ledger/eval.go index 28182fa45e..f277502e26 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -156,7 +156,6 @@ type BlockEvaluator struct { state *roundCowState validate bool generate bool - txcache VerifiedTxnCache prevHeader bookkeeping.BlockHeader // cached proto config.ConsensusParams @@ -165,8 +164,6 @@ type BlockEvaluator struct { block bookkeeping.Block blockTxBytes int - verificationPool execpool.BacklogPool - l ledgerForEvaluator } @@ -183,11 +180,11 @@ type ledgerForEvaluator interface { // StartEvaluator creates a BlockEvaluator, given a ledger and a block header // of the block that the caller is planning to evaluate. -func (l *Ledger) StartEvaluator(hdr bookkeeping.BlockHeader, txcache VerifiedTxnCache, executionPool execpool.BacklogPool) (*BlockEvaluator, error) { - return startEvaluator(l, hdr, true, true, txcache, executionPool) +func (l *Ledger) StartEvaluator(hdr bookkeeping.BlockHeader) (*BlockEvaluator, error) { + return startEvaluator(l, hdr, true, true) } -func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, validate bool, generate bool, txcache VerifiedTxnCache, executionPool execpool.BacklogPool) (*BlockEvaluator, error) { +func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, validate bool, generate bool) (*BlockEvaluator, error) { proto, ok := config.Consensus[hdr.CurrentProtocol] if !ok { return nil, protocol.Error(hdr.CurrentProtocol) @@ -204,14 +201,12 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, validate } eval := &BlockEvaluator{ - validate: validate, - generate: generate, - txcache: txcache, - block: bookkeeping.Block{BlockHeader: hdr}, - proto: proto, - genesisHash: l.GenesisHash(), - verificationPool: executionPool, - l: l, + validate: validate, + generate: generate, + block: bookkeeping.Block{BlockHeader: hdr}, + proto: proto, + genesisHash: l.GenesisHash(), + l: l, } if hdr.Round > 0 { @@ -394,11 +389,6 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx // on a single transaction, but does not actually add the transaction to the block // evaluator, or modify the block evaluator state in any other visible way. func (eval *BlockEvaluator) testTransaction(txn transactions.SignedTxn, cow *roundCowState) error { - // Verify that groups are supported. - if !txn.Txn.Group.IsZero() && !eval.proto.SupportTxGroups { - return fmt.Errorf("transaction groups not supported") - } - // Transaction valid (not expired)? err := txn.Txn.Alive(eval.block) if err != nil { @@ -468,17 +458,10 @@ func (eval *BlockEvaluator) transactionGroup(txgroup []transactions.SignedTxnWit cow := eval.state.child() - groupNoAD := make([]transactions.SignedTxn, len(txgroup)) - for i := range txgroup { - groupNoAD[i] = txgroup[i].SignedTxn - } - - ctxs := verify.PrepareContexts(groupNoAD, eval.block.BlockHeader) - for gi, txad := range txgroup { var txib transactions.SignedTxnInBlock - err := eval.transaction(txad.SignedTxn, txad.ApplyData, groupNoAD, gi, ctxs[gi], cow, &txib) + err := eval.transaction(txad.SignedTxn, txad.ApplyData, cow, &txib) if err != nil { return err } @@ -529,21 +512,10 @@ func (eval *BlockEvaluator) transactionGroup(txgroup []transactions.SignedTxnWit // transaction tentatively executes a new transaction as part of this block evaluation. // If the transaction cannot be added to the block without violating some constraints, // an error is returned and the block evaluator state is unchanged. -func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, ad transactions.ApplyData, txgroup []transactions.SignedTxn, groupIndex int, ctx verify.Context, cow *roundCowState, txib *transactions.SignedTxnInBlock) error { +func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, ad transactions.ApplyData, cow *roundCowState, txib *transactions.SignedTxnInBlock) error { var err error - spec := transactions.SpecialAddresses{ - FeeSink: eval.block.BlockHeader.FeeSink, - RewardsPool: eval.block.BlockHeader.RewardsPool, - } - if eval.validate { - // Transaction valid (not expired)? - err = txn.Txn.Alive(eval.block) - if err != nil { - return err - } - // Transaction already in the ledger? txid := txn.ID() dup, err := cow.isDup(txn.Txn.First(), txn.Txn.Last(), txid, txlease{sender: txn.Txn.Sender, lease: txn.Txn.Lease}) @@ -553,24 +525,11 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, ad transacti if dup { return TransactionInLedgerError{txn.ID()} } + } - // Well-formed on its own? - err = txn.Txn.WellFormed(spec, eval.proto) - if err != nil { - return fmt.Errorf("transaction %v: malformed: %v", txn.ID(), err) - } - - if eval.txcache == nil || !eval.txcache.Verified(txn, ctx.Params) { - err = verify.TxnPool(&txn, ctx, eval.verificationPool) - if err != nil { - return fmt.Errorf("transaction %v: failed to verify: %v", txn.ID(), err) - } - } - - // Verify that groups are supported. - if !txn.Txn.Group.IsZero() && !eval.proto.SupportTxGroups { - return fmt.Errorf("transaction groups not supported") - } + spec := transactions.SpecialAddresses{ + FeeSink: eval.block.BlockHeader.FeeSink, + RewardsPool: eval.block.BlockHeader.RewardsPool, } // Apply the transaction, updating the cow balances @@ -697,14 +656,93 @@ func (eval *BlockEvaluator) GenerateBlock() (*ValidatedBlock, error) { return &vb, nil } +type evalTxValidator struct { + txcache VerifiedTxnCache + block bookkeeping.Block + proto config.ConsensusParams + verificationPool execpool.BacklogPool + + ctx context.Context + cf context.CancelFunc + txgroups chan []transactions.SignedTxnWithAD + done chan error +} + +func (validator *evalTxValidator) run() { + for txgroup := range validator.txgroups { + select { + case <-validator.ctx.Done(): + validator.done <- validator.ctx.Err() + validator.cf() + close(validator.done) + return + default: + } + groupNoAD := make([]transactions.SignedTxn, len(txgroup)) + for i := range txgroup { + groupNoAD[i] = txgroup[i].SignedTxn + } + ctxs := verify.PrepareContexts(groupNoAD, validator.block.BlockHeader) + + for gi, tx := range txgroup { + err := validateTransaction(tx.SignedTxn, validator.block, validator.proto, validator.txcache, ctxs[gi], validator.verificationPool) + if err != nil { + validator.done <- err + validator.cf() + close(validator.done) + return + } + } + } + close(validator.done) +} + +func validateTransaction(txn transactions.SignedTxn, block bookkeeping.Block, proto config.ConsensusParams, txcache VerifiedTxnCache, ctx verify.Context, verificationPool execpool.BacklogPool) error { + // Transaction valid (not expired)? + err := txn.Txn.Alive(block) + if err != nil { + return err + } + + if txcache == nil || !txcache.Verified(txn, ctx.Params) { + err = verify.TxnPool(&txn, ctx, verificationPool) + if err != nil { + return fmt.Errorf("transaction %v: failed to verify: %v", txn.ID(), err) + } + } + return nil +} + +// used by Ledger.Validate() Ledger.AddBlock() Ledger.trackerEvalVerified()(accountUpdates.loadFromDisk()) +// +// Validate: eval(ctx, blk, true, txcache, executionPool) +// AddBlock: eval(context.Background(), blk, false, nil, nil) +// tracker: eval(context.Background(), blk, false, nil, nil) func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, txcache VerifiedTxnCache, executionPool execpool.BacklogPool) (StateDelta, error) { - eval, err := startEvaluator(l, blk.BlockHeader, validate, false, txcache, executionPool) + var txvalidator evalTxValidator + ctx, cf := context.WithCancel(ctx) + defer cf() + if validate { + proto, ok := config.Consensus[blk.CurrentProtocol] + if !ok { + return StateDelta{}, protocol.Error(blk.CurrentProtocol) + } + txvalidator.txcache = txcache + txvalidator.block = blk + txvalidator.proto = proto + txvalidator.verificationPool = executionPool + + txvalidator.ctx = ctx + txvalidator.cf = cf + txvalidator.txgroups = make(chan []transactions.SignedTxnWithAD, 10) + txvalidator.done = make(chan error, 1) + go txvalidator.run() + } + eval, err := startEvaluator(l, blk.BlockHeader, validate, false) if err != nil { return StateDelta{}, err } - // TODO: batch tx sig verification: ingest blk.Payset and output a list of ValidatedTx - // Next, transactions paysetgroups, err := blk.DecodePaysetGroups() if err != nil { @@ -714,10 +752,18 @@ func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, for _, txgroup := range paysetgroups { select { case <-ctx.Done(): + select { + case err := <-txvalidator.done: + return StateDelta{}, err + default: + } return StateDelta{}, ctx.Err() default: } + if validate { + txvalidator.txgroups <- txgroup + } err = eval.TransactionGroup(txgroup) if err != nil { return StateDelta{}, err @@ -732,6 +778,11 @@ func (l *Ledger) eval(ctx context.Context, blk bookkeeping.Block, validate bool, // If validating, do final block checks that depend on our new state if validate { + close(txvalidator.txgroups) + err, gotErr := <-txvalidator.done + if gotErr && err != nil { + return StateDelta{}, err + } err = eval.finalValidation() if err != nil { return StateDelta{}, err diff --git a/ledger/eval_test.go b/ledger/eval_test.go index 310f77e361..c5a934b419 100644 --- a/ledger/eval_test.go +++ b/ledger/eval_test.go @@ -30,7 +30,6 @@ import ( "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util/execpool" ) var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} @@ -45,9 +44,6 @@ func init() { func TestBlockEvaluator(t *testing.T) { genesisInitState, addrs, keys := genesis(10) - backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) - defer backlogPool.Shutdown() - dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) const inMem = true const archival = true @@ -56,7 +52,7 @@ func TestBlockEvaluator(t *testing.T) { defer l.Close() newBlock := bookkeeping.MakeBlock(genesisInitState.Block.BlockHeader) - eval, err := l.StartEvaluator(newBlock.BlockHeader, nil, backlogPool) + eval, err := l.StartEvaluator(newBlock.BlockHeader) require.NoError(t, err) genHash := genesisInitState.Block.BlockHeader.GenesisHash @@ -75,20 +71,8 @@ func TestBlockEvaluator(t *testing.T) { }, } - // Zero signature should fail - st := transactions.SignedTxn{ - Txn: txn, - } - err = eval.Transaction(st, transactions.ApplyData{}) - require.Error(t, err) - - // Random signature should fail - crypto.RandBytes(st.Sig[:]) - err = eval.Transaction(st, transactions.ApplyData{}) - require.Error(t, err) - // Correct signature should work - st = txn.Sign(keys[0]) + st := txn.Sign(keys[0]) err = eval.Transaction(st, transactions.ApplyData{}) require.NoError(t, err) diff --git a/ledger/ledger.go b/ledger/ledger.go index 2d63b5140d..cb8a90c8e3 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -102,6 +102,10 @@ func OpenLedger( err = fmt.Errorf("OpenLedger.openLedgerDB %v", err) return nil, err } + l.trackerDBs.rdb.SetLogger(log) + l.trackerDBs.wdb.SetLogger(log) + l.blockDBs.rdb.SetLogger(log) + l.blockDBs.wdb.SetLogger(log) err = l.blockDBs.wdb.Atomic(func(tx *sql.Tx) error { return initBlocksDB(tx, l, []bookkeeping.Block{genesisInitState.Block}, isArchival) diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 4925b397b4..e06bb74008 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -202,7 +202,8 @@ func TestLedgerBasic(t *testing.T) { genesisInitState, _ := testGenerateInitState(t, protocol.ConsensusCurrentVersion) const inMem = true const archival = true - l, err := OpenLedger(logging.Base(), t.Name(), inMem, genesisInitState, archival) + log := logging.TestingLog(t) + l, err := OpenLedger(log, t.Name(), inMem, genesisInitState, archival) require.NoError(t, err, "could not open ledger") defer l.Close() } @@ -353,7 +354,8 @@ func TestLedgerSingleTx(t *testing.T) { genesisInitState, initSecrets := testGenerateInitState(t, protocol.ConsensusV7) const inMem = true const archival = true - l, err := OpenLedger(logging.Base(), t.Name(), inMem, genesisInitState, archival) + log := logging.TestingLog(t) + l, err := OpenLedger(log, t.Name(), inMem, genesisInitState, archival) a.NoError(err, "could not open ledger") defer l.Close() @@ -488,6 +490,11 @@ func TestLedgerSingleTx(t *testing.T) { sbadTx.Sig = crypto.Signature{} a.Error(l.appendUnvalidatedSignedTx(t, initAccounts, sbadTx, ad), "added tx with no signature") + badTx = correctPay + sbadTx = sign(initSecrets, badTx) + sbadTx.Sig[5]++ + a.Error(l.appendUnvalidatedSignedTx(t, initAccounts, sbadTx, ad), "added tx with corrupt signature") + // TODO set multisig and test badTx = correctPay @@ -538,7 +545,8 @@ func testLedgerSingleTxApplyData(t *testing.T, version protocol.ConsensusVersion genesisInitState, initSecrets := testGenerateInitState(t, version) const inMem = true const archival = true - l, err := OpenLedger(logging.Base(), t.Name(), inMem, genesisInitState, archival) + log := logging.TestingLog(t) + l, err := OpenLedger(log, t.Name(), inMem, genesisInitState, archival) a.NoError(err, "could not open ledger") defer l.Close() diff --git a/node/impls.go b/node/impls.go index 136c5d1068..0408ba9a59 100644 --- a/node/impls.go +++ b/node/impls.go @@ -97,7 +97,7 @@ func (i *blockFactoryImpl) AssembleBlock(round basics.Round, deadline time.Time) newEmptyBlk := bookkeeping.MakeBlock(prev) - eval, err := i.l.StartEvaluator(newEmptyBlk.BlockHeader, i.tp, i.verificationPool) + eval, err := i.l.StartEvaluator(newEmptyBlk.BlockHeader) if err != nil { return nil, fmt.Errorf("could not make proposals at round %d: could not start evaluator: %v", round, err) } diff --git a/util/db/dbutil.go b/util/db/dbutil.go index f688d2e216..b01f51d24f 100644 --- a/util/db/dbutil.go +++ b/util/db/dbutil.go @@ -53,6 +53,7 @@ var initStatements []string type Accessor struct { Handle *sql.DB readOnly bool + log logging.Logger } // MakeAccessor creates a new Accessor. @@ -100,22 +101,35 @@ func (db Accessor) runInitStatements() error { return nil } +// SetLogger sets the Logger, mainly for unit test quietness +func (db *Accessor) SetLogger(log logging.Logger) { + db.log = log +} + +func (db *Accessor) logger() logging.Logger { + if db.log != nil { + return db.log + } + return logging.Base() +} + // Close closes the connection. func (db Accessor) Close() { db.Handle.Close() db.Handle = nil } -// Retry executes a function repeatedly as long as it returns an error +// LoggedRetry executes a function repeatedly as long as it returns an error // that indicates database contention that warrants a retry. -func Retry(fn func() error) (err error) { +// Sends warnings and errors to log. +func LoggedRetry(fn func() error, log logging.Logger) (err error) { for i := 0; ; i++ { if i > 0 && i%warnTxRetries == 0 { if i >= 1000 { - logging.Base().Errorf("db.Retry: %d retries (last err: %v)", i, err) + log.Errorf("db.Retry: %d retries (last err: %v)", i, err) return } - logging.Base().Warnf("db.Retry: %d retries (last err: %v)", i, err) + log.Warnf("db.Retry: %d retries (last err: %v)", i, err) } err = fn() @@ -127,9 +141,16 @@ func Retry(fn func() error) (err error) { } } +// Retry executes a function repeatedly as long as it returns an error +// that indicates database contention that warrants a retry. +// Sends warnings and errors to logging.Base() +func Retry(fn func() error) (err error) { + return LoggedRetry(fn, logging.Base()) +} + // getDecoratedLogger retruns a decorated logger that includes the readonly true/false, caller and extra fields. func (db *Accessor) getDecoratedLogger(fn idemFn, extras ...interface{}) logging.Logger { - log := logging.Base().With("readonly", db.readOnly) + log := db.logger().With("readonly", db.readOnly) _, file, line, ok := runtime.Caller(2) if ok { log = log.With("caller", fmt.Sprintf("%s:%d", file, line)) diff --git a/util/execpool/backlog.go b/util/execpool/backlog.go index bdce408a7a..e346de7343 100644 --- a/util/execpool/backlog.go +++ b/util/execpool/backlog.go @@ -19,12 +19,15 @@ package execpool import ( "context" "sync" + + "github.com/algorand/go-deadlock" ) // A backlog for an execution pool. The typical usage of this is to // create non-blocking queue which would get executed once the execution pool is ready to accept new // tasks. type backlog struct { + mu deadlock.Mutex pool ExecutionPool wg sync.WaitGroup buffer chan backlogItemTask @@ -32,6 +35,7 @@ type backlog struct { ctxCancel context.CancelFunc owner interface{} priority Priority + quit bool } type backlogItemTask struct { @@ -78,11 +82,25 @@ func (b *backlog) GetParallelism() int { // IsFull test to see if the input buffer is full. func (b *backlog) IsFull() bool { + b.mu.Lock() + defer b.mu.Unlock() return len(b.buffer) == cap(b.buffer) } // Enqueue enqueues a single task into the backlog func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{}, priority Priority, out chan interface{}) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.quit { + select { + case <-enqueueCtx.Done(): + return enqueueCtx.Err() + case <-b.ctx.Done(): + return b.ctx.Err() + default: + return nil + } + } select { case b.buffer <- backlogItemTask{ enqueuedTask: enqueuedTask{ @@ -95,11 +113,25 @@ func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{ return nil case <-enqueueCtx.Done(): return enqueueCtx.Err() + case <-b.ctx.Done(): + return b.ctx.Err() } } // Enqueue enqueues a single task into the backlog func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.quit { + select { + case <-enqueueCtx.Done(): + return enqueueCtx.Err() + case <-b.ctx.Done(): + return b.ctx.Err() + default: + return nil + } + } select { case b.buffer <- backlogItemTask{ enqueuedTask: enqueuedTask{ @@ -112,11 +144,16 @@ func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg int return nil case <-enqueueCtx.Done(): return enqueueCtx.Err() + case <-b.ctx.Done(): + return b.ctx.Err() } } // Shutdown shuts down the backlog. func (b *backlog) Shutdown() { + b.mu.Lock() + defer b.mu.Unlock() + b.quit = true b.ctxCancel() close(b.buffer) b.wg.Wait()