Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ledger/bulletin.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func (b *bulletin) Wait(round basics.Round) chan struct{} {
}

func (b *bulletin) loadFromDisk(l ledgerForTracker, _ basics.Round) error {
b.pendingNotificationRequests = make(map[basics.Round]notifier)
// We want to keep existing notification requests in memory if this flow is triggered by reloadLedger.
if b.pendingNotificationRequests == nil {
b.pendingNotificationRequests = make(map[basics.Round]notifier)
}
b.latestRound = l.Latest()
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"runtime"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1477,6 +1478,31 @@ func benchLedgerCache(b *testing.B, startRound basics.Round) {
}
}

func triggerTrackerFlush(t *testing.T, l *Ledger, genesisInitState ledgercore.InitState) {
l.trackers.mu.RLock()
initialDbRound := l.trackers.dbRound
currentDbRound := initialDbRound
l.trackers.lastFlushTime = time.Time{}
l.trackers.mu.RUnlock()

addEmptyValidatedBlock(t, l, genesisInitState.Accounts)

const timeout = 2 * time.Second
started := time.Now()

// We can't truly wait for scheduleCommit to take place, which means without waiting using sleeps
// we might beat scheduleCommit's addition to accountsWriting, making our wait on it continue immediately.
// The solution is to wait for the advancement of l.trackers.dbRound, which is a side effect of postCommit's success.
for currentDbRound == initialDbRound {
time.Sleep(50 * time.Microsecond)
require.True(t, time.Now().Sub(started) < timeout)
l.trackers.mu.RLock()
currentDbRound = l.trackers.dbRound
l.trackers.mu.RUnlock()
}
l.trackers.waitAccountsWriting()
}

func TestLedgerReload(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down Expand Up @@ -1513,6 +1539,36 @@ func TestLedgerReload(t *testing.T) {
}
}

func TestWaitLedgerReload(t *testing.T) {
partitiontest.PartitionTest(t)
a := require.New(t)

dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64())
genesisInitState, _ := ledgertesting.GenerateInitState(t, protocol.ConsensusCurrentVersion, 100)
const inMem = true
cfg := config.GetDefaultLocal()
cfg.MaxAcctLookback = 0
log := logging.TestingLog(t)
log.SetLevel(logging.Info)
l, err := OpenLedger(log, dbName, inMem, genesisInitState, cfg)
require.NoError(t, err)
defer l.Close()

waitRound := l.Latest() + 1
waitChannel := l.Wait(waitRound)

err = l.ReloadLedger()
a.NoError(err)
triggerTrackerFlush(t, l, genesisInitState)

select {
case <-waitChannel:
return
default:
a.Failf("", "Wait channel did not receive an expected signal for round %d", waitRound)
}
}

// TestGetLastCatchpointLabel tests ledger.GetLastCatchpointLabel is returning the correct value.
func TestGetLastCatchpointLabel(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down