Skip to content
73 changes: 32 additions & 41 deletions rollup/internal/controller/relayer/l2_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/exp/maps"
"math"
"math/big"
"sort"
Expand Down Expand Up @@ -46,6 +45,8 @@ const (
Sigmoid
)

const secondsPerBlock = 12

// BaselineType enumerates the baseline types we support when
// turning a baseline fee into a “target” fee.
type BaselineType int
Expand Down Expand Up @@ -189,23 +190,7 @@ func NewLayer2Relayer(ctx context.Context, l2Client *ethclient.Client, db *gorm.
return nil, fmt.Errorf("invalid service type for l2_relayer: %v", serviceType)
}

// pick and validate submission strategy
windowSec := uint64(cfg.BatchSubmission.TimeoutSec)
strategy, ok := bestParams[windowSec]
if !ok {
return nil, fmt.Errorf(
"unsupported BatchSubmission.TimeoutSec: %d (must be one of %v)",
windowSec, maps.Keys(bestParams),
)
}
layer2Relayer.batchStrategy = strategy

latest, err := layer2Relayer.l1BlockOrm.GetLatestL1BlockHeight(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latest L1 block height: %v", err)
}
layer2Relayer.lastFetchedBlock = latest - uint64(layer2Relayer.cfg.BatchSubmission.TimeoutSec)/12 // start ~window seconds ago
if _, err = layer2Relayer.fetchBlobFeeHistory(uint64(layer2Relayer.cfg.BatchSubmission.TimeoutSec)); err != nil {
if _, err := layer2Relayer.fetchBlobFeeHistory(uint64(layer2Relayer.cfg.BatchSubmission.TimeoutSec)); err != nil {
return nil, fmt.Errorf("initial blob‐fee load failed: %w", err)
}
return layer2Relayer, nil
Expand Down Expand Up @@ -348,18 +333,26 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
return
}

var forceSubmit bool

// return if not hitting target price
if backlogCount <= r.cfg.BatchSubmission.BacklogMax {
// if the batch with the oldest index is too old, we force submit all batches that we have so far in the next step
oldest := dbBatches[0].CreatedAt
if skip, msg := r.skipSubmitByFee(oldest); skip {
log.Debug(msg)
return

if r.cfg.BatchSubmission.TimeoutSec > 0 && !forceSubmit && time.Since(oldest) > time.Duration(r.cfg.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}
// if !skip, we fall through and submit immediately
if !forceSubmit {
if skip, err := r.skipSubmitByFee(oldest); skip {
log.Debug("Skipping batch submission", "reason", err)
return
}
}
// if !skip, fall through and submit
}

var batchesToSubmit []*dbBatchWithChunksAndParent
var forceSubmit bool
for i, dbBatch := range dbBatches {
if i == 0 && encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV7 {
// if the first batch is not >= V7 then we need to submit batches one by one
Expand Down Expand Up @@ -420,11 +413,6 @@ func (r *Layer2Relayer) ProcessPendingBatches() {
break
}

// if one of the batches is too old, we force submit all batches that we have so far in the next step
if r.cfg.BatchSubmission.TimeoutSec > 0 && !forceSubmit && time.Since(dbBatch.CreatedAt) > time.Duration(r.cfg.BatchSubmission.TimeoutSec)*time.Second {
forceSubmit = true
}

if batchesToSubmitLen < r.cfg.BatchSubmission.MaxBatches {
batchesToSubmit = append(batchesToSubmit, &dbBatchWithChunksAndParent{
Batch: dbBatch,
Expand Down Expand Up @@ -1211,6 +1199,11 @@ func (r *Layer2Relayer) fetchBlobFeeHistory(windowSec uint64) ([]*big.Int, error
if err != nil {
return nil, fmt.Errorf("GetLatestL1BlockHeight: %w", err)
}
// bootstrap on first call
if r.lastFetchedBlock == 0 {
// start window
r.lastFetchedBlock = latest - windowSec/secondsPerBlock
}
from := r.lastFetchedBlock + 1
//if new blocks
if from <= latest {
Expand All @@ -1225,14 +1218,12 @@ func (r *Layer2Relayer) fetchBlobFeeHistory(windowSec uint64) ([]*big.Int, error
}
}

maxLen := int(windowSec / 12)
maxLen := int(windowSec / secondsPerBlock)
if len(r.feeHistory) > maxLen {
r.feeHistory = r.feeHistory[len(r.feeHistory)-maxLen:]
}
// return a copy
out := make([]*big.Int, len(r.feeHistory))
copy(out, r.feeHistory)
return out, nil

return r.feeHistory, nil
}

// calculateTargetPrice applies pct_min/ewma + relaxation to get a BigInt target
Expand Down Expand Up @@ -1284,16 +1275,16 @@ func calculateTargetPrice(windowSec uint64, strategy StrategyParams, firstTime t
return out
}

// skipSubmitByFee returns (true,msg) when submission should be skipped right now
// skipSubmitByFee returns (true, nil) when submission should be skipped right now
// because the blob‐fee is above target and the timeout window hasn’t yet elapsed.
// Otherwise returns (false,msg) where msg is a warning about falling back.
func (r *Layer2Relayer) skipSubmitByFee(oldest time.Time) (bool, string) {
// Otherwise returns (false, err)
func (r *Layer2Relayer) skipSubmitByFee(oldest time.Time) (bool, error) {
windowSec := uint64(r.cfg.BatchSubmission.TimeoutSec)

hist, err := r.fetchBlobFeeHistory(windowSec)
if err != nil || len(hist) == 0 {
return false, fmt.Sprintf(
"blobfee history unavailable or empty; fallback to immediate batch submission – err=%v, history_length=%d",
return false, fmt.Errorf(
"blob-fee history unavailable or empty; fallback to immediate submission: %w (history_length=%d)",
err, len(hist),
)
}
Expand All @@ -1304,14 +1295,14 @@ func (r *Layer2Relayer) skipSubmitByFee(oldest time.Time) (bool, string) {

// if current fee > target and still inside the timeout window, skip
if current.Cmp(target) > 0 && time.Since(oldest) < time.Duration(windowSec)*time.Second {
return true, fmt.Sprintf(
"blobfee above target & window not yet passed; current=%s target=%s age=%s",
return true, fmt.Errorf(
"blob-fee above target & window not yet passed; current=%s target=%s age=%s",
current.String(), target.String(), time.Since(oldest),
)
}

// otherwise proceed with submission
return false, ""
return false, nil
}

func addrFromSignerConfig(config *config.SignerConfig) (common.Address, error) {
Expand Down
8 changes: 3 additions & 5 deletions rollup/internal/orm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,15 @@ func (o *Batch) GetFailedAndPendingBatchesCount(ctx context.Context) (int64, err
// GetFailedAndPendingBatches retrieves batches with failed or pending status up to the specified limit.
// The returned batches are sorted in ascending order by their index.
func (o *Batch) GetFailedAndPendingBatches(ctx context.Context, limit int) ([]*Batch, error) {
if limit < 0 {
return nil, errors.New("limit must be greater than or equal to zero")
if limit <= 0 {
return nil, errors.New("limit must be greater than zero")
}

db := o.db.WithContext(ctx)
db = db.Model(&Batch{})
db = db.Where("rollup_status = ? OR rollup_status = ?", types.RollupCommitFailed, types.RollupPending)
db = db.Order("index ASC")
if limit > 0 {
db = db.Limit(limit)
}
db = db.Limit(limit)

var batches []*Batch
if err := db.Find(&batches).Error; err != nil {
Expand Down
Loading