From 3b35896fde608aded7bc11ca65f00472619752f4 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Fri, 4 Oct 2024 15:43:12 +0200 Subject: [PATCH 1/5] enhance merged iter --- server/v2/stf/branch/changeset.go | 3 +- server/v2/stf/branch/mergeiter.go | 307 ++++++++++++------------------ 2 files changed, 121 insertions(+), 189 deletions(-) diff --git a/server/v2/stf/branch/changeset.go b/server/v2/stf/branch/changeset.go index 07a29345aee7..8aac45f8953f 100644 --- a/server/v2/stf/branch/changeset.go +++ b/server/v2/stf/branch/changeset.go @@ -3,6 +3,7 @@ package branch import ( "bytes" "errors" + "fmt" "github.com/tidwall/btree" ) @@ -165,7 +166,7 @@ func (mi *memIterator) Close() error { // Otherwise, it returns nil. func (mi *memIterator) Error() error { if !mi.Valid() { - return errInvalidIterator + return fmt.Errorf("invalid iterator") } return nil } diff --git a/server/v2/stf/branch/mergeiter.go b/server/v2/stf/branch/mergeiter.go index e71b88cffc42..c35ed9ddbb30 100644 --- a/server/v2/stf/branch/mergeiter.go +++ b/server/v2/stf/branch/mergeiter.go @@ -8,228 +8,159 @@ import ( ) // mergedIterator merges a parent Iterator and a cache Iterator. -// The cache iterator may return nil keys to signal that an item -// had been deleted (but not deleted in the parent). -// If the cache iterator has the same key as the parent, the -// cache shadows (overrides) the parent. -type mergedIterator struct { - parent corestore.Iterator - cache corestore.Iterator - ascending bool - - valid bool +// The cache iterator may contain items that shadow or override items in the parent iterator. +// If the cache iterator has the same key as the parent, the cache's value takes precedence. +// Deleted items in the cache (indicated by nil values) are skipped. +type mergedIterator[Parent, Cache corestore.Iterator] struct { + parent Parent // Iterator for the parent store + cache Cache // Iterator for the cache store + ascending bool // Direction of iteration + valid bool // Indicates if the iterator is in a valid state + currKey []byte // Current key pointed by the iterator + currValue []byte // Current value corresponding to currKey + err error // Error encountered during iteration } -var _ corestore.Iterator = (*mergedIterator)(nil) +// Ensure mergedIterator implements the corestore.Iterator interface. +var _ corestore.Iterator = (*mergedIterator[corestore.Iterator, corestore.Iterator])(nil) -// mergeIterators merges two iterators. -func mergeIterators(parent, cache corestore.Iterator, ascending bool) corestore.Iterator { - iter := &mergedIterator{ +// mergeIterators creates a new merged iterator from parent and cache iterators. +// The 'ascending' parameter determines the direction of iteration. +func mergeIterators[Parent, Cache corestore.Iterator](parent Parent, cache Cache, ascending bool) *mergedIterator[Parent, Cache] { + iter := &mergedIterator[Parent, Cache]{ parent: parent, cache: cache, ascending: ascending, } - - iter.valid = iter.skipUntilExistsOrInvalid() + iter.advance() // Initialize the iterator by advancing to the first valid item return iter } -// Domain implements Iterator. -// Returns parent domain because cache and parent domains are the same. -func (iter *mergedIterator) Domain() (start, end []byte) { - return iter.parent.Domain() -} - -// Valid implements Iterator. -func (iter *mergedIterator) Valid() bool { - return iter.valid -} - -// Next implements Iterator -func (iter *mergedIterator) Next() { - iter.assertValid() - - switch { - case !iter.parent.Valid(): - // If parent is invalid, get the next cache item. - iter.cache.Next() - case !iter.cache.Valid(): - // If cache is invalid, get the next parent item. - iter.parent.Next() - default: - // Both are valid. Compare keys. - keyP, keyC := iter.parent.Key(), iter.cache.Key() - switch iter.compare(keyP, keyC) { - case -1: // parent < cache - iter.parent.Next() - case 0: // parent == cache - iter.parent.Next() - iter.cache.Next() - case 1: // parent > cache - iter.cache.Next() - } - } - iter.valid = iter.skipUntilExistsOrInvalid() -} - -// Key implements Iterator -func (iter *mergedIterator) Key() []byte { - iter.assertValid() - - // If parent is invalid, get the cache key. - if !iter.parent.Valid() { - return iter.cache.Key() - } - - // If cache is invalid, get the parent key. - if !iter.cache.Valid() { - return iter.parent.Key() - } - - // Both are valid. Compare keys. - keyP, keyC := iter.parent.Key(), iter.cache.Key() - - cmp := iter.compare(keyP, keyC) - switch cmp { - case -1: // parent < cache - return keyP - case 0: // parent == cache - return keyP - case 1: // parent > cache - return keyC - default: - panic("invalid compare result") - } +// Domain returns the start and end range of the iterator. +// It delegates to the parent iterator as both iterators share the same domain. +func (i *mergedIterator[Parent, Cache]) Domain() (start, end []byte) { + return i.parent.Domain() } -// Value implements Iterator -func (iter *mergedIterator) Value() []byte { - iter.assertValid() - - // If parent is invalid, get the cache value. - if !iter.parent.Valid() { - return iter.cache.Value() - } - - // If cache is invalid, get the parent value. - if !iter.cache.Valid() { - return iter.parent.Value() - } - - // Both are valid. Compare keys. - keyP, keyC := iter.parent.Key(), iter.cache.Key() - - cmp := iter.compare(keyP, keyC) - switch cmp { - case -1: // parent < cache - return iter.parent.Value() - case 0: // parent == cache - return iter.cache.Value() - case 1: // parent > cache - return iter.cache.Value() - default: - panic("invalid comparison result") - } +// Valid checks if the iterator is in a valid state. +// It returns true if the iterator has not reached the end. +func (i *mergedIterator[Parent, Cache]) Valid() bool { + return i.valid } -// Close implements Iterator -func (iter *mergedIterator) Close() error { - err1 := iter.cache.Close() - if err := iter.parent.Close(); err != nil { - return err +// Next advances the iterator to the next valid item. +// It skips over deleted items (with nil values) and updates the current key and value. +func (i *mergedIterator[Parent, Cache]) Next() { + if !i.valid { + i.err = errors.New("invalid iterator") + return } - - return err1 + i.advance() } -var errInvalidIterator = errors.New("invalid merged iterator") - -// Error returns an error if the mergedIterator is invalid defined by the -// Valid method. -func (iter *mergedIterator) Error() error { - if !iter.Valid() { - return errInvalidIterator +// Key returns the current key pointed by the iterator. +// If the iterator is invalid, it returns nil. +func (i *mergedIterator[Parent, Cache]) Key() []byte { + if !i.valid { + return nil } - - return nil + return i.currKey } -// If not valid, panics. -// NOTE: May have side-effect of iterating over cache. -func (iter *mergedIterator) assertValid() { - if err := iter.Error(); err != nil { - panic(err) +// Value returns the current value corresponding to the current key. +// If the iterator is invalid, it returns nil. +func (i *mergedIterator[Parent, Cache]) Value() []byte { + if !i.valid { + return nil } + return i.currValue } -// Like bytes.Compare but opposite if not ascending. -func (iter *mergedIterator) compare(a, b []byte) int { - if iter.ascending { - return bytes.Compare(a, b) +// Close closes both the parent and cache iterators. +// It returns any error encountered during the closing of the iterators. +func (i *mergedIterator[Parent, Cache]) Close() error { + err1 := i.parent.Close() + err2 := i.cache.Close() + if err1 != nil { + return err1 } - - return bytes.Compare(a, b) * -1 + return err2 } -// Skip all delete-items from the cache w/ `key < until`. After this function, -// current cache item is a non-delete-item, or `until <= key`. -// If the current cache item is not a delete item, does nothing. -// If `until` is nil, there is no limit, and cache may end up invalid. -// CONTRACT: cache is valid. -func (iter *mergedIterator) skipCacheDeletes(until []byte) { - for iter.cache.Valid() && - iter.cache.Value() == nil && - (until == nil || iter.compare(iter.cache.Key(), until) < 0) { - iter.cache.Next() - } +// Error returns any error that occurred during iteration. +// If the iterator is valid, it returns nil. +func (i *mergedIterator[Parent, Cache]) Error() error { + return i.err } -// Fast forwards cache (or parent+cache in case of deleted items) until current -// item exists, or until iterator becomes invalid. -// Returns whether the iterator is valid. -func (iter *mergedIterator) skipUntilExistsOrInvalid() bool { +// advance moves the iterator to the next valid (non-deleted) item. +// It handles merging logic between the parent and cache iterators. +func (i *mergedIterator[Parent, Cache]) advance() { for { - // If parent is invalid, fast-forward cache. - if !iter.parent.Valid() { - iter.skipCacheDeletes(nil) - return iter.cache.Valid() + // Check if both iterators have reached the end + if !i.parent.Valid() && !i.cache.Valid() { + i.valid = false + return } - // Parent is valid. - if !iter.cache.Valid() { - return true + var key, value []byte + + // If parent iterator is exhausted, use the cache iterator + if !i.parent.Valid() { + key = i.cache.Key() + value = i.cache.Value() + i.cache.Next() + } else if !i.cache.Valid() { + // If cache iterator is exhausted, use the parent iterator + key = i.parent.Key() + value = i.parent.Value() + i.parent.Next() + } else { + // Both iterators are valid; compare keys + keyP, keyC := i.parent.Key(), i.cache.Key() + switch cmp := i.compare(keyP, keyC); { + case cmp < 0: + // Parent key is less than cache key + key = keyP + value = i.parent.Value() + i.parent.Next() + case cmp == 0: + // Keys are equal; cache overrides parent + key = keyC + value = i.cache.Value() + i.parent.Next() + i.cache.Next() + case cmp > 0: + // Cache key is less than parent key + key = keyC + value = i.cache.Value() + i.cache.Next() + } } - // Parent is valid, cache is valid. - - // Compare parent and cache. - keyP := iter.parent.Key() - keyC := iter.cache.Key() - switch iter.compare(keyP, keyC) { - case -1: // parent < cache. - return true + // Skip deleted items (value is nil) + if value == nil { + continue + } - case 0: // parent == cache. - // Skip over if cache item is a delete. - valueC := iter.cache.Value() - if valueC == nil { - iter.parent.Next() - iter.cache.Next() + // Update the current key and value, and mark iterator as valid + i.currKey = key + i.currValue = value + i.valid = true + return + } +} - continue - } - // Cache is not a delete. - - return true // cache exists. - case 1: // cache < parent - // Skip over if cache item is a delete. - valueC := iter.cache.Value() - if valueC == nil { - iter.skipCacheDeletes(keyP) - continue - } - // Cache is not a delete. - return true // cache exists. - } +// compare compares two byte slices a and b. +// It returns an integer comparing a and b: +// - Negative if a < b +// - Zero if a == b +// - Positive if a > b +// +// The comparison respects the iterator's direction (ascending or descending). +func (i *mergedIterator[Parent, Cache]) compare(a, b []byte) int { + if i.ascending { + return bytes.Compare(a, b) } + return bytes.Compare(b, a) } From ee40c11077d2c470332655a488719e648b81ebf2 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Fri, 4 Oct 2024 15:46:58 +0200 Subject: [PATCH 2/5] small tweak --- server/v2/stf/branch/mergeiter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/v2/stf/branch/mergeiter.go b/server/v2/stf/branch/mergeiter.go index c35ed9ddbb30..e057643c09e3 100644 --- a/server/v2/stf/branch/mergeiter.go +++ b/server/v2/stf/branch/mergeiter.go @@ -7,6 +7,10 @@ import ( corestore "cosmossdk.io/core/store" ) +var ( + errInvalidIterator = errors.New("invalid iterator") +) + // mergedIterator merges a parent Iterator and a cache Iterator. // The cache iterator may contain items that shadow or override items in the parent iterator. // If the cache iterator has the same key as the parent, the cache's value takes precedence. @@ -52,7 +56,7 @@ func (i *mergedIterator[Parent, Cache]) Valid() bool { // It skips over deleted items (with nil values) and updates the current key and value. func (i *mergedIterator[Parent, Cache]) Next() { if !i.valid { - i.err = errors.New("invalid iterator") + i.err = errInvalidIterator return } i.advance() From b4b99fb7b1642918511107c8bec15d51db53724c Mon Sep 17 00:00:00 2001 From: testinginprod Date: Fri, 4 Oct 2024 15:50:36 +0200 Subject: [PATCH 3/5] small revert --- server/v2/stf/branch/changeset.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/v2/stf/branch/changeset.go b/server/v2/stf/branch/changeset.go index 8aac45f8953f..07a29345aee7 100644 --- a/server/v2/stf/branch/changeset.go +++ b/server/v2/stf/branch/changeset.go @@ -3,7 +3,6 @@ package branch import ( "bytes" "errors" - "fmt" "github.com/tidwall/btree" ) @@ -166,7 +165,7 @@ func (mi *memIterator) Close() error { // Otherwise, it returns nil. func (mi *memIterator) Error() error { if !mi.Valid() { - return fmt.Errorf("invalid iterator") + return errInvalidIterator } return nil } From af57f0d8cb5de5ea82f08fe9bedb977a01ff323d Mon Sep 17 00:00:00 2001 From: testinginprod Date: Tue, 8 Oct 2024 15:19:21 +0200 Subject: [PATCH 4/5] address PR reviews --- server/v2/stf/branch/mergeiter.go | 4 +-- server/v2/stf/branch/mergeiter_test.go | 46 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/server/v2/stf/branch/mergeiter.go b/server/v2/stf/branch/mergeiter.go index e057643c09e3..042d9d390b31 100644 --- a/server/v2/stf/branch/mergeiter.go +++ b/server/v2/stf/branch/mergeiter.go @@ -66,7 +66,7 @@ func (i *mergedIterator[Parent, Cache]) Next() { // If the iterator is invalid, it returns nil. func (i *mergedIterator[Parent, Cache]) Key() []byte { if !i.valid { - return nil + panic("called key on invalid iterator") } return i.currKey } @@ -75,7 +75,7 @@ func (i *mergedIterator[Parent, Cache]) Key() []byte { // If the iterator is invalid, it returns nil. func (i *mergedIterator[Parent, Cache]) Value() []byte { if !i.valid { - return nil + panic("called value on invalid iterator") } return i.currValue } diff --git a/server/v2/stf/branch/mergeiter_test.go b/server/v2/stf/branch/mergeiter_test.go index 1a9e13bbe65c..45f10d4c1b36 100644 --- a/server/v2/stf/branch/mergeiter_test.go +++ b/server/v2/stf/branch/mergeiter_test.go @@ -7,6 +7,52 @@ import ( corestore "cosmossdk.io/core/store" ) +func TestMergedIterator_Validity(t *testing.T) { + panics := func(f func()) { + defer func() { + r := recover() + if r == nil { + t.Error("panic expected") + } + }() + + f() + } + + t.Run("panics when calling key on invalid iter", func(t *testing.T) { + parent, err := newMemState().Iterator(nil, nil) + if err != nil { + t.Fatal(err) + } + cache, err := newMemState().Iterator(nil, nil) + if err != nil { + t.Fatal(err) + } + + it := mergeIterators(parent, cache, true) + panics(func() { + it.Key() + }) + }) + + t.Run("panics when calling value on invalid iter", func(t *testing.T) { + parent, err := newMemState().Iterator(nil, nil) + if err != nil { + t.Fatal(err) + } + cache, err := newMemState().Iterator(nil, nil) + if err != nil { + t.Fatal(err) + } + + it := mergeIterators(parent, cache, true) + + panics(func() { + it.Value() + }) + }) +} + func TestMergedIterator_Next(t *testing.T) { specs := map[string]struct { setup func() corestore.Iterator From f5110c966fa7d0c872448cea4d8d6d4de2c2fba8 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Tue, 8 Oct 2024 15:21:43 +0200 Subject: [PATCH 5/5] address PR reviews 2 --- server/v2/stf/branch/mergeiter.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/v2/stf/branch/mergeiter.go b/server/v2/stf/branch/mergeiter.go index 042d9d390b31..108d19e7e041 100644 --- a/server/v2/stf/branch/mergeiter.go +++ b/server/v2/stf/branch/mergeiter.go @@ -82,13 +82,11 @@ func (i *mergedIterator[Parent, Cache]) Value() []byte { // Close closes both the parent and cache iterators. // It returns any error encountered during the closing of the iterators. -func (i *mergedIterator[Parent, Cache]) Close() error { - err1 := i.parent.Close() - err2 := i.cache.Close() - if err1 != nil { - return err1 - } - return err2 +func (i *mergedIterator[Parent, Cache]) Close() (err error) { + err = errors.Join(err, i.parent.Close()) + err = errors.Join(err, i.cache.Close()) + i.valid = false + return err } // Error returns any error that occurred during iteration.