Skip to content
Merged
7 changes: 7 additions & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,10 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error {
return fmt.Errorf("failed to start local state index engine: %w", err)
}

// we must wait for the state index to be ready before starting the ingestion engine,
// because the state index might have to catch up executed blocks to the indexed block height
<-boot.State.Ready()

if err := boot.StartEventIngestion(ctx); err != nil {
return fmt.Errorf("failed to start event ingestion engine: %w", err)
}
Expand All @@ -520,6 +524,9 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error {
return fmt.Errorf("failed to start metrics server: %w", err)
}

// wait for event ingestion engine
<-boot.Events.Ready()

// mark ready
close(ready)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onflow/atree v0.8.0-rc.6
github.com/onflow/cadence v1.0.0-preview.52
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240918123637-27d2c56494f8
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240914104351-c2d9833c3357
github.com/onflow/flow-go-sdk v1.0.0-preview.56
github.com/onflow/flow/protobuf/go/flow v0.4.7
github.com/onflow/go-ethereum v1.14.7
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1865,8 +1865,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.0 h1:mToacZ5NWqtlWwk/7RgIl/jeKB/
github.com/onflow/flow-ft/lib/go/contracts v1.0.0/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A=
github.com/onflow/flow-ft/lib/go/templates v1.0.0 h1:6cMS/lUJJ17HjKBfMO/eh0GGvnpElPgBXx7h5aoWJhs=
github.com/onflow/flow-ft/lib/go/templates v1.0.0/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE=
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240918123637-27d2c56494f8 h1:5GKWWxpTc2o4EFg+SGvepkTsFeNHRYizQVeX4w05wtI=
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240918123637-27d2c56494f8/go.mod h1:Gdqw1ptnAUuB0izif88PWMK8abe655Hr8iEkXXuUJl4=
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240914104351-c2d9833c3357 h1:7gJ5RVKZEsUqPSKglpMXUBn+hceJ1cd/PsmLVsd5uzQ=
github.com/onflow/flow-go v0.37.10-util-ensure-checkpoint-exists.0.20240914104351-c2d9833c3357/go.mod h1:Gdqw1ptnAUuB0izif88PWMK8abe655Hr8iEkXXuUJl4=
github.com/onflow/flow-go-sdk v1.0.0-M1/go.mod h1:TDW0MNuCs4SvqYRUzkbRnRmHQL1h4X8wURsCw9P9beo=
github.com/onflow/flow-go-sdk v1.0.0-preview.56 h1:ZnFznUXI1V8iZ+cKxoJRIeQwJTHItriKpnoKf8hFFso=
github.com/onflow/flow-go-sdk v1.0.0-preview.56/go.mod h1:rBRNboXaTprn7M0MeO6/R1bxNpctbrx66I2FLp0V6fM=
Expand Down
6 changes: 5 additions & 1 deletion services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
}

batch := e.store.NewBatch()
defer batch.Close()
defer func() {
if err := batch.Close(); err != nil {
e.log.Warn().Err(err).Msg("failed to close batch")
}
}()

// we first index the block
err := e.indexBlock(
Expand Down
2 changes: 1 addition & 1 deletion services/requester/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (c *ClientHandler) localClient(height uint64) (*LocalClient, error) {

blockState, err := state.NewBlockState(
block,
pebble.NewRegister(c.store, height),
pebble.NewRegister(c.store, height, nil),
c.config.FlowNetworkID,
c.blocks,
c.receipts,
Expand Down
109 changes: 90 additions & 19 deletions services/state/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

pebbleDB "github.com/cockroachdb/pebble"
"github.com/google/uuid"
"github.com/onflow/atree"
"github.com/onflow/flow/protobuf/go/flow/executiondata"
Expand All @@ -22,6 +23,15 @@ import (
var _ models.Engine = &Engine{}
var _ models.Subscriber = &Engine{}

// Engine state engine takes care of creating a local state by
// re-executing each block against the local emulator and local
// register index.
// The engine relies on the block publisher to receive new
// block events, which are produced by the event ingestion engine.
// It also relies on the event ingestion engine to wait for the
// state engine to be ready before subscribing, because on startup
// we have to do a sync between last indexed and last executed block
// during which time we should not receive any other block events.
type Engine struct {
config *config.Config
execution executiondata.ExecutionDataAPIClient
Expand Down Expand Up @@ -59,12 +69,8 @@ func NewStateEngine(
}
}

// todo rethink whether it would be more robust to rely on blocks in the storage
// instead of receiving events, relying on storage and keeping a separate count of
// transactions executed would allow for independent restart and reexecution
// if we panic with events the missed tx won't get reexecuted since it's relying on
// event ingestion also not indexing that transaction

// Notify will get new block events from the blocks publisher,
// which are produced by the event ingestion engine.
func (e *Engine) Notify(data any) {
block, ok := data.(*models.Block)
if !ok {
Expand All @@ -83,6 +89,37 @@ func (e *Engine) Notify(data any) {
}

func (e *Engine) Run(ctx context.Context) error {
// check if we need to execute any blocks that were indexed but not executed
// this could happen if after index but before execution the node crashes
indexed, err := e.blocks.LatestIndexedHeight()
if err != nil {
return err
}

executed, err := e.blocks.LatestExecutedHeight()
if err != nil {
return err
}

if executed < indexed {
e.logger.Info().
Uint64("last-executed", executed).
Uint64("last-indexed", indexed).
Msg("syncing executed blocks on startup")

for i := executed; i <= indexed; i++ {
block, err := e.blocks.GetByHeight(i)
if err != nil {
return err
}

if err := e.executeBlock(block); err != nil {
return err
}
}
}

// after all is up to sync we subscribe to live blocks
e.blockPublisher.Subscribe(e)
e.status.MarkReady()
return nil
Expand Down Expand Up @@ -115,8 +152,16 @@ func (e *Engine) ID() uuid.UUID {
// Transaction executed should match a receipt we have indexed from the network
// produced by execution nodes. This check makes sure we keep a correct state.
func (e *Engine) executeBlock(block *models.Block) error {
// start a new database batch
batch := e.store.NewBatch()
defer func() {
if err := batch.Close(); err != nil {
e.logger.Warn().Err(err).Msg("failed to close batch")
}
}()

var registers atree.Ledger
registers = pebble.NewRegister(e.store, block.Height)
registers = pebble.NewRegister(e.store, block.Height, batch)

// if validation is enabled wrap the register ledger into a validator
if e.config.ValidateRegisters {
Expand Down Expand Up @@ -150,21 +195,47 @@ func (e *Engine) executeBlock(block *models.Block) error {
}

if e.config.ValidateRegisters {
cadenceHeight, err := e.blocks.GetCadenceHeight(block.Height)
if err != nil {
e.logger.Error().Err(err).Msg("register validation failed, block cadence height")
if err := e.validateBlock(registers, block); err != nil {
return err
}
}

validator := registers.(*RegisterValidator)
if err := validator.ValidateBlock(cadenceHeight); err != nil {
if errors.Is(err, errs.ErrStateMismatch) {
return err
}
// if there were issues with the client request only log the error
e.logger.Error().Err(err).Msg("register validation failed")
if err := e.blocks.SetExecutedHeight(block.Height); err != nil {
return err
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit executed data for block %d: %w", block.Height, err)
}

return nil
}

// validateBlock validates the block updated registers using the register validator.
// If there's any register mismatch it returns an error.
//
// todo remove:
// Currently, this is done synchronous but could be improved in the future, however this register
// validation using the AN APIs will be completely replaced with the state commitment checksum once
// the work is done on core: https://github.com/onflow/flow-go/pull/6451
func (e *Engine) validateBlock(registers atree.Ledger, block *models.Block) error {
validator, ok := registers.(*RegisterValidator)
if !ok {
return fmt.Errorf("invalid register validator used")
}

cadenceHeight, err := e.blocks.GetCadenceHeight(block.Height)
if err != nil {
e.logger.Error().Err(err).Msg("register validation failed, block cadence height")
}

if err := validator.ValidateBlock(cadenceHeight); err != nil {
if errors.Is(err, errs.ErrStateMismatch) {
return err
}
// if there were issues with the client request only log the error
e.logger.Error().Err(err).Msg("register validation failed")
}

// update executed block height
return e.blocks.SetExecutedHeight(block.Height)
return nil
}
19 changes: 13 additions & 6 deletions storage/pebble/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (b *Blocks) Store(
)
}

if err := b.store.set(latestEVMHeightKey, nil, evmHeightBytes, batch); err != nil {
if err := b.store.set(latestIndexedHeight, nil, evmHeightBytes, batch); err != nil {
return fmt.Errorf(
"failed to store latest EVM height: %d, with: %w",
block.Height,
Expand Down Expand Up @@ -178,7 +178,7 @@ func (b *Blocks) LatestIndexedHeight() (uint64, error) {
}

func (b *Blocks) latestEVMHeight() (uint64, error) {
val, err := b.store.get(latestEVMHeightKey)
val, err := b.store.get(latestIndexedHeight)
if err != nil {
if errors.Is(err, errs.ErrEntityNotFound) {
return 0, errs.ErrStorageNotInitialized
Expand Down Expand Up @@ -227,8 +227,12 @@ func (b *Blocks) InitHeights(cadenceHeight uint64, cadenceID flow.Identifier) er
return fmt.Errorf("failed to init latest Cadence height at: %d, with: %w", cadenceHeight, err)
}

if err := b.store.set(latestEVMHeightKey, nil, uint64Bytes(0), nil); err != nil {
return fmt.Errorf("failed to init latest EVM height at: %d, with: %w", 0, err)
if err := b.store.set(latestIndexedHeight, nil, uint64Bytes(0), nil); err != nil {
return fmt.Errorf("failed to init latest indexed EVM height at: %d, with: %w", 0, err)
}

if err := b.store.set(latestExecutedHeight, nil, uint64Bytes(0), nil); err != nil {
return fmt.Errorf("failed to init latest executed EVM height at: %d, with: %w", 0, err)
}

// we store genesis block because it isn't emitted over the network
Expand Down Expand Up @@ -268,15 +272,18 @@ func (b *Blocks) SetExecutedHeight(evmHeight uint64) error {
b.mux.Lock()
defer b.mux.Unlock()

return b.store.set(evmHeightIndex, nil, uint64Bytes(evmHeight), nil)
return b.store.set(latestExecutedHeight, nil, uint64Bytes(evmHeight), nil)
}

func (b *Blocks) LatestExecutedHeight() (uint64, error) {
b.mux.RLock()
defer b.mux.RUnlock()

val, err := b.store.get(evmHeightIndex)
val, err := b.store.get(latestExecutedHeight)
if err != nil {
if errors.Is(err, errs.ErrEntityNotFound) {
return 0, errs.ErrStorageNotInitialized
}
return 0, err
}

Expand Down
4 changes: 2 additions & 2 deletions storage/pebble/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const (
blockIDToHeightKey = byte(2)
evmHeightToCadenceHeightKey = byte(3)
evmHeightToCadenceIDKey = byte(4)
evmHeightIndex = byte(5)
latestExecutedHeight = byte(5)

// transaction keys
txIDKey = byte(10)
Expand All @@ -30,7 +30,7 @@ const (
ledgerSlabIndex = byte(51)

// special keys
latestEVMHeightKey = byte(100)
latestIndexedHeight = byte(100)
latestCadenceHeightKey = byte(102)
)

Expand Down
2 changes: 1 addition & 1 deletion storage/pebble/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (r *Receipts) BloomsForBlockRange(start, end uint64) ([]*models.BloomsHeigh
}

func (r *Receipts) getLast() (uint64, error) {
l, err := r.store.get(latestEVMHeightKey)
l, err := r.store.get(latestIndexedHeight)
if err != nil {
return 0, fmt.Errorf("failed getting latest EVM height: %w", err)
}
Expand Down
22 changes: 17 additions & 5 deletions storage/pebble/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ var _ atree.Ledger = &Register{}
type Register struct {
height uint64
store *Storage
batch *pebble.Batch
mux sync.RWMutex
}

func NewRegister(store *Storage, height uint64) *Register {
// NewRegister creates a new index instance at the provided height, all reads and
// writes of the registers will happen at that height.
//
// Batch is an optional argument, if provided the operations will be performed
// inside that batch that later needs to be committed by the provider of the batch.
func NewRegister(store *Storage, height uint64, batch *pebble.Batch) *Register {
return &Register{
store: store,
height: height,
batch: batch,
mux: sync.RWMutex{},
}
}
Expand All @@ -31,7 +38,12 @@ func (l *Register) GetValue(owner, key []byte) ([]byte, error) {
l.mux.RLock()
defer l.mux.RUnlock()

iter, err := l.store.db.NewIter(&pebble.IterOptions{
var db pebble.Reader = l.store.db
if l.batch != nil {
db = l.batch
}

iter, err := db.NewIter(&pebble.IterOptions{
LowerBound: l.idLower(owner, key),
UpperBound: l.idUpper(owner, key),
})
Expand Down Expand Up @@ -70,7 +82,7 @@ func (l *Register) SetValue(owner, key, value []byte) error {
defer l.mux.Unlock()

id := l.id(owner, key)
if err := l.store.set(ledgerValue, id, value, nil); err != nil {
if err := l.store.set(ledgerValue, id, value, l.batch); err != nil {
return fmt.Errorf(
"failed to store ledger value for owner %x and key %x: %w",
owner,
Expand All @@ -97,7 +109,7 @@ func (l *Register) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) {

var index atree.SlabIndex

val, err := l.store.get(ledgerSlabIndex, owner)
val, err := l.store.batchGet(l.batch, ledgerSlabIndex, owner)
if err != nil {
if !errors.Is(err, errs.ErrEntityNotFound) {
return atree.SlabIndexUndefined, err
Expand All @@ -116,7 +128,7 @@ func (l *Register) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) {
}

index = index.Next()
if err := l.store.set(ledgerSlabIndex, owner, index[:], nil); err != nil {
if err := l.store.set(ledgerSlabIndex, owner, index[:], l.batch); err != nil {
return atree.SlabIndexUndefined, fmt.Errorf(
"slab index failed to set for owner %x: %w",
owner,
Expand Down
Loading