Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,7 @@ func (handler *TxHandler) Start() {

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
MessageHandler: struct {
network.ValidateHandleFunc
}{
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
},
},
{Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)},
})

handler.backlogWg.Add(2)
Expand Down
4 changes: 2 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type GossipNode interface {
// Currently used as p2p pubsub topic validators.
RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler)

// ClearProcessors deregisters all the existing message processors.
ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
ClearValidatorHandlers()

// GetHTTPClient returns a http.Client with a suitable for the network Transport
// that would also limit the number of outgoing connections.
Expand Down
8 changes: 4 additions & 4 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageVal
n.wsNetwork.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearProcessors() {
n.p2pNetwork.ClearProcessors()
n.wsNetwork.ClearProcessors()
// ClearValidatorHandlers deregisters all the existing message processors.
func (n *HybridP2PNetwork) ClearValidatorHandlers() {
n.p2pNetwork.ClearValidatorHandlers()
n.wsNetwork.ClearValidatorHandlers()
}

// GetHTTPClient returns a http.Client with a suitable for the network Transport
Expand Down
9 changes: 6 additions & 3 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ func MakeHTTPServer(streamHost host.Host) *HTTPServer {
p2phttpMux: mux.NewRouter(),
}
// libp2phttp server requires either explicit ListenAddrs or streamHost.Addrs() to be non-empty.
// If streamHost.Addrs() is empty, we will listen on all interfaces
// If streamHost.Addrs() is empty (that happens when NetAddress is set to ":0" and private address filtering is automatically enabled),
// we will listen on localhost to satisfy libp2phttp.Host.Serve() requirements.
// A side effect is it actually starts listening on interfaces listed in ListenAddrs and as go-libp2p v0.33.2
// there is no other way to have libp2phttp server running AND to have streamHost.Addrs() filtered.
if len(streamHost.Addrs()) == 0 {
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen all interfaces", streamHost.ID())
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen localhost interface to satisfy libp2phttp.Host.Serve ", streamHost.ID())
httpServer.ListenAddrs = []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/0.0.0.0/tcp/0/http"),
multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http"),
}
httpServer.InsecureAllowHTTP = true
}
Expand Down
4 changes: 2 additions & 2 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,8 @@ func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidator
n.handler.RegisterValidatorHandlers(dispatch)
}

// ClearProcessors deregisters all the existing message handlers.
func (n *P2PNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (n *P2PNetwork) ClearValidatorHandlers() {
n.handler.ClearValidatorHandlers([]Tag{})
}

Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func TestP2PRelay(t *testing.T) {
counter.Store(0)
var loggedMsgs [][]byte
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
netA.ClearProcessors()
netA.ClearValidatorHandlers()
netA.RegisterValidatorHandlers(counterHandler)

for i := 0; i < expectedMsgs/2; i++ {
Expand Down
4 changes: 2 additions & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ func (wn *WebsocketNetwork) ClearHandlers() {
func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
}

// ClearProcessors deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearProcessors() {
// ClearValidatorHandlers deregisters all the existing message handlers.
func (wn *WebsocketNetwork) ClearValidatorHandlers() {
}

func (wn *WebsocketNetwork) setHeaders(header http.Header) {
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func (node *AlgorandFullNode) Stop() {
}()

node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
if !node.config.DisableNetworking {
node.net.Stop()
}
Expand Down Expand Up @@ -1218,6 +1219,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
node.waitMonitoringRoutines()
}()
node.net.ClearHandlers()
node.net.ClearValidatorHandlers()
node.stateProofWorker.Stop()
node.txHandler.Stop()
node.agreementService.Shutdown()
Expand Down
47 changes: 45 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,11 @@ func TestDefaultResourcePaths(t *testing.T) {
log := logging.Base()

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)

n.Start()
defer n.Stop()

require.NoError(t, err)

// confirm genesis dir exists in the data dir, and that resources exist in the expected locations
require.DirExists(t, filepath.Join(testDirectory, genesis.ID()))

Expand Down Expand Up @@ -1073,3 +1072,47 @@ func TestNodeP2PRelays(t *testing.T) {
return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2
}, 80*time.Second, 1*time.Second)
}

// TestNodeSetCatchpointCatchupMode checks node can handle services restart for fast catchup correctly
func TestNodeSetCatchpointCatchupMode(t *testing.T) {
partitiontest.PartitionTest(t)

testDirectory := t.TempDir()

genesis := bookkeeping.Genesis{
SchemaID: "gen",
Proto: protocol.ConsensusCurrentVersion,
Network: config.Devtestnet,
FeeSink: sinkAddr.String(),
RewardsPool: poolAddr.String(),
}
log := logging.TestingLog(t)
cfg := config.GetDefaultLocal()

tests := []struct {
name string
enableP2P bool
}{
{"WS node", false},
{"P2P node", true},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cfg.EnableP2P = test.enableP2P

n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
defer n.Stop()

// "start" catchpoint catchup => close services
outCh := n.SetCatchpointCatchupMode(true)
<-outCh
// "stop" catchpoint catchup => resume services
outCh = n.SetCatchpointCatchupMode(false)
<-outCh
})
}
}