Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func NewSyncComponents(
)

// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down Expand Up @@ -240,7 +240,7 @@ func NewAggregatorComponents(
}

// Create DA submitter for aggregator nodes (with signer for submission)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
submitter := submitting.NewSubmitter(
store,
exec,
Expand Down
140 changes: 108 additions & 32 deletions block/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
// State transition metrics
StateTransitions map[string]metrics.Counter
InvalidTransitions metrics.Counter

// DA Submitter metrics
DASubmitterFailures map[string]metrics.Counter // Counter with reason label

Check failure on line 68 in block/internal/common/metrics.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

File is not properly formatted (gofmt)
DASubmitterLastFailure map[string]metrics.Gauge // Timestamp gauge with reason label
DASubmitterPendingBlobs metrics.Gauge // Number of pending blobs
DASubmitterResends metrics.Counter // Number of resend attempts
}

// PrometheusMetrics returns Metrics built using Prometheus client library
Expand All @@ -73,10 +79,12 @@
}

m := &Metrics{
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DASubmitterFailures: make(map[string]metrics.Counter),
DASubmitterLastFailure: make(map[string]metrics.Gauge),
}

// Original metrics
Expand Down Expand Up @@ -349,6 +357,54 @@
}, labels).With(labelsAndValues...)
}

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_pending_blobs",
Help: "Number of blobs pending DA submission",
}, labels).With(labelsAndValues...)

m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_resends_total",
Help: "Total number of DA submission retry attempts",
}, labels).With(labelsAndValues...)

// Initialize DA submitter failure counters and timestamps for various reasons
failureReasons := []string{
"already_rejected",
"insufficient_fee",
"timeout",
"already_in_mempool",
"not_included_in_block",
"too_big",
"context_canceled",
"unknown",
}
for _, reason := range failureReasons {
m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_failures_total",
Help: "Total number of DA submission failures by reason",
ConstLabels: map[string]string{
"reason": reason,
},
}, labels).With(labelsAndValues...)

m.DASubmitterLastFailure[reason] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "da_submitter_last_failure_timestamp",
Help: "Unix timestamp of the last DA submission failure by reason",
ConstLabels: map[string]string{
"reason": reason,
},
}, labels).With(labelsAndValues...)
}

return m
}

Expand All @@ -363,34 +419,38 @@
CommittedHeight: discard.NewGauge(),

// Extended metrics
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DroppedSignals: discard.NewCounter(),
RecoverableErrors: discard.NewCounter(),
NonRecoverableErrors: discard.NewCounter(),
GoroutineCount: discard.NewGauge(),
DASubmissionAttempts: discard.NewCounter(),
DASubmissionSuccesses: discard.NewCounter(),
DASubmissionFailures: discard.NewCounter(),
DARetrievalAttempts: discard.NewCounter(),
DARetrievalSuccesses: discard.NewCounter(),
DARetrievalFailures: discard.NewCounter(),
DAInclusionHeight: discard.NewGauge(),
PendingHeadersCount: discard.NewGauge(),
PendingDataCount: discard.NewGauge(),
SyncLag: discard.NewGauge(),
HeadersSynced: discard.NewCounter(),
DataSynced: discard.NewCounter(),
BlocksApplied: discard.NewCounter(),
InvalidHeadersCount: discard.NewCounter(),
BlockProductionTime: discard.NewHistogram(),
EmptyBlocksProduced: discard.NewCounter(),
LazyBlocksProduced: discard.NewCounter(),
NormalBlocksProduced: discard.NewCounter(),
TxsPerBlock: discard.NewHistogram(),
InvalidTransitions: discard.NewCounter(),
ChannelBufferUsage: make(map[string]metrics.Gauge),
ErrorsByType: make(map[string]metrics.Counter),
OperationDuration: make(map[string]metrics.Histogram),
StateTransitions: make(map[string]metrics.Counter),
DroppedSignals: discard.NewCounter(),
RecoverableErrors: discard.NewCounter(),
NonRecoverableErrors: discard.NewCounter(),
GoroutineCount: discard.NewGauge(),
DASubmissionAttempts: discard.NewCounter(),
DASubmissionSuccesses: discard.NewCounter(),
DASubmissionFailures: discard.NewCounter(),
DARetrievalAttempts: discard.NewCounter(),
DARetrievalSuccesses: discard.NewCounter(),
DARetrievalFailures: discard.NewCounter(),
DAInclusionHeight: discard.NewGauge(),
PendingHeadersCount: discard.NewGauge(),
PendingDataCount: discard.NewGauge(),
SyncLag: discard.NewGauge(),
HeadersSynced: discard.NewCounter(),
DataSynced: discard.NewCounter(),
BlocksApplied: discard.NewCounter(),
InvalidHeadersCount: discard.NewCounter(),
BlockProductionTime: discard.NewHistogram(),
EmptyBlocksProduced: discard.NewCounter(),
LazyBlocksProduced: discard.NewCounter(),
NormalBlocksProduced: discard.NewCounter(),
TxsPerBlock: discard.NewHistogram(),
InvalidTransitions: discard.NewCounter(),
DASubmitterFailures: make(map[string]metrics.Counter),
DASubmitterLastFailure: make(map[string]metrics.Gauge),
DASubmitterPendingBlobs: discard.NewGauge(),
DASubmitterResends: discard.NewCounter(),
}

// Initialize maps with no-op metrics
Expand All @@ -414,5 +474,21 @@
m.StateTransitions[transition] = discard.NewCounter()
}

// Initialize DA submitter failure maps with no-op metrics
failureReasons := []string{
"already_rejected",
"insufficient_fee",
"timeout",
"already_in_mempool",
"not_included_in_block",
"too_big",
"context_canceled",
"unknown",
}
for _, reason := range failureReasons {
m.DASubmitterFailures[reason] = discard.NewCounter()
m.DASubmitterLastFailure[reason] = discard.NewGauge()
}

return m
}
56 changes: 55 additions & 1 deletion block/internal/submitting/da_submitter.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove all those metrics nil check in the code, and just pass a NoOp metrics if metrics is nil in the constructor? makes it less readable otherwise

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@claude can you implement this and other feedback from julien

Copy link
Contributor

@claude claude bot Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude finished @tac0turtle's task —— View job


Tasks

  • Read current implementation files
  • Create typed failure reason constants
  • Refactor metrics to use NoOp pattern in constructor
  • Remove nil checks for metrics throughout code
  • Update all call sites to use typed failure reasons
  • Update tests
  • Run linting and formatting
  • Commit and push changes

Summary

I've successfully implemented all of @julienrbrt's feedback:

1. Created Typed Failure Reason Constants

  • Added DASubmitterFailureReason type with 8 typed constants
  • Created AllDASubmitterFailureReasons() function to centralize the list
  • This prevents typos and makes failure reasons discoverable at compile time

2. Updated Metrics to Use Typed Reasons

  • Changed map types from map[string]metrics.Counter to map[DASubmitterFailureReason]metrics.Counter
  • Updated both PrometheusMetrics() and NopMetrics() to use the centralized list
  • All metric initialization now uses the typed constants

3. Refactored to Use NoOp Metrics Pattern

  • Updated NewDASubmitter() constructor to automatically use NopMetrics() if metrics is nil
  • Removed all if s.metrics == nil checks throughout the code (8 occurrences removed)
  • This makes the code more readable and eliminates defensive programming clutter

4. Updated recordFailure() Function

  • Changed signature to accept DASubmitterFailureReason instead of string
  • Added warning log for unregistered failure reasons (addressing the code review concern)
  • All call sites now use typed constants like common.DASubmitterFailureReasonTimeout

5. Benefits of Changes

  • ✅ Type safety: Compiler catches invalid failure reasons
  • ✅ Better maintainability: Single source of truth for failure reasons
  • ✅ Cleaner code: No nil checks scattered throughout
  • ✅ Better observability: Warning logs for unexpected failure reasons
  • ✅ No breaking changes: Tests still pass with NopMetrics()

All changes have been committed and pushed to the PR branch.


Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type DASubmitter struct {
genesis genesis.Genesis
options common.BlockOptions
logger zerolog.Logger
metrics *common.Metrics

// calculate namespaces bytes once and reuse them
namespaceBz []byte
Expand All @@ -132,6 +133,7 @@ func NewDASubmitter(
config config.Config,
genesis genesis.Genesis,
options common.BlockOptions,
metrics *common.Metrics,
logger zerolog.Logger,
) *DASubmitter {
daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger()
Expand All @@ -146,12 +148,26 @@ func NewDASubmitter(
config: config,
genesis: genesis,
options: options,
metrics: metrics,
logger: daSubmitterLogger,
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
}
}

// recordFailure records a DA submission failure in metrics: it increments the
// per-reason failure counter and sets the per-reason last-failure gauge to the
// current Unix timestamp. The reason string must match one of the keys the
// metrics maps were initialized with (see the failureReasons lists in
// common.PrometheusMetrics / common.NopMetrics); a reason registered in
// neither map is logged as a warning so that typos or newly added call-site
// reasons are surfaced instead of being silently dropped.
func (s *DASubmitter) recordFailure(reason string) {
	// Metrics are optional; nothing to record without them.
	if s.metrics == nil {
		return
	}
	counter, hasCounter := s.metrics.DASubmitterFailures[reason]
	gauge, hasGauge := s.metrics.DASubmitterLastFailure[reason]
	if !hasCounter && !hasGauge {
		// An unregistered reason would otherwise vanish without a trace;
		// surface it so the metrics initialization lists can be updated.
		s.logger.Warn().Str("reason", reason).Msg("DA submission failure reason not registered in metrics")
		return
	}
	if hasCounter {
		counter.Add(1)
	}
	if hasGauge {
		gauge.Set(float64(time.Now().Unix()))
	}
}

// getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 {
gasMultiplier, err := s.da.GasMultiplier(ctx)
Expand Down Expand Up @@ -352,8 +368,18 @@ func submitToDA[T any](
marshaled = batchMarshaled
}

// Update pending blobs metric
if s.metrics != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(len(items)))
}

// Start the retry loop
for rs.Attempt < pol.MaxAttempts {
// Record resend metric for retry attempts (not the first attempt)
if rs.Attempt > 0 && s.metrics != nil {
s.metrics.DASubmitterResends.Add(1)
}

if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil {
return err
}
Expand All @@ -375,14 +401,24 @@ func submitToDA[T any](
s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer")
if int(res.SubmittedCount) == len(items) {
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
// Clear pending blobs on success
if s.metrics != nil {
s.metrics.DASubmitterPendingBlobs.Set(0)
}
return nil
}
// partial success: advance window
items = items[res.SubmittedCount:]
marshaled = marshaled[res.SubmittedCount:]
rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
// Update pending blobs count
if s.metrics != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(len(items)))
}

case coreda.StatusTooBig:
// Record failure metric
s.recordFailure("too_big")
// Iteratively halve until it fits or single-item too big
if len(items) == 1 {
s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit")
Expand All @@ -397,21 +433,39 @@ func submitToDA[T any](
marshaled = marshaled[:half]
s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying")
rs.Next(reasonTooBig, pol, gm, sentinelNoGas)
// Update pending blobs count
if s.metrics != nil {
s.metrics.DASubmitterPendingBlobs.Set(float64(len(items)))
}

case coreda.StatusNotIncludedInBlock:
// Record failure metric
s.recordFailure("not_included_in_block")
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
rs.Next(reasonMempool, pol, gm, sentinelNoGas)

case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
case coreda.StatusAlreadyInMempool:
// Record failure metric
s.recordFailure("already_in_mempool")
s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
rs.Next(reasonMempool, pol, gm, sentinelNoGas)

case coreda.StatusContextCanceled:
// Record failure metric
s.recordFailure("context_canceled")
s.logger.Info().Msg("DA layer submission canceled due to context cancellation")
return context.Canceled

default:
// Record failure metric
s.recordFailure("unknown")
s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed")
rs.Next(reasonFailure, pol, gm, sentinelNoGas)
}
}

// Final failure after max attempts
s.recordFailure("timeout")
return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt)
}

Expand Down
2 changes: 1 addition & 1 deletion block/internal/submitting/da_submitter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)

// Create DA submitter
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())

// Submit headers and data
require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm))
Expand Down
2 changes: 1 addition & 1 deletion block/internal/submitting/da_submitter_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newTestSubmitter(mockDA *mocks.MockDA, override func(*config.Config)) *DASu
if override != nil {
override(&cfg)
}
return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, zerolog.Nop())
return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop())
}

// marshal helper for simple items
Expand Down
2 changes: 2 additions & 0 deletions block/internal/submitting/da_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage
cfg,
gen,
common.DefaultBlockOptions(),
common.NopMetrics(),
zerolog.Nop(),
)

Expand Down Expand Up @@ -99,6 +100,7 @@ func TestNewDASubmitterSetsVisualizerWhenEnabled(t *testing.T) {
cfg,
genesis.Genesis{},
common.DefaultBlockOptions(),
common.NopMetrics(),
zerolog.Nop(),
)

Expand Down
4 changes: 2 additions & 2 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) {

cfg := config.DefaultConfig()
metrics := common.NopMetrics()
daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop())
s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil)
s.ctx = ctx

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {
exec.On("SetFinal", mock.Anything, uint64(1)).Return(nil).Once()
exec.On("SetFinal", mock.Anything, uint64(2)).Return(nil).Once()

daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop())
s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil)

// prepare two consecutive blocks in store with DA included in cache
Expand Down
Loading