Skip to content

Commit

Permalink
[NOD-858] Don't switch sync peer if the syncing process hasn't yet st…
Browse files Browse the repository at this point in the history
…arted with the current sync peer (#700)

* [NOD-858] Don't switch sync peer if the syncing process hasn't yet started with the current sync peer

* [NOD-858] SetShouldSendBlockLocator(false) on OnBlockLocator

* [NOD-858] Rename shouldSendBlockLocator->wasBlockLocatorRequested

* [NOD-858] Move panic to shouldReplaceSyncPeer
  • Loading branch information
someone235 authored Apr 13, 2020
1 parent 2f25595 commit 3fd647b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
46 changes: 30 additions & 16 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type SyncManager struct {
msgChan chan interface{}
wg sync.WaitGroup
quit chan struct{}
syncPeerLock sync.Mutex

// These fields should only be accessed from the messageHandler thread
rejectedTxns map[daghash.TxID]struct{}
Expand All @@ -170,6 +171,8 @@ type SyncManager struct {
// download/sync the blockDAG from. When syncing is already running, it
// simply returns. It also examines the candidates for any which are no longer
// candidates and removes them as needed.
//
// This function MUST be called with the sync peer lock held.
func (sm *SyncManager) startSync() {
// Return now if we're already syncing.
if sm.syncPeer != nil {
Expand All @@ -189,6 +192,7 @@ func (sm *SyncManager) startSync() {
// TODO(davec): Use a better algorithm to choose the sync peer.
// For now, just pick the first available candidate.
syncPeer = peer
break
}

// Start syncing from the sync peer if one was selected.
Expand Down Expand Up @@ -294,8 +298,8 @@ func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
}

// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil {
sm.startSync()
if isSyncCandidate {
sm.restartSyncIfNeeded()
}
}

Expand Down Expand Up @@ -337,7 +341,7 @@ func (sm *SyncManager) stopSyncFromPeer(peer *peerpkg.Peer) {
// sync peer.
if sm.syncPeer == peer {
sm.syncPeer = nil
sm.startSync()
sm.restartSyncIfNeeded()
}
}

Expand Down Expand Up @@ -427,24 +431,34 @@ func (sm *SyncManager) current() bool {
// restartSyncIfNeeded finds a new sync candidate if we're not expecting any
// blocks from the current one.
func (sm *SyncManager) restartSyncIfNeeded() {
if sm.syncPeer != nil {
syncPeerState, exists := sm.peerStates[sm.syncPeer]
if exists {
isWaitingForBlocks := func() bool {
syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) != 0 || len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) != 0
}()
if isWaitingForBlocks {
return
}
}
sm.syncPeerLock.Lock()
defer sm.syncPeerLock.Unlock()

if !sm.shouldReplaceSyncPeer() {
return
}

sm.syncPeer = nil
sm.startSync()
}

func (sm *SyncManager) shouldReplaceSyncPeer() bool {
if sm.syncPeer == nil {
return true
}

syncPeerState, exists := sm.peerStates[sm.syncPeer]
if !exists {
panic(errors.Errorf("no peer state for sync peer %s", sm.syncPeer))
}

syncPeerState.requestQueueMtx.Lock()
defer syncPeerState.requestQueueMtx.Unlock()
return len(syncPeerState.requestedBlocks) == 0 &&
len(syncPeerState.requestQueues[wire.InvTypeSyncBlock].queue) == 0 &&
!sm.syncPeer.WasBlockLocatorRequested()
}

// handleBlockMsg handles block messages from all peers.
func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
peer := bmsg.peer
Expand Down Expand Up @@ -905,7 +919,7 @@ func (sm *SyncManager) handleSelectedTipMsg(msg *selectedTipMsg) {
return
}
peer.SetSelectedTipHash(selectedTipHash)
sm.startSync()
sm.restartSyncIfNeeded()
}

// messageHandler is the main handler for the sync manager. It must be run as a
Expand Down
17 changes: 17 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ type Peer struct {
prevGetBlockInvsLow *daghash.Hash
prevGetBlockInvsHigh *daghash.Hash

wasBlockLocatorRequested bool

// These fields keep track of statistics for the peer and are protected
// by the statsMtx mutex.
statsMtx sync.RWMutex
Expand All @@ -435,6 +437,20 @@ type Peer struct {
quit chan struct{}
}

// WasBlockLocatorRequested returns whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) WasBlockLocatorRequested() bool {
return p.wasBlockLocatorRequested
}

// SetWasBlockLocatorRequested sets whether the node
// is expecting to get a block locator from this
// peer.
func (p *Peer) SetWasBlockLocatorRequested(wasBlockLocatorRequested bool) {
p.wasBlockLocatorRequested = wasBlockLocatorRequested
}

// String returns the peer's address and directionality as a human-readable
// string.
//
Expand Down Expand Up @@ -775,6 +791,7 @@ func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetwor
//
// This function is safe for concurrent access.
func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) {
p.SetWasBlockLocatorRequested(true)
msg := wire.NewMsgGetBlockLocator(highHash, lowHash)
p.QueueMessage(msg, nil)
}
Expand Down
1 change: 1 addition & 0 deletions server/p2p/on_block_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// OnBlockLocator is invoked when a peer receives a locator kaspa
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
sp.SetWasBlockLocatorRequested(false)
// Find the highest known shared block between the peers, and asks
// the block and its future from the peer. If the block is not
// found, create a lower resolution block locator and send it to
Expand Down

0 comments on commit 3fd647b

Please sign in to comment.