diff --git a/ledger/acctonline.go b/ledger/acctonline.go index 0db04e92ad..380ff45852 100644 --- a/ledger/acctonline.go +++ b/ledger/acctonline.go @@ -355,13 +355,6 @@ func (ao *onlineAccounts) consecutiveVersion(offset uint64) uint64 { return offset } -func (ao *onlineAccounts) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (ao *onlineAccounts) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (ao *onlineAccounts) handleCommitError(dcc *deferredCommitContext) { -} - func (ao *onlineAccounts) maxBalLookback() uint64 { lastProtoVersion := ao.onlineRoundParamsData[len(ao.onlineRoundParamsData)-1].CurrentProtocol return config.Consensus[lastProtoVersion].MaxBalLookback @@ -535,9 +528,6 @@ func (ao *onlineAccounts) postCommit(ctx context.Context, dcc *deferredCommitCon ao.voters.postCommit(dcc) } -func (ao *onlineAccounts) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - // onlineCirculation return the total online balance for the given round, for use by agreement. func (ao *onlineAccounts) onlineCirculation(rnd basics.Round, voteRnd basics.Round) (basics.MicroAlgos, error) { // Get cached total stake for rnd diff --git a/ledger/acctonline_test.go b/ledger/acctonline_test.go index afc7244082..296a5a2481 100644 --- a/ledger/acctonline_test.go +++ b/ledger/acctonline_test.go @@ -110,7 +110,9 @@ func commitSyncPartialComplete(t *testing.T, oa *onlineAccounts, ml *mockLedgerF ml.trackers.lastFlushTime = dcc.flushTime for _, lt := range ml.trackers.trackers { - lt.postCommitUnlocked(ml.trackers.ctx, dcc) + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.postCommitUnlocked(ml.trackers.ctx, dcc) + } } } diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index 3f12955666..6acbb12ae5 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -1483,13 +1483,6 @@ func (au *accountUpdates) roundOffset(rnd basics.Round) (offset uint64, err erro return off, nil } -func (au *accountUpdates) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (au *accountUpdates) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (au *accountUpdates) handleCommitError(dcc *deferredCommitContext) { -} - // prepareCommit prepares data to write to the database a "chunk" of rounds, and update the cached dbRound accordingly. func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error { if au.logAccountUpdatesMetrics { @@ -1745,9 +1738,6 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon } } -func (au *accountUpdates) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - // compactKvDeltas takes an array of StateDeltas containing kv deltas (one array entry per round), and // compacts the array into a single map that contains all the // changes. Intermediate changes are eliminated. It counts the number of diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index a27a2be795..aa43b07b90 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -2044,7 +2044,6 @@ func TestAcctUpdatesResources(t *testing.T) { require.NoError(t, err) ml.trackers.dbRound = newBase au.postCommit(ml.trackers.ctx, dcc) - au.postCommitUnlocked(ml.trackers.ctx, dcc) }() } @@ -2330,7 +2329,6 @@ func auCommitSync(t *testing.T, rnd basics.Round, au *accountUpdates, ml *mockLe require.NoError(t, err) ml.trackers.dbRound = newBase au.postCommit(ml.trackers.ctx, dcc) - au.postCommitUnlocked(ml.trackers.ctx, dcc) }() } } diff --git a/ledger/bulletin.go b/ledger/bulletin.go index 0b3f08a6b4..8af69f472d 100644 --- a/ledger/bulletin.go +++ b/ledger/bulletin.go @@ -142,16 +142,6 @@ func (b *bulletin) commitRound(context.Context, trackerdb.TransactionScope, *def func (b *bulletin) postCommit(ctx context.Context, dcc *deferredCommitContext) { } -func (b *bulletin) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -func (b *bulletin) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (b *bulletin) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (b *bulletin) handleCommitError(dcc *deferredCommitContext) { -} - func (b *bulletin) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 036f5490b3..df7772de53 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -982,6 +982,14 @@ func (ct *catchpointTracker) handlePrepareCommitError(dcc *deferredCommitContext ct.cancelWrite(dcc) } +// if an error is encountered between retries, clear the balancesTrie to clear in-memory changes made in commitRound(). +func (ct *catchpointTracker) clearCommitRoundRetry(ctx context.Context, dcc *deferredCommitContext) { + ct.log.Infof("rolling back failed commitRound for oldBase %d offset %d, clearing balancesTrie", dcc.oldBase, dcc.offset) + ct.catchpointsMu.Lock() + ct.balancesTrie = nil // balancesTrie will be re-created in the next call to commitRound + ct.catchpointsMu.Unlock() +} + // if an error is encountered during commit, cancel writing and clear the balances trie func (ct *catchpointTracker) handleCommitError(dcc *deferredCommitContext) { // in cases where the commitRound fails, it is not certain that the merkle trie is in a clean state, and should be cleared. diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 6790344889..da2408946b 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -17,6 +17,7 @@ package ledger import ( + "bytes" "context" "encoding/hex" "errors" @@ -39,6 +40,7 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/data/txntest" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/ledger/store/trackerdb" ledgertesting "github.com/algorand/go-algorand/ledger/testing" @@ -48,6 +50,9 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" ) +// assert catchpointTracker implements the trackerCommitLifetimeHandlers interface +var _ trackerCommitLifetimeHandlers = &catchpointTracker{} + func TestCatchpointIsWritingCatchpointFile(t *testing.T) { partitiontest.PartitionTest(t) @@ -2094,3 +2099,61 @@ func TestMakeCatchpointFilePath(t *testing.T) { } } + +// Test a case where in-memory SQLite, combined with fast locking (improved performance, or no +// deadlock detection) and concurrent reads (from transaction evaluation, stake lookups, etc) can +// cause the SQLite implementation in util/db/dbutil.go to retry the function looping over all +// tracker commitRound implementations. Since catchpointtracker' commitRound updates a merkle trie's +// DB storage and its in-memory cache, the retry can cause the the balancesTrie's cache to become +// corrupted and out of sync with the DB (which uses transaction rollback between retries). The +// merkle trie corruption manifests as error log messages like: +// - "attempted to add duplicate hash 'X' to merkle trie for account Y" +// - "failed to delete hash 'X' from merkle trie for account Y" +// +// So we assert that those errors do not occur after the fix in #6190. +// +//nolint:paralleltest // deadlock detection is globally disabled, so this test is not parallel-safe +func TestCatchpointTrackerFastRoundsDBRetry(t *testing.T) { + partitiontest.PartitionTest(t) + + var bufNewLogger bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&bufNewLogger) + + // disabling deadlock detection globally causes the race detector to go off, but this + // bug can still happen even when deadlock detection is not disabled + //deadlock.Opts.Disable = true // disable deadlock detection during this test + //defer func() { deadlock.Opts.Disable = false }() + + genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(cfg *ledgertesting.GenesisCfg) { + cfg.OnlineCount = 1 + ledgertesting.TurnOffRewards(cfg) + }) + cfg := config.GetDefaultLocal() + dl := NewDoubleLedger(t, genBalances, protocol.ConsensusFuture, cfg, simpleLedgerLogger(log)) // in-memory SQLite + defer dl.Close() + + appSrc := main(`int 1; int 1; ==; assert`) + app := dl.fundedApp(addrs[1], 1_000_000, appSrc) + + makeTxn := func() *txntest.Txn { + return &txntest.Txn{ + Type: "appl", + Sender: addrs[2], + ApplicationID: app, + Note: ledgertesting.RandomNote(), + } + } + + for vb := dl.fullBlock(makeTxn()); vb.Block().Round() <= 1500; vb = dl.fullBlock(makeTxn()) { + nextRnd := vb.Block().Round() + 1 + _, err := dl.generator.OnlineCirculation(nextRnd.SubSaturate(320), nextRnd) + require.NoError(t, err) + require.Empty(t, vb.Block().ExpiredParticipationAccounts) + require.Empty(t, vb.Block().AbsentParticipationAccounts) + } + + // assert that no corruption of merkle trie happened due to DB retries leaving + // incorrect state in the merkle trie cache. + require.NotContains(t, bufNewLogger.String(), "to merkle trie for account", "Merkle trie was corrupted!") +} diff --git a/ledger/metrics.go b/ledger/metrics.go index 7a56d58d56..9f8a0ace0d 100644 --- a/ledger/metrics.go +++ b/ledger/metrics.go @@ -84,16 +84,6 @@ func (mt *metricsTracker) postCommit(ctx context.Context, dcc *deferredCommitCon mt.ledgerDBRound.Set(uint64(dcc.newBase())) } -func (mt *metricsTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -func (mt *metricsTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (mt *metricsTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (mt *metricsTracker) handleCommitError(dcc *deferredCommitContext) { -} - func (mt *metricsTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } diff --git a/ledger/notifier.go b/ledger/notifier.go index f97e1c77e6..c7a8996551 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -122,16 +122,6 @@ func (bn *blockNotifier) commitRound(context.Context, trackerdb.TransactionScope func (bn *blockNotifier) postCommit(ctx context.Context, dcc *deferredCommitContext) { } -func (bn *blockNotifier) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -func (bn *blockNotifier) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (bn *blockNotifier) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (bn *blockNotifier) handleCommitError(dcc *deferredCommitContext) { -} - func (bn *blockNotifier) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } diff --git a/ledger/simple_test.go b/ledger/simple_test.go index 9d4a480b49..8b4632d1de 100644 --- a/ledger/simple_test.go +++ b/ledger/simple_test.go @@ -42,6 +42,7 @@ import ( type simpleLedgerCfg struct { onDisk bool // default is in-memory notArchival bool // default is archival + logger logging.Logger } type simpleLedgerOption func(*simpleLedgerCfg) @@ -54,6 +55,10 @@ func simpleLedgerNotArchival() simpleLedgerOption { return func(cfg *simpleLedgerCfg) { cfg.notArchival = true } } +func simpleLedgerLogger(l logging.Logger) simpleLedgerOption { + return func(cfg *simpleLedgerCfg) { cfg.logger = l } +} + func newSimpleLedgerWithConsensusVersion(t testing.TB, balances bookkeeping.GenesisBalances, cv protocol.ConsensusVersion, cfg config.Local, opts ...simpleLedgerOption) *Ledger { var genHash crypto.Digest crypto.RandBytes(genHash[:]) @@ -72,7 +77,11 @@ func newSimpleLedgerFull(t testing.TB, balances bookkeeping.GenesisBalances, cv dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) dbName = strings.Replace(dbName, "/", "_", -1) cfg.Archival = !slCfg.notArchival - l, err := OpenLedger(logging.Base(), dbName, !slCfg.onDisk, ledgercore.InitState{ + log := slCfg.logger + if log == nil { + log = logging.Base() + } + l, err := OpenLedger(log, dbName, !slCfg.onDisk, ledgercore.InitState{ Block: genBlock, Accounts: balances.Balances, GenesisHash: genHash, diff --git a/ledger/spverificationtracker.go b/ledger/spverificationtracker.go index b430368981..ba8b537d16 100644 --- a/ledger/spverificationtracker.go +++ b/ledger/spverificationtracker.go @@ -157,16 +157,6 @@ func (spt *spVerificationTracker) postCommit(_ context.Context, dcc *deferredCom spt.pendingDeleteContexts = spt.pendingDeleteContexts[dcc.spVerification.lastDeleteIndex+1:] } -func (spt *spVerificationTracker) postCommitUnlocked(context.Context, *deferredCommitContext) { -} - -func (spt *spVerificationTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (spt *spVerificationTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (spt *spVerificationTracker) handleCommitError(dcc *deferredCommitContext) { -} - func (spt *spVerificationTracker) close() { } diff --git a/ledger/spverificationtracker_test.go b/ledger/spverificationtracker_test.go index 7bba5eac87..d306d9b60f 100644 --- a/ledger/spverificationtracker_test.go +++ b/ledger/spverificationtracker_test.go @@ -88,7 +88,6 @@ func mockCommit(t *testing.T, spt *spVerificationTracker, ml *mockLedgerForTrack postCommitCtx, cancel := context.WithCancel(context.Background()) defer cancel() spt.postCommit(postCommitCtx, &dcc) - spt.postCommitUnlocked(postCommitCtx, &dcc) } func genesisBlock() *blockEntry { diff --git a/ledger/store/trackerdb/dualdriver/dualdriver.go b/ledger/store/trackerdb/dualdriver/dualdriver.go index cbcba9c480..e51b05929f 100644 --- a/ledger/store/trackerdb/dualdriver/dualdriver.go +++ b/ledger/store/trackerdb/dualdriver/dualdriver.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "reflect" + "sync" "time" "github.com/algorand/go-algorand/ledger/store/trackerdb" @@ -123,6 +124,10 @@ func (s *trackerStore) Transaction(fn trackerdb.TransactionFn) (err error) { return s.TransactionContext(context.Background(), fn) } +func (s *trackerStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) error { + return s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn) +} + func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) error { handle, err := s.BeginTransaction(ctx) if err != nil { @@ -138,6 +143,22 @@ func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.Tran return handle.Commit() } +func (s *trackerStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) error { + var wg sync.WaitGroup + wg.Add(2) + var pErr, sErr error + go func() { + pErr = s.primary.TransactionContextWithRetryClearFn(ctx, fn, rollbackFn) + wg.Done() + }() + go func() { + sErr = s.secondary.TransactionContextWithRetryClearFn(ctx, fn, rollbackFn) + wg.Done() + }() + wg.Wait() + return coalesceErrors(pErr, sErr) +} + func (s *trackerStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) { primary, err := s.primary.BeginTransaction(ctx) if err != nil { diff --git a/ledger/store/trackerdb/pebbledbdriver/pebbledriver.go b/ledger/store/trackerdb/pebbledbdriver/pebbledriver.go index 7d1b394015..2bb64456b5 100644 --- a/ledger/store/trackerdb/pebbledbdriver/pebbledriver.go +++ b/ledger/store/trackerdb/pebbledbdriver/pebbledriver.go @@ -322,6 +322,11 @@ func (s *trackerStore) Transaction(fn trackerdb.TransactionFn) (err error) { return s.TransactionContext(context.Background(), fn) } +// TransactionWithRetryClearFn implements trackerdb.Store +func (s *trackerStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) { + return s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn) +} + // TransactionContext implements trackerdb.Store func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) { handle, err := s.BeginTransaction(ctx) @@ -345,6 +350,13 @@ func (s *trackerStore) TransactionContext(ctx context.Context, fn trackerdb.Tran return err } +// TransactionContextWithRetryClearFn implements trackerdb.Store. +// It ignores the RetryClearFn, since it does not need to retry +// transactions to work around SQLite issues like the sqlitedriver. +func (s *trackerStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, _ trackerdb.RetryClearFn) error { + return s.TransactionContext(ctx, fn) +} + // BeginTransaction implements trackerdb.Store func (s *trackerStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) { scope := transactionScope{ diff --git a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go index 07459fa0c0..54a080fe94 100644 --- a/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go +++ b/ledger/store/trackerdb/sqlitedriver/sqlitedriver.go @@ -103,12 +103,22 @@ func (s *trackerSQLStore) Transaction(fn trackerdb.TransactionFn) (err error) { return wrapIOError(s.TransactionContext(context.Background(), fn)) } +func (s *trackerSQLStore) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) { + return wrapIOError(s.TransactionContextWithRetryClearFn(context.Background(), fn, rollbackFn)) +} + func (s *trackerSQLStore) TransactionContext(ctx context.Context, fn trackerdb.TransactionFn) (err error) { return wrapIOError(s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { return fn(ctx, &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}}) })) } +func (s *trackerSQLStore) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, rollbackFn trackerdb.RetryClearFn) (err error) { + return wrapIOError(s.pair.Wdb.AtomicContextWithRetryClearFn(ctx, func(ctx context.Context, tx *sql.Tx) error { + return fn(ctx, &sqlTransactionScope{tx, false, &sqlReader{tx}, &sqlWriter{tx}, &sqlCatchpoint{tx}}) + }, rollbackFn)) +} + func (s *trackerSQLStore) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) { handle, err := s.pair.Wdb.Handle.BeginTx(ctx, nil) if err != nil { diff --git a/ledger/store/trackerdb/store.go b/ledger/store/trackerdb/store.go index 735550ed96..257ff63842 100644 --- a/ledger/store/trackerdb/store.go +++ b/ledger/store/trackerdb/store.go @@ -40,7 +40,9 @@ type Store interface { BeginSnapshot(ctx context.Context) (Snapshot, error) // transaction support Transaction(fn TransactionFn) (err error) + TransactionWithRetryClearFn(TransactionFn, RetryClearFn) (err error) TransactionContext(ctx context.Context, fn TransactionFn) (err error) + TransactionContextWithRetryClearFn(context.Context, TransactionFn, RetryClearFn) (err error) BeginTransaction(ctx context.Context) (Transaction, error) // maintenance Vacuum(ctx context.Context) (stats db.VacuumStats, err error) @@ -153,3 +155,6 @@ type SnapshotFn func(ctx context.Context, tx SnapshotScope) error // TransactionFn is the callback lambda used in `Transaction`. type TransactionFn func(ctx context.Context, tx TransactionScope) error + +// RetryClearFn is the rollback callback lambda used in `TransactionWithRetryClearFn`. +type RetryClearFn func(ctx context.Context) diff --git a/ledger/store/trackerdb/testsuite/utils_test.go b/ledger/store/trackerdb/testsuite/utils_test.go index 826402a357..be11fd9ac8 100644 --- a/ledger/store/trackerdb/testsuite/utils_test.go +++ b/ledger/store/trackerdb/testsuite/utils_test.go @@ -228,6 +228,16 @@ func (db *mockDB) TransactionContext(ctx context.Context, fn trackerdb.Transacti return err } +// TransactionWithRetryClearFn implements trackerdb.Store but ignores the RetryClearFn +func (db *mockDB) TransactionWithRetryClearFn(fn trackerdb.TransactionFn, _ trackerdb.RetryClearFn) (err error) { + return db.TransactionContext(context.Background(), fn) +} + +// TransactionContextWithRetryClearFn implements trackerdb.Store but ignores the RetryClearFn +func (db *mockDB) TransactionContextWithRetryClearFn(ctx context.Context, fn trackerdb.TransactionFn, _ trackerdb.RetryClearFn) (err error) { + return db.TransactionContext(ctx, fn) +} + // BeginTransaction implements trackerdb.Store func (db *mockDB) BeginTransaction(ctx context.Context) (trackerdb.Transaction, error) { scope := mockTransaction{db, db.proto} diff --git a/ledger/tracker.go b/ledger/tracker.go index 716b8f8cfb..1f7950a1c2 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -110,11 +110,23 @@ type ledgerTracker interface { // by all the prepareCommit calls. The commitRound is being executed within a single transactional // context, and so, if any of the tracker's commitRound calls fails, the transaction is rolled back. commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error + // postCommit is called only on a successful commitRound. In that case, each of the trackers have // the chance to update it's internal data structures, knowing that the given deferredCommitContext // has completed. An optional context is provided for long-running operations. postCommit(context.Context, *deferredCommitContext) + // close terminates the tracker, reclaiming any resources + // like open database connections or goroutines. close may + // be called even if loadFromDisk() is not called or does + // not succeed. + close() +} + +// trackerCommitLifetimeHandlers defines additional methods some ledgerTrackers +// might implement to manage and clear state on error or success. In practice, +// it is only used by the catchpointtracker. +type trackerCommitLifetimeHandlers interface { // postCommitUnlocked is called only on a successful commitRound. In that case, each of the trackers have // the chance to make changes that aren't state-dependent. // An optional context is provided for long-running operations. @@ -131,11 +143,12 @@ type ledgerTracker interface { // error during the commit phase of commitRound handleCommitError(*deferredCommitContext) - // close terminates the tracker, reclaiming any resources - // like open database connections or goroutines. close may - // be called even if loadFromDisk() is not called or does - // not succeed. - close() + // clearCommitRoundRetry is called after a failure is encountered in the transaction that commitRound + // uses. It allows trackers to clear any in-memory state associated with the commitRound work they + // did, since even if the tracker returns no error in commitRound, another tracker might be responsible + // for the rollback. The call to commitRound for the same round range may be retried after + // clearCommitRoundRetry is called. + clearCommitRoundRetry(context.Context, *deferredCommitContext) } // ledgerForTracker defines the part of the ledger that a tracker can @@ -561,7 +574,9 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { if tr.dbRound < dbRound || offset < uint64(tr.dbRound-dbRound) { tr.log.Warnf("out of order deferred commit: offset %d, dbRound %d but current tracker DB round is %d", offset, dbRound, tr.dbRound) for _, lt := range tr.trackers { - lt.handleUnorderedCommit(dcc) + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.handleUnorderedCommit(dcc) + } } tr.mu.RUnlock() return nil @@ -596,7 +611,9 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { } if err != nil { for _, lt := range tr.trackers { - lt.handlePrepareCommitError(dcc) + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.handlePrepareCommitError(dcc) + } } tr.mu.RUnlock() return err @@ -606,7 +623,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { start := time.Now() ledgerCommitroundCount.Inc(nil) - err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { + err = tr.dbs.TransactionWithRetryClearFn(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { // TransactionFn tr.accountsCommitting.Store(true) defer func() { tr.accountsCommitting.Store(false) @@ -625,13 +642,21 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { } return aw.UpdateAccountsRound(dbRound + basics.Round(offset)) + }, func(ctx context.Context) { // RetryClearFn + for _, lt := range tr.trackers { + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.clearCommitRoundRetry(ctx, dcc) + } + } }) ledgerCommitroundMicros.AddMicrosecondsSince(start, nil) if err != nil { for _, lt := range tr.trackers { - lt.handleCommitError(dcc) + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.handleCommitError(dcc) + } } tr.log.Warnf("unable to advance tracker db snapshot (%d-%d): %v", dbRound, dbRound+basics.Round(offset), err) @@ -653,7 +678,9 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { tr.mu.Unlock() for _, lt := range tr.trackers { - lt.postCommitUnlocked(tr.ctx, dcc) + if lt, ok := lt.(trackerCommitLifetimeHandlers); ok { + lt.postCommitUnlocked(tr.ctx, dcc) + } } tr.log.Debugf("commitRound completed for (%d-%d)", dbRound, dbRound+basics.Round(offset)) diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 9c26223c39..3c7bf51faa 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -188,6 +188,9 @@ func (t *emptyTracker) postCommit(ctx context.Context, dcc *deferredCommitContex func (t *emptyTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } +func (t *emptyTracker) clearCommitRoundRetry(ctx context.Context, dcc *deferredCommitContext) { +} + // control functions are not used by the emptyTracker func (t *emptyTracker) handleUnorderedCommit(dcc *deferredCommitContext) { } diff --git a/ledger/txtail.go b/ledger/txtail.go index 129fbb3985..92ce068ef3 100644 --- a/ledger/txtail.go +++ b/ledger/txtail.go @@ -333,16 +333,6 @@ func (t *txTail) postCommit(ctx context.Context, dcc *deferredCommitContext) { } } -func (t *txTail) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -func (t *txTail) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (t *txTail) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (t *txTail) handleCommitError(dcc *deferredCommitContext) { -} - func (t *txTail) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } diff --git a/util/db/dbutil.go b/util/db/dbutil.go index a6e524464d..8b045ad70c 100644 --- a/util/db/dbutil.go +++ b/util/db/dbutil.go @@ -219,6 +219,14 @@ func (db *Accessor) Atomic(fn idemFn, extras ...interface{}) (err error) { // For transactions where readOnly is false, sync determines whether or not to wait for the result. // Like for Atomic, the return error of fn should be a native sqlite3.Error type or an error wrapping it. func (db *Accessor) AtomicContext(ctx context.Context, fn idemFn, extras ...interface{}) (err error) { + + return db.AtomicContextWithRetryClearFn(ctx, fn, nil, extras...) +} + +// AtomicContextWithRetryClearFn is like AtomicContext, but calls retryClearFn if the database +// txn was rolled back, due to error or in between retries. This helps a caller that +// might change in-memory state inside fn. +func (db *Accessor) AtomicContextWithRetryClearFn(ctx context.Context, fn idemFn, retryClearFn func(context.Context), extras ...interface{}) (err error) { atomicDeadline := time.Now().Add(time.Second) // note that the sql library will drop panics inside an active transaction @@ -294,9 +302,12 @@ func (db *Accessor) AtomicContext(ctx context.Context, fn idemFn, extras ...inte if err != nil { tx.Rollback() if dbretry(err) { - continue + if retryClearFn != nil { + retryClearFn(ctx) + } + continue // retry } else { - break + break // exit, returns error } } @@ -305,8 +316,13 @@ func (db *Accessor) AtomicContext(ctx context.Context, fn idemFn, extras ...inte // update the deadline, as it might have been updated. atomicDeadline = txContextData.deadline break - } else if !dbretry(err) { - break + } else if dbretry(err) { + if retryClearFn != nil { + retryClearFn(ctx) + } + continue // retry + } else { + break // exit, returns error } }