diff --git a/ledger/bulletin.go b/ledger/bulletin.go index e806077745..5968c7f4f0 100644 --- a/ledger/bulletin.go +++ b/ledger/bulletin.go @@ -81,7 +81,10 @@ func (b *bulletin) Wait(round basics.Round) chan struct{} { } func (b *bulletin) loadFromDisk(l ledgerForTracker, _ basics.Round) error { - b.pendingNotificationRequests = make(map[basics.Round]notifier) + // We want to keep existing notification requests in memory if this flow is triggered by reloadLedger. + if b.pendingNotificationRequests == nil { + b.pendingNotificationRequests = make(map[basics.Round]notifier) + } b.latestRound = l.Latest() return nil } diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index 16024bdd9e..ffa90b3da4 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -27,6 +27,7 @@ import ( "runtime" "sort" "testing" + "time" "github.com/stretchr/testify/require" @@ -1477,6 +1478,31 @@ func benchLedgerCache(b *testing.B, startRound basics.Round) { } } +func triggerTrackerFlush(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) { + l.trackers.mu.RLock() + initialDbRound := l.trackers.dbRound + currentDbRound := initialDbRound + l.trackers.lastFlushTime = time.Time{} + l.trackers.mu.RUnlock() + + addEmptyValidatedBlock(t, l, genesisInitState.Accounts) + + const timeout = 2 * time.Second + started := time.Now() + + // We can't truly wait for scheduleCommit to take place, which means without waiting using sleeps + // we might beat scheduleCommit's addition to accountsWriting, making our wait on it continue immediately. + // The solution is to wait for the advancement of l.trackers.dbRound, which is a side effect of postCommit's success. + for currentDbRound == initialDbRound { + time.Sleep(50 * time.Microsecond) + require.True(t, time.Now().Sub(started) < timeout) + l.trackers.mu.RLock() + currentDbRound = l.trackers.dbRound + l.trackers.mu.RUnlock() + } + l.trackers.waitAccountsWriting() +} + func TestLedgerReload(t *testing.T) { partitiontest.PartitionTest(t) @@ -1513,6 +1539,36 @@ func TestLedgerReload(t *testing.T) { } } +func TestWaitLedgerReload(t *testing.T) { + partitiontest.PartitionTest(t) + a := require.New(t) + + dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) + genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 100) + const inMem = true + cfg := config.GetDefaultLocal() + cfg.MaxAcctLookback = 0 + log := logging.TestingLog(t) + log.SetLevel(logging.Info) + l, err := OpenLedger(log, dbName, inMem, genesisInitState, cfg) + require.NoError(t, err) + defer l.Close() + + waitRound := l.Latest() + 1 + waitChannel := l.Wait(waitRound) + + err = l.ReloadLedger() + a.NoError(err) + triggerTrackerFlush(t, l, genesisInitState) + + select { + case <-waitChannel: + return + default: + a.Failf("", "Wait channel did not receive an expected signal for round %d", waitRound) + } +} + // TestGetLastCatchpointLabel tests ledger.GetLastCatchpointLabel is returning the correct value. func TestGetLastCatchpointLabel(t *testing.T) { partitiontest.PartitionTest(t)