Skip to content

Commit 13ac02f

Browse files
renaynay authored and Wondertan committed
fix(metrics): Unregister callback on stop for several metrics implementations (#3281)
Self explanatory
1 parent ba50a25 commit 13ac02f

File tree

15 files changed

+131 -23 lines changed

das/daser.go

+5
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error {
132132
}
133133

134134
d.cancel()
135+
136+
if err := d.sampler.metrics.close(); err != nil {
137+
log.Warnw("closing metrics", "err", err)
138+
}
139+
135140
if err = d.sampler.wait(ctx); err != nil {
136141
return fmt.Errorf("DASer force quit: %w", err)
137142
}

das/metrics.go

+10 -1
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@ type metrics struct {
2929
newHead metric.Int64Counter
3030

3131
lastSampledTS uint64
32+
33+
clientReg metric.Registration
3234
}
3335

3436
func (d *DASer) InitMetrics() error {
@@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error {
119121
return nil
120122
}
121123

122-
_, err = meter.RegisterCallback(callback,
124+
d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback,
123125
lastSampledTS,
124126
busyWorkers,
125127
networkHead,
@@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error {
133135
return nil
134136
}
135137

138+
func (m *metrics) close() error {
139+
if m == nil {
140+
return nil
141+
}
142+
return m.clientReg.Unregister()
143+
}
144+
136145
// observeSample records the time it took to sample a header +
137146
// the amount of sampled contiguous headers
138147
func (m *metrics) observeSample(

nodebuilder/node/metrics.go

+18 -3
Original file line number | Diff line number | Diff line change
@@ -4,11 +4,15 @@ import (
44
"context"
55
"time"
66

7+
logging "github.com/ipfs/go-log/v2"
78
"go.opentelemetry.io/otel"
89
"go.opentelemetry.io/otel/attribute"
910
"go.opentelemetry.io/otel/metric"
11+
"go.uber.org/fx"
1012
)
1113

14+
var log = logging.Logger("module/node")
15+
1216
var meter = otel.Meter("node")
1317

1418
var (
@@ -17,7 +21,7 @@ var (
1721
)
1822

1923
// WithMetrics registers node metrics.
20-
func WithMetrics() error {
24+
func WithMetrics(lc fx.Lifecycle) error {
2125
nodeStartTS, err := meter.Int64ObservableGauge(
2226
"node_start_ts",
2327
metric.WithDescription("timestamp when the node was started"),
@@ -66,7 +70,18 @@ func WithMetrics() error {
6670
return nil
6771
}
6872

69-
_, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
73+
clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge)
74+
if err != nil {
75+
return nil
76+
}
7077

71-
return err
78+
lc.Append(
79+
fx.Hook{OnStop: func(context.Context) error {
80+
if err := clientReg.Unregister(); err != nil {
81+
log.Warn("failed to close metrics", "err", err)
82+
}
83+
return nil
84+
}},
85+
)
86+
return nil
7287
}

nodebuilder/settings.go

+2 -2
Original file line number | Diff line number | Diff line change
@@ -84,11 +84,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
8484
baseComponents := fx.Options(
8585
fx.Supply(metricOpts),
8686
fx.Invoke(initializeMetrics),
87-
fx.Invoke(func(ca *state.CoreAccessor) {
87+
fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) {
8888
if ca == nil {
8989
return
9090
}
91-
state.WithMetrics(ca)
91+
state.WithMetrics(lc, ca)
9292
}),
9393
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
9494
fx.Invoke(node.WithMetrics),

share/eds/cache/accessor_cache.go

+5 -2
Original file line number | Diff line number | Diff line change
@@ -222,10 +222,13 @@ func (bc *AccessorCache) Remove(key shard.Key) error {
222222
}
223223

224224
// EnableMetrics enables metrics for the cache.
225-
func (bc *AccessorCache) EnableMetrics() error {
225+
func (bc *AccessorCache) EnableMetrics() (CloseMetricsFn, error) {
226226
var err error
227227
bc.metrics, err = newMetrics(bc)
228-
return err
228+
if err != nil {
229+
return nil, err
230+
}
231+
return bc.metrics.close, err
229232
}
230233

231234
// refCloser manages references to accessor from provided reader and removes the ref, when the

share/eds/cache/cache.go

+3 -1
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@ var (
2020
errCacheMiss = errors.New("accessor not found in blockstore cache")
2121
)
2222

23+
type CloseMetricsFn func() error
24+
2325
// Cache is an interface that defines the basic Cache operations.
2426
type Cache interface {
2527
// Get retrieves an item from the Cache.
@@ -37,7 +39,7 @@ type Cache interface {
3739
Remove(shard.Key) error
3840

3941
// EnableMetrics enables metrics in Cache
40-
EnableMetrics() error
42+
EnableMetrics() (CloseMetricsFn, error)
4143
}
4244

4345
// Accessor is a interface type returned by cache, that allows to read raw data by reader or create

share/eds/cache/doublecache.go

+15 -4
Original file line number | Diff line number | Diff line change
@@ -43,9 +43,20 @@ func (mc *DoubleCache) Second() Cache {
4343
return mc.second
4444
}
4545

46-
func (mc *DoubleCache) EnableMetrics() error {
47-
if err := mc.first.EnableMetrics(); err != nil {
48-
return err
46+
func (mc *DoubleCache) EnableMetrics() (CloseMetricsFn, error) {
47+
firstCloser, err := mc.first.EnableMetrics()
48+
if err != nil {
49+
return nil, err
4950
}
50-
return mc.second.EnableMetrics()
51+
secondCloser, err := mc.second.EnableMetrics()
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
return func() error {
57+
if err := errors.Join(firstCloser(), secondCloser()); err != nil {
58+
log.Warnw("failed to close metrics", "err", err)
59+
}
60+
return nil
61+
}, nil
5162
}

share/eds/cache/metrics.go

+15 -2
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,8 @@ const (
1515
type metrics struct {
1616
getCounter metric.Int64Counter
1717
evictedCounter metric.Int64Counter
18+
19+
clientReg metric.Registration
1820
}
1921

2022
func newMetrics(bc *AccessorCache) (*metrics, error) {
@@ -43,12 +45,23 @@ func newMetrics(bc *AccessorCache) (*metrics, error) {
4345
observer.ObserveInt64(cacheSize, int64(bc.cache.Len()))
4446
return nil
4547
}
46-
_, err = meter.RegisterCallback(callback, cacheSize)
48+
clientReg, err := meter.RegisterCallback(callback, cacheSize)
49+
if err != nil {
50+
return nil, err
51+
}
4752

4853
return &metrics{
4954
getCounter: getCounter,
5055
evictedCounter: evictedCounter,
51-
}, err
56+
clientReg: clientReg,
57+
}, nil
58+
}
59+
60+
func (m *metrics) close() error {
61+
if m == nil {
62+
return nil
63+
}
64+
return m.clientReg.Unregister()
5265
}
5366

5467
func (m *metrics) observeEvicted(failed bool) {

share/eds/cache/noop.go

+2 -2
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,8 @@ func (n NoopCache) Remove(shard.Key) error {
2828
return nil
2929
}
3030

31-
func (n NoopCache) EnableMetrics() error {
32-
return nil
31+
func (n NoopCache) EnableMetrics() (CloseMetricsFn, error) {
32+
return func() error { return nil }, nil
3333
}
3434

3535
var _ Accessor = (*NoopAccessor)(nil)

share/eds/metrics.go

+18 -2
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package eds
22

33
import (
44
"context"
5+
"errors"
56
"time"
67

78
"go.opentelemetry.io/otel"
@@ -49,6 +50,9 @@ type metrics struct {
4950

5051
longOpTime metric.Float64Histogram
5152
gcTime metric.Float64Histogram
53+
54+
clientReg metric.Registration
55+
closerFn func() error
5256
}
5357

5458
func (s *Store) WithMetrics() error {
@@ -124,7 +128,8 @@ func (s *Store) WithMetrics() error {
124128
return err
125129
}
126130

127-
if err = s.cache.Load().EnableMetrics(); err != nil {
131+
closerFn, err := s.cache.Load().EnableMetrics()
132+
if err != nil {
128133
return err
129134
}
130135

@@ -139,7 +144,8 @@ func (s *Store) WithMetrics() error {
139144
return nil
140145
}
141146

142-
if _, err := meter.RegisterCallback(callback, dagStoreShards); err != nil {
147+
clientReg, err := meter.RegisterCallback(callback, dagStoreShards)
148+
if err != nil {
143149
return err
144150
}
145151

@@ -155,10 +161,20 @@ func (s *Store) WithMetrics() error {
155161
shardFailureCount: shardFailureCount,
156162
longOpTime: longOpTime,
157163
gcTime: gcTime,
164+
clientReg: clientReg,
165+
closerFn: closerFn,
158166
}
159167
return nil
160168
}
161169

170+
func (m *metrics) close() error {
171+
if m == nil {
172+
return nil
173+
}
174+
175+
return errors.Join(m.closerFn(), m.clientReg.Unregister())
176+
}
177+
162178
func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) {
163179
if m == nil {
164180
return

share/eds/store.go

+5
Original file line number | Diff line number | Diff line change
@@ -159,6 +159,11 @@ func (s *Store) Start(ctx context.Context) error {
159159
// Stop stops the underlying DAGStore.
160160
func (s *Store) Stop(context.Context) error {
161161
defer s.cancel()
162+
163+
if err := s.metrics.close(); err != nil {
164+
log.Warnw("failed to close metrics", "err", err)
165+
}
166+
162167
if err := s.invertedIdx.close(); err != nil {
163168
return err
164169
}

share/p2p/discovery/discovery.go

+6 -1
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,12 @@ func (d *Discovery) Start(context.Context) error {
114114

115115
func (d *Discovery) Stop(context.Context) error {
116116
d.cancel()
117-
return d.metrics.close()
117+
118+
if err := d.metrics.close(); err != nil {
119+
log.Warnw("failed to close metrics", "err", err)
120+
}
121+
122+
return nil
118123
}
119124

120125
// Peers provides a list of discovered peers in the given topic.

share/p2p/peers/manager.go

+4
Original file line number | Diff line number | Diff line change
@@ -166,6 +166,10 @@ func (m *Manager) Start(startCtx context.Context) error {
166166
func (m *Manager) Stop(ctx context.Context) error {
167167
m.cancel()
168168

169+
if err := m.metrics.close(); err != nil {
170+
log.Warnw("closing metrics", "err", err)
171+
}
172+
169173
// we do not need to wait for headersub and disconnected peers to finish
170174
// here, since they were never started
171175
if m.headerSub == nil && m.shrexSub == nil {

share/p2p/peers/metrics.go

+10 -1
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,8 @@ type metrics struct {
6868
fullNodesPool metric.Int64ObservableGauge // attributes: pool_status
6969
blacklistedPeersByReason sync.Map
7070
blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason
71+
72+
clientReg metric.Registration
7173
}
7274

7375
func initMetrics(manager *Manager) (*metrics, error) {
@@ -154,13 +156,20 @@ func initMetrics(manager *Manager) (*metrics, error) {
154156
})
155157
return nil
156158
}
157-
_, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
159+
metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted)
158160
if err != nil {
159161
return nil, fmt.Errorf("registering metrics callback: %w", err)
160162
}
161163
return metrics, nil
162164
}
163165

166+
func (m *metrics) close() error {
167+
if m == nil {
168+
return nil
169+
}
170+
return m.clientReg.Unregister()
171+
}
172+
164173
func (m *metrics) observeGetPeer(
165174
ctx context.Context,
166175
source peerSource, poolSize int, waitTime time.Duration,

state/metrics.go

+13 -2
Original file line number | Diff line number | Diff line change
@@ -5,11 +5,12 @@ import (
55

66
"go.opentelemetry.io/otel"
77
"go.opentelemetry.io/otel/metric"
8+
"go.uber.org/fx"
89
)
910

1011
var meter = otel.Meter("state")
1112

12-
func WithMetrics(ca *CoreAccessor) {
13+
func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) {
1314
pfbCounter, _ := meter.Int64ObservableCounter(
1415
"pfb_count",
1516
metric.WithDescription("Total count of submitted PayForBlob transactions"),
@@ -24,8 +25,18 @@ func WithMetrics(ca *CoreAccessor) {
2425
observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob())
2526
return nil
2627
}
27-
_, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)
28+
29+
clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp)
2830
if err != nil {
2931
panic(err)
3032
}
33+
34+
lc.Append(fx.Hook{
35+
OnStop: func(context.Context) error {
36+
if err := clientReg.Unregister(); err != nil {
37+
log.Warnw("failed to close metrics", "err", err)
38+
}
39+
return nil
40+
},
41+
})
3142
}

0 commit comments

Comments
 (0)