From 81d0c8496eb24eb6dcbe3f3010abb3d404fc803b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 19 Oct 2023 13:45:03 -0400 Subject: [PATCH 01/13] catchup: suspend catchup if ledger not ready --- catchup/service.go | 27 ++++++++++++++++++++------- catchup/service_test.go | 4 ++++ ledger/ledger.go | 16 ++++++++++++++++ ledger/tracker.go | 13 +++++++++++++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index bc23b3d736..9774fde133 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -63,6 +63,7 @@ type Ledger interface { Block(basics.Round) (bookkeeping.Block, error) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) IsWritingCatchpointDataFile() bool + Available() bool Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error WaitMem(r basics.Round) chan struct{} @@ -86,10 +87,10 @@ type Service struct { deadlineTimeout time.Duration blockValidationPool execpool.BacklogPool - // suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the + // suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the // catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete, // and resume from there without stopping the catchup timer. - suspendForCatchpointWriting bool + suspendForLedgerOps bool // The channel gets closed when the initial sync is complete. This allows for other services to avoid // the overhead of starting prematurely (before this node is caught-up and can validate messages for example). @@ -498,7 +499,14 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { // could resume with the catchup. if s.ledger.IsWritingCatchpointDataFile() { s.log.Info("Catchup is stopping due to catchpoint file being written") - s.suspendForCatchpointWriting = true + s.suspendForLedgerOps = true + return + } + + // if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure. + if !s.ledger.Available() { + s.log.Info("Catchup is stopping due to too many non-flushed account changes") + s.suspendForLedgerOps = true return } @@ -555,10 +563,10 @@ func (s *Service) periodicSync() { sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout continue case <-s.syncNow: - if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() { + if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || !s.ledger.Available() { continue } - s.suspendForCatchpointWriting = false + s.suspendForLedgerOps = false s.log.Info("Immediate resync triggered; resyncing") s.sync() case <-time.After(sleepDuration): @@ -575,7 +583,12 @@ func (s *Service) periodicSync() { // keep the existing sleep duration and try again later. continue } - s.suspendForCatchpointWriting = false + // if the ledger has too many non-flushed account changes, skip + if !s.ledger.Available() { + continue + } + + s.suspendForLedgerOps = false s.log.Info("It's been too long since our ledger advanced; resyncing") s.sync() case cert := <-s.unmatchedPendingCertificates: @@ -630,7 +643,7 @@ func (s *Service) sync() { initSync := false // if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file. - if !s.suspendForCatchpointWriting { + if !s.suspendForLedgerOps { // in that case, don't change the timer so that the "timer" would keep running. atomic.StoreInt64(&s.syncStartNS, 0) diff --git a/catchup/service_test.go b/catchup/service_test.go index 6807b11194..466a69cdb6 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -830,6 +830,10 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool { return false } +func (m *mockedLedger) Available() bool { + return true +} + func testingenvWithUpgrade( t testing.TB, numBlocks, diff --git a/ledger/ledger.go b/ledger/ledger.go index dc3baaf766..0d1fc7c255 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -898,6 +898,22 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round { return l.trackers.getDbRound() } +// maxNonFlushedAccountRounds is a maximum number of not flushed yet account rounds (in-memory deltas) +// that we allow to have before we stop the catchup process. +// To many in-memory deltas can cause the node to run out of memory. +const maxAccountDeltas = 256 + +// Available returns indicates if the ledger is can safely accept more blocks. +// It is intended use with catchup to slowdown when deltas overgrow some limit. +func (l *Ledger) Available() bool { + deltas := l.Latest().SubSaturate(l.trackers.getDbRound()) + if deltas < maxAccountDeltas { + return true + } + + return !l.trackers.busy() +} + // DebuggerLedger defines the minimal set of method required for creating a debug balances. type DebuggerLedger = eval.LedgerForCowBase diff --git a/ledger/tracker.go b/ledger/tracker.go index c5f3e84781..b727ed8ed4 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/algorand/go-algorand/config" @@ -175,6 +176,8 @@ type trackerRegistry struct { // accountsWriting provides synchronization around the background writing of account balances. accountsWriting sync.WaitGroup + // accountsCommitting is set when trackers registry writing accounts into DB. + accountsCommitting atomic.Bool // dbRound is always exactly accountsRound(), // cached to avoid SQL queries. @@ -456,6 +459,11 @@ func (tr *trackerRegistry) waitAccountsWriting() { tr.accountsWriting.Wait() } +// busy returns true if the trackerRegistry is actively writing accounts into DB. +func (tr *trackerRegistry) busy() bool { + return tr.accountsCommitting.Load() +} + func (tr *trackerRegistry) close() { if tr.ctxCancel != nil { tr.ctxCancel() @@ -562,6 +570,11 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { start := time.Now() ledgerCommitroundCount.Inc(nil) err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) { + tr.accountsCommitting.Store(true) + defer func() { + tr.accountsCommitting.Store(false) + }() + aw, err := tx.MakeAccountsWriter() if err != nil { return err From bdeea021e5317d7175f5ac98d24b6cf7937fce55 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 19 Oct 2023 15:21:40 -0400 Subject: [PATCH 02/13] add emptyTracker to remove some code duplication --- ledger/catchpointtracker_test.go | 37 +-------------- ledger/tracker_test.go | 81 ++++++++++---------------------- 2 files changed, 26 insertions(+), 92 deletions(-) diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 4e14a1ab1d..4fddd641fe 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) { // blockingTracker is a testing tracker used to test "what if" a tracker would get blocked. type blockingTracker struct { + emptyTracker postCommitUnlockedEntryLock chan struct{} postCommitUnlockedReleaseLock chan struct{} postCommitEntryLock chan struct{} @@ -783,36 +784,12 @@ type blockingTracker struct { shouldLockPostCommitUnlocked atomic.Bool } -// loadFromDisk is not implemented in the blockingTracker. -func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { - return nil -} - -// newBlock is not implemented in the blockingTracker. -func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { -} - // committedUpTo in the blockingTracker just stores the committed round. func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd)) return committedRnd, basics.Round(0) } -// produceCommittingTask is not used by the blockingTracker -func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { - return dcr -} - -// prepareCommit, is not used by the blockingTracker -func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error { - return nil -} - -// commitRound is not used by the blockingTracker -func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return nil -} - // postCommit implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() { @@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred } } -// control functions are not used by the blockingTracker -func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) { -} - -// close is not used by the blockingTracker -func (bt *blockingTracker) close() { -} - func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 730c315e80..0df319ef70 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -150,80 +150,73 @@ func TestTrackerScheduleCommit(t *testing.T) { a.Equal(expectedOffset, dc.offset) } -type ioErrorTracker struct { +type emptyTracker struct { } // loadFromDisk is not implemented in the blockingTracker. -func (io *ioErrorTracker) loadFromDisk(ledgerForTracker, basics.Round) error { +func (t *emptyTracker) loadFromDisk(ledgerForTracker, basics.Round) error { return nil } // newBlock is not implemented in the blockingTracker. -func (io *ioErrorTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { +func (t *emptyTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { } // committedUpTo in the blockingTracker just stores the committed round. -func (io *ioErrorTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { +func (io *emptyTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { return 0, basics.Round(0) } -func (io *ioErrorTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { +func (t *emptyTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { return dcr } // prepareCommit, is not used by the blockingTracker -func (io *ioErrorTracker) prepareCommit(*deferredCommitContext) error { +func (t *emptyTracker) prepareCommit(*deferredCommitContext) error { return nil } // commitRound is not used by the blockingTracker -func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return sqlite3.Error{Code: sqlite3.ErrIoErr} +func (t *emptyTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + return nil } -func (io *ioErrorTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { +func (t *emptyTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { } // postCommitUnlocked implements entry/exit blockers, designed for testing. -func (io *ioErrorTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { +func (t *emptyTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } // control functions are not used by the blockingTracker -func (io *ioErrorTracker) handleUnorderedCommit(dcc *deferredCommitContext) { +func (t *emptyTracker) handleUnorderedCommit(dcc *deferredCommitContext) { } -func (io *ioErrorTracker) handlePrepareCommitError(dcc *deferredCommitContext) { +func (t *emptyTracker) handlePrepareCommitError(dcc *deferredCommitContext) { } -func (io *ioErrorTracker) handleCommitError(dcc *deferredCommitContext) { +func (t *emptyTracker) handleCommitError(dcc *deferredCommitContext) { } // close is not used by the blockingTracker -func (io *ioErrorTracker) close() { +func (t *emptyTracker) close() { +} + +type ioErrorTracker struct { + emptyTracker } -func (io *ioErrorTracker) reset() { +// commitRound is not used by the blockingTracker +func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + return sqlite3.Error{Code: sqlite3.ErrIoErr} } type producePrepareBlockingTracker struct { + emptyTracker produceReleaseLock chan struct{} prepareCommitEntryLock chan struct{} prepareCommitReleaseLock chan struct{} cancelTasks bool } -// loadFromDisk is not implemented in the blockingTracker. -func (bt *producePrepareBlockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error { - return nil -} - -// newBlock is not implemented in the blockingTracker. -func (bt *producePrepareBlockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { -} - -// committedUpTo in the blockingTracker just stores the committed round. -func (bt *producePrepareBlockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { - return 0, basics.Round(0) -} - func (bt *producePrepareBlockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange { if bt.cancelTasks { return nil @@ -240,30 +233,6 @@ func (bt *producePrepareBlockingTracker) prepareCommit(*deferredCommitContext) e return nil } -// commitRound is not used by the blockingTracker -func (bt *producePrepareBlockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { - return nil -} - -func (bt *producePrepareBlockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { -} - -// postCommitUnlocked implements entry/exit blockers, designed for testing. -func (bt *producePrepareBlockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { -} - -// control functions are not used by the blockingTracker -func (bt *producePrepareBlockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) { -} -func (bt *producePrepareBlockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) { -} -func (bt *producePrepareBlockingTracker) handleCommitError(dcc *deferredCommitContext) { -} - -// close is not used by the blockingTracker -func (bt *producePrepareBlockingTracker) close() { -} - func (bt *producePrepareBlockingTracker) reset() { bt.prepareCommitEntryLock = make(chan struct{}) bt.prepareCommitReleaseLock = make(chan struct{}) @@ -271,7 +240,7 @@ func (bt *producePrepareBlockingTracker) reset() { bt.cancelTasks = false } -// TestTrackerDbRoundDataRace checks for dbRound data race +// TestTrackers_DbRoundDataRace checks for dbRound data race // when commit scheduling relies on dbRound from the tracker registry but tracker's deltas // are used in calculations // 1. Add say 128 + MaxAcctLookback (MaxLookback) blocks and commit @@ -280,7 +249,7 @@ func (bt *producePrepareBlockingTracker) reset() { // 4. Set a block in produceCommittingTask, add a new block and resume the commit // 5. Resume produceCommittingTask // 6. The data race and panic happens in block queue syncher thread -func TestTrackerDbRoundDataRace(t *testing.T) { +func TestTrackers_DbRoundDataRace(t *testing.T) { partitiontest.PartitionTest(t) t.Skip("For manual run when touching ledger locking") @@ -367,7 +336,7 @@ func TestTrackerDbRoundDataRace(t *testing.T) { close(stallingTracker.produceReleaseLock) } -func TestCommitRoundIOError(t *testing.T) { +func TestTrackers_CommitRoundIOError(t *testing.T) { partitiontest.PartitionTest(t) a := require.New(t) From e81641975db5e580cf27f55f8234d917460a0af6 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 19 Oct 2023 18:04:24 -0400 Subject: [PATCH 03/13] Add trackerRegistry busy test --- ledger/tracker_test.go | 68 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 0df319ef70..e8eb42273b 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -240,6 +240,17 @@ func (bt *producePrepareBlockingTracker) reset() { bt.cancelTasks = false } +type commitRoundStallingTracker struct { + emptyTracker + commitRoundLock chan struct{} +} + +// commitRound is not used by the blockingTracker +func (st *commitRoundStallingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { + <-st.commitRoundLock + return nil +} + // TestTrackers_DbRoundDataRace checks for dbRound data race // when commit scheduling relies on dbRound from the tracker registry but tracker's deltas // are used in calculations @@ -383,7 +394,62 @@ func TestTrackers_CommitRoundIOError(t *testing.T) { a.True(flag.Load()) } -func TestAccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { +// TestTrackers_BusyCommitting ensures trackerRegistry.busy() is set when commitRound is in progress +func TestTrackers_BusyCommitting(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 1) + const inMem = true + log := logging.TestingLog(t) + log.SetLevel(logging.Warn) + cfg := config.GetDefaultLocal() + ledger, err := OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + a.NoError(err) + defer ledger.Close() + + // quit the commitSyncer goroutine + ledger.trackers.ctxCancel() + ledger.trackers.ctxCancel = nil + <-ledger.trackers.commitSyncerClosed + ledger.trackers.commitSyncerClosed = nil + + tracker := &commitRoundStallingTracker{ + commitRoundLock: make(chan struct{}), + } + ledger.trackerMu.Lock() + ledger.trackers.mu.Lock() + ledger.trackers.trackers = append([]ledgerTracker{tracker}, ledger.trackers.trackers...) + ledger.trackers.lastFlushTime = time.Time{} + ledger.trackers.mu.Unlock() + ledger.trackerMu.Unlock() + + // add some blocks + blk := genesisInitState.Block + for i := basics.Round(0); i < basics.Round(cfg.MaxAcctLookback)+1; i++ { + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp += 1 + ledger.trackers.newBlock(blk, ledgercore.StateDelta{}) + } + + // manually trigger a commit + ledger.trackers.committedUpTo(blk.BlockHeader.Round) + dcc := <-ledger.trackers.deferredCommits + go func() { + err = ledger.trackers.commitRound(dcc) + a.NoError(err) + }() + + // commitRoundStallingTracker blocks commitRound in the goroutine above, wait few secs to ensure the trackerRegistry has set busy() + a.Eventually(func() bool { + return ledger.trackers.busy() + }, 3*time.Second, 50*time.Millisecond) + close(tracker.commitRoundLock) + ledger.trackers.waitAccountsWriting() + a.False(ledger.trackers.busy()) +} + +func TestTrackers_AccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { partitiontest.PartitionTest(t) aul := &accountUpdatesLedgerEvaluator{ From 381769e3486f4dfdec01c652077c88795188f85d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Oct 2023 11:50:59 -0400 Subject: [PATCH 04/13] add more unit tests --- ledger/acctupdates.go | 7 +++++ ledger/ledger.go | 12 +-------- ledger/tracker.go | 56 +++++++++++++++++++++++++-------------- ledger/tracker_test.go | 59 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 102 insertions(+), 32 deletions(-) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index ca9cd55f72..f205e2103b 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -738,6 +738,13 @@ func (au *accountUpdates) totalsImpl(rnd basics.Round) (ledgercore.AccountTotals return au.roundTotals[offset], nil } +func (au *accountUpdates) numDeltas() uint64 { + au.accountsMu.RLock() + numDeltas := uint64(len(au.deltas)) + au.accountsMu.RUnlock() + return numDeltas +} + // initializeFromDisk performs the atomic operation of loading the accounts data information from disk // and preparing the accountUpdates for operation. func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRound basics.Round) error { diff --git a/ledger/ledger.go b/ledger/ledger.go index 0d1fc7c255..3df9277176 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -898,20 +898,10 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round { return l.trackers.getDbRound() } -// maxNonFlushedAccountRounds is a maximum number of not flushed yet account rounds (in-memory deltas) -// that we allow to have before we stop the catchup process. -// To many in-memory deltas can cause the node to run out of memory. -const maxAccountDeltas = 256 - // Available returns indicates if the ledger is can safely accept more blocks. // It is intended use with catchup to slowdown when deltas overgrow some limit. func (l *Ledger) Available() bool { - deltas := l.Latest().SubSaturate(l.trackers.getDbRound()) - if deltas < maxAccountDeltas { - return true - } - - return !l.trackers.busy() + return l.trackers.available() } // DebuggerLedger defines the minimal set of method required for creating a debug balances. diff --git a/ledger/tracker.go b/ledger/tracker.go index b727ed8ed4..a65d4a28de 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -199,8 +199,16 @@ type trackerRegistry struct { lastFlushTime time.Time cfg config.Local + + // maxAccountDeltas is a maximum number of in-memory deltas stored by trackers. + // When exceeded trackerRegistry will attempt to flush, and its Available() method will return false. + // Too many in-memory deltas could cause the node to run out of memory. + maxAccountDeltas uint64 } +// defaultMaxAccountDeltas is a default value for maxAccountDeltas. +const defaultMaxAccountDeltas = 256 + // deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure // to syncronize the various trackers and create a uniformity around which rounds need to be persisted // next. @@ -288,7 +296,7 @@ func (dcc deferredCommitContext) newBase() basics.Round { return dcc.oldBase + basics.Round(dcc.offset) } -var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker") +var errMissingAccountUpdateTracker = errors.New("trackers replay : called without a valid accounts update tracker") func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) { tr.mu.Lock() @@ -296,18 +304,10 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.dbs = l.trackerDB() tr.log = l.trackerLog() - err = tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) { - ar, err := tx.MakeAccountsReader() - if err != nil { - return err - } - - tr.dbRound, err = ar.AccountsRound() - return err - }) - - if err != nil { - return err + tr.maxAccountDeltas = defaultMaxAccountDeltas + if cfg.MaxAcctLookback > tr.maxAccountDeltas { + tr.maxAccountDeltas = cfg.MaxAcctLookback + 1 + tr.log.Info("Reset maxAccountDeltas to %d because of config.MaxAcctLookback=%d. Be advised it could cause OOM.", tr.maxAccountDeltas, cfg.MaxAcctLookback) } tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) @@ -336,8 +336,22 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack } func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error { + var dbRound basics.Round + err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) { + ar, err := tx.MakeAccountsReader() + if err != nil { + return err + } + + dbRound, err = ar.AccountsRound() + return err + }) + if err != nil { + return err + } + tr.mu.RLock() - dbRound := tr.dbRound + tr.dbRound = dbRound tr.mu.RUnlock() for _, lt := range tr.trackers { @@ -349,9 +363,9 @@ func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error { } } - err := tr.replay(l) + err = tr.replay(l) if err != nil { - err = fmt.Errorf("initializeTrackerCaches failed : %w", err) + err = fmt.Errorf("trackers replay failed : %w", err) } return err } @@ -459,9 +473,13 @@ func (tr *trackerRegistry) waitAccountsWriting() { tr.accountsWriting.Wait() } -// busy returns true if the trackerRegistry is actively writing accounts into DB. -func (tr *trackerRegistry) busy() bool { - return tr.accountsCommitting.Load() +func (tr *trackerRegistry) available() bool { + if tr.accts.numDeltas() < tr.maxAccountDeltas { + return true + } + + // there is a large number of deltas check if commitSyncer is not writing accounts + return !tr.accountsCommitting.Load() } func (tr *trackerRegistry) close() { diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index e8eb42273b..e41fc71f76 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -442,11 +442,66 @@ func TestTrackers_BusyCommitting(t *testing.T) { // commitRoundStallingTracker blocks commitRound in the goroutine above, wait few secs to ensure the trackerRegistry has set busy() a.Eventually(func() bool { - return ledger.trackers.busy() + return ledger.trackers.accountsCommitting.Load() }, 3*time.Second, 50*time.Millisecond) close(tracker.commitRoundLock) ledger.trackers.waitAccountsWriting() - a.False(ledger.trackers.busy()) + a.False(ledger.trackers.accountsCommitting.Load()) +} + +func TestTrackers_InitializeMaxAccountDeltas(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + accts := setupAccts(20) + ml := makeMockLedgerForTracker(t, true, 1, protocol.ConsensusCurrentVersion, accts) + defer ml.Close() + tr := trackerRegistry{} + + cfg := config.GetDefaultLocal() + err := tr.initialize(ml, []ledgerTracker{}, cfg) + a.NoError(err) + // quit the commitSyncer goroutine + tr.ctxCancel() + tr.ctxCancel = nil + <-tr.commitSyncerClosed + tr.commitSyncerClosed = nil + a.Equal(uint64(defaultMaxAccountDeltas), tr.maxAccountDeltas) + + cfg.MaxAcctLookback = defaultMaxAccountDeltas + 100 + err = tr.initialize(ml, []ledgerTracker{}, cfg) + a.NoError(err) + // quit the commitSyncer goroutine + tr.ctxCancel() + tr.ctxCancel = nil + <-tr.commitSyncerClosed + tr.commitSyncerClosed = nil + a.Equal(cfg.MaxAcctLookback+1, tr.maxAccountDeltas) +} + +func TestTrackers_Available(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + tr := trackerRegistry{ + accts: &accountUpdates{}, + maxAccountDeltas: defaultMaxAccountDeltas, + } + + a.True(tr.available()) + + // no deltas but busy committing => available + tr.accountsCommitting.Store(true) + a.True(tr.available()) + tr.accountsCommitting.Store(false) + + // lots of deltas but not committing => available + tr.accts.deltas = make([]ledgercore.StateDelta, defaultMaxAccountDeltas+10) + a.True(tr.available()) + + // lots of deltas and committing => not available + tr.accountsCommitting.Store(true) + a.False(tr.available()) } func TestTrackers_AccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { From 2f0fcac4704461329f1e00d03ebdb47bf131118f Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Oct 2023 14:41:59 -0400 Subject: [PATCH 05/13] catchup unit test --- catchup/service_test.go | 52 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/catchup/service_test.go b/catchup/service_test.go index 466a69cdb6..083f9ea701 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -834,6 +834,14 @@ func (m *mockedLedger) Available() bool { return true } +type mockedUnavalLedger struct { + mockedLedger +} + +func (m *mockedUnavalLedger) Available() bool { + return false +} + func testingenvWithUpgrade( t testing.TB, numBlocks, @@ -1131,3 +1139,47 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) { lookback = lookbackForStateproofsSupport(&topBlk) assert.Equal(t, uint64(0), lookback) } + +// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round +func TestServiceLedgerUnavailable(t *testing.T) { + partitiontest.PartitionTest(t) + + // Make Ledger + local := new(mockedUnavalLedger) + local.blocks = append(local.blocks, bookkeeping.Block{}) + + remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{}) + if err != nil { + t.Fatal(err) + return + } + numBlocks := 10 + addBlocks(t, remote, blk, numBlocks) + + // Create a network and block service + blockServiceConfig := config.GetDefaultLocal() + net := &httpTestPeerSource{} + ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID") + + nodeA := basicRPCNode{} + nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls) + nodeA.start() + defer nodeA.stop() + rootURL := nodeA.rootURL() + net.addPeer(rootURL) + + require.Equal(t, basics.Round(0), local.LastRound()) + require.Equal(t, basics.Round(numBlocks+1), remote.LastRound()) + + // Make Service + auth := &mockedAuthenticator{fail: false} + s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil) + s.log = &periodicSyncLogger{Logger: logging.Base()} + s.deadlineTimeout = 2 * time.Second + + s.testStart() + defer s.Stop() + s.sync() + require.Greater(t, local.LastRound(), basics.Round(0)) + require.Less(t, local.LastRound(), remote.LastRound()) +} From 63ef58ff533045ba20d37e92f19f897a76435656 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 20 Oct 2023 16:25:14 -0400 Subject: [PATCH 06/13] CR fixes: rename IsBehindCommittingDeltas and reverse logic --- catchup/service.go | 8 ++++---- catchup/service_test.go | 12 ++++++------ ledger/ledger.go | 8 ++++---- ledger/tracker.go | 32 ++++++++++++++++---------------- ledger/tracker_test.go | 20 ++++++++++---------- 5 files changed, 40 insertions(+), 40 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index 9774fde133..891e8970c3 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -63,7 +63,7 @@ type Ledger interface { Block(basics.Round) (bookkeeping.Block, error) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) IsWritingCatchpointDataFile() bool - Available() bool + IsBehindCommittingDeltas() bool Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error WaitMem(r basics.Round) chan struct{} @@ -504,7 +504,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { } // if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure. - if !s.ledger.Available() { + if s.ledger.IsBehindCommittingDeltas() { s.log.Info("Catchup is stopping due to too many non-flushed account changes") s.suspendForLedgerOps = true return @@ -563,7 +563,7 @@ func (s *Service) periodicSync() { sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout continue case <-s.syncNow: - if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || !s.ledger.Available() { + if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() { continue } s.suspendForLedgerOps = false @@ -584,7 +584,7 @@ func (s *Service) periodicSync() { continue } // if the ledger has too many non-flushed account changes, skip - if !s.ledger.Available() { + if s.ledger.IsBehindCommittingDeltas() { continue } diff --git a/catchup/service_test.go b/catchup/service_test.go index 083f9ea701..f076c3139e 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -830,16 +830,16 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool { return false } -func (m *mockedLedger) Available() bool { - return true +func (m *mockedLedger) IsBehindCommittingDeltas() bool { + return false } -type mockedUnavalLedger struct { +type mockedBehindDeltasLedger struct { mockedLedger } -func (m *mockedUnavalLedger) Available() bool { - return false +func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool { + return true } func testingenvWithUpgrade( @@ -1145,7 +1145,7 @@ func TestServiceLedgerUnavailable(t *testing.T) { partitiontest.PartitionTest(t) // Make Ledger - local := new(mockedUnavalLedger) + local := new(mockedBehindDeltasLedger) local.blocks = append(local.blocks, bookkeeping.Block{}) remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{}) diff --git a/ledger/ledger.go b/ledger/ledger.go index 3df9277176..ee6cab07b9 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -898,10 +898,10 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round { return l.trackers.getDbRound() } -// Available returns indicates if the ledger is can safely accept more blocks. -// It is intended use with catchup to slowdown when deltas overgrow some limit. -func (l *Ledger) Available() bool { - return l.trackers.available() +// IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas. +// It intended to slow down the catchup service when deltas overgrow some limit. +func (l *Ledger) IsBehindCommittingDeltas() bool { + return l.trackers.isBehindCommittingDeltas() } // DebuggerLedger defines the minimal set of method required for creating a debug balances. diff --git a/ledger/tracker.go b/ledger/tracker.go index a65d4a28de..b83bf55f6d 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -307,7 +307,7 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.maxAccountDeltas = defaultMaxAccountDeltas if cfg.MaxAcctLookback > tr.maxAccountDeltas { tr.maxAccountDeltas = cfg.MaxAcctLookback + 1 - tr.log.Info("Reset maxAccountDeltas to %d because of config.MaxAcctLookback=%d. Be advised it could cause OOM.", tr.maxAccountDeltas, cfg.MaxAcctLookback) + tr.log.Infof("Reset maxAccountDeltas to %d because of config.MaxAcctLookback=%d. Be advised it could cause OOM.", tr.maxAccountDeltas, cfg.MaxAcctLookback) } tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) @@ -338,13 +338,13 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error { var dbRound basics.Round err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) { - ar, err := tx.MakeAccountsReader() - if err != nil { - return err + ar, err0 := tx.MakeAccountsReader() + if err0 != nil { + return err0 } - dbRound, err = ar.AccountsRound() - return err + dbRound, err0 = ar.AccountsRound() + return err0 }) if err != nil { return err @@ -355,19 +355,19 @@ func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error { tr.mu.RUnlock() for _, lt := range tr.trackers { - err := lt.loadFromDisk(l, dbRound) - if err != nil { + err0 := lt.loadFromDisk(l, dbRound) + if err0 != nil { // find the tracker name. trackerName := reflect.TypeOf(lt).String() - return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err) + return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err0) } } - err = tr.replay(l) - if err != nil { - err = fmt.Errorf("trackers replay failed : %w", err) + if err0 := tr.replay(l); err0 != nil { + return fmt.Errorf("trackers replay failed : %w", err0) } - return err + + return nil } func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { @@ -473,13 +473,13 @@ func (tr *trackerRegistry) waitAccountsWriting() { tr.accountsWriting.Wait() } -func (tr *trackerRegistry) available() bool { +func (tr *trackerRegistry) isBehindCommittingDeltas() bool { if tr.accts.numDeltas() < tr.maxAccountDeltas { - return true + return false } // there is a large number of deltas check if commitSyncer is not writing accounts - return !tr.accountsCommitting.Load() + return tr.accountsCommitting.Load() } func (tr *trackerRegistry) close() { diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index e41fc71f76..4cced94e1b 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -163,7 +163,7 @@ func (t *emptyTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDel } // committedUpTo in the blockingTracker just stores the committed round. -func (io *emptyTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { +func (t *emptyTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { return 0, basics.Round(0) } @@ -428,7 +428,7 @@ func TestTrackers_BusyCommitting(t *testing.T) { blk := genesisInitState.Block for i := basics.Round(0); i < basics.Round(cfg.MaxAcctLookback)+1; i++ { blk.BlockHeader.Round++ - blk.BlockHeader.TimeStamp += 1 + blk.BlockHeader.TimeStamp++ ledger.trackers.newBlock(blk, ledgercore.StateDelta{}) } @@ -479,7 +479,7 @@ func TestTrackers_InitializeMaxAccountDeltas(t *testing.T) { a.Equal(cfg.MaxAcctLookback+1, tr.maxAccountDeltas) } -func TestTrackers_Available(t *testing.T) { +func TestTrackers_IsBehindCommittingDeltas(t *testing.T) { partitiontest.PartitionTest(t) a := require.New(t) @@ -488,20 +488,20 @@ func TestTrackers_Available(t *testing.T) { maxAccountDeltas: defaultMaxAccountDeltas, } - a.True(tr.available()) + a.False(tr.isBehindCommittingDeltas()) - // no deltas but busy committing => available + // no deltas but busy committing => not behind tr.accountsCommitting.Store(true) - a.True(tr.available()) + a.False(tr.isBehindCommittingDeltas()) tr.accountsCommitting.Store(false) - // lots of deltas but not committing => available + // lots of deltas but not committing => not behind tr.accts.deltas = make([]ledgercore.StateDelta, defaultMaxAccountDeltas+10) - a.True(tr.available()) + a.False(tr.isBehindCommittingDeltas()) - // lots of deltas and committing => not available + // lots of deltas and committing => behind tr.accountsCommitting.Store(true) - a.False(tr.available()) + a.True(tr.isBehindCommittingDeltas()) } func TestTrackers_AccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { From f41ba3e7af28a3834f0357c40b305ec8f8056cc8 Mon Sep 17 00:00:00 2001 From: Gary <982483+gmalouf@users.noreply.github.com> Date: Tue, 24 Oct 2023 09:40:02 -0400 Subject: [PATCH 07/13] Comments cleanup in tracker_test.go --- ledger/tracker_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index 4cced94e1b..de187cb31e 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -153,16 +153,16 @@ func TestTrackerScheduleCommit(t *testing.T) { type emptyTracker struct { } -// loadFromDisk is not implemented in the blockingTracker. +// loadFromDisk is not implemented in the emptyTracker. func (t *emptyTracker) loadFromDisk(ledgerForTracker, basics.Round) error { return nil } -// newBlock is not implemented in the blockingTracker. +// newBlock is not implemented in the emptyTracker. func (t *emptyTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { } -// committedUpTo in the blockingTracker just stores the committed round. +// committedUpTo in the emptyTracker just stores the committed round. func (t *emptyTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) { return 0, basics.Round(0) } @@ -171,12 +171,12 @@ func (t *emptyTracker) produceCommittingTask(committedRound basics.Round, dbRoun return dcr } -// prepareCommit, is not used by the blockingTracker +// prepareCommit, is not used by the emptyTracker func (t *emptyTracker) prepareCommit(*deferredCommitContext) error { return nil } -// commitRound is not used by the blockingTracker +// commitRound is not used by the emptyTracker func (t *emptyTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { return nil } @@ -188,7 +188,7 @@ func (t *emptyTracker) postCommit(ctx context.Context, dcc *deferredCommitContex func (t *emptyTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { } -// control functions are not used by the blockingTracker +// control functions are not used by the emptyTracker func (t *emptyTracker) handleUnorderedCommit(dcc *deferredCommitContext) { } func (t *emptyTracker) handlePrepareCommitError(dcc *deferredCommitContext) { @@ -196,7 +196,7 @@ func (t *emptyTracker) handlePrepareCommitError(dcc *deferredCommitContext) { func (t *emptyTracker) handleCommitError(dcc *deferredCommitContext) { } -// close is not used by the blockingTracker +// close is not used by the emptyTracker func (t *emptyTracker) close() { } @@ -204,7 +204,7 @@ type ioErrorTracker struct { emptyTracker } -// commitRound is not used by the blockingTracker +// commitRound is not used by the ioErrorTracker func (io *ioErrorTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error { return sqlite3.Error{Code: sqlite3.ErrIoErr} } From 94733e6ea1782a9a5505c5baec5372885400b559 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 24 Oct 2023 10:04:31 -0400 Subject: [PATCH 08/13] CR: rewrite isBehindCommittingDeltas --- ledger/acctupdates.go | 7 ------- ledger/ledger.go | 2 +- ledger/tracker.go | 9 +++++++-- ledger/tracker_test.go | 12 +++++++----- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index f205e2103b..ca9cd55f72 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -738,13 +738,6 @@ func (au *accountUpdates) totalsImpl(rnd basics.Round) (ledgercore.AccountTotals return au.roundTotals[offset], nil } -func (au *accountUpdates) numDeltas() uint64 { - au.accountsMu.RLock() - numDeltas := uint64(len(au.deltas)) - au.accountsMu.RUnlock() - return numDeltas -} - // initializeFromDisk performs the atomic operation of loading the accounts data information from disk // and preparing the accountUpdates for operation. func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRound basics.Round) error { diff --git a/ledger/ledger.go b/ledger/ledger.go index ee6cab07b9..1617007aef 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -901,7 +901,7 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round { // IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas. // It intended to slow down the catchup service when deltas overgrow some limit. func (l *Ledger) IsBehindCommittingDeltas() bool { - return l.trackers.isBehindCommittingDeltas() + return l.trackers.isBehindCommittingDeltas(l.Latest()) } // DebuggerLedger defines the minimal set of method required for creating a debug balances. diff --git a/ledger/tracker.go b/ledger/tracker.go index b83bf55f6d..0129eb599f 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -473,8 +473,13 @@ func (tr *trackerRegistry) waitAccountsWriting() { tr.accountsWriting.Wait() } -func (tr *trackerRegistry) isBehindCommittingDeltas() bool { - if tr.accts.numDeltas() < tr.maxAccountDeltas { +func (tr *trackerRegistry) isBehindCommittingDeltas(latest basics.Round) bool { + tr.mu.RLock() + dbRound := tr.dbRound + tr.mu.RUnlock() + + numDeltas := uint64(latest.SubSaturate(dbRound)) + if numDeltas < tr.maxAccountDeltas { return false } diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go index de187cb31e..d16fb925c1 100644 --- a/ledger/tracker_test.go +++ b/ledger/tracker_test.go @@ -488,20 +488,22 @@ func TestTrackers_IsBehindCommittingDeltas(t *testing.T) { maxAccountDeltas: defaultMaxAccountDeltas, } - a.False(tr.isBehindCommittingDeltas()) + latest := basics.Round(0) + a.False(tr.isBehindCommittingDeltas(latest)) // no deltas but busy committing => not behind tr.accountsCommitting.Store(true) - a.False(tr.isBehindCommittingDeltas()) + a.False(tr.isBehindCommittingDeltas(latest)) tr.accountsCommitting.Store(false) // lots of deltas but not committing => not behind - tr.accts.deltas = make([]ledgercore.StateDelta, defaultMaxAccountDeltas+10) - a.False(tr.isBehindCommittingDeltas()) + latest = basics.Round(defaultMaxAccountDeltas + 10) + tr.dbRound = 0 + a.False(tr.isBehindCommittingDeltas(latest)) // lots of deltas and committing => behind tr.accountsCommitting.Store(true) - a.True(tr.isBehindCommittingDeltas()) + a.True(tr.isBehindCommittingDeltas(latest)) } func TestTrackers_AccountUpdatesLedgerEvaluatorNoBlockHdr(t *testing.T) { From d62d505cb8cb4011011b5e925132523929c1d42e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 24 Oct 2023 10:09:39 -0400 Subject: [PATCH 09/13] CR: rewrite messages as suggested --- catchup/service.go | 2 +- ledger/tracker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index 891e8970c3..1327ba963e 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -88,7 +88,7 @@ type Service struct { blockValidationPool execpool.BacklogPool // suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the - // catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete, + // catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete, // and resume from there without stopping the catchup timer. suspendForLedgerOps bool diff --git a/ledger/tracker.go b/ledger/tracker.go index 0129eb599f..4f5ac56440 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -307,7 +307,7 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.maxAccountDeltas = defaultMaxAccountDeltas if cfg.MaxAcctLookback > tr.maxAccountDeltas { tr.maxAccountDeltas = cfg.MaxAcctLookback + 1 - tr.log.Infof("Reset maxAccountDeltas to %d because of config.MaxAcctLookback=%d. Be advised it could cause OOM.", tr.maxAccountDeltas, cfg.MaxAcctLookback) + tr.log.Infof("maxAccountDeltas as overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback) } tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) From c2014a9f1ebfa3ecfd5a797d8a05e0540d3544b2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Tue, 24 Oct 2023 10:24:58 -0400 Subject: [PATCH 10/13] Update ledger/tracker.go Co-authored-by: John Jannotti --- ledger/tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/tracker.go b/ledger/tracker.go index 4f5ac56440..7ad5ba6641 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -307,7 +307,7 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.maxAccountDeltas = defaultMaxAccountDeltas if cfg.MaxAcctLookback > tr.maxAccountDeltas { tr.maxAccountDeltas = cfg.MaxAcctLookback + 1 - tr.log.Infof("maxAccountDeltas as overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback) + tr.log.Infof("maxAccountDeltas was overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback) } tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) From 76d21f9ec4a1e7b52f458f1a14b6526fc45035eb Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 24 Oct 2023 16:56:37 -0400 Subject: [PATCH 11/13] catchup: alternative pause catchup if ledger lagging behind --- catchup/service.go | 14 +++++++++++++- catchup/service_test.go | 4 +++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index 1327ba963e..b931e6619e 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -36,6 +36,7 @@ import ( "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/execpool" + "github.com/algorand/go-algorand/util/metrics" ) const catchupPeersForSync = 10 @@ -294,6 +295,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo // Try to fetch, timing out after retryInterval block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer) + totalBlocksFetched.Inc(nil) if err != nil { if err == errLedgerAlreadyHasBlock { @@ -495,6 +497,14 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { return } + // if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks. + start := time.Now() + for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout { + time.Sleep(100 * time.Millisecond) + } + + // if ledger is still busy after s.deadlineTimeout timeout then about the current pipelinedFetch invocation. + // if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we // could resume with the catchup. if s.ledger.IsWritingCatchpointDataFile() { @@ -654,7 +664,7 @@ func (s *Service) sync() { } } - elapsedTime := time.Now().Sub(start) + elapsedTime := time.Since(start) s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{ StartRound: uint64(pr), EndRound: uint64(s.ledger.LastRound()), @@ -847,3 +857,5 @@ func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch } return makePeerSelector(net, peerClasses) } + +var totalBlocksFetched = metrics.MakeCounter(metrics.MetricName{Name: "algod_catchup_blocks_fetched", Description: "Total number of blocks fetched with catchup"}) diff --git a/catchup/service_test.go b/catchup/service_test.go index f076c3139e..43f8ab6f6e 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -1173,7 +1173,9 @@ func TestServiceLedgerUnavailable(t *testing.T) { // Make Service auth := &mockedAuthenticator{fail: false} - s := MakeService(logging.Base(), defaultConfig, net, local, auth, nil, nil) + cfg := config.GetDefaultLocal() + cfg.CatchupParallelBlocks = 2 + s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil) s.log = &periodicSyncLogger{Logger: logging.Base()} s.deadlineTimeout = 2 * time.Second From e85936b8898c5258ec003d1c135e38ceceb51f1c Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 25 Oct 2023 14:42:16 -0400 Subject: [PATCH 12/13] remove debug metrics --- catchup/service.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/catchup/service.go b/catchup/service.go index b931e6619e..de0311c0cc 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -36,7 +36,6 @@ import ( "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/execpool" - "github.com/algorand/go-algorand/util/metrics" ) const catchupPeersForSync = 10 @@ -295,7 +294,6 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo // Try to fetch, timing out after retryInterval block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer) - totalBlocksFetched.Inc(nil) if err != nil { if err == errLedgerAlreadyHasBlock { @@ -857,5 +855,3 @@ func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch } return makePeerSelector(net, peerClasses) } - -var totalBlocksFetched = metrics.MakeCounter(metrics.MetricName{Name: "algod_catchup_blocks_fetched", Description: "Total number of blocks fetched with catchup"}) From b03bd702c4d06b5a039f51d1014412ba5a3cc3a0 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com> Date: Thu, 26 Oct 2023 13:38:07 -0400 Subject: [PATCH 13/13] Update catchup/service.go --- catchup/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catchup/service.go b/catchup/service.go index de0311c0cc..3638360a97 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -501,7 +501,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { time.Sleep(100 * time.Millisecond) } - // if ledger is still busy after s.deadlineTimeout timeout then about the current pipelinedFetch invocation. + // if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation. // if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we // could resume with the catchup.