Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ func initNetwork(ctx *cli.Context) error {
sentryEnodes []*enode.Node
sentryNodeIDs []enode.ID
connectOneExtraEnodes bool
staticConnect bool
)
if enableSentryNode {
sentryConfigs, sentryEnodes, err = createSentryNodeConfigs(ctx, config, initDir)
Expand All @@ -533,10 +532,9 @@ func initNetwork(ctx *cli.Context) error {
sentryNodeIDs[i] = sentryEnodes[i].ID()
}
connectOneExtraEnodes = true
staticConnect = true
}

configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect)
configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, true)
if err != nil {
utils.Fatalf("Failed to create node configs: %v", err)
}
Expand All @@ -548,7 +546,8 @@ func initNetwork(ctx *cli.Context) error {
// add more feature configs
if enableSentryNode {
for i := 0; i < len(sentryConfigs); i++ {
sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts
sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts[i]
sentryConfigs[i].Node.P2P.ProxyedNodeIds = []enode.ID{nodeIDs[i]}
}
}
if ctx.Bool(utils.InitEVNValidatorWhitelist.Name) {
Expand Down Expand Up @@ -588,11 +587,9 @@ func initNetwork(ctx *cli.Context) error {
}

if ctx.Int(utils.InitFullNodeSize.Name) > 0 {
var extraEnodes []*enode.Node
extraEnodes := enodes
if enableSentryNode {
extraEnodes = sentryEnodes
} else {
extraEnodes = enodes
}
_, _, err := createAndSaveFullNodeConfigs(ctx, inGenesisFile, config, initDir, extraEnodes)
if err != nil {
Expand Down Expand Up @@ -656,13 +653,13 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base
return configs, enodes, nil
}

func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, []common.Address, error) {
func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, [][]common.Address, error) {
if len(ips) != len(ports) {
return nil, nil, nil, errors.New("mismatch of size and length of ports")
}
size := len(ips)
enodes := make([]*enode.Node, size)
accounts := make([]common.Address, size)
accounts := make([][]common.Address, size)
for i := 0; i < size; i++ {
nodeConfig := base.Node
nodeConfig.DataDir = path.Join(initDir, fmt.Sprintf("%s%d", prefix, i))
Expand All @@ -673,9 +670,7 @@ func createConfigs(base gethConfig, initDir string, prefix string, ips []string,
if err := setAccountManagerBackends(stack.Config(), stack.AccountManager(), stack.KeyStoreDir()); err != nil {
utils.Fatalf("Failed to set account manager backends: %v", err)
}
if len(stack.AccountManager().Accounts()) > 0 {
accounts[i] = stack.AccountManager().Accounts()[0]
}
accounts[i] = stack.AccountManager().Accounts()
pk := stack.Config().NodeKey()
enodes[i] = enode.NewV4(&pk.PublicKey, net.ParseIP(ips[i]), ports[i], ports[i])
}
Expand Down
22 changes: 11 additions & 11 deletions cmd/geth/initnetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,27 @@ func verifyConfigFileRemoteHosts(t *testing.T, config *gethConfig, ipStr string,
t.Fatalf("expected ListenAddr to be %s but it is %s instead", expectedListenAddr, config.Node.P2P.ListenAddr)
}

bootnodes := config.Node.P2P.BootstrapNodes
staticnodes := config.Node.P2P.StaticNodes

// 3. check correctness of peers' hosts
for j := 0; j < i; j++ {
ip := bootnodes[j].IP().String()
ip := staticnodes[j].IP().String()
if ip != ips[j] {
t.Fatalf("expected IP of bootnode to be %s but found %s instead", ips[j], ip)
}
}

for j := i + 1; j < size; j++ {
ip := bootnodes[j-1].IP().String()
ip := staticnodes[j-1].IP().String()
if ip != ips[j] {
t.Fatalf("expected IP of bootnode to be %s but found %s instead", ips[j-1], ip)
}
}

// 4. check correctness of peer port numbers
for j := 0; j < size-1; j++ {
if bootnodes[j].UDP() != basePort {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort, bootnodes[j].UDP())
if staticnodes[j].UDP() != basePort {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort, staticnodes[j].UDP())
}
}
}
Expand All @@ -123,25 +123,25 @@ func verifyConfigFileLocalhost(t *testing.T, config *gethConfig, i int, basePort
t.Fatalf("expected ListenAddr to be %s but it is %s instead", expectedListenAddr, config.Node.P2P.ListenAddr)
}

bootnodes := config.Node.P2P.BootstrapNodes
staticnodes := config.Node.P2P.StaticNodes
// 2. check correctness of peers' hosts
localhost := "127.0.0.1"
for j := 0; j < size-1; j++ {
ip := bootnodes[j].IP().String()
ip := staticnodes[j].IP().String()
if ip != localhost {
t.Fatalf("expected IP of bootnode to be %s but found %s instead", localhost, ip)
}
}

// 3. check correctness of peer port numbers
for j := 0; j < i; j++ {
if bootnodes[j].UDP() != basePort+j {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort+j, bootnodes[j].UDP())
if staticnodes[j].UDP() != basePort+j {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j, basePort+j, staticnodes[j].UDP())
}
}
for j := i + 1; j < size; j++ {
if bootnodes[j-1].UDP() != basePort+j {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j-1, basePort+j, bootnodes[j-1].UDP())
if staticnodes[j-1].UDP() != basePort+j {
t.Fatalf("expected bootnode port at position %d to be %d but got %d instead", j-1, basePort+j, staticnodes[j-1].UDP())
}
}
}
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnableBAL: config.EnableBAL,
EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist,
ProxyedValidatorAddresses: stack.Config().P2P.ProxyedValidatorAddresses,
ProxyedNodeIds: stack.Config().P2P.ProxyedNodeIds,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: newPeerSet(),
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
Expand Down
81 changes: 60 additions & 21 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type handlerConfig struct {
EnableBAL bool
EVNNodeIdsWhitelist []enode.ID
ProxyedValidatorAddresses []common.Address
ProxyedNodeIds []enode.ID
}

type handler struct {
Expand All @@ -154,6 +155,7 @@ type handler struct {
enableBAL bool
evnNodeIdsWhitelistMap map[enode.ID]struct{}
proxyedValidatorAddressMap map[common.Address]struct{}
proxyedNodeIdsMap map[enode.ID]struct{}

snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
Expand Down Expand Up @@ -225,6 +227,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
enableBAL: config.EnableBAL,
evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}),
proxyedValidatorAddressMap: make(map[common.Address]struct{}),
proxyedNodeIdsMap: make(map[enode.ID]struct{}),
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
Expand All @@ -236,6 +239,9 @@ func newHandler(config *handlerConfig) (*handler, error) {
for _, address := range config.ProxyedValidatorAddresses {
h.proxyedValidatorAddressMap[address] = struct{}{}
}
for _, nodeID := range config.ProxyedNodeIds {
h.proxyedNodeIdsMap[nodeID] = struct{}{}
}
if h.chain.NoTries() {
} else if config.Sync == ethconfig.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
Expand Down Expand Up @@ -400,6 +406,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
func (h *handler) protoTracker() {
defer h.wg.Done()

h.peers.setProxyedPeers(h.proxyedNodeIdsMap)
if h.enableEVNFeatures && h.synced.Load() {
h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap)
}
Expand All @@ -413,6 +420,7 @@ func (h *handler) protoTracker() {
case <-h.handlerDoneCh:
active--
case <-updateTicker.C:
h.peers.setProxyedPeers(h.proxyedNodeIdsMap)
if h.enableEVNFeatures && h.synced.Load() {
// add onchain validator p2p node list later, it will enable the direct broadcast + no tx broadcast feature
// here check & enable peer broadcast features periodically, and it's a simple way to handle the peer change and the list change scenarios.
Expand Down Expand Up @@ -819,36 +827,67 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
// Send the block to a subset of our peers
var transfer []*ethPeer
if h.directBroadcast {
transfer = peers[:]
} else {
transfer = peers[:int(math.Sqrt(float64(len(peers))))]

// Broadcast the block to peers based on broadcast strategy.
totalPeers := len(peers)

// Step 1: Select target peers for initial broadcast.
limit := totalPeers
if !h.directBroadcast {
limit = int(math.Sqrt(float64(totalPeers)))
}

for _, peer := range transfer {
log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(),
"EVNPeerFlag", peer.EVNPeerFlag.Load(), "CanHandleBAL", peer.CanHandleBAL.Load())
// Step 2: Broadcast to selected peers.
transferPeersCnt := limit
for _, peer := range peers[:limit] {
log.Debug("Broadcast block to peer",
"hash", hash, "peer", peer.ID(),
"EVNPeerFlag", peer.EVNPeerFlag.Load(),
"CanHandleBAL", peer.CanHandleBAL.Load(),
)
peer.AsyncSendNewBlock(block, td)
}

// check if the block should be broadcast to more peers in EVN
var morePeers []*ethPeer
if h.needFullBroadcastInEVN(block) {
for i := len(transfer); i < len(peers); i++ {
if peers[i].EVNPeerFlag.Load() {
morePeers = append(morePeers, peers[i])
// Step 3: Handle proxyed peers.
proxyedPeersCnt := 0
if len(h.proxyedNodeIdsMap) > 0 {
for _, peer := range peers[limit:] {
if peer.ProxyedPeerFlag.Load() {
log.Debug("Broadcast block to proxyed peer",
"hash", hash, "peer", peer.ID(),
"EVNPeerFlag", peer.EVNPeerFlag.Load(),
"CanHandleBAL", peer.CanHandleBAL.Load(),
)
peer.AsyncSendNewBlock(block, td)
proxyedPeersCnt++
}
}
for _, peer := range morePeers {
log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(),
"EVNPeerFlag", peer.EVNPeerFlag.Load(), "CanHandleBAL", peer.CanHandleBAL.Load())
peer.AsyncSendNewBlock(block, td)
}

// Step 4: Handle EVN peers if full broadcast is required.
evnPeersCnt := 0
if h.needFullBroadcastInEVN(block) {
for _, peer := range peers[limit:] {
if !peer.ProxyedPeerFlag.Load() && peer.EVNPeerFlag.Load() {
log.Debug("Broadcast block to EVN peer",
"hash", hash, "peer", peer.ID(),
"EVNPeerFlag", peer.EVNPeerFlag.Load(),
"CanHandleBAL", peer.CanHandleBAL.Load(),
)
peer.AsyncSendNewBlock(block, td)
evnPeersCnt++
}
}
}

log.Debug("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
// Step 5: Log summary.
log.Debug("Propagated block",
"hash", hash,
"recipients", transferPeersCnt,
"proxyed", proxyedPeersCnt,
"evn", evnPeersCnt,
"duration", common.PrettyDuration(time.Since(block.ReceivedAt)),
)
return
}
// Otherwise if the block is indeed in our own chain, announce it
Expand Down Expand Up @@ -1017,7 +1056,7 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) {
if peer.bscExt == nil {
continue
}
if peer.EVNPeerFlag.Load() {
if peer.ProxyedPeerFlag.Load() || peer.EVNPeerFlag.Load() {
voteMap[peer] = vote
continue
}
Expand Down
18 changes: 18 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,24 @@ func (ps *peerSet) peer(id string) *ethPeer {
return ps.peers[id]
}

func (ps *peerSet) setProxyedPeers(proxyedNodeIdsMap map[enode.ID]struct{}) {
ps.lock.RLock()
peers := make([]*ethPeer, 0, len(ps.peers))
for _, peer := range ps.peers {
peers = append(peers, peer)
}
ps.lock.RUnlock()

proxyedPeerCnt := 0
for _, peer := range peers {
if _, ok := proxyedNodeIdsMap[peer.NodeID()]; ok {
peer.ProxyedPeerFlag.Store(true)
proxyedPeerCnt++
}
}
log.Debug("setProxyedPeers", "total", len(peers), "proxyedPeerCnt", proxyedPeerCnt)
}

// enableEVNFeatures enables the given features for the given peers.
func (ps *peerSet) enableEVNFeatures(validatorNodeIDsMap map[common.Address][]enode.ID, evnWhitelistMap map[enode.ID]struct{}) {
// clone current all peers, and update the validatorNodeIDsMap
Expand Down
4 changes: 4 additions & 0 deletions p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type Config struct {
// it usually used for sentry nodes
ProxyedValidatorAddresses []common.Address `toml:",omitempty"`

// ProxyedNodeIds lists node IDs that receive direct broadcasts of blocks and votes,
// excluding transactions, to prevent delays in block and vote propagation.
ProxyedNodeIds []enode.ID `toml:",omitempty"`

// Connectivity can be restricted to certain IP networks.
// If this option is set to a non-nil value, only hosts which match one of the
// IP networks contained in the list are considered.
Expand Down
6 changes: 6 additions & 0 deletions p2p/config_toml.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type Peer struct {
// and won't broadcast any txs between EVN peers.
EVNPeerFlag atomic.Bool

// Indicates whether this peer is proxyed.
ProxyedPeerFlag atomic.Bool

// it indicates the peer can handle BAL(block access list) packet
CanHandleBAL atomic.Bool
}
Expand Down Expand Up @@ -648,7 +651,7 @@ func (p *Peer) Info() *PeerInfo {
info.Network.RemoteAddress = p.RemoteAddr().String()
info.Network.Inbound = p.rw.is(inboundConn)
// After Maxwell, we treat all EVN peers as trusted
info.Network.Trusted = p.rw.is(trustedConn) || p.EVNPeerFlag.Load()
info.Network.Trusted = p.rw.is(trustedConn) || p.EVNPeerFlag.Load() || p.ProxyedPeerFlag.Load()
info.Network.Static = p.rw.is(staticDialedConn)

// Gather all the running protocol infos
Expand Down