diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index 16d933fbb5..480f62d377 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -133,6 +133,9 @@ type accountUpdates struct { // i.e., totals is one longer than deltas. roundTotals []ledgercore.AccountTotals + // roundDigest stores the digest of the block for every round starting with dbRound and every round after it. + roundDigest []crypto.Digest + // log copied from ledger log logging.Logger @@ -717,6 +720,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou au.accounts = make(map[basics.Address]modifiedAccount) au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable) au.deltasAccum = []int{0} + au.roundDigest = nil au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold) return @@ -738,6 +742,7 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S au.deltas = append(au.deltas, delta.Accts) au.versions = append(au.versions, blk.CurrentProtocol) au.creatableDeltas = append(au.creatableDeltas, delta.Creatables) + au.roundDigest = append(au.roundDigest, blk.Digest()) au.deltasAccum = append(au.deltasAccum, delta.Accts.Len()+au.deltasAccum[len(au.deltasAccum)-1]) au.baseAccounts.flushPendingWrites() @@ -1085,6 +1090,10 @@ func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error { return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions") } + if dcc.isCatchpointRound { + dcc.committedRoundDigest = au.roundDigest[offset+uint64(dcc.lookback)-1] + } + // compact all the deltas - when we're trying to persist multiple rounds, we might have the same account // being updated multiple times. When that happen, we can safely omit the intermediate updates. dcc.compactAccountDeltas = makeCompactAccountDeltas(dcc.deltas, au.baseAccounts) @@ -1216,6 +1225,7 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon au.deltas = au.deltas[offset:] au.deltasAccum = au.deltasAccum[offset:] + au.roundDigest = au.roundDigest[offset:] au.versions = au.versions[offset:] au.roundTotals = au.roundTotals[offset:] au.creatableDeltas = au.creatableDeltas[offset:] diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index e70b526da2..8be735b114 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -110,9 +110,6 @@ type catchpointTracker struct { // catchpointsMu is the synchronization mutex for accessing the various non-static variables. catchpointsMu deadlock.RWMutex - - // roundDigest stores the digest of the block for every round starting with dbRound and every round after it. - roundDigest []crypto.Digest } // initialize initializes the catchpointTracker structure @@ -159,7 +156,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound ct.log = l.trackerLog() ct.dbs = l.trackerDB() - ct.roundDigest = nil ct.catchpointWriting = 0 // keep these channel closed if we're not generating catchpoint ct.catchpointSlowWriting = make(chan struct{}, 1) @@ -220,9 +216,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound // newBlock informs the tracker of a new block from round // rnd and a given ledgercore.StateDelta as produced by BlockEvaluator. func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) { - ct.catchpointsMu.Lock() - defer ct.catchpointsMu.Unlock() - ct.roundDigest = append(ct.roundDigest, blk.Digest()) } // committedUpTo implements the ledgerTracker interface for catchpointTracker. @@ -294,11 +287,6 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round, // prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data. // If an error returned the process is aborted. func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error { - ct.catchpointsMu.RLock() - defer ct.catchpointsMu.RUnlock() - if dcc.isCatchpointRound { - dcc.committedRoundDigest = ct.roundDigest[dcc.offset+uint64(dcc.lookback)-1] - } return nil } @@ -380,7 +368,9 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit } if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + ct.catchpointsMu.Lock() ct.lastCatchpointLabel = dcc.catchpointLabel + ct.catchpointsMu.Unlock() } dcc.updatingBalancesDuration = time.Since(dcc.flushTime) @@ -388,12 +378,6 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano()) } - ct.catchpointsMu.Lock() - - ct.roundDigest = ct.roundDigest[dcc.offset:] - - ct.catchpointsMu.Unlock() - if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" { // generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written. // the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution. diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 64db5f275a..85003dcf01 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -374,9 +374,7 @@ func TestReproducibleCatchpointLabels(t *testing.T) { } // test to see that after loadFromDisk, all the tracker content is lost ( as expected ) - require.NotZero(t, len(ct.roundDigest)) require.NoError(t, ct.loadFromDisk(ml, ml.Latest())) - require.Zero(t, len(ct.roundDigest)) require.Zero(t, ct.catchpointWriting) select { case _, closed := <-ct.catchpointSlowWriting: @@ -385,31 +383,3 @@ func TestReproducibleCatchpointLabels(t *testing.T) { require.FailNow(t, "The catchpointSlowWriting should have been a closed channel; it seems to be a nil ?!") } } - -func TestCatchpointTrackerPrepareCommit(t *testing.T) { - partitiontest.PartitionTest(t) - - ct := &catchpointTracker{} - const maxOffset = 40 - const maxLookback = 320 - ct.roundDigest = make([]crypto.Digest, maxOffset+maxLookback) - for i := 0; i < len(ct.roundDigest); i++ { - ct.roundDigest[i] = crypto.Hash([]byte{byte(i), byte(i / 256)}) - } - dcc := &deferredCommitContext{} - for offset := uint64(1); offset < maxOffset; offset++ { - dcc.offset = offset - for lookback := basics.Round(0); lookback < maxLookback; lookback += 20 { - dcc.lookback = lookback - for _, isCatchpointRound := range []bool{false, true} { - dcc.isCatchpointRound = isCatchpointRound - require.NoError(t, ct.prepareCommit(dcc)) - if isCatchpointRound { - expectedRound := offset + uint64(lookback) - 1 - expectedHash := crypto.Hash([]byte{byte(expectedRound), byte(expectedRound / 256)}) - require.Equal(t, expectedHash[:], dcc.committedRoundDigest[:]) - } - } - } - } -} diff --git a/ledger/tracker.go b/ledger/tracker.go index 855995665d..c081c23143 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -464,14 +464,23 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) { return } + var ctt ledgerTracker tr.mu.Lock() tr.dbRound = newBase for _, lt := range tr.trackers { - lt.postCommit(tr.ctx, dcc) + if ct, ok := lt.(*catchpointTracker); ok { + ctt = ct + } else { + lt.postCommit(tr.ctx, dcc) + } } tr.lastFlushTime = dcc.flushTime tr.mu.Unlock() + if ctt != nil { + // run catchpoint tracker's postCommit without a lock as potentially long operation + ctt.postCommit(tr.ctx, dcc) + } } // initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).