Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e03071e
refactor(vmsync): extract SyncStrategy pattern for static state sync
powerslider Dec 3, 2025
f8ff4c3
feat(vmsync): dynamic state sync with coordinator, pivot cadence, and…
powerslider Oct 29, 2025
6277327
fix: move OnEngineAccept
powerslider Oct 31, 2025
430bba6
feat(vmsync): remove stale blocks from queue after sync target updates
powerslider Nov 6, 2025
f0b79bf
refactor(vmsync): propagate context throughout dynamic state sync flow
powerslider Nov 7, 2025
d953aac
refactor(vmsync): improve code quality and reduce duplication
powerslider Nov 10, 2025
88c53c9
feat(vmsync): allow block enqueuing during batch execution
powerslider Nov 10, 2025
74f726e
feat(sync): add Finalize method to Syncer interface and integrate int…
powerslider Nov 11, 2025
d7ad43e
fix(vmsync): prevent double execution and fix race conditions in dyna…
powerslider Nov 11, 2025
2e6e033
docs(vmsync): streamline method docs
powerslider Nov 11, 2025
30853a4
test(vmsync): add unit tests for dynamic state sync flow
powerslider Dec 2, 2025
5db739e
test(vmsync): add unit tests for dynamic state sync flow
powerslider Dec 2, 2025
58abd3e
refactor(vmsync): introduce SyncStrategy pattern for static/dynamic sync
powerslider Dec 2, 2025
05f5763
refactor(vmsync): use explicit config field instead of type embedding
powerslider Dec 3, 2025
a5f7cbc
refactor(vmsync): refinements on sync strategy definition
powerslider Dec 3, 2025
8d2a974
Merge branch 'powerslider/4651-sync-client-strategy-support' into pow…
powerslider Dec 3, 2025
0283266
fix(vmsync): pass summary to startAsync instead of using resumableSum…
powerslider Dec 3, 2025
fdaf63a
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 3, 2025
7180a13
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 16, 2025
ed5e8e7
refactor(vmsync): replace finalizer struct with Committer interface
powerslider Dec 16, 2025
e755cb3
refactor(vmsync): rename SyncStrategy to Executor
powerslider Dec 17, 2025
9dbeb65
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 18, 2025
7cab6bd
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 18, 2025
2dd729b
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 18, 2025
da93531
Merge branch 'powerslider/4651-sync-client-strategy-support' into pow…
powerslider Dec 18, 2025
74a6f44
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 19, 2025
745b63a
fix(vmsync): add atomic context to extender syncer errors
powerslider Dec 19, 2025
c038160
fix(vmsync): rename Commiter interface to Acceptor and Commit to Acce…
powerslider Dec 19, 2025
59756ae
Merge branch 'master' into powerslider/4651-sync-client-strategy-support
powerslider Dec 22, 2025
2552e47
Merge branch 'powerslider/4651-sync-client-strategy-support' into pow…
powerslider Dec 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions graft/coreth/plugin/evm/atomic/sync/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,21 @@ func (a *Extender) Initialize(backend *state.AtomicBackend, trie *state.AtomicTr
func (a *Extender) CreateSyncer(client syncclient.LeafClient, verDB *versiondb.Database, summary message.Syncable) (sync.Syncer, error) {
atomicSummary, ok := summary.(*Summary)
if !ok {
return nil, fmt.Errorf("expected *Summary, got %T", summary)
return nil, fmt.Errorf("atomic sync extender: expected *Summary, got %T", summary)
}

return NewSyncer(
syncer, err := NewSyncer(
client,
verDB,
a.trie,
atomicSummary.AtomicRoot,
atomicSummary.BlockNumber,
WithRequestSize(a.requestSize),
)
if err != nil {
return nil, fmt.Errorf("atomic.NewSyncer failed: %w", err)
}
return syncer, nil
}

// OnFinishBeforeCommit implements the sync.Extender interface by marking the previously last accepted block for the shared memory cursor.
Expand Down
4 changes: 4 additions & 0 deletions graft/coreth/plugin/evm/atomic/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s *Syncer) Sync(ctx context.Context) error {
return s.syncer.Sync(ctx)
}

func (*Syncer) UpdateTarget(_ message.Syncable) error {
return nil
}

// Finalize commits any pending database changes to disk.
// This ensures that even if the sync is cancelled or fails, we preserve
// the progress up to the last fully synced height.
Expand Down
94 changes: 94 additions & 0 deletions graft/coreth/plugin/evm/vmsync/block_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import "sync"

// BlockOperationType represents the type of operation to perform on a block.
type BlockOperationType int

const (
OpAccept BlockOperationType = iota
OpReject
OpVerify
)

// String returns the string representation of the block operation.
func (op BlockOperationType) String() string {
switch op {
case OpAccept:
return "accept"
case OpReject:
return "reject"
case OpVerify:
return "verify"
default:
return "unknown"
}
}

// blockOperation represents a queued block operation.
type blockOperation struct {
block EthBlockWrapper
operation BlockOperationType
}

// blockQueue buffers block operations (accept/reject/verify) that arrive while
// the coordinator is in the Running state. Operations are processed in FIFO order.
// It is cleared (drained) on UpdateSyncTarget to avoid drops and is snapshotted
// at finalization via DequeueBatch. Enqueue is always allowed; a DequeueBatch
// only captures the current buffered operations and clears them, and new enqueues
// after the snapshot are not part of that batch.
type blockQueue struct {
mu sync.Mutex
// buffered operations accumulated before finalization
items []blockOperation
}

// newBlockQueue creates a new empty queue.
func newBlockQueue() *blockQueue {
return &blockQueue{}
}

// enqueue appends a block operation to the buffer. Returns true if the operation
// was queued, false if the block is nil.
func (q *blockQueue) enqueue(b EthBlockWrapper, op BlockOperationType) bool {
if b == nil {
return false
}
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, blockOperation{
block: b,
operation: op,
})
return true
}

// dequeueBatch returns the current buffered operations and clears the buffer. New
// arrivals after the snapshot are not included and remain buffered for later.
func (q *blockQueue) dequeueBatch() []blockOperation {
q.mu.Lock()
defer q.mu.Unlock()
out := q.items
q.items = nil
return out
}

// removeBelowHeight removes all queued blocks with height <= targetHeight.
// This is called after UpdateSyncTarget to remove blocks that will never be executed
// because the sync target has advanced past them.
func (q *blockQueue) removeBelowHeight(targetHeight uint64) {
q.mu.Lock()
defer q.mu.Unlock()

filtered := q.items[:0]
for _, op := range q.items {
ethBlock := op.block.GetEthBlock()
if ethBlock != nil && ethBlock.NumberU64() > targetHeight {
filtered = append(filtered, op)
}
}
q.items = filtered
}
76 changes: 76 additions & 0 deletions graft/coreth/plugin/evm/vmsync/block_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestBlockQueue_EnqueueAndDequeue(t *testing.T) {
q := newBlockQueue()

// Nil block should be rejected.
require.False(t, q.enqueue(nil, OpAccept))

// Enqueue blocks.
for i := uint64(100); i < 105; i++ {
require.True(t, q.enqueue(newMockBlock(i), OpAccept))
}

// Dequeue returns all in FIFO order and clears queue.
batch := q.dequeueBatch()
require.Len(t, batch, 5)
for i, op := range batch {
require.Equal(t, uint64(100+i), op.block.GetEthBlock().NumberU64())
}

// Queue is now empty.
require.Empty(t, q.dequeueBatch())
}

func TestBlockQueue_RemoveBelowHeight(t *testing.T) {
q := newBlockQueue()

// Enqueue blocks at heights 100-110.
for i := uint64(100); i <= 110; i++ {
q.enqueue(newMockBlock(i), OpAccept)
}

// Remove blocks at or below height 105.
q.removeBelowHeight(105)

// Only blocks > 105 should remain (106, 107, 108, 109, 110).
batch := q.dequeueBatch()
require.Len(t, batch, 5)
require.Equal(t, uint64(106), batch[0].block.GetEthBlock().NumberU64())
}

func TestBlockQueue_ConcurrentAccess(t *testing.T) {
t.Parallel()

q := newBlockQueue()
const numGoroutines = 10
const numOps = 100

var wg sync.WaitGroup
wg.Add(numGoroutines)

for g := 0; g < numGoroutines; g++ {
go func(id int) {
defer wg.Done()
for i := 0; i < numOps; i++ {
q.enqueue(newMockBlock(uint64(id*numOps+i)), OpAccept)
}
}(g)
}

wg.Wait()

// All operations should have been enqueued.
batch := q.dequeueBatch()
require.Len(t, batch, numGoroutines*numOps)
}
Loading
Loading