diff --git a/graft/coreth/plugin/evm/atomic/sync/extender.go b/graft/coreth/plugin/evm/atomic/sync/extender.go index 37b4a35b09f1..537cd2077842 100644 --- a/graft/coreth/plugin/evm/atomic/sync/extender.go +++ b/graft/coreth/plugin/evm/atomic/sync/extender.go @@ -32,10 +32,10 @@ func (a *Extender) Initialize(backend *state.AtomicBackend, trie *state.AtomicTr func (a *Extender) CreateSyncer(client syncclient.LeafClient, verDB *versiondb.Database, summary message.Syncable) (sync.Syncer, error) { atomicSummary, ok := summary.(*Summary) if !ok { - return nil, fmt.Errorf("expected *Summary, got %T", summary) + return nil, fmt.Errorf("atomic sync extender: expected *Summary, got %T", summary) } - return NewSyncer( + syncer, err := NewSyncer( client, verDB, a.trie, @@ -43,6 +43,10 @@ func (a *Extender) CreateSyncer(client syncclient.LeafClient, verDB *versiondb.D atomicSummary.BlockNumber, WithRequestSize(a.requestSize), ) + if err != nil { + return nil, fmt.Errorf("atomic.NewSyncer failed: %w", err) + } + return syncer, nil } // OnFinishBeforeCommit implements the sync.Extender interface by marking the previously last accepted block for the shared memory cursor. diff --git a/graft/coreth/plugin/evm/atomic/sync/syncer.go b/graft/coreth/plugin/evm/atomic/sync/syncer.go index a2963a6d7708..1af8224f7d88 100644 --- a/graft/coreth/plugin/evm/atomic/sync/syncer.go +++ b/graft/coreth/plugin/evm/atomic/sync/syncer.go @@ -146,6 +146,10 @@ func (s *Syncer) Sync(ctx context.Context) error { return s.syncer.Sync(ctx) } +func (*Syncer) UpdateTarget(_ message.Syncable) error { + return nil +} + // Finalize commits any pending database changes to disk. // This ensures that even if the sync is cancelled or fails, we preserve // the progress up to the last fully synced height. diff --git a/graft/coreth/plugin/evm/vmsync/block_queue.go b/graft/coreth/plugin/evm/vmsync/block_queue.go new file mode 100644 index 000000000000..9a8fe86beaa3 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/block_queue.go @@ -0,0 +1,94 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import "sync" + +// BlockOperationType represents the type of operation to perform on a block. +type BlockOperationType int + +const ( + OpAccept BlockOperationType = iota + OpReject + OpVerify +) + +// String returns the string representation of the block operation. +func (op BlockOperationType) String() string { + switch op { + case OpAccept: + return "accept" + case OpReject: + return "reject" + case OpVerify: + return "verify" + default: + return "unknown" + } +} + +// blockOperation represents a queued block operation. +type blockOperation struct { + block EthBlockWrapper + operation BlockOperationType +} + +// blockQueue buffers block operations (accept/reject/verify) that arrive while +// the coordinator is in the Running state. Operations are processed in FIFO order. +// It is cleared (drained) on UpdateSyncTarget to avoid drops and is snapshotted +// at finalization via DequeueBatch. Enqueue is always allowed; a DequeueBatch +// only captures the current buffered operations and clears them, and new enqueues +// after the snapshot are not part of that batch. +type blockQueue struct { + mu sync.Mutex + // buffered operations accumulated before finalization + items []blockOperation +} + +// newBlockQueue creates a new empty queue. +func newBlockQueue() *blockQueue { + return &blockQueue{} +} + +// enqueue appends a block operation to the buffer. 
Returns true if the operation +// was queued, false if the block is nil. +func (q *blockQueue) enqueue(b EthBlockWrapper, op BlockOperationType) bool { + if b == nil { + return false + } + q.mu.Lock() + defer q.mu.Unlock() + q.items = append(q.items, blockOperation{ + block: b, + operation: op, + }) + return true +} + +// dequeueBatch returns the current buffered operations and clears the buffer. New +// arrivals after the snapshot are not included and remain buffered for later. +func (q *blockQueue) dequeueBatch() []blockOperation { + q.mu.Lock() + defer q.mu.Unlock() + out := q.items + q.items = nil + return out +} + +// removeBelowHeight removes all queued blocks with height <= targetHeight. +// This is called after UpdateSyncTarget to remove blocks that will never be executed +// because the sync target has advanced past them. +func (q *blockQueue) removeBelowHeight(targetHeight uint64) { + q.mu.Lock() + defer q.mu.Unlock() + + filtered := q.items[:0] + for _, op := range q.items { + ethBlock := op.block.GetEthBlock() + if ethBlock != nil && ethBlock.NumberU64() > targetHeight { + filtered = append(filtered, op) + } + } + q.items = filtered +} diff --git a/graft/coreth/plugin/evm/vmsync/block_queue_test.go b/graft/coreth/plugin/evm/vmsync/block_queue_test.go new file mode 100644 index 000000000000..f361575af315 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/block_queue_test.go @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBlockQueue_EnqueueAndDequeue(t *testing.T) { + q := newBlockQueue() + + // Nil block should be rejected. + require.False(t, q.enqueue(nil, OpAccept)) + + // Enqueue blocks. + for i := uint64(100); i < 105; i++ { + require.True(t, q.enqueue(newMockBlock(i), OpAccept)) + } + + // Dequeue returns all in FIFO order and clears queue. + batch := q.dequeueBatch() + require.Len(t, batch, 5) + for i, op := range batch { + require.Equal(t, uint64(100+i), op.block.GetEthBlock().NumberU64()) + } + + // Queue is now empty. + require.Empty(t, q.dequeueBatch()) +} + +func TestBlockQueue_RemoveBelowHeight(t *testing.T) { + q := newBlockQueue() + + // Enqueue blocks at heights 100-110. + for i := uint64(100); i <= 110; i++ { + q.enqueue(newMockBlock(i), OpAccept) + } + + // Remove blocks at or below height 105. + q.removeBelowHeight(105) + + // Only blocks > 105 should remain (106, 107, 108, 109, 110). + batch := q.dequeueBatch() + require.Len(t, batch, 5) + require.Equal(t, uint64(106), batch[0].block.GetEthBlock().NumberU64()) +} + +func TestBlockQueue_ConcurrentAccess(t *testing.T) { + t.Parallel() + + q := newBlockQueue() + const numGoroutines = 10 + const numOps = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < numOps; i++ { + q.enqueue(newMockBlock(uint64(id*numOps+i)), OpAccept) + } + }(g) + } + + wg.Wait() + + // All operations should have been enqueued. 
+ batch := q.dequeueBatch() + require.Len(t, batch, numGoroutines*numOps) +} diff --git a/graft/coreth/plugin/evm/vmsync/client.go b/graft/coreth/plugin/evm/vmsync/client.go index 435e5b836560..8058311c2070 100644 --- a/graft/coreth/plugin/evm/vmsync/client.go +++ b/graft/coreth/plugin/evm/vmsync/client.go @@ -5,10 +5,10 @@ package vmsync import ( "context" + "errors" "fmt" "sync" - "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" @@ -33,22 +33,57 @@ import ( // The last 256 block hashes are necessary to support the BLOCKHASH opcode. const BlocksToFetch = 256 -var stateSyncSummaryKey = []byte("stateSyncSummary") +var ( + stateSyncSummaryKey = []byte("stateSyncSummary") + + errSkipSync = errors.New("skip sync") + errBlockNotFound = errors.New("block not found in state") + errInvalidBlockType = errors.New("invalid block wrapper type") + errBlockHashMismatch = errors.New("block hash mismatch") + errBlockHeightMismatch = errors.New("block height mismatch") + errCommitCancelled = errors.New("commit cancelled") + errCommitMarkers = errors.New("failed to commit VM markers") +) // BlockAcceptor provides a mechanism to update the last accepted block ID during state synchronization. -// This interface is used by the state sync process to ensure the blockchain state -// is properly updated when new blocks are synchronized from the network. type BlockAcceptor interface { PutLastAcceptedID(ids.ID) error } // EthBlockWrapper can be implemented by a concrete block wrapper type to // return *types.Block, which is needed to update chain pointers at the -// end of the sync operation. +// end of the sync operation. It also provides Accept/Reject/Verify operations +// for deferred processing during dynamic state sync. type EthBlockWrapper interface { GetEthBlock() *types.Block + Accept(context.Context) error + Reject(context.Context) error + Verify(context.Context) error +} + +// Acceptor applies the results of state sync to the VM, preparing it for bootstrapping. +type Acceptor interface { + AcceptSync(ctx context.Context, summary message.Syncable) error } +// Executor defines how state sync is executed. +// Implementations handle the sync lifecycle differently based on sync mode. +type Executor interface { + // Execute runs the sync process and blocks until completion or error. + Execute(ctx context.Context, summary message.Syncable) error + + // OnBlockAccepted handles a block accepted during sync. + OnBlockAccepted(EthBlockWrapper) (bool, error) + + // OnBlockRejected handles a block rejected during sync. + OnBlockRejected(EthBlockWrapper) (bool, error) + + // OnBlockVerified handles a block verified during sync. + OnBlockVerified(EthBlockWrapper) (bool, error) +} + +var _ Acceptor = (*client)(nil) + type ClientConfig struct { Chain *eth.Ethereum State *chain.State @@ -73,25 +108,25 @@ type ClientConfig struct { RequestSize uint16 // number of key/value pairs to ask peers for per request Enabled bool SkipResume bool + // DynamicStateSyncEnabled toggles dynamic vs static state sync orchestration. + DynamicStateSyncEnabled bool + + // PivotInterval advances the sync target every N blocks. 
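+	// When 0, the default pivot interval defined in pivot_policy.go is used.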
+ PivotInterval uint64 } type client struct { - *ClientConfig - + config *ClientConfig resumableSummary message.Syncable - - cancel context.CancelFunc - wg sync.WaitGroup - - // State Sync results - summary message.Syncable - err error + cancel context.CancelFunc + wg sync.WaitGroup + err error + stateSyncOnce sync.Once // ensures only one state sync can be in progress at a time + executor Executor // executor manages sync execution (static or dynamic) } func NewClient(config *ClientConfig) Client { - return &client{ - ClientConfig: config, - } + return &client{config: config} } type Client interface { @@ -104,40 +139,49 @@ type Client interface { ClearOngoingSummary() error Shutdown() error Error() error + // OnEngineAccept should be called by the engine when a block is accepted. + // Returns true if the block was enqueued for deferred processing, false otherwise. + OnEngineAccept(EthBlockWrapper) (bool, error) + // OnEngineReject should be called by the engine when a block is rejected. + // Returns true if the block was enqueued for deferred processing, false otherwise. + OnEngineReject(EthBlockWrapper) (bool, error) + // OnEngineVerify should be called by the engine when a block is verified. + // Returns true if the block was enqueued for deferred processing, false otherwise. + OnEngineVerify(EthBlockWrapper) (bool, error) } // StateSyncEnabled returns [client.enabled], which is set in the chain's config file. -func (client *client) StateSyncEnabled(context.Context) (bool, error) { - return client.Enabled, nil +func (c *client) StateSyncEnabled(context.Context) (bool, error) { + return c.config.Enabled, nil } // GetOngoingSyncStateSummary returns a state summary that was previously started // and not finished, and sets [resumableSummary] if one was found. // Returns [database.ErrNotFound] if no ongoing summary is found or if [client.skipResume] is true. 
-func (client *client) GetOngoingSyncStateSummary(context.Context) (block.StateSummary, error) { - if client.SkipResume { +func (c *client) GetOngoingSyncStateSummary(context.Context) (block.StateSummary, error) { + if c.config.SkipResume { return nil, database.ErrNotFound } - summaryBytes, err := client.MetadataDB.Get(stateSyncSummaryKey) + summaryBytes, err := c.config.MetadataDB.Get(stateSyncSummaryKey) if err != nil { return nil, err // includes the [database.ErrNotFound] case } - summary, err := client.Parser.Parse(summaryBytes, client.acceptSyncSummary) + summary, err := c.config.Parser.Parse(summaryBytes, c.acceptSyncSummary) if err != nil { return nil, fmt.Errorf("failed to parse saved state sync summary to SyncSummary: %w", err) } - client.resumableSummary = summary + c.resumableSummary = summary return summary, nil } // ClearOngoingSummary clears any marker of an ongoing state sync summary -func (client *client) ClearOngoingSummary() error { - if err := client.MetadataDB.Delete(stateSyncSummaryKey); err != nil { +func (c *client) ClearOngoingSummary() error { + if err := c.config.MetadataDB.Delete(stateSyncSummaryKey); err != nil { return fmt.Errorf("failed to clear ongoing summary: %w", err) } - if err := client.VerDB.Commit(); err != nil { + if err := c.config.VerDB.Commit(); err != nil { return fmt.Errorf("failed to commit db while clearing ongoing summary: %w", err) } @@ -145,118 +189,65 @@ func (client *client) ClearOngoingSummary() error { } // ParseStateSummary parses [summaryBytes] to [commonEng.Summary] -func (client *client) ParseStateSummary(_ context.Context, summaryBytes []byte) (block.StateSummary, error) { - return client.Parser.Parse(summaryBytes, client.acceptSyncSummary) +func (c *client) ParseStateSummary(_ context.Context, summaryBytes []byte) (block.StateSummary, error) { + return c.config.Parser.Parse(summaryBytes, c.acceptSyncSummary) } -func (client *client) stateSync(ctx context.Context) error { - // Create and register all syncers. - registry := NewSyncerRegistry() - - if err := client.registerSyncers(registry); err != nil { - return err +// OnEngineAccept delegates to the executor if active. +func (c *client) OnEngineAccept(b EthBlockWrapper) (bool, error) { + if c.executor == nil { + return false, nil } - - // Run all registered syncers sequentially. - return registry.RunSyncerTasks(ctx, client.summary) + return c.executor.OnBlockAccepted(b) } -func (client *client) registerSyncers(registry *SyncerRegistry) error { - // Register block syncer. 
- blockSyncer, err := client.createBlockSyncer(client.summary.GetBlockHash(), client.summary.Height()) - if err != nil { - return fmt.Errorf("failed to create block syncer: %w", err) - } - - codeQueue, err := client.createCodeQueue() - if err != nil { - return fmt.Errorf("failed to create code queue: %w", err) - } - - codeSyncer, err := client.createCodeSyncer(codeQueue.CodeHashes()) - if err != nil { - return fmt.Errorf("failed to create code syncer: %w", err) - } - - stateSyncer, err := client.createEVMSyncer(codeQueue) - if err != nil { - return fmt.Errorf("failed to create EVM state syncer: %w", err) - } - - var atomicSyncer syncpkg.Syncer - if client.Extender != nil { - atomicSyncer, err = client.createAtomicSyncer() - if err != nil { - return fmt.Errorf("failed to create atomic syncer: %w", err) - } - } - - syncers := []syncpkg.Syncer{ - blockSyncer, - codeSyncer, - stateSyncer, - } - if atomicSyncer != nil { - syncers = append(syncers, atomicSyncer) - } - - for _, s := range syncers { - if err := registry.Register(s); err != nil { - return fmt.Errorf("failed to register %s syncer: %w", s.Name(), err) - } +// OnEngineReject delegates to the executor if active. +func (c *client) OnEngineReject(b EthBlockWrapper) (bool, error) { + if c.executor == nil { + return false, nil } - - return nil + return c.executor.OnBlockRejected(b) } -func (client *client) createBlockSyncer(fromHash common.Hash, fromHeight uint64) (syncpkg.Syncer, error) { - return blocksync.NewSyncer( - client.Client, - client.ChainDB, - fromHash, - fromHeight, - BlocksToFetch, - ) -} - -func (client *client) createEVMSyncer(queue *statesync.CodeQueue) (syncpkg.Syncer, error) { - return statesync.NewSyncer( - client.Client, - client.ChainDB, - client.summary.GetBlockRoot(), - queue, - client.RequestSize, - ) -} - -func (client *client) createCodeQueue() (*statesync.CodeQueue, error) { - return statesync.NewCodeQueue( - client.ChainDB, - client.StateSyncDone, - ) +// OnEngineVerify delegates to the executor if active. +func (c *client) OnEngineVerify(b EthBlockWrapper) (bool, error) { + if c.executor == nil { + return false, nil + } + return c.executor.OnBlockVerified(b) } -func (client *client) createCodeSyncer(codeHashes <-chan common.Hash) (syncpkg.Syncer, error) { - return statesync.NewCodeSyncer(client.Client, client.ChainDB, codeHashes) +func (c *client) Shutdown() error { + c.signalDone(context.Canceled) + c.wg.Wait() + return nil } -func (client *client) createAtomicSyncer() (syncpkg.Syncer, error) { - return client.Extender.CreateSyncer(client.Client, client.VerDB, client.summary) +// Error returns a non-nil error if one occurred during the sync. +func (c *client) Error() error { + return c.err } // acceptSyncSummary returns true if sync will be performed and launches the state sync process // in a goroutine. -func (client *client) acceptSyncSummary(proposedSummary message.Syncable) (block.StateSyncMode, error) { - isResume := client.resumableSummary != nil && - proposedSummary.GetBlockHash() == client.resumableSummary.GetBlockHash() +func (c *client) acceptSyncSummary(proposedSummary message.Syncable) (block.StateSyncMode, error) { + // If dynamic sync is already running, treat new summaries as target updates. 
+ if ds, ok := c.executor.(*dynamicExecutor); ok && ds.CurrentState() == StateRunning { + if err := ds.UpdateSyncTarget(proposedSummary); err != nil { + return block.StateSyncSkipped, err + } + return block.StateSyncDynamic, nil + } + + isResume := c.resumableSummary != nil && + proposedSummary.GetBlockHash() == c.resumableSummary.GetBlockHash() if !isResume { // Skip syncing if the blockchain is not significantly ahead of local state, // since bootstrapping would be faster. - // (Also ensures we don't sync to a height prior to local state.) - if client.LastAcceptedHeight+client.MinBlocks > proposedSummary.Height() { + if c.config.LastAcceptedHeight+c.config.MinBlocks > proposedSummary.Height() { log.Info( "last accepted too close to most recent syncable block, skipping state sync", - "lastAccepted", client.LastAcceptedHeight, + "lastAccepted", c.config.LastAcceptedHeight, "syncableHeight", proposedSummary.Height(), ) return block.StateSyncSkipped, nil @@ -268,82 +259,96 @@ func (client *client) acceptSyncSummary(proposedSummary message.Syncable) (block // sync marker will be wiped, so we do not accidentally resume progress from an incorrect version // of the snapshot. (if switching between versions that come before this change and back this could // lead to the snapshot not being cleaned up correctly) - <-snapshot.WipeSnapshot(client.ChainDB, true) + <-snapshot.WipeSnapshot(c.config.ChainDB, true) // Reset the snapshot generator here so that when state sync completes, snapshots will not attempt to read an // invalid generator. // Note: this must be called after WipeSnapshot is called so that we do not invalidate a partially generated snapshot. - snapshot.ResetSnapshotGeneration(client.ChainDB) + snapshot.ResetSnapshotGeneration(c.config.ChainDB) } - client.summary = proposedSummary - // Update the current state sync summary key in the database - // Note: this must be performed after WipeSnapshot finishes so that we do not start a state sync - // session from a partially wiped snapshot. - if err := client.MetadataDB.Put(stateSyncSummaryKey, proposedSummary.Bytes()); err != nil { + // Update the current state sync summary key in the database. 
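+	// Note: this must be performed after WipeSnapshot finishes so that we do not start a state sync
+	// session from a partially wiped snapshot.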
+	if err := c.config.MetadataDB.Put(stateSyncSummaryKey, proposedSummary.Bytes()); err != nil {
 		return block.StateSyncSkipped, fmt.Errorf("failed to write state sync summary key to disk: %w", err)
 	}
-	if err := client.VerDB.Commit(); err != nil {
+	if err := c.config.VerDB.Commit(); err != nil {
 		return block.StateSyncSkipped, fmt.Errorf("failed to commit db: %w", err)
 	}
 
-	log.Info("Starting state sync", "summary", proposedSummary)
-
-	// create a cancellable ctx for the state sync goroutine
+	log.Info("Starting state sync", "summary", proposedSummary.GetBlockHash().Hex(), "height", proposedSummary.Height())
 	ctx, cancel := context.WithCancel(context.Background())
-	client.cancel = cancel
-	client.wg.Add(1) // track the state sync goroutine so we can wait for it on shutdown
-	go func() {
-		defer client.wg.Done()
-		defer cancel()
+	c.cancel = cancel
 
-		if err := client.stateSync(ctx); err != nil {
-			client.err = err
-		} else {
-			client.err = client.finishSync()
-		}
-		// notify engine regardless of whether err == nil,
-		// this error will be propagated to the engine when it calls
-		// vm.SetState(snow.Bootstrapping)
-		log.Info("stateSync completed, notifying engine", "err", client.err)
-		close(client.StateSyncDone)
+	registry, err := c.newSyncerRegistry(proposedSummary)
+	if err != nil {
+		return block.StateSyncSkipped, err
+	}
+
+	var (
+		executor Executor
+		mode     block.StateSyncMode
+	)
+	if c.config.DynamicStateSyncEnabled {
+		executor = newDynamicExecutor(registry, c, c.config.PivotInterval)
+		mode = block.StateSyncDynamic
+	} else {
+		executor = newStaticExecutor(registry, c)
+		mode = block.StateSyncStatic
+	}
+
+	c.executor = executor
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		err := executor.Execute(ctx, proposedSummary)
+		c.signalDone(err)
 	}()
-	return block.StateSyncStatic, nil
+
+	log.Info("state sync started", "mode", mode.String(), "summary", proposedSummary.GetBlockHash().Hex(), "height", proposedSummary.Height())
+	return mode, nil
 }
 
-func (client *client) Shutdown() error {
-	if client.cancel != nil {
-		client.cancel()
-	}
-	client.wg.Wait() // wait for the background goroutine to exit
-	return nil
+// signalDone sets the terminal error exactly once and signals completion to the engine.
+func (c *client) signalDone(err error) {
+	c.stateSyncOnce.Do(func() {
+		c.err = err
+		if c.cancel != nil {
+			c.cancel()
+		}
+		close(c.config.StateSyncDone)
+	})
 }
 
-// finishSync is responsible for updating disk and memory pointers so the VM is prepared
-// for bootstrapping. Executes any shared memory operations from the atomic trie to shared memory.
-func (client *client) finishSync() error {
-	stateBlock, err := client.State.GetBlock(context.TODO(), ids.ID(client.summary.GetBlockHash()))
+// AcceptSync implements Acceptor. It resets the blockchain to the synced block,
+// preparing it for execution, and updates disk and memory pointers so the VM
+// is ready for bootstrapping. Also executes any shared memory operations from
+// the atomic trie to shared memory. 
+func (c *client) AcceptSync(ctx context.Context, summary message.Syncable) error { + stateBlock, err := c.config.State.GetBlock(ctx, ids.ID(summary.GetBlockHash())) if err != nil { - return fmt.Errorf("could not get block by hash from client state: %s", client.summary.GetBlockHash()) + return fmt.Errorf("%w: hash=%s", errBlockNotFound, summary.GetBlockHash()) } wrapper, ok := stateBlock.(*chain.BlockWrapper) if !ok { - return fmt.Errorf("could not convert block(%T) to *chain.BlockWrapper", wrapper) + return fmt.Errorf("%w: got %T, want *chain.BlockWrapper", errInvalidBlockType, stateBlock) } wrappedBlock := wrapper.Block evmBlockGetter, ok := wrappedBlock.(EthBlockWrapper) if !ok { - return fmt.Errorf("could not convert block(%T) to evm.EthBlockWrapper", stateBlock) + return fmt.Errorf("%w: got %T, want EthBlockWrapper", errInvalidBlockType, wrappedBlock) } block := evmBlockGetter.GetEthBlock() - if block.Hash() != client.summary.GetBlockHash() { - return fmt.Errorf("attempted to set last summary block to unexpected block hash: (%s != %s)", block.Hash(), client.summary.GetBlockHash()) + if block.Hash() != summary.GetBlockHash() { + return fmt.Errorf("%w: got %s, want %s", errBlockHashMismatch, block.Hash(), summary.GetBlockHash()) } - if block.NumberU64() != client.summary.Height() { - return fmt.Errorf("attempted to set last summary block to unexpected block number: (%d != %d)", block.NumberU64(), client.summary.Height()) + if block.NumberU64() != summary.Height() { + return fmt.Errorf("%w: got %d, want %d", errBlockHeightMismatch, block.NumberU64(), summary.Height()) } // BloomIndexer needs to know that some parts of the chain are not available @@ -356,52 +361,97 @@ func (client *client) finishSync() error { // by [params.BloomBitsBlocks]. parentHeight := block.NumberU64() - 1 parentHash := block.ParentHash() - client.Chain.BloomIndexer().AddCheckpoint(parentHeight/params.BloomBitsBlocks, parentHash) + c.config.Chain.BloomIndexer().AddCheckpoint(parentHeight/params.BloomBitsBlocks, parentHash) - if err := client.Chain.BlockChain().ResetToStateSyncedBlock(block); err != nil { + if err := ctx.Err(); err != nil { + return fmt.Errorf("%w: %w", errCommitCancelled, err) + } + if err := c.config.Chain.BlockChain().ResetToStateSyncedBlock(block); err != nil { return err } - if client.Extender != nil { - if err := client.Extender.OnFinishBeforeCommit(client.LastAcceptedHeight, client.summary); err != nil { + if c.config.Extender != nil { + if err := c.config.Extender.OnFinishBeforeCommit(c.config.LastAcceptedHeight, summary); err != nil { return err } } - if err := client.commitVMMarkers(); err != nil { - return fmt.Errorf("error updating vm markers, height=%d, hash=%s, err=%w", block.NumberU64(), block.Hash(), err) + if err := c.commitMarkers(summary); err != nil { + return fmt.Errorf("%w: height=%d, hash=%s: %w", errCommitMarkers, block.NumberU64(), block.Hash(), err) } - if err := client.State.SetLastAcceptedBlock(wrappedBlock); err != nil { + if err := c.config.State.SetLastAcceptedBlock(wrappedBlock); err != nil { return err } - if client.Extender != nil { - return client.Extender.OnFinishAfterCommit(block.NumberU64()) + if c.config.Extender != nil { + if err := c.config.Extender.OnFinishAfterCommit(block.NumberU64()); err != nil { + return err + } } return nil } -// commitVMMarkers updates the following markers in the VM's database -// and commits them atomically: -// - updates atomic trie so it will have necessary metadata for the last committed root -// - updates atomic trie so it will 
resume applying operations to shared memory on initialize -// - updates lastAcceptedKey -// - removes state sync progress markers -func (client *client) commitVMMarkers() error { - // Mark the previously last accepted block for the shared memory cursor, so that we will execute shared - // memory operations from the previously last accepted block to [vm.syncSummary] when ApplyToSharedMemory - // is called. - id := ids.ID(client.summary.GetBlockHash()) - if err := client.Acceptor.PutLastAcceptedID(id); err != nil { +// commitMarkers updates VM database markers atomically. +func (c *client) commitMarkers(summary message.Syncable) error { + id := ids.ID(summary.GetBlockHash()) + if err := c.config.Acceptor.PutLastAcceptedID(id); err != nil { return err } - if err := client.MetadataDB.Delete(stateSyncSummaryKey); err != nil { + if err := c.config.MetadataDB.Delete(stateSyncSummaryKey); err != nil { return err } - return client.VerDB.Commit() + return c.config.VerDB.Commit() } -// Error returns a non-nil error if one occurred during the sync. -func (client *client) Error() error { return client.err } +// newSyncerRegistry creates a registry with all required syncers for the given summary. +func (c *client) newSyncerRegistry(summary message.Syncable) (*SyncerRegistry, error) { + registry := NewSyncerRegistry() + + blockSyncer, err := blocksync.NewSyncer( + c.config.Client, c.config.ChainDB, + summary.GetBlockHash(), summary.Height(), + BlocksToFetch, + ) + if err != nil { + return nil, fmt.Errorf("failed to create block syncer: %w", err) + } + + codeQueue, err := statesync.NewCodeQueue(c.config.ChainDB, c.config.StateSyncDone) + if err != nil { + return nil, fmt.Errorf("failed to create code queue: %w", err) + } + + codeSyncer, err := statesync.NewCodeSyncer(c.config.Client, c.config.ChainDB, codeQueue.CodeHashes()) + if err != nil { + return nil, fmt.Errorf("failed to create code syncer: %w", err) + } + + stateSyncer, err := statesync.NewSyncer( + c.config.Client, c.config.ChainDB, + summary.GetBlockRoot(), + codeQueue, c.config.RequestSize, + ) + if err != nil { + return nil, fmt.Errorf("failed to create EVM state syncer: %w", err) + } + + syncers := []syncpkg.Syncer{blockSyncer, codeSyncer, stateSyncer} + + if c.config.Extender != nil { + extenderSyncer, err := c.config.Extender.CreateSyncer(c.config.Client, c.config.VerDB, summary) + if err != nil { + return nil, fmt.Errorf("failed to create extender syncer: %w", err) + } + syncers = append(syncers, extenderSyncer) + } + + for _, s := range syncers { + if err := registry.Register(s); err != nil { + return nil, fmt.Errorf("failed to register %s syncer: %w", s.Name(), err) + } + } + + return registry, nil +} diff --git a/graft/coreth/plugin/evm/vmsync/coordinator.go b/graft/coreth/plugin/evm/vmsync/coordinator.go new file mode 100644 index 000000000000..31b4c21dde86 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/coordinator.go @@ -0,0 +1,239 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/ava-labs/libevm/libevm/options" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +// State represents the lifecycle phases of dynamic state sync orchestration. 
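+// Typical flow: StateIdle -> StateInitializing -> StateRunning -> StateFinalizing ->
+// StateExecutingBatch -> StateCompleted, with any failure transitioning to StateAborted.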
+type State int + +const ( + StateIdle State = iota + StateInitializing + StateRunning + StateFinalizing + StateExecutingBatch + StateCompleted + StateAborted +) + +var ( + errInvalidTargetType = errors.New("invalid target type") + errInvalidState = errors.New("invalid coordinator state") + errBatchCancelled = errors.New("batch execution cancelled") + errBatchOperationFailed = errors.New("batch operation failed") +) + +// Callbacks allows the coordinator to delegate VM-specific work back to the client. +type Callbacks struct { + // FinalizeVM performs the same actions as finishSync/commitVMMarkers in the client. + // The context is used for cancellation checks during finalization. + FinalizeVM func(ctx context.Context, target message.Syncable) error + // OnDone is called when the coordinator finishes (successfully or with error). + OnDone func(err error) +} + +// Coordinator orchestrates dynamic state sync across multiple syncers. +type Coordinator struct { + // state is managed atomically to allow cheap concurrent checks/updates. + state atomic.Int32 + // target stores the current target [message.Syncable] when [Coordinator.UpdateSyncTarget] is called. + target atomic.Value + queue *blockQueue + syncerRegistry *SyncerRegistry + callbacks Callbacks + // doneOnce ensures [Callbacks.OnDone] is invoked at most once. + doneOnce sync.Once + + // pivotInterval configures the pivot policy throttling. 0 disables throttling. + pivotInterval uint64 + pivot *pivotPolicy +} + +// CoordinatorOption follows the functional options pattern for Coordinator. +type CoordinatorOption = options.Option[Coordinator] + +// WithPivotInterval configures the interval-based pivot policy. 0 disables it. +func WithPivotInterval(interval uint64) CoordinatorOption { + return options.Func[Coordinator](func(co *Coordinator) { + co.pivotInterval = interval + }) +} + +// NewCoordinator constructs a coordinator to orchestrate dynamic state sync across multiple syncers. +func NewCoordinator(syncerRegistry *SyncerRegistry, cbs Callbacks, opts ...CoordinatorOption) *Coordinator { + co := &Coordinator{ + queue: newBlockQueue(), + syncerRegistry: syncerRegistry, + callbacks: cbs, + } + options.ApplyTo(co, opts...) + co.state.Store(int32(StateIdle)) + + return co +} + +// Start launches all syncers and returns immediately. Failures are monitored +// in the background and will transition to [StateAborted]. +func (co *Coordinator) Start(ctx context.Context, initial message.Syncable) { + co.state.Store(int32(StateInitializing)) + co.target.Store(initial) + co.pivot = newPivotPolicy(co.pivotInterval) + + cctx, cancel := context.WithCancelCause(ctx) + g := co.syncerRegistry.StartAsync(cctx, initial) + + co.state.Store(int32(StateRunning)) + + go func() { + if err := g.Wait(); err != nil { + co.finish(cancel, err) + return + } + + if err := co.ProcessQueuedBlockOperations(cctx); err != nil { + co.finish(cancel, err) + return + } + co.finish(cancel, nil) + }() +} + +// ProcessQueuedBlockOperations finalizes the VM and processes queued block operations +// in FIFO order. Called after syncers complete to finalize state and execute deferred operations. 
+func (co *Coordinator) ProcessQueuedBlockOperations(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return err + } + + co.state.Store(int32(StateFinalizing)) + + if co.callbacks.FinalizeVM != nil { + if err := ctx.Err(); err != nil { + co.state.Store(int32(StateAborted)) + return err + } + + loaded := co.target.Load() + current, ok := loaded.(message.Syncable) + if !ok { + co.state.Store(int32(StateAborted)) + return errInvalidTargetType + } + if err := co.callbacks.FinalizeVM(ctx, current); err != nil { + co.state.Store(int32(StateAborted)) + return err + } + } + + if err := ctx.Err(); err != nil { + co.state.Store(int32(StateAborted)) + return err + } + + co.state.Store(int32(StateExecutingBatch)) + + if err := co.executeBlockOperationBatch(ctx); err != nil { + return err + } + + return nil +} + +// UpdateSyncTarget broadcasts a new target to all syncers and removes stale blocks from queue. +// Only valid in [StateRunning] state. Syncers manage cancellation themselves. +func (co *Coordinator) UpdateSyncTarget(newTarget message.Syncable) error { + if co.CurrentState() != StateRunning { + return errInvalidState + } + if !co.pivot.shouldForward(newTarget.Height()) { + return nil + } + + // Re-check state before modifying queue to handle concurrent transitions. + if co.CurrentState() != StateRunning { + return errInvalidState + } + + // Remove blocks from queue that will never be executed (behind the new target). + co.queue.removeBelowHeight(newTarget.Height()) + + co.target.Store(newTarget) + + if err := co.syncerRegistry.UpdateSyncTarget(newTarget); err != nil { + return err + } + co.pivot.advance() + return nil +} + +// AddBlockOperation appends the block to the queue while in the Running or +// StateExecutingBatch state. Blocks enqueued during batch execution will be +// processed in the next batch. Returns true if the block was queued, false +// if the queue was already sealed or the block is nil. +func (co *Coordinator) AddBlockOperation(b EthBlockWrapper, op BlockOperationType) bool { + if b == nil { + return false + } + state := co.CurrentState() + if state != StateRunning && state != StateExecutingBatch { + return false + } + return co.queue.enqueue(b, op) +} + +func (co *Coordinator) CurrentState() State { + return State(co.state.Load()) +} + +// executeBlockOperationBatch executes queued block operations in FIFO order. +// Partial completion is acceptable as operations are idempotent. 
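+// Operations enqueued after the batch snapshot is taken remain buffered and are not part of this batch.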
+func (co *Coordinator) executeBlockOperationBatch(ctx context.Context) error { + operations := co.queue.dequeueBatch() + for i, op := range operations { + select { + case <-ctx.Done(): + return fmt.Errorf("operation %d/%d: %w", i+1, len(operations), errors.Join(errBatchCancelled, ctx.Err())) + default: + } + + var err error + switch op.operation { + case OpAccept: + err = op.block.Accept(ctx) + case OpReject: + err = op.block.Reject(ctx) + case OpVerify: + err = op.block.Verify(ctx) + } + if err != nil { + return fmt.Errorf("operation %d/%d (%v): %w", i+1, len(operations), op.operation, errors.Join(errBatchOperationFailed, err)) + } + } + return nil +} + +func (co *Coordinator) finish(cancel context.CancelCauseFunc, err error) { + if err != nil { + co.state.Store(int32(StateAborted)) + } else { + co.state.Store(int32(StateCompleted)) + } + if cancel != nil { + cancel(err) + } + if co.callbacks.OnDone != nil { + co.doneOnce.Do(func() { co.callbacks.OnDone(err) }) + } +} diff --git a/graft/coreth/plugin/evm/vmsync/coordinator_test.go b/graft/coreth/plugin/evm/vmsync/coordinator_test.go new file mode 100644 index 000000000000..678dbdb035fd --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/coordinator_test.go @@ -0,0 +1,137 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/ava-labs/libevm/common" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +func TestCoordinator_StateValidation(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1)) + block := newMockBlock(100) + target := newTestSyncTarget(100) + + // States that reject both operations. + for _, state := range []State{StateIdle, StateInitializing, StateFinalizing, StateCompleted, StateAborted} { + co.state.Store(int32(state)) + require.False(t, co.AddBlockOperation(block, OpAccept), "state %d should reject block", state) + err := co.UpdateSyncTarget(target) + require.ErrorIs(t, err, errInvalidState, "state %d should reject target update", state) + } + + // Running: accepts both. + co.state.Store(int32(StateRunning)) + require.True(t, co.AddBlockOperation(block, OpAccept)) + require.NoError(t, co.UpdateSyncTarget(target)) + + // ExecutingBatch: accepts blocks, rejects target updates. + co.state.Store(int32(StateExecutingBatch)) + require.True(t, co.AddBlockOperation(block, OpAccept)) + err := co.UpdateSyncTarget(target) + require.ErrorIs(t, err, errInvalidState) + + // Nil block is always rejected. + co.state.Store(int32(StateRunning)) + require.False(t, co.AddBlockOperation(nil, OpAccept)) +} + +func TestCoordinator_UpdateSyncTarget_RemovesStaleBlocks(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1)) + co.state.Store(int32(StateRunning)) + + for i := uint64(100); i <= 110; i++ { + co.AddBlockOperation(newMockBlock(i), OpAccept) + } + + require.NoError(t, co.UpdateSyncTarget(newTestSyncTarget(105))) + + batch := co.queue.dequeueBatch() + require.Len(t, batch, 5) // Only 106-110 remain. 
+} + +func TestCoordinator_Lifecycle(t *testing.T) { + t.Run("completes successfully", func(t *testing.T) { + registry := NewSyncerRegistry() + require.NoError(t, registry.Register(newMockSyncer("test", nil))) + + co, err := runCoordinator(t, registry, Callbacks{ + FinalizeVM: func(context.Context, message.Syncable) error { return nil }, + }) + + require.NoError(t, err) + require.Equal(t, StateCompleted, co.CurrentState()) + }) + + t.Run("aborts on syncer error", func(t *testing.T) { + expectedErr := errors.New("syncer failed") + registry := NewSyncerRegistry() + require.NoError(t, registry.Register(newMockSyncer("failing", expectedErr))) + + co, err := runCoordinator(t, registry, Callbacks{}) + + require.ErrorIs(t, err, expectedErr) + require.Equal(t, StateAborted, co.CurrentState()) + }) +} + +func TestCoordinator_ProcessQueuedBlockOperations(t *testing.T) { + t.Run("executes queued operations", func(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}) + co.state.Store(int32(StateRunning)) + co.target.Store(newTestSyncTarget(100)) + co.AddBlockOperation(newMockBlock(100), OpAccept) + + require.NoError(t, co.ProcessQueuedBlockOperations(t.Context())) + require.Equal(t, StateExecutingBatch, co.CurrentState()) + }) + + t.Run("returns error on block operation failure", func(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}) + co.state.Store(int32(StateRunning)) + co.target.Store(newTestSyncTarget(100)) + + failBlock := newMockBlock(100) + failBlock.acceptErr = errors.New("accept failed") + co.AddBlockOperation(failBlock, OpAccept) + + err := co.ProcessQueuedBlockOperations(t.Context()) + require.ErrorIs(t, err, errBatchOperationFailed) + }) +} + +// runCoordinator starts a coordinator and waits for completion. +func runCoordinator(t *testing.T, registry *SyncerRegistry, cbs Callbacks) (*Coordinator, error) { + t.Helper() + + var ( + errDone error + wg sync.WaitGroup + ) + wg.Add(1) + + cbs.OnDone = func(err error) { + errDone = err + wg.Done() + } + + co := NewCoordinator(registry, cbs) + co.Start(t.Context(), newTestSyncTarget(100)) + wg.Wait() + + return co, errDone +} + +func newTestSyncTarget(height uint64) message.Syncable { + hash := common.BytesToHash([]byte{byte(height)}) + root := common.BytesToHash([]byte{byte(height + 1)}) + return newSyncTarget(hash, root, height) +} diff --git a/graft/coreth/plugin/evm/vmsync/doubles_test.go b/graft/coreth/plugin/evm/vmsync/doubles_test.go index 7c79095df6c7..aaec3566dc7b 100644 --- a/graft/coreth/plugin/evm/vmsync/doubles_test.go +++ b/graft/coreth/plugin/evm/vmsync/doubles_test.go @@ -6,12 +6,39 @@ package vmsync import ( "context" "errors" + "math/big" "sync" "time" + "github.com/ava-labs/libevm/core/types" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" ) +// mockEthBlockWrapper implements EthBlockWrapper for testing. 
+type mockEthBlockWrapper struct {
+	ethBlock  *types.Block
+	acceptErr error
+	rejectErr error
+	verifyErr error
+}
+
+func newMockBlock(height uint64) *mockEthBlockWrapper {
+	header := &types.Header{Number: new(big.Int).SetUint64(height)}
+	return &mockEthBlockWrapper{
+		ethBlock: types.NewBlockWithHeader(header),
+	}
+}
+
+func (m *mockEthBlockWrapper) GetEthBlock() *types.Block    { return m.ethBlock }
+func (m *mockEthBlockWrapper) Accept(context.Context) error { return m.acceptErr }
+func (m *mockEthBlockWrapper) Reject(context.Context) error { return m.rejectErr }
+func (m *mockEthBlockWrapper) Verify(context.Context) error { return m.verifyErr }
+
+var _ EthBlockWrapper = (*mockEthBlockWrapper)(nil)
+
 // FuncSyncer adapts a function to the simple Syncer shape used in tests. It is
 // useful for defining small, behavior-driven syncers inline.
 type FuncSyncer struct {
@@ -22,8 +49,9 @@ type FuncSyncer struct {
 func (f FuncSyncer) Sync(ctx context.Context) error { return f.fn(ctx) }
 
 // Name returns the provided name or a default if unspecified.
-func (FuncSyncer) Name() string { return "Test Name" }
-func (FuncSyncer) ID() string   { return "test_id" }
+func (FuncSyncer) Name() string { return "Test Name" }
+func (FuncSyncer) ID() string   { return "test_id" }
+func (FuncSyncer) UpdateTarget(_ message.Syncable) error { return nil }
 
 var _ syncpkg.Syncer = FuncSyncer{}
diff --git a/graft/coreth/plugin/evm/vmsync/executor_dynamic.go b/graft/coreth/plugin/evm/vmsync/executor_dynamic.go
new file mode 100644
index 000000000000..a9b12d9b8ae2
--- /dev/null
+++ b/graft/coreth/plugin/evm/vmsync/executor_dynamic.go
@@ -0,0 +1,106 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vmsync
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/ava-labs/libevm/log"
+
+	"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
+)
+
+var _ Executor = (*dynamicExecutor)(nil)
+
+// dynamicExecutor runs syncers concurrently with block queueing.
+// It wraps [Coordinator] to manage the sync lifecycle.
+type dynamicExecutor struct {
+	coordinator *Coordinator
+}
+
+func newDynamicExecutor(registry *SyncerRegistry, acceptor Acceptor, pivotInterval uint64) *dynamicExecutor {
+	coordinator := NewCoordinator(
+		registry,
+		Callbacks{
+			FinalizeVM: acceptor.AcceptSync,
+			OnDone:     nil, // Set in Execute to capture completion.
+		},
+		WithPivotInterval(pivotInterval),
+	)
+	return &dynamicExecutor{coordinator: coordinator}
+}
+
+// Execute launches the coordinator and blocks until sync completes or fails.
+func (d *dynamicExecutor) Execute(ctx context.Context, summary message.Syncable) error {
+	done := make(chan error, 1)
+
+	// Wire up OnDone to signal completion.
+	d.coordinator.callbacks.OnDone = func(err error) {
+		if err != nil {
+			log.Error("dynamic state sync completed with error", "err", err)
+		} else {
+			log.Info("dynamic state sync completed successfully")
+		}
+		done <- err
+	}
+
+	d.coordinator.Start(ctx, summary)
+	return <-done
+}
+
+// OnBlockAccepted enqueues the block for deferred processing and updates the sync target.
+func (d *dynamicExecutor) OnBlockAccepted(b EthBlockWrapper) (bool, error) {
+	if d.coordinator.CurrentState() == StateExecutingBatch {
+		// Still enqueue for the next batch, but don't update target. 
+ return d.enqueue(b, OpAccept), nil + } + + if !d.enqueue(b, OpAccept) { + return false, nil + } + + ethb := b.GetEthBlock() + target := newSyncTarget(ethb.Hash(), ethb.Root(), ethb.NumberU64()) + if err := d.coordinator.UpdateSyncTarget(target); err != nil { + // Block is enqueued but target update failed. + return true, fmt.Errorf("block enqueued but sync target update failed: %w", err) + } + return true, nil +} + +// OnBlockRejected enqueues the block for deferred rejection. +func (d *dynamicExecutor) OnBlockRejected(b EthBlockWrapper) (bool, error) { + return d.enqueue(b, OpReject), nil +} + +// OnBlockVerified enqueues the block for deferred verification. +func (d *dynamicExecutor) OnBlockVerified(b EthBlockWrapper) (bool, error) { + return d.enqueue(b, OpVerify), nil +} + +// enqueue adds a block operation to the coordinator's queue. +func (d *dynamicExecutor) enqueue(b EthBlockWrapper, op BlockOperationType) bool { + ok := d.coordinator.AddBlockOperation(b, op) + if !ok { + if ethb := b.GetEthBlock(); ethb != nil { + log.Warn("could not enqueue block operation", + "hash", ethb.Hash(), + "height", ethb.NumberU64(), + "op", op.String(), + ) + } + } + return ok +} + +// CurrentState returns the coordinator's current state. +func (d *dynamicExecutor) CurrentState() State { + return d.coordinator.CurrentState() +} + +// UpdateSyncTarget updates the coordinator's sync target. +func (d *dynamicExecutor) UpdateSyncTarget(target message.Syncable) error { + return d.coordinator.UpdateSyncTarget(target) +} diff --git a/graft/coreth/plugin/evm/vmsync/executor_static.go b/graft/coreth/plugin/evm/vmsync/executor_static.go new file mode 100644 index 000000000000..adfef7d9b39e --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/executor_static.go @@ -0,0 +1,51 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +var _ Executor = (*staticExecutor)(nil) + +// staticExecutor runs syncers sequentially without block queueing. +// This is the default sync mode where all syncers complete before +// committing results, with no concurrent block processing. +type staticExecutor struct { + registry *SyncerRegistry + acceptor Acceptor +} + +func newStaticExecutor(registry *SyncerRegistry, acceptor Acceptor) *staticExecutor { + return &staticExecutor{ + registry: registry, + acceptor: acceptor, + } +} + +// Execute runs the sync process and blocks until completion or error. +// For static sync, this runs all syncers and then accepts the synced state into the VM. +func (e *staticExecutor) Execute(ctx context.Context, summary message.Syncable) error { + if err := e.registry.RunSyncerTasks(ctx, summary); err != nil { + return err + } + return e.acceptor.AcceptSync(ctx, summary) +} + +// OnBlockAccepted is a no-op for static sync since blocks are not queued. +func (*staticExecutor) OnBlockAccepted(EthBlockWrapper) (bool, error) { + return false, nil +} + +// OnBlockRejected is a no-op for static sync since blocks are not queued. +func (*staticExecutor) OnBlockRejected(EthBlockWrapper) (bool, error) { + return false, nil +} + +// OnBlockVerified is a no-op for static sync since blocks are not queued. 
+func (*staticExecutor) OnBlockVerified(EthBlockWrapper) (bool, error) { + return false, nil +} diff --git a/graft/coreth/plugin/evm/vmsync/pivot_policy.go b/graft/coreth/plugin/evm/vmsync/pivot_policy.go new file mode 100644 index 000000000000..ac808368b92c --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/pivot_policy.go @@ -0,0 +1,60 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import "sync/atomic" + +// defaultPivotInterval is the default number of blocks between sync target updates. +const defaultPivotInterval = uint64(10000) + +// pivotPolicy encapsulates the logic for deciding when to forward +// a new sync target based on a fixed block-height interval. It is +// safe for concurrent use. +type pivotPolicy struct { + interval uint64 + // nextHeight is the next height threshold at or beyond which we + // should forward an update. A value of 0 means uninitialized. + nextHeight atomic.Uint64 +} + +// newPivotPolicy creates a new pivot policy with the given interval. +// If interval is 0, defaultPivotInterval is used. +func newPivotPolicy(interval uint64) *pivotPolicy { + if interval == 0 { + interval = defaultPivotInterval + } + return &pivotPolicy{interval: interval} +} + +// shouldForward reports whether a summary at the given height should be +// forwarded, initializing the next threshold on first use. When it returns +// true, callers should follow up with advance(). +func (p *pivotPolicy) shouldForward(height uint64) bool { + if p == nil || p.interval == 0 { + return true + } + next := p.nextHeight.Load() + if next == 0 { + // Round up the initial height to the next multiple of interval. + // Ceil division: ((h + interval - 1) / interval) * interval + h := height + init := ((h + p.interval - 1) / p.interval) * p.interval + // Initialize once - if another goroutine wins, read the established value. + if !p.nextHeight.CompareAndSwap(0, init) { + next = p.nextHeight.Load() + } else { + next = init + } + } + return height >= next +} + +// advance moves the next threshold forward by one interval. Call this +// only after shouldForward has returned true and the update was issued. +func (p *pivotPolicy) advance() { + if p == nil || p.interval == 0 { + return + } + p.nextHeight.Add(p.interval) +} diff --git a/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go b/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go new file mode 100644 index 000000000000..5237a102c0d1 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go @@ -0,0 +1,27 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPivotPolicy(t *testing.T) { + // Zero interval uses default. + require.Equal(t, defaultPivotInterval, newPivotPolicy(0).interval) + + // Test throttling behavior. 
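+	// Thresholds are computed as ceil(height/interval)*interval and move forward by one interval on advance().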
+ p := newPivotPolicy(100) + + // First call at 150 initializes threshold to ceil(150/100)*100 = 200 + require.False(t, p.shouldForward(150)) // 150 < 200 + require.False(t, p.shouldForward(199)) // 199 < 200 + require.True(t, p.shouldForward(200)) // 200 >= 200 + p.advance() // threshold becomes 300 + + require.False(t, p.shouldForward(250)) // 250 < 300 + require.True(t, p.shouldForward(300)) // 300 >= 300 +} diff --git a/graft/coreth/plugin/evm/vmsync/registry.go b/graft/coreth/plugin/evm/vmsync/registry.go index 83c3c9ddcfc5..368f244cbc6c 100644 --- a/graft/coreth/plugin/evm/vmsync/registry.go +++ b/graft/coreth/plugin/evm/vmsync/registry.go @@ -107,6 +107,19 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl return g } +// UpdateSyncTarget updates the sync target for all syncers. +// Note: Syncers manage cancellation themselves through their Sync() contexts. +func (r *SyncerRegistry) UpdateSyncTarget(newTarget message.Syncable) error { + for _, task := range r.syncers { + if err := task.syncer.UpdateTarget(newTarget); err != nil { + log.Error("failed updating sync target", "name", task.name, "err", err) + return err + } + log.Info("updated sync target", "name", task.name, "new_target", newTarget.GetBlockHash().Hex(), "height", newTarget.Height()) + } + return nil +} + // FinalizeAll iterates over all registered syncers and calls Finalize on those that implement the Finalizer interface. // Errors are logged but not returned to ensure best-effort cleanup of all syncers. func (r *SyncerRegistry) FinalizeAll(summary message.Syncable) { diff --git a/graft/coreth/plugin/evm/vmsync/registry_test.go b/graft/coreth/plugin/evm/vmsync/registry_test.go index 55f17d15d01f..f29c8e5569e5 100644 --- a/graft/coreth/plugin/evm/vmsync/registry_test.go +++ b/graft/coreth/plugin/evm/vmsync/registry_test.go @@ -38,8 +38,9 @@ func (m *mockSyncer) Sync(context.Context) error { return m.syncError } -func (m *mockSyncer) Name() string { return m.name } -func (m *mockSyncer) ID() string { return m.name } +func (m *mockSyncer) Name() string { return m.name } +func (m *mockSyncer) ID() string { return m.name } +func (*mockSyncer) UpdateTarget(_ message.Syncable) error { return nil } // namedSyncer adapts an existing syncer with a provided name to satisfy Syncer with Name(). type namedSyncer struct { @@ -50,6 +51,9 @@ type namedSyncer struct { func (n *namedSyncer) Sync(ctx context.Context) error { return n.syncer.Sync(ctx) } func (n *namedSyncer) Name() string { return n.name } func (n *namedSyncer) ID() string { return n.name } +func (n *namedSyncer) UpdateTarget(newTarget message.Syncable) error { + return n.syncer.UpdateTarget(newTarget) +} // syncerConfig describes a test syncer setup for RunSyncerTasks table tests. type syncerConfig struct { diff --git a/graft/coreth/plugin/evm/vmsync/sync_target.go b/graft/coreth/plugin/evm/vmsync/sync_target.go new file mode 100644 index 000000000000..539e3ed6e9ed --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/sync_target.go @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package vmsync + +import ( + "context" + + "github.com/ava-labs/libevm/common" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/snowman/block" +) + +var _ message.Syncable = (*syncTarget)(nil) + +// syncTarget is a minimal implementation of [message.Syncable] used internally +// to advance the coordinator's sync target from engine-accepted blocks. +// +// NOTE: Unlike [message.BlockSyncSummary], this is not serializable and should not +// be used for network communication. Only [message.Syncable.GetBlockHash], +// [message.Syncable.GetBlockRoot], and [message.Syncable.Height] are used in practice. +// The other methods are stubs to satisfy the interface. +type syncTarget struct { + hash common.Hash + root common.Hash + height uint64 +} + +func newSyncTarget(hash common.Hash, root common.Hash, height uint64) message.Syncable { + return &syncTarget{hash: hash, root: root, height: height} +} + +func (s *syncTarget) GetBlockHash() common.Hash { return s.hash } +func (s *syncTarget) GetBlockRoot() common.Hash { return s.root } + +func (s *syncTarget) ID() ids.ID { return ids.ID(s.hash) } +func (s *syncTarget) Height() uint64 { return s.height } +func (s *syncTarget) Bytes() []byte { return s.hash.Bytes() } +func (*syncTarget) Accept(context.Context) (block.StateSyncMode, error) { + // When used internally to advance targets, we always handle dynamically. + return block.StateSyncDynamic, nil +} diff --git a/graft/coreth/plugin/evm/wrapped_block.go b/graft/coreth/plugin/evm/wrapped_block.go index c7600a9bf2cc..b811691efbe6 100644 --- a/graft/coreth/plugin/evm/wrapped_block.go +++ b/graft/coreth/plugin/evm/wrapped_block.go @@ -55,6 +55,7 @@ var ( errParentBeaconRootNonEmpty = errors.New("invalid non-empty parentBeaconRoot") errBlobGasUsedNilInCancun = errors.New("blob gas used must not be nil in Cancun") errBlobsNotEnabled = errors.New("blobs not enabled on avalanche networks") + errCouldNotNotifySyncClient = errors.New("could not notify sync client") ) var ( @@ -91,11 +92,21 @@ func wrapBlock(ethBlock *types.Block, vm *VM) (*wrappedBlock, error) { func (b *wrappedBlock) ID() ids.ID { return b.id } // Accept implements the snowman.Block interface -func (b *wrappedBlock) Accept(context.Context) error { - vm := b.vm - // Although returning an error from Accept is considered fatal, it is good - // practice to cleanup the batch we were modifying in the case of an error. - defer vm.versiondb.Abort() +// TODO(powerslider): Propagate context to the sync client. +func (b *wrappedBlock) Accept(_ context.Context) error { + // Notify sync client that engine accepted a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineAccept(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. 
+ return nil + } + } blkID := b.ID() log.Debug("accepting block", @@ -111,17 +122,17 @@ func (b *wrappedBlock) Accept(context.Context) error { if err := b.handlePrecompileAccept(rules); err != nil { return err } - if err := vm.blockChain.Accept(b.ethBlock); err != nil { + if err := b.vm.blockChain.Accept(b.ethBlock); err != nil { return fmt.Errorf("chain could not accept %s: %w", blkID, err) } - if err := vm.PutLastAcceptedID(blkID); err != nil { + if err := b.vm.PutLastAcceptedID(blkID); err != nil { return fmt.Errorf("failed to put %s as the last accepted block: %w", blkID, err) } // Get pending operations on the vm's versionDB so we can apply them atomically // with the block extension's changes. - vdbBatch, err := vm.versiondb.CommitBatch() + vdbBatch, err := b.vm.versiondb.CommitBatch() if err != nil { return fmt.Errorf("could not create commit batch processing block[%s]: %w", blkID, err) } @@ -130,11 +141,17 @@ func (b *wrappedBlock) Accept(context.Context) error { // Apply any changes atomically with other pending changes to // the vm's versionDB. // Accept flushes the changes in the batch to the database. - return b.extension.Accept(vdbBatch) + if err := b.extension.Accept(vdbBatch); err != nil { + return err + } + } else { + // If there is no extension, we still need to apply the changes to the versionDB + if err := vdbBatch.Write(); err != nil { + return err + } } - // If there is no extension, we still need to apply the changes to the versionDB - return vdbBatch.Write() + return nil } // handlePrecompileAccept calls Accept on any logs generated with an active precompile address that implements @@ -173,7 +190,21 @@ func (b *wrappedBlock) handlePrecompileAccept(rules extras.Rules) error { // Reject implements the snowman.Block interface // If [b] contains an atomic transaction, attempt to re-issue it -func (b *wrappedBlock) Reject(context.Context) error { +func (b *wrappedBlock) Reject(_ context.Context) error { + // Notify sync client that engine rejected a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineReject(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + return nil + } + } + blkID := b.ID() log.Debug("rejecting block", "hash", blkID.Hex(), @@ -205,7 +236,24 @@ func (b *wrappedBlock) Timestamp() time.Time { } // Verify implements the snowman.Block interface -func (b *wrappedBlock) Verify(context.Context) error { +// TODO(powerslider): Propagate context to the sync client. +func (b *wrappedBlock) Verify(_ context.Context) error { + // Notify sync client that engine verified a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineVerify(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + // Note: Verify may be called multiple times with different contexts, but + // we only enqueue once and process once from the queue. 
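+			// The predicate verification below (b.verify) is skipped on this path;
+			// the engine still observes a successful Verify.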
+ return nil + } + } + return b.verify(&precompileconfig.PredicateContext{ SnowCtx: b.vm.ctx, ProposerVMBlockCtx: nil, @@ -236,8 +284,25 @@ func (b *wrappedBlock) ShouldVerifyWithContext(context.Context) (bool, error) { return false, nil } -// VerifyWithContext implements the block.WithVerifyContext interface +// VerifyWithContext implements the block.WithVerifyContext interface. +// TODO(powerslider): Propagate context to the sync client. func (b *wrappedBlock) VerifyWithContext(_ context.Context, proposerVMBlockCtx *block.Context) error { + // Notify sync client that engine verified a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineVerify(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + // Note: VerifyWithContext may be called multiple times with different contexts, but + // we only enqueue once and process once from the queue. + return nil + } + } + return b.verify(&precompileconfig.PredicateContext{ SnowCtx: b.vm.ctx, ProposerVMBlockCtx: proposerVMBlockCtx, diff --git a/graft/coreth/sync/blocksync/syncer.go b/graft/coreth/sync/blocksync/syncer.go index 2f7340741234..0ffa9caa5780 100644 --- a/graft/coreth/sync/blocksync/syncer.go +++ b/graft/coreth/sync/blocksync/syncer.go @@ -13,6 +13,8 @@ import ( "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" statesyncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" ) @@ -112,3 +114,7 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { log.Info("fetched blocks from peer", "total", blocksToFetch) return batch.Write() } + +func (*BlockSyncer) UpdateTarget(_ message.Syncable) error { + return nil +} diff --git a/graft/coreth/sync/statesync/code_syncer.go b/graft/coreth/sync/statesync/code_syncer.go index 022b8622c3ea..78efed9e17fc 100644 --- a/graft/coreth/sync/statesync/code_syncer.go +++ b/graft/coreth/sync/statesync/code_syncer.go @@ -115,6 +115,10 @@ func (c *CodeSyncer) Sync(ctx context.Context) error { return eg.Wait() } +func (*CodeSyncer) UpdateTarget(_ message.Syncable) error { + return nil +} + // work fulfills any incoming requests from the producer channel by fetching code bytes from the network // and fulfilling them by updating the database. func (c *CodeSyncer) work(ctx context.Context) error { diff --git a/graft/coreth/sync/statesync/state_syncer.go b/graft/coreth/sync/statesync/state_syncer.go index 309f5ef3863f..751e3596acfe 100644 --- a/graft/coreth/sync/statesync/state_syncer.go +++ b/graft/coreth/sync/statesync/state_syncer.go @@ -18,6 +18,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/ava-labs/avalanchego/graft/coreth/core/state/snapshot" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" syncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" @@ -171,6 +172,10 @@ func (t *stateSync) Sync(ctx context.Context) error { return eg.Wait() } +func (*stateSync) UpdateTarget(_ message.Syncable) error { + return nil +} + // onStorageTrieFinished is called after a storage trie finishes syncing. 
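 // It releases the tries-in-progress semaphore so the next pending storage trie can begin syncing.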
func (t *stateSync) onStorageTrieFinished(root common.Hash) error { <-t.triesInProgressSem // allow another trie to start (release the semaphore) diff --git a/graft/coreth/sync/types.go b/graft/coreth/sync/types.go index 63418aa1f5a8..dfd22c8aec42 100644 --- a/graft/coreth/sync/types.go +++ b/graft/coreth/sync/types.go @@ -22,6 +22,9 @@ type Syncer interface { // The sync will respect context cancellation. Sync(ctx context.Context) error + // UpdateTarget updates the syncer's target while running to support dynamic state sync. + UpdateTarget(newTarget message.Syncable) error + // Name returns a human-readable name for this syncer implementation. Name() string
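
For reference, a minimal sketch of a syncer satisfying this extended interface, where UpdateTarget records the latest target for a long-running Sync loop to observe. The trackingSyncer type and its package placement are illustrative assumptions, not part of this change; the method set mirrors the test doubles in registry_test.go rather than any real implementation.

    package vmsync // placement is an assumption; any package importing message works

    import (
    	"context"
    	"sync"

    	"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
    )

    // trackingSyncer records the most recent target handed to it by UpdateTarget so
    // that a long-running Sync loop can pick up pivots between batches.
    type trackingSyncer struct {
    	mu     sync.Mutex
    	target message.Syncable
    }

    func (s *trackingSyncer) UpdateTarget(newTarget message.Syncable) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.target = newTarget
    	return nil
    }

    func (s *trackingSyncer) Sync(ctx context.Context) error {
    	// A real implementation would loop, re-reading s.target between batches so an
    	// UpdateTarget call takes effect, and would stop on context cancellation.
    	select {
    	case <-ctx.Done():
    		return ctx.Err()
    	default:
    		return nil
    	}
    }

    func (*trackingSyncer) Name() string { return "tracking syncer" }
    func (*trackingSyncer) ID() string   { return "tracking_syncer" }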