Merged
23 changes: 23 additions & 0 deletions block/internal/common/metrics.go
@@ -65,6 +65,10 @@ type Metrics struct {
DAInclusionHeight metrics.Gauge
PendingHeadersCount metrics.Gauge
PendingDataCount metrics.Gauge

// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious
}

// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -182,6 +186,21 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Help: "Number of data blocks pending DA submission",
}, labels).With(labelsAndValues...)

// Forced inclusion metrics
m.ForcedInclusionTxsInGracePeriod = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "forced_inclusion_txs_in_grace_period",
Help: "Number of forced inclusion transactions currently in grace period (past epoch end but within grace boundary)",
}, labels).With(labelsAndValues...)

m.ForcedInclusionTxsMalicious = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "forced_inclusion_txs_malicious_total",
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
}, labels).With(labelsAndValues...)

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
@@ -246,6 +265,10 @@ func NopMetrics() *Metrics {
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
DASubmitterPendingBlobs: discard.NewGauge(),
DASubmitterResends: discard.NewCounter(),

// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
ForcedInclusionTxsMalicious: discard.NewCounter(),
}

// Initialize maps with no-op metrics
203 changes: 182 additions & 21 deletions block/internal/syncing/syncer.go
@@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
@@ -18,6 +19,7 @@

coreda "github.com/evstack/ev-node/core/da"
coreexecutor "github.com/evstack/ev-node/core/execution"
seqcommon "github.com/evstack/ev-node/sequencers/common"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
@@ -28,6 +30,47 @@
"github.com/evstack/ev-node/types"
)

// forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods.
type forcedInclusionGracePeriodConfig struct {
// basePeriod is the base number of additional epochs allowed for including forced inclusion transactions
// before marking the sequencer as malicious. This provides tolerance for temporary chain congestion.
// A value of 0 means strict enforcement (no grace period).
// A value of 1 means transactions from epoch N can be included in epoch N+1 without being marked malicious.
// Recommended: 1 epoch.
basePeriod uint64

// dynamicMinMultiplier is the minimum multiplier for the base grace period.
// The actual grace period will be at least: basePeriod * dynamicMinMultiplier.
// Example: base=2, min=0.5 → minimum grace period is 1 epoch.
dynamicMinMultiplier float64

// dynamicMaxMultiplier is the maximum multiplier for the base grace period.
// The actual grace period will be at most: basePeriod * dynamicMaxMultiplier.
// Example: base=2, max=3.0 → maximum grace period is 6 epochs.
dynamicMaxMultiplier float64

// dynamicFullnessThreshold defines what percentage of block capacity is considered "full".
// When EMA of block fullness is above this threshold, grace period increases.
// When below, grace period decreases. Value should be between 0.0 and 1.0.
dynamicFullnessThreshold float64

// dynamicAdjustmentRate controls how quickly the grace period multiplier adapts.
// Higher values make it adapt faster to congestion changes. Value should be between 0.0 and 1.0.
// Recommended: 0.05 for gradual adjustment, 0.1 for faster response.
dynamicAdjustmentRate float64
}

// newForcedInclusionGracePeriodConfig returns the internal grace period configuration.
func newForcedInclusionGracePeriodConfig() forcedInclusionGracePeriodConfig {
return forcedInclusionGracePeriodConfig{
basePeriod: 1, // 1 epoch grace period
dynamicMinMultiplier: 0.5, // Minimum 0.5x base grace period
dynamicMaxMultiplier: 3.0, // Maximum 3x base grace period
dynamicFullnessThreshold: 0.8, // 80% capacity considered full
dynamicAdjustmentRate: 0.05, // 5% adjustment per block
}
}

// Syncer handles block synchronization from DA and P2P sources.
type Syncer struct {
// Core components
@@ -66,6 +109,9 @@ type Syncer struct {

// Forced inclusion tracking
pendingForcedInclusionTxs sync.Map // map[string]pendingForcedInclusionTx
gracePeriodMultiplier *atomic.Pointer[float64]
blockFullnessEMA *atomic.Pointer[float64]
gracePeriodConfig forcedInclusionGracePeriodConfig

// Lifecycle
ctx context.Context
@@ -102,22 +148,34 @@ func NewSyncer(
daRetrieverHeight := &atomic.Uint64{}
daRetrieverHeight.Store(genesis.DAStartHeight)

// Initialize dynamic grace period state
initialMultiplier := 1.0
gracePeriodMultiplier := &atomic.Pointer[float64]{}
gracePeriodMultiplier.Store(&initialMultiplier)

initialFullness := 0.0
blockFullnessEMA := &atomic.Pointer[float64]{}
blockFullnessEMA.Store(&initialFullness)

return &Syncer{
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
lastState: &atomic.Pointer[types.State]{},
daClient: daClient,
daRetrieverHeight: daRetrieverHeight,
headerStore: headerStore,
dataStore: dataStore,
heightInCh: make(chan common.DAHeightEvent, 100),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
options: options,
lastState: &atomic.Pointer[types.State]{},
daClient: daClient,
daRetrieverHeight: daRetrieverHeight,
headerStore: headerStore,
dataStore: dataStore,
heightInCh: make(chan common.DAHeightEvent, 100),
errorCh: errorCh,
logger: logger.With().Str("component", "syncer").Logger(),
gracePeriodMultiplier: gracePeriodMultiplier,
blockFullnessEMA: blockFullnessEMA,
gracePeriodConfig: newForcedInclusionGracePeriodConfig(),
}
}

@@ -677,15 +735,92 @@ func hashTx(tx []byte) string {
return hex.EncodeToString(hash[:])
}

// calculateBlockFullness returns a value between 0.0 and 1.0 indicating how full the block is.
// It estimates fullness based on total data size.
// This is a heuristic; actual limits may vary by execution layer.
func (s *Syncer) calculateBlockFullness(data *types.Data) float64 {
const maxDataSize = seqcommon.AbsoluteMaxBlobSize

var fullness float64
count := 0

// Check data size fullness
dataSize := uint64(0)
for _, tx := range data.Txs {
dataSize += uint64(len(tx))
}
sizeFullness := float64(dataSize) / float64(maxDataSize)
fullness += min(sizeFullness, 1.0)
count++

// Return average fullness
return fullness / float64(count)
}
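
For intuition, a minimal standalone sketch of the same size heuristic, assuming a hypothetical 2 MiB cap in place of seqcommon.AbsoluteMaxBlobSize; the helper name exampleFullness and the cap value are illustrative only:

// Sketch of the fullness heuristic with a hypothetical cap.
func exampleFullness(txSizes []int) float64 {
	const maxDataSize = 2 << 20 // hypothetical 2 MiB cap, for illustration only
	total := 0
	for _, n := range txSizes {
		total += n
	}
	// Same shape as calculateBlockFullness: total size over cap, clamped at 1.0.
	return min(float64(total)/float64(maxDataSize), 1.0)
}

// exampleFullness([]int{512 << 10, 1 << 20}) == 0.75
// exampleFullness([]int{3 << 20}) == 1.0 (over capacity, clamped)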

// updateDynamicGracePeriod updates the grace period multiplier based on block fullness.
// When blocks are consistently full, the multiplier increases (more lenient).
// When blocks have capacity, the multiplier decreases (stricter).
func (s *Syncer) updateDynamicGracePeriod(blockFullness float64) {
// Update exponential moving average of block fullness
currentEMA := *s.blockFullnessEMA.Load()
alpha := s.gracePeriodConfig.dynamicAdjustmentRate
newEMA := alpha*blockFullness + (1-alpha)*currentEMA
s.blockFullnessEMA.Store(&newEMA)

// Adjust grace period multiplier based on EMA
currentMultiplier := *s.gracePeriodMultiplier.Load()
threshold := s.gracePeriodConfig.dynamicFullnessThreshold

var newMultiplier float64
if newEMA > threshold {
// Blocks are full - increase grace period (more lenient)
adjustment := alpha * (newEMA - threshold) / (1.0 - threshold)
newMultiplier = currentMultiplier + adjustment
} else {
// Blocks have capacity - decrease grace period (stricter)
adjustment := alpha * (threshold - newEMA) / threshold
newMultiplier = currentMultiplier - adjustment
}

// Clamp to min/max bounds
newMultiplier = max(newMultiplier, s.gracePeriodConfig.dynamicMinMultiplier)
newMultiplier = min(newMultiplier, s.gracePeriodConfig.dynamicMaxMultiplier)

s.gracePeriodMultiplier.Store(&newMultiplier)

// Log significant changes (more than 10% change)
if math.Abs(newMultiplier-currentMultiplier) > 0.1 {
s.logger.Debug().
Float64("block_fullness", blockFullness).
Float64("fullness_ema", newEMA).
Float64("old_multiplier", currentMultiplier).
Float64("new_multiplier", newMultiplier).
Msg("dynamic grace period multiplier adjusted")
}
}
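
A standalone sketch of one adjustment step under the defaults above (alpha = 0.05, threshold = 0.8, clamp [0.5, 3.0]); the input numbers are illustrative, not taken from a real run:

func exampleAdjust(ema, fullness, multiplier float64) (float64, float64) {
	const alpha, threshold = 0.05, 0.8
	// EMA update, as in updateDynamicGracePeriod.
	ema = alpha*fullness + (1-alpha)*ema
	if ema > threshold {
		multiplier += alpha * (ema - threshold) / (1.0 - threshold) // congested: loosen
	} else {
		multiplier -= alpha * (threshold - ema) / threshold // spare capacity: tighten
	}
	return ema, max(0.5, min(3.0, multiplier))
}

// Congested chain: exampleAdjust(0.90, 1.0, 1.0) → EMA 0.905, multiplier ≈ 1.026
// Idle chain:      exampleAdjust(0.00, 0.0, 1.0) → EMA 0.0,   multiplier 0.95
// (repeated empty blocks walk the multiplier down by ≈0.05 per block to the 0.5 floor)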

// getEffectiveGracePeriod returns the current effective grace period considering dynamic adjustment.
func (s *Syncer) getEffectiveGracePeriod() uint64 {
multiplier := *s.gracePeriodMultiplier.Load()
effectivePeriod := math.Round(float64(s.gracePeriodConfig.basePeriod) * multiplier)
minPeriod := float64(s.gracePeriodConfig.basePeriod) * s.gracePeriodConfig.dynamicMinMultiplier

return uint64(max(effectivePeriod, minPeriod))
}
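
Worked through with the defaults (basePeriod = 1, multiplier clamped to [0.5, 3.0]):

// effective = max(Round(base * mult), base * minMult), with base = 1:
//	mult = 0.5 → Round(0.5) = 1 → 1 epoch (math.Round rounds half away from zero)
//	mult = 1.0 → 1 epoch
//	mult = 1.4 → 1 epoch;  mult = 1.5 → 2 epochs
//	mult = 3.0 → 3 epochs
// So under the defaults the effective grace period always spans 1 to 3 whole epochs.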

// verifyForcedInclusionTxs verifies that forced inclusion transactions from DA are properly handled.
// Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
// to future blocks (smoothing). This is legitimate behavior within an epoch.
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins.
// However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or before the grace boundary passes, whichever comes later.
func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.Data) error {
if s.fiRetriever == nil {
return nil
}

// Update dynamic grace period based on block fullness
blockFullness := s.calculateBlockFullness(data)
s.updateDynamicGracePeriod(blockFullness)

// Retrieve forced inclusion transactions from DA for current epoch
forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(s.ctx, currentState.DAHeight)
if err != nil {
@@ -741,16 +876,36 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
}

// Check if we've moved past any epoch boundaries with pending txs
// Grace period: Allow forced inclusion txs from epoch N to be included in epoch N+1, N+2, etc.
// Only flag txs as malicious once they are past the grace boundary, to prevent false positives during chain congestion.
var maliciousTxs, remainingPending []pendingForcedInclusionTx
var txsInGracePeriod int
for _, pending := range stillPending {
// If current DA height is past this epoch's end, these txs should have been included
if currentState.DAHeight > pending.EpochEnd {
// Calculate grace boundary: epoch end + (effective grace periods × epoch size)
effectiveGracePeriod := s.getEffectiveGracePeriod()
graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion)

if currentState.DAHeight > graceBoundary {
maliciousTxs = append(maliciousTxs, pending)
s.logger.Warn().
Uint64("current_da_height", currentState.DAHeight).
Uint64("epoch_end", pending.EpochEnd).
Uint64("grace_boundary", graceBoundary).
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
Uint64("effective_grace_periods", effectiveGracePeriod).
Float64("grace_multiplier", *s.gracePeriodMultiplier.Load()).
Str("tx_hash", pending.TxHash[:16]).
Msg("forced inclusion transaction past grace boundary - marking as malicious")
} else {
remainingPending = append(remainingPending, pending)
if currentState.DAHeight > pending.EpochEnd {
txsInGracePeriod++
}
}
}

s.metrics.ForcedInclusionTxsInGracePeriod.Set(float64(txsInGracePeriod))

// Update pending map - clear old entries and store only remaining pending
s.pendingForcedInclusionTxs.Range(func(key, value any) bool {
s.pendingForcedInclusionTxs.Delete(key)
@@ -760,14 +915,20 @@ func (s *Syncer) verifyForcedInclusionTxs(currentState types.State, data *types.
s.pendingForcedInclusionTxs.Store(pending.TxHash, pending)
}

// If there are transactions from past epochs that weren't included, sequencer is malicious
// If there are transactions past the grace boundary that weren't included, the sequencer is malicious
if len(maliciousTxs) > 0 {
s.metrics.ForcedInclusionTxsMalicious.Add(float64(len(maliciousTxs)))

effectiveGracePeriod := s.getEffectiveGracePeriod()
s.logger.Error().
Uint64("height", data.Height()).
Uint64("current_da_height", currentState.DAHeight).
Int("malicious_count", len(maliciousTxs)).
Msg("SEQUENCER IS MALICIOUS: forced inclusion transactions from past epoch(s) not included")
return errors.Join(errMaliciousProposer, fmt.Errorf("sequencer is malicious: %d forced inclusion transactions from past epoch(s) not included", len(maliciousTxs)))
Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
Uint64("effective_grace_periods", effectiveGracePeriod).
Float64("grace_multiplier", *s.gracePeriodMultiplier.Load()).
Msg("SEQUENCER IS MALICIOUS: forced inclusion transactions past grace boundary not included")
return errors.Join(errMaliciousProposer, fmt.Errorf("sequencer is malicious: %d forced inclusion transactions past grace boundary (base_grace_periods=%d, effective_grace_periods=%d) not included", len(maliciousTxs), s.gracePeriodConfig.basePeriod, effectiveGracePeriod))
}

// Log current state
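To make the boundary arithmetic in verifyForcedInclusionTxs concrete, a minimal sketch with hypothetical numbers (epoch size genesis.DAEpochForcedInclusion = 10 DA blocks, effective grace period of 2 epochs); the helper exampleBoundary is illustrative, not part of the change:

func exampleBoundary(daHeight, epochEnd, graceEpochs, epochSize uint64) string {
	// Same formula as the loop above: epoch end + (effective grace periods × epoch size).
	graceBoundary := epochEnd + graceEpochs*epochSize
	switch {
	case daHeight <= epochEnd:
		return "pending within its own epoch"
	case daHeight <= graceBoundary:
		return "in grace period" // counted by ForcedInclusionTxsInGracePeriod
	default:
		return "malicious" // counted by ForcedInclusionTxsMalicious
	}
}

// exampleBoundary(115, 100, 2, 10) == "in grace period" (boundary = 100 + 2*10 = 120)
// exampleBoundary(121, 100, 2, 10) == "malicious"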