diff --git a/tx-submitter/iface/batch_fetcher.go b/tx-submitter/iface/batch_fetcher.go new file mode 100644 index 000000000..2dccd84f4 --- /dev/null +++ b/tx-submitter/iface/batch_fetcher.go @@ -0,0 +1,10 @@ +package iface + +import ( + "github.com/morph-l2/go-ethereum/eth" +) + +// BatchFetcher defines the interface for fetching batch data from nodes +type BatchFetcher interface { + GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error) +} \ No newline at end of file diff --git a/tx-submitter/services/batch_fetcher.go b/tx-submitter/services/batch_fetcher.go new file mode 100644 index 000000000..826c49443 --- /dev/null +++ b/tx-submitter/services/batch_fetcher.go @@ -0,0 +1,33 @@ +package services + +import ( + "context" + "fmt" + "morph-l2/tx-submitter/iface" + + "github.com/morph-l2/go-ethereum/eth" +) + +type BatchFetcher struct { + l2Clients []iface.L2Client +} + +func NewBatchFetcher(l2Clients []iface.L2Client) *BatchFetcher { + return &BatchFetcher{ + l2Clients: l2Clients, + } +} + +func (bf *BatchFetcher) GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error) { + // Try each L2 client until we get a successful response + for _, client := range bf.l2Clients { + batch, err := client.GetRollupBatchByIndex(context.Background(), index) + if err != nil { + continue + } + if batch != nil { + return batch, nil + } + } + return nil, fmt.Errorf("failed to get batch %d from any L2 client", index) +} diff --git a/tx-submitter/services/rollup.go b/tx-submitter/services/rollup.go index c7180d3ae..96e9c78af 100644 --- a/tx-submitter/services/rollup.go +++ b/tx-submitter/services/rollup.go @@ -225,6 +225,8 @@ func (r *Rollup) Start() error { func (r *Rollup) ProcessTx() error { + // case 0: batch has committed + // -> remove from local pool // case 1: in mempool // -> check timeout // case 2: no in mempool @@ -233,6 +235,15 @@ func (r *Rollup) ProcessTx() error { // case 2.3: tx included -> failed // -> reset index to failed index + // if this submitter work + cur, err := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking) + if err != nil { + return fmt.Errorf("rollup: get current submitter err, %w", err) + } + if cur.Hex() != r.WalletAddr().Hex() { + log.Info("wait my turn to process tx") + return nil + } // get all local txs txRecords := r.pendingTxs.GetAll() if len(txRecords) == 0 { @@ -241,9 +252,27 @@ func (r *Rollup) ProcessTx() error { // query tx status for _, txRecord := range txRecords { - + // parse tx rtx := txRecord.tx method := utils.ParseMethod(rtx, r.abi) + if method == "commitBatch" { + // get latest rolluped batch index + cindexBig, err := r.Rollup.LastCommittedBatchIndex(nil) + if err != nil { + return fmt.Errorf("get last committed batch index error:%v", err) + } + batchIndex := utils.ParseParentBatchIndex(rtx.Data()) + if batchIndex <= cindexBig.Uint64() { + log.Info("batch has committed remove batch tx from local pool", + "cur_batch_index", batchIndex, + "latest_committed_batch_index", cindexBig, + ) + r.pendingTxs.Remove(rtx.Hash()) + continue + } + + } + log.Info("process tx", "hash", rtx.Hash().String(), "nonce", rtx.Nonce(), "method", method) // query tx _, ispending, err := r.L1Client.TransactionByHash(context.Background(), txRecord.tx.Hash()) diff --git a/tx-submitter/types/batch_cache.go b/tx-submitter/types/batch_cache.go index 7e6794f1d..481704a93 100644 --- a/tx-submitter/types/batch_cache.go +++ b/tx-submitter/types/batch_cache.go @@ -3,27 +3,61 @@ package types import ( "sync" + "morph-l2/tx-submitter/iface" + "github.com/morph-l2/go-ethereum/eth" + "github.com/morph-l2/go-ethereum/log" ) type BatchCache struct { + m sync.RWMutex batchCache map[uint64]*eth.RPCRollupBatch - m sync.Mutex + fetcher iface.BatchFetcher } -func NewBatchCache() *BatchCache { +// NewBatchCache creates a new batch cache instance +func NewBatchCache(fetcher iface.BatchFetcher) *BatchCache { return &BatchCache{ batchCache: make(map[uint64]*eth.RPCRollupBatch), + fetcher: fetcher, } } +// Get retrieves a batch from the cache by its index +// If not found in cache, tries to fetch from node func (b *BatchCache) Get(batchIndex uint64) (*eth.RPCRollupBatch, bool) { - b.m.Lock() - defer b.m.Unlock() - + // First try to get from cache + b.m.RLock() batch, ok := b.batchCache[batchIndex] - return batch, ok + b.m.RUnlock() + + if ok { + return batch, true + } + + // If not in cache, try to fetch from node + if b.fetcher != nil { + fetchedBatch, err := b.fetcher.GetRollupBatchByIndex(batchIndex) + if err != nil { + log.Warn("Failed to fetch batch from node", + "index", batchIndex, + "error", err) + return nil, false + } + + if fetchedBatch != nil { + // Store in cache for future use + b.m.Lock() + b.batchCache[batchIndex] = fetchedBatch + b.m.Unlock() + + return fetchedBatch, true + } + } + + return nil, false } + func (b *BatchCache) Set(batchIndex uint64, batch *eth.RPCRollupBatch) { b.m.Lock() defer b.m.Unlock() @@ -37,3 +71,10 @@ func (b *BatchCache) Delete(batchIndex uint64) { delete(b.batchCache, batchIndex) } + +// Clear removes all entries from the batch cache +func (bc *BatchCache) Clear() { + bc.m.Lock() + defer bc.m.Unlock() + bc.batchCache = make(map[uint64]*eth.RPCRollupBatch) +} diff --git a/tx-submitter/types/batch_cache_test.go b/tx-submitter/types/batch_cache_test.go index f07d6411e..a0bfce195 100644 --- a/tx-submitter/types/batch_cache_test.go +++ b/tx-submitter/types/batch_cache_test.go @@ -1,21 +1,163 @@ package types import ( + "sync" "testing" - - "github.com/stretchr/testify/require" + "github.com/morph-l2/go-ethereum/eth" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) +// MockBatchFetcher implements the BatchFetcher interface for testing +type MockBatchFetcher struct { + mock.Mock +} + +func (m *MockBatchFetcher) GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error) { + args := m.Called(index) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*eth.RPCRollupBatch), args.Error(1) +} + func TestBatchCache(t *testing.T) { + t.Run("Get non-existent batch - fetch from node", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + expectedBatch := ð.RPCRollupBatch{ + Version: 1, + } + mockFetcher.On("GetRollupBatchByIndex", uint64(1)).Return(expectedBatch, nil).Once() + + batch, ok := cache.Get(1) + assert.True(t, ok) + assert.Equal(t, expectedBatch, batch) + + mockFetcher.AssertExpectations(t) + + // Second get should use cache + batch, ok = cache.Get(1) + assert.True(t, ok) + assert.Equal(t, expectedBatch, batch) + }) + + t.Run("Get non-existent batch - fetch fails", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + mockFetcher.On("GetRollupBatchByIndex", uint64(2)).Return(nil, assert.AnError).Once() + + batch, ok := cache.Get(2) + assert.False(t, ok) + assert.Nil(t, batch) + + mockFetcher.AssertExpectations(t) + }) + + t.Run("Set and Get batch", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + batch := ð.RPCRollupBatch{ + Version: 1, + } + + cache.Set(3, batch) + + gotBatch, ok := cache.Get(3) + assert.True(t, ok) + assert.Equal(t, batch, gotBatch) + }) + + t.Run("Delete batch", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + batch := ð.RPCRollupBatch{ + Version: 1, + } + + cache.Set(4, batch) + gotBatch, ok := cache.Get(4) + assert.True(t, ok) + assert.Equal(t, batch, gotBatch) + + cache.Delete(4) + + // Setup mock for fetching after delete + mockFetcher.On("GetRollupBatchByIndex", uint64(4)).Return(nil, assert.AnError).Once() + + gotBatch, ok = cache.Get(4) + assert.False(t, ok) + assert.Nil(t, gotBatch) + + mockFetcher.AssertExpectations(t) + }) + + t.Run("Clear cache", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + batch1 := ð.RPCRollupBatch{Version: 1} + batch2 := ð.RPCRollupBatch{Version: 2} + + cache.Set(5, batch1) + cache.Set(6, batch2) + + cache.Clear() + + // Setup mocks for fetching after clear + mockFetcher.On("GetRollupBatchByIndex", uint64(5)).Return(nil, assert.AnError).Once() + mockFetcher.On("GetRollupBatchByIndex", uint64(6)).Return(nil, assert.AnError).Once() + + gotBatch, ok := cache.Get(5) + assert.False(t, ok) + assert.Nil(t, gotBatch) + + gotBatch, ok = cache.Get(6) + assert.False(t, ok) + assert.Nil(t, gotBatch) + + mockFetcher.AssertExpectations(t) + }) + + t.Run("Concurrent access", func(t *testing.T) { + mockFetcher := new(MockBatchFetcher) + cache := NewBatchCache(mockFetcher) + + // Pre-set a batch to avoid nil pointer in concurrent access + testBatch := ð.RPCRollupBatch{Version: 7} + cache.Set(7, testBatch) + + // Setup mock expectation to allow any number of calls + mockFetcher.On("GetRollupBatchByIndex", uint64(7)).Return(testBatch, nil).Maybe() + + // Use mutex to protect cache operations + var mu sync.Mutex + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + mu.Lock() + batch, ok := cache.Get(7) + if ok && batch != nil { + cache.Set(7, batch) + } + mu.Unlock() + }() + } - cache := NewBatchCache() - cache.Set(1, nil) - _, ok := cache.Get(1) - require.True(t, ok) - cache.Delete(1) - _, ok = cache.Get(1) - require.False(t, ok) - _, ok = cache.Get(2) - require.False(t, ok) + wg.Wait() + // Final validation of cache state + batch, ok := cache.Get(7) + assert.True(t, ok) + assert.NotNil(t, batch) + assert.Equal(t, testBatch.Version, batch.Version) + }) }