File tree 2 files changed +17
-3
lines changed
2 files changed +17
-3
lines changed Original file line number Diff line number Diff line change @@ -32,6 +32,7 @@ type manualReader struct {
32
32
shutdownOnce sync.Once
33
33
34
34
mu sync.Mutex
35
+ isShutdown bool
35
36
externalProducers atomic.Value
36
37
37
38
temporalitySelector TemporalitySelector
@@ -67,6 +68,9 @@ func (mr *manualReader) register(p sdkProducer) {
67
68
func (mr * manualReader ) RegisterProducer (p Producer ) {
68
69
mr .mu .Lock ()
69
70
defer mr .mu .Unlock ()
71
+ if mr .isShutdown {
72
+ return
73
+ }
70
74
currentProducers := mr .externalProducers .Load ().([]Producer )
71
75
newProducers := []Producer {}
72
76
newProducers = append (newProducers , currentProducers ... )
@@ -97,6 +101,9 @@ func (mr *manualReader) Shutdown(context.Context) error {
97
101
mr .sdkProducer .Store (produceHolder {
98
102
produce : shutdownProducer {}.produce ,
99
103
})
104
+ mr .mu .Lock ()
105
+ defer mr .mu .Unlock ()
106
+ mr .isShutdown = true
100
107
// release references to Producer(s)
101
108
mr .externalProducers .Store ([]Producer {})
102
109
err = nil
Original file line number Diff line number Diff line change @@ -130,6 +130,7 @@ type periodicReader struct {
130
130
sdkProducer atomic.Value
131
131
132
132
mu sync.Mutex
133
+ isShutdown bool
133
134
externalProducers atomic.Value
134
135
135
136
timeout time.Duration
@@ -182,6 +183,9 @@ func (r *periodicReader) register(p sdkProducer) {
182
183
func (r * periodicReader ) RegisterProducer (p Producer ) {
183
184
r .mu .Lock ()
184
185
defer r .mu .Unlock ()
186
+ if r .isShutdown {
187
+ return
188
+ }
185
189
currentProducers := r .externalProducers .Load ().([]Producer )
186
190
newProducers := []Producer {}
187
191
newProducers = append (newProducers , currentProducers ... )
@@ -301,13 +305,16 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
301
305
}
302
306
}
303
307
304
- // release references to Producer(s)
305
- r .externalProducers .Store ([]Producer {})
306
-
307
308
sErr := r .exporter .Shutdown (ctx )
308
309
if err == nil || err == ErrReaderShutdown {
309
310
err = sErr
310
311
}
312
+
313
+ r .mu .Lock ()
314
+ defer r .mu .Unlock ()
315
+ r .isShutdown = true
316
+ // release references to Producer(s)
317
+ r .externalProducers .Store ([]Producer {})
311
318
})
312
319
return err
313
320
}
You can’t perform that action at this time.
0 commit comments