Skip to content

Commit

Permalink
Merge pull request onflow#5175 from onflow/leo/optimize-finalized-reader
Browse files Browse the repository at this point in the history
[Storehouse] Optimize finalized reader to use block id index
  • Loading branch information
zhangchiqing authored Dec 22, 2023
2 parents 0783c24 + 17b28f4 commit 7fc5f15
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 64 deletions.
6 changes: 2 additions & 4 deletions engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,12 @@ func (b *backendAccounts) GetAccountAtBlockHeight(
address flow.Address,
height uint64,
) (*flow.Account, error) {
header, err := b.headers.ByHeight(height)
blockID, err := b.headers.BlockIDByHeight(height)
if err != nil {
return nil, rpc.ConvertStorageError(err)
}

blockID := header.ID()

account, err := b.getAccountAtBlock(ctx, address, blockID, header.Height)
account, err := b.getAccountAtBlock(ctx, address, blockID, height)
if err != nil {
b.log.Debug().Err(err).Msgf("failed to get account at height: %d", height)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (s *BackendAccountsSuite) testGetAccountAtLatestBlock(ctx context.Context,

func (s *BackendAccountsSuite) testGetAccountAtBlockHeight(ctx context.Context, backend *backendAccounts, statusCode codes.Code) {
height := s.block.Header.Height
s.headers.On("ByHeight", height).Return(s.block.Header, nil).Once()
s.headers.On("BlockIDByHeight", height).Return(s.block.Header.ID(), nil).Once()

if statusCode == codes.OK {
actual, err := backend.GetAccountAtBlockHeight(ctx, s.account.Address, height)
Expand Down
3 changes: 1 addition & 2 deletions engine/consensus/approvals/assignment_collector_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,10 @@ func (t *AssignmentCollectorTree) selectCollectorsForFinalizedFork(startHeight,
var fork []*assignmentCollectorVertex
for height := startHeight; height <= finalizedHeight; height++ {
iter := t.forest.GetVerticesAtLevel(height)
finalizedBlock, err := t.headers.ByHeight(height)
finalizedBlockID, err := t.headers.BlockIDByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not retrieve finalized block at height %d: %w", height, err)
}
finalizedBlockID := finalizedBlock.ID()
for iter.HasNext() {
vertex := iter.NextVertex().(*assignmentCollectorVertex)
if finalizedBlockID == vertex.collector.BlockID() {
Expand Down
20 changes: 11 additions & 9 deletions engine/consensus/approvals/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,22 @@ func (s *BaseAssignmentCollectorTestSuite) SetupTest() {
return realstorage.ErrNotFound
}
})
s.Headers.On("ByHeight", mock.Anything).Return(
func(height uint64) *flow.Header {
s.Headers.On("BlockIDByHeight", mock.Anything).Return(
func(height uint64) (flow.Identifier, error) {
if block, found := s.FinalizedAtHeight[height]; found {
return block
return block.ID(), nil
} else {
return nil
return flow.ZeroID, realstorage.ErrNotFound
}
},
func(height uint64) error {
_, found := s.FinalizedAtHeight[height]
if !found {
return realstorage.ErrNotFound
)
s.Headers.On("ByHeight", mock.Anything).Return(
func(height uint64) (*flow.Header, error) {
if block, found := s.FinalizedAtHeight[height]; found {
return block, nil
} else {
return nil, realstorage.ErrNotFound
}
return nil
},
)

Expand Down
4 changes: 2 additions & 2 deletions engine/consensus/sealing/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ func (c *Core) processIncorporatedResult(incRes *flow.IncorporatedResult) error
// For incorporating blocks at heights that are already finalized, we check that the incorporating block
// is on the finalized fork. Otherwise, the incorporating block is orphaned, and we can drop the result.
if incorporatedAtHeight <= c.counterLastFinalizedHeight.Value() {
finalized, err := c.headers.ByHeight(incorporatedAtHeight)
finalizedID, err := c.headers.BlockIDByHeight(incorporatedAtHeight)
if err != nil {
return fmt.Errorf("could not retrieve finalized block at height %d: %w", incorporatedAtHeight, err)
}
if finalized.ID() != incRes.IncorporatedBlockID {
if finalizedID != incRes.IncorporatedBlockID {
// it means that we got incorporated incRes for a block which doesn't extend our chain
// and should be discarded from future processing
return engine.NewOutdatedInputErrorf("won't process incorporated incRes from orphan block %s", incRes.IncorporatedBlockID)
Expand Down
19 changes: 5 additions & 14 deletions engine/execution/ingestion/loader/unexecuted_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type UnexecutedLoader struct {
log zerolog.Logger
state protocol.State
headers storage.Headers // see comments on getHeaderByHeight for why we need it
headers storage.Headers
execState state.ExecutionState
}

Expand Down Expand Up @@ -157,12 +157,12 @@ func (e *UnexecutedLoader) finalizedUnexecutedBlocks(ctx context.Context, finali
}

for ; lastExecuted > rootBlock.Height; lastExecuted-- {
header, err := e.getHeaderByHeight(lastExecuted)
finalizedID, err := e.headers.BlockIDByHeight(lastExecuted)
if err != nil {
return nil, fmt.Errorf("could not get header at height: %v, %w", lastExecuted, err)
}

executed, err := e.execState.IsBlockExecuted(header.Height, header.ID())
executed, err := e.execState.IsBlockExecuted(lastExecuted, finalizedID)
if err != nil {
return nil, fmt.Errorf("could not check whether block is executed: %w", err)
}
Expand All @@ -179,12 +179,12 @@ func (e *UnexecutedLoader) finalizedUnexecutedBlocks(ctx context.Context, finali
// starting from the first unexecuted block, go through each unexecuted and finalized block
// reload its block to execution queues
for height := firstUnexecuted; height <= final.Height; height++ {
header, err := e.getHeaderByHeight(height)
finalizedID, err := e.headers.BlockIDByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not get header at height: %v, %w", height, err)
}

unexecuted = append(unexecuted, header.ID())
unexecuted = append(unexecuted, finalizedID)
}

e.log.Info().
Expand Down Expand Up @@ -227,12 +227,3 @@ func (e *UnexecutedLoader) pendingUnexecutedBlocks(ctx context.Context, finalize

return unexecuted, nil
}

// if the EN is dynamically bootstrapped, the finalized blocks at height range:
// [ sealedRoot.Height, finalizedRoot.Height - 1] can not be retrieved from
// protocol state, but only from headers
func (e *UnexecutedLoader) getHeaderByHeight(height uint64) (*flow.Header, error) {
// we don't use protocol state because for dynamic boostrapped execution node
// the last executed and sealed block is below the finalized root block
return e.headers.ByHeight(height)
}
8 changes: 4 additions & 4 deletions engine/execution/ingestion/loader/unexecuted_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is the only finalized block, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
headers.EXPECT().BlockIDByHeight(blockC.Header.Height).Return(blockC.Header.ID(), nil)

es.ExecuteBlock(t, blockA)
es.ExecuteBlock(t, blockB)
Expand Down Expand Up @@ -238,7 +238,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is finalized, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
headers.EXPECT().BlockIDByHeight(blockC.Header.Height).Return(blockC.Header.ID(), nil)

es.ExecuteBlock(t, blockA)
es.ExecuteBlock(t, blockB)
Expand Down Expand Up @@ -278,7 +278,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block A is finalized, index its header by its height
headers.EXPECT().ByHeight(blockA.Header.Height).Return(blockA.Header, nil)
headers.EXPECT().BlockIDByHeight(blockA.Header.Height).Return(blockA.Header.ID(), nil)

es.ExecuteBlock(t, blockA)
es.ExecuteBlock(t, blockB)
Expand Down Expand Up @@ -346,7 +346,7 @@ func TestLoadingUnexecutedBlocks(t *testing.T) {
loader := loader.NewUnexecutedLoader(log, ps, headers, es)

// block C is finalized, index its header by its height
headers.EXPECT().ByHeight(blockC.Header.Height).Return(blockC.Header, nil)
headers.EXPECT().BlockIDByHeight(blockC.Header.Height).Return(blockC.Header.ID(), nil)

es.ExecuteBlock(t, blockA)
es.ExecuteBlock(t, blockB)
Expand Down
15 changes: 3 additions & 12 deletions engine/execution/ingestion/loader/unfinalized_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type UnfinalizedLoader struct {
log zerolog.Logger
state protocol.State
headers storage.Headers // see comments on getHeaderByHeight for why we need it
headers storage.Headers
execState state.FinalizedExecutionState
}

Expand Down Expand Up @@ -60,12 +60,12 @@ func (e *UnfinalizedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifi
// reload its block to execution queues
// loading finalized blocks
for height := lastExecuted + 1; height <= final.Height; height++ {
header, err := e.getHeaderByHeight(height)
finalizedID, err := e.headers.BlockIDByHeight(height)
if err != nil {
return nil, fmt.Errorf("could not get header at height: %v, %w", height, err)
}

unexecutedFinalized = append(unexecutedFinalized, header.ID())
unexecutedFinalized = append(unexecutedFinalized, finalizedID)
}

// loaded all pending blocks
Expand All @@ -85,12 +85,3 @@ func (e *UnfinalizedLoader) LoadUnexecuted(ctx context.Context) ([]flow.Identifi

return unexecuted, nil
}

// if the EN is dynamically bootstrapped, the finalized blocks at height range:
// [ sealedRoot.Height, finalizedRoot.Height - 1] can not be retrieved from
// protocol state, but only from headers
func (e *UnfinalizedLoader) getHeaderByHeight(height uint64) (*flow.Header, error) {
// we don't use protocol state because for dynamic boostrapped execution node
// the last executed and sealed block is below the finalized root block
return e.headers.ByHeight(height)
}
6 changes: 3 additions & 3 deletions engine/execution/ingestion/loader/unfinalized_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func TestLoadingUnfinalizedBlocks(t *testing.T) {
es := new(stateMock.FinalizedExecutionState)
es.On("GetHighestFinalizedExecuted").Return(genesis.Header.Height)
headers := new(storage.Headers)
headers.On("ByHeight", blockA.Header.Height).Return(blockA.Header, nil)
headers.On("ByHeight", blockB.Header.Height).Return(blockB.Header, nil)
headers.On("ByHeight", blockC.Header.Height).Return(blockC.Header, nil)
headers.On("BlockIDByHeight", blockA.Header.Height).Return(blockA.Header.ID(), nil)
headers.On("BlockIDByHeight", blockB.Header.Height).Return(blockB.Header.ID(), nil)
headers.On("BlockIDByHeight", blockC.Header.Height).Return(blockC.Header.ID(), nil)

loader := loader.NewUnfinalizedLoader(unittest.Logger(), ps, headers, es)

Expand Down
6 changes: 3 additions & 3 deletions engine/execution/ingestion/stop/stop_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s stopBoundary) String() string {
// StopControlHeaders is an interface for fetching headers
// Its jut a small subset of storage.Headers for comments see storage.Headers
type StopControlHeaders interface {
ByHeight(height uint64) (*flow.Header, error)
BlockIDByHeight(height uint64) (flow.Identifier, error)
}

// NewStopControl creates new StopControl.
Expand Down Expand Up @@ -476,12 +476,12 @@ func (s *StopControl) blockFinalized(

// Let's find the ID of the block that should be executed last
// which is the parent of the block at the stopHeight
header, err := s.headers.ByHeight(s.stopBoundary.StopBeforeHeight - 1)
finalizedID, err := s.headers.BlockIDByHeight(s.stopBoundary.StopBeforeHeight - 1)
if err != nil {
handleErr(fmt.Errorf("failed to get header by height: %w", err))
return
}
parentID = header.ID()
parentID = finalizedID
}

s.stopBoundary.stopAfterExecuting = parentID
Expand Down
6 changes: 3 additions & 3 deletions engine/execution/ingestion/stop/stop_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ type stopControlMockHeaders struct {
headers map[uint64]*flow.Header
}

func (m *stopControlMockHeaders) ByHeight(height uint64) (*flow.Header, error) {
func (m *stopControlMockHeaders) BlockIDByHeight(height uint64) (flow.Identifier, error) {
h, ok := m.headers[height]
if !ok {
return nil, fmt.Errorf("header not found")
return flow.ZeroID, fmt.Errorf("header not found")
}
return h, nil
return h.ID(), nil
}

func TestAddStopForPastBlocks(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions engine/execution/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,11 @@ func (s *state) GetHighestExecutedBlockID(ctx context.Context) (uint64, flow.Ide
// when storehouse is enabled, the highest executed block is consisted as
// the highest finalized and executed block
height := s.GetHighestFinalizedExecuted()
header, err := s.headers.ByHeight(height)
finalizedID, err := s.headers.BlockIDByHeight(height)
if err != nil {
return 0, flow.ZeroID, fmt.Errorf("could not get header by height %v: %w", height, err)
}
return height, header.ID(), nil
return height, finalizedID, nil
}

var blockID flow.Identifier
Expand Down
4 changes: 2 additions & 2 deletions module/finalizedreader/finalizedreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func (r *FinalizedReader) FinalizedBlockIDAtHeight(height uint64) (flow.Identifi
return flow.ZeroID, fmt.Errorf("height not finalized (%v): %w", height, storage.ErrNotFound)
}

header, err := r.headers.ByHeight(height)
finalizedID, err := r.headers.BlockIDByHeight(height)
if err != nil {
return flow.ZeroID, err
}

return header.ID(), nil
return finalizedID, nil
}

// BlockFinalized implements the protocol.Consumer interface, which allows FinalizedReader
Expand Down
6 changes: 3 additions & 3 deletions module/finalizer/consensus/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error {
}

if pending.Height <= finalized {
dup, err := f.headers.ByHeight(pending.Height)
dupID, err := f.headers.BlockIDByHeight(pending.Height)
if err != nil {
return fmt.Errorf("could not retrieve finalized equivalent: %w", err)
}
if dup.ID() != blockID {
return fmt.Errorf("cannot finalize pending block conflicting with finalized state (height: %d, pending: %x, finalized: %x)", pending.Height, blockID, dup.ID())
if dupID != blockID {
return fmt.Errorf("cannot finalize pending block conflicting with finalized state (height: %d, pending: %x, finalized: %x)", pending.Height, blockID, dupID)
}
return nil
}
Expand Down

0 comments on commit 7fc5f15

Please sign in to comment.