diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index c975bec5b6..fe53b074b0 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -521,7 +521,6 @@ func initNetwork(ctx *cli.Context) error { sentryEnodes []*enode.Node sentryNodeIDs []enode.ID connectOneExtraEnodes bool - staticConnect bool ) if enableSentryNode { sentryConfigs, sentryEnodes, err = createSentryNodeConfigs(ctx, config, initDir) @@ -533,10 +532,9 @@ func initNetwork(ctx *cli.Context) error { sentryNodeIDs[i] = sentryEnodes[i].ID() } connectOneExtraEnodes = true - staticConnect = true } - configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect) + configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, true) if err != nil { utils.Fatalf("Failed to create node configs: %v", err) } @@ -548,7 +546,8 @@ func initNetwork(ctx *cli.Context) error { // add more feature configs if enableSentryNode { for i := 0; i < len(sentryConfigs); i++ { - sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts + sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts[i] + sentryConfigs[i].Node.P2P.ProxyedNodeIds = []enode.ID{nodeIDs[i]} } } if ctx.Bool(utils.InitEVNValidatorWhitelist.Name) { @@ -588,11 +587,9 @@ func initNetwork(ctx *cli.Context) error { } if ctx.Int(utils.InitFullNodeSize.Name) > 0 { - var extraEnodes []*enode.Node + extraEnodes := enodes if enableSentryNode { extraEnodes = sentryEnodes - } else { - extraEnodes = enodes } _, _, err := createAndSaveFullNodeConfigs(ctx, inGenesisFile, config, initDir, extraEnodes) if err != nil { @@ -656,13 +653,13 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base return configs, enodes, nil } -func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, []common.Address, error) { +func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, [][]common.Address, error) { if len(ips) != len(ports) { return nil, nil, nil, errors.New("mismatch of size and length of ports") } size := len(ips) enodes := make([]*enode.Node, size) - accounts := make([]common.Address, size) + accounts := make([][]common.Address, size) for i := 0; i < size; i++ { nodeConfig := base.Node nodeConfig.DataDir = path.Join(initDir, fmt.Sprintf("%s%d", prefix, i)) @@ -673,9 +670,7 @@ func createConfigs(base gethConfig, initDir string, prefix string, ips []string, if err := setAccountManagerBackends(stack.Config(), stack.AccountManager(), stack.KeyStoreDir()); err != nil { utils.Fatalf("Failed to set account manager backends: %v", err) } - if len(stack.AccountManager().Accounts()) > 0 { - accounts[i] = stack.AccountManager().Accounts()[0] - } + accounts[i] = stack.AccountManager().Accounts() pk := stack.Config().NodeKey() enodes[i] = enode.NewV4(&pk.PublicKey, net.ParseIP(ips[i]), ports[i], ports[i]) } diff --git a/cmd/geth/initnetwork_test.go b/cmd/geth/initnetwork_test.go index 1473f056e4..a9352b7c98 100644 --- a/cmd/geth/initnetwork_test.go +++ b/cmd/geth/initnetwork_test.go @@ -91,18 +91,18 @@ func verifyConfigFileRemoteHosts(t *testing.T, config *gethConfig, ipStr string, t.Fatalf("expected ListenAddr to be %s but it is %s instead", expectedListenAddr, config.Node.P2P.ListenAddr) } - bootnodes := config.Node.P2P.BootstrapNodes + staticnodes := config.Node.P2P.StaticNodes // 3. check correctness of peers' hosts for j := 0; j < i; j++ { - ip := bootnodes[j].IP().String() + ip := staticnodes[j].IP().String() if ip != ips[j] { t.Fatalf("expected IP of bootnode to be %s but found %s instead", ips[j], ip) } } for j := i + 1; j < size; j++ { - ip := bootnodes[j-1].IP().String() + ip := staticnodes[j-1].IP().String() if ip != ips[j] { t.Fatalf("expected IP of bootnode to be %s but found %s instead", ips[j-1], ip) } @@ -110,8 +110,8 @@ func verifyConfigFileRemoteHosts(t *testing.T, config *gethConfig, ipStr string, // 4. check correctness of peer port numbers for j := 0; j < size-1; j++ { - if bootnodes[j].UDP() != basePort { - t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort, bootnodes[j].UDP()) + if staticnodes[j].UDP() != basePort { + t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort, staticnodes[j].UDP()) } } } @@ -123,11 +123,11 @@ func verifyConfigFileLocalhost(t *testing.T, config *gethConfig, i int, basePort t.Fatalf("expected ListenAddr to be %s but it is %s instead", expectedListenAddr, config.Node.P2P.ListenAddr) } - bootnodes := config.Node.P2P.BootstrapNodes + staticnodes := config.Node.P2P.StaticNodes // 2. check correctness of peers' hosts localhost := "127.0.0.1" for j := 0; j < size-1; j++ { - ip := bootnodes[j].IP().String() + ip := staticnodes[j].IP().String() if ip != localhost { t.Fatalf("expected IP of bootnode to be %s but found %s instead", localhost, ip) } @@ -135,13 +135,13 @@ func verifyConfigFileLocalhost(t *testing.T, config *gethConfig, i int, basePort // 3. check correctness of peer port numbers for j := 0; j < i; j++ { - if bootnodes[j].UDP() != basePort+j { - t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort+j, bootnodes[j].UDP()) + if staticnodes[j].UDP() != basePort+j { + t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort+j, staticnodes[j].UDP()) } } for j := i + 1; j < size; j++ { - if bootnodes[j-1].UDP() != basePort+j { - t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j-1, basePort+j, bootnodes[j-1].UDP()) + if staticnodes[j-1].UDP() != basePort+j { + t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j-1, basePort+j, staticnodes[j-1].UDP()) } } } diff --git a/eth/backend.go b/eth/backend.go index 868df524e4..63f1fa9121 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -442,6 +442,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { EnableBAL: config.EnableBAL, EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist, ProxyedValidatorAddresses: stack.Config().P2P.ProxyedValidatorAddresses, + ProxyedNodeIds: stack.Config().P2P.ProxyedNodeIds, DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, PeerSet: newPeerSet(), EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching, diff --git a/eth/handler.go b/eth/handler.go index bf71375f43..7580ae633d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -144,6 +144,7 @@ type handlerConfig struct { EnableBAL bool EVNNodeIdsWhitelist []enode.ID ProxyedValidatorAddresses []common.Address + ProxyedNodeIds []enode.ID } type handler struct { @@ -154,6 +155,7 @@ type handler struct { enableBAL bool evnNodeIdsWhitelistMap map[enode.ID]struct{} proxyedValidatorAddressMap map[common.Address]struct{} + proxyedNodeIdsMap map[enode.ID]struct{} snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) @@ -225,6 +227,7 @@ func newHandler(config *handlerConfig) (*handler, error) { enableBAL: config.EnableBAL, evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}), proxyedValidatorAddressMap: make(map[common.Address]struct{}), + proxyedNodeIdsMap: make(map[enode.ID]struct{}), quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), @@ -236,6 +239,9 @@ func newHandler(config *handlerConfig) (*handler, error) { for _, address := range config.ProxyedValidatorAddresses { h.proxyedValidatorAddressMap[address] = struct{}{} } + for _, nodeID := range config.ProxyedNodeIds { + h.proxyedNodeIdsMap[nodeID] = struct{}{} + } if h.chain.NoTries() { } else if config.Sync == ethconfig.FullSync { // The database seems empty as the current block is the genesis. Yet the snap @@ -400,6 +406,7 @@ func newHandler(config *handlerConfig) (*handler, error) { func (h *handler) protoTracker() { defer h.wg.Done() + h.peers.setProxyedPeers(h.proxyedNodeIdsMap) if h.enableEVNFeatures && h.synced.Load() { h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap) } @@ -413,6 +420,7 @@ func (h *handler) protoTracker() { case <-h.handlerDoneCh: active-- case <-updateTicker.C: + h.peers.setProxyedPeers(h.proxyedNodeIdsMap) if h.enableEVNFeatures && h.synced.Load() { // add onchain validator p2p node list later, it will enable the direct broadcast + no tx broadcast feature // here check & enable peer broadcast features periodically, and it's a simple way to handle the peer change and the list change scenarios. @@ -819,36 +827,67 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } - // Send the block to a subset of our peers - var transfer []*ethPeer - if h.directBroadcast { - transfer = peers[:] - } else { - transfer = peers[:int(math.Sqrt(float64(len(peers))))] + + // Broadcast the block to peers based on broadcast strategy. + totalPeers := len(peers) + + // Step 1: Select target peers for initial broadcast. + limit := totalPeers + if !h.directBroadcast { + limit = int(math.Sqrt(float64(totalPeers))) } - for _, peer := range transfer { - log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(), - "EVNPeerFlag", peer.EVNPeerFlag.Load(), "CanHandleBAL", peer.CanHandleBAL.Load()) + // Step 2: Broadcast to selected peers. + transferPeersCnt := limit + for _, peer := range peers[:limit] { + log.Debug("Broadcast block to peer", + "hash", hash, "peer", peer.ID(), + "EVNPeerFlag", peer.EVNPeerFlag.Load(), + "CanHandleBAL", peer.CanHandleBAL.Load(), + ) peer.AsyncSendNewBlock(block, td) } - // check if the block should be broadcast to more peers in EVN - var morePeers []*ethPeer - if h.needFullBroadcastInEVN(block) { - for i := len(transfer); i < len(peers); i++ { - if peers[i].EVNPeerFlag.Load() { - morePeers = append(morePeers, peers[i]) + // Step 3: Handle proxyed peers. + proxyedPeersCnt := 0 + if len(h.proxyedNodeIdsMap) > 0 { + for _, peer := range peers[limit:] { + if peer.ProxyedPeerFlag.Load() { + log.Debug("Broadcast block to proxyed peer", + "hash", hash, "peer", peer.ID(), + "EVNPeerFlag", peer.EVNPeerFlag.Load(), + "CanHandleBAL", peer.CanHandleBAL.Load(), + ) + peer.AsyncSendNewBlock(block, td) + proxyedPeersCnt++ } } - for _, peer := range morePeers { - log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(), - "EVNPeerFlag", peer.EVNPeerFlag.Load(), "CanHandleBAL", peer.CanHandleBAL.Load()) - peer.AsyncSendNewBlock(block, td) + } + + // Step 4: Handle EVN peers if full broadcast is required. + evnPeersCnt := 0 + if h.needFullBroadcastInEVN(block) { + for _, peer := range peers[limit:] { + if !peer.ProxyedPeerFlag.Load() && peer.EVNPeerFlag.Load() { + log.Debug("Broadcast block to EVN peer", + "hash", hash, "peer", peer.ID(), + "EVNPeerFlag", peer.EVNPeerFlag.Load(), + "CanHandleBAL", peer.CanHandleBAL.Load(), + ) + peer.AsyncSendNewBlock(block, td) + evnPeersCnt++ + } } } - log.Debug("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + // Step 5: Log summary. + log.Debug("Propagated block", + "hash", hash, + "recipients", transferPeersCnt, + "proxyed", proxyedPeersCnt, + "evn", evnPeersCnt, + "duration", common.PrettyDuration(time.Since(block.ReceivedAt)), + ) return } // Otherwise if the block is indeed in our own chain, announce it @@ -1017,7 +1056,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) { if peer.bscExt == nil { continue } - if peer.EVNPeerFlag.Load() { + if peer.ProxyedPeerFlag.Load() || peer.EVNPeerFlag.Load() { voteMap[peer] = vote continue } diff --git a/eth/peerset.go b/eth/peerset.go index 2c75ad3636..3c5b30c526 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -328,6 +328,24 @@ func (ps *peerSet) peer(id string) *ethPeer { return ps.peers[id] } +func (ps *peerSet) setProxyedPeers(proxyedNodeIdsMap map[enode.ID]struct{}) { + ps.lock.RLock() + peers := make([]*ethPeer, 0, len(ps.peers)) + for _, peer := range ps.peers { + peers = append(peers, peer) + } + ps.lock.RUnlock() + + proxyedPeerCnt := 0 + for _, peer := range peers { + if _, ok := proxyedNodeIdsMap[peer.NodeID()]; ok { + peer.ProxyedPeerFlag.Store(true) + proxyedPeerCnt++ + } + } + log.Debug("setProxyedPeers", "total", len(peers), "proxyedPeerCnt", proxyedPeerCnt) +} + // enableEVNFeatures enables the given features for the given peers. func (ps *peerSet) enableEVNFeatures(validatorNodeIDsMap map[common.Address][]enode.ID, evnWhitelistMap map[enode.ID]struct{}) { // clone current all peers, and update the validatorNodeIDsMap diff --git a/p2p/config.go b/p2p/config.go index 15ee29e1e6..932dcc8adf 100644 --- a/p2p/config.go +++ b/p2p/config.go @@ -98,6 +98,10 @@ type Config struct { // it usually used for sentry nodes ProxyedValidatorAddresses []common.Address `toml:",omitempty"` + // ProxyedNodeIds lists node IDs that receive direct broadcasts of blocks and votes, + // excluding transactions, to prevent delays in block and vote propagation. + ProxyedNodeIds []enode.ID `toml:",omitempty"` + // Connectivity can be restricted to certain IP networks. // If this option is set to a non-nil value, only hosts which match one of the // IP networks contained in the list are considered. diff --git a/p2p/config_toml.go b/p2p/config_toml.go index d4acda59ee..c331a92ad4 100644 --- a/p2p/config_toml.go +++ b/p2p/config_toml.go @@ -33,6 +33,7 @@ func (c Config) MarshalTOML() (interface{}, error) { TrustedNodes []*enode.Node EVNNodeIdsWhitelist []enode.ID `toml:",omitempty"` ProxyedValidatorAddresses []common.Address `toml:",omitempty"` + ProxyedNodeIds []enode.ID `toml:",omitempty"` NetRestrict *netutil.Netlist `toml:",omitempty"` NodeDatabase string `toml:",omitempty"` Protocols []Protocol `toml:"-" json:"-"` @@ -62,6 +63,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TrustedNodes = c.TrustedNodes enc.EVNNodeIdsWhitelist = c.EVNNodeIdsWhitelist enc.ProxyedValidatorAddresses = c.ProxyedValidatorAddresses + enc.ProxyedNodeIds = c.ProxyedNodeIds enc.NetRestrict = c.NetRestrict enc.NodeDatabase = c.NodeDatabase enc.Protocols = c.Protocols @@ -95,6 +97,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { TrustedNodes []*enode.Node EVNNodeIdsWhitelist []enode.ID `toml:",omitempty"` ProxyedValidatorAddresses []common.Address `toml:",omitempty"` + ProxyedNodeIds []enode.ID `toml:",omitempty"` NetRestrict *netutil.Netlist `toml:",omitempty"` NodeDatabase *string `toml:",omitempty"` Protocols []Protocol `toml:"-" json:"-"` @@ -159,6 +162,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.ProxyedValidatorAddresses != nil { c.ProxyedValidatorAddresses = dec.ProxyedValidatorAddresses } + if dec.ProxyedNodeIds != nil { + c.ProxyedNodeIds = dec.ProxyedNodeIds + } if dec.NetRestrict != nil { c.NetRestrict = dec.NetRestrict } diff --git a/p2p/peer.go b/p2p/peer.go index 6d64e942e7..f9e9d424da 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -130,6 +130,9 @@ type Peer struct { // and won't broadcast any txs between EVN peers. EVNPeerFlag atomic.Bool + // Indicates whether this peer is proxyed. + ProxyedPeerFlag atomic.Bool + // it indicates the peer can handle BAL(block access list) packet CanHandleBAL atomic.Bool } @@ -648,7 +651,7 @@ func (p *Peer) Info() *PeerInfo { info.Network.RemoteAddress = p.RemoteAddr().String() info.Network.Inbound = p.rw.is(inboundConn) // After Maxwell, we treat all EVN peers as trusted - info.Network.Trusted = p.rw.is(trustedConn) || p.EVNPeerFlag.Load() + info.Network.Trusted = p.rw.is(trustedConn) || p.EVNPeerFlag.Load() || p.ProxyedPeerFlag.Load() info.Network.Static = p.rw.is(staticDialedConn) // Gather all the running protocol infos