4 changes: 2 additions & 2 deletions block/components.go
@@ -161,7 +161,7 @@ func NewSyncComponents(
)

// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
@@ -240,7 +240,7 @@ func NewAggregatorComponents(
}

// Create DA submitter for aggregator nodes (with signer for submission)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
148 changes: 116 additions & 32 deletions block/internal/common/metrics.go
@@ -13,6 +13,34 @@ const (
MetricsSubsystem = "sequencer"
)

// DASubmitterFailureReason represents a typed failure reason for DA submission failures
type DASubmitterFailureReason string

const (
DASubmitterFailureReasonAlreadyRejected DASubmitterFailureReason = "already_rejected"
DASubmitterFailureReasonInsufficientFee DASubmitterFailureReason = "insufficient_fee"
DASubmitterFailureReasonTimeout DASubmitterFailureReason = "timeout"
DASubmitterFailureReasonAlreadyInMempool DASubmitterFailureReason = "already_in_mempool"
DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block"
DASubmitterFailureReasonTooBig DASubmitterFailureReason = "too_big"
DASubmitterFailureReasonContextCanceled DASubmitterFailureReason = "context_canceled"
DASubmitterFailureReasonUnknown DASubmitterFailureReason = "unknown"
)

// AllDASubmitterFailureReasons returns all possible failure reasons
func AllDASubmitterFailureReasons() []DASubmitterFailureReason {
return []DASubmitterFailureReason{
DASubmitterFailureReasonAlreadyRejected,
DASubmitterFailureReasonInsufficientFee,
DASubmitterFailureReasonTimeout,
DASubmitterFailureReasonAlreadyInMempool,
DASubmitterFailureReasonNotIncludedInBlock,
DASubmitterFailureReasonTooBig,
DASubmitterFailureReasonContextCanceled,
DASubmitterFailureReasonUnknown,
}
}

// Metrics contains all metrics exposed by this package.
type Metrics struct {
// Original metrics
@@ -63,6 +91,12 @@ type Metrics struct {
// State transition metrics
StateTransitions map[string]metrics.Counter
InvalidTransitions metrics.Counter

// DA Submitter metrics
DASubmitterFailures map[DASubmitterFailureReason]metrics.Counter // Counter with reason label
DASubmitterLastFailure map[DASubmitterFailureReason]metrics.Gauge // Timestamp gauge with reason label
DASubmitterPendingBlobs metrics.Gauge // Total number of blobs awaiting submission (backlog)
DASubmitterResends metrics.Counter // Number of resend attempts
}

// PrometheusMetrics returns Metrics built using Prometheus client library
@@ -73,10 +107,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
}

m := &Metrics{
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter),
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
}

// Original metrics
@@ -349,6 +385,44 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
}, labels).With(labelsAndValues...)
}

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_pending_blobs",
Help: "Total number of blobs awaiting DA submission (backlog)",
}, labels).With(labelsAndValues...)

m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_resends_total",
Help: "Total number of DA submission retry attempts",
}, labels).With(labelsAndValues...)

// Initialize DA submitter failure counters and timestamps for various reasons
for _, reason := range AllDASubmitterFailureReasons() {
m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_failures_total",
Help: "Total number of DA submission failures by reason",
ConstLabels: map[string]string{
"reason": string(reason),
},
}, labels).With(labelsAndValues...)

m.DASubmitterLastFailure[reason] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_last_failure_timestamp",
Help: "Unix timestamp of the last DA submission failure by reason",
ConstLabels: map[string]string{
"reason": string(reason),
},
}, labels).With(labelsAndValues...)
}

return m
}

@@ -363,34 +437,38 @@ func NopMetrics() *Metrics {
CommittedHeight: discard.NewGauge(),

// Extended metrics
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DroppedSignals: discard.NewCounter(),
RecoverableErrors: discard.NewCounter(),
NonRecoverableErrors: discard.NewCounter(),
GoroutineCount: discard.NewGauge(),
DASubmissionAttempts: discard.NewCounter(),
DASubmissionSuccesses: discard.NewCounter(),
DASubmissionFailures: discard.NewCounter(),
DARetrievalAttempts: discard.NewCounter(),
DARetrievalSuccesses: discard.NewCounter(),
DARetrievalFailures: discard.NewCounter(),
DAInclusionHeight: discard.NewGauge(),
PendingHeadersCount: discard.NewGauge(),
PendingDataCount: discard.NewGauge(),
SyncLag: discard.NewGauge(),
HeadersSynced: discard.NewCounter(),
DataSynced: discard.NewCounter(),
BlocksApplied: discard.NewCounter(),
InvalidHeadersCount: discard.NewCounter(),
BlockProductionTime: discard.NewHistogram(),
EmptyBlocksProduced: discard.NewCounter(),
LazyBlocksProduced: discard.NewCounter(),
NormalBlocksProduced: discard.NewCounter(),
TxsPerBlock: discard.NewHistogram(),
InvalidTransitions: discard.NewCounter(),
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DroppedSignals: discard.NewCounter(),
RecoverableErrors: discard.NewCounter(),
NonRecoverableErrors: discard.NewCounter(),
GoroutineCount: discard.NewGauge(),
DASubmissionAttempts: discard.NewCounter(),
DASubmissionSuccesses: discard.NewCounter(),
DASubmissionFailures: discard.NewCounter(),
DARetrievalAttempts: discard.NewCounter(),
DARetrievalSuccesses: discard.NewCounter(),
DARetrievalFailures: discard.NewCounter(),
DAInclusionHeight: discard.NewGauge(),
PendingHeadersCount: discard.NewGauge(),
PendingDataCount: discard.NewGauge(),
SyncLag: discard.NewGauge(),
HeadersSynced: discard.NewCounter(),
DataSynced: discard.NewCounter(),
BlocksApplied: discard.NewCounter(),
InvalidHeadersCount: discard.NewCounter(),
BlockProductionTime: discard.NewHistogram(),
EmptyBlocksProduced: discard.NewCounter(),
LazyBlocksProduced: discard.NewCounter(),
NormalBlocksProduced: discard.NewCounter(),
TxsPerBlock: discard.NewHistogram(),
InvalidTransitions: discard.NewCounter(),
DASubmitterFailures: make(map[DASubmitterFailureReason]metrics.Counter),
DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
DASubmitterPendingBlobs: discard.NewGauge(),
DASubmitterResends: discard.NewCounter(),
}

// Initialize maps with no-op metrics
@@ -414,5 +492,11 @@ func NopMetrics() *Metrics {
m.StateTransitions[transition] = discard.NewCounter()
}

// Initialize DA submitter failure maps with no-op metrics
for _, reason := range AllDASubmitterFailureReasons() {
m.DASubmitterFailures[reason] = discard.NewCounter()
m.DASubmitterLastFailure[reason] = discard.NewGauge()
}

return m
}
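
A quick orientation on the exporter side: each failure reason gets its own counter/gauge pair registered under a constant reason label, so recording a failure is a map lookup plus an Add/Set. The sketch below is illustrative only; the real method, recordFailure, is added to da_submitter.go further down, and the common and time imports are assumed.

// Sketch: mirrors what DASubmitter.recordFailure does later in this PR.
func recordFailureSketch(m *common.Metrics, reason common.DASubmitterFailureReason) {
	if counter, ok := m.DASubmitterFailures[reason]; ok {
		counter.Add(1) // e.g. <namespace>_sequencer_da_submitter_failures_total{reason="too_big"}
	}
	if gauge, ok := m.DASubmitterLastFailure[reason]; ok {
		gauge.Set(float64(time.Now().Unix())) // last-failure timestamp for the same reason
	}
}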
65 changes: 64 additions & 1 deletion block/internal/submitting/da_submitter.go
Member:
can we remove all those metrics nil checks in the code and just pass a NoOp metrics if metrics is nil in the constructor? the nil checks make the code less readable otherwise

Contributor Author:
@claude can you implement this and other feedback from julien

Contributor (claude bot, Oct 14, 2025):
Claude finished @tac0turtle's task.


Tasks

  • Read current implementation files
  • Create typed failure reason constants
  • Refactor metrics to use NoOp pattern in constructor
  • Remove nil checks for metrics throughout code
  • Update all call sites to use typed failure reasons
  • Update tests
  • Run linting and formatting
  • Commit and push changes

Summary

I've successfully implemented all of @julienrbrt's feedback:

1. Created Typed Failure Reason Constants

  • Added DASubmitterFailureReason type with 8 typed constants
  • Created AllDASubmitterFailureReasons() function to centralize the list
  • This prevents typos and makes failure reasons discoverable at compile time

2. Updated Metrics to Use Typed Reasons

  • Changed map types from map[string]metrics.Counter to map[DASubmitterFailureReason]metrics.Counter
  • Updated both PrometheusMetrics() and NopMetrics() to use the centralized list
  • All metric initialization now uses the typed constants

3. Refactored to Use NoOp Metrics Pattern

  • Updated NewDASubmitter() constructor to automatically use NopMetrics() if metrics is nil
  • Removed all if s.metrics == nil checks throughout the code (8 occurrences removed)
  • This makes the code more readable and eliminates defensive programming clutter

4. Updated recordFailure() Function

  • Changed signature to accept DASubmitterFailureReason instead of string
  • Added warning log for unregistered failure reasons (addressing the code review concern)
  • All call sites now use typed constants like common.DASubmitterFailureReasonTimeout

5. Benefits of Changes

  • ✅ Type safety: Compiler catches invalid failure reasons
  • ✅ Better maintainability: Single source of truth for failure reasons
  • ✅ Cleaner code: No nil checks scattered throughout
  • ✅ Better observability: Warning logs for unexpected failure reasons
  • ✅ No breaking changes: Tests still pass with NopMetrics()

All changes have been committed and pushed to the PR branch.
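
To make item 3 concrete, the call-site change described above looks roughly like this (illustrative before/after, not an exact excerpt from the removed lines):

// Before: every metrics use was guarded against a nil *common.Metrics.
if s.metrics != nil {
	s.metrics.DASubmitterResends.Add(1)
}

// After: NewDASubmitter substitutes common.NopMetrics() when nil is passed,
// so call sites can record unconditionally.
s.metrics.DASubmitterResends.Add(1)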


@@ -120,6 +120,7 @@ type DASubmitter struct {
genesis genesis.Genesis
options common.BlockOptions
logger zerolog.Logger
metrics *common.Metrics

// calculate namespaces bytes once and reuse them
namespaceBz []byte
@@ -132,6 +133,7 @@ func NewDASubmitter(
config config.Config,
genesis genesis.Genesis,
options common.BlockOptions,
metrics *common.Metrics,
logger zerolog.Logger,
) *DASubmitter {
daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger()
@@ -141,17 +143,37 @@
server.SetDAVisualizationServer(server.NewDAVisualizationServer(da, visualizerLogger, config.Node.Aggregator))
}

// Use NoOp metrics if nil to avoid nil checks throughout the code
if metrics == nil {
metrics = common.NopMetrics()
}

return &DASubmitter{
da: da,
config: config,
genesis: genesis,
options: options,
metrics: metrics,
logger: daSubmitterLogger,
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
}
}

// recordFailure records a DA submission failure in metrics
func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) {
counter, ok := s.metrics.DASubmitterFailures[reason]
if !ok {
s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded")
return
}
counter.Add(1)

if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok {
gauge.Set(float64(time.Now().Unix()))
}
}

// getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 {
gasMultiplier, err := s.da.GasMultiplier(ctx)
@@ -215,6 +237,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er
s.namespaceBz,
[]byte(s.config.DA.SubmitOptions),
cache,
func() uint64 { return cache.NumPendingHeaders() },
)
}

@@ -258,6 +281,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe
s.namespaceDataBz,
[]byte(s.config.DA.SubmitOptions),
cache,
func() uint64 { return cache.NumPendingData() },
)
}

@@ -328,6 +352,7 @@ func submitToDA[T any](
namespace []byte,
options []byte,
cache cache.Manager,
getTotalPendingFn func() uint64,
) error {
marshaled, err := marshalItems(ctx, items, marshalFn, itemType)
if err != nil {
@@ -352,8 +377,18 @@
marshaled = batchMarshaled
}

// Update pending blobs metric to track total backlog
if getTotalPendingFn != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
}

// Start the retry loop
for rs.Attempt < pol.MaxAttempts {
// Record resend metric for retry attempts (not the first attempt)
if rs.Attempt > 0 {
s.metrics.DASubmitterResends.Add(1)
}

if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil {
return err
}
@@ -375,14 +410,24 @@
s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer")
if int(res.SubmittedCount) == len(items) {
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
// Update pending blobs metric to reflect total backlog
if getTotalPendingFn != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
}
return nil
}
// partial success: advance window
items = items[res.SubmittedCount:]
marshaled = marshaled[res.SubmittedCount:]
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
// Update pending blobs count to reflect total backlog
if getTotalPendingFn != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
}

case coreda.StatusTooBig:
// Record failure metric
s.recordFailure(common.DASubmitterFailureReasonTooBig)
// Iteratively halve until it fits or single-item too big
if len(items) == 1 {
s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit")
@@ -397,21 +442,39 @@
marshaled = marshaled[:half]
s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying")
rs.Next(reasonTooBig, pol, gm, sentinelNoGas)
// Update pending blobs count to reflect total backlog
if getTotalPendingFn != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
}

case coreda.StatusNotIncludedInBlock:
// Record failure metric
s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock)
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
rs.Next(reasonMempool, pol, gm, sentinelNoGas)

case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
case coreda.StatusAlreadyInMempool:
// Record failure metric
s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool)
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
rs.Next(reasonMempool, pol, gm, sentinelNoGas)

case coreda.StatusContextCanceled:
// Record failure metric
s.recordFailure(common.DASubmitterFailureReasonContextCanceled)
s.logger.Info().Msg("DA layer submission canceled due to context cancellation")
return context.Canceled

default:
// Record failure metric
s.recordFailure(common.DASubmitterFailureReasonUnknown)
s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed")
rs.Next(reasonFailure, pol, gm, sentinelNoGas)
}
}

// Final failure after max attempts
s.recordFailure(common.DASubmitterFailureReasonTimeout)
return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt)
}
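
Because the per-reason counters are plain interface values held in a map, the recordFailure path is easy to cover with a unit test that swaps in a counting double. The sketch below is hypothetical and not part of this PR; it assumes the go-kit metrics interfaces this package already uses and the same imports as the existing submitting tests.

// Hypothetical counting double satisfying the go-kit metrics.Counter interface.
type testCounter struct{ total float64 }

func (c *testCounter) With(labelValues ...string) metrics.Counter { return c }
func (c *testCounter) Add(delta float64)                          { c.total += delta }

func TestRecordFailureIncrementsReasonCounter(t *testing.T) {
	m := common.NopMetrics()
	m.DASubmitterFailures[common.DASubmitterFailureReasonTooBig] = &testCounter{}

	s := &DASubmitter{metrics: m, logger: zerolog.Nop()}
	s.recordFailure(common.DASubmitterFailureReasonTooBig)

	tc := m.DASubmitterFailures[common.DASubmitterFailureReasonTooBig].(*testCounter)
	if tc.total != 1 {
		t.Fatalf("expected 1 recorded failure, got %v", tc.total)
	}
}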

2 changes: 1 addition & 1 deletion block/internal/submitting/da_submitter_integration_test.go
@@ -86,7 +86,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)

// Create DA submitter
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())

// Submit headers and data
require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm))
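
One more note for call sites touched by the signature change: passing nil for the new metrics parameter is also safe, because the constructor falls back to no-op metrics, e.g.:

// Equivalent for callers that do not collect metrics; NewDASubmitter
// substitutes common.NopMetrics() internally when nil is passed.
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), nil, zerolog.Nop())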