diff --git a/block/components.go b/block/components.go
index d1c0084da2..3ee2062acf 100644
--- a/block/components.go
+++ b/block/components.go
@@ -161,7 +161,7 @@ func NewSyncComponents(
 	)
 
 	// Create DA submitter for sync nodes (no signer, only DA inclusion processing)
-	daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
+	daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
 	submitter := submitting.NewSubmitter(
 		store,
 		exec,
@@ -240,7 +240,7 @@ func NewAggregatorComponents(
 	}
 
 	// Create DA submitter for aggregator nodes (with signer for submission)
-	daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
+	daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger)
 	submitter := submitting.NewSubmitter(
 		store,
 		exec,
diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go
index b079eaef09..c543537cf5 100644
--- a/block/internal/common/metrics.go
+++ b/block/internal/common/metrics.go
@@ -13,6 +13,34 @@ const (
 	MetricsSubsystem = "sequencer"
 )
 
+// DASubmitterFailureReason represents a typed failure reason for DA submission failures
+type DASubmitterFailureReason string
+
+const (
+	DASubmitterFailureReasonAlreadyRejected    DASubmitterFailureReason = "already_rejected"
+	DASubmitterFailureReasonInsufficientFee    DASubmitterFailureReason = "insufficient_fee"
+	DASubmitterFailureReasonTimeout            DASubmitterFailureReason = "timeout"
+	DASubmitterFailureReasonAlreadyInMempool   DASubmitterFailureReason = "already_in_mempool"
+	DASubmitterFailureReasonNotIncludedInBlock DASubmitterFailureReason = "not_included_in_block"
+	DASubmitterFailureReasonTooBig             DASubmitterFailureReason = "too_big"
+	DASubmitterFailureReasonContextCanceled    DASubmitterFailureReason = "context_canceled"
+	DASubmitterFailureReasonUnknown            DASubmitterFailureReason = "unknown"
+)
+
+// AllDASubmitterFailureReasons returns all possible failure reasons
+func AllDASubmitterFailureReasons() []DASubmitterFailureReason {
+	return []DASubmitterFailureReason{
+		DASubmitterFailureReasonAlreadyRejected,
+		DASubmitterFailureReasonInsufficientFee,
+		DASubmitterFailureReasonTimeout,
+		DASubmitterFailureReasonAlreadyInMempool,
+		DASubmitterFailureReasonNotIncludedInBlock,
+		DASubmitterFailureReasonTooBig,
+		DASubmitterFailureReasonContextCanceled,
+		DASubmitterFailureReasonUnknown,
+	}
+}
+
 // Metrics contains all metrics exposed by this package.
 type Metrics struct {
 	// Original metrics
@@ -63,6 +91,12 @@ type Metrics struct {
 	// State transition metrics
 	StateTransitions   map[string]metrics.Counter
 	InvalidTransitions metrics.Counter
+
+	// DA Submitter metrics
+	DASubmitterFailures     map[DASubmitterFailureReason]metrics.Counter // Counter with reason label
+	DASubmitterLastFailure  map[DASubmitterFailureReason]metrics.Gauge   // Timestamp gauge with reason label
+	DASubmitterPendingBlobs metrics.Gauge                                // Total number of blobs awaiting submission (backlog)
+	DASubmitterResends      metrics.Counter                              // Number of resend attempts
 }
 
 // PrometheusMetrics returns Metrics built using Prometheus client library
@@ -73,10 +107,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 	}
 
 	m := &Metrics{
-		ChannelBufferUsage: make(map[string]metrics.Gauge),
-		ErrorsByType:       make(map[string]metrics.Counter),
-		OperationDuration:  make(map[string]metrics.Histogram),
-		StateTransitions:   make(map[string]metrics.Counter),
+		ChannelBufferUsage:     make(map[string]metrics.Gauge),
+		ErrorsByType:           make(map[string]metrics.Counter),
+		OperationDuration:      make(map[string]metrics.Histogram),
+		StateTransitions:       make(map[string]metrics.Counter),
+		DASubmitterFailures:    make(map[DASubmitterFailureReason]metrics.Counter),
+		DASubmitterLastFailure: make(map[DASubmitterFailureReason]metrics.Gauge),
 	}
 
 	// Original metrics
@@ -349,6 +385,44 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
 		}, labels).With(labelsAndValues...)
 	}
 
+	// DA Submitter metrics
+	m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+		Namespace: namespace,
+		Subsystem: MetricsSubsystem,
+		Name:      "da_submitter_pending_blobs",
+		Help:      "Total number of blobs awaiting DA submission (backlog)",
+	}, labels).With(labelsAndValues...)
+
+	m.DASubmitterResends = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+		Namespace: namespace,
+		Subsystem: MetricsSubsystem,
+		Name:      "da_submitter_resends_total",
+		Help:      "Total number of DA submission retry attempts",
+	}, labels).With(labelsAndValues...)
+
+	// Initialize DA submitter failure counters and timestamps for various reasons
+	for _, reason := range AllDASubmitterFailureReasons() {
+		m.DASubmitterFailures[reason] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "da_submitter_failures_total",
+			Help:      "Total number of DA submission failures by reason",
+			ConstLabels: map[string]string{
+				"reason": string(reason),
+			},
+		}, labels).With(labelsAndValues...)
+
+		m.DASubmitterLastFailure[reason] = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: MetricsSubsystem,
+			Name:      "da_submitter_last_failure_timestamp",
+			Help:      "Unix timestamp of the last DA submission failure by reason",
+			ConstLabels: map[string]string{
+				"reason": string(reason),
+			},
+		}, labels).With(labelsAndValues...)
+	}
+
 	return m
 }
 
@@ -363,34 +437,38 @@ func NopMetrics() *Metrics {
 		CommittedHeight: discard.NewGauge(),
 
 		// Extended metrics
-		ChannelBufferUsage:    make(map[string]metrics.Gauge),
-		ErrorsByType:          make(map[string]metrics.Counter),
-		OperationDuration:     make(map[string]metrics.Histogram),
-		StateTransitions:      make(map[string]metrics.Counter),
-		DroppedSignals:        discard.NewCounter(),
-		RecoverableErrors:     discard.NewCounter(),
-		NonRecoverableErrors:  discard.NewCounter(),
-		GoroutineCount:        discard.NewGauge(),
-		DASubmissionAttempts:  discard.NewCounter(),
-		DASubmissionSuccesses: discard.NewCounter(),
-		DASubmissionFailures:  discard.NewCounter(),
-		DARetrievalAttempts:   discard.NewCounter(),
-		DARetrievalSuccesses:  discard.NewCounter(),
-		DARetrievalFailures:   discard.NewCounter(),
-		DAInclusionHeight:     discard.NewGauge(),
-		PendingHeadersCount:   discard.NewGauge(),
-		PendingDataCount:      discard.NewGauge(),
-		SyncLag:               discard.NewGauge(),
-		HeadersSynced:         discard.NewCounter(),
-		DataSynced:            discard.NewCounter(),
-		BlocksApplied:         discard.NewCounter(),
-		InvalidHeadersCount:   discard.NewCounter(),
-		BlockProductionTime:   discard.NewHistogram(),
-		EmptyBlocksProduced:   discard.NewCounter(),
-		LazyBlocksProduced:    discard.NewCounter(),
-		NormalBlocksProduced:  discard.NewCounter(),
-		TxsPerBlock:           discard.NewHistogram(),
-		InvalidTransitions:    discard.NewCounter(),
+		ChannelBufferUsage:      make(map[string]metrics.Gauge),
+		ErrorsByType:            make(map[string]metrics.Counter),
+		OperationDuration:       make(map[string]metrics.Histogram),
+		StateTransitions:        make(map[string]metrics.Counter),
+		DroppedSignals:          discard.NewCounter(),
+		RecoverableErrors:       discard.NewCounter(),
+		NonRecoverableErrors:    discard.NewCounter(),
+		GoroutineCount:          discard.NewGauge(),
+		DASubmissionAttempts:    discard.NewCounter(),
+		DASubmissionSuccesses:   discard.NewCounter(),
+		DASubmissionFailures:    discard.NewCounter(),
+		DARetrievalAttempts:     discard.NewCounter(),
+		DARetrievalSuccesses:    discard.NewCounter(),
+		DARetrievalFailures:     discard.NewCounter(),
+		DAInclusionHeight:       discard.NewGauge(),
+		PendingHeadersCount:     discard.NewGauge(),
+		PendingDataCount:        discard.NewGauge(),
+		SyncLag:                 discard.NewGauge(),
+		HeadersSynced:           discard.NewCounter(),
+		DataSynced:              discard.NewCounter(),
+		BlocksApplied:           discard.NewCounter(),
+		InvalidHeadersCount:     discard.NewCounter(),
+		BlockProductionTime:     discard.NewHistogram(),
+		EmptyBlocksProduced:     discard.NewCounter(),
+		LazyBlocksProduced:      discard.NewCounter(),
+		NormalBlocksProduced:    discard.NewCounter(),
+		TxsPerBlock:             discard.NewHistogram(),
+		InvalidTransitions:      discard.NewCounter(),
+		DASubmitterFailures:     make(map[DASubmitterFailureReason]metrics.Counter),
+		DASubmitterLastFailure:  make(map[DASubmitterFailureReason]metrics.Gauge),
+		DASubmitterPendingBlobs: discard.NewGauge(),
+		DASubmitterResends:      discard.NewCounter(),
 	}
 
 	// Initialize maps with no-op metrics
@@ -414,5 +492,11 @@ func NopMetrics() *Metrics {
 		m.StateTransitions[transition] = discard.NewCounter()
 	}
 
+	// Initialize DA submitter failure maps with no-op metrics
+	for _, reason := range AllDASubmitterFailureReasons() {
+		m.DASubmitterFailures[reason] = discard.NewCounter()
+		m.DASubmitterLastFailure[reason] = discard.NewGauge()
+	}
+
 	return m
 }
diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go
index d47c4a2296..8b3d780b47 100644
--- a/block/internal/submitting/da_submitter.go
+++ b/block/internal/submitting/da_submitter.go
@@ -120,6 +120,7 @@ type DASubmitter struct {
 	genesis genesis.Genesis
 	options common.BlockOptions
 	logger  zerolog.Logger
+	metrics *common.Metrics
 
 	// calculate namespaces bytes once and reuse them
 	namespaceBz     []byte
@@ -132,6 +133,7 @@ func NewDASubmitter(
 	config config.Config,
 	genesis genesis.Genesis,
 	options common.BlockOptions,
+	metrics *common.Metrics,
 	logger zerolog.Logger,
 ) *DASubmitter {
 	daSubmitterLogger := logger.With().Str("component", "da_submitter").Logger()
@@ -141,17 +143,37 @@ func NewDASubmitter(
 		server.SetDAVisualizationServer(server.NewDAVisualizationServer(da, visualizerLogger, config.Node.Aggregator))
 	}
 
+	// Use NoOp metrics if nil to avoid nil checks throughout the code
+	if metrics == nil {
+		metrics = common.NopMetrics()
+	}
+
 	return &DASubmitter{
 		da:              da,
 		config:          config,
 		genesis:         genesis,
 		options:         options,
+		metrics:         metrics,
 		logger:          daSubmitterLogger,
 		namespaceBz:     coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
 		namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
 	}
 }
 
+// recordFailure records a DA submission failure in metrics
+func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) {
+	counter, ok := s.metrics.DASubmitterFailures[reason]
+	if !ok {
+		s.logger.Warn().Str("reason", string(reason)).Msg("unregistered failure reason, metric not recorded")
+		return
+	}
+	counter.Add(1)
+
+	if gauge, ok := s.metrics.DASubmitterLastFailure[reason]; ok {
+		gauge.Set(float64(time.Now().Unix()))
+	}
+}
+
 // getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
 func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 {
 	gasMultiplier, err := s.da.GasMultiplier(ctx)
@@ -215,6 +237,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er
 		s.namespaceBz,
 		[]byte(s.config.DA.SubmitOptions),
 		cache,
+		func() uint64 { return cache.NumPendingHeaders() },
 	)
 }
 
@@ -258,6 +281,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe
 		s.namespaceDataBz,
 		[]byte(s.config.DA.SubmitOptions),
 		cache,
+		func() uint64 { return cache.NumPendingData() },
 	)
 }
 
@@ -328,6 +352,7 @@ func submitToDA[T any](
 	namespace []byte,
 	options []byte,
 	cache cache.Manager,
+	getTotalPendingFn func() uint64,
 ) error {
 	marshaled, err := marshalItems(ctx, items, marshalFn, itemType)
 	if err != nil {
@@ -352,8 +377,18 @@ func submitToDA[T any](
 		marshaled = batchMarshaled
 	}
 
+	// Update pending blobs metric to track total backlog
+	if getTotalPendingFn != nil {
+		s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
+	}
+
 	// Start the retry loop
 	for rs.Attempt < pol.MaxAttempts {
+		// Record resend metric for retry attempts (not the first attempt)
+		if rs.Attempt > 0 {
+			s.metrics.DASubmitterResends.Add(1)
+		}
+
 		if err := waitForBackoffOrContext(ctx, rs.Backoff); err != nil {
 			return err
 		}
@@ -375,14 +410,24 @@ func submitToDA[T any](
 			s.logger.Info().Str("itemType", itemType).Float64("gasPrice", rs.GasPrice).Uint64("count", res.SubmittedCount).Msg("successfully submitted items to DA layer")
 			if int(res.SubmittedCount) == len(items) {
 				rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
+				// Update pending blobs metric to reflect total backlog
+				if getTotalPendingFn != nil {
+					s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
+				}
 				return nil
 			}
 			// partial success: advance window
 			items = items[res.SubmittedCount:]
 			marshaled = marshaled[res.SubmittedCount:]
 			rs.Next(reasonSuccess, pol, gm, sentinelNoGas)
+			// Update pending blobs count to reflect total backlog
+			if getTotalPendingFn != nil {
+				s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
+			}
 
 		case coreda.StatusTooBig:
+			// Record failure metric
+			s.recordFailure(common.DASubmitterFailureReasonTooBig)
 			// Iteratively halve until it fits or single-item too big
 			if len(items) == 1 {
 				s.logger.Error().Str("itemType", itemType).Msg("single item exceeds DA blob size limit")
@@ -397,21 +442,39 @@ func submitToDA[T any](
 			marshaled = marshaled[:half]
 			s.logger.Debug().Int("newBatchSize", half).Msg("batch too big; halving and retrying")
 			rs.Next(reasonTooBig, pol, gm, sentinelNoGas)
+			// Update pending blobs count to reflect total backlog
+			if getTotalPendingFn != nil {
+				s.metrics.DASubmitterPendingBlobs.Set(float64(getTotalPendingFn()))
+			}
+
+		case coreda.StatusNotIncludedInBlock:
+			// Record failure metric
+			s.recordFailure(common.DASubmitterFailureReasonNotIncludedInBlock)
+			s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
+			rs.Next(reasonMempool, pol, gm, sentinelNoGas)
 
-		case coreda.StatusNotIncludedInBlock, coreda.StatusAlreadyInMempool:
+		case coreda.StatusAlreadyInMempool:
+			// Record failure metric
+			s.recordFailure(common.DASubmitterFailureReasonAlreadyInMempool)
 			s.logger.Info().Dur("backoff", pol.MaxBackoff).Float64("gasPrice", rs.GasPrice).Msg("retrying due to mempool state")
 			rs.Next(reasonMempool, pol, gm, sentinelNoGas)
 
 		case coreda.StatusContextCanceled:
+			// Record failure metric
+			s.recordFailure(common.DASubmitterFailureReasonContextCanceled)
 			s.logger.Info().Msg("DA layer submission canceled due to context cancellation")
 			return context.Canceled
 		default:
+			// Record failure metric
+			s.recordFailure(common.DASubmitterFailureReasonUnknown)
 			s.logger.Error().Str("error", res.Message).Int("attempt", rs.Attempt+1).Msg("DA layer submission failed")
 			rs.Next(reasonFailure, pol, gm, sentinelNoGas)
 		}
 	}
 
+	// Final failure after max attempts
+	s.recordFailure(common.DASubmitterFailureReasonTimeout)
 	return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt)
 }
diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go
index 223682f545..11713d707b 100644
--- a/block/internal/submitting/da_submitter_integration_test.go
+++ b/block/internal/submitting/da_submitter_integration_test.go
@@ -86,7 +86,7 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
 	dummyDA := coreda.NewDummyDA(10_000_000, 0, 0, 10*time.Millisecond)
 
 	// Create DA submitter
-	daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), zerolog.Nop())
+	daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())
 
 	// Submit headers and data
 	require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm))
diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go
index 2775ce7d62..fc309e0638 100644
--- a/block/internal/submitting/da_submitter_mocks_test.go
+++ b/block/internal/submitting/da_submitter_mocks_test.go
@@ -28,7 +28,7 @@ func newTestSubmitter(mockDA *mocks.MockDA, override func(*config.Config)) *DASu
 	if override != nil {
 		override(&cfg)
 	}
-	return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, zerolog.Nop())
+	return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop())
 }
 
 // marshal helper for simple items
@@ -86,6 +86,7 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) {
 		nsBz,
 		opts,
 		nil,
+		nil,
 	)
 	assert.NoError(t, err)
@@ -137,6 +138,7 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) {
 		nsBz,
 		opts,
 		nil,
+		nil,
 	)
 	assert.NoError(t, err)
 	assert.Equal(t, []float64{5.5, 5.5}, usedGas)
@@ -193,6 +195,7 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) {
 		nsBz,
 		opts,
 		nil,
+		nil,
 	)
 	assert.NoError(t, err)
 	assert.Equal(t, []int{4, 2}, batchSizes)
@@ -242,6 +245,7 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) {
 		nsBz,
 		opts,
 		nil,
+		nil,
 	)
 	assert.NoError(t, err)
 	assert.Equal(t, []float64{-1, -1}, usedGas)
@@ -282,6 +286,7 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) {
 		nsBz,
 		opts,
 		nil,
+		nil,
 	)
 	assert.NoError(t, err)
 	assert.Equal(t, 3, totalSubmitted)
diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go
index c682c055e1..69af9ab1fa 100644
--- a/block/internal/submitting/da_submitter_test.go
+++ b/block/internal/submitting/da_submitter_test.go
@@ -56,6 +56,7 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage
 		cfg,
 		gen,
 		common.DefaultBlockOptions(),
+		common.NopMetrics(),
 		zerolog.Nop(),
 	)
 
@@ -99,6 +100,7 @@ func TestNewDASubmitterSetsVisualizerWhenEnabled(t *testing.T) {
 		cfg,
 		genesis.Genesis{},
 		common.DefaultBlockOptions(),
+		common.NopMetrics(),
 		zerolog.Nop(),
 	)
 
diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go
index f86d7deb45..c05ebe3297 100644
--- a/block/internal/submitting/submitter_test.go
+++ b/block/internal/submitting/submitter_test.go
@@ -159,7 +159,7 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) {
 	cfg := config.DefaultConfig()
 	metrics := common.NopMetrics()
 
-	daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
+	daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop())
 	s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil)
 	s.ctx = ctx
@@ -238,7 +238,7 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {
 	exec.On("SetFinal", mock.Anything, uint64(1)).Return(nil).Once()
 	exec.On("SetFinal", mock.Anything, uint64(2)).Return(nil).Once()
 
-	daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), zerolog.Nop())
+	daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop())
 	s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil)
 
 	// prepare two consecutive blocks in store with DA included in cache