Skip to content

Commit

Permalink
Implement BlockProvider for block snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
m-Peter committed Oct 29, 2024
1 parent d3e3847 commit db86472
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 0 deletions.
26 changes: 26 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bootstrap

import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/onflow/go-ethereum/eth/tracers"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
Expand All @@ -25,6 +27,15 @@ import (
"github.com/onflow/flow-evm-gateway/services/traces"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"

// this import is needed for side-effects, because the
// tracers.DefaultDirectory is relying on the init function
_ "github.com/onflow/go-ethereum/eth/tracers/native"
)

const (
callTracerConfig = `{ "onlyTopCall": true }`
callTracerName = "callTracer"
)

type Storages struct {
Expand Down Expand Up @@ -124,9 +135,24 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.logger,
)

tracer, err := tracers.DefaultDirectory.New(
callTracerName,
&tracers.Context{},
json.RawMessage(callTracerConfig),
)
if err != nil {
return err
}
blocksProvider := pebble.NewBlocksProvider(
b.storages.Blocks,
b.config.FlowNetworkID,
tracer,
)

// initialize event ingestion engine
b.events = ingestion.NewEventIngestionEngine(
subscriber,
blocksProvider,
b.storages.Storage,
b.storages.Blocks,
b.storages.Receipts,
Expand Down
19 changes: 19 additions & 0 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
blocksProvider *pebble.BlocksProvider
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
Expand All @@ -49,6 +50,7 @@ type Engine struct {

func NewEventIngestionEngine(
subscriber EventSubscriber,
blocksProvider *pebble.BlocksProvider,
store *pebble.Storage,
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
Expand All @@ -65,6 +67,7 @@ func NewEventIngestionEngine(
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
blocksProvider: blocksProvider,
store: store,
blocks: blocks,
receipts: receipts,
Expand Down Expand Up @@ -185,6 +188,22 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err)
}

if err := e.blocksProvider.OnBlockReceived(events.Block()); err != nil {
return fmt.Errorf(
"failed to call OnBlockReceived for block %d, with: %w",
events.Block().Height,
err,
)
}

if err := e.blocksProvider.OnBlockExecuted(events.Block().Height, &pebble.ResultsCollector{}); err != nil {
return fmt.Errorf(
"failed to call OnBlockExecuted for block %d, with: %w",
events.Block().Height,
err,
)
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down
5 changes: 5 additions & 0 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestSerialBlockIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -143,6 +144,7 @@ func TestSerialBlockIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -258,6 +260,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -361,6 +364,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down Expand Up @@ -457,6 +461,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {

engine := NewEventIngestionEngine(
subscriber,
pebble.NewBlocksProvider(blocks, flowGo.Emulator, nil),
store,
blocks,
receipts,
Expand Down
144 changes: 144 additions & 0 deletions storage/pebble/blocks_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package pebble

import (
"fmt"

"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
evmTypes "github.com/onflow/flow-go/fvm/evm/types"
flowGo "github.com/onflow/flow-go/model/flow"
gethCommon "github.com/onflow/go-ethereum/common"
"github.com/onflow/go-ethereum/eth/tracers"
)

type ResultsCollector struct{}

func (s *ResultsCollector) StorageRegisterUpdates() map[flowGo.RegisterID]flowGo.RegisterValue {
return map[flowGo.RegisterID]flowGo.RegisterValue{}
}

var _ evmTypes.ReplayResultCollector = (*ResultsCollector)(nil)

type BlocksProvider struct {
blocks storage.BlockIndexer
chainID flowGo.ChainID
tracer *tracers.Tracer
latestBlock *models.Block
}

var _ evmTypes.BlockSnapshotProvider = (*BlocksProvider)(nil)
var _ evmTypes.BlockSnapshot = (*BlocksProvider)(nil)

func NewBlocksProvider(
blocks storage.BlockIndexer,
chainID flowGo.ChainID,
tracer *tracers.Tracer,
) *BlocksProvider {
return &BlocksProvider{
blocks: blocks,
chainID: chainID,
tracer: tracer,
}
}

func (bp *BlocksProvider) OnBlockReceived(block *models.Block) error {
if bp.latestBlock != nil {
return fmt.Errorf(
"received new block: %d, while still processing latest block: %d",
block.Height,
bp.latestBlock.Height,
)
}

bp.latestBlock = block

return nil
}

func (bp *BlocksProvider) OnBlockExecuted(
height uint64,
resCol evmTypes.ReplayResultCollector,
) error {
if bp.latestBlock == nil {
return fmt.Errorf(
"received block execution for: %d without latest block",
height,
)
}

if bp.latestBlock.Height != height {
return fmt.Errorf(
"latest block height doesn't match expected: %d, got: %d",
bp.latestBlock.Height,
height,
)
}

bp.latestBlock = nil

return nil
}

func (bp *BlocksProvider) GetSnapshotAt(height uint64) (
evmTypes.BlockSnapshot,
error,
) {
if bp.latestBlock != nil {
return bp, nil
}

block, err := bp.blocks.GetByHeight(height)
if err != nil {
return nil, err
}

return &BlocksProvider{
blocks: bp.blocks,
chainID: bp.chainID,
tracer: bp.tracer,
latestBlock: block,
}, nil
}

func (bs *BlocksProvider) BlockContext() (evmTypes.BlockContext, error) {
if bs.latestBlock == nil {
return evmTypes.BlockContext{}, fmt.Errorf(
"cannot create block context without latest block",
)
}

block := bs.latestBlock
return evmTypes.BlockContext{
ChainID: evmTypes.EVMChainIDFromFlowChainID(bs.chainID),
BlockNumber: block.Height,
BlockTimestamp: block.Timestamp,
DirectCallBaseGasUsage: evmTypes.DefaultDirectCallBaseGasUsage,
DirectCallGasPrice: evmTypes.DefaultDirectCallGasPrice,
GasFeeCollector: evmTypes.CoinbaseAddress,
GetHashFunc: func(n uint64) gethCommon.Hash {
// For block heights greater than or equal to the current,
// return an empty block hash.
if n >= block.Height {
return gethCommon.Hash{}
}
// If the given block height, is more than 256 blocks
// in the past, return an empty block hash.
if block.Height-n > 256 {
return gethCommon.Hash{}
}

block, err := bs.blocks.GetByHeight(n)
if err != nil {
return gethCommon.Hash{}
}
blockHash, err := block.Hash()
if err != nil {
return gethCommon.Hash{}
}

return blockHash
},
Random: block.PrevRandao,
Tracer: bs.tracer,
}, nil
}

0 comments on commit db86472

Please sign in to comment.