43 changes: 34 additions & 9 deletions catchup/service.go
@@ -36,6 +36,7 @@ import (
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/execpool"
"github.com/algorand/go-algorand/util/metrics"
)

const catchupPeersForSync = 10
@@ -63,6 +64,7 @@ type Ledger interface {
Block(basics.Round) (bookkeeping.Block, error)
BlockHdr(basics.Round) (bookkeeping.BlockHeader, error)
IsWritingCatchpointDataFile() bool
IsBehindCommittingDeltas() bool
Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ledgercore.ValidatedBlock, error)
AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.Certificate) error
WaitMem(r basics.Round) chan struct{}
@@ -86,10 +88,10 @@ type Service struct {
deadlineTimeout time.Duration
blockValidationPool execpool.BacklogPool

// suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// suspendForLedgerOps defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file or flushing accounts to disk. If so, we want to suspend the catchup process until the ledger operation completes,
// and resume from there without stopping the catchup timer.
suspendForCatchpointWriting bool
suspendForLedgerOps bool

// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
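The flag's role: when pipelinedFetch aborts because the ledger is busy, sync() later uses suspendForLedgerOps to keep the catchup timer running rather than treating the abort as a fresh stall. A minimal sketch of that timer-preserving pattern, with hypothetical names rather than the service's actual fields:

```go
package main

import (
	"fmt"
	"time"
)

// catchupClock sketches the timer-preserving suspend/resume idea: an abort
// caused by ledger housekeeping sets suspended, and the next sync keeps the
// original start time instead of restarting the clock.
type catchupClock struct {
	suspended bool
	start     time.Time
}

func (c *catchupClock) onSyncStart() {
	if !c.suspended {
		c.start = time.Now() // genuinely new sync: restart the clock
	}
	c.suspended = false // resume either way
}

func (c *catchupClock) onLedgerBusyAbort() {
	c.suspended = true // remember why we stopped
}

func main() {
	var c catchupClock
	c.onSyncStart()
	first := c.start
	c.onLedgerBusyAbort()
	c.onSyncStart()                   // resumes; start time is preserved
	fmt.Println(c.start.Equal(first)) // true
}
```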
@@ -293,6 +295,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo

// Try to fetch, timing out after retryInterval
block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer)
totalBlocksFetched.Inc(nil)

if err != nil {
if err == errLedgerAlreadyHasBlock {
@@ -494,11 +497,26 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
return
}

// if the ledger is busy, pause for some time to let the fetchAndWrite goroutines finish fetching in-flight blocks.
start := time.Now()
for (s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas()) && time.Since(start) < s.deadlineTimeout {
time.Sleep(100 * time.Millisecond)
}

// if the ledger is still busy after s.deadlineTimeout, abort the current pipelinedFetch invocation.

// if we're writing a catchpoint file, stop catching up to reduce the memory pressure. Once we finish writing the file we
// could resume with the catchup.
if s.ledger.IsWritingCatchpointDataFile() {
s.log.Info("Catchup is stopping due to catchpoint file being written")
s.suspendForCatchpointWriting = true
s.suspendForLedgerOps = true
return
}

// if the ledger has too many non-flushed account changes, stop catching up to reduce the memory pressure.
if s.ledger.IsBehindCommittingDeltas() {
s.log.Info("Catchup is stopping due to too many non-flushed account changes")
s.suspendForLedgerOps = true
return
}
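The loop above is a bounded poll: in-flight fetchAndWrite goroutines get up to deadlineTimeout to drain while the ledger is busy, after which the checks below abort this pipelinedFetch invocation. The same pattern in isolation (waitUntilIdle is a hypothetical helper, not part of the service):

```go
package main

import (
	"fmt"
	"time"
)

// waitUntilIdle polls busy() every pollInterval until it returns false
// or deadline elapses; it reports whether the resource became idle.
func waitUntilIdle(busy func() bool, deadline, pollInterval time.Duration) bool {
	start := time.Now()
	for busy() {
		if time.Since(start) >= deadline {
			return false
		}
		time.Sleep(pollInterval)
	}
	return true
}

func main() {
	until := time.Now().Add(300 * time.Millisecond)
	busy := func() bool { return time.Now().Before(until) }
	fmt.Println(waitUntilIdle(busy, 2*time.Second, 100*time.Millisecond)) // true
}
```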

@@ -555,10 +573,10 @@ func (s *Service) periodicSync() {
sleepDuration = time.Duration(crypto.RandUint63()) % s.deadlineTimeout
continue
case <-s.syncNow:
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() {
if s.parallelBlocks == 0 || s.ledger.IsWritingCatchpointDataFile() || s.ledger.IsBehindCommittingDeltas() {
continue
}
s.suspendForCatchpointWriting = false
s.suspendForLedgerOps = false
s.log.Info("Immediate resync triggered; resyncing")
s.sync()
case <-time.After(sleepDuration):
@@ -575,7 +593,12 @@
// keep the existing sleep duration and try again later.
continue
}
s.suspendForCatchpointWriting = false
// if the ledger has too many non-flushed account changes, skip this resync attempt
if s.ledger.IsBehindCommittingDeltas() {
continue
}

s.suspendForLedgerOps = false
s.log.Info("It's been too long since our ledger advanced; resyncing")
s.sync()
case cert := <-s.unmatchedPendingCertificates:
@@ -630,7 +653,7 @@ func (s *Service) sync() {
initSync := false

// if the suspendForLedgerOps flag is set, it means that we aborted the sync due to the ledger being busy writing a catchpoint file or flushing deltas.
if !s.suspendForCatchpointWriting {
if !s.suspendForLedgerOps {
// in that case, don't change the timer, so that it keeps running.
atomic.StoreInt64(&s.syncStartNS, 0)

Expand All @@ -641,7 +664,7 @@ func (s *Service) sync() {
}
}

elapsedTime := time.Now().Sub(start)
elapsedTime := time.Since(start)
s.log.EventWithDetails(telemetryspec.ApplicationState, telemetryspec.CatchupStopEvent, telemetryspec.CatchupStopEventDetails{
StartRound: uint64(pr),
EndRound: uint64(s.ledger.LastRound()),
@@ -834,3 +857,5 @@ func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch
}
return makePeerSelector(net, peerClasses)
}

var totalBlocksFetched = metrics.MakeCounter(metrics.MetricName{Name: "algod_catchup_blocks_fetched", Description: "Total number of blocks fetched with catchup"})
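For reference, the metrics pattern this file now uses: MakeCounter registers a package-level counter once, and Inc(nil) bumps it with no per-sample labels (both calls appear in this diff; the snippet below only isolates the idiom, with a made-up metric name):

```go
package main

import "github.com/algorand/go-algorand/util/metrics"

// Package-level registration, mirroring totalBlocksFetched above.
var blocksFetched = metrics.MakeCounter(metrics.MetricName{
	Name:        "algod_example_blocks_fetched",
	Description: "Example counter mirroring the catchup metric",
})

func fetchOne() {
	// ... fetch work ...
	blocksFetched.Inc(nil) // nil: no per-sample labels
}

func main() {
	fetchOne()
}
```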
58 changes: 58 additions & 0 deletions catchup/service_test.go
@@ -830,6 +830,18 @@ func (m *mockedLedger) IsWritingCatchpointDataFile() bool {
return false
}

func (m *mockedLedger) IsBehindCommittingDeltas() bool {
return false
}

type mockedBehindDeltasLedger struct {
mockedLedger
}

func (m *mockedBehindDeltasLedger) IsBehindCommittingDeltas() bool {
return true
}
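mockedBehindDeltasLedger works through Go struct embedding: it inherits every mockedLedger method and shadows only IsBehindCommittingDeltas, so the rest of the mock stays untouched. The same technique in isolation, with illustrative types:

```go
package main

import "fmt"

type Base struct{}

func (Base) Busy() bool   { return false }
func (Base) Name() string { return "base" }

// Loud embeds Base, so it keeps Name() but shadows Busy().
type Loud struct{ Base }

func (Loud) Busy() bool { return true }

func main() {
	var l Loud
	fmt.Println(l.Name(), l.Busy()) // "base true"
}
```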

func testingenvWithUpgrade(
t testing.TB,
numBlocks,
@@ -1127,3 +1139,49 @@ func TestDownloadBlocksToSupportStateProofs(t *testing.T) {
lookback = lookbackForStateproofsSupport(&topBlk)
assert.Equal(t, uint64(0), lookback)
}

// TestServiceLedgerUnavailable checks that a local ledger that is unavailable cannot catch up to the remote round
func TestServiceLedgerUnavailable(t *testing.T) {
partitiontest.PartitionTest(t)

// Make Ledger
local := new(mockedBehindDeltasLedger)
local.blocks = append(local.blocks, bookkeeping.Block{})

remote, _, blk, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}
numBlocks := 10
addBlocks(t, remote, blk, numBlocks)

// Create a network and block service
blockServiceConfig := config.GetDefaultLocal()
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
net.addPeer(rootURL)

require.Equal(t, basics.Round(0), local.LastRound())
require.Equal(t, basics.Round(numBlocks+1), remote.LastRound())

// Make Service
auth := &mockedAuthenticator{fail: false}
cfg := config.GetDefaultLocal()
cfg.CatchupParallelBlocks = 2
s := MakeService(logging.Base(), cfg, net, local, auth, nil, nil)
s.log = &periodicSyncLogger{Logger: logging.Base()}
s.deadlineTimeout = 2 * time.Second

s.testStart()
defer s.Stop()
s.sync()
require.Greater(t, local.LastRound(), basics.Round(0))
require.Less(t, local.LastRound(), remote.LastRound())
}
37 changes: 1 addition & 36 deletions ledger/catchpointtracker_test.go
@@ -773,6 +773,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
postCommitUnlockedEntryLock chan struct{}
postCommitUnlockedReleaseLock chan struct{}
postCommitEntryLock chan struct{}
@@ -783,36 +784,12 @@
shouldLockPostCommitUnlocked atomic.Bool
}

// loadFromDisk is not implemented in the blockingTracker.
func (bt *blockingTracker) loadFromDisk(ledgerForTracker, basics.Round) error {
return nil
}

// newBlock is not implemented in the blockingTracker.
func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
}

// committedUpTo in the blockingTracker just stores the committed round.
func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
return committedRnd, basics.Round(0)
}

// produceCommittingTask is not used by the blockingTracker
func (bt *blockingTracker) produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange {
return dcr
}

// prepareCommit, is not used by the blockingTracker
func (bt *blockingTracker) prepareCommit(*deferredCommitContext) error {
return nil
}

// commitRound is not used by the blockingTracker
func (bt *blockingTracker) commitRound(context.Context, trackerdb.TransactionScope, *deferredCommitContext) error {
return nil
}

// postCommit implements entry/exit blockers, designed for testing.
func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) {
if bt.alwaysLock.Load() || dcc.catchpointFirstStage || bt.shouldLockPostCommit.Load() {
@@ -829,18 +806,6 @@ func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferred
}
}

// control functions are not used by the blockingTracker
func (bt *blockingTracker) handleUnorderedCommit(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handlePrepareCommitError(dcc *deferredCommitContext) {
}
func (bt *blockingTracker) handleCommitError(dcc *deferredCommitContext) {
}

// close is not used by the blockingTracker
func (bt *blockingTracker) close() {
}
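The stub deletions above are possible because blockingTracker now embeds emptyTracker (added at the top of this struct), which presumably provides no-op implementations of the tracker interface, so the test double overrides only the hooks it exercises. A self-contained sketch of that pattern (the interface and names here are illustrative, not the ledger's actual tracker interface):

```go
package main

import "fmt"

// tracker is a stand-in for the ledger's (much larger) tracker interface.
type tracker interface {
	newBlock(round uint64)
	close()
}

// emptyTrackerSketch satisfies tracker with no-ops, so test doubles can
// embed it and override only what a given test exercises.
type emptyTrackerSketch struct{}

func (emptyTrackerSketch) newBlock(uint64) {}
func (emptyTrackerSketch) close()          {}

// countingTracker overrides just newBlock; close() is promoted from the
// embedded no-op base.
type countingTracker struct {
	emptyTrackerSketch
	blocks int
}

func (c *countingTracker) newBlock(uint64) { c.blocks++ }

func main() {
	var tr tracker = &countingTracker{}
	tr.newBlock(1)
	tr.newBlock(2)
	fmt.Println(tr.(*countingTracker).blocks) // 2
}
```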

func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
partitiontest.PartitionTest(t)

6 changes: 6 additions & 0 deletions ledger/ledger.go
@@ -898,6 +898,12 @@ func (l *Ledger) LatestTrackerCommitted() basics.Round {
return l.trackers.getDbRound()
}

// IsBehindCommittingDeltas indicates whether the ledger has accumulated more in-memory deltas than expected.
// It is intended to slow down the catchup service when the deltas grow beyond some limit.
func (l *Ledger) IsBehindCommittingDeltas() bool {
return l.trackers.isBehindCommittingDeltas(l.Latest())
}
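The tracker-side check is not shown in this diff; a plausible reading of the comment is "the latest round has run too far ahead of the last round flushed to disk". A hypothetical sketch (maxDeltas and the signature are assumptions, not go-algorand code):

```go
package main

import "fmt"

const maxDeltas = 256 // hypothetical limit on unflushed in-memory deltas

// isBehindCommittingDeltas sketches the likely shape of the check: too many
// rounds accumulated in memory past the persisted dbRound.
func isBehindCommittingDeltas(latest, dbRound uint64) bool {
	return latest > dbRound && latest-dbRound > maxDeltas
}

func main() {
	fmt.Println(isBehindCommittingDeltas(1000, 900)) // false: 100 deltas
	fmt.Println(isBehindCommittingDeltas(1000, 500)) // true: 500 deltas
}
```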

// DebuggerLedger defines the minimal set of method required for creating a debug balances.
type DebuggerLedger = eval.LedgerForCowBase
