Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ type Local struct {
// the max size the sync server would return
TxSyncServeResponseSize int

// IsIndexerActive indicates wheather to activate the indexer for fast retrieval of transactions
// IsIndexerActive indicates whether to activate the indexer for fast retrieval of transactions
// Note -- Indexer cannot operate on non Archival nodes
IsIndexerActive bool

Expand All @@ -678,6 +678,9 @@ type Local struct {
// proxy vendor provides another header field. In the case of CloudFlare proxy, the "CF-Connecting-IP" header
// field can be used.
UseXForwardedForAddressField string

// ForceRelayMessages indicates whether the network library relay messages even in the case that no NetAddress was specified.
ForceRelayMessages bool
}

// Filenames of config files within the configdir (e.g. ~/.algorand)
Expand Down
3 changes: 2 additions & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@
"TxPoolSize": 50000,
"TxSyncIntervalSeconds": 60,
"TxSyncServeResponseSize": 1000000,
"TxSyncTimeoutSeconds": 30
"TxSyncTimeoutSeconds": 30,
"ForceRelayMessages": false
}
3 changes: 1 addition & 2 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (wn *WebsocketNetwork) setup() {
wn.server.IdleTimeout = httpServerIdleTimeout
wn.server.MaxHeaderBytes = httpServerMaxHeaderBytes
wn.ctx, wn.ctxCancel = context.WithCancel(context.Background())
wn.relayMessages = wn.config.NetAddress != "" || wn.config.ForceRelayMessages
// roughly estimate the number of messages that could be sent over the lifespan of a single round.
wn.outgoingMessagesBufferSize = int(config.Consensus[protocol.ConsensusCurrentVersion].NumProposers*2 +
config.Consensus[protocol.ConsensusCurrentVersion].SoftCommitteeSize +
Expand Down Expand Up @@ -1528,8 +1529,6 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebook Phon
outerPhonebook := &MultiPhonebook{phonebooks: []Phonebook{phonebook}}
wn = &WebsocketNetwork{log: log, config: config, phonebook: outerPhonebook, GenesisID: genesisID, NetworkID: networkID}

// TODO - add config parameter to allow non-relays to enable relaying.
wn.relayMessages = config.NetAddress != ""
wn.setup()
return wn, nil
}
Expand Down
83 changes: 83 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,3 +1464,86 @@ func TestSlowPeerDisconnection(t *testing.T) {
time.Sleep(time.Millisecond * 5)
}
}

func TestForceMessageRelaying(t *testing.T) {
log := logging.TestingLog(t)
log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel))
wn := &WebsocketNetwork{
log: log,
config: defaultConfig,
phonebook: emptyPhonebookSingleton,
GenesisID: "go-test-network-genesis",
NetworkID: config.Devtestnet,
}
wn.setup()
wn.eventualReadyDelay = time.Second

netA := wn
netA.config.GossipFanout = 1

defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }()

counter := newMessageCounter(t, 5)
counterDone := counter.done
netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}})
netA.Start()
addrA, postListen := netA.Address()
require.Truef(t, postListen, "Listening network failed to start")

noAddressConfig := defaultConfig
noAddressConfig.NetAddress = ""
netB := makeTestWebsocketNodeWithConfig(t, noAddressConfig)
netB.config.GossipFanout = 1
netB.phonebook = &oneEntryPhonebook{addrA}
netB.Start()
defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }()

noAddressConfig.ForceRelayMessages = true
netC := makeTestWebsocketNodeWithConfig(t, noAddressConfig)
netC.config.GossipFanout = 1
netC.phonebook = &oneEntryPhonebook{addrA}
netC.Start()
defer func() { t.Log("stopping C"); netB.Stop(); t.Log("C done") }()

readyTimeout := time.NewTimer(2 * time.Second)
waitReady(t, netA, readyTimeout.C)
waitReady(t, netB, readyTimeout.C)
waitReady(t, netC, readyTimeout.C)

// send 5 messages from both netB and netC to netA
for i := 0; i < 5; i++ {
err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
err = netC.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
if counter.count < 5 {
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 5, counter.count)
} else if counter.count > 5 {
require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", 5, counter.count)
}
}
netA.ClearHandlers()
counter = newMessageCounter(t, 10)
counterDone = counter.done
netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}})

// hack the relayMessages on the netB so that it would start sending messages.
netB.relayMessages = true
// send additional 10 messages from netB
for i := 0; i < 10; i++ {
err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 10, counter.count)
}

}