Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tx-submitter/iface/batch_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package iface

import (
"github.com/morph-l2/go-ethereum/eth"
)

// BatchFetcher defines the interface for fetching batch data from nodes.
type BatchFetcher interface {
	// GetRollupBatchByIndex returns the rollup batch with the given index,
	// or a non-nil error if no node could provide it.
	GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error)
}
33 changes: 33 additions & 0 deletions tx-submitter/services/batch_fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package services

import (
"context"
"fmt"
"morph-l2/tx-submitter/iface"

"github.com/morph-l2/go-ethereum/eth"
)

// BatchFetcher fetches rollup batches from a set of L2 nodes,
// falling over to the next client when one fails.
type BatchFetcher struct {
	l2Clients []iface.L2Client // candidate nodes, tried in order
}

// NewBatchFetcher constructs a BatchFetcher backed by the given L2 clients.
func NewBatchFetcher(l2Clients []iface.L2Client) *BatchFetcher {
	return &BatchFetcher{l2Clients: l2Clients}
}

// GetRollupBatchByIndex returns the rollup batch with the given index.
// Each configured L2 client is tried in order until one returns a non-nil
// batch. When every client fails, the last client error (if any) is wrapped
// into the returned error so the caller can see the underlying cause.
func (bf *BatchFetcher) GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error) {
	var lastErr error
	for _, client := range bf.l2Clients {
		batch, err := client.GetRollupBatchByIndex(context.Background(), index)
		if err != nil {
			// Remember why this client failed instead of discarding the error.
			lastErr = err
			continue
		}
		if batch != nil {
			return batch, nil
		}
	}
	if lastErr != nil {
		return nil, fmt.Errorf("failed to get batch %d from any L2 client: %w", index, lastErr)
	}
	return nil, fmt.Errorf("failed to get batch %d from any L2 client", index)
}
31 changes: 30 additions & 1 deletion tx-submitter/services/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ func (r *Rollup) Start() error {

func (r *Rollup) ProcessTx() error {

// case 0: batch has committed
// -> remove from local pool
// case 1: in mempool
// -> check timeout
// case 2: no in mempool
Expand All @@ -233,6 +235,15 @@ func (r *Rollup) ProcessTx() error {
// case 2.3: tx included -> failed
// -> reset index to failed index

// if this submitter work
cur, err := r.rotator.CurrentSubmitter(r.L2Clients, r.Staking)
if err != nil {
return fmt.Errorf("rollup: get current submitter err, %w", err)
}
if cur.Hex() != r.WalletAddr().Hex() {
log.Info("wait my turn to process tx")
return nil
}
// get all local txs
txRecords := r.pendingTxs.GetAll()
if len(txRecords) == 0 {
Expand All @@ -241,9 +252,27 @@ func (r *Rollup) ProcessTx() error {

// query tx status
for _, txRecord := range txRecords {

// parse tx
rtx := txRecord.tx
method := utils.ParseMethod(rtx, r.abi)
if method == "commitBatch" {
// get latest rolluped batch index
cindexBig, err := r.Rollup.LastCommittedBatchIndex(nil)
if err != nil {
return fmt.Errorf("get last committed batch index error:%v", err)
}
batchIndex := utils.ParseParentBatchIndex(rtx.Data())
if batchIndex <= cindexBig.Uint64() {
log.Info("batch has committed remove batch tx from local pool",
"cur_batch_index", batchIndex,
"latest_committed_batch_index", cindexBig,
)
r.pendingTxs.Remove(rtx.Hash())
continue
}

}

log.Info("process tx", "hash", rtx.Hash().String(), "nonce", rtx.Nonce(), "method", method)
// query tx
_, ispending, err := r.L1Client.TransactionByHash(context.Background(), txRecord.tx.Hash())
Expand Down
53 changes: 47 additions & 6 deletions tx-submitter/types/batch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,61 @@ package types
import (
"sync"

"morph-l2/tx-submitter/iface"

"github.com/morph-l2/go-ethereum/eth"
"github.com/morph-l2/go-ethereum/log"
)

// BatchCache caches rollup batches by index, falling back to fetching
// from a node (via fetcher) on a cache miss.
type BatchCache struct {
	m          sync.Mutex // guards batchCache
	batchCache map[uint64]*eth.RPCRollupBatch
	fetcher    iface.BatchFetcher // optional node fallback; may be nil
}

func NewBatchCache() *BatchCache {
// NewBatchCache creates a new batch cache instance
func NewBatchCache(fetcher iface.BatchFetcher) *BatchCache {
return &BatchCache{
batchCache: make(map[uint64]*eth.RPCRollupBatch),
fetcher: fetcher,
}
}

// Get retrieves a batch from the cache by its index.
// On a cache miss it falls back to fetching the batch from a node via the
// configured fetcher; a successful fetch is cached for future lookups.
// Returns (nil, false) when the batch is unavailable from both sources.
func (b *BatchCache) Get(batchIndex uint64) (*eth.RPCRollupBatch, bool) {
	b.m.Lock()
	defer b.m.Unlock()

	// First try the in-memory cache.
	if batch, ok := b.batchCache[batchIndex]; ok {
		return batch, true
	}

	// Not cached: fall back to the node when a fetcher is configured.
	if b.fetcher == nil {
		return nil, false
	}
	fetchedBatch, err := b.fetcher.GetRollupBatchByIndex(batchIndex)
	if err != nil {
		log.Warn("Failed to fetch batch from node",
			"index", batchIndex,
			"error", err)
		return nil, false
	}
	if fetchedBatch == nil {
		return nil, false
	}

	// NOTE(review): the mutex is held across the fetch, which serializes all
	// cache access during a node round-trip but also prevents duplicate
	// fetches of the same index — confirm this trade-off is intended.
	// Store in cache for future use.
	b.batchCache[batchIndex] = fetchedBatch
	return fetchedBatch, true
}

func (b *BatchCache) Set(batchIndex uint64, batch *eth.RPCRollupBatch) {
b.m.Lock()
defer b.m.Unlock()
Expand All @@ -37,3 +71,10 @@ func (b *BatchCache) Delete(batchIndex uint64) {

delete(b.batchCache, batchIndex)
}

// Clear removes all entries from the batch cache.
// Receiver renamed from bc to b for consistency with the type's other methods.
func (b *BatchCache) Clear() {
	b.m.Lock()
	defer b.m.Unlock()
	b.batchCache = make(map[uint64]*eth.RPCRollupBatch)
}
164 changes: 153 additions & 11 deletions tx-submitter/types/batch_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,163 @@
package types

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/morph-l2/go-ethereum/eth"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

// MockBatchFetcher implements the BatchFetcher interface for testing.
// Expectations are configured through the embedded testify mock.Mock.
type MockBatchFetcher struct {
	mock.Mock
}

// GetRollupBatchByIndex replays the configured mock expectation for index.
// A nil first return value is passed through as a nil batch; any other value
// must be a *eth.RPCRollupBatch (the assertion panics otherwise, matching
// testify's usual strictness).
func (m *MockBatchFetcher) GetRollupBatchByIndex(index uint64) (*eth.RPCRollupBatch, error) {
	result := m.Called(index)
	raw := result.Get(0)
	if raw == nil {
		return nil, result.Error(1)
	}
	return raw.(*eth.RPCRollupBatch), result.Error(1)
}

// TestBatchCache exercises fetch-on-miss, Set/Get/Delete/Clear behavior,
// and basic concurrent access of BatchCache.
func TestBatchCache(t *testing.T) {
	t.Run("Get non-existent batch - fetch from node", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		expectedBatch := &eth.RPCRollupBatch{
			Version: 1,
		}
		mockFetcher.On("GetRollupBatchByIndex", uint64(1)).Return(expectedBatch, nil).Once()

		batch, ok := cache.Get(1)
		assert.True(t, ok)
		assert.Equal(t, expectedBatch, batch)

		mockFetcher.AssertExpectations(t)

		// Second get must come from the cache; the mock's .Once() would
		// fail the test if the fetcher were called again.
		batch, ok = cache.Get(1)
		assert.True(t, ok)
		assert.Equal(t, expectedBatch, batch)
	})

	t.Run("Get non-existent batch - fetch fails", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		mockFetcher.On("GetRollupBatchByIndex", uint64(2)).Return(nil, assert.AnError).Once()

		batch, ok := cache.Get(2)
		assert.False(t, ok)
		assert.Nil(t, batch)

		mockFetcher.AssertExpectations(t)
	})

	t.Run("Set and Get batch", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		batch := &eth.RPCRollupBatch{
			Version: 1,
		}

		cache.Set(3, batch)

		gotBatch, ok := cache.Get(3)
		assert.True(t, ok)
		assert.Equal(t, batch, gotBatch)
	})

	t.Run("Delete batch", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		batch := &eth.RPCRollupBatch{
			Version: 1,
		}

		cache.Set(4, batch)
		gotBatch, ok := cache.Get(4)
		assert.True(t, ok)
		assert.Equal(t, batch, gotBatch)

		cache.Delete(4)

		// After deletion a Get falls through to the fetcher.
		mockFetcher.On("GetRollupBatchByIndex", uint64(4)).Return(nil, assert.AnError).Once()

		gotBatch, ok = cache.Get(4)
		assert.False(t, ok)
		assert.Nil(t, gotBatch)

		mockFetcher.AssertExpectations(t)
	})

	t.Run("Clear cache", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		batch1 := &eth.RPCRollupBatch{Version: 1}
		batch2 := &eth.RPCRollupBatch{Version: 2}

		cache.Set(5, batch1)
		cache.Set(6, batch2)

		cache.Clear()

		// After Clear every Get falls through to the fetcher.
		mockFetcher.On("GetRollupBatchByIndex", uint64(5)).Return(nil, assert.AnError).Once()
		mockFetcher.On("GetRollupBatchByIndex", uint64(6)).Return(nil, assert.AnError).Once()

		gotBatch, ok := cache.Get(5)
		assert.False(t, ok)
		assert.Nil(t, gotBatch)

		gotBatch, ok = cache.Get(6)
		assert.False(t, ok)
		assert.Nil(t, gotBatch)

		mockFetcher.AssertExpectations(t)
	})

	t.Run("Concurrent access", func(t *testing.T) {
		mockFetcher := new(MockBatchFetcher)
		cache := NewBatchCache(mockFetcher)

		// Pre-set a batch so every goroutine finds a value in the cache.
		testBatch := &eth.RPCRollupBatch{Version: 7}
		cache.Set(7, testBatch)

		// Allow any number of fetch calls in case a goroutine races past
		// the cached entry.
		mockFetcher.On("GetRollupBatchByIndex", uint64(7)).Return(testBatch, nil).Maybe()

		// BatchCache synchronizes internally, so the goroutines hit it
		// directly; run the suite with -race to catch regressions.
		var wg sync.WaitGroup
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				batch, ok := cache.Get(7)
				if ok && batch != nil {
					cache.Set(7, batch)
				}
			}()
		}
		wg.Wait()

		// Final validation of cache state.
		batch, ok := cache.Get(7)
		require.True(t, ok)
		require.NotNil(t, batch)
		assert.Equal(t, testBatch.Version, batch.Version)
	})
}