diff --git a/catchup/service.go b/catchup/service.go index bc23b3d736..3638360a97 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -63,6 +63,7 @@ type Ledger interface { Block(basics.Round) (bookkeeping.Block, error) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) IsWritingCatchpointDataFile() bool + IsBehindCommittingDeltas() bool Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error WaitMem(r basics.Round) chan struct{} @@ -86,10 +87,10 @@ type Service struct { deadlineTimeout time.Duration blockValidationPool execpool.BacklogPool - // suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the - // catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete, + // suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the + // catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete, // and resume from there without stopping the catchup timer. - suspendForCatchpointWriting bool + suspendForLedgerOps bool // The channel gets closed when the initial sync is complete. This allows for other services to avoid // the overhead of starting prematurely (before this node is caught-up and can validate messages for example). @@ -494,11 +495,26 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { return } + // if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks. + start := time.Now() + for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout { + time.Sleep(100 * time.Millisecond) + } + + // if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation. + // if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we // could resume with the catchup. if s.ledger.IsWritingCatchpointDataFile() { s.log.Info("Catchup is stopping due to catchpoint file being written") - s.suspendForCatchpointWriting = true + s.suspendForLedgerOps = true + return + } + + // if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure. + if s.ledger.IsBehindCommittingDeltas() { + s.log.Info("Catchup is stopping due to too many non-flushed account changes") + s.suspendForLedgerOps = true return } @@ -555,10 +571,10 @@ func (s *Service) periodicSync() { sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout continue case <-s.syncNow: - if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() { + if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() { continue } - s.suspendForCatchpointWriting = false + s.suspendForLedgerOps = false s.log.Info("Immediate resync triggered; resyncing") s.sync() case <-time.After(sleepDuration): @@ -575,7 +591,12 @@ func (s *Service) periodicSync() { // keep the existing sleep duration and try again later. continue } - s.suspendForCatchpointWriting = false + // if the ledger has too many non-flushed account changes, skip + if s.ledger.IsBehindCommittingDeltas() { + continue + } + + s.suspendForLedgerOps = false s.log.Info("It's been too long since our ledger advanced; resyncing") s.sync() case cert := <-s.unmatchedPendingCertificates: @@ -630,7 +651,7 @@ func (s *Service) sync() { initSync := false // if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file. - if !s.suspendForCatchpointWriting { + if !s.suspendForLedgerOps { // in that case, don't change the timer so that the "timer" would keep running. atomic.StoreInt64(&s.syncStartNS, 0) @@ -641,7 +662,7 @@ func (s *Service) sync() { } } - elapsedTime := time.Now().Sub(start) + elapsedTime := time.Since(start) s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{ StartRound: uint64(pr), EndRound: uint64(s.ledger.LastRound()), diff --git a/catchup/service_test.go b/catchup/service_test.go index 6807b11194..43f8ab6f6e 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -830,6 +830,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool { return false } +func (m *mockedLedger) IsBehindCommittingDeltas() bool { + return false +} + +type mockedBehindDeltasLedger struct { + mockedLedger +} + +func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool { + return true +} + func testingenvWithUpgrade( t testing.TB, numBlocks, @@ -1127,3 +1139,49 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) { lookback = lookbackForStateproofsSupport(&topBlk) assert.Equal(t, uint64(0), lookback) } + +// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round +func TestServiceLedgerUnavailable(t *testing.T) { + partitiontest.PartitionTest(t) + + // Make Ledger + local := new(mockedBehindDeltasLedger) + local.blocks = append(local.blocks, bookkeeping.Block{}) + + remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{}) + if err != nil { + t.Fatal(err) + return + } + numBlocks := 10 + addBlocks(t, remote, blk, numBlocks) + + // Create a network and block service + blockServiceConfig := config.GetDefaultLocal() + net := &httpTestPeerSource{} + ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID") + + nodeA := basicRPCNode{} + nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls) + nodeA.start() + defer nodeA.stop() + rootURL := nodeA.rootURL() + net.addPeer(rootURL) + + require.Equal(t, basics.Round(0), local.LastRound()) + require.Equal(t, basics.Round(numBlocks+1), remote.LastRound()) + + // Make Service + auth := &mockedAuthenticator{fail: false} + cfg := config.GetDefaultLocal() + cfg.CatchupParallelBlocks = 2 + s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil) + s.log = &periodicSyncLogger{Logger: logging.Base()} + s.deadlineTimeout = 2 * time.Second + + s.testStart() + defer s.Stop() + s.sync() + require.Greater(t, local.LastRound(), basics.Round(0)) + require.Less(t, local.LastRound(), remote.LastRound()) +} diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 4e14a1ab1d..4fddd641fe 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) { // blockingTracker is a testing tracker used to test "what if" a tracker would get blocked. type blockingTracker struct { + emptyTracker postCommitUnlockedEntryLock chan struct{} postCommitUnlockedReleaseLock chan struct{} postCommitEntryLock chan struct{} @@ -783,36 +784,12 @@ type blockingTracker struct { shouldLockPostCommitUnlocked atomic.Bool } -// loadFromDisk is not implemented in the blockingTracker. -func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { - return nil -} - -// newBlock is not implemented in the blockingTracker. -func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { -} - // committedUpTo in the blockingTracker just stores the committed round. func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd)) return committedRnd, basics.Round(0) } -// produceCommittingTask is not used by the blockingTracker -func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { - return dcr -} - -// prepareCommit, is not used by the blockingTracker -func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error { - return nil -} - -// commitRound is not used by the blockingTracker -func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return nil -} - // postCommit implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() { @@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred } } -// control functions are not used by the blockingTracker -func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) { -} - -// close is not used by the blockingTracker -func (bt *blockingTracker) close() { -} - func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/ledger/ledger.go b/ledger/ledger.go index dc3baaf766..1617007aef 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -898,6 +898,12 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round { return l.trackers.getDbRound() } +// IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas. +// It intended to slow down the catchup service when deltas overgrow some limit. +func (l *Ledger) IsBehindCommittingDeltas() bool { + return l.trackers.isBehindCommittingDeltas(l.Latest()) +} + // DebuggerLedger defines the minimal set of method required for creating a debug balances. type DebuggerLedger = eval.LedgerForCowBase diff --git a/ledger/tracker.go b/ledger/tracker.go index c5f3e84781..7ad5ba6641 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -175,6 +176,8 @@ type trackerRegistry struct { // accountsWriting provides synchronization around the background writing of account balances. accountsWriting sync.WaitGroup + // accountsCommitting is set when trackers registry writing accounts into DB. + accountsCommitting atomic.Bool // dbRound is always exactly accountsRound(), // cached to avoid SQL queries. @@ -196,8 +199,16 @@ type trackerRegistry struct { lastFlushTime time.Time cfg config.Local + + // maxAccountDeltas is a maximum number of in-memory deltas stored by trackers. + // When exceeded trackerRegistry will attempt to flush, and its Available() method will return false. + // Too many in-memory deltas could cause the node to run out of memory. + maxAccountDeltas uint64 } +// defaultMaxAccountDeltas is a default value for maxAccountDeltas. +const defaultMaxAccountDeltas = 256 + // deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure // to syncronize the various trackers and create a uniformity around which rounds need to be persisted // next. @@ -285,7 +296,7 @@ func (dcc deferredCommitContext) newBase() basics.Round { return dcc.oldBase + basics.Round(dcc.offset) } -var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker") +var errMissingAccountUpdateTracker = errors.New("trackers replay : called without a valid accounts update tracker") func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) { tr.mu.Lock() @@ -293,18 +304,10 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.dbs = l.trackerDB() tr.log = l.trackerLog() - err = tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) { - ar, err := tx.MakeAccountsReader() - if err != nil { - return err - } - - tr.dbRound, err = ar.AccountsRound() - return err - }) - - if err != nil { - return err + tr.maxAccountDeltas = defaultMaxAccountDeltas + if cfg.MaxAcctLookback > tr.maxAccountDeltas { + tr.maxAccountDeltas = cfg.MaxAcctLookback + 1 + tr.log.Infof("maxAccountDeltas was overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback) } tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) @@ -333,24 +336,38 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack } func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error { + var dbRound basics.Round + err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) { + ar, err0 := tx.MakeAccountsReader() + if err0 != nil { + return err0 + } + + dbRound, err0 = ar.AccountsRound() + return err0 + }) + if err != nil { + return err + } + tr.mu.RLock() - dbRound := tr.dbRound + tr.dbRound = dbRound tr.mu.RUnlock() for _, lt := range tr.trackers { - err := lt.loadFromDisk(l, dbRound) - if err != nil { + err0 := lt.loadFromDisk(l, dbRound) + if err0 != nil { // find the tracker name. trackerName := reflect.TypeOf(lt).String() - return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err) + return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err0) } } - err := tr.replay(l) - if err != nil { - err = fmt.Errorf("initializeTrackerCaches failed : %w", err) + if err0 := tr.replay(l); err0 != nil { + return fmt.Errorf("trackers replay failed : %w", err0) } - return err + + return nil } func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { @@ -456,6 +473,20 @@ func (tr *trackerRegistry) waitAccountsWriting() { tr.accountsWriting.Wait() } +func (tr *trackerRegistry) isBehindCommittingDeltas(latest basics.Round) bool { + tr.mu.RLock() + dbRound := tr.dbRound + tr.mu.RUnlock() + + numDeltas := uint64(latest.SubSaturate(dbRound)) + if numDeltas < tr.maxAccountDeltas { + return false + } + + // there is a large number of deltas check if commitSyncer is not writing accounts + return tr.accountsCommitting.Load() +} + func (tr *trackerRegistry) close() { if tr.ctxCancel != nil { tr.ctxCancel() @@ -562,6 +593,11 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { start := time.Now() ledgerCommitroundCount.Inc(nil) err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { + tr.accountsCommitting.Store(true) + defer func() { + tr.accountsCommitting.Store(false) + }() + aw, err := tx.MakeAccountsWriter() if err != nil { return err diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 730c315e80..d16fb925c1 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -150,80 +150,73 @@ func TestTrackerScheduleCommit(t *testing.T) { a.Equal(expectedOffset, dc.offset) } -type ioErrorTracker struct { +type emptyTracker struct { } -// loadFromDisk is not implemented in the blockingTracker. -func (io *ioErrorTracker) loadFromDisk(ledgerForTracker, basics.Round) error { +// loadFromDisk is not implemented in the emptyTracker. +func (t *emptyTracker) loadFromDisk(ledgerForTracker, basics.Round) error { return nil } -// newBlock is not implemented in the blockingTracker. -func (io *ioErrorTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { +// newBlock is not implemented in the emptyTracker. +func (t *emptyTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { } -// committedUpTo in the blockingTracker just stores the committed round. -func (io *ioErrorTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { +// committedUpTo in the emptyTracker just stores the committed round. +func (t *emptyTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { return 0, basics.Round(0) } -func (io *ioErrorTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { +func (t *emptyTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } -// prepareCommit, is not used by the blockingTracker -func (io *ioErrorTracker) prepareCommit(*deferredCommitContext) error { +// prepareCommit, is not used by the emptyTracker +func (t *emptyTracker) prepareCommit(*deferredCommitContext) error { return nil } -// commitRound is not used by the blockingTracker -func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return sqlite3.Error{Code: sqlite3.ErrIoErr} +// commitRound is not used by the emptyTracker +func (t *emptyTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + return nil } -func (io *ioErrorTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { +func (t *emptyTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { } // postCommitUnlocked implements entry/exit blockers, designed for testing. -func (io *ioErrorTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +func (t *emptyTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -// control functions are not used by the blockingTracker -func (io *ioErrorTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +// control functions are not used by the emptyTracker +func (t *emptyTracker) handleUnorderedCommit(dcc *deferredCommitContext) { } -func (io *ioErrorTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +func (t *emptyTracker) handlePrepareCommitError(dcc *deferredCommitContext) { } -func (io *ioErrorTracker) handleCommitError(dcc *deferredCommitContext) { +func (t *emptyTracker) handleCommitError(dcc *deferredCommitContext) { } -// close is not used by the blockingTracker -func (io *ioErrorTracker) close() { +// close is not used by the emptyTracker +func (t *emptyTracker) close() { } -func (io *ioErrorTracker) reset() { +type ioErrorTracker struct { + emptyTracker +} + +// commitRound is not used by the ioErrorTracker +func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + return sqlite3.Error{Code: sqlite3.ErrIoErr} } type producePrepareBlockingTracker struct { + emptyTracker produceReleaseLock chan struct{} prepareCommitEntryLock chan struct{} prepareCommitReleaseLock chan struct{} cancelTasks bool } -// loadFromDisk is not implemented in the blockingTracker. -func (bt *producePrepareBlockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { - return nil -} - -// newBlock is not implemented in the blockingTracker. -func (bt *producePrepareBlockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { -} - -// committedUpTo in the blockingTracker just stores the committed round. -func (bt *producePrepareBlockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { - return 0, basics.Round(0) -} - func (bt *producePrepareBlockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { if bt.cancelTasks { return nil @@ -240,30 +233,6 @@ func (bt *producePrepareBlockingTracker) prepareCommit(*deferredCommitContext) e return nil } -// commitRound is not used by the blockingTracker -func (bt *producePrepareBlockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return nil -} - -func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { -} - -// postCommitUnlocked implements entry/exit blockers, designed for testing. -func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -// control functions are not used by the blockingTracker -func (bt *producePrepareBlockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (bt *producePrepareBlockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (bt *producePrepareBlockingTracker) handleCommitError(dcc *deferredCommitContext) { -} - -// close is not used by the blockingTracker -func (bt *producePrepareBlockingTracker) close() { -} - func (bt *producePrepareBlockingTracker) reset() { bt.prepareCommitEntryLock = make(chan struct{}) bt.prepareCommitReleaseLock = make(chan struct{}) @@ -271,7 +240,18 @@ func (bt *producePrepareBlockingTracker) reset() { bt.cancelTasks = false } -// TestTrackerDbRoundDataRace checks for dbRound data race +type commitRoundStallingTracker struct { + emptyTracker + commitRoundLock chan struct{} +} + +// commitRound is not used by the blockingTracker +func (st *commitRoundStallingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + <-st.commitRoundLock + return nil +} + +// TestTrackers_DbRoundDataRace checks for dbRound data race // when commit scheduling relies on dbRound from the tracker registry but tracker's deltas // are used in calculations // 1. Add say 128 + MaxAcctLookback (MaxLookback) blocks and commit @@ -280,7 +260,7 @@ func (bt *producePrepareBlockingTracker) reset() { // 4. Set a block in produceCommittingTask, add a new block and resume the commit // 5. Resume produceCommittingTask // 6. The data race and panic happens in block queue syncher thread -func TestTrackerDbRoundDataRace(t *testing.T) { +func TestTrackers_DbRoundDataRace(t *testing.T) { partitiontest.PartitionTest(t) t.Skip("For manual run when touching ledger locking") @@ -367,7 +347,7 @@ func TestTrackerDbRoundDataRace(t *testing.T) { close(stallingTracker.produceReleaseLock) } -func TestCommitRoundIOError(t *testing.T) { +func TestTrackers_CommitRoundIOError(t *testing.T) { partitiontest.PartitionTest(t) a := require.New(t) @@ -414,7 +394,119 @@ func TestCommitRoundIOError(t *testing.T) { a.True(flag.Load()) } -func TestAccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { +// TestTrackers_BusyCommitting ensures trackerRegistry.busy() is set when commitRound is in progress +func TestTrackers_BusyCommitting(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1) + const inMem = true + log := logging.TestingLog(t) + log.SetLevel(logging.Warn) + cfg := config.GetDefaultLocal() + ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + a.NoError(err) + defer ledger.Close() + + // quit the commitSyncer goroutine + ledger.trackers.ctxCancel() + ledger.trackers.ctxCancel = nil + <-ledger.trackers.commitSyncerClosed + ledger.trackers.commitSyncerClosed = nil + + tracker := &commitRoundStallingTracker{ + commitRoundLock: make(chan struct{}), + } + ledger.trackerMu.Lock() + ledger.trackers.mu.Lock() + ledger.trackers.trackers = append([]ledgerTracker{tracker}, ledger.trackers.trackers...) + ledger.trackers.lastFlushTime = time.Time{} + ledger.trackers.mu.Unlock() + ledger.trackerMu.Unlock() + + // add some blocks + blk := genesisInitState.Block + for i := basics.Round(0); i < basics.Round(cfg.MaxAcctLookback)+1; i++ { + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp++ + ledger.trackers.newBlock(blk, ledgercore.StateDelta{}) + } + + // manually trigger a commit + ledger.trackers.committedUpTo(blk.BlockHeader.Round) + dcc := <-ledger.trackers.deferredCommits + go func() { + err = ledger.trackers.commitRound(dcc) + a.NoError(err) + }() + + // commitRoundStallingTracker blocks commitRound in the goroutine above, wait few secs to ensure the trackerRegistry has set busy() + a.Eventually(func() bool { + return ledger.trackers.accountsCommitting.Load() + }, 3*time.Second, 50*time.Millisecond) + close(tracker.commitRoundLock) + ledger.trackers.waitAccountsWriting() + a.False(ledger.trackers.accountsCommitting.Load()) +} + +func TestTrackers_InitializeMaxAccountDeltas(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + accts := setupAccts(20) + ml := makeMockLedgerForTracker(t, true, 1, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + tr := trackerRegistry{} + + cfg := config.GetDefaultLocal() + err := tr.initialize(ml, []ledgerTracker{}, cfg) + a.NoError(err) + // quit the commitSyncer goroutine + tr.ctxCancel() + tr.ctxCancel = nil + <-tr.commitSyncerClosed + tr.commitSyncerClosed = nil + a.Equal(uint64(defaultMaxAccountDeltas), tr.maxAccountDeltas) + + cfg.MaxAcctLookback = defaultMaxAccountDeltas + 100 + err = tr.initialize(ml, []ledgerTracker{}, cfg) + a.NoError(err) + // quit the commitSyncer goroutine + tr.ctxCancel() + tr.ctxCancel = nil + <-tr.commitSyncerClosed + tr.commitSyncerClosed = nil + a.Equal(cfg.MaxAcctLookback+1, tr.maxAccountDeltas) +} + +func TestTrackers_IsBehindCommittingDeltas(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + tr := trackerRegistry{ + accts: &accountUpdates{}, + maxAccountDeltas: defaultMaxAccountDeltas, + } + + latest := basics.Round(0) + a.False(tr.isBehindCommittingDeltas(latest)) + + // no deltas but busy committing => not behind + tr.accountsCommitting.Store(true) + a.False(tr.isBehindCommittingDeltas(latest)) + tr.accountsCommitting.Store(false) + + // lots of deltas but not committing => not behind + latest = basics.Round(defaultMaxAccountDeltas + 10) + tr.dbRound = 0 + a.False(tr.isBehindCommittingDeltas(latest)) + + // lots of deltas and committing => behind + tr.accountsCommitting.Store(true) + a.True(tr.isBehindCommittingDeltas(latest)) +} + +func TestTrackers_AccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { partitiontest.PartitionTest(t) aul := &accountUpdatesLedgerEvaluator{