Skip to content

Commit

Permalink
Refactoring including:
Browse files Browse the repository at this point in the history
1) Moved confirmation of peer connection success/failure from pubsub into discovery
2) Only Public functions are used outside of discovery.go, and they all check for an unset discovery.go
3) Boostrapping check uses RWMutex + Map instead of sync.Map, and uses maps of channels instead of mutexes
  • Loading branch information
aschmahmann committed Jun 13, 2019
1 parent d06b14e commit 6edf7ec
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 158 deletions.
253 changes: 189 additions & 64 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,122 +20,193 @@ type discover struct {
advertising map[string]context.CancelFunc

// bootstrapped tracks which topics have been bootstrapped
bootstrapped sync.Map
bootstrapped map[string]chan struct{}

// discover handles continuing peer discovery
discover chan *Discover
bootstrappedMux sync.RWMutex

// ongoingDiscovery tracks ongoing discovery requests
ongoingDiscovery map[string]struct{}
// discoverQ handles continuing peer discovery
discoverQ chan *discoverReq

// doneDiscovery handles completion of a discovery request
doneDiscovery chan string
// ongoing tracks ongoing discovery requests
ongoing map[string]struct{}

// done handles completion of a discovery request
done chan string

// peerNotifier processes all peer notifications
peerNotifier chan peer.ID

// tracks the channels subscribed to new peer notifications
newPeerNotif map[chan peer.ID]*peerEventTracker

// setup new peer notifier
addPeerNotif chan *reqNewPeerNotifier

// remove peer notifier
rmPeerNotif chan chan peer.ID
}

// newValidation creates a new validation pipeline
func newDiscover() *discover {
return &discover{
bootstrapped: sync.Map{},
discover: make(chan *Discover),
ongoingDiscovery: make(map[string]struct{}),
doneDiscovery: make(chan string),
advertising: make(map[string]context.CancelFunc),
advertising: make(map[string]context.CancelFunc),
bootstrapped: make(map[string]chan struct{}),
discoverQ: make(chan *discoverReq, 32),
ongoing: make(map[string]struct{}),
done: make(chan string),
peerNotifier: make(chan peer.ID, 32),
newPeerNotif: make(map[chan peer.ID]*peerEventTracker),
addPeerNotif: make(chan *reqNewPeerNotifier),
rmPeerNotif: make(chan chan peer.ID),
}
}

// Start attaches the discovery pipeline to a pubsub instance and starts event loop
func (d *discover) Start(p *PubSub) {
if d.discovery == nil {
return
}

d.p = p
go d.discoverLoop()
}

func (d *discover) discoverLoop() {
for {
select {
case discover := <-d.discover:
case discover := <-d.discoverQ:
topic := discover.topic
if _, ok := d.ongoingDiscovery[topic]; !ok {
d.ongoingDiscovery[topic] = struct{}{}
if _, ok := d.ongoing[topic]; !ok {
d.ongoing[topic] = struct{}{}
go func() {
d.handleDiscovery(d.p.ctx, topic, discover.opts)
d.doneDiscovery <- topic
d.done <- topic
}()
}
case topic := <-d.doneDiscovery:
delete(d.ongoingDiscovery, topic)
case topic := <-d.done:
delete(d.ongoing, topic)
case add := <-d.addPeerNotif:
ch := make(chan peer.ID, 10)
sentPeers := &peerEventTracker{peers: make(map[peer.ID]struct{})}
d.newPeerNotif[ch] = sentPeers
add.resp <- ch
go func() {
topicPeers := d.p.ListPeers(add.topic)
sentPeers.mux.RLock()
sendPeers := peerIntersect(topicPeers, sentPeers.peers)
sentPeers.mux.RUnlock()

sentPeers.mux.Lock()
for _, pid := range sendPeers {
if _, ok := sentPeers.peers[pid]; !ok {
sentPeers.peers[pid] = struct{}{}
ch <- pid
}
}
sentPeers.mux.Unlock()
}()
case rm := <-d.rmPeerNotif:
delete(d.newPeerNotif, rm)
case pid := <-d.peerNotifier:
d.notifyNewPeer(pid)
case <-d.p.ctx.Done():
log.Info("pubsub discoverloop shutting down")
return
}
}
}

// advertise advertises this node's interest in a topic to a discovery service
func (d *discover) advertise(topic string) {
if d.discovery != nil {
advertisingCtx, cancel := context.WithCancel(d.p.ctx)
// Advertise advertises this node's interest in a topic to a discovery service
func (d *discover) Advertise(topic string) {
if d.discovery == nil {
return
}

if _, ok := d.advertising[topic]; ok {
return
}
d.advertising[topic] = cancel
advertisingCtx, cancel := context.WithCancel(d.p.ctx)

go func() {
next, err := d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
}
if _, ok := d.advertising[topic]; ok {
return
}
d.advertising[topic] = cancel

t := time.NewTimer(next)
for {
select {
case <-t.C:
next, err = d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
}
t.Reset(next)
case <-advertisingCtx.Done():
t.Stop()
return
go func() {
next, err := d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
}

t := time.NewTimer(next)
for {
select {
case <-t.C:
next, err = d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
}
t.Reset(next)
case <-advertisingCtx.Done():
t.Stop()
return
}
}()
}
}
}()
}

func (d *discover) stopAdvertise(topic string) {
// StopAdvertise stops advertising this node's interest in a topic
func (d *discover) StopAdvertise(topic string) {
if d.discovery == nil {
return
}

if advertiseCancel, ok := d.advertising[topic]; ok {
advertiseCancel()
delete(d.advertising, topic)
}
}

// bootstrap handles initial bootstrapping of peers for a given topic
func (d *discover) bootstrap(async bool, topic string, dOpts ...discovery.Option) {
newMux := &sync.Mutex{}
newMux.Lock()
bootstrapMux, bootstrapStarted := d.bootstrapped.LoadOrStore(topic, newMux)
// Bootstrap handles initial bootstrapping of peers for a given topic
func (d *discover) Bootstrap(async bool, topic string, dOpts ...discovery.Option) {
if d.discovery == nil {
return
}

d.bootstrappedMux.RLock()
doneCh, bootstrapStarted := d.bootstrapped[topic]
d.bootstrappedMux.RUnlock()

if bootstrapStarted {
newMux.Unlock()
if async {
return
if !async {
_ = <-doneCh
}
mux := bootstrapMux.(*sync.Mutex)
mux.Lock()
mux.Unlock()
} else {
d.bootstrappedMux.Lock()
doneCh, bootstrapStarted := d.bootstrapped[topic]
if bootstrapStarted {
if !async {
_ = <-doneCh
}
d.bootstrappedMux.Unlock()
return
}
ch := make(chan struct{}, 1)
d.bootstrapped[topic] = ch
d.bootstrappedMux.Unlock()

d.handleDiscovery(d.p.ctx, topic, dOpts)
newMux.Unlock()
ch <- struct{}{}
close(ch)
}
}

func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
// Discover searches for additional peers interested in a given topic
func (d *discover) Discover(topic string, opts ...discovery.Option) {
if d.discovery == nil {
return
}

d.discoverQ <- &discoverReq{topic, opts}
}

func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

Expand Down Expand Up @@ -183,10 +254,10 @@ loop:
}

if len(findPeers) > 0 {
req := &reqNewPeerNotifier{initPeers: findPeers, resp: make(chan chan peer.ID, 1)}
d.p.addPeerNotif <- req
req := &reqNewPeerNotifier{topic: topic, resp: make(chan chan peer.ID, 1)}
d.addPeerNotif <- req
peerCh := <-req.resp
defer func() { d.p.rmPeerNotif <- peerCh }()
defer func() { d.rmPeerNotif <- peerCh }()
for {
select {
case pid := <-peerCh:
Expand All @@ -205,7 +276,61 @@ loop:
}
}

type Discover struct {
func (d *discover) NotifyNewPeer(pid peer.ID) {
if d.discovery == nil {
return
}

select {
case d.peerNotifier <- pid:
default:
log.Error("dropped msg")
}
}

func (d *discover) notifyNewPeer(pid peer.ID) {
for ch, tracker := range d.newPeerNotif {
tracker.mux.Lock()
if _, ok := tracker.peers[pid]; !ok {
select {
case ch <- pid:
tracker.peers[pid] = struct{}{}
default:
log.Error("dropped msg")
}
}
tracker.mux.Unlock()
}
}

func peerIntersect(peerArr []peer.ID, peerSet map[peer.ID]struct{}) []peer.ID {
maxSize := len(peerSet)

if maxSize > len(peerArr) {
maxSize = len(peerArr)
}

peerBuffer := make([]peer.ID, 0, maxSize)
for _, pid := range peerArr {
if _, ok := peerSet[pid]; ok {
peerBuffer = append(peerBuffer, pid)
}
}

return peerBuffer
}

type reqNewPeerNotifier struct {
topic string
resp chan chan peer.ID
}

type peerEventTracker struct {
mux sync.RWMutex
peers map[peer.ID]struct{}
}

type discoverReq struct {
topic string
opts []discovery.Option
}
Expand Down
2 changes: 1 addition & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (gs *GossipSubRouter) heartbeat() {
if len(peers) < GossipSubDlo {
rediscoverTopic := topic
go func() {
gs.p.disc.discover <- &Discover{rediscoverTopic, []discovery.Option{discovery.Limit(GossipSubNPeers)}}
gs.p.disc.Discover(rediscoverTopic, discovery.Limit(GossipSubNPeers))
}()
}

Expand Down
Loading

0 comments on commit 6edf7ec

Please sign in to comment.