Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/l1 msg queue #1055

Draft
wants to merge 5 commits into
base: feat/l1-state-tracker
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,22 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *
queueIndex := binary.BigEndian.Uint64(data)
return &queueIndex
}

// WriteL1MsgStorageState writes the L1MsgStorage state
func WriteL1MsgStorageState(db ethdb.KeyValueWriter, state []byte) {
if err := db.Put(l1MsgStorageStateKey, state); err != nil {
log.Crit("Failed to update L1MsgStorage state", "err", err)
}
}

// ReadL1MsgStorageState retrieves the L1MsgStorage state
func ReadL1MsgStorageState(db ethdb.Reader) []byte {
data, err := db.Get(l1MsgStorageStateKey)
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
}
return data
}
2 changes: 2 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ var (
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")

l1MsgStorageStateKey = []byte("L1MsgStorageState")

// Scroll rollup event store
rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber")
batchChunkRangesPrefix = []byte("R-bcr")
Expand Down
262 changes: 262 additions & 0 deletions rollup/l1/msg_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package l1

import (
"context"
"fmt"
"sync"
"time"

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/log"
)

const (
defaultFetchInterval = 5 * time.Second
)

type MsgStorageState struct {
StartBlockHeader *types.Header
EndBlockHeader *types.Header
}

type MsgStorage struct {
Copy link
Member

@georgehao georgehao Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some metrics to MsgStorage eg:

  • whether the task is running
  • which head block is currently processed

state MsgStorageState

ctx context.Context
cancel context.CancelFunc

msgs *common.ShrinkingMap[uint64, storedL1Message]
reader *Reader
unsubscribeTracker func()
newChainNotifications []newChainNotification
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved

msgsMu sync.RWMutex
notifsMu sync.Mutex
}

func NewMsgStorage(ctx context.Context, tracker *Tracker, reader *Reader) (*MsgStorage, error) {
if tracker == nil || reader == nil {
return nil, fmt.Errorf("failed to create MsgStorage, reader or tracker is nil")
}
ctx, cancel := context.WithCancel(ctx)
msgStorage := &MsgStorage{
ctx: ctx,
cancel: cancel,
msgs: common.NewShrinkingMap[uint64, storedL1Message](1000),
reader: reader,
}
msgStorage.unsubscribeTracker = tracker.Subscribe(LatestChainHead, func(old, new []*types.Header) {
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved
msgStorage.notifsMu.Lock()
defer msgStorage.notifsMu.Unlock()
msgStorage.newChainNotifications = append(msgStorage.newChainNotifications, newChainNotification{old, new})
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved
})

msgStorage.Start()
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved
return msgStorage, nil
}

func (ms *MsgStorage) Start() {
log.Info("starting MsgStorage")
go func() {
fetchTicker := time.NewTicker(defaultFetchInterval)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the ticker here and simply wait for the channel to be filled and the context done in the select below

defer fetchTicker.Stop()

for {
select {
case <-ms.ctx.Done():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to specifically handle <-ms.ctx.Done() and the subsequent select handle this again

return
default:
}
select {
case <-ms.ctx.Done():
return
case <-fetchTicker.C:
if len(ms.newChainNotifications) != 0 {
err := ms.fetchMessages()
if err != nil {
log.Warn("MsgStorage: failed to fetch messages", "err", err)
}
}
}

}
}()
}

// ReadL1Message retrieves the L1 message corresponding to the enqueue index.
func (ms *MsgStorage) ReadL1Message(queueIndex uint64) *types.L1MessageTx {
ms.msgsMu.RLock()
defer ms.msgsMu.RUnlock()
msg, exists := ms.msgs.Get(queueIndex)
if !exists {
return nil
}
return msg.l1msg
}

// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the MsgStorage starting at the provided enqueue index.
func (ms *MsgStorage) IterateL1MessagesFrom(fromQueueIndex uint64) L1MessageIterator {
return L1MessageIterator{
curIndex: fromQueueIndex,
msgStorage: ms,
}
}

// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
func (ms *MsgStorage) ReadL1MessagesFrom(startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)

index := startIndex
count := maxCount

storedL1Msg, exists := ms.msgs.Get(index)
for count > 0 && exists {
msg := storedL1Msg.l1msg

// sanity check
if msg.QueueIndex != index {
log.Crit(
"Unexpected QueueIndex in ReadL1MessagesFrom",
"expected", index,
"got", msg.QueueIndex,
"startIndex", startIndex,
"maxCount", maxCount,
)
}

msgs = append(msgs, *msg)
index += 1
count -= 1
storedL1Msg, exists = ms.msgs.Get(index)
}

return msgs
}
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved

func (ms *MsgStorage) fetchMessages() error {
ms.notifsMu.Lock()
notifs := ms.newChainNotifications
NazariiDenha marked this conversation as resolved.
Show resolved Hide resolved
ms.newChainNotifications = nil
ms.notifsMu.Unlock()

// go through all chain notifications and process
for _, newChainNotification := range notifs {
old, new := newChainNotification.old, newChainNotification.new

// check if there is old chain to delete l1msgs from
if old != nil {
// find msgs that come for reorged chain
ms.msgsMu.RLock()
msgs := ms.msgs.Values()
ms.msgsMu.RUnlock()
var indexesToDelete []uint64
for _, msg := range msgs {
contains := false
for _, header := range old {
if header.Hash() == msg.headerHash {
contains = true
break
}
}
if contains {
indexesToDelete = append(indexesToDelete, msg.l1msg.QueueIndex)
}
}
if len(indexesToDelete) > 0 {
ms.msgsMu.Lock()
for _, index := range indexesToDelete {
ms.msgs.Delete(index)
}
ms.msgsMu.Unlock()
}
}
Comment on lines +267 to +287

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about storing an additional map L1 blockNum -> L1 message indexes. then we just need to iterate once and delete. this should be rare but it could be quite costly depending on how big msgs and old are


// load messages from new chain
start := new[len(new)-1].Number.Uint64()
end := new[0].Number.Uint64()
events, err := ms.reader.FetchL1MessageEventsInRange(start, end)
if err != nil {
return fmt.Errorf("failed to fetch l1 messages in range, start: %d, end: %d, err: %w", start, end, err)
}
msgsToStore := make([]storedL1Message, len(events))
for _, event := range events {
msg := &types.L1MessageTx{
QueueIndex: event.QueueIndex,
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Value: event.Value,
Data: event.Data,
Sender: event.Sender,
}
msgsToStore = append(msgsToStore, storedL1Message{
l1msg: msg,
headerHash: event.Raw.BlockHash,
})
}
ms.msgsMu.Lock()
for _, msg := range msgsToStore {
ms.msgs.Set(msg.l1msg.QueueIndex, msg)
}
ms.msgsMu.Unlock()
// update storage state
ms.state.EndBlockHeader = new[0]
if ms.state.StartBlockHeader == nil {
ms.state.StartBlockHeader = new[len(new)-1]
}
}
return nil
}

// PruneMessages deletes all messages that are older or equal to provided index
func (ms *MsgStorage) PruneMessages(lastIndex uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who call this function? if no, maybe this it's called by itself periodically

ms.msgsMu.Lock()
defer ms.msgsMu.Unlock()

// todo: update state for graceful restart
deleted := ms.msgs.Delete(lastIndex)
for deleted {
lastIndex--
deleted = ms.msgs.Delete(lastIndex)
}
}

func (ms *MsgStorage) Stop() {
log.Info("stopping MsgStorage")
ms.cancel()
log.Info("MsgStorage stopped")
}

type storedL1Message struct {
l1msg *types.L1MessageTx
headerHash common.Hash
}

type newChainNotification struct {
old []*types.Header
new []*types.Header
}

type L1MessageIterator struct {
curIndex uint64
curMsg *types.L1MessageTx
msgStorage *MsgStorage
}

// Next moves the iterator to the next key/value pair.
// It returns false when there is no next L1Msg
func (it *L1MessageIterator) Next() bool {
it.curMsg = it.msgStorage.ReadL1Message(it.curIndex)
it.curIndex++
if it.curMsg == nil {
return false
} else {
return true
}
}

// L1Message returns the current L1 message.
func (it *L1MessageIterator) L1Message() types.L1MessageTx {
return *it.curMsg
}
17 changes: 4 additions & 13 deletions rollup/l1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,8 @@ func (r *Reader) FetchRollupEventsInRange(from, to uint64) (RollupEvents, error)
return r.processLogsToRollupEvents(logs)
}

func (r *Reader) FetchL1MessagesInRange(fromBlock, toBlock uint64) ([]types.L1MessageTx, error) {
var msgs []types.L1MessageTx

func (r *Reader) FetchL1MessageEventsInRange(fromBlock, toBlock uint64) ([]*L1MessageQueueQueueTransaction, error) {
var events []*L1MessageQueueQueueTransaction
err := r.queryInBatches(fromBlock, toBlock, defaultL1MsgFetchBlockRange, func(from, to uint64) error {
it, err := r.filterer.FilterQueueTransaction(&bind.FilterOpts{
Start: from,
Expand All @@ -163,22 +162,14 @@ func (r *Reader) FetchL1MessagesInRange(fromBlock, toBlock uint64) ([]types.L1Me
if !event.GasLimit.IsUint64() {
return fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit)
}

msgs = append(msgs, types.L1MessageTx{
QueueIndex: event.QueueIndex,
Gas: event.GasLimit.Uint64(),
To: &event.Target,
Value: event.Value,
Data: event.Data,
Sender: event.Sender,
})
events = append(events, event)
}
return it.Error()
})
if err != nil {
return nil, err
}
return msgs, nil
return events, nil
}

func (r *Reader) processLogsToRollupEvents(logs []types.Log) (RollupEvents, error) {
Expand Down
6 changes: 6 additions & 0 deletions rollup/rollup_sync_service/l1client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func (m *mockEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*t
}, nil
}

func (m *mockEthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
return &types.Header{
Number: big.NewInt(100 - 64),
}, nil
}

func (m *mockEthClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
return nil, nil
}
Expand Down
1 change: 1 addition & 0 deletions rollup/sync_service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type EthClient interface {
ChainID(ctx context.Context) (*big.Int, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error)
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, isPending bool, err error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
Expand Down
Loading