Skip to content

Commit

Permalink
Merge pull request #2473 from onflow/fxamacker/optimize-reading-singl…
Browse files Browse the repository at this point in the history
…e-register

Optimize MTrie reading single path by adding Ledger.GetSingleValue() to boost speed and reduce memory use
  • Loading branch information
fxamacker authored May 25, 2022
2 parents dbb3a7c + 8ac1003 commit 8a964e8
Show file tree
Hide file tree
Showing 19 changed files with 714 additions and 102 deletions.
4 changes: 2 additions & 2 deletions cmd/util/cmd/read-execution-state/list-accounts/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ func run(*cobra.Command, []string) {
},
}

payload, err := forest.Read(read)
values, err := forest.Read(read)
if err != nil {
return nil, err
}

return payload[0].Value, nil
return values[0], nil
})

sth := state.NewStateHolder(state.NewState(ldg))
Expand Down
22 changes: 8 additions & 14 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,10 @@ func NewExecutionState(

}

func makeSingleValueQuery(commitment flow.StateCommitment, owner, controller, key string) (*ledger.Query, error) {
return ledger.NewQuery(ledger.State(commitment),
[]ledger.Key{
RegisterIDToKey(flow.NewRegisterID(owner, controller, key)),
})
func makeSingleValueQuery(commitment flow.StateCommitment, owner, controller, key string) (*ledger.QuerySingleValue, error) {
return ledger.NewQuerySingleValue(ledger.State(commitment),
RegisterIDToKey(flow.NewRegisterID(owner, controller, key)),
)
}

func makeQuery(commitment flow.StateCommitment, ids []flow.RegisterID) (*ledger.Query, error) {
Expand Down Expand Up @@ -187,26 +186,21 @@ func LedgerGetRegister(ldg ledger.Ledger, commitment flow.StateCommitment) delta
return nil, fmt.Errorf("cannot create ledger query: %w", err)
}

values, err := ldg.Get(query)
value, err := ldg.GetSingleValue(query)

if err != nil {
return nil, fmt.Errorf("error getting register (%s) value at %x: %w", key, commitment, err)
}

// We expect 1 element in the returned slice of values because query is from makeSingleValueQuery()
if len(values) != 1 {
return nil, fmt.Errorf("error getting register (%s) value at %x: number of returned values (%d) != number of queried keys (%d)", key, commitment, len(values), len(query.Keys()))
}

// Prevent caching of value with len zero
if len(values[0]) == 0 {
if len(value) == 0 {
return nil, nil
}

// don't cache value with len zero
readCache[regID] = flow.RegisterEntry{Key: regID, Value: values[0]}
readCache[regID] = flow.RegisterEntry{Key: regID, Value: value}

return values[0], nil
return value, nil
}
}

Expand Down
29 changes: 24 additions & 5 deletions ledger/complete/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,29 @@ func (l *Ledger) ValueSizes(query *ledger.Query) (valueSizes []int, err error) {
return valueSizes, err
}

// GetSingleValue reads the value of a single given key at the given state.
// It converts the key to a trie path, reads that one path from the forest,
// and records read metrics for the single value.
func (l *Ledger) GetSingleValue(query *ledger.QuerySingleValue) (value ledger.Value, err error) {
	start := time.Now()

	// Convert the ledger key to a trie path using the configured path finder version.
	path, err := pathfinder.KeyToPath(query.Key(), l.pathFinderVersion)
	if err != nil {
		return nil, err
	}

	trieRead := &ledger.TrieReadSingleValue{RootHash: ledger.RootHash(query.State()), Path: path}
	value, err = l.forest.ReadSingleValue(trieRead)
	if err != nil {
		return nil, err
	}

	l.metrics.ReadValuesNumber(1)
	readDuration := time.Since(start)
	l.metrics.ReadDuration(readDuration)

	// Exactly one value is read, so the per-item duration equals the total
	// duration. (The previous Nanoseconds()*time.Nanosecond round-trip was a
	// no-op: time.Duration is already a nanosecond count.)
	l.metrics.ReadDurationPerItem(readDuration)

	return value, nil
}

// Get read the values of the given keys at the given state
// it returns the values in the same order as given registerIDs and errors (if any)
func (l *Ledger) Get(query *ledger.Query) (values []ledger.Value, err error) {
Expand All @@ -142,11 +165,7 @@ func (l *Ledger) Get(query *ledger.Query) (values []ledger.Value, err error) {
return nil, err
}
trieRead := &ledger.TrieRead{RootHash: ledger.RootHash(query.State()), Paths: paths}
payloads, err := l.forest.Read(trieRead)
if err != nil {
return nil, err
}
values, err = pathfinder.PayloadsToValues(payloads)
values, err = l.forest.Read(trieRead)
if err != nil {
return nil, err
}
Expand Down
73 changes: 73 additions & 0 deletions ledger/complete/ledger_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,79 @@ func BenchmarkTrieRead(b *testing.B) {
b.StopTimer()
}

// BenchmarkLedgerGetOneValue compares reading one register via the batch
// Ledger.Get API against the single-value Ledger.GetSingleValue API.
func BenchmarkLedgerGetOneValue(b *testing.B) {
	// key updates per iteration
	numInsPerStep := 10000
	keyNumberOfParts := 10
	keyPartMinByteSize := 1
	keyPartMaxByteSize := 100
	valueMaxByteSize := 32
	rand.Seed(1)

	dir, err := os.MkdirTemp("", "test-mtrie-")
	if err != nil {
		b.Fatal(err)
	}
	// Register cleanup only after we know the temp dir was actually created.
	defer os.RemoveAll(dir)

	diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), dir, 101, pathfinder.PathByteSize, wal.SegmentSize)
	require.NoError(b, err)
	defer func() {
		<-diskWal.Done()
	}()

	led, err := complete.NewLedger(diskWal, 101, &metrics.NoopCollector{}, zerolog.Logger{}, complete.DefaultPathFinderVersion)
	if err != nil {
		b.Fatal("can't create a new complete ledger")
	}
	// Defer only after the error check: b.Fatal runs deferred calls via
	// runtime.Goexit, so deferring led.Done() before checking err could
	// panic on a nil ledger.
	defer led.Done()

	state := led.InitialState()

	keys := utils.RandomUniqueKeys(numInsPerStep, keyNumberOfParts, keyPartMinByteSize, keyPartMaxByteSize)
	values := utils.RandomValues(numInsPerStep, 1, valueMaxByteSize)

	update, err := ledger.NewUpdate(state, keys, values)
	if err != nil {
		b.Fatal(err)
	}

	newState, _, err := led.Set(update)
	if err != nil {
		b.Fatal(err)
	}

	b.Run("batch get", func(b *testing.B) {
		query, err := ledger.NewQuery(newState, []ledger.Key{keys[0]})
		if err != nil {
			b.Fatal(err)
		}

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err = led.Get(query)
			if err != nil {
				b.Fatal(err)
			}
		}
	})

	b.Run("single get", func(b *testing.B) {
		query, err := ledger.NewQuerySingleValue(newState, keys[0])
		if err != nil {
			b.Fatal(err)
		}

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err = led.GetSingleValue(query)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

// BenchmarkTrieProve benchmarks the performance of a trie prove
func BenchmarkTrieProve(b *testing.B) {
// key updates per iteration
Expand Down
90 changes: 90 additions & 0 deletions ledger/complete/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,96 @@ func TestLedger_Get(t *testing.T) {
})
}

// TestLedger_GetSingleValue tests reading value from a single path.
func TestLedger_GetSingleValue(t *testing.T) {

wal := &fixtures.NoopWAL{}
led, err := complete.NewLedger(
wal,
100,
&metrics.NoopCollector{},
zerolog.Logger{},
complete.DefaultPathFinderVersion,
)
require.NoError(t, err)

state := led.InitialState()

t.Run("non-existent key", func(t *testing.T) {

keys := utils.RandomUniqueKeys(10, 2, 1, 10)

for _, k := range keys {
qs, err := ledger.NewQuerySingleValue(state, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(qs)
require.NoError(t, err)
assert.Equal(t, 0, len(retValue))
}
})

t.Run("existent key", func(t *testing.T) {

u := utils.UpdateFixture()
u.SetState(state)

newState, _, err := led.Set(u)
require.NoError(t, err)
assert.NotEqual(t, state, newState)

for i, k := range u.Keys() {
q, err := ledger.NewQuerySingleValue(newState, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(q)
require.NoError(t, err)
assert.Equal(t, u.Values()[i], retValue)
}
})

t.Run("mix of existent and non-existent keys", func(t *testing.T) {

u := utils.UpdateFixture()
u.SetState(state)

newState, _, err := led.Set(u)
require.NoError(t, err)
assert.NotEqual(t, state, newState)

// Save expected values for existent keys
expectedValues := make(map[string]ledger.Value)
for i, key := range u.Keys() {
encKey := encoding.EncodeKey(&key)
expectedValues[string(encKey)] = u.Values()[i]
}

// Create a randomly ordered mix of existent and non-existent keys
var queryKeys []ledger.Key
queryKeys = append(queryKeys, u.Keys()...)
queryKeys = append(queryKeys, utils.RandomUniqueKeys(10, 2, 1, 10)...)

rand.Shuffle(len(queryKeys), func(i, j int) {
queryKeys[i], queryKeys[j] = queryKeys[j], queryKeys[i]
})

for _, k := range queryKeys {
qs, err := ledger.NewQuerySingleValue(newState, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(qs)
require.NoError(t, err)

encKey := encoding.EncodeKey(&k)
if value, ok := expectedValues[string(encKey)]; ok {
require.Equal(t, value, retValue)
} else {
require.Equal(t, 0, len(retValue))
}
}
})
}

func TestLedgerValueSizes(t *testing.T) {
t.Run("empty query", func(t *testing.T) {

Expand Down
28 changes: 23 additions & 5 deletions ledger/complete/mtrie/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,24 @@ func (f *Forest) ValueSizes(r *ledger.TrieRead) ([]int, error) {
return orderedValueSizes, nil
}

// ReadSingleValue reads value for a single path and returns value and error (if any)
func (f *Forest) ReadSingleValue(r *ledger.TrieReadSingleValue) (ledger.Value, error) {
	// Resolve the trie identified by the requested root hash.
	t, err := f.GetTrie(r.RootHash)
	if err != nil {
		return nil, err
	}

	// DeepCopy returns an independent copy of the stored value.
	payload := t.ReadSinglePayload(r.Path)
	return payload.Value.DeepCopy(), nil
}

// Read reads values for a slice of paths and returns values and error (if any)
// TODO: can be optimized further if we don't care about changing the order of the input r.Paths
func (f *Forest) Read(r *ledger.TrieRead) ([]*ledger.Payload, error) {
func (f *Forest) Read(r *ledger.TrieRead) ([]ledger.Value, error) {

if len(r.Paths) == 0 {
return []*ledger.Payload{}, nil
return []ledger.Value{}, nil
}

// lookup the trie by rootHash
Expand All @@ -137,6 +149,12 @@ func (f *Forest) Read(r *ledger.TrieRead) ([]*ledger.Payload, error) {
return nil, err
}

// call ReadSinglePayload if there is only one path
if len(r.Paths) == 1 {
payload := trie.ReadSinglePayload(r.Paths[0])
return []ledger.Value{payload.Value.DeepCopy()}, nil
}

// deduplicate keys:
// Generally, we expect the VM to deduplicate reads and writes. Hence, the following is a pre-caution.
// TODO: We could take out the following de-duplication logic
Expand All @@ -156,20 +174,20 @@ func (f *Forest) Read(r *ledger.TrieRead) ([]*ledger.Payload, error) {
payloads := trie.UnsafeRead(deduplicatedPaths) // this sorts deduplicatedPaths IN-PLACE

// reconstruct the payloads in the same key order that called the method
orderedPayloads := make([]*ledger.Payload, len(r.Paths))
orderedValues := make([]ledger.Value, len(r.Paths))
totalPayloadSize := 0
for i, p := range deduplicatedPaths {
payload := payloads[i]
indices := pathOrgIndex[p]
for _, j := range indices {
orderedPayloads[j] = payload.DeepCopy()
orderedValues[j] = payload.Value.DeepCopy()
}
totalPayloadSize += len(indices) * payload.Size()
}
// TODO rename the metrics
f.metrics.ReadValuesSize(uint64(totalPayloadSize))

return orderedPayloads, nil
return orderedValues, nil
}

// Update updates the Values for the registers and returns rootHash and error (if any).
Expand Down
Loading

0 comments on commit 8a964e8

Please sign in to comment.