Skip to content

Commit

Permalink
1) Bootstrap function takes a context
Browse files Browse the repository at this point in the history
2) Discovery requests are handled by polling at the pubsub layer instead of the router layer
3) Created Topic object that can be Subscribed and Published to as well as receive events from
4) Moved events from Subscription to a TopicEventHandler so events can be received even by Publishers
5) Updated go.mod packages, includes fix for tests failing from leaking goroutines (non-versioned commit used)
  • Loading branch information
aschmahmann committed Sep 16, 2019
1 parent 3fa4b45 commit 5530761
Show file tree
Hide file tree
Showing 13 changed files with 1,039 additions and 648 deletions.
108 changes: 84 additions & 24 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

var (
// poll interval
DiscoveryPollInitialDelay = 0 * time.Millisecond
DiscoveryPollInterval = 1 * time.Second
)

// discover represents the discovery pipeline.
// The discovery pipeline handles advertising and discovery of peers
type discover struct {
Expand All @@ -29,11 +35,11 @@ type discover struct {
done chan string
}

type IsDiscovered func(numPeers int) bool

func CheckNumPeers(num int) IsDiscovered {
return func(numPeers int) bool {
return numPeers >= num
// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
// The router ultimately decides the whether it is ready or not, the given size is just a suggestion.
func MinTopicSize(size int) RouterReady {
return func(rt PubSubRouter, topic string) (bool, error) {
return rt.EnoughPeers(topic, size), nil
}
}

Expand All @@ -55,6 +61,40 @@ func (d *discover) Start(p *PubSub) {

d.p = p
go d.discoverLoop()
go d.pollTimer()
}

func (d *discover) pollTimer() {
time.Sleep(DiscoveryPollInitialDelay)
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}

ticker := time.NewTicker(DiscoveryPollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
case <-d.p.ctx.Done():
return
}
}
}

func (d *discover) requestDiscovery() {
for t := range d.p.pTopics {
if !d.p.rt.EnoughPeers(t, 0) {
d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)}
}
}
}

func (d *discover) discoverLoop() {
Expand Down Expand Up @@ -95,6 +135,7 @@ func (d *discover) Advertise(topic string) {
advertisingCtx, cancel := context.WithCancel(d.p.ctx)

if _, ok := d.advertising[topic]; ok {
cancel()
return
}
d.advertising[topic] = cancel
Expand Down Expand Up @@ -140,34 +181,53 @@ func (d *discover) Discover(topic string, opts ...discovery.Option) {
return
}

d.discoverQ <- &discoverReq{topic, opts, make(chan struct{})}
d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)}
}

// Bootstrap attempts to bootstrap to a given topic. Returns true if it was already bootstrapped, false otherwise.
func (d *discover) Bootstrap(topic string, bs Bootstrap, opts ...discovery.Option) bool {
// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool {
if d.discovery == nil {
return true
}

bootstrapped := make(chan bool, 1)
select {
case d.p.eval <- func() {
done, _ := bs.CheckDone(d.p.rt.PeerCount(topic))
bootstrapped <- done
close(bootstrapped)
}:
if <-bootstrapped {
return true
for {
// Check if ready for publishing
bootstrapped := make(chan bool, 1)
select {
case d.p.eval <- func() {
done, _ := ready(d.p.rt, topic)
bootstrapped <- done
close(bootstrapped)
}:
if <-bootstrapped {
return true
}
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
case <-d.p.ctx.Done():
return false
}

disc := &discoverReq{topic, opts, make(chan struct{})}
d.discoverQ <- disc
<-disc.done
// If not ready discover more peers
disc := &discoverReq{topic, opts, make(chan struct{}, 1)}
select {
case d.discoverQ <- disc:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}

return false
select {
case <-disc.done:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}

time.Sleep(time.Millisecond * 100)
}
}

func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
Expand Down
40 changes: 28 additions & 12 deletions discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,19 @@ func TestSimpleDiscovery(t *testing.T) {

hosts := getNetHosts(t, ctx, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)

for i, h := range hosts {
disc := &mockDiscoveryClient{h, server}
psubs[i] = getPubsub(ctx, h, WithDiscovery(disc, discOpts...))
ps := getPubsub(ctx, h, WithDiscovery(disc, discOpts...))
psubs[i] = ps
topicHandlers[i], _ = ps.Join(topic)
}

// Subscribe with all but one pubsub instance
msgs := make([]*Subscription, numHosts)
for i, ps := range psubs[1:] {
subch, err := ps.Subscribe(topic)
for i, th := range topicHandlers[1:] {
subch, err := th.Subscribe()
if err != nil {
t.Fatal(err)
}
Expand All @@ -156,14 +160,16 @@ func TestSimpleDiscovery(t *testing.T) {
}

// Try subscribing followed by publishing a single message
subch, err := psubs[0].Subscribe(topic)
subch, err := topicHandlers[0].Subscribe()
if err != nil {
t.Fatal(err)
}
msgs[0] = subch

msg := []byte("first message")
psubs[0].Publish(topic, msg, WithBootstrapParams(NewContextBootstrap(ctx, CheckNumPeers(numHosts-1))))
if err := topicHandlers[0].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil {
t.Fatal(err)
}

for _, sub := range msgs {
got, err := sub.Next(ctx)
Expand All @@ -181,7 +187,9 @@ func TestSimpleDiscovery(t *testing.T) {

owner := rand.Intn(len(psubs))

psubs[owner].Publish(topic, msg, WithBootstrapParams(NewContextBootstrap(ctx, CheckNumPeers(1))))
if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(1))); err != nil {
t.Fatal(err)
}

for _, sub := range msgs {
got, err := sub.Next(ctx)
Expand Down Expand Up @@ -212,18 +220,22 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
// Put the pubsub clients into two partitions
hosts := getNetHosts(t, ctx, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)

for i, h := range hosts {
s := server1
if i >= partitionSize {
s = server2
}
disc := &mockDiscoveryClient{h, s}
psubs[i] = getGossipsub(ctx, h, WithDiscovery(disc, discOpts...))
ps := getGossipsub(ctx, h, WithDiscovery(disc, discOpts...))
psubs[i] = ps
topicHandlers[i], _ = ps.Join(topic)
}

msgs := make([]*Subscription, numHosts)
for i, ps := range psubs {
subch, err := ps.Subscribe(topic)
for i, th := range topicHandlers {
subch, err := th.Subscribe()
if err != nil {
t.Fatal(err)
}
Expand All @@ -237,16 +249,20 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
}

for i := 0; i < partitionSize; i++ {
server1.Advertise("floodsub:"+topic, *host.InfoFromHost(hosts[i+partitionSize]), ttl)
if _, err := server1.Advertise("floodsub:"+topic, *host.InfoFromHost(hosts[i+partitionSize]), ttl); err != nil {
t.Fatal(err)
}
}

// test the mesh
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))

owner := rand.Intn(len(psubs))
owner := rand.Intn(numHosts)

psubs[owner].Publish(topic, msg, WithBootstrapParams(NewContextBootstrap(ctx, CheckNumPeers(numHosts-1))))
if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil {
t.Fatal(err)
}

for _, sub := range msgs {
got, err := sub.Next(ctx)
Expand Down
18 changes: 14 additions & 4 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

const (
FloodSubID = protocol.ID("/floodsub/1.0.0")
FloodSubID = protocol.ID("/floodsub/1.0.0")
FloodSubTopicSearchSize = 5
)

// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
Expand Down Expand Up @@ -43,13 +44,22 @@ func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}

func (fs *FloodSubRouter) RemovePeer(peer.ID) {}

func (fs *FloodSubRouter) PeerCount(topic string) int {
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := fs.p.topics[topic]
if !ok {
return 0
return false
}
return len(tmap)

if suggested == 0 {
suggested = FloodSubTopicSearchSize
}

if len(tmap) >= suggested {
return true
}

return false
}

func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
Expand Down
Loading

0 comments on commit 5530761

Please sign in to comment.