From ef7c93b73ae78353e26694518ff28f926e9f83ad Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 25 Oct 2024 17:45:14 -0400 Subject: [PATCH 1/4] network: handle p2p to ws messages propagation --- data/txHandler.go | 3 + node/node_test.go | 147 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index ec3a84cc1f..e156417f79 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -858,6 +858,9 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa if err != nil { logging.Base().Infof("unable to pin transaction: %v", err) } + + handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + return network.OutgoingMessage{ Action: network.Accept, } diff --git a/node/node_test.go b/node/node_test.go index 387ea58aa8..8cee3f9a11 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) { // N -- R -- A and ensures N can discover A and download blocks from it. // // N is a non-part node that joins the network later -// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A. +// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A. // A is a archival node that can only provide blocks. // Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode. func TestNodeHybridTopology(t *testing.T) { @@ -1112,3 +1112,148 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) { }) } } + +// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology: +// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only. +// +// N0 is the only blocks producer, and N2 is the only transaction supplier. +// Test ensures that a hybrid R relay can properly deliver transactions to N0. +func TestNodeHybridP2PGossipSend(t *testing.T) { + partitiontest.PartitionTest(t) + + const consensusTest0 = protocol.ConsensusVersion("test0") + + configurableConsensus := make(config.ConsensusProtocols) + + testParams0 := config.Consensus[protocol.ConsensusCurrentVersion] + testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond + configurableConsensus[consensusTest0] = testParams0 + + // configure the stake to have R and A producing and confirming blocks + const totalStake = 100_000_000_000 + const npnStake = 1_000_000 + const nodeStake = totalStake - npnStake + const numAccounts = 3 + acctStake := make([]basics.MicroAlgos, numAccounts) + acctStake[0] = basics.MicroAlgos{Raw: nodeStake} + acctStake[1] = basics.MicroAlgos{} + acctStake[2] = basics.MicroAlgos{Raw: npnStake} + + configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) { + cfg = config.GetDefaultLocal() + if ni.idx != 1 { + cfg.EnableBlockService = false + cfg.EnableGossipBlockService = false + cfg.EnableLedgerService = false + cfg.CatchpointInterval = 0 + cfg.Archival = false + cfg.NetAddress = "" + } else { + // node 1 is archival + cfg.EnableBlockService = true + cfg.EnableGossipBlockService = true + cfg.NetAddress = ni.wsNetAddr() + cfg.EnableP2PHybridMode = true + cfg.PublicAddress = ni.wsNetAddr() + cfg.P2PPersistPeerID = true + privKey, err := p2p.GetPrivKey(cfg, ni.rootDir) + require.NoError(t, err) + ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic()) + require.NoError(t, err) + + cfg.P2PHybridNetAddress = ni.p2pNetAddr() + } + if ni.idx == 2 { + // node 2 is p2p only + cfg.EnableP2PHybridMode = false + cfg.EnableP2P = true + } + return ni, cfg + } + + phonebookHook := func(ni []nodeInfo, i int) []string { + switch i { + case 0: + // node 0 (N0) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + case 1: + // node 1 (R) is a relay accepting connections from all + t.Logf("Node%d phonebook: empty", i) + return []string{} + case 2: + // node 2 (A) connects to R + t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr()) + return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()} + default: + t.Errorf("not expected number of nodes: %d", i) + t.FailNow() + } + return nil + } + + nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook) + require.Len(t, nodes, 3) + require.Len(t, wallets, 3) + for i := 0; i < len(nodes); i++ { + defer os.Remove(wallets[i]) + defer nodes[i].Stop() + } + + startAndConnectNodes(nodes, nodelayFirstNodeStartDelay) + + // ensure the initial connectivity topology + require.Eventually(t, func() bool { + node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2 + node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1 + return node0Conn && node1Conn && node2Conn + }, 60*time.Second, 500*time.Millisecond) + + filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2]) + access, err := db.MakeAccessor(filename, false, false) + require.NoError(t, err) + root, err := account.RestoreRoot(access) + access.Close() + require.NoError(t, err) + + addr2 := root.Address() + secrets2 := root.Secrets() + + txn := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addr2, + FirstValid: 1, + LastValid: 100, + Fee: basics.MicroAlgos{Raw: 1000}, + GenesisID: nodes[2].genesisID, + GenesisHash: nodes[2].genesisHash, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addr2, + Amount: basics.MicroAlgos{Raw: 0}, + }, + } + signature := secrets2.Sign(txn) + stxn := transactions.SignedTxn{ + Sig: signature, + Txn: txn, + } + + err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn}) + require.NoError(t, err) + + initialRound := nodes[0].ledger.NextRound() + targetRound := initialRound + 10 + + // ensure discovery of archival node by tracking its ledger + select { + case <-nodes[2].ledger.Wait(targetRound): + b, err := nodes[0].ledger.Block(targetRound) + require.NoError(t, err) + require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources + case <-time.After(3 * time.Minute): // set it to 1.5x of the dht.periodicBootstrapInterval to give DHT code to rebuild routing table one more time + require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0])) + } +} From a9d469fbcd7826d7909b3e3566926e90e4ac695b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 25 Oct 2024 17:54:02 -0400 Subject: [PATCH 2/4] fix linter --- config/config_test.go | 5 +++-- data/txHandler.go | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 936622b06c..2ad8e32d61 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -773,7 +773,8 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) { for i, test := range tests { test := test - t.Run(fmt.Sprintf("test=%d", i), func(t *testing.T) { + name := fmt.Sprintf("test=%d", i) + t.Run(name, func(t *testing.T) { t.Parallel() c := Local{ @@ -782,7 +783,7 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) { NetAddress: test.netAddress, } err := c.ValidateP2PHybridConfig() - require.Equal(t, test.err, err != nil, "test=%d", i) + require.Equal(t, test.err, err != nil, name) }) } } diff --git a/data/txHandler.go b/data/txHandler.go index e156417f79..4f390494de 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -859,7 +859,7 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa logging.Base().Infof("unable to pin transaction: %v", err) } - handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + _ = handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) return network.OutgoingMessage{ Action: network.Accept, From 3ef71a58fa14dfaca41d3604ac949a5759d6bd92 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 29 Oct 2024 14:22:43 -0400 Subject: [PATCH 3/4] fix TestNodeHybridP2PGossipSend --- node/node_test.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/node/node_test.go b/node/node_test.go index 8cee3f9a11..8cab211280 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -207,7 +207,7 @@ func setupFullNodesEx( genesis[short] = data } genesis[poolAddr] = basics.AccountData{ - Status: basics.Online, + Status: basics.Offline, MicroAlgos: basics.MicroAlgos{Raw: uint64(100000)}, } @@ -241,7 +241,7 @@ func setupFullNodesEx( cfg, err := config.LoadConfigFromDisk(rootDirectory) phonebook := phonebookHook(nodeInfos, i) require.NoError(t, err) - node, err := MakeFull(logging.Base(), rootDirectory, cfg, phonebook, g) + node, err := MakeFull(logging.Base().With("net", fmt.Sprintf("node%d", i)), rootDirectory, cfg, phonebook, g) nodes[i] = node require.NoError(t, err) } @@ -1141,15 +1141,16 @@ func TestNodeHybridP2PGossipSend(t *testing.T) { configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) { cfg = config.GetDefaultLocal() - if ni.idx != 1 { - cfg.EnableBlockService = false - cfg.EnableGossipBlockService = false - cfg.EnableLedgerService = false - cfg.CatchpointInterval = 0 - cfg.Archival = false - cfg.NetAddress = "" - } else { - // node 1 is archival + cfg.CatchpointInterval = 0 + cfg.BaseLoggerDebugLevel = uint32(logging.Debug) + if ni.idx == 0 { + // node 0 is ws node only + cfg.EnableP2PHybridMode = false + cfg.EnableP2P = false + } + + if ni.idx == 1 { + // node 1 is a hybrid relay cfg.EnableBlockService = true cfg.EnableGossipBlockService = true cfg.NetAddress = ni.wsNetAddr() @@ -1210,6 +1211,9 @@ func TestNodeHybridP2PGossipSend(t *testing.T) { return node0Conn && node1Conn && node2Conn }, 60*time.Second, 500*time.Millisecond) + // now wait 2x heartbeat interval (GossipSubHeartbeatInterval) to ensure the meshsub is built + time.Sleep(2 * time.Second) + filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2]) access, err := db.MakeAccessor(filename, false, false) require.NoError(t, err) @@ -1246,14 +1250,15 @@ func TestNodeHybridP2PGossipSend(t *testing.T) { initialRound := nodes[0].ledger.NextRound() targetRound := initialRound + 10 + t.Logf("Waiting for round %d (initial %d)", targetRound, initialRound) - // ensure discovery of archival node by tracking its ledger + // ensure tx properly propagated to node 0 select { - case <-nodes[2].ledger.Wait(targetRound): + case <-nodes[0].ledger.Wait(targetRound): b, err := nodes[0].ledger.Block(targetRound) require.NoError(t, err) require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources - case <-time.After(3 * time.Minute): // set it to 1.5x of the dht.periodicBootstrapInterval to give DHT code to rebuild routing table one more time + case <-time.After(1 * time.Minute): require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0])) } } From 7b0e4a019edd156dbef368b9f1928535c41dd77a Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 18 Nov 2024 16:23:43 -0500 Subject: [PATCH 4/4] Add HybridRelayer interface --- data/txHandler.go | 9 ++++++++- network/hybridNetwork.go | 5 +++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index 4f390494de..83eafe7a6c 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -148,6 +148,11 @@ type TxHandlerOpts struct { Config config.Local } +// HybridRelayer is an interface for relaying p2p transactions to WS network +type HybridRelayer interface { + BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error +} + // MakeTxHandler makes a new handler for transaction messages func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { @@ -859,7 +864,9 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa logging.Base().Infof("unable to pin transaction: %v", err) } - _ = handler.net.Relay(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + if hybridNet, ok := handler.net.(HybridRelayer); ok { + _ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender) + } return network.OutgoingMessage{ Action: network.Accept, diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index 5f31436fb8..c62c01c5d6 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -125,6 +125,11 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b }) } +// BridgeP2PToWS skips Relay/Broadcast to both networks and only sends to WS +func (n *HybridP2PNetwork) BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { + return n.wsNetwork.Relay(ctx, tag, data, wait, except) +} + // Disconnect implements GossipNode func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) { net := badnode.GetNetwork()