Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion daemon/algod/api/algod.oas3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7192,5 +7192,6 @@
{
"name": "private"
}
]
],
"x-original-swagger-version": "2.0"
}
4 changes: 2 additions & 2 deletions daemon/algod/api/server/v2/generated/data/routes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

408 changes: 204 additions & 204 deletions daemon/algod/api/server/v2/generated/experimental/routes.go

Large diffs are not rendered by default.

414 changes: 207 additions & 207 deletions daemon/algod/api/server/v2/generated/nonparticipating/private/routes.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

157 changes: 101 additions & 56 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package p2p

import (
"context"
"math/rand"
"sync"
"testing"
"time"

Expand All @@ -36,7 +38,7 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
)

func TestCapabilitiesDiscovery(t *testing.T) {
func TestCapabilities_Discovery(t *testing.T) {
partitiontest.PartitionTest(t)

golog.SetDebugLogging()
Expand Down Expand Up @@ -113,7 +115,7 @@ func waitForRouting(t *testing.T, disc *CapabilitiesDiscovery) {
}
}

func setupCapDiscovery(t *testing.T, numHosts int) []*CapabilitiesDiscovery {
func setupCapDiscovery(t *testing.T, numHosts int, numBootstrapPeers int) []*CapabilitiesDiscovery {
var hosts []host.Host
var bootstrapPeers []*peer.AddrInfo
var capsDisc []*CapabilitiesDiscovery
Expand All @@ -133,7 +135,14 @@ func setupCapDiscovery(t *testing.T, numHosts int) []*CapabilitiesDiscovery {
bootstrapPeers = append(bootstrapPeers, &peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}
for _, h := range hosts {
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bootstrapPeers)
bp := bootstrapPeers
if numBootstrapPeers != 0 && numBootstrapPeers != numHosts {
rand.Shuffle(len(bootstrapPeers), func(i, j int) {
bp[i], bp[j] = bp[j], bp[i]
})
bp = bp[:numBootstrapPeers]
}
ht, err := algodht.MakeDHT(context.Background(), h, "devtestnet", cfg, bp)
require.NoError(t, err)
disc, err := algodht.MakeDiscovery(ht)
require.NoError(t, err)
Expand All @@ -147,7 +156,7 @@ func setupCapDiscovery(t *testing.T, numHosts int) []*CapabilitiesDiscovery {
return capsDisc
}

func TestDHTTwoPeers(t *testing.T) {
func TestCapabilities_DHTTwoPeers(t *testing.T) {
partitiontest.PartitionTest(t)

numAdvertisers := 2
Expand Down Expand Up @@ -176,6 +185,7 @@ func TestDHTTwoPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
var advertisers []peer.AddrInfo
peersChan, err := disc.FindPeers(ctx, topic, discovery.Limit(numAdvertisers))
require.NoError(t, err)
pollingForPeers:
for {
select {
Expand All @@ -194,69 +204,104 @@ func TestDHTTwoPeers(t *testing.T) {
}
}

func TestVaryingCapabilities(t *testing.T) {
func TestCapabilities_Varying(t *testing.T) {
partitiontest.PartitionTest(t)

numAdvertisers := 10
capsDisc := setupCapDiscovery(t, numAdvertisers)
noCap := capsDisc[:3]
archOnly := capsDisc[3:5]
catchOnly := capsDisc[5:7]
archCatch := capsDisc[7:]
const numAdvertisers = 10

for _, disc := range archOnly {
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Archival)
}
for _, disc := range catchOnly {
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Catchpoints)
}
for _, disc := range archCatch {
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Archival, Catchpoints)
var tests = []struct {
name string
numBootstrap int
}{
{"bootstrap=all", numAdvertisers},
{"bootstrap=2", 2},
}

for _, disc := range noCap {
require.Eventuallyf(t,
func() bool {
numArchPeers := len(archOnly) + len(archCatch)
peers, err := disc.PeersForCapability(Archival, numArchPeers)
if err == nil && len(peers) == numArchPeers {
return true
}
return false
},
time.Minute,
time.Second,
"Not all expected archival peers were found",
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
capsDisc := setupCapDiscovery(t, numAdvertisers, test.numBootstrap)
noCap := capsDisc[:3]
archOnly := capsDisc[3:5]
catchOnly := capsDisc[5:7]
archCatch := capsDisc[7:]

require.Eventuallyf(t,
func() bool {
numCatchPeers := len(catchOnly) + len(archCatch)
peers, err := disc.PeersForCapability(Catchpoints, numCatchPeers)
if err == nil && len(peers) == numCatchPeers {
return true
}
return false
},
time.Minute,
time.Second,
"Not all expected catchpoint peers were found",
)
}
var wg sync.WaitGroup
wg.Add(len(archOnly) + len(catchOnly) + len(archCatch))
for _, disc := range archOnly {
go func(disc *CapabilitiesDiscovery) {
defer wg.Done()
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Archival)
}(disc)
}
for _, disc := range catchOnly {
go func(disc *CapabilitiesDiscovery) {
defer wg.Done()
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Catchpoints)
}(disc)
}
for _, disc := range archCatch {
go func(disc *CapabilitiesDiscovery) {
defer wg.Done()
waitForRouting(t, disc)
disc.AdvertiseCapabilities(Archival, Catchpoints)
}(disc)
}

wg.Wait()

for _, disc := range capsDisc[3:] {
disc.Close()
// Make sure it actually closes
disc.wg.Wait()
wg.Add(len(noCap) * 2)
for _, disc := range noCap {
go func(disc *CapabilitiesDiscovery) {
defer wg.Done()
require.Eventuallyf(t,
func() bool {
numArchPeers := len(archOnly) + len(archCatch)
peers, err := disc.PeersForCapability(Archival, numArchPeers)
if err == nil && len(peers) == numArchPeers {
return true
}
return false
},
time.Minute,
time.Second,
"Not all expected archival peers were found",
)
}(disc)

go func(disc *CapabilitiesDiscovery) {
defer wg.Done()
require.Eventuallyf(t,
func() bool {
numCatchPeers := len(catchOnly) + len(archCatch)
peers, err := disc.PeersForCapability(Catchpoints, numCatchPeers)
if err == nil && len(peers) == numCatchPeers {
return true
}
return false
},
time.Minute,
time.Second,
"Not all expected catchpoint peers were found",
)
}(disc)
}

wg.Wait()

for _, disc := range capsDisc[3:] {
disc.Close()
// Make sure it actually closes
disc.wg.Wait()
}
})
}
}

func TestCapabilitiesExcludesSelf(t *testing.T) {
func TestCapabilities_ExcludesSelf(t *testing.T) {
partitiontest.PartitionTest(t)
disc := setupCapDiscovery(t, 2)
disc := setupCapDiscovery(t, 2, 2)

testPeersFound := func(disc *CapabilitiesDiscovery, n int, cap Capability) bool {
peers, err := disc.PeersForCapability(cap, n+1)
Expand Down
22 changes: 12 additions & 10 deletions network/p2p/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

const minBackoff = time.Second * 5
const maxBackoff = time.Second * 20
const baseBackoff = float64(1)
const baseBackoff = float64(1.1)

// getBootstrapPeersFunc looks up a list of Multiaddrs strings from the dnsaddr records at the primary
// SRV record domain.
Expand Down Expand Up @@ -70,28 +70,30 @@ func dhtProtocolPrefix(network string) protocol.ID {

// MakeDHT creates the dht.IpfsDHT object
func MakeDHT(ctx context.Context, h host.Host, network string, cfg config.Local, bootstrapPeers []*peer.AddrInfo) (*dht.IpfsDHT, error) {
var peers []peer.AddrInfo
for _, bPeer := range bootstrapPeers {
if bPeer != nil {
peers = append(peers, *bPeer)
}
}
dhtCfg := []dht.Option{
// Automatically determine server or client mode
dht.Mode(dht.ModeAutoServer),
// We don't need the value store right now
dht.DisableValues(),
dht.ProtocolPrefix(dhtProtocolPrefix(network)),
dht.BootstrapPeers(peers...),
}
if len(bootstrapPeers) == 0 {
if len(bootstrapPeers) > 0 {
var peers []peer.AddrInfo
for _, bPeer := range bootstrapPeers {
if bPeer != nil {
peers = append(peers, *bPeer)
}
}
dhtCfg = append(dhtCfg, dht.BootstrapPeers(peers...))

} else {
dhtCfg = append(dhtCfg, dht.BootstrapPeersFunc(getBootstrapPeersFunc(cfg, network)))
}
return dht.New(ctx, h, dhtCfg...)
}

func backoffFactory() backoff.BackoffFactory {
return backoff.NewExponentialDecorrelatedJitter(minBackoff, maxBackoff, baseBackoff, rand.New(rand.NewSource(rand.Int63())))
return backoff.NewExponentialDecorrelatedJitter(minBackoff, maxBackoff, baseBackoff, rand.NewSource(rand.Int63()))
}

// MakeDiscovery creates a discovery.Discovery object using backoff and cacching
Expand Down