Skip to content

Commit

Permalink
Integrate end to end hashed chain exchange with F3
Browse files Browse the repository at this point in the history
Integrate the chain exchange mechanism with F3 host and runner. But
without touching the core GPBFT.

The implementation here leaves two major TODOs: 1) chain broadcasting
mechanism (currently coupled to GPBFT message broadcast), and 2)
partial message validation prior to buffering (currently skipped
entirely but with capped buffer sizes and re-validation by core GPBFT
once the messages are complete).

The integration introduces the concept of Partial GMessage: a GMessage
with chains replaced with the key to the chain. The work introduces a
buffer and refill mechanism that listens to the chains discovered,
un-buffers the messages having re-constructed their original GMessage
and feeds them to the participation using the existing event loop.

Part of #792
  • Loading branch information
masih committed Jan 16, 2025
1 parent fb71a30 commit a9aaf54
Show file tree
Hide file tree
Showing 9 changed files with 624 additions and 60 deletions.
122 changes: 122 additions & 0 deletions cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 41 additions & 21 deletions chainexchange/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type PubSubChainExchange struct {
*options

// mu guards access to chains and API calls.
mu sync.Mutex
chainsWanted map[uint64]*lru.Cache[string, *chainPortion]
chainsDiscovered map[uint64]*lru.Cache[string, *chainPortion]
topic *pubsub.Topic
stop func() error
mu sync.Mutex
chainsWanted map[uint64]*lru.Cache[string, *chainPortion]
chainsDiscovered map[uint64]*lru.Cache[string, *chainPortion]
pendingCacheAsWanted chan Message
topic *pubsub.Topic
stop func() error
}

func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
Expand All @@ -45,9 +46,10 @@ func NewPubSubChainExchange(o ...Option) (*PubSubChainExchange, error) {
return nil, err
}
return &PubSubChainExchange{
options: opts,
chainsWanted: map[uint64]*lru.Cache[string, *chainPortion]{},
chainsDiscovered: map[uint64]*lru.Cache[string, *chainPortion]{},
options: opts,
chainsWanted: map[uint64]*lru.Cache[string, *chainPortion]{},
chainsDiscovered: map[uint64]*lru.Cache[string, *chainPortion]{},
pendingCacheAsWanted: make(chan Message, 100), // TODO: parameterise.
}, nil
}

Expand All @@ -64,7 +66,9 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
}
if p.topicScoreParams != nil {
if err := p.topic.SetScoreParams(p.topicScoreParams); err != nil {
return fmt.Errorf("failed to set score params: %w", err)
// This can happen most likely due to router not supporting peer scoring. It's
// non-critical. Hence, the warning log.
log.Warnw("failed to set topic score params", "err", err)
}
}
subscription, err := p.topic.Subscribe(pubsub.WithBufferSize(p.subscriptionBufferSize))
Expand All @@ -79,17 +83,31 @@ func (p *PubSubChainExchange) Start(ctx context.Context) error {
for ctx.Err() == nil {
msg, err := subscription.Next(ctx)
if err != nil {
log.Debugw("failed to read nex message from subscription", "err", err)
log.Debugw("failed to read next message from subscription", "err", err)
continue
}
cmsg := msg.ValidatorData.(Message)
p.cacheAsDiscoveredChain(ctx, cmsg)
}
log.Debug("Stopped reading messages from chainexchange subscription.")
}()
go func() {
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case cmsg := <-p.pendingCacheAsWanted:
p.cacheAsWantedChain(ctx, cmsg)
}
}
log.Debug("Stopped caching chains as wanted.")
}()
p.stop = func() error {
cancel()
subscription.Cancel()
return p.topic.Close()
_ = p.pubsub.UnregisterTopicValidator(p.topicName)
_ = p.topic.Close()
return nil
}
return nil
}
Expand Down Expand Up @@ -124,21 +142,18 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
cacheKey := string(key)

// Check wanted keys first.
p.mu.Lock()

wanted := p.getChainsWantedAt(instance)
p.mu.Unlock()
if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() {
return portion.chain, true
}

// Check if the chain for the key is discovered.
p.mu.Lock()
discovered := p.getChainsDiscoveredAt(instance)
if portion, found := discovered.Get(cacheKey); found {
// Add it to the wanted cache and remove it from the discovered cache.
wanted.Add(cacheKey, portion)
discovered.Remove(cacheKey)
p.mu.Unlock()

chain := portion.chain
if p.listener != nil {
Expand All @@ -147,7 +162,6 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
return chain, true
}
p.mu.Unlock()

// Otherwise, add a placeholder for the wanted key as a way to prioritise its
// retention via LRU recent-ness.
Expand All @@ -156,6 +170,8 @@ func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance u
}

func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[string, *chainPortion] {
p.mu.Lock()
defer p.mu.Unlock()
wanted, exists := p.chainsWanted[instance]
if !exists {
wanted = p.newChainPortionCache(p.maxWantedChainsPerInstance)
Expand All @@ -165,6 +181,8 @@ func (p *PubSubChainExchange) getChainsWantedAt(instance uint64) *lru.Cache[stri
}

func (p *PubSubChainExchange) getChainsDiscoveredAt(instance uint64) *lru.Cache[string, *chainPortion] {
p.mu.Lock()
defer p.mu.Unlock()
discovered, exists := p.chainsDiscovered[instance]
if !exists {
discovered = p.newChainPortionCache(p.maxDiscoveredChainsPerInstance)
Expand Down Expand Up @@ -208,8 +226,6 @@ func (p *PubSubChainExchange) validatePubSubMessage(_ context.Context, _ peer.ID
}

func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg Message) {
p.mu.Lock()
defer p.mu.Unlock()

wanted := p.getChainsDiscoveredAt(cmsg.Instance)
discovered := p.getChainsDiscoveredAt(cmsg.Instance)
Expand Down Expand Up @@ -245,7 +261,13 @@ func (p *PubSubChainExchange) cacheAsDiscoveredChain(ctx context.Context, cmsg M
func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error {

// Optimistically cache the broadcast chain and all of its prefixes as wanted.
p.cacheAsWantedChain(ctx, msg)
select {
case p.pendingCacheAsWanted <- msg:
case <-ctx.Done():
return ctx.Err()
default:
log.Warnw("Dropping wanted cache entry. Chain exchange is too slow to process chains as wanted", "msg", msg)
}

// TODO: integrate zstd compression.
var buf bytes.Buffer
Expand All @@ -266,7 +288,6 @@ type discovery struct {

func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
var notifications []discovery
p.mu.Lock()
wanted := p.getChainsWantedAt(cmsg.Instance)
for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- {
// TODO: Expose internals of merkle.go so that keys can be generated
Expand All @@ -290,7 +311,6 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
// been evicted from the cache or not. This should be cheap enough considering the
// added complexity of tracking evictions relative to chain prefixes.
}
p.mu.Unlock()

// Notify the listener outside the lock.
if p.listener != nil {
Expand Down
41 changes: 29 additions & 12 deletions chainexchange/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package chainexchange_test

import (
"context"
"slices"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -52,32 +54,38 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
chain, found := subject.GetChainByInstance(ctx, instance, key)
require.False(t, found)
require.Nil(t, chain)
require.Empty(t, testListener.notifications)
require.Empty(t, testListener.getNotifications())

require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{
Instance: instance,
Chain: ecChain,
}))

chain, found = subject.GetChainByInstance(ctx, instance, key)
require.True(t, found)
require.Eventually(t, func() bool {
chain, found = subject.GetChainByInstance(ctx, instance, key)
return found
}, time.Second, 100*time.Millisecond)
require.Equal(t, ecChain, chain)

baseChain := ecChain.BaseChain()
baseKey := subject.Key(baseChain)
chain, found = subject.GetChainByInstance(ctx, instance, baseKey)
require.True(t, found)
require.Eventually(t, func() bool {
chain, found = subject.GetChainByInstance(ctx, instance, baseKey)
return found
}, time.Second, 100*time.Millisecond)
require.Equal(t, baseChain, chain)

// Assert that we have received 2 notifications, because ecChain has 2 tipsets.
// First should be the ecChain, second should be the baseChain.
require.Len(t, testListener.notifications, 2)
require.Equal(t, instance, testListener.notifications[1].instance)
require.Equal(t, baseKey, testListener.notifications[1].key)
require.Equal(t, baseChain, testListener.notifications[1].chain)
require.Equal(t, instance, testListener.notifications[0].instance)
require.Equal(t, key, testListener.notifications[0].key)
require.Equal(t, ecChain, testListener.notifications[0].chain)

notifications := testListener.getNotifications()
require.Len(t, notifications, 2)
require.Equal(t, instance, notifications[1].instance)
require.Equal(t, baseKey, notifications[1].key)
require.Equal(t, baseChain, notifications[1].chain)
require.Equal(t, instance, notifications[0].instance)
require.Equal(t, key, notifications[0].key)
require.Equal(t, ecChain, notifications[0].chain)

require.NoError(t, subject.Shutdown(ctx))
}
Expand All @@ -88,13 +96,22 @@ type notification struct {
chain gpbft.ECChain
}
type listener struct {
mu sync.Mutex
notifications []notification
}

func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) {
l.mu.Lock()
defer l.mu.Unlock()
l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain})
}

func (l *listener) getNotifications() []notification {
l.mu.Lock()
defer l.mu.Unlock()
return slices.Clone(l.notifications)
}

// TODO: Add more tests, specifically:
// - validation
// - discovery through other chainexchange instance
Expand Down
2 changes: 1 addition & 1 deletion f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (e *testEnv) waitForEpochFinalized(epoch int64) {
}
}
return false
}, 30*time.Second)
}, 60*time.Second)
}

if head < epoch-100 {
Expand Down
Loading

0 comments on commit a9aaf54

Please sign in to comment.