Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ type TxHandlerOpts struct {
Config config.Local
}

// HybridRelayer is an interface for relaying p2p transactions to WS network
type HybridRelayer interface {
BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error
}

// MakeTxHandler makes a new handler for transaction messages
func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {

Expand Down Expand Up @@ -858,6 +863,11 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)
}

if hybridNet, ok := handler.net.(HybridRelayer); ok {
_ = hybridNet.BridgeP2PToWS(handler.ctx, protocol.TxnTag, reencoded, false, wi.rawmsg.Sender)
}

return network.OutgoingMessage{
Action: network.Accept,
}
Expand Down
5 changes: 5 additions & 0 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (n *HybridP2PNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
})
}

// BridgeP2PToWS skips Relay/Broadcast to both networks and only sends to WS
func (n *HybridP2PNetwork) BridgeP2PToWS(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
return n.wsNetwork.Relay(ctx, tag, data, wait, except)
}

// Disconnect implements GossipNode
func (n *HybridP2PNetwork) Disconnect(badnode DisconnectablePeer) {
net := badnode.GetNetwork()
Expand Down
156 changes: 153 additions & 3 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func setupFullNodesEx(
genesis[short] = data
}
genesis[poolAddr] = basics.AccountData{
Status: basics.Online,
Status: basics.Offline,
MicroAlgos: basics.MicroAlgos{Raw: uint64(100000)},
}

Expand Down Expand Up @@ -241,7 +241,7 @@ func setupFullNodesEx(
cfg, err := config.LoadConfigFromDisk(rootDirectory)
phonebook := phonebookHook(nodeInfos, i)
require.NoError(t, err)
node, err := MakeFull(logging.Base(), rootDirectory, cfg, phonebook, g)
node, err := MakeFull(logging.Base().With("net", fmt.Sprintf("node%d", i)), rootDirectory, cfg, phonebook, g)
nodes[i] = node
require.NoError(t, err)
}
Expand Down Expand Up @@ -861,7 +861,7 @@ func TestMaxSizesCorrect(t *testing.T) {
// N -- R -- A and ensures N can discover A and download blocks from it.
//
// N is a non-part node that joins the network later
// R is a non-archival relay node with block service disabled. It MUST NOT service blocks to force N to discover A.
// R is a non-archival relay node with block service disabled. It MUST NOT serve blocks to force N to discover A.
// A is a archival node that can only provide blocks.
// Nodes N and A have only R in their initial phonebook, and all nodes are in hybrid mode.
func TestNodeHybridTopology(t *testing.T) {
Expand Down Expand Up @@ -1112,3 +1112,153 @@ func TestNodeSetCatchpointCatchupMode(t *testing.T) {
})
}
}

// TestNodeHybridP2PGossipSend set ups 3 nodes network with the following topology:
// N0 -- R -- N2 where N0 is wsnet only, R is a relay hybrid node, and N2 is p2pnet only.
//
// N0 is the only blocks producer, and N2 is the only transaction supplier.
// Test ensures that a hybrid R relay can properly deliver transactions to N0.
func TestNodeHybridP2PGossipSend(t *testing.T) {
partitiontest.PartitionTest(t)

const consensusTest0 = protocol.ConsensusVersion("test0")

configurableConsensus := make(config.ConsensusProtocols)

testParams0 := config.Consensus[protocol.ConsensusCurrentVersion]
testParams0.AgreementFilterTimeoutPeriod0 = 500 * time.Millisecond
configurableConsensus[consensusTest0] = testParams0

// configure the stake to have R and A producing and confirming blocks
const totalStake = 100_000_000_000
const npnStake = 1_000_000
const nodeStake = totalStake - npnStake
const numAccounts = 3
acctStake := make([]basics.MicroAlgos, numAccounts)
acctStake[0] = basics.MicroAlgos{Raw: nodeStake}
acctStake[1] = basics.MicroAlgos{}
acctStake[2] = basics.MicroAlgos{Raw: npnStake}

configHook := func(ni nodeInfo, cfg config.Local) (nodeInfo, config.Local) {
cfg = config.GetDefaultLocal()
cfg.CatchpointInterval = 0
cfg.BaseLoggerDebugLevel = uint32(logging.Debug)
if ni.idx == 0 {
// node 0 is ws node only
cfg.EnableP2PHybridMode = false
cfg.EnableP2P = false
}

if ni.idx == 1 {
// node 1 is a hybrid relay
cfg.EnableBlockService = true
cfg.EnableGossipBlockService = true
cfg.NetAddress = ni.wsNetAddr()
cfg.EnableP2PHybridMode = true
cfg.PublicAddress = ni.wsNetAddr()
cfg.P2PPersistPeerID = true
privKey, err := p2p.GetPrivKey(cfg, ni.rootDir)
require.NoError(t, err)
ni.p2pID, err = p2p.PeerIDFromPublicKey(privKey.GetPublic())
require.NoError(t, err)

cfg.P2PHybridNetAddress = ni.p2pNetAddr()
}
if ni.idx == 2 {
// node 2 is p2p only
cfg.EnableP2PHybridMode = false
cfg.EnableP2P = true
}
return ni, cfg
}

phonebookHook := func(ni []nodeInfo, i int) []string {
switch i {
case 0:
// node 0 (N0) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
case 1:
// node 1 (R) is a relay accepting connections from all
t.Logf("Node%d phonebook: empty", i)
return []string{}
case 2:
// node 2 (A) connects to R
t.Logf("Node%d phonebook: %s, %s", i, ni[1].wsNetAddr(), ni[1].p2pMultiAddr())
return []string{ni[1].wsNetAddr(), ni[1].p2pMultiAddr()}
default:
t.Errorf("not expected number of nodes: %d", i)
t.FailNow()
}
return nil
}

nodes, wallets := setupFullNodesEx(t, consensusTest0, configurableConsensus, acctStake, configHook, phonebookHook)
require.Len(t, nodes, 3)
require.Len(t, wallets, 3)
for i := 0; i < len(nodes); i++ {
defer os.Remove(wallets[i])
defer nodes[i].Stop()
}

startAndConnectNodes(nodes, nodelayFirstNodeStartDelay)

// ensure the initial connectivity topology
require.Eventually(t, func() bool {
node0Conn := len(nodes[0].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
node1Conn := len(nodes[1].net.GetPeers(network.PeersConnectedOut, network.PeersConnectedIn)) == 2 // connected from 0 and 2
node2Conn := len(nodes[2].net.GetPeers(network.PeersConnectedOut)) > 0 // connected to 1
return node0Conn && node1Conn && node2Conn
}, 60*time.Second, 500*time.Millisecond)

// now wait 2x heartbeat interval (GossipSubHeartbeatInterval) to ensure the meshsub is built
time.Sleep(2 * time.Second)

filename := filepath.Join(nodes[2].genesisDirs.RootGenesisDir, wallets[2])
access, err := db.MakeAccessor(filename, false, false)
require.NoError(t, err)
root, err := account.RestoreRoot(access)
access.Close()
require.NoError(t, err)

addr2 := root.Address()
secrets2 := root.Secrets()

txn := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addr2,
FirstValid: 1,
LastValid: 100,
Fee: basics.MicroAlgos{Raw: 1000},
GenesisID: nodes[2].genesisID,
GenesisHash: nodes[2].genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addr2,
Amount: basics.MicroAlgos{Raw: 0},
},
}
signature := secrets2.Sign(txn)
stxn := transactions.SignedTxn{
Sig: signature,
Txn: txn,
}

err = nodes[2].BroadcastSignedTxGroup([]transactions.SignedTxn{stxn})
require.NoError(t, err)

initialRound := nodes[0].ledger.NextRound()
targetRound := initialRound + 10
t.Logf("Waiting for round %d (initial %d)", targetRound, initialRound)

// ensure tx properly propagated to node 0
select {
case <-nodes[0].ledger.Wait(targetRound):
b, err := nodes[0].ledger.Block(targetRound)
require.NoError(t, err)
require.Greater(t, b.TxnCounter, uint64(1000)) // new initial value after AppForbidLowResources
case <-time.After(1 * time.Minute):
require.Fail(t, fmt.Sprintf("no block notification for wallet: %v.", wallets[0]))
}
}