Skip to content

Commit b200822

Browse files
committed
Add the ability to listen for discovered chains
Expand chain exchange to accept a listener which is notified whenever a new chain is discovered. This mechanism is intended to be integrated into F3 host pubsub, whereupon receiving a partial message the host looks up its chain. When known, the chain is returned immediately. Otherwise, the host would buffer the partial message and await notification of its discovering from chain exchange. Part of #792
1 parent 135bfe6 commit b200822

File tree

4 files changed

+70
-5
lines changed

4 files changed

+70
-5
lines changed

chainexchange/chainexchange.go

+4
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,8 @@ type ChainExchange interface {
2424
RemoveChainsByInstance(context.Context, uint64) error
2525
}
2626

27+
type Listener interface {
28+
NotifyChainDiscovered(ctx context.Context, key Key, instance uint64, chain gpbft.ECChain)
29+
}
30+
2731
func (k Key) IsZero() bool { return len(k) == 0 }

chainexchange/options.go

+11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type options struct {
2121
maxInstanceLookahead uint64
2222
maxDiscoveredChainsPerInstance int
2323
maxWantedChainsPerInstance int
24+
listener Listener
2425
}
2526

2627
func newOptions(o ...Option) (*options, error) {
@@ -132,3 +133,13 @@ func WithMaxWantedChainsPerInstance(max int) Option {
132133
return nil
133134
}
134135
}
136+
137+
func WithListener(listener Listener) Option {
138+
return func(o *options) error {
139+
if listener == nil {
140+
return errors.New("listener cannot be nil")
141+
}
142+
o.listener = listener
143+
return nil
144+
}
145+
}

chainexchange/pubsub.go

+28-4
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key {
107107
return rootDigest[:]
108108
}
109109

110-
func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {
110+
func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {
111111

112112
// We do not have to take instance as input, and instead we can just search
113113
// through all the instance as they are not expected to be more than 10. The
@@ -138,8 +138,12 @@ func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uin
138138
// Add it to the wanted cache and remove it from the discovered cache.
139139
wanted.Add(cacheKey, portion)
140140
discovered.Remove(cacheKey)
141+
chain := portion.chain
142+
if p.listener != nil {
143+
p.listener.NotifyChainDiscovered(ctx, key, instance, chain)
144+
}
141145
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
142-
return portion.chain, true
146+
return chain, true
143147
}
144148
// Otherwise, add a placeholder for the wanted key as a way to prioritise its
145149
// retention via LRU recent-ness.
@@ -250,10 +254,15 @@ func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error
250254
return nil
251255
}
252256

257+
type discovery struct {
258+
key Key
259+
instance uint64
260+
chain gpbft.ECChain
261+
}
262+
253263
func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
264+
var notifications []discovery
254265
p.mu.Lock()
255-
defer p.mu.Unlock()
256-
257266
wanted := p.getChainsWantedAt(cmsg.Instance)
258267
for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- {
259268
// TODO: Expose internals of merkle.go so that keys can be generated
@@ -265,11 +274,26 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
265274
wanted.Add(cacheKey, &chainPortion{
266275
chain: prefix,
267276
})
277+
if p.listener != nil {
278+
notifications = append(notifications, discovery{
279+
key: key,
280+
instance: cmsg.Instance,
281+
chain: prefix,
282+
})
283+
}
268284
}
269285
// Continue with the remaining prefix keys as we do not know if any of them have
270286
// been evicted from the cache or not. This should be cheap enough considering the
271287
// added complexity of tracking evictions relative to chain prefixes.
272288
}
289+
p.mu.Unlock()
290+
291+
// Notify the listener outside the lock.
292+
if p.listener != nil {
293+
for _, notification := range notifications {
294+
p.listener.NotifyChainDiscovered(ctx, notification.key, notification.instance, notification.chain)
295+
}
296+
}
273297
}
274298

275299
func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error {

chainexchange/pubsub_test.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
1616
const topicName = "fish"
1717
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
1818
var testInstant gpbft.Instant
19+
var testListener listener
1920
host, err := libp2p.New()
2021
require.NoError(t, err)
2122
t.Cleanup(func() {
@@ -33,6 +34,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
3334
chainexchange.WithPubSub(ps),
3435
chainexchange.WithTopicName(topicName),
3536
chainexchange.WithTopicScoreParams(nil),
37+
chainexchange.WithListener(&testListener),
3638
)
3739
require.NoError(t, err)
3840
require.NotNil(t, subject)
@@ -50,6 +52,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
5052
chain, found := subject.GetChainByInstance(ctx, instance, key)
5153
require.False(t, found)
5254
require.Nil(t, chain)
55+
require.Empty(t, testListener.notifications)
5356

5457
require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{
5558
Instance: instance,
@@ -66,11 +69,34 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
6669
require.True(t, found)
6770
require.Equal(t, baseChain, chain)
6871

72+
// Assert that we have received 2 notifications, because ecChain has 2 tipsets.
73+
// First should be the ecChain, second should be the baseChain.
74+
require.Len(t, testListener.notifications, 2)
75+
require.Equal(t, instance, testListener.notifications[1].instance)
76+
require.Equal(t, baseKey, testListener.notifications[1].key)
77+
require.Equal(t, baseChain, testListener.notifications[1].chain)
78+
require.Equal(t, instance, testListener.notifications[0].instance)
79+
require.Equal(t, key, testListener.notifications[0].key)
80+
require.Equal(t, ecChain, testListener.notifications[0].chain)
81+
6982
require.NoError(t, subject.Shutdown(ctx))
7083
}
7184

85+
type notification struct {
86+
key chainexchange.Key
87+
instance uint64
88+
chain gpbft.ECChain
89+
}
90+
type listener struct {
91+
notifications []notification
92+
}
93+
94+
func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) {
95+
l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain})
96+
}
97+
7298
// TODO: Add more tests, specifically:
73-
// - valodation
99+
// - validation
74100
// - discovery through other chainexchange instance
75101
// - cache eviction/fixed memory footprint.
76102
// - fulfilment of chain from discovery to wanted in any order.

0 commit comments

Comments
 (0)