From 8dcf07504a6b88e5685b0ddc945412d53aaa9446 Mon Sep 17 00:00:00 2001 From: chris erway Date: Wed, 5 Jan 2022 20:12:21 -0500 Subject: [PATCH 1/4] add TestAcctUpdatesLookupRetry --- ledger/acctupdates_test.go | 142 +++++++++++++++++++++++++++++++ ledger/catchpointtracker_test.go | 5 +- 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 27306af539..d025178c0e 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1677,3 +1677,145 @@ func TestConsecutiveVersion(t *testing.T) { protocol.ConsensusV21, } } + +func TestAcctUpdatesLookupRetry(t *testing.T) { + partitiontest.PartitionTest(t) + + testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupRetry") + proto := config.Consensus[protocol.ConsensusCurrentVersion] + proto.MaxBalLookback = 10 + config.Consensus[testProtocolVersion] = proto + defer func() { + delete(config.Consensus, testProtocolVersion) + }() + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} + rewardsLevels := []uint64{0} + + pooldata := basics.AccountData{} + pooldata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + pooldata.Status = basics.NotParticipating + accts[0][testPoolAddr] = pooldata + + sinkdata := basics.AccountData{} + sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 + sinkdata.Status = basics.NotParticipating + accts[0][testSinkAddr] = sinkdata + + ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts) + defer ml.Close() + + conf := config.GetDefaultLocal() + au := newAcctUpdates(t, ml, conf, ".") + defer au.close() + + // cover 10 genesis blocks + rewardLevel := uint64(0) + for i := 1; i < 10; i++ { + accts = append(accts, accts[0]) + rewardsLevels = append(rewardsLevels, rewardLevel) + } + + checkAcctUpdates(t, au, 0, 9, accts, rewardsLevels, proto) + + // lastCreatableID stores asset or app max used index to get rid of conflicts + lastCreatableID := crypto.RandUint64() % 512 + knownCreatables := make(map[basics.CreatableIndex]bool) + + for i := basics.Round(10); i < basics.Round(proto.MaxBalLookback+15); i++ { + rewardLevelDelta := crypto.RandUint64() % 5 + rewardLevel += rewardLevelDelta + var updates ledgercore.AccountDeltas + var totals map[basics.Address]basics.AccountData + base := accts[i-1] + updates, totals, lastCreatableID = ledgertesting.RandomDeltasBalancedFull(1, base, rewardLevel, lastCreatableID) + prevTotals, err := au.Totals(basics.Round(i - 1)) + require.NoError(t, err) + + newPool := totals[testPoolAddr] + newPool.MicroAlgos.Raw -= prevTotals.RewardUnits() * rewardLevelDelta + updates.Upsert(testPoolAddr, newPool) + totals[testPoolAddr] = newPool + + blk := bookkeeping.Block{ + BlockHeader: bookkeeping.BlockHeader{ + Round: basics.Round(i), + }, + } + blk.RewardsLevel = rewardLevel + blk.CurrentProtocol = testProtocolVersion + + delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0) + delta.Accts.MergeAccounts(updates) + delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables) + delta.Totals = accumulateTotals(t, testProtocolVersion, []map[basics.Address]basics.AccountData{totals}, rewardLevel) + au.newBlock(blk, delta) + accts = append(accts, totals) + rewardsLevels = append(rewardsLevels, rewardLevel) + + checkAcctUpdates(t, au, 0, i, accts, rewardsLevels, proto) + } + + flushRound := func(i basics.Round) { + // Clear the timer to ensure a flush + ml.trackers.lastFlushTime = time.Time{} + + ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i) + ml.trackers.waitAccountsWriting() + //checkAcctUpdates(t, au, i, basics.Round(proto.MaxBalLookback+14), accts, rewardsLevels, proto) + } + + // flush a couple of rounds (indirectly schedules commitSyncer) + flushRound(basics.Round(0)) + flushRound(basics.Round(1)) + + // add stallingTracker to list of trackers + stallingTracker := &blockingTracker{ + postCommitUnlockedEntryLock: make(chan struct{}), + postCommitUnlockedReleaseLock: make(chan struct{}), + postCommitEntryLock: make(chan struct{}), + postCommitReleaseLock: make(chan struct{}), + alwaysLock: true, + } + ml.trackers.trackers = append([]ledgerTracker{stallingTracker}, ml.trackers.trackers...) + + // kick off another round + go flushRound(basics.Round(2)) + + // let stallingTracker enter postCommit() and block (waiting on postCommitReleaseLock) + // this will prevent accountUpdates.postCommit() from updating au.cachedDBRound = newBase + <-stallingTracker.postCommitEntryLock + + // prune the baseAccounts cache, so that lookup will fall through to the DB + au.accountsMu.Lock() + au.baseAccounts.prune(0) + au.accountsMu.Unlock() + + rnd := basics.Round(2) + + // grab any address and data to use for call to lookup + var addr basics.Address + var data basics.AccountData + for a, d := range accts[rnd] { + addr = a + data = d + break + } + + // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait() + go func() { + time.Sleep(200 * time.Millisecond) + stallingTracker.postCommitReleaseLock <- struct{}{} + }() + + // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound + d, validThrough, err := au.LookupWithoutRewards(rnd, addr) + require.NoError(t, err) + require.Equal(t, d, data) + require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) + t.Log("validThrough", validThrough) + + // allow the postCommitUnlocked() handler to go through + <-stallingTracker.postCommitUnlockedEntryLock + stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} +} diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index d2188f2e3f..affa5a08f4 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -424,6 +424,7 @@ type blockingTracker struct { postCommitEntryLock chan struct{} postCommitReleaseLock chan struct{} committedUpToRound int64 + alwaysLock bool } // loadFromDisk is not implemented in the blockingTracker. @@ -458,7 +459,7 @@ func (bt *blockingTracker) commitRound(context.Context, *sql.Tx, *deferredCommit // postCommit implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitContext) { - if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") { bt.postCommitEntryLock <- struct{}{} <-bt.postCommitReleaseLock } @@ -466,7 +467,7 @@ func (bt *blockingTracker) postCommit(ctx context.Context, dcc *deferredCommitCo // postCommitUnlocked implements entry/exit blockers, designed for testing. func (bt *blockingTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) { - if dcc.isCatchpointRound && dcc.catchpointLabel != "" { + if bt.alwaysLock || (dcc.isCatchpointRound && dcc.catchpointLabel != "") { bt.postCommitUnlockedEntryLock <- struct{}{} <-bt.postCommitUnlockedReleaseLock } From 4b590ea809becc495c528f617281f16f179eccf1 Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 6 Jan 2022 14:26:54 -0500 Subject: [PATCH 2/4] clean up test from CR comments --- ledger/acctupdates_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index cdde534a09..5f014985c6 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1701,6 +1701,15 @@ func TestConsecutiveVersion(t *testing.T) { } } +// This test attempts to cover the case when an accountUpdates.lookupX method: +// - can't find the requested address, +// - falls through looking at deltas and the LRU accounts cache, +// - then hits the database (calling accountsDbQueries.lookup) +// only to discover that the round stored in the database (committed in accountUpdates.commitRound) +// is out of sync with accountUpdates.cachedDBRound (updated a little bit later in accountUpdates.postCommit). +// +// In this case it waits on a condition variable and retries when +// commitSyncer/accountUpdates has advanced the cachedDBRound. func TestAcctUpdatesLookupRetry(t *testing.T) { partitiontest.PartitionTest(t) @@ -1785,7 +1794,6 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { ml.trackers.committedUpTo(basics.Round(proto.MaxBalLookback) + i) ml.trackers.waitAccountsWriting() - //checkAcctUpdates(t, au, i, basics.Round(proto.MaxBalLookback+14), accts, rewardsLevels, proto) } // flush a couple of rounds (indirectly schedules commitSyncer) @@ -1836,7 +1844,6 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { require.NoError(t, err) require.Equal(t, d, data) require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) - t.Log("validThrough", validThrough) // allow the postCommitUnlocked() handler to go through <-stallingTracker.postCommitUnlockedEntryLock From e8e981d68b2f4a2cb4a6201d4961d367ae538b45 Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 6 Jan 2022 16:40:52 -0500 Subject: [PATCH 3/4] provide slightly more coverage for TestAcctUpdatesLookupRetry --- ledger/acctupdates_test.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 5f014985c6..4dedabc355 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1833,19 +1833,25 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { break } - // release the postCommit lock, once au.lookupWithoutRewards hits au.accountsReadCond.Wait() + defer func() { // allow the postCommitUnlocked() handler to go through, even if test fails + <-stallingTracker.postCommitUnlockedEntryLock + stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} + }() + + // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound + // when synchronized=false it will fail fast + d, validThrough, err := au.lookupWithoutRewards(rnd, addr, false) + require.Equal(t, err, &MismatchingDatabaseRoundError{databaseRound: 2, memoryRound: 1}) + + // release the postCommit lock, once au.lookupWithoutRewards() hits au.accountsReadCond.Wait() go func() { time.Sleep(200 * time.Millisecond) stallingTracker.postCommitReleaseLock <- struct{}{} }() - // issue a LookupWithoutRewards while persistedData.round != au.cachedDBRound - d, validThrough, err := au.LookupWithoutRewards(rnd, addr) + // when synchronized=true it will wait until above goroutine releases postCommitReleaseLock + d, validThrough, err = au.lookupWithoutRewards(rnd, addr, true) require.NoError(t, err) require.Equal(t, d, data) require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) - - // allow the postCommitUnlocked() handler to go through - <-stallingTracker.postCommitUnlockedEntryLock - stallingTracker.postCommitUnlockedReleaseLock <- struct{}{} } From f24fc2a9b2939f3248e9a3e71fd44e4ac1451d51 Mon Sep 17 00:00:00 2001 From: chris erway Date: Fri, 7 Jan 2022 10:46:26 -0500 Subject: [PATCH 4/4] use in-memory DB for TestAcctUpdatesLookupRetry --- ledger/acctupdates_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 4dedabc355..f1434b02f3 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1734,7 +1734,7 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { sinkdata.Status = basics.NotParticipating accts[0][testSinkAddr] = sinkdata - ml := makeMockLedgerForTracker(t, false, 10, testProtocolVersion, accts) + ml := makeMockLedgerForTracker(t, true, 10, testProtocolVersion, accts) defer ml.Close() conf := config.GetDefaultLocal()