Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (n *NetworkFacade) PushDownstreamMessage(newMsg context.CancelFunc) bool {
func (n *NetworkFacade) Address() (string, bool) { return "mock network", true }

// Start - unused function
func (n *NetworkFacade) Start() {}
func (n *NetworkFacade) Start() error { return nil }

// Stop - unused function
func (n *NetworkFacade) Stop() {}
Expand Down
4 changes: 2 additions & 2 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn
return nil
}

func (w *whiteholeNetwork) Start() {
func (w *whiteholeNetwork) Start() error {
w.quit = make(chan struct{})
go func(w *whiteholeNetwork) {
w.domain.messagesMu.Lock()
Expand Down Expand Up @@ -216,7 +216,7 @@ func (w *whiteholeNetwork) Start() {
atomic.AddUint32(&w.lastMsgRead, 1)
}
}(w)
return
return nil
}
func (w *whiteholeNetwork) getMux() *network.Multiplexer {
return w.mux
Expand Down
3 changes: 2 additions & 1 deletion components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func (network *MockNetwork) Address() (string, bool) {
}

// Start - unused function
func (network *MockNetwork) Start() {
func (network *MockNetwork) Start() error {
return nil
}

// Stop - unused function
Expand Down
11 changes: 8 additions & 3 deletions daemon/algod/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const maxHeaderBytes = 4096
type ServerNode interface {
apiServer.APINodeInterface
ListeningAddress() (string, bool)
Start()
Start() error
Stop()
}

Expand Down Expand Up @@ -272,7 +272,13 @@ func makeListener(addr string) (net.Listener, error) {
func (s *Server) Start() {
s.log.Info("Trying to start an Algorand node")
fmt.Print("Initializing the Algorand node... ")
s.node.Start()
err := s.node.Start()
if err != nil {
msg := fmt.Sprintf("Failed to start alg Algorand node: %v", err)
s.log.Error(msg)
fmt.Println(msg)
os.Exit(1)
}
s.log.Info("Successfully started an Algorand node.")
fmt.Println("Success!")

Expand All @@ -291,7 +297,6 @@ func (s *Server) Start() {
}

var apiToken string
var err error
fmt.Printf("API authentication disabled: %v\n", cfg.DisableAPIAuth)
if !cfg.DisableAPIAuth {
apiToken, err = tokens.GetAndValidateAPIToken(s.RootPath, tokens.AlgodTokenFilename)
Expand Down
2 changes: 1 addition & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type GossipNode interface {
GetPeers(options ...PeerOption) []Peer

// Start threads, listen on sockets.
Start()
Start() error

// Close sockets. Stop threads.
Stop()
Expand Down
12 changes: 6 additions & 6 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
// supply alternate NetAddress for P2P network
p2pcfg := cfg
p2pcfg.NetAddress = cfg.P2PListenAddress
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID)
p2pnet, err := NewP2PNetwork(log, p2pcfg, datadir, phonebookAddresses, genesisID, networkID, nodeInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,17 +153,17 @@ func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer {
}

// Start implements GossipNode
func (n *HybridP2PNetwork) Start() {
_ = n.runParallel(func(net GossipNode) error {
net.Start()
return nil
func (n *HybridP2PNetwork) Start() error {
err := n.runParallel(func(net GossipNode) error {
return net.Start()
})
return err
}

// Stop implements GossipNode
func (n *HybridP2PNetwork) Stop() {
_ = n.runParallel(func(net GossipNode) error {
net.Start()
net.Stop()
return nil
})
}
Expand Down
28 changes: 16 additions & 12 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
algoDht "github.com/algorand/go-algorand/network/p2p/dht"
"github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/protocol"
)

// Capability represents functions that some nodes may provide and other nodes would want to know about
Expand Down Expand Up @@ -88,7 +88,9 @@ func (c *CapabilitiesDiscovery) PeersForCapability(capability Capability, n int)
ctx, cancel := context.WithTimeout(context.Background(), operationTimeout)
defer cancel()
var peers []peer.AddrInfo
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n))
// +1 because it can include self but we exclude self from the returned list
// that might confuse the caller (and tests assertions)
peersChan, err := c.FindPeers(ctx, string(capability), discovery.Limit(n+1))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -146,17 +148,19 @@ func (c *CapabilitiesDiscovery) AdvertiseCapabilities(capabilities ...Capability
}()
}

// Sizer exposes the Size method
type Sizer interface {
Size() int
}

// RoutingTable exposes some knowledge about the DHT routing table
func (c *CapabilitiesDiscovery) RoutingTable() Sizer {
return c.dht.RoutingTable()
}

// MakeCapabilitiesDiscovery creates a new CapabilitiesDiscovery object which exposes peer discovery and capabilities advertisement
func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, datadir string, network string, log logging.Logger, bootstrapPeers []*peer.AddrInfo) (*CapabilitiesDiscovery, error) {
pstore, err := peerstore.NewPeerStore(bootstrapPeers)
if err != nil {
return nil, err
}
h, err := makeHost(cfg, datadir, pstore)
if err != nil {
return nil, err
}
discDht, err := algoDht.MakeDHT(ctx, h, network, cfg, bootstrapPeers)
func MakeCapabilitiesDiscovery(ctx context.Context, cfg config.Local, h host.Host, networkID protocol.NetworkID, log logging.Logger, bootstrapFunc func() []peer.AddrInfo) (*CapabilitiesDiscovery, error) {
discDht, err := algoDht.MakeDHT(ctx, h, networkID, cfg, bootstrapFunc)
if err != nil {
return nil, err
}
Expand Down
22 changes: 14 additions & 8 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ func TestCapabilities_Discovery(t *testing.T) {
testSize := 3
for i := 0; i < testSize; i++ {
tempdir := t.TempDir()
capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), tempdir, "devtestnet", logging.Base(), []*peer.AddrInfo{})
ps, err := peerstore.NewPeerStore(nil)
require.NoError(t, err)
h, _, err := MakeHost(config.GetDefaultLocal(), tempdir, ps)
require.NoError(t, err)
capD, err := MakeCapabilitiesDiscovery(context.Background(), config.GetDefaultLocal(), h, "devtestnet", logging.Base(), func() []peer.AddrInfo { return nil })
require.NoError(t, err)
caps = append(caps, capD)
addrs = append(addrs, peer.AddrInfo{
Expand All @@ -72,7 +76,7 @@ func TestCapabilities_Discovery(t *testing.T) {

func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
var hosts []host.Host
var bootstrapPeers []*peer.AddrInfo
var bootstrapPeers []peer.AddrInfo
var dhts []*dht.IpfsDHT
cfg := config.GetDefaultLocal()
for i := 0; i < numHosts; i++ {
Expand All @@ -87,10 +91,10 @@ func setupDHTHosts(t *testing.T, numHosts int) []*dht.IpfsDHT {
libp2p.Peerstore(ps))
require.NoError(t, err)
hosts = append(hosts, h)
bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
bootstrapPeers = append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}
for _, h := range hosts {
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bootstrapPeers)
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bootstrapPeers })
require.NoError(t, err)
err = ht.Bootstrap(context.Background())
require.NoError(t, err)
Expand All @@ -117,7 +121,7 @@ func waitForRouting(t *testing.T, disc *CapabilitiesDiscovery) {

func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*CapabilitiesDiscovery {
var hosts []host.Host
var bootstrapPeers []*peer.AddrInfo
var bootstrapPeers []peer.AddrInfo
var capsDisc []*CapabilitiesDiscovery
cfg := config.GetDefaultLocal()
for i := 0; i < numHosts; i++ {
Expand All @@ -132,17 +136,19 @@ func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*Cap
libp2p.Peerstore(ps))
require.NoError(t, err)
hosts = append(hosts, h)
bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
bootstrapPeers = append(bootstrapPeers, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}
for _, h := range hosts {
bp := bootstrapPeers
if numBootstrapPeers != 0 && numBootstrapPeers != numHosts {
bp = make([]peer.AddrInfo, len(bootstrapPeers))
copy(bp, bootstrapPeers)
rand.Shuffle(len(bootstrapPeers), func(i, j int) {
bp[i], bp[j] = bp[j], bp[i]
})
bp = bp[:numBootstrapPeers]
}
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bp)
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, func() []peer.AddrInfo { return bp })
require.NoError(t, err)
disc, err := algodht.MakeDiscovery(ht)
require.NoError(t, err)
Expand Down Expand Up @@ -304,7 +310,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) {
disc := setupCapDiscovery(t, 2, 2)

testPeersFound := func(disc *CapabilitiesDiscovery, n int, cap Capability) bool {
peers, err := disc.PeersForCapability(cap, n+1)
peers, err := disc.PeersForCapability(cap, n)
if err == nil && len(peers) == n {
return true
}
Expand Down
46 changes: 5 additions & 41 deletions network/p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,62 +32,26 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/routing"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/network/p2p/dnsaddr"
algoproto "github.com/algorand/go-algorand/protocol"
)

const minBackoff = time.Second * 5
const maxBackoff = time.Second * 20
const baseBackoff = float64(1.1)

// getBootstrapPeersFunc looks up a list of Multiaddrs strings from the dnsaddr records at the primary
// SRV record domain.
func getBootstrapPeersFunc(cfg config.Local, network string) func() []peer.AddrInfo {
return func() []peer.AddrInfo {
var addrs []peer.AddrInfo
bootstraps := cfg.DNSBootstrapArray(algoproto.NetworkID(network))
for _, dnsBootstrap := range bootstraps {
controller := dnsaddr.NewMultiaddrDNSResolveController(cfg.DNSSecuritySRVEnforced(), "")
resolvedAddrs, err := dnsaddr.MultiaddrsFromResolver(dnsBootstrap.PrimarySRVBootstrap, controller)
if err != nil {
continue
}
for _, resolvedAddr := range resolvedAddrs {
info, err0 := peer.AddrInfoFromP2pAddr(resolvedAddr)
if err0 != nil {
continue
}
addrs = append(addrs, *info)
}
}
return addrs
}
}

func dhtProtocolPrefix(network string) protocol.ID {
return protocol.ID(fmt.Sprintf("/algorand/kad/%s", network))
func dhtProtocolPrefix(networkID algoproto.NetworkID) protocol.ID {
return protocol.ID(fmt.Sprintf("/algorand/kad/%s", networkID))
}

// MakeDHT creates the dht.IpfsDHT object
func MakeDHT(ctx context.Context, h host.Host, network string, cfg config.Local, bootstrapPeers []*peer.AddrInfo) (*dht.IpfsDHT, error) {
func MakeDHT(ctx context.Context, h host.Host, networkID algoproto.NetworkID, cfg config.Local, bootstrapFunc func() []peer.AddrInfo) (*dht.IpfsDHT, error) {
dhtCfg := []dht.Option{
// Automatically determine server or client mode
dht.Mode(dht.ModeAutoServer),
// We don't need the value store right now
dht.DisableValues(),
dht.ProtocolPrefix(dhtProtocolPrefix(network)),
}
if len(bootstrapPeers) > 0 {
var peers []peer.AddrInfo
for _, bPeer := range bootstrapPeers {
if bPeer != nil {
peers = append(peers, *bPeer)
}
}
dhtCfg = append(dhtCfg, dht.BootstrapPeers(peers...))

} else {
dhtCfg = append(dhtCfg, dht.BootstrapPeersFunc(getBootstrapPeersFunc(cfg, network)))
dht.ProtocolPrefix(dhtProtocolPrefix(networkID)),
dht.BootstrapPeersFunc(bootstrapFunc),
}
return dht.New(ctx, h, dhtCfg...)
}
Expand Down
46 changes: 2 additions & 44 deletions network/p2p/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDHTBasic(t *testing.T) {
h,
"devtestnet",
config.GetDefaultLocal(),
[]*peer.AddrInfo{{}})
func() []peer.AddrInfo { return nil })
require.NoError(t, err)
_, err = MakeDiscovery(dht)
require.NoError(t, err)
Expand All @@ -55,52 +55,10 @@ func TestDHTBasicAlgodev(t *testing.T) {
require.NoError(t, err)
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "<network>.algodev.network"
dht, err := MakeDHT(context.Background(), h, "betanet", cfg, []*peer.AddrInfo{})
dht, err := MakeDHT(context.Background(), h, "betanet", cfg, func() []peer.AddrInfo { return nil })
require.NoError(t, err)
_, err = MakeDiscovery(dht)
require.NoError(t, err)
err = dht.Bootstrap(context.Background())
require.NoError(t, err)
}

func TestGetBootstrapPeers(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "<network>.algodev.network"
cfg.DNSSecurityFlags = 0

addrs := getBootstrapPeersFunc(cfg, "test")()

require.GreaterOrEqual(t, len(addrs), 1)
addr := addrs[0]
require.Equal(t, len(addr.Addrs), 1)
require.GreaterOrEqual(t, len(addr.Addrs), 1)
}

func TestGetBootstrapPeersFailure(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSSecurityFlags = 0
cfg.DNSBootstrapID = "non-existent.algodev.network"

addrs := getBootstrapPeersFunc(cfg, "test")()

require.Equal(t, 0, len(addrs))
}

func TestGetBootstrapPeersInvalidAddr(t *testing.T) {
t.Parallel()
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.DNSSecurityFlags = 0
cfg.DNSBootstrapID = "<network>.algodev.network"

addrs := getBootstrapPeersFunc(cfg, "testInvalidAddr")()

require.Equal(t, 0, len(addrs))
}
Loading