Skip to content

Commit

Permalink
Extract IBD management from invs relay flow to a new separated flow (#…
Browse files Browse the repository at this point in the history
…1930)

* Separate IBD to a new flow (so now invs are handled concurrently and no route capacity errors)

* Invs messages should be queued while waiting for BlockLocator msg

* Close IBD channel so that HandleIBDFlow exits too

* Apply flow separation to p2p protocol v4

* Manage the IBDRequestChannel through the Peer struct

* Some IBDs take a little longer
  • Loading branch information
michaelsutton authored Jan 24, 2022
1 parent b1b179c commit 4855d84
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 140 deletions.
26 changes: 16 additions & 10 deletions app/protocol/flows/v3/blockrelay/block_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ func (flow *handleRelayInvsFlow) sendGetBlockLocator(highHash *externalapi.Domai
}

func (flow *handleRelayInvsFlow) receiveBlockLocator() (blockLocatorHashes []*externalapi.DomainHash, err error) {
message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout)
if err != nil {
return nil, err
}
msgBlockLocator, ok := message.(*appmessage.MsgBlockLocator)
if !ok {
return nil,
protocolerrors.Errorf(true, "received unexpected message type. "+
"expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command())
for {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}

switch message := message.(type) {
case *appmessage.MsgInvRelayBlock:
flow.invsQueue = append(flow.invsQueue, message)
case *appmessage.MsgBlockLocator:
return message.BlockLocatorHashes, nil
default:
return nil,
protocolerrors.Errorf(true, "received unexpected message type. "+
"expected: %s, got: %s", appmessage.CmdBlockLocator, message.Command())
}
}
return msgBlockLocator.BlockLocatorHashes, nil
}
16 changes: 12 additions & 4 deletions app/protocol/flows/v3/blockrelay/handle_relay_invs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type RelayInvsContext interface {
GetOrphanRoots(orphanHash *externalapi.DomainHash) ([]*externalapi.DomainHash, bool, error)
IsOrphan(blockHash *externalapi.DomainHash) bool
IsIBDRunning() bool
TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool
UnsetIBDRunning()
IsRecoverableError(err error) bool
}

Expand All @@ -57,7 +55,10 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg
peer: peer,
invsQueue: make([]*appmessage.MsgInvRelayBlock, 0),
}
return flow.start()
err := flow.start()
// Currently, HandleRelayInvs flow is the only place where IBD is triggered, so the channel can be closed now
close(peer.IBDRequestChannel())
return err
}

func (flow *handleRelayInvsFlow) start() error {
Expand Down Expand Up @@ -306,7 +307,14 @@ func (flow *handleRelayInvsFlow) processOrphan(block *externalapi.DomainBlock) e
// Start IBD unless we already are in IBD
log.Debugf("Block %s is out of orphan resolution range. "+
"Attempting to start IBD against it.", blockHash)
return flow.runIBDIfNotRunning(block)

// Send the block to IBD flow via the IBDRequestChannel.
// Note that this is a non-blocking send, since if IBD is already running, there is no need to trigger it
select {
case flow.peer.IBDRequestChannel() <- block:
default:
}
return nil
}

func (flow *handleRelayInvsFlow) isGenesisVirtualSelectedParent() (bool, error) {
Expand Down
128 changes: 89 additions & 39 deletions app/protocol/flows/v3/blockrelay/ibd.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,69 @@
package blockrelay

import (
"time"

"github.com/kaspanet/kaspad/infrastructure/logger"

"github.com/kaspanet/kaspad/domain/consensus/model"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/domain"
"github.com/kaspanet/kaspad/domain/consensus/model"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"github.com/kaspanet/kaspad/domain/consensus/ruleerrors"
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"time"
)

func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error {
// IBDContext is the interface for the context needed for the HandleIBD flow.
type IBDContext interface {
Domain() domain.Domain
Config() *config.Config
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnPruningPointUTXOSetOverride() error
IsIBDRunning() bool
TrySetIBDRunning(ibdPeer *peerpkg.Peer) bool
UnsetIBDRunning()
IsRecoverableError(err error) bool
}

type handleIBDFlow struct {
IBDContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
}

// HandleIBD handles IBD
func HandleIBD(context IBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {

flow := &handleIBDFlow{
IBDContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
}
return flow.start()
}

func (flow *handleIBDFlow) start() error {
for {
// Wait for IBD requests triggered by other flows
block, ok := <-flow.peer.IBDRequestChannel()
if !ok {
return nil
}
err := flow.runIBDIfNotRunning(block)
if err != nil {
return err
}
}
}

func (flow *handleIBDFlow) runIBDIfNotRunning(block *externalapi.DomainBlock) error {
wasIBDNotRunning := flow.TrySetIBDRunning(flow.peer)
if !wasIBDNotRunning {
log.Debugf("IBD is already running")
Expand Down Expand Up @@ -84,7 +131,16 @@ func (flow *handleRelayInvsFlow) runIBDIfNotRunning(block *externalapi.DomainBlo
return nil
}

func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) {
func (flow *handleIBDFlow) isGenesisVirtualSelectedParent() (bool, error) {
virtualSelectedParent, err := flow.Domain().Consensus().GetVirtualSelectedParent()
if err != nil {
return false, err
}

return virtualSelectedParent.Equal(flow.Config().NetParams().GenesisHash), nil
}

func (flow *handleIBDFlow) logIBDFinished(isFinishedSuccessfully bool) {
successString := "successfully"
if !isFinishedSuccessfully {
successString = "(interrupted)"
Expand All @@ -95,7 +151,7 @@ func (flow *handleRelayInvsFlow) logIBDFinished(isFinishedSuccessfully bool) {
// findHighestSharedBlock attempts to find the highest shared block between the peer
// and this node. This method may fail because the peer and us have conflicting pruning
// points. In that case we return (nil, false, nil) so that we may stop IBD gracefully.
func (flow *handleRelayInvsFlow) findHighestSharedBlockHash(
func (flow *handleIBDFlow) findHighestSharedBlockHash(
targetHash *externalapi.DomainHash) (*externalapi.DomainHash, bool, error) {

log.Debugf("Sending a blockLocator to %s between pruning point and headers selected tip", flow.peer)
Expand Down Expand Up @@ -138,7 +194,7 @@ func (flow *handleRelayInvsFlow) findHighestSharedBlockHash(
}
}

func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) {
func (flow *handleIBDFlow) nextBlockLocator(lowHash, highHash *externalapi.DomainHash) (externalapi.BlockLocator, error) {
log.Debugf("Sending a blockLocator to %s between %s and %s", flow.peer, lowHash, highHash)
blockLocator, err := flow.Domain().Consensus().CreateHeadersSelectedChainBlockLocator(lowHash, highHash)
if err != nil {
Expand All @@ -156,7 +212,7 @@ func (flow *handleRelayInvsFlow) nextBlockLocator(lowHash, highHash *externalapi
return blockLocator, nil
}

func (flow *handleRelayInvsFlow) findHighestHashIndex(
func (flow *handleIBDFlow) findHighestHashIndex(
highestHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (int, error) {

highestHashIndex := 0
Expand All @@ -181,15 +237,15 @@ func (flow *handleRelayInvsFlow) findHighestHashIndex(
// fetchHighestHash attempts to fetch the highest hash the peer knows amongst the given
// blockLocator. This method may fail because the peer and us have conflicting pruning
// points. In that case we return (nil, false, nil) so that we may stop IBD gracefully.
func (flow *handleRelayInvsFlow) fetchHighestHash(
func (flow *handleIBDFlow) fetchHighestHash(
targetHash *externalapi.DomainHash, blockLocator externalapi.BlockLocator) (*externalapi.DomainHash, bool, error) {

ibdBlockLocatorMessage := appmessage.NewMsgIBDBlockLocator(targetHash, blockLocator)
err := flow.outgoingRoute.Enqueue(ibdBlockLocatorMessage)
if err != nil {
return nil, false, err
}
message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, false, err
}
Expand All @@ -209,7 +265,7 @@ func (flow *handleRelayInvsFlow) fetchHighestHash(
}
}

func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash,
func (flow *handleIBDFlow) syncPruningPointFutureHeaders(consensus externalapi.Consensus, highestSharedBlockHash *externalapi.DomainHash,
highHash *externalapi.DomainHash) error {

log.Infof("Downloading headers from %s", flow.peer)
Expand Down Expand Up @@ -273,15 +329,15 @@ func (flow *handleRelayInvsFlow) syncPruningPointFutureHeaders(consensus externa
}
}

func (flow *handleRelayInvsFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash,
func (flow *handleIBDFlow) sendRequestHeaders(highestSharedBlockHash *externalapi.DomainHash,
peerSelectedTipHash *externalapi.DomainHash) error {

msgGetBlockInvs := appmessage.NewMsgRequstHeaders(highestSharedBlockHash, peerSelectedTipHash)
return flow.outgoingRoute.Enqueue(msgGetBlockInvs)
}

func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) {
message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout)
func (flow *handleIBDFlow) receiveHeaders() (msgIBDBlock *appmessage.BlockHeadersMessage, doneHeaders bool, err error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, false, err
}
Expand All @@ -300,7 +356,7 @@ func (flow *handleRelayInvsFlow) receiveHeaders() (msgIBDBlock *appmessage.Block
}
}

func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error {
func (flow *handleIBDFlow) processHeader(consensus externalapi.Consensus, msgBlockHeader *appmessage.MsgBlockHeader) error {
header := appmessage.BlockHeaderToDomainBlockHeader(msgBlockHeader)
block := &externalapi.DomainBlock{
Header: header,
Expand Down Expand Up @@ -333,7 +389,7 @@ func (flow *handleRelayInvsFlow) processHeader(consensus externalapi.Consensus,
return nil
}

func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() error {
func (flow *handleIBDFlow) validatePruningPointFutureHeaderTimestamps() error {
headerSelectedTipHash, err := flow.Domain().StagingConsensus().GetHeadersSelectedTip()
if err != nil {
return err
Expand Down Expand Up @@ -367,7 +423,7 @@ func (flow *handleRelayInvsFlow) validatePruningPointFutureHeaderTimestamps() er
return nil
}

func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet(
func (flow *handleIBDFlow) receiveAndInsertPruningPointUTXOSet(
consensus externalapi.Consensus, pruningPointHash *externalapi.DomainHash) (bool, error) {

onEnd := logger.LogAndMeasureExecutionTime(log, "receiveAndInsertPruningPointUTXOSet")
Expand All @@ -376,7 +432,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet(
receivedChunkCount := 0
receivedUTXOCount := 0
for {
message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -422,7 +478,7 @@ func (flow *handleRelayInvsFlow) receiveAndInsertPruningPointUTXOSet(
}
}

func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error {
func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHash) error {
hashes, err := flow.Domain().Consensus().GetMissingBlockBodyHashes(highHash)
if err != nil {
return err
Expand All @@ -449,7 +505,7 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do
}

for _, expectedHash := range hashesToRequest {
message, err := flow.dequeueIncomingMessageAndSkipInvs(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -489,7 +545,16 @@ func (flow *handleRelayInvsFlow) syncMissingBlockBodies(highHash *externalapi.Do
return flow.resolveVirtual()
}

func (flow *handleRelayInvsFlow) resolveVirtual() error {
func (flow *handleIBDFlow) banIfBlockIsHeaderOnly(block *externalapi.DomainBlock) error {
if len(block.Transactions) == 0 {
return protocolerrors.Errorf(true, "sent header of %s block where expected block with body",
consensushashing.BlockHash(block))
}

return nil
}

func (flow *handleIBDFlow) resolveVirtual() error {
for i := 0; ; i++ {
if i%10 == 0 {
log.Infof("Resolving virtual. This may take some time...")
Expand All @@ -510,18 +575,3 @@ func (flow *handleRelayInvsFlow) resolveVirtual() error {
}
}
}

// dequeueIncomingMessageAndSkipInvs is a convenience method to be used during
// IBD. Inv messages are expected to arrive at any given moment, but should be
// ignored while we're in IBD
func (flow *handleRelayInvsFlow) dequeueIncomingMessageAndSkipInvs(timeout time.Duration) (appmessage.Message, error) {
for {
message, err := flow.incomingRoute.DequeueWithTimeout(timeout)
if err != nil {
return nil, err
}
if _, ok := message.(*appmessage.MsgInvRelayBlock); !ok {
return message, nil
}
}
}
Loading

0 comments on commit 4855d84

Please sign in to comment.