diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go
index f2b6e48fff2..9a2f9c65bb9 100644
--- a/cmd/execution_builder.go
+++ b/cmd/execution_builder.go
@@ -52,6 +52,7 @@ import (
     "github.com/onflow/flow-go/fvm/systemcontracts"
     "github.com/onflow/flow-go/ledger/common/pathfinder"
     ledger "github.com/onflow/flow-go/ledger/complete"
+    "github.com/onflow/flow-go/ledger/complete/mtrie/trie"
     "github.com/onflow/flow-go/ledger/complete/wal"
     bootstrapFilenames "github.com/onflow/flow-go/model/bootstrap"
     "github.com/onflow/flow-go/model/encoding/cbor"
@@ -206,6 +207,9 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
         executionDataService      state_synchronization.ExecutionDataService
         executionDataCIDCache     state_synchronization.ExecutionDataCIDCache
         executionDataCIDCacheSize uint = 100
+
+        ledgerInitialState           []*trie.MTrie
+        ledgerInitialStateSegmentNum = -1
     )
 
     e.FlowNodeBuilder.
@@ -382,18 +386,39 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
                 }
             }
 
-            ledgerStorage, err = ledger.NewLedger(diskWAL, int(e.exeConf.mTrieCacheSize), collector, node.Logger.With().Str("subcomponent",
-                "ledger").Logger(), ledger.DefaultPathFinderVersion)
+            ledgerStorage, ledgerInitialState, ledgerInitialStateSegmentNum, err = ledger.NewSyncLedger(
+                diskWAL,
+                int(e.exeConf.mTrieCacheSize),
+                collector,
+                node.Logger.With().Str("subcomponent", "ledger").Logger(),
+                ledger.DefaultPathFinderVersion)
+
             return ledgerStorage, err
         }).
         Component("execution state ledger WAL compactor", func(node *NodeConfig) (module.ReadyDoneAware, error) {
-
             checkpointer, err := ledgerStorage.Checkpointer()
             if err != nil {
                 return nil, fmt.Errorf("cannot create checkpointer: %w", err)
             }
-            compactor := wal.NewCompactor(checkpointer,
-                10*time.Second,
+
+            trieUpdateChan := ledgerStorage.SegmentTrieChan()
+
+            if trieUpdateChan == nil {
+                compactor := wal.NewCompactor(checkpointer,
+                    10*time.Second,
+                    e.exeConf.checkpointDistance,
+                    e.exeConf.checkpointsToKeep,
+                    node.Logger.With().Str("subcomponent", "checkpointer").Logger())
+
+                return compactor, nil
+            }
+
+            compactor, err := wal.NewCachedCompactor(
+                checkpointer,
+                trieUpdateChan,
+                ledgerInitialState,
+                ledgerInitialStateSegmentNum,
+                int(e.exeConf.mTrieCacheSize),
                 e.exeConf.checkpointDistance,
                 e.exeConf.checkpointsToKeep,
                 node.Logger.With().Str("subcomponent", "checkpointer").Logger())
diff --git a/ledger/complete/ledger.go b/ledger/complete/ledger.go
index 61edc3d2c41..4bfac6a547e 100644
--- a/ledger/complete/ledger.go
+++ b/ledger/complete/ledger.go
@@ -21,6 +21,7 @@ import (
 
 const DefaultCacheSize = 1000
 const DefaultPathFinderVersion = 1
+const defaultSegmentUpdateChanSize = 500
 
 // Ledger (complete) is a fast memory-efficient fork-aware thread-safe trie-based key/value storage.
 // Ledger holds an array of registers (key-value pairs) and keeps tracks of changes over a limited time.
@@ -39,6 +40,7 @@ type Ledger struct {
     metrics           module.LedgerMetrics
     logger            zerolog.Logger
     pathFinderVersion uint8
+    trieUpdateCh      chan *wal.SegmentTrie
 }
 
 // NewLedger creates a new in-memory trie-backed ledger storage with persistence.
@@ -86,6 +88,61 @@ func NewLedger(
     return storage, nil
 }
 
+func NewSyncLedger(lwal wal.LedgerWAL, capacity int, metrics module.LedgerMetrics, log zerolog.Logger, pathFinderVer uint8) (*Ledger, []*trie.MTrie, int, error) {
+
+    logger := log.With().Str("ledger", "complete").Logger()
+
+    forest, err := mtrie.NewForest(capacity, metrics, func(evictedTrie *trie.MTrie) {
+        err := lwal.RecordDelete(evictedTrie.RootHash())
+        if err != nil {
+            logger.Error().Err(err).Msg("failed to save delete record in wal")
+        }
+    })
+    if err != nil {
+        return nil, nil, 0, fmt.Errorf("cannot create forest: %w", err)
+    }
+
+    storage := &Ledger{
+        forest:            forest,
+        wal:               lwal,
+        trieUpdateCh:      make(chan *wal.SegmentTrie, defaultSegmentUpdateChanSize),
+        metrics:           metrics,
+        logger:            logger,
+        pathFinderVersion: pathFinderVer,
+    }
+
+    // pause records to prevent double logging trie removals
+    lwal.PauseRecord()
+    defer lwal.UnpauseRecord()
+
+    err = lwal.ReplayOnForest(forest)
+    if err != nil {
+        return nil, nil, 0, fmt.Errorf("cannot restore LedgerWAL: %w", err)
+    }
+
+    lwal.UnpauseRecord()
+
+    tries, err := forest.GetTries()
+    if err != nil {
+        return nil, nil, 0, fmt.Errorf("cannot get tries from forest: %w", err)
+    }
+
+    _, to, err := lwal.Segments()
+    if err != nil {
+        return nil, nil, 0, fmt.Errorf("cannot get segment numbers: %w", err)
+    }
+
+    // TODO update to proper value once https://github.com/onflow/flow-go/pull/3720 is merged
+    metrics.ForestApproxMemorySize(0)
+
+    // TODO verify that to-1 is correct, because the DiskWAL created and passed in as LedgerWAL opens a new segment file.
+    return storage, tries, to - 1, nil
+}
+
+func (l *Ledger) SegmentTrieChan() <-chan *wal.SegmentTrie {
+    return l.trieUpdateCh
+}
+
 // Ready implements interface module.ReadyDoneAware
 // it starts the EventLoop's internal processing loop.
 func (l *Ledger) Ready() <-chan struct{} {
@@ -182,6 +239,11 @@ func (l *Ledger) Get(query *ledger.Query) (values []ledger.Value, err error) {
     return values, err
 }
 
+type walUpdateResult struct {
+    segmentNum int
+    err        error
+}
+
 // Set updates the ledger given an update
 // it returns the state after update and errors (if any)
 func (l *Ledger) Set(update *ledger.Update) (newState ledger.State, trieUpdate *ledger.TrieUpdate, err error) {
@@ -200,20 +262,31 @@ func (l *Ledger) Set(update *ledger.Update) (newState ledger.State, trieUpdate *
 
     l.metrics.UpdateCount()
 
-    walChan := make(chan error)
+    walChan := make(chan walUpdateResult)
 
     go func() {
-        walChan <- l.wal.RecordUpdate(trieUpdate)
+        segmentNum, err := l.wal.RecordUpdate(trieUpdate)
+        walChan <- walUpdateResult{segmentNum, err}
     }()
 
     newRootHash, err := l.forest.Update(trieUpdate)
-    walError := <-walChan
+    walResult := <-walChan
 
     if err != nil {
         return ledger.State(hash.DummyHash), nil, fmt.Errorf("cannot update state: %w", err)
     }
 
-    if walError != nil {
-        return ledger.State(hash.DummyHash), nil, fmt.Errorf("error while writing LedgerWAL: %w", walError)
+    if walResult.err != nil {
+        return ledger.State(hash.DummyHash), nil, fmt.Errorf("error while writing LedgerWAL: %w", walResult.err)
+    }
+
+    if l.trieUpdateCh != nil {
+        // Get updated trie from forest
+        trie, err := l.forest.GetTrie(newRootHash)
+        if err != nil {
+            return ledger.State(hash.DummyHash), nil, fmt.Errorf("cannot get updated trie: %w", err)
+        }
+
+        l.trieUpdateCh <- &wal.SegmentTrie{Trie: trie, SegmentNum: walResult.segmentNum}
     }
 
     // TODO update to proper value once https://github.com/onflow/flow-go/pull/3720 is merged
diff --git a/ledger/complete/ledger_test.go b/ledger/complete/ledger_test.go
index 841a57b2991..22787d637ce 100644
--- a/ledger/complete/ledger_test.go
+++ b/ledger/complete/ledger_test.go
@@ -745,9 +745,9 @@ func TestWALUpdateIsRunInParallel(t *testing.T) {
     wg.Add(1)
 
     w := &LongRunningDummyWAL{
-        updateFn: func(update *ledger.TrieUpdate) error {
+        updateFn: func(update *ledger.TrieUpdate) (int, error) {
             wg.Wait() //wg will let work after the trie has been updated
-            return nil
+            return 0, nil
         },
     }
 
@@ -795,8 +795,8 @@ func TestWALUpdateFailuresBubbleUp(t *testing.T) {
     theError := fmt.Errorf("error error")
 
     w := &LongRunningDummyWAL{
-        updateFn: func(update *ledger.TrieUpdate) error {
-            return theError
+        updateFn: func(update *ledger.TrieUpdate) (int, error) {
+            return 0, theError
         },
     }
 
@@ -849,10 +849,10 @@ func migrationByValue(p []ledger.Payload) ([]ledger.Payload, error) {
 
 type LongRunningDummyWAL struct {
     fixtures.NoopWAL
-    updateFn func(update *ledger.TrieUpdate) error
+    updateFn func(update *ledger.TrieUpdate) (int, error)
 }
 
-func (w *LongRunningDummyWAL) RecordUpdate(update *ledger.TrieUpdate) error {
+func (w *LongRunningDummyWAL) RecordUpdate(update *ledger.TrieUpdate) (int, error) {
     return w.updateFn(update)
 }
 
diff --git a/ledger/complete/wal/cached_compactor.go b/ledger/complete/wal/cached_compactor.go
new file mode 100644
index 00000000000..0e02bf5b571
--- /dev/null
+++ b/ledger/complete/wal/cached_compactor.go
@@ -0,0 +1,435 @@
+package wal
+
+import (
+    "fmt"
+    "sync"
+    "time"
+
+    "github.com/onflow/flow-go/ledger/complete/mtrie"
+    "github.com/onflow/flow-go/ledger/complete/mtrie/trie"
+    "github.com/onflow/flow-go/module/lifecycle"
+    "github.com/onflow/flow-go/module/metrics"
+    "github.com/onflow/flow-go/module/observable"
+    "github.com/rs/zerolog"
+    "golang.org/x/sync/semaphore"
+)
+
+const (
+    defaultSegmentTrieSize             = 500
+    defaultTrieUpdateChannelBufSize    = 500
+    defaultSegmentUpdateChannelBufSize = 500
+)
+
+// SegmentTrie contains a trie and the number of the WAL segment the trie update was recorded in.
+type SegmentTrie struct {
+    Trie       *trie.MTrie
+    SegmentNum int
+}
+
+// segmentTries contains the tries that were recorded in a given WAL segment.
+type segmentTries struct {
+    tries      []*trie.MTrie
+    segmentNum int
+}
+
+// newSegmentTries creates a segmentTries with the given segment number and any tries already recorded in that segment.
+func newSegmentTries(segmentNum int, tries ...*trie.MTrie) *segmentTries {
+    segmentTries := &segmentTries{
+        tries:      make([]*trie.MTrie, 0, defaultSegmentTrieSize),
+        segmentNum: segmentNum,
+    }
+    segmentTries.tries = append(segmentTries.tries, tries...)
+    return segmentTries
+}
+
+func (s *segmentTries) add(trie *trie.MTrie) {
+    s.tries = append(s.tries, trie)
+}
+
+// activeSegmentTrieCompactor caches the mtries recorded in the active WAL segment.
+// It receives the updated trie and segment number for each ledger update through the trieUpdateCh channel.
+// It sends the batched tries of each finalized segment through the segmentUpdateCh channel.
+type activeSegmentTrieCompactor struct {
+    sync.Mutex
+    logger          zerolog.Logger
+    tries           *segmentTries
+    trieUpdateCh    <-chan *SegmentTrie
+    segmentUpdateCh chan<- *segmentTries
+    stopCh          chan struct{}
+}
+
+func newActiveSegmentTrieCompactor(
+    logger zerolog.Logger,
+    trieUpdateCh <-chan *SegmentTrie,
+    segmentUpdateCh chan<- *segmentTries,
+) *activeSegmentTrieCompactor {
+    return &activeSegmentTrieCompactor{
+        logger:          logger,
+        stopCh:          make(chan struct{}),
+        trieUpdateCh:    trieUpdateCh,
+        segmentUpdateCh: segmentUpdateCh,
+    }
+}
+
+func (c *activeSegmentTrieCompactor) stop() {
+    c.stopCh <- struct{}{}
+}
+
+func (c *activeSegmentTrieCompactor) start() {
+Loop:
+    for {
+        select {
+        case <-c.stopCh:
+            break Loop
+        case update := <-c.trieUpdateCh:
+            prevSegmentTries, err := c.update(update.Trie, update.SegmentNum)
+            if err != nil {
+                c.logger.Error().Err(err).Msg("error updating active segment trie")
+                continue
+            }
+            if prevSegmentTries != nil {
+                c.segmentUpdateCh <- prevSegmentTries
+            }
+        }
+    }
+
+    // Drain remaining trie updates to remove trie references.
+    for range c.trieUpdateCh {
+    }
+}
+
+func (c *activeSegmentTrieCompactor) update(trie *trie.MTrie, segmentNum int) (*segmentTries, error) {
+    c.Lock()
+    defer c.Unlock()
+
+    if c.tries == nil {
+        c.tries = newSegmentTries(segmentNum, trie)
+        return nil, nil
+    }
+
+    // Add to active segment tries cache
+    if segmentNum == c.tries.segmentNum {
+        c.tries.add(trie)
+        return nil, nil
+    }
+
+    // A new segment was started; segment numbers must be consecutive
+    if segmentNum != c.tries.segmentNum+1 {
+        return nil, fmt.Errorf("got segment number %d, want %d", segmentNum, c.tries.segmentNum+1)
+    }
+
+    // Save previous segment tries
+    prevSegmentTries := c.tries
+
+    // Create new segment tries
+    c.tries = newSegmentTries(segmentNum, trie)
+
+    return prevSegmentTries, nil
+}
+
+// segmentsTrieCompactor maintains an mtrie forest of the tries recorded in finalized segments.
+// When enough segments have accumulated, a new goroutine is created to write a checkpoint asynchronously.
+// At most one checkpointing goroutine runs at any given time.
+// It receives the batched tries of each finalized segment through the segmentUpdateCh channel.
+// It sends the number of each created checkpoint through the checkpointCh channel.
+type segmentsTrieCompactor struct {
+    sync.Mutex
+    checkpointer       *Checkpointer
+    logger             zerolog.Logger
+    checkpointDistance uint
+    lastCheckpointNum  int
+
+    // forest contains mtries up to and including the tries in forestSegmentNum.
+    forest           *mtrie.Forest
+    forestSegmentNum int
+
+    segmentUpdateCh <-chan *segmentTries
+    stopCh          chan struct{}
+    checkpointCh    chan<- int
+}
+
+func newSegmentsTrieCompactor(
+    logger zerolog.Logger,
+    checkpointer *Checkpointer,
+    segmentUpdateCh <-chan *segmentTries,
+    checkpointCh chan<- int,
+    tries []*trie.MTrie,
+    segmentNum int,
+    checkpointForestCapacity int,
+    checkpointDistance uint,
+) (*segmentsTrieCompactor, error) {
+
+    forest, err := mtrie.NewForest(checkpointForestCapacity, &metrics.NoopCollector{}, nil)
+    if err != nil {
+        return nil, fmt.Errorf("cannot create Forest: %w", err)
+    }
+    err = forest.AddTries(tries)
+    if err != nil {
+        return nil, fmt.Errorf("cannot add tries to forest: %w", err)
+    }
+
+    lastCheckpointNum, err := checkpointer.LatestCheckpoint()
+    if err != nil {
+        return nil, fmt.Errorf("cannot get last checkpointed number: %w", err)
+    }
+
+    return &segmentsTrieCompactor{
+        checkpointer:       checkpointer,
+        logger:             logger,
+        checkpointDistance: checkpointDistance,
+        lastCheckpointNum:  lastCheckpointNum,
+        forest:             forest,
+        forestSegmentNum:   segmentNum,
+        segmentUpdateCh:    segmentUpdateCh,
+        checkpointCh:       checkpointCh,
+        stopCh:             make(chan struct{}),
+    }, nil
+}
+
+type checkpointResult struct {
+    checkpointNum int
+    err           error
+}
+
+func (c *segmentsTrieCompactor) stop() {
+    c.stopCh <- struct{}{}
+}
+
+func (c *segmentsTrieCompactor) start() {
+
+    checkpointSem := semaphore.NewWeighted(1) // limit to 1 checkpointing goroutine
+    checkpointResultChan := make(chan checkpointResult)
+
+Loop:
+    for {
+        select {
+
+        case <-c.stopCh:
+            break Loop
+
+        case tries := <-c.segmentUpdateCh:
+            triesToBeCheckpointed, checkpointNum, err := c.update(tries.tries, tries.segmentNum)
+            if err != nil {
+                c.logger.Error().Err(err).Msg("error updating cache")
+                continue
+            }
+
+            if len(triesToBeCheckpointed) > 0 {
+                if checkpointSem.TryAcquire(1) {
+                    go func() {
+                        defer checkpointSem.Release(1)
+                        err := createCheckpoint(c.checkpointer, c.logger, triesToBeCheckpointed, checkpointNum)
+                        checkpointResultChan <- checkpointResult{checkpointNum, err}
+                    }()
+                }
+            }
+
+        case cpResult := <-checkpointResultChan:
+            if cpResult.err != nil {
+                c.logger.Error().Err(cpResult.err).Msg("error checkpointing")
+                continue
+            }
+
+            c.Lock()
+            c.lastCheckpointNum = cpResult.checkpointNum
+            c.Unlock()
+
+            c.checkpointCh <- cpResult.checkpointNum
+        }
+    }
+
+    // Drain remaining segment updates to remove trie references.
+    for range c.segmentUpdateCh {
+    }
+}
+
+// update adds the tries of the given segment to the forest and, once enough segments have been
+// finalized, returns the tries to be checkpointed together with the checkpoint number.
+func (c *segmentsTrieCompactor) update(tries []*trie.MTrie, segmentNum int) ([]*trie.MTrie, int, error) {
+    c.Lock()
+    defer c.Unlock()
+
+    err := c.forest.AddTries(tries)
+    if err != nil {
+        return nil, 0, fmt.Errorf("error adding tries: %w", err)
+    }
+    c.forestSegmentNum = segmentNum
+
+    uncheckpointedSegmentCount := segmentNum - c.lastCheckpointNum
+    if uncheckpointedSegmentCount < int(c.checkpointDistance) {
+        return nil, 0, nil
+    }
+
+    triesToBeCheckpointed, err := c.forest.GetTries()
+    if err != nil {
+        return nil, 0, err
+    }
+    return triesToBeCheckpointed, c.forestSegmentNum, nil
+}
+
+// CachedCompactor creates and manages the activeSegmentTrieCompactor and segmentsTrieCompactor.
+type CachedCompactor struct {
+    checkpointer      *Checkpointer
+    logger            zerolog.Logger
+    lm                *lifecycle.LifecycleManager
+    observers         map[observable.Observer]struct{}
+    checkpointsToKeep uint
+
+    activeSegmentTrieCompactor *activeSegmentTrieCompactor
+    segmentsTrieCompactor      *segmentsTrieCompactor
+
+    stopCh       chan struct{}
+    checkpointCh <-chan int
+}
+
+func NewCachedCompactor(
+    checkpointer *Checkpointer,
+    trieUpdateCh <-chan *SegmentTrie,
+    tries []*trie.MTrie,
+    segmentNum int,
+    checkpointForestCapacity int,
+    checkpointDistance uint,
+    checkpointsToKeep uint,
+    logger zerolog.Logger,
+) (*CachedCompactor, error) {
+    if checkpointDistance < 1 {
+        checkpointDistance = 1
+    }
+
+    segmentUpdateCh := make(chan *segmentTries, defaultSegmentUpdateChannelBufSize)
+    checkpointCh := make(chan int)
+
+    activeSegmentTrieCompactor := newActiveSegmentTrieCompactor(
+        logger,
+        trieUpdateCh,
+        segmentUpdateCh,
+    )
+
+    segmentsTrieCompactor, err := newSegmentsTrieCompactor(
+        logger,
+        checkpointer,
+        segmentUpdateCh,
+        checkpointCh,
+        tries,
+        segmentNum,
+        checkpointForestCapacity,
+        checkpointDistance,
+    )
+    if err != nil {
+        return nil, err
+    }
+
+    return &CachedCompactor{
+        checkpointer:               checkpointer,
+        logger:                     logger,
+        observers:                  make(map[observable.Observer]struct{}),
+        lm:                         lifecycle.NewLifecycleManager(),
+        checkpointsToKeep:          checkpointsToKeep,
+        activeSegmentTrieCompactor: activeSegmentTrieCompactor,
+        segmentsTrieCompactor:      segmentsTrieCompactor,
+        checkpointCh:               checkpointCh,
+        stopCh:                     make(chan struct{}),
+    }, nil
+}
+
+func (c *CachedCompactor) Subscribe(observer observable.Observer) {
+    var void struct{}
+    c.observers[observer] = void
+}
+
+func (c *CachedCompactor) Unsubscribe(observer observable.Observer) {
+    delete(c.observers, observer)
+}
+
+func (c *CachedCompactor) Ready() <-chan struct{} {
+    c.lm.OnStart(func() {
+        go c.activeSegmentTrieCompactor.start()
+        go c.segmentsTrieCompactor.start()
+        go c.start()
+    })
+    return c.lm.Started()
+}
+
+func (c *CachedCompactor) Done() <-chan struct{} {
+    c.lm.OnStop(func() {
+        for observer := range c.observers {
+            observer.OnComplete()
+        }
+        c.activeSegmentTrieCompactor.stop()
+        c.segmentsTrieCompactor.stop()
+        c.stopCh <- struct{}{}
+    })
+    return c.lm.Stopped()
+}
+
+func (c *CachedCompactor) start() {
+    for {
+        select {
+        case <-c.stopCh:
+            return
+        case checkpointNum := <-c.checkpointCh:
+
+            for observer := range c.observers {
+                observer.OnNext(checkpointNum)
+            }
+
+            err := cleanupCheckpoints(c.checkpointer, int(c.checkpointsToKeep))
+            if err != nil {
+                c.logger.Error().Err(err).Msg("cannot cleanup checkpoints")
+            }
+        }
+    }
+}
+
+// createCheckpoint uses a named error return so the deferred writer.Close() error is propagated.
+func createCheckpoint(checkpointer *Checkpointer, logger zerolog.Logger, tries []*trie.MTrie, checkpointNum int) (err error) {
+
+    logger.Info().Msgf("serializing checkpoint %d with %d tries", checkpointNum, len(tries))
+
+    startTime := time.Now()
+
+    writer, err := checkpointer.CheckpointWriter(checkpointNum)
+    if err != nil {
+        return fmt.Errorf("cannot generate writer: %w", err)
+    }
+    defer func() {
+        closeErr := writer.Close()
+        // Return close error if there isn't any prior error to return.
+        if err == nil {
+            err = closeErr
+        }
+    }()
+
+    err = StoreCheckpoint(writer, tries...)
+    if err != nil {
+        return fmt.Errorf("error serializing checkpoint (%d): %w", checkpointNum, err)
+    }
+
+    logger.Info().Msgf("created checkpoint %d with %d tries", checkpointNum, len(tries))
+
+    duration := time.Since(startTime)
+    logger.Info().Float64("total_time_s", duration.Seconds()).Msgf("created checkpoint %d with %d tries", checkpointNum, len(tries))
+
+    return nil
+}
+
+func cleanupCheckpoints(checkpointer *Checkpointer, checkpointsToKeep int) error {
+    // don't bother listing checkpoints if we keep them all
+    if checkpointsToKeep == 0 {
+        return nil
+    }
+    checkpoints, err := checkpointer.Checkpoints()
+    if err != nil {
+        return fmt.Errorf("cannot list checkpoints: %w", err)
+    }
+    if len(checkpoints) > int(checkpointsToKeep) {
+        checkpointsToRemove := checkpoints[:len(checkpoints)-int(checkpointsToKeep)] // if condition guarantees this never fails
+
+        for _, checkpoint := range checkpointsToRemove {
+            err := checkpointer.RemoveCheckpoint(checkpoint)
+            if err != nil {
+                return fmt.Errorf("cannot remove checkpoint %d: %w", checkpoint, err)
+            }
+        }
+    }
+    return nil
+}
diff --git a/ledger/complete/wal/cached_compactor_test.go b/ledger/complete/wal/cached_compactor_test.go
new file mode 100644
index 00000000000..6a48f4c72fb
--- /dev/null
+++ b/ledger/complete/wal/cached_compactor_test.go
@@ -0,0 +1,216 @@
+package wal
+
+import (
+    "fmt"
+    "os"
+    "path"
+    "testing"
+    "time"
+
+    "github.com/rs/zerolog"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+
+    "github.com/onflow/flow-go/ledger"
+    "github.com/onflow/flow-go/ledger/common/utils"
+    "github.com/onflow/flow-go/ledger/complete/mtrie"
+    "github.com/onflow/flow-go/ledger/complete/mtrie/trie"
+    "github.com/onflow/flow-go/module/metrics"
+    "github.com/onflow/flow-go/utils/unittest"
+)
+
+func Test_CachedCompactor(t *testing.T) {
+    numInsPerStep := 2
+    pathByteSize := 32
+    minPayloadByteSize := 2 << 15
+    maxPayloadByteSize := 2 << 16
+    size := 10
+    metricsCollector := &metrics.NoopCollector{}
+    checkpointDistance := uint(2)
+
+    unittest.RunWithTempDir(t, func(dir string) {
+
+        f, err := mtrie.NewForest(size*10, metricsCollector, nil)
+        require.NoError(t, err)
+
+        var rootHash = f.GetEmptyRootHash()
+
+        //saved data after updates
+        savedData := make(map[ledger.RootHash]map[ledger.Path]*ledger.Payload)
+
+        t.Run("Compactor creates checkpoints eventually", func(t *testing.T) {
+
+            wal, err := NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), dir, size*10, pathByteSize, 32*1024)
+            require.NoError(t, err)
+
+            // WAL segments are 32kB, so here we generate 2 keys 64kB each, times `size`
+            // so we should get at least `size` segments
+
+            checkpointer, err := wal.NewCheckpointer()
+            require.NoError(t, err)
+
+            trieUpdateCh := make(chan *SegmentTrie, defaultTrieUpdateChannelBufSize)
+            compactor, err := NewCachedCompactor(
+                checkpointer,
+                trieUpdateCh,
+                nil,
+                -1,
+                size*10,
+                checkpointDistance,
+                1,
+                zerolog.Nop(),
+            ) //keep only latest checkpoint
+            require.NoError(t, err)
+
+            co := CompactorObserver{fromBound: 9, done: make(chan struct{})}
+            compactor.Subscribe(&co)
+
+            // Run Compactor in background.
+            <-compactor.Ready()
+
+            // Generate the tree and create WAL
+            for i := 0; i < size; i++ {
+
+                paths0 := utils.RandomPaths(numInsPerStep)
+                payloads0 := utils.RandomPayloads(numInsPerStep, minPayloadByteSize, maxPayloadByteSize)
+
+                var paths []ledger.Path
+                var payloads []*ledger.Payload
+                paths = append(paths, paths0...)
+                payloads = append(payloads, payloads0...)
+
+                update := &ledger.TrieUpdate{RootHash: rootHash, Paths: paths, Payloads: payloads}
+
+                segmentNum, err := wal.RecordUpdate(update)
+                require.NoError(t, err)
+
+                rootHash, err = f.Update(update)
+                require.NoError(t, err)
+
+                // The following code is done in Ledger.Set().
+                // Since this test doesn't use a Ledger, we need to send the updated trie data manually.
+                trie, err := f.GetTrie(rootHash)
+                require.NoError(t, err)
+                trieUpdateCh <- &SegmentTrie{Trie: trie, SegmentNum: segmentNum}
+
+                require.FileExists(t, path.Join(dir, NumberToFilenamePart(i)))
+
+                data := make(map[ledger.Path]*ledger.Payload, len(paths))
+                for j, path := range paths {
+                    data[path] = payloads[j]
+                }
+
+                savedData[rootHash] = data
+            }
+
+            // wait for the bound-checking observer to confirm checkpoints have been made
+            select {
+            case <-co.done:
+                // continue
+            case <-time.After(60 * time.Second):
+                assert.FailNow(t, "timed out")
+            }
+
+            from, to, err := checkpointer.NotCheckpointedSegments()
+            require.NoError(t, err)
+
+            assert.True(t, from == 10 && to == 10, "from: %v, to: %v", from, to) //make sure there is no leftover
+
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000000"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000002"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000003"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000004"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000005"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000006"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000007"))
+            require.NoFileExists(t, path.Join(dir, "checkpoint.00000008"))
+            require.FileExists(t, path.Join(dir, "checkpoint.00000009"))
+
+            <-compactor.Done()
+            <-wal.Done()
+            require.NoError(t, err)
+        })
+
+        time.Sleep(2 * time.Second)
+
+        t.Run("remove unnecessary files", func(t *testing.T) {
+            // Remove all files apart from target checkpoint and WAL segments ahead of it
+            // We know their names, so just hardcode them
+            dirF, _ := os.Open(dir)
+            files, _ := dirF.Readdir(0)
+
+            for _, fileInfo := range files {
+
+                name := fileInfo.Name()
+
+                if name != "checkpoint.00000009" &&
+                    name != "00000010" {
+                    err := os.Remove(path.Join(dir, name))
+                    require.NoError(t, err)
+                }
+            }
+        })
+
+        f2, err := mtrie.NewForest(size*10, metricsCollector, nil)
+        require.NoError(t, err)
+
+        time.Sleep(2 * time.Second)
+
+        t.Run("load data from checkpoint and WAL", func(t *testing.T) {
+            wal2, err := NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), dir, size*10, pathByteSize, 32*1024)
+            require.NoError(t, err)
+
+            err = wal2.Replay(
+                func(tries []*trie.MTrie) error {
+                    return f2.AddTries(tries)
+                },
+                func(update *ledger.TrieUpdate) error {
+                    _, err := f2.Update(update)
+                    return err
+                },
+                func(rootHash ledger.RootHash) error {
+                    return fmt.Errorf("no deletion expected")
+                },
+            )
+            require.NoError(t, err)
+
+            <-wal2.Done()
+
+        })
+
+        t.Run("make sure forests are equal", func(t *testing.T) {
+
+            //check for same data
+            for rootHash, data := range savedData {
+
+                paths := make([]ledger.Path, 0, len(data))
+                for path := range data {
+                    paths = append(paths, path)
+                }
+
+                read := &ledger.TrieRead{RootHash: rootHash, Paths: paths}
+                values, err := f.Read(read)
+                require.NoError(t, err)
+
+                values2, err := f2.Read(read)
+                require.NoError(t, err)
+
+                for i, path := range paths {
+                    require.Equal(t, data[path].Value, values[i])
+                    require.Equal(t, data[path].Value, values2[i])
+                }
+            }
+
+            // check for the same number of tries
+            forestTries, err := f.GetTries()
+            require.NoError(t, err)
+
+            forestTries2, err := f2.GetTries()
+            require.NoError(t, err)
+
+            // order might be different
+            require.Equal(t, len(forestTries), len(forestTries2))
+        })
+    })
+}
diff --git a/ledger/complete/wal/checkpointer_test.go b/ledger/complete/wal/checkpointer_test.go
index 53db783bbdf..cf2b185efa2 100644
--- a/ledger/complete/wal/checkpointer_test.go
+++ b/ledger/complete/wal/checkpointer_test.go
@@ -148,7 +148,7 @@ func Test_Checkpointing(t *testing.T) {
             trieUpdate, err := pathfinder.UpdateToTrieUpdate(update, pathFinderVersion)
             require.NoError(t, err)
 
-            err = wal.RecordUpdate(trieUpdate)
+            _, err = wal.RecordUpdate(trieUpdate)
             require.NoError(t, err)
 
             rootHash, err := f.Update(trieUpdate)
@@ -277,7 +277,7 @@ func Test_Checkpointing(t *testing.T) {
             trieUpdate, err := pathfinder.UpdateToTrieUpdate(update, pathFinderVersion)
             require.NoError(t, err)
 
-            err = wal4.RecordUpdate(trieUpdate)
+            _, err = wal4.RecordUpdate(trieUpdate)
             require.NoError(t, err)
 
             rootHash, err = f.Update(trieUpdate)
@@ -448,7 +448,7 @@ func TestCheckpointFileError(t *testing.T) {
     trieUpdate, err := pathfinder.UpdateToTrieUpdate(update, pathFinderVersion)
     require.NoError(t, err)
 
-    err = wal.RecordUpdate(trieUpdate)
+    _, err = wal.RecordUpdate(trieUpdate)
     require.NoError(t, err)
 
     // some buffer time of the checkpointer to run
diff --git a/ledger/complete/wal/compactor_test.go b/ledger/complete/wal/compactor_test.go
index 4a29aad8ff5..4cc3fade276 100644
--- a/ledger/complete/wal/compactor_test.go
+++ b/ledger/complete/wal/compactor_test.go
@@ -91,7 +91,7 @@ func Test_Compactor(t *testing.T) {
 
                 update := &ledger.TrieUpdate{RootHash: rootHash, Paths: paths, Payloads: payloads}
 
-                err = wal.RecordUpdate(update)
+                _, err = wal.RecordUpdate(update)
                 require.NoError(t, err)
 
                 rootHash, err = f.Update(update)
@@ -263,7 +263,7 @@ func Test_Compactor_checkpointInterval(t *testing.T) {
 
                 update := &ledger.TrieUpdate{RootHash: rootHash, Paths: paths, Payloads: payloads}
 
-                err = wal.RecordUpdate(update)
+                _, err = wal.RecordUpdate(update)
                 require.NoError(t, err)
 
                 rootHash, err = f.Update(update)
diff --git a/ledger/complete/wal/fixtures/noopwal.go b/ledger/complete/wal/fixtures/noopwal.go
index 8f705efdbf2..a74f3f1b6ef 100644
--- a/ledger/complete/wal/fixtures/noopwal.go
+++ b/ledger/complete/wal/fixtures/noopwal.go
@@ -29,7 +29,7 @@ func (w *NoopWAL) PauseRecord() {}
 
 func (w *NoopWAL) UnpauseRecord() {}
 
-func (w *NoopWAL) RecordUpdate(update *ledger.TrieUpdate) error { return nil }
+func (w *NoopWAL) RecordUpdate(update *ledger.TrieUpdate) (int, error) { return 0, nil }
 
 func (w *NoopWAL) RecordDelete(rootHash ledger.RootHash) error { return nil }
 
diff --git a/ledger/complete/wal/wal.go b/ledger/complete/wal/wal.go
index 7d5d7c5cf91..5b74788be97 100644
--- a/ledger/complete/wal/wal.go
+++ b/ledger/complete/wal/wal.go
@@ -56,17 +56,20 @@ func (w *DiskWAL) UnpauseRecord() {
     w.paused = false
 }
 
-func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error {
+// RecordUpdate writes the serialized trie update to the WAL and returns the number of the WAL segment it was written to.
+func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) (int, error) {
     if w.paused {
-        return nil
+        return 0, nil
     }
 
     bytes := EncodeUpdate(update)
 
-    _, err := w.wal.Log(bytes)
-
+    locations, err := w.wal.Log(bytes)
     if err != nil {
-        return fmt.Errorf("error while recording update in LedgerWAL: %w", err)
+        return 0, fmt.Errorf("error while recording update in LedgerWAL: %w", err)
+    }
+    if len(locations) != 1 {
+        return 0, fmt.Errorf("error while recording update in LedgerWAL: got %d locations, expected 1", len(locations))
     }
 
     select {
@@ -80,7 +83,7 @@ func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error {
     default: //don't block
     }
 
-    return nil
+    return locations[0].Segment, nil
 }
 
 // DiskSize returns the amount of disk space used by the storage (in bytes)
@@ -337,7 +340,7 @@ type LedgerWAL interface {
     NewCheckpointer() (*Checkpointer, error)
     PauseRecord()
     UnpauseRecord()
-    RecordUpdate(update *ledger.TrieUpdate) error
+    RecordUpdate(update *ledger.TrieUpdate) (int, error)
     RecordDelete(rootHash ledger.RootHash) error
     ReplayOnForest(forest *mtrie.Forest) error
     Segments() (first, last int, err error)
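Note for reviewers (not part of the patch): the sketch below shows one way the pieces introduced here can be wired together outside the execution node builder. It simply mirrors the calls made in cmd/execution_builder.go and cached_compactor_test.go above; the directory, cache size, 32-byte path size, 32kB segment size, and checkpoint parameters are assumed placeholder values, and the main wrapper is purely illustrative.

package main

import (
    "github.com/rs/zerolog"

    complete "github.com/onflow/flow-go/ledger/complete"
    "github.com/onflow/flow-go/ledger/complete/wal"
    "github.com/onflow/flow-go/module/metrics"
)

func main() {
    logger := zerolog.Nop()
    collector := metrics.NewNoopCollector()

    // Placeholder values, chosen to mirror cached_compactor_test.go.
    dir := "/tmp/ledger-wal"
    cacheSize := 1000
    checkpointDistance := uint(2)
    checkpointsToKeep := uint(1)

    diskWAL, err := wal.NewDiskWAL(logger, nil, collector, dir, cacheSize, 32, 32*1024)
    if err != nil {
        panic(err)
    }

    // NewSyncLedger returns the ledger plus the tries and last segment number
    // restored from the WAL; both are handed to the cached compactor below.
    ledgerStorage, initialState, initialSegmentNum, err := complete.NewSyncLedger(
        diskWAL, cacheSize, collector, logger, complete.DefaultPathFinderVersion)
    if err != nil {
        panic(err)
    }

    checkpointer, err := ledgerStorage.Checkpointer()
    if err != nil {
        panic(err)
    }

    // The compactor consumes per-update tries from SegmentTrieChan and writes a
    // checkpoint once checkpointDistance segments have been finalized.
    compactor, err := wal.NewCachedCompactor(
        checkpointer,
        ledgerStorage.SegmentTrieChan(),
        initialState,
        initialSegmentNum,
        cacheSize,
        checkpointDistance,
        checkpointsToKeep,
        logger,
    )
    if err != nil {
        panic(err)
    }

    <-compactor.Ready()
    // ... use ledgerStorage.Set / Get as usual ...
    <-compactor.Done()
}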