@@ -32,11 +32,12 @@ type PubSubChainExchange struct {
32
32
* options
33
33
34
34
// mu guards access to chains and API calls.
35
- mu sync.Mutex
36
- chainsWanted map [uint64 ]* lru.Cache [string , * chainPortion ]
37
- chainsDiscovered map [uint64 ]* lru.Cache [string , * chainPortion ]
38
- topic * pubsub.Topic
39
- stop func () error
35
+ mu sync.Mutex
36
+ chainsWanted map [uint64 ]* lru.Cache [string , * chainPortion ]
37
+ chainsDiscovered map [uint64 ]* lru.Cache [string , * chainPortion ]
38
+ pendingCacheAsWanted chan Message
39
+ topic * pubsub.Topic
40
+ stop func () error
40
41
}
41
42
42
43
func NewPubSubChainExchange (o ... Option ) (* PubSubChainExchange , error ) {
@@ -45,9 +46,10 @@ func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
45
46
return nil , err
46
47
}
47
48
return & PubSubChainExchange {
48
- options : opts ,
49
- chainsWanted : map [uint64 ]* lru.Cache [string , * chainPortion ]{},
50
- chainsDiscovered : map [uint64 ]* lru.Cache [string , * chainPortion ]{},
49
+ options : opts ,
50
+ chainsWanted : map [uint64 ]* lru.Cache [string , * chainPortion ]{},
51
+ chainsDiscovered : map [uint64 ]* lru.Cache [string , * chainPortion ]{},
52
+ pendingCacheAsWanted : make (chan Message , 100 ), // TODO: parameterise.
51
53
}, nil
52
54
}
53
55
@@ -64,7 +66,9 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
64
66
}
65
67
if p .topicScoreParams != nil {
66
68
if err := p .topic .SetScoreParams (p .topicScoreParams ); err != nil {
67
- return fmt .Errorf ("failed to set score params: %w" , err )
69
+ // This can happen most likely due to router not supporting peer scoring. It's
70
+ // non-critical. Hence, the warning log.
71
+ log .Warnw ("failed to set topic score params" , "err" , err )
68
72
}
69
73
}
70
74
subscription , err := p .topic .Subscribe (pubsub .WithBufferSize (p .subscriptionBufferSize ))
@@ -79,17 +83,31 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
79
83
for ctx .Err () == nil {
80
84
msg , err := subscription .Next (ctx )
81
85
if err != nil {
82
- log .Debugw ("failed to read nex message from subscription" , "err" , err )
86
+ log .Debugw ("failed to read next message from subscription" , "err" , err )
83
87
continue
84
88
}
85
89
cmsg := msg .ValidatorData .(Message )
86
90
p .cacheAsDiscoveredChain (ctx , cmsg )
87
91
}
92
+ log .Debug ("Stopped reading messages from chainexchange subscription." )
93
+ }()
94
+ go func () {
95
+ for ctx .Err () == nil {
96
+ select {
97
+ case <- ctx .Done ():
98
+ return
99
+ case cmsg := <- p .pendingCacheAsWanted :
100
+ p .cacheAsWantedChain (ctx , cmsg )
101
+ }
102
+ }
103
+ log .Debug ("Stopped caching chains as wanted." )
88
104
}()
89
105
p .stop = func () error {
90
106
cancel ()
91
107
subscription .Cancel ()
92
- return p .topic .Close ()
108
+ _ = p .pubsub .UnregisterTopicValidator (p .topicName )
109
+ _ = p .topic .Close ()
110
+ return nil
93
111
}
94
112
return nil
95
113
}
@@ -124,21 +142,18 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
124
142
cacheKey := string (key )
125
143
126
144
// Check wanted keys first.
127
- p . mu . Lock ()
145
+
128
146
wanted := p .getChainsWantedAt (instance )
129
- p .mu .Unlock ()
130
147
if portion , found := wanted .Get (cacheKey ); found && ! portion .IsPlaceholder () {
131
148
return portion .chain , true
132
149
}
133
150
134
151
// Check if the chain for the key is discovered.
135
- p .mu .Lock ()
136
152
discovered := p .getChainsDiscoveredAt (instance )
137
153
if portion , found := discovered .Get (cacheKey ); found {
138
154
// Add it to the wanted cache and remove it from the discovered cache.
139
155
wanted .Add (cacheKey , portion )
140
156
discovered .Remove (cacheKey )
141
- p .mu .Unlock ()
142
157
143
158
chain := portion .chain
144
159
if p .listener != nil {
@@ -147,7 +162,6 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
147
162
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
148
163
return chain , true
149
164
}
150
- p .mu .Unlock ()
151
165
152
166
// Otherwise, add a placeholder for the wanted key as a way to prioritise its
153
167
// retention via LRU recent-ness.
@@ -156,6 +170,8 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
156
170
}
157
171
158
172
func (p * PubSubChainExchange ) getChainsWantedAt (instance uint64 ) * lru.Cache [string , * chainPortion ] {
173
+ p .mu .Lock ()
174
+ defer p .mu .Unlock ()
159
175
wanted , exists := p .chainsWanted [instance ]
160
176
if ! exists {
161
177
wanted = p .newChainPortionCache (p .maxWantedChainsPerInstance )
@@ -165,6 +181,8 @@ func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[stri
165
181
}
166
182
167
183
func (p * PubSubChainExchange ) getChainsDiscoveredAt (instance uint64 ) * lru.Cache [string , * chainPortion ] {
184
+ p .mu .Lock ()
185
+ defer p .mu .Unlock ()
168
186
discovered , exists := p .chainsDiscovered [instance ]
169
187
if ! exists {
170
188
discovered = p .newChainPortionCache (p .maxDiscoveredChainsPerInstance )
@@ -208,8 +226,6 @@ func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID
208
226
}
209
227
210
228
func (p * PubSubChainExchange ) cacheAsDiscoveredChain (ctx context.Context , cmsg Message ) {
211
- p .mu .Lock ()
212
- defer p .mu .Unlock ()
213
229
214
230
wanted := p .getChainsDiscoveredAt (cmsg .Instance )
215
231
discovered := p .getChainsDiscoveredAt (cmsg .Instance )
@@ -245,7 +261,13 @@ func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg M
245
261
func (p * PubSubChainExchange ) Broadcast (ctx context.Context , msg Message ) error {
246
262
247
263
// Optimistically cache the broadcast chain and all of its prefixes as wanted.
248
- p .cacheAsWantedChain (ctx , msg )
264
+ select {
265
+ case p .pendingCacheAsWanted <- msg :
266
+ case <- ctx .Done ():
267
+ return ctx .Err ()
268
+ default :
269
+ log .Warnw ("Dropping wanted cache entry. Chain exchange is too slow to process chains as wanted" , "msg" , msg )
270
+ }
249
271
250
272
// TODO: integrate zstd compression.
251
273
var buf bytes.Buffer
@@ -266,7 +288,6 @@ type discovery struct {
266
288
267
289
func (p * PubSubChainExchange ) cacheAsWantedChain (ctx context.Context , cmsg Message ) {
268
290
var notifications []discovery
269
- p .mu .Lock ()
270
291
wanted := p .getChainsWantedAt (cmsg .Instance )
271
292
for offset := len (cmsg .Chain ); offset >= 0 && ctx .Err () == nil ; offset -- {
272
293
// TODO: Expose internals of merkle.go so that keys can be generated
@@ -290,7 +311,6 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
290
311
// been evicted from the cache or not. This should be cheap enough considering the
291
312
// added complexity of tracking evictions relative to chain prefixes.
292
313
}
293
- p .mu .Unlock ()
294
314
295
315
// Notify the listener outside the lock.
296
316
if p .listener != nil {
0 commit comments