Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EN Performance] Optimize MTrie reading single path by adding Ledger.GetSingleValue() to boost speed and reduce memory use #2473

Merged
merged 11 commits into from
May 25, 2022
22 changes: 8 additions & 14 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,10 @@ func NewExecutionState(

}

func makeSingleValueQuery(commitment flow.StateCommitment, owner, controller, key string) (*ledger.Query, error) {
return ledger.NewQuery(ledger.State(commitment),
[]ledger.Key{
RegisterIDToKey(flow.NewRegisterID(owner, controller, key)),
})
// makeSingleValueQuery builds a single-value ledger query for the register
// identified by (owner, controller, key) at the given state commitment.
func makeSingleValueQuery(commitment flow.StateCommitment, owner, controller, key string) (*ledger.QuerySingleValue, error) {
	regID := flow.NewRegisterID(owner, controller, key)
	return ledger.NewQuerySingleValue(ledger.State(commitment), RegisterIDToKey(regID))
}

func makeQuery(commitment flow.StateCommitment, ids []flow.RegisterID) (*ledger.Query, error) {
Expand Down Expand Up @@ -187,26 +186,21 @@ func LedgerGetRegister(ldg ledger.Ledger, commitment flow.StateCommitment) delta
return nil, fmt.Errorf("cannot create ledger query: %w", err)
}

values, err := ldg.Get(query)
value, err := ldg.GetSingleValue(query)

if err != nil {
return nil, fmt.Errorf("error getting register (%s) value at %x: %w", key, commitment, err)
}

// We expect 1 element in the returned slice of values because query is from makeSingleValueQuery()
if len(values) != 1 {
return nil, fmt.Errorf("error getting register (%s) value at %x: number of returned values (%d) != number of queried keys (%d)", key, commitment, len(values), len(query.Keys()))
}

// Prevent caching of value with len zero
if len(values[0]) == 0 {
if len(value) == 0 {
return nil, nil
}

// don't cache value with len zero
readCache[regID] = flow.RegisterEntry{Key: regID, Value: values[0]}
readCache[regID] = flow.RegisterEntry{Key: regID, Value: value}

return values[0], nil
return value, nil
}
}

Expand Down
23 changes: 23 additions & 0 deletions ledger/complete/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,29 @@ func (l *Ledger) ValueSizes(query *ledger.Query) (valueSizes []int, err error) {
return valueSizes, err
}

// GetSingleValue reads the value of a single given key at the given state.
//
// It avoids the batching/deduplication overhead of Get by translating the
// key to a trie path and reading that single payload from the forest.
func (l *Ledger) GetSingleValue(query *ledger.QuerySingleValue) (value ledger.Value, err error) {
	start := time.Now()

	// Translate the ledger key to a trie path using the ledger's path finder.
	path, err := pathfinder.KeyToPath(query.Key(), l.pathFinderVersion)
	if err != nil {
		return nil, err
	}

	trieRead := &ledger.TrieReadSinglePayload{RootHash: ledger.RootHash(query.State()), Path: path}
	payload, err := l.forest.ReadSinglePayload(trieRead)
	if err != nil {
		return nil, err
	}

	// Record read metrics. With exactly one value read, the per-item
	// duration equals the total read duration, so report it directly
	// (the former time.Duration(readDuration.Nanoseconds())*time.Nanosecond
	// was an identity round-trip of readDuration).
	l.metrics.ReadValuesNumber(1)
	readDuration := time.Since(start)
	l.metrics.ReadDuration(readDuration)
	l.metrics.ReadDurationPerItem(readDuration)

	return payload.Value, nil
}

// Get read the values of the given keys at the given state
// it returns the values in the same order as given registerIDs and errors (if any)
func (l *Ledger) Get(query *ledger.Query) (values []ledger.Value, err error) {
Expand Down
73 changes: 73 additions & 0 deletions ledger/complete/ledger_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,79 @@ func BenchmarkTrieRead(b *testing.B) {
b.StopTimer()
}

// BenchmarkLedgerGetOneValue compares reading one register via the batch
// Get API against the dedicated GetSingleValue API, on a ledger populated
// with numInsPerStep random registers.
func BenchmarkLedgerGetOneValue(b *testing.B) {
	// key updates per iteration
	numInsPerStep := 10000
	keyNumberOfParts := 10
	keyPartMinByteSize := 1
	keyPartMaxByteSize := 100
	valueMaxByteSize := 32
	rand.Seed(1)

	// NOTE: check the error BEFORE registering cleanup that uses the
	// resource; the original deferred os.RemoveAll/led.Done before the
	// error check, which would dereference nil on failure.
	dir, err := os.MkdirTemp("", "test-mtrie-")
	if err != nil {
		b.Fatal(err)
	}
	defer os.RemoveAll(dir)

	diskWal, err := wal.NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), dir, 101, pathfinder.PathByteSize, wal.SegmentSize)
	require.NoError(b, err)
	defer func() {
		<-diskWal.Done()
	}()

	led, err := complete.NewLedger(diskWal, 101, &metrics.NoopCollector{}, zerolog.Logger{}, complete.DefaultPathFinderVersion)
	if err != nil {
		b.Fatal("can't create a new complete ledger")
	}
	defer led.Done()

	state := led.InitialState()

	keys := utils.RandomUniqueKeys(numInsPerStep, keyNumberOfParts, keyPartMinByteSize, keyPartMaxByteSize)
	values := utils.RandomValues(numInsPerStep, 1, valueMaxByteSize)

	update, err := ledger.NewUpdate(state, keys, values)
	if err != nil {
		b.Fatal(err)
	}

	newState, _, err := led.Set(update)
	if err != nil {
		b.Fatal(err)
	}

	// Baseline: batch Get with a single-key query.
	b.Run("batch get", func(b *testing.B) {
		query, err := ledger.NewQuery(newState, []ledger.Key{keys[0]})
		if err != nil {
			b.Fatal(err)
		}

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err = led.Get(query)
			if err != nil {
				b.Fatal(err)
			}
		}
	})

	// Optimized: dedicated single-value read.
	b.Run("single get", func(b *testing.B) {
		query, err := ledger.NewQuerySingleValue(newState, keys[0])
		if err != nil {
			b.Fatal(err)
		}

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err = led.GetSingleValue(query)
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

// BenchmarkTrieUpdate benchmarks the performance of a trie prove
func BenchmarkTrieProve(b *testing.B) {
// key updates per iteration
Expand Down
90 changes: 90 additions & 0 deletions ledger/complete/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,96 @@ func TestLedger_Get(t *testing.T) {
})
}

// TestLedger_GetSingleValue tests reading value from a single path.
func TestLedger_GetSingleValue(t *testing.T) {

wal := &fixtures.NoopWAL{}
led, err := complete.NewLedger(
wal,
100,
&metrics.NoopCollector{},
zerolog.Logger{},
complete.DefaultPathFinderVersion,
)
require.NoError(t, err)

state := led.InitialState()

t.Run("non-existent key", func(t *testing.T) {

keys := utils.RandomUniqueKeys(10, 2, 1, 10)

for _, k := range keys {
qs, err := ledger.NewQuerySingleValue(state, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(qs)
require.NoError(t, err)
assert.Equal(t, 0, len(retValue))
}
})

t.Run("existent key", func(t *testing.T) {

u := utils.UpdateFixture()
u.SetState(state)

newState, _, err := led.Set(u)
require.NoError(t, err)
assert.NotEqual(t, state, newState)

for i, k := range u.Keys() {
q, err := ledger.NewQuerySingleValue(newState, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(q)
require.NoError(t, err)
assert.Equal(t, u.Values()[i], retValue)
}
})

t.Run("mix of existent and non-existent keys", func(t *testing.T) {

u := utils.UpdateFixture()
u.SetState(state)

newState, _, err := led.Set(u)
require.NoError(t, err)
assert.NotEqual(t, state, newState)

// Save expected values for existent keys
expectedValues := make(map[string]ledger.Value)
for i, key := range u.Keys() {
encKey := encoding.EncodeKey(&key)
expectedValues[string(encKey)] = u.Values()[i]
}

// Create a randomly ordered mix of existent and non-existent keys
var queryKeys []ledger.Key
queryKeys = append(queryKeys, u.Keys()...)
queryKeys = append(queryKeys, utils.RandomUniqueKeys(10, 2, 1, 10)...)

rand.Shuffle(len(queryKeys), func(i, j int) {
queryKeys[i], queryKeys[j] = queryKeys[j], queryKeys[i]
})

for _, k := range queryKeys {
qs, err := ledger.NewQuerySingleValue(newState, k)
require.NoError(t, err)

retValue, err := led.GetSingleValue(qs)
require.NoError(t, err)

encKey := encoding.EncodeKey(&k)
if value, ok := expectedValues[string(encKey)]; ok {
require.Equal(t, value, retValue)
} else {
require.Equal(t, 0, len(retValue))
}
}
})
}

func TestLedgerValueSizes(t *testing.T) {
t.Run("empty query", func(t *testing.T) {

Expand Down
18 changes: 18 additions & 0 deletions ledger/complete/mtrie/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ func (f *Forest) ValueSizes(r *ledger.TrieRead) ([]int, error) {
return orderedValueSizes, nil
}

// ReadSinglePayload returns the payload stored at the given path in the
// trie with the given root hash, or an error if no such trie is in the
// forest.
func (f *Forest) ReadSinglePayload(r *ledger.TrieReadSinglePayload) (*ledger.Payload, error) {
	trie, err := f.GetTrie(r.RootHash)
	if err != nil {
		return nil, err
	}
	// Deep-copy the payload so callers cannot mutate trie-internal data.
	return trie.ReadSinglePayload(r.Path).DeepCopy(), nil
}

// Read reads values for an slice of paths and returns values and error (if any)
// TODO: can be optimized further if we don't care about changing the order of the input r.Paths
func (f *Forest) Read(r *ledger.TrieRead) ([]*ledger.Payload, error) {
Expand All @@ -137,6 +149,12 @@ func (f *Forest) Read(r *ledger.TrieRead) ([]*ledger.Payload, error) {
return nil, err
}

// call ReadSinglePayload if there is only one path
if len(r.Paths) == 1 {
payload := trie.ReadSinglePayload(r.Paths[0])
return []*ledger.Payload{payload.DeepCopy()}, nil
}

// deduplicate keys:
// Generally, we expect the VM to deduplicate reads and writes. Hence, the following is a pre-caution.
// TODO: We could take out the following de-duplication logic
Expand Down
60 changes: 60 additions & 0 deletions ledger/complete/mtrie/forest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,66 @@ func TestReadNonExistingPath(t *testing.T) {
require.True(t, retPayloads[0].IsEmpty())
}

// TestReadSinglePayload tests reading a single payload for both set and
// unset registers, via batch Read and via ReadSinglePayload.
func TestReadSinglePayload(t *testing.T) {
	forest, err := NewForest(5, &metrics.NoopCollector{}, nil)
	require.NoError(t, err)

	// Two registers that get written to the trie.
	path1 := pathByUint8s([]uint8{uint8(125), uint8(23)}) // path: 01111101...
	payload1 := payloadBySlices([]byte{'A'}, []byte{'A'})
	path2 := pathByUint8s([]uint8{uint8(178), uint8(152)}) // path: 10110010...
	payload2 := payloadBySlices([]byte{'B'}, []byte{'B'})

	update := &ledger.TrieUpdate{
		RootHash: forest.GetEmptyRootHash(),
		Paths:    []ledger.Path{path1, path2},
		Payloads: []*ledger.Payload{payload1, payload2},
	}
	baseRoot, err := forest.Update(update)
	require.NoError(t, err)

	// Two registers that were never written (empty payloads expected).
	path3 := pathByUint8s([]uint8{uint8(110), uint8(48)}) // path: 01101110...
	path4 := pathByUint8s([]uint8{uint8(23), uint8(82)})  // path: 00010111...

	expectedPayloads := map[ledger.Path]*ledger.Payload{
		path1: payload1,
		path2: payload2,
		path3: ledger.EmptyPayload(),
		path4: ledger.EmptyPayload(),
	}

	// Batch read one payload at a time (the less efficient path).
	for path, want := range expectedPayloads {
		read := &ledger.TrieRead{RootHash: baseRoot, Paths: []ledger.Path{path}}
		got, err := forest.Read(read)
		require.NoError(t, err)
		require.Equal(t, 1, len(got))
		if want.IsEmpty() {
			require.True(t, got[0].IsEmpty())
		} else {
			require.Equal(t, want, got[0])
		}
	}

	// Dedicated single-payload read.
	for path, want := range expectedPayloads {
		read := &ledger.TrieReadSinglePayload{RootHash: baseRoot, Path: path}
		got, err := forest.ReadSinglePayload(read)
		require.NoError(t, err)
		if want.IsEmpty() {
			require.True(t, got.IsEmpty())
		} else {
			require.Equal(t, want, got)
		}
	}
}

// TestForkingUpdates updates a base trie in two different ways. We expect
// that for each update, a new trie is added to the forest preserving the
// updated values independently of the other update.
Expand Down
Loading