Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type accountUpdates struct {
// i.e., totals is one longer than deltas.
roundTotals []ledgercore.AccountTotals

// roundDigest stores the digest of the block for every round starting with dbRound and every round after it.
roundDigest []crypto.Digest

// log copied from ledger
log logging.Logger

Expand Down Expand Up @@ -717,6 +720,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou
au.accounts = make(map[basics.Address]modifiedAccount)
au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable)
au.deltasAccum = []int{0}
au.roundDigest = nil

au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold)
return
Expand All @@ -738,6 +742,7 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S
au.deltas = append(au.deltas, delta.Accts)
au.versions = append(au.versions, blk.CurrentProtocol)
au.creatableDeltas = append(au.creatableDeltas, delta.Creatables)
au.roundDigest = append(au.roundDigest, blk.Digest())
au.deltasAccum = append(au.deltasAccum, delta.Accts.Len()+au.deltasAccum[len(au.deltasAccum)-1])

au.baseAccounts.flushPendingWrites()
Expand Down Expand Up @@ -1085,6 +1090,10 @@ func (au *accountUpdates) prepareCommit(dcc *deferredCommitContext) error {
return fmt.Errorf("attempted to commit series of rounds with non-uniform consensus versions")
}

if dcc.isCatchpointRound {
dcc.committedRoundDigest = au.roundDigest[offset+uint64(dcc.lookback)-1]
}

// compact all the deltas - when we're trying to persist multiple rounds, we might have the same account
// being updated multiple times. When that happen, we can safely omit the intermediate updates.
dcc.compactAccountDeltas = makeCompactAccountDeltas(dcc.deltas, au.baseAccounts)
Expand Down Expand Up @@ -1216,6 +1225,7 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon

au.deltas = au.deltas[offset:]
au.deltasAccum = au.deltasAccum[offset:]
au.roundDigest = au.roundDigest[offset:]
au.versions = au.versions[offset:]
au.roundTotals = au.roundTotals[offset:]
au.creatableDeltas = au.creatableDeltas[offset:]
Expand Down
20 changes: 2 additions & 18 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ type catchpointTracker struct {

// catchpointsMu is the synchronization mutex for accessing the various non-static variables.
catchpointsMu deadlock.RWMutex

// roundDigest stores the digest of the block for every round starting with dbRound and every round after it.
roundDigest []crypto.Digest
}

// initialize initializes the catchpointTracker structure
Expand Down Expand Up @@ -159,7 +156,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound
ct.log = l.trackerLog()
ct.dbs = l.trackerDB()

ct.roundDigest = nil
ct.catchpointWriting = 0
// keep these channel closed if we're not generating catchpoint
ct.catchpointSlowWriting = make(chan struct{}, 1)
Expand Down Expand Up @@ -220,9 +216,6 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, lastBalancesRound
// newBlock informs the tracker of a new block from round
// rnd and a given ledgercore.StateDelta as produced by BlockEvaluator.
func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.StateDelta) {
ct.catchpointsMu.Lock()
defer ct.catchpointsMu.Unlock()
ct.roundDigest = append(ct.roundDigest, blk.Digest())
}

// committedUpTo implements the ledgerTracker interface for catchpointTracker.
Expand Down Expand Up @@ -294,11 +287,6 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round,
// prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data.
// If an error returned the process is aborted.
func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
ct.catchpointsMu.RLock()
defer ct.catchpointsMu.RUnlock()
if dcc.isCatchpointRound {
dcc.committedRoundDigest = ct.roundDigest[dcc.offset+uint64(dcc.lookback)-1]
}
return nil
}

Expand Down Expand Up @@ -380,20 +368,16 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit
}

if dcc.isCatchpointRound && dcc.catchpointLabel != "" {
ct.catchpointsMu.Lock()
ct.lastCatchpointLabel = dcc.catchpointLabel
ct.catchpointsMu.Unlock()
}
dcc.updatingBalancesDuration = time.Since(dcc.flushTime)

if dcc.updateStats {
dcc.stats.MemoryUpdatesDuration = time.Duration(time.Now().UnixNano())
}

ct.catchpointsMu.Lock()

ct.roundDigest = ct.roundDigest[dcc.offset:]

ct.catchpointsMu.Unlock()

if dcc.isCatchpointRound && ct.archivalLedger && dcc.catchpointLabel != "" {
// generate the catchpoint file. This need to be done inline so that it will block any new accounts that from being written.
// the generateCatchpoint expects that the accounts data would not be modified in the background during it's execution.
Expand Down
30 changes: 0 additions & 30 deletions ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ func TestReproducibleCatchpointLabels(t *testing.T) {
}

// test to see that after loadFromDisk, all the tracker content is lost ( as expected )
require.NotZero(t, len(ct.roundDigest))
require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
require.Zero(t, len(ct.roundDigest))
require.Zero(t, ct.catchpointWriting)
select {
case _, closed := <-ct.catchpointSlowWriting:
Expand All @@ -385,31 +383,3 @@ func TestReproducibleCatchpointLabels(t *testing.T) {
require.FailNow(t, "The catchpointSlowWriting should have been a closed channel; it seems to be a nil ?!")
}
}

func TestCatchpointTrackerPrepareCommit(t *testing.T) {
partitiontest.PartitionTest(t)

ct := &catchpointTracker{}
const maxOffset = 40
const maxLookback = 320
ct.roundDigest = make([]crypto.Digest, maxOffset+maxLookback)
for i := 0; i < len(ct.roundDigest); i++ {
ct.roundDigest[i] = crypto.Hash([]byte{byte(i), byte(i / 256)})
}
dcc := &deferredCommitContext{}
for offset := uint64(1); offset < maxOffset; offset++ {
dcc.offset = offset
for lookback := basics.Round(0); lookback < maxLookback; lookback += 20 {
dcc.lookback = lookback
for _, isCatchpointRound := range []bool{false, true} {
dcc.isCatchpointRound = isCatchpointRound
require.NoError(t, ct.prepareCommit(dcc))
if isCatchpointRound {
expectedRound := offset + uint64(lookback) - 1
expectedHash := crypto.Hash([]byte{byte(expectedRound), byte(expectedRound / 256)})
require.Equal(t, expectedHash[:], dcc.committedRoundDigest[:])
}
}
}
}
}
11 changes: 10 additions & 1 deletion ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,14 +464,23 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) {
return
}

var ctt ledgerTracker
tr.mu.Lock()
tr.dbRound = newBase
for _, lt := range tr.trackers {
lt.postCommit(tr.ctx, dcc)
if ct, ok := lt.(*catchpointTracker); ok {
ctt = ct
} else {
lt.postCommit(tr.ctx, dcc)
}
}
tr.lastFlushTime = dcc.flushTime
tr.mu.Unlock()

if ctt != nil {
// run catchpoint tracker's postCommit without a lock as potentially long operation
ctt.postCommit(tr.ctx, dcc)
}
}

// initializeTrackerCaches fills up the accountUpdates cache with the most recent ~320 blocks ( on normal execution ).
Expand Down