diff --git a/config/config.go b/config/config.go index 7a69814820..3ff54e93e0 100644 --- a/config/config.go +++ b/config/config.go @@ -669,7 +669,7 @@ type Local struct { // the max size the sync server would return TxSyncServeResponseSize int - // IsIndexerActive indicates wheather to activate the indexer for fast retrieval of transactions + // IsIndexerActive indicates whether to activate the indexer for fast retrieval of transactions // Note -- Indexer cannot operate on non Archival nodes IsIndexerActive bool @@ -678,6 +678,9 @@ type Local struct { // proxy vendor provides another header field. In the case of CloudFlare proxy, the "CF-Connecting-IP" header // field can be used. UseXForwardedForAddressField string + + // ForceRelayMessages indicates whether the network library relay messages even in the case that no NetAddress was specified. + ForceRelayMessages bool } // Filenames of config files within the configdir (e.g. ~/.algorand) diff --git a/installer/config.json.example b/installer/config.json.example index c1124aa248..c32703f3f9 100644 --- a/installer/config.json.example +++ b/installer/config.json.example @@ -36,5 +36,6 @@ "TxPoolSize": 50000, "TxSyncIntervalSeconds": 60, "TxSyncServeResponseSize": 1000000, - "TxSyncTimeoutSeconds": 30 + "TxSyncTimeoutSeconds": 30, + "ForceRelayMessages": false } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 05c9ee159a..1cf56587cc 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -521,6 +521,7 @@ func (wn *WebsocketNetwork) setup() { wn.server.IdleTimeout = httpServerIdleTimeout wn.server.MaxHeaderBytes = httpServerMaxHeaderBytes wn.ctx, wn.ctxCancel = context.WithCancel(context.Background()) + wn.relayMessages = wn.config.NetAddress != "" || wn.config.ForceRelayMessages // roughly estimate the number of messages that could be sent over the lifespan of a single round. wn.outgoingMessagesBufferSize = int(config.Consensus[protocol.ConsensusCurrentVersion].NumProposers*2 + config.Consensus[protocol.ConsensusCurrentVersion].SoftCommitteeSize + @@ -1528,8 +1529,6 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebook Phon outerPhonebook := &MultiPhonebook{phonebooks: []Phonebook{phonebook}} wn = &WebsocketNetwork{log: log, config: config, phonebook: outerPhonebook, GenesisID: genesisID, NetworkID: networkID} - // TODO - add config parameter to allow non-relays to enable relaying. - wn.relayMessages = config.NetAddress != "" wn.setup() return wn, nil } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index b9dd648fd1..3544171399 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -1464,3 +1464,86 @@ func TestSlowPeerDisconnection(t *testing.T) { time.Sleep(time.Millisecond * 5) } } + +func TestForceMessageRelaying(t *testing.T) { + log := logging.TestingLog(t) + log.SetLevel(logging.Level(defaultConfig.BaseLoggerDebugLevel)) + wn := &WebsocketNetwork{ + log: log, + config: defaultConfig, + phonebook: emptyPhonebookSingleton, + GenesisID: "go-test-network-genesis", + NetworkID: config.Devtestnet, + } + wn.setup() + wn.eventualReadyDelay = time.Second + + netA := wn + netA.config.GossipFanout = 1 + + defer func() { t.Log("stopping A"); netA.Stop(); t.Log("A done") }() + + counter := newMessageCounter(t, 5) + counterDone := counter.done + netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}}) + netA.Start() + addrA, postListen := netA.Address() + require.Truef(t, postListen, "Listening network failed to start") + + noAddressConfig := defaultConfig + noAddressConfig.NetAddress = "" + netB := makeTestWebsocketNodeWithConfig(t, noAddressConfig) + netB.config.GossipFanout = 1 + netB.phonebook = &oneEntryPhonebook{addrA} + netB.Start() + defer func() { t.Log("stopping B"); netB.Stop(); t.Log("B done") }() + + noAddressConfig.ForceRelayMessages = true + netC := makeTestWebsocketNodeWithConfig(t, noAddressConfig) + netC.config.GossipFanout = 1 + netC.phonebook = &oneEntryPhonebook{addrA} + netC.Start() + defer func() { t.Log("stopping C"); netB.Stop(); t.Log("C done") }() + + readyTimeout := time.NewTimer(2 * time.Second) + waitReady(t, netA, readyTimeout.C) + waitReady(t, netB, readyTimeout.C) + waitReady(t, netC, readyTimeout.C) + + // send 5 messages from both netB and netC to netA + for i := 0; i < 5; i++ { + err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil) + require.NoError(t, err) + err = netC.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil) + require.NoError(t, err) + } + + select { + case <-counterDone: + case <-time.After(2 * time.Second): + if counter.count < 5 { + require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 5, counter.count) + } else if counter.count > 5 { + require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", 5, counter.count) + } + } + netA.ClearHandlers() + counter = newMessageCounter(t, 10) + counterDone = counter.done + netA.RegisterHandlers([]TaggedMessageHandler{TaggedMessageHandler{Tag: debugTag, MessageHandler: counter}}) + + // hack the relayMessages on the netB so that it would start sending messages. + netB.relayMessages = true + // send additional 10 messages from netB + for i := 0; i < 10; i++ { + err := netB.Relay(context.Background(), debugTag, []byte{1, 2, 3}, true, nil) + require.NoError(t, err) + } + + select { + case <-counterDone: + case <-time.After(2 * time.Second): + require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", 10, counter.count) + } + +}