Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Ledger interface {
Block(basics.Round) (bookkeeping.Block, error)
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
IsWritingCatchpointDataFile() bool
IsBehindCommittingDeltas() bool
Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
WaitMem(r basics.Round) chan struct{}
Expand All @@ -86,10 +87,10 @@ type Service struct {
deadlineTimeout time.Duration
blockValidationPool execpool.BacklogPool

// suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file or flushing accounts. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// and resume from there without stopping the catchup timer.
suspendForCatchpointWriting bool
suspendForLedgerOps bool

// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
Expand Down Expand Up @@ -494,11 +495,26 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
return
}

// if ledger is busy, pause for some time to let the fetchAndWrite goroutines to finish fetching in-flight blocks.
start := time.Now()
Comment thread
algorandskiy marked this conversation as resolved.
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
time.Sleep(100 * time.Millisecond)
}

// if ledger is still busy after s.deadlineTimeout timeout then abort the current pipelinedFetch invocation.

// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
if s.ledger.IsWritingCatchpointDataFile() {
s.log.Info("Catchup is stopping due to catchpoint file being written")
s.suspendForCatchpointWriting = true
s.suspendForLedgerOps = true
return
}

// if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure.
if s.ledger.IsBehindCommittingDeltas() {
s.log.Info("Catchup is stopping due to too many non-flushed account changes")
s.suspendForLedgerOps = true
return
}

Expand Down Expand Up @@ -555,10 +571,10 @@ func (s *Service) periodicSync() {
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
continue
}
s.suspendForCatchpointWriting = false
s.suspendForLedgerOps = false
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
Expand All @@ -575,7 +591,12 @@ func (s *Service) periodicSync() {
// keep the existing sleep duration and try again later.
continue
}
s.suspendForCatchpointWriting = false
// if the ledger has too many non-flushed account changes, skip
if s.ledger.IsBehindCommittingDeltas() {
continue
}

s.suspendForLedgerOps = false
s.log.Info("It's been too long since our ledger advanced; resyncing")
s.sync()
case cert := <-s.unmatchedPendingCertificates:
Expand Down Expand Up @@ -630,7 +651,7 @@ func (s *Service) sync() {
initSync := false

// if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
if !s.suspendForCatchpointWriting {
if !s.suspendForLedgerOps {
// in that case, don't change the timer so that the "timer" would keep running.
atomic.StoreInt64(&s.syncStartNS, 0)

Expand All @@ -641,7 +662,7 @@ func (s *Service) sync() {
}
}

elapsedTime := time.Now().Sub(start)
elapsedTime := time.Since(start)
s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{
StartRound: uint64(pr),
EndRound: uint64(s.ledger.LastRound()),
Expand Down
58 changes: 58 additions & 0 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
return false
}

func (m *mockedLedger) IsBehindCommittingDeltas() bool {
return false
}

type mockedBehindDeltasLedger struct {
Comment thread
jasonpaulos marked this conversation as resolved.
mockedLedger
}

func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool {
return true
}

func testingenvWithUpgrade(
t testing.TB,
numBlocks,
Expand Down Expand Up @@ -1127,3 +1139,49 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
lookback = lookbackForStateproofsSupport(&topBlk)
assert.Equal(t, uint64(0), lookback)
}

// TestServiceLedgerUnavailable checks a local ledger that is unavailable cannot catchup up to remote round
func TestServiceLedgerUnavailable(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedBehindDeltasLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 2
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second

s.testStart()
defer s.Stop()
s.sync()
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}
37 changes: 1 addition & 36 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
postCommitUnlockedEntryLock chan struct{}
postCommitUnlockedReleaseLock chan struct{}
postCommitEntryLock chan struct{}
Expand All @@ -783,36 +784,12 @@ type blockingTracker struct {
shouldLockPostCommitUnlocked atomic.Bool
}

// loadFromDisk is not implemented in the blockingTracker.
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
return nil
}

// newBlock is not implemented in the blockingTracker.
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
}

// committedUpTo in the blockingTracker just stores the committed round.
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
return committedRnd, basics.Round(0)
}

// produceCommittingTask is not used by the blockingTracker
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}

// prepareCommit, is not used by the blockingTracker
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
return nil
}

// commitRound is not used by the blockingTracker
func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error {
return nil
}

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() {
Expand All @@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// control functions are not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
func (bt *blockingTracker) close() {
}

func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
6 changes: 6 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,12 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round {
return l.trackers.getDbRound()
}

// IsBehindCommittingDeltas indicates if the ledger is behind expected number of in-memory deltas.
// It intended to slow down the catchup service when deltas overgrow some limit.
func (l *Ledger) IsBehindCommittingDeltas() bool {
return l.trackers.isBehindCommittingDeltas(l.Latest())
}

// DebuggerLedger defines the minimal set of method required for creating a debug balances.
type DebuggerLedger = eval.LedgerForCowBase

Expand Down
78 changes: 57 additions & 21 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/config"
Expand Down Expand Up @@ -175,6 +176,8 @@ type trackerRegistry struct {

// accountsWriting provides synchronization around the background writing of account balances.
accountsWriting sync.WaitGroup
// accountsCommitting is set when trackers registry writing accounts into DB.
accountsCommitting atomic.Bool

// dbRound is always exactly accountsRound(),
// cached to avoid SQL queries.
Expand All @@ -196,8 +199,16 @@ type trackerRegistry struct {
lastFlushTime time.Time

cfg config.Local

// maxAccountDeltas is a maximum number of in-memory deltas stored by trackers.
// When exceeded trackerRegistry will attempt to flush, and its Available() method will return false.
// Too many in-memory deltas could cause the node to run out of memory.
maxAccountDeltas uint64
}

// defaultMaxAccountDeltas is a default value for maxAccountDeltas.
const defaultMaxAccountDeltas = 256

// deferredCommitRange is used during the calls to produceCommittingTask, and used as a data structure
// to syncronize the various trackers and create a uniformity around which rounds need to be persisted
// next.
Expand Down Expand Up @@ -285,26 +296,18 @@ func (dcc deferredCommitContext) newBase() basics.Round {
return dcc.oldBase + basics.Round(dcc.offset)
}

var errMissingAccountUpdateTracker = errors.New("initializeTrackerCaches : called without a valid accounts update tracker")
var errMissingAccountUpdateTracker = errors.New("trackers replay : called without a valid accounts update tracker")

func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTracker, cfg config.Local) (err error) {
tr.mu.Lock()
defer tr.mu.Unlock()
tr.dbs = l.trackerDB()
tr.log = l.trackerLog()

err = tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err := tx.MakeAccountsReader()
if err != nil {
return err
}

tr.dbRound, err = ar.AccountsRound()
return err
})

if err != nil {
return err
tr.maxAccountDeltas = defaultMaxAccountDeltas
if cfg.MaxAcctLookback > tr.maxAccountDeltas {
tr.maxAccountDeltas = cfg.MaxAcctLookback + 1
tr.log.Infof("maxAccountDeltas was overridden to %d because of MaxAcctLookback=%d: this combination might use lots of RAM. To preserve some blocks in blockdb consider using MaxBlockHistoryLookback config option instead of MaxAcctLookback", tr.maxAccountDeltas, cfg.MaxAcctLookback)
}

tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -333,24 +336,38 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack
}

func (tr *trackerRegistry) loadFromDisk(l ledgerForTracker) error {
var dbRound basics.Round
err := tr.dbs.Snapshot(func(ctx context.Context, tx trackerdb.SnapshotScope) (err error) {
ar, err0 := tx.MakeAccountsReader()
if err0 != nil {
return err0
}

dbRound, err0 = ar.AccountsRound()
return err0
})
if err != nil {
return err
}

tr.mu.RLock()
dbRound := tr.dbRound
tr.dbRound = dbRound
tr.mu.RUnlock()

for _, lt := range tr.trackers {
err := lt.loadFromDisk(l, dbRound)
if err != nil {
err0 := lt.loadFromDisk(l, dbRound)
if err0 != nil {
// find the tracker name.
trackerName := reflect.TypeOf(lt).String()
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err)
return fmt.Errorf("tracker %s failed to loadFromDisk : %w", trackerName, err0)
}
}

err := tr.replay(l)
if err != nil {
err = fmt.Errorf("initializeTrackerCaches failed : %w", err)
if err0 := tr.replay(l); err0 != nil {
return fmt.Errorf("trackers replay failed : %w", err0)
}
return err

return nil
}

func (tr *trackerRegistry) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
Expand Down Expand Up @@ -456,6 +473,20 @@ func (tr *trackerRegistry) waitAccountsWriting() {
tr.accountsWriting.Wait()
}

func (tr *trackerRegistry) isBehindCommittingDeltas(latest basics.Round) bool {
tr.mu.RLock()
dbRound := tr.dbRound
tr.mu.RUnlock()

numDeltas := uint64(latest.SubSaturate(dbRound))
if numDeltas < tr.maxAccountDeltas {
return false
}

// there is a large number of deltas check if commitSyncer is not writing accounts
return tr.accountsCommitting.Load()
Comment thread
cce marked this conversation as resolved.
}

func (tr *trackerRegistry) close() {
if tr.ctxCancel != nil {
tr.ctxCancel()
Expand Down Expand Up @@ -562,6 +593,11 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error {
start := time.Now()
ledgerCommitroundCount.Inc(nil)
err = tr.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) (err error) {
Comment thread
gmalouf marked this conversation as resolved.
tr.accountsCommitting.Store(true)
defer func() {
tr.accountsCommitting.Store(false)
}()

aw, err := tx.MakeAccountsWriter()
if err != nil {
return err
Expand Down
Loading