[EN Performance] [POC] Reduce operational RAM by 152+ GB and checkpoint duration by 24 mins by reusing ledger state #2770

Closed
35 changes: 30 additions & 5 deletions cmd/execution_builder.go
@@ -52,6 +52,7 @@ import (
"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/ledger/common/pathfinder"
ledger "github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
bootstrapFilenames "github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/model/encoding/cbor"
@@ -206,6 +207,9 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
executionDataService state_synchronization.ExecutionDataService
executionDataCIDCache state_synchronization.ExecutionDataCIDCache
executionDataCIDCacheSize uint = 100

ledgerInitialState []*trie.MTrie
ledgerInitialStateSegmentNum = -1
)

e.FlowNodeBuilder.
@@ -382,18 +386,39 @@ func (e *ExecutionNodeBuilder) LoadComponentsAndModules() {
}
}

ledgerStorage, err = ledger.NewLedger(diskWAL, int(e.exeConf.mTrieCacheSize), collector, node.Logger.With().Str("subcomponent",
"ledger").Logger(), ledger.DefaultPathFinderVersion)
ledgerStorage, ledgerInitialState, ledgerInitialStateSegmentNum, err = ledger.NewSyncLedger(
diskWAL,
int(e.exeConf.mTrieCacheSize),
collector,
node.Logger.With().Str("subcomponent", "ledger").Logger(),
ledger.DefaultPathFinderVersion)

return ledgerStorage, err
}).
Component("execution state ledger WAL compactor", func(node *NodeConfig) (module.ReadyDoneAware, error) {

checkpointer, err := ledgerStorage.Checkpointer()
if err != nil {
return nil, fmt.Errorf("cannot create checkpointer: %w", err)
}
compactor := wal.NewCompactor(checkpointer,
10*time.Second,

trieUpdateChan := ledgerStorage.SegmentTrieChan()

if trieUpdateChan == nil {
compactor := wal.NewCompactor(checkpointer,
10*time.Second,
e.exeConf.checkpointDistance,
e.exeConf.checkpointsToKeep,
node.Logger.With().Str("subcomponent", "checkpointer").Logger())

return compactor, nil
}

compactor, err := wal.NewCachedCompactor(
checkpointer,
trieUpdateChan,
ledgerInitialState,
ledgerInitialStateSegmentNum,
int(e.exeConf.mTrieCacheSize),
e.exeConf.checkpointDistance,
e.exeConf.checkpointsToKeep,
node.Logger.With().Str("subcomponent", "checkpointer").Logger())
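The cached compactor above consumes values from the channel returned by SegmentTrieChan(). From how the value is constructed in ledger.go below, the type pairs a new trie with the WAL segment that contains its update; a minimal sketch of that shape (field names taken from this diff, the package layout assumed):

```go
package wal

import "github.com/onflow/flow-go/ledger/complete/mtrie/trie"

// SegmentTrie pairs the ledger state after an update (the new trie) with
// the number of the WAL segment containing that update. The cached
// compactor can then checkpoint from tries it already holds in memory
// instead of replaying WAL segments into a second forest, which is where
// the RAM and checkpoint-duration savings in this PR's title come from.
type SegmentTrie struct {
	Trie       *trie.MTrie
	SegmentNum int
}
```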
83 changes: 78 additions & 5 deletions ledger/complete/ledger.go
@@ -21,6 +21,7 @@ import (

const DefaultCacheSize = 1000
const DefaultPathFinderVersion = 1
const defaultSegmentUpdateChanSize = 500

// Ledger (complete) is a fast memory-efficient fork-aware thread-safe trie-based key/value storage.
// Ledger holds an array of registers (key-value pairs) and keeps track of changes over a limited time.
@@ -39,6 +40,7 @@ type Ledger struct {
metrics module.LedgerMetrics
logger zerolog.Logger
pathFinderVersion uint8
trieUpdateCh chan *wal.SegmentTrie
}

// NewLedger creates a new in-memory trie-backed ledger storage with persistence.
@@ -86,6 +88,61 @@ func NewLedger(
return storage, nil
}

func NewSyncLedger(lwal wal.LedgerWAL, capacity int, metrics module.LedgerMetrics, log zerolog.Logger, pathFinderVer uint8) (*Ledger, []*trie.MTrie, int, error) {

logger := log.With().Str("ledger", "complete").Logger()

forest, err := mtrie.NewForest(capacity, metrics, func(evictedTrie *trie.MTrie) {
err := lwal.RecordDelete(evictedTrie.RootHash())
if err != nil {
logger.Error().Err(err).Msg("failed to save delete record in wal")
}
})
if err != nil {
return nil, nil, 0, fmt.Errorf("cannot create forest: %w", err)
}

storage := &Ledger{
forest: forest,
wal: lwal,
trieUpdateCh: make(chan *wal.SegmentTrie, defaultSegmentUpdateChanSize),
metrics: metrics,
logger: logger,
pathFinderVersion: pathFinderVer,
}

// pause recording to prevent double-logging trie removals
lwal.PauseRecord()
defer lwal.UnpauseRecord()

err = lwal.ReplayOnForest(forest)
if err != nil {
return nil, nil, 0, fmt.Errorf("cannot restore LedgerWAL: %w", err)
}

lwal.UnpauseRecord()

tries, err := forest.GetTries()
if err != nil {
return nil, nil, 0, fmt.Errorf("cannot get tries from forest: %w", err)
}

_, to, err := lwal.Segments()
if err != nil {
return nil, nil, 0, fmt.Errorf("cannot get segment numbers: %w", err)
}

// TODO update to proper value once https://github.com/onflow/flow-go/pull/3720 is merged
metrics.ForestApproxMemorySize(0)

// TODO: verify that to-1 is correct, because a DiskWAL is created and passed in as LedgerWAL, and DiskWAL creates a new segment file.
return storage, tries, to - 1, nil
}

func (l *Ledger) SegmentTrieChan() <-chan *wal.SegmentTrie {
return l.trieUpdateCh
}

// Ready implements interface module.ReadyDoneAware
// it starts the EventLoop's internal processing loop.
func (l *Ledger) Ready() <-chan struct{} {
@@ -182,6 +239,11 @@ func (l *Ledger) Get(query *ledger.Query) (values []ledger.Value, err error) {
return values, err
}

type walUpdateResult struct {
segmentNum int
err error
}

// Set updates the ledger given an update
// it returns the state after update and errors (if any)
func (l *Ledger) Set(update *ledger.Update) (newState ledger.State, trieUpdate *ledger.TrieUpdate, err error) {
@@ -200,20 +262,31 @@ func (l *Ledger) Set(update *ledger.Update) (newState ledger.State, trieUpdate *ledger.TrieUpdate, err error) {

l.metrics.UpdateCount()

walChan := make(chan error)
walChan := make(chan walUpdateResult)

go func() {
walChan <- l.wal.RecordUpdate(trieUpdate)
segmentNum, err := l.wal.RecordUpdate(trieUpdate)
Member:

After writing the update to WAL, we return the segment num that the update was written to.

If two updates are written to the same segment file, then the same segmentNum will be returned again, right?

Member Author:

> If two updates are written to the same segment file, then the same segmentNum will be returned again, right?

Yes.
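In other words, segment numbers on the channel are non-decreasing, and a subscriber can treat an increase as a signal that the previous segment is complete. A hedged sketch of that consumer pattern, assuming in-order delivery (which, per the discussion below, this POC does not yet guarantee) and with hypothetical callbacks:

```go
// Hedged sketch of a channel consumer. Assumes in-order delivery of
// SegmentTrie values, which this POC does not yet guarantee under
// concurrent Set calls (see the discussion below).
func consumeSegmentTries(
	trieUpdates <-chan *wal.SegmentTrie,
	initialSegmentNum int,
	cacheTrie func(segmentNum int, t *trie.MTrie), // hypothetical callback
	onSegmentComplete func(segmentNum int), // hypothetical callback
) {
	prev := initialSegmentNum
	for st := range trieUpdates {
		if st.SegmentNum > prev {
			// All updates for segment `prev` have been seen, so its tries
			// can go to the checkpointer without replaying the WAL.
			onSegmentComplete(prev)
			prev = st.SegmentNum
		}
		cacheTrie(st.SegmentNum, st.Trie)
	}
}
```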

Member:

Is it possible to return two numbers, the segmentNum and the index of the trie update included in the segment?

That way, when subscribing to the SegmentTrie updates, we can double-check the order:

SegmentTrie{trie: trie1, segmentNum: 10, indexInSegment: 4}
SegmentTrie{trie: trie3, segmentNum: 10, indexInSegment: 6}
SegmentTrie{trie: trie2, segmentNum: 10, indexInSegment: 5}

If we receive the trie updates in the above order, then we know it must be inconsistent with the order in the WAL files.
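A hypothetical sketch of that suggestion (not implemented in this PR; the type and helper names are invented here):

```go
// Hypothetical: SegmentTrie extended with the reviewer's suggested index.
type indexedSegmentTrie struct {
	Trie           *trie.MTrie
	SegmentNum     int
	IndexInSegment int
}

// inOrder reports whether next directly follows prev in WAL order.
func inOrder(prev, next indexedSegmentTrie) bool {
	if next.SegmentNum == prev.SegmentNum {
		return next.IndexInSegment == prev.IndexInSegment+1
	}
	// A new segment must start at index 0 of the next segment number.
	return next.SegmentNum == prev.SegmentNum+1 && next.IndexInSegment == 0
}
```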

Member Author:

It's possible to include the index in PR #2792. But since PR #2792 handles concurrency and ensures the WAL update and the ledger state update stay in sync, we may not need the index to detect inconsistency in #2792.

Member:

Writing to WAL files happens concurrently, so we need a way to ensure that the order of trie updates written to the file is consistent with the order of updates pushed to trieUpdateCh. The current implementation cannot guarantee that.

Member Author:

> The current implementation cannot guarantee that.

The current implementation is not PR #2770, because it was superseded by PR #2792 days ago.

In PR #2792, when the Compactor receives a trieUpdate from the channel, the Compactor

  • writes the update to the WAL,
  • signals to ledger.Set when the WAL update is complete,
  • waits for the trie update to complete (adding the new trie to the ledger state)

Because of these steps, the order of trie updates and WAL updates stays consistent.
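A rough sketch of that handshake, with hypothetical names (the real implementation is in PR #2792; error handling elided):

```go
// Hypothetical shape of the Set/Compactor handshake described above.
type trieUpdateRequest struct {
	Update      *ledger.TrieUpdate
	WALWritten  chan int      // Compactor -> Set: segment number of the WAL write
	TrieApplied chan struct{} // Set -> Compactor: new trie is in the forest
}

func compactorLoop(w wal.LedgerWAL, updates <-chan *trieUpdateRequest) {
	for req := range updates {
		segmentNum, _ := w.RecordUpdate(req.Update) // 1. write the update to WAL
		req.WALWritten <- segmentNum                // 2. signal ledger.Set
		<-req.TrieApplied                           // 3. wait for the trie update
	}
	// Serializing steps 1-3 keeps trie-update order consistent with WAL order.
}
```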

walChan <- walUpdateResult{segmentNum, err}
}()

newRootHash, err := l.forest.Update(trieUpdate)
walError := <-walChan
walResult := <-walChan

if err != nil {
return ledger.State(hash.DummyHash), nil, fmt.Errorf("cannot update state: %w", err)
}
if walError != nil {
return ledger.State(hash.DummyHash), nil, fmt.Errorf("error while writing LedgerWAL: %w", walError)
if walResult.err != nil {
return ledger.State(hash.DummyHash), nil, fmt.Errorf("error while writing LedgerWAL: %w", walResult.err)
}

if l.trieUpdateCh != nil {
// Get updated trie from forest
trie, err := l.forest.GetTrie(newRootHash)
if err != nil {
return ledger.State(hash.DummyHash), nil, fmt.Errorf("cannot get updated trie: %w", err)
}

l.trieUpdateCh <- &wal.SegmentTrie{Trie: trie, SegmentNum: walResult.segmentNum}
Member:

After the update has been written to the WAL and a new trie has been created, we make a SegmentTrie that contains both the new trie and the segment num holding the update, and push it to the channel so that the channel subscriber can process it.

Member Author:

Yes, because the tries in checkpoints need to be in sync with the tries created by updates in WAL segments. So we need to know the new trie and the segment num in order to reuse the trie for checkpointing.
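A hedged sketch of what that reuse enables at checkpoint time (hypothetical names, not the flow-go checkpointer API):

```go
// Hypothetical sketch (not the flow-go checkpointer API): serialize tries
// the compactor already cached instead of replaying WAL segments into a
// second in-memory forest.
func checkpointFromCache(
	w io.Writer,
	cached []*trie.MTrie, // tries covering the checkpointed segments
	writeTrie func(io.Writer, *trie.MTrie) error, // hypothetical serializer
) error {
	// Without this cache, the compactor rebuilds these tries by replaying
	// the WAL, roughly doubling the ledger's RAM during checkpointing.
	for _, t := range cached {
		if err := writeTrie(w, t); err != nil {
			return err
		}
	}
	return nil
}
```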

Member:

Would the Set method be called concurrently?

Since pushing the trie updates to the channel is concurrent, is it possible that the order of the updates we read from the channel will be different from the order in the WAL file?

What I'm afraid of is this: imagine the following updates are pushed concurrently:

  1. SegmentTrie{trie: trie1, segmentNum: 10}
  2. SegmentTrie{trie: trie3, segmentNum: 11}
  3. SegmentTrie{trie: trie2, segmentNum: 10}

Then it's possible that the cache will be inconsistent if trie2 is not included in the previous segment.

Member Author:

> Would the Set method be called concurrently?

Yes, I asked Maks the same question yesterday and he replied "If fork happens and first collection finishes execution at the same time. Pretty unlikely but can theoretically happen."

> Since pushing the trie updates to the channel is concurrent, is it possible that the order of the updates we read from the channel will be different from the order in the WAL file?

Yes, you're correct. Although this POC doesn't handle Set in parallel, PR #2792 handles parallel execution of Set.

This PR (#2770) is just a proof of concept. It was superseded by PR #2792, which supports parallel Set and adds memory improvements.

}

// TODO update to proper value once https://github.com/onflow/flow-go/pull/3720 is merged
12 changes: 6 additions & 6 deletions ledger/complete/ledger_test.go
@@ -745,9 +745,9 @@ func TestWALUpdateIsRunInParallel(t *testing.T) {
wg.Add(1)

w := &LongRunningDummyWAL{
updateFn: func(update *ledger.TrieUpdate) error {
updateFn: func(update *ledger.TrieUpdate) (int, error) {
wg.Wait() // wg is released only after the trie has been updated
return nil
return 0, nil
},
}

@@ -795,8 +795,8 @@ func TestWALUpdateFailuresBubbleUp(t *testing.T) {
theError := fmt.Errorf("error error")

w := &LongRunningDummyWAL{
updateFn: func(update *ledger.TrieUpdate) error {
return theError
updateFn: func(update *ledger.TrieUpdate) (int, error) {
return 0, theError
},
}

@@ -849,10 +849,10 @@ func migrationByValue(p []ledger.Payload) ([]ledger.Payload, error) {

type LongRunningDummyWAL struct {
fixtures.NoopWAL
updateFn func(update *ledger.TrieUpdate) error
updateFn func(update *ledger.TrieUpdate) (int, error)
}

func (w *LongRunningDummyWAL) RecordUpdate(update *ledger.TrieUpdate) error {
func (w *LongRunningDummyWAL) RecordUpdate(update *ledger.TrieUpdate) (int, error) {
return w.updateFn(update)
}

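Building on the updated RecordUpdate signature above, a minimal sketch of a test for the new channel might look like this (assuming the NoopWAL fixture was updated to the new signature as well, and that this lives alongside the existing tests; update construction and imports elided):

```go
func TestSetPushesSegmentTrie(t *testing.T) {
	led, _, _, err := NewSyncLedger(&fixtures.NoopWAL{}, 100,
		&metrics.NoopCollector{}, zerolog.Nop(), DefaultPathFinderVersion)
	require.NoError(t, err)

	// ... build an update u and call led.Set(u) ...

	// Set pushes the new trie and its segment number onto the buffered
	// channel before returning, so a SegmentTrie should already be waiting.
	select {
	case st := <-led.SegmentTrieChan():
		require.NotNil(t, st.Trie)
	case <-time.After(time.Second):
		t.Fatal("expected a SegmentTrie on the channel")
	}
}
```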