Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (s *channelManager) pruneSafeBlocks(newSafeHead eth.L2BlockRef) {

if newSafeHead.Number+1 < oldestBlock.NumberU64() {
// This could happen if there was an L1 reorg.
// Or if the sequencer restarted.
s.log.Warn("safe head reversed, clearing channel manager state",
"oldestBlock", eth.ToBlockID(oldestBlock),
"newSafeBlock", newSafeHead)
Expand Down Expand Up @@ -565,3 +566,10 @@ func (m *channelManager) CheckExpectedProgress(syncStatus eth.SyncStatus) error
}
return nil
}

// LastStoredBlock returns the ID of the most recent block loaded into the
// channel manager, or the zero eth.BlockID when no blocks are currently stored
// (callers treat the zero value as "resume from the L2 safe head").
func (m *channelManager) LastStoredBlock() eth.BlockID {
	if m.blocks.Len() == 0 {
		return eth.BlockID{}
	}
	return eth.ToBlockID(m.blocks[m.blocks.Len()-1])
}
49 changes: 18 additions & 31 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ type BatchSubmitter struct {
txpoolState TxPoolState
txpoolBlockedBlob bool

// lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head.
lastStoredBlock eth.BlockID
lastL1Tip eth.L1BlockRef

state *channelManager
}

Expand Down Expand Up @@ -147,7 +143,6 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.clearState(l.shutdownCtx)
l.lastStoredBlock = eth.BlockID{}

if err := l.waitForL2Genesis(); err != nil {
return fmt.Errorf("error waiting for L2 genesis: %w", err)
Expand Down Expand Up @@ -271,13 +266,11 @@ func (l *BatchSubmitter) loadBlocksIntoState(syncStatus eth.SyncStatus, ctx cont
block, err := l.loadBlockIntoState(ctx, i)
if errors.Is(err, ErrReorg) {
l.Log.Warn("Found L2 reorg", "block_number", i)
l.lastStoredBlock = eth.BlockID{}
return err
} else if err != nil {
l.Log.Warn("Failed to load block into state", "err", err)
return err
}
l.lastStoredBlock = eth.ToBlockID(block)
latestBlock = block
}

Expand Down Expand Up @@ -366,29 +359,31 @@ func (l *BatchSubmitter) getSyncStatus(ctx context.Context) (*eth.SyncStatus, er
}

// calculateL2BlockRangeToStore determines the range (start,end] that should be
// loaded into the local state. The start is the last block already stored in
// the channel manager; the end is the current L2 unsafe head.
//
// When no block is stored yet, or the stored block lags behind the L2 safe
// head (implying the op-node processed batches submitted by a previous batcher
// instance), the range instead starts at the safe head.
//
// It returns an error when the sync status is empty or when the safe head is
// not strictly behind the unsafe head (nothing to load).
func (l *BatchSubmitter) calculateL2BlockRangeToStore(syncStatus eth.SyncStatus) (eth.BlockID, eth.BlockID, error) {
	if syncStatus.HeadL1 == (eth.L1BlockRef{}) {
		return eth.BlockID{}, eth.BlockID{}, errors.New("empty sync status")
	}

	// Check if we should even attempt to load any blocks. TODO: May not need this check
	if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
		return eth.BlockID{}, eth.BlockID{}, fmt.Errorf("L2 safe head(%d) >= L2 unsafe head(%d)", syncStatus.SafeL2.Number, syncStatus.UnsafeL2.Number)
	}

	lastStoredBlock := l.state.LastStoredBlock()
	start := lastStoredBlock
	end := syncStatus.UnsafeL2.ID()

	// Check last stored block to see if it is empty or has lagged behind.
	// It lagging implies that the op-node processed some batches that
	// were submitted prior to the current instance of the batcher being alive.
	if lastStoredBlock == (eth.BlockID{}) {
		l.Log.Info("Resuming batch-submitter work at safe-head", "safe", syncStatus.SafeL2)
		start = syncStatus.SafeL2.ID()
	} else if lastStoredBlock.Number < syncStatus.SafeL2.Number {
		l.Log.Warn("Last stored block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", lastStoredBlock, "safe", syncStatus.SafeL2)
		start = syncStatus.SafeL2.ID()
	}

	return start, end, nil
}

// The following things occur:
Expand Down Expand Up @@ -704,7 +699,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
l.Log.Error("Failed to query L1 tip", "err", err)
return err
}
l.recordL1Tip(l1tip)
l.Metr.RecordLatestL1Block(l1tip)

// Collect next transaction data. This pulls data out of the channel, so we need to make sure
// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
Expand Down Expand Up @@ -886,14 +881,6 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
}
}

// recordL1Tip publishes the given L1 block to the latest-L1-block metric,
// skipping the metrics call when the tip is unchanged since the previous
// invocation (l.lastL1Tip caches the last value recorded).
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
	if l.lastL1Tip == l1tip {
		return
	}
	l.lastL1Tip = l1tip
	l.Metr.RecordLatestL1Block(l1tip)
}

func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
Expand Down