Skip to content

Commit 73c0dcf

Browse files
committed
Add pubsub discovery.
1 parent 6533a12 commit 73c0dcf

File tree

6 files changed

+48
-13
lines changed

6 files changed

+48
-13
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/libp2p/go-libp2p-blankhost v0.2.0
2626
github.com/libp2p/go-libp2p-connmgr v0.2.4
2727
github.com/libp2p/go-libp2p-core v0.7.0
28+
github.com/libp2p/go-libp2p-discovery v0.5.0
2829
github.com/libp2p/go-libp2p-kad-dht v0.11.0
2930
github.com/libp2p/go-libp2p-noise v0.1.1
3031
github.com/libp2p/go-libp2p-pubsub v0.4.1

p2p/host.go

+17-1
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package p2p
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"github.com/libp2p/go-libp2p"
89
"github.com/libp2p/go-libp2p-connmgr"
910
"github.com/libp2p/go-libp2p-core/crypto"
1011
"github.com/libp2p/go-libp2p-core/host"
12+
"github.com/libp2p/go-libp2p-core/peer"
1113
"github.com/libp2p/go-libp2p-core/routing"
14+
"github.com/libp2p/go-libp2p-kad-dht"
1215
"github.com/libp2p/go-libp2p-kad-dht/dual"
1316
"github.com/libp2p/go-libp2p-noise"
1417
"github.com/libp2p/go-libp2p-tls"
@@ -30,8 +33,8 @@ const (
3033

3134
// NewHost returns a new libp2p host and router.
3235
func NewHost(ctx context.Context, priv crypto.PrivKey) (host.Host, routing.Routing, error) {
33-
var err error
3436
var router routing.Routing
37+
var err error
3538

3639
host, err := libp2p.New(ctx,
3740
libp2p.Identity(priv),
@@ -55,3 +58,16 @@ func NewHost(ctx context.Context, priv crypto.PrivKey) (host.Host, routing.Routi
5558

5659
return host, router, err
5760
}
61+
62+
// Bootstrap initiates connections to a list of known peers.
63+
func Bootstrap(ctx context.Context, host host.Host) {
64+
var wg sync.WaitGroup
65+
for _, info := range dht.GetDefaultBootstrapPeerAddrInfos() {
66+
wg.Add(1)
67+
go func(info peer.AddrInfo) {
68+
defer wg.Done()
69+
host.Connect(ctx, info)
70+
}(info)
71+
}
72+
wg.Wait()
73+
}

p2p/namesys.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77

88
"github.com/libp2p/go-libp2p-core/host"
99
"github.com/libp2p/go-libp2p-core/peer"
10+
"github.com/libp2p/go-libp2p-core/routing"
11+
discovery "github.com/libp2p/go-libp2p-discovery"
1012
pubsub "github.com/libp2p/go-libp2p-pubsub"
1113
namesys "github.com/libp2p/go-libp2p-pubsub-router"
1214
record "github.com/libp2p/go-libp2p-record"
@@ -36,8 +38,13 @@ func PeerIDForTopic(topic string) (peer.ID, error) {
3638
}
3739

3840
// NewSystem returns a new pubsub name system.
39-
func NewNamesys(ctx context.Context, host host.Host) (*namesys.PubsubValueStore, error) {
40-
sub, err := pubsub.NewGossipSub(ctx, host)
41+
func NewNamesys(ctx context.Context, host host.Host, router routing.ContentRouting) (*namesys.PubsubValueStore, error) {
42+
var options []pubsub.Option
43+
if router != nil {
44+
options = append(options, pubsub.WithDiscovery(discovery.NewRoutingDiscovery(router)))
45+
}
46+
47+
sub, err := pubsub.NewGossipSub(ctx, host, options...)
4148
if err != nil {
4249
return nil, err
4350
}

peer/mock.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func NewMock(t *testing.T, ctx context.Context) *Mock {
3737
t.Fatal("failed to create peer config")
3838
}
3939

40-
namesys, err := p2p.NewNamesys(ctx, host)
40+
namesys, err := p2p.NewNamesys(ctx, host, nil)
4141
if err != nil {
4242
t.Fatal("failed to create peer namesys")
4343
}

peer/node.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/libp2p/go-libp2p-core/host"
1717
"github.com/libp2p/go-libp2p-core/peer"
1818
"github.com/libp2p/go-libp2p-core/routing"
19-
"github.com/libp2p/go-libp2p-kad-dht"
2019
"github.com/multiverse-vcs/go-multiverse/p2p"
2120
)
2221

@@ -47,11 +46,6 @@ func New(ctx context.Context, dstore datastore.Batching, config *Config) (*Node,
4746
return nil, err
4847
}
4948

50-
namesys, err := p2p.NewNamesys(ctx, host)
51-
if err != nil {
52-
return nil, err
53-
}
54-
5549
bstore := blockstore.NewBlockstore(dstore)
5650
net := bsnet.NewFromIpfsHost(host, router)
5751
exc := bitswap.New(ctx, net, bstore)
@@ -63,16 +57,23 @@ func New(ctx context.Context, dstore datastore.Batching, config *Config) (*Node,
6357
if err != nil {
6458
return nil, err
6559
}
60+
61+
p2p.Bootstrap(ctx, host)
6662
provsys.Run()
6763

68-
for _, info := range dht.GetDefaultBootstrapPeerAddrInfos() {
69-
go host.Connect(ctx, info)
64+
namesys, err := p2p.NewNamesys(ctx, host, router)
65+
if err != nil {
66+
return nil, err
7067
}
7168

7269
if err := p2p.Discovery(ctx, host); err != nil {
7370
return nil, err
7471
}
7572

73+
if err := router.Bootstrap(ctx); err != nil {
74+
return nil, err
75+
}
76+
7677
return &Node{
7778
dag: dag,
7879
host: host,
@@ -96,6 +97,16 @@ func (n *Node) Config() *Config {
9697
return n.config
9798
}
9899

100+
// Connect connects to the peer with the given ID.
101+
func (n *Node) Connect(ctx context.Context, id peer.ID) error {
102+
info, err := n.router.FindPeer(ctx, id)
103+
if err != nil {
104+
return nil
105+
}
106+
107+
return n.host.Connect(ctx, info)
108+
}
109+
99110
// Dag returns the merkledag api.
100111
func (n *Node) Dag() ipld.DAGService {
101112
return n.dag

peer/peer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ type Peer interface {
2323
// Namesys returns the name system.
2424
Namesys() routing.ValueStore
2525
// ResolvePath resolves the node from the given path.
26-
ResolvePath(ctx context.Context, p path.Path) (ipld.Node, error)
26+
ResolvePath(context.Context, path.Path) (ipld.Node, error)
2727
}

0 commit comments

Comments
 (0)