Skip to content
34 changes: 25 additions & 9 deletions cmd/geth/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func initNetwork(ctx *cli.Context) error {
staticConnect = true
}

configs, enodes, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect)
configs, enodes, accounts, err := createConfigs(config, initDir, "node", ips, ports, sentryEnodes, connectOneExtraEnodes, staticConnect)
if err != nil {
utils.Fatalf("Failed to create node configs: %v", err)
}
Expand All @@ -489,6 +489,11 @@ func initNetwork(ctx *cli.Context) error {
nodeIDs[i] = enodes[i].ID()
}
// add more feature configs
if enableSentryNode {
for i := 0; i < len(sentryConfigs); i++ {
sentryConfigs[i].Node.P2P.ProxyedValidatorAddresses = accounts[i]
}
}
if ctx.Bool(utils.InitEVNValidatorWhitelist.Name) {
for i := 0; i < size; i++ {
configs[i].Node.P2P.EVNNodeIdsWhitelist = nodeIDs
Expand All @@ -501,7 +506,10 @@ func initNetwork(ctx *cli.Context) error {
}
if enableSentryNode && ctx.Bool(utils.InitEVNSentryWhitelist.Name) {
for i := 0; i < len(sentryConfigs); i++ {
sentryConfigs[i].Node.P2P.EVNNodeIdsWhitelist = sentryNodeIDs
// whitelist all sentry nodes + proxyed validator NodeID
wlNodeIDs := []enode.ID{nodeIDs[i]}
wlNodeIDs = append(wlNodeIDs, sentryNodeIDs...)
sentryConfigs[i].Node.P2P.EVNNodeIdsWhitelist = wlNodeIDs
}
}
if enableSentryNode && ctx.Bool(utils.InitEVNSentryRegister.Name) {
Expand Down Expand Up @@ -555,8 +563,11 @@ func createSentryNodeConfigs(ctx *cli.Context, baseConfig gethConfig, initDir st
if err != nil {
utils.Fatalf("Failed to parse ports: %v", err)
}

return createConfigs(baseConfig, initDir, "sentry", ips, ports, nil, false, true)
configs, enodes, _, err := createConfigs(baseConfig, initDir, "sentry", ips, ports, nil, false, true)
if err != nil {
utils.Fatalf("Failed to create config: %v", err)
}
return configs, enodes, nil
}

func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, baseConfig gethConfig, initDir string, extraEnodes []*enode.Node) ([]gethConfig, []*enode.Node, error) {
Expand All @@ -575,7 +586,7 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base
utils.Fatalf("Failed to parse ports: %v", err)
}

configs, enodes, err := createConfigs(baseConfig, initDir, "fullnode", ips, ports, extraEnodes, false, false)
configs, enodes, _, err := createConfigs(baseConfig, initDir, "fullnode", ips, ports, extraEnodes, false, false)
if err != nil {
utils.Fatalf("Failed to create config: %v", err)
}
Expand All @@ -590,19 +601,24 @@ func createAndSaveFullNodeConfigs(ctx *cli.Context, inGenesisFile *os.File, base
return configs, enodes, nil
}

func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, error) {
func createConfigs(base gethConfig, initDir string, prefix string, ips []string, ports []int, extraEnodes []*enode.Node, connectOneExtraEnodes bool, staticConnect bool) ([]gethConfig, []*enode.Node, [][]common.Address, error) {
if len(ips) != len(ports) {
return nil, nil, errors.New("mismatch of size and length of ports")
return nil, nil, nil, errors.New("mismatch of size and length of ports")
}
size := len(ips)
enodes := make([]*enode.Node, size)
accounts := make([][]common.Address, size)
for i := 0; i < size; i++ {
nodeConfig := base.Node
nodeConfig.DataDir = path.Join(initDir, fmt.Sprintf("%s%d", prefix, i))
stack, err := node.New(&nodeConfig)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if err := setAccountManagerBackends(stack.Config(), stack.AccountManager(), stack.KeyStoreDir()); err != nil {
utils.Fatalf("Failed to set account manager backends: %v", err)
}
accounts[i] = stack.AccountManager().Accounts()
pk := stack.Config().NodeKey()
enodes[i] = enode.NewV4(&pk.PublicKey, net.ParseIP(ips[i]), ports[i], ports[i])
}
Expand All @@ -618,7 +634,7 @@ func createConfigs(base gethConfig, initDir string, prefix string, ips []string,
}
configs[i] = createNodeConfig(base, ips[i], ports[i], allEnodes, index, staticConnect)
}
return configs, enodes, nil
return configs, enodes, accounts, nil
}

func writeConfig(inGenesisFile *os.File, config gethConfig, dir string) error {
Expand Down
32 changes: 16 additions & 16 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,22 +377,22 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
EnableEVNFeatures: stack.Config().EnableEVNFeatures,
EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist,
ProxyedValidatorNodeIDs: stack.Config().P2P.ProxyedValidatorNodeIDs,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
EnableEVNFeatures: stack.Config().EnableEVNFeatures,
EVNNodeIdsWhitelist: stack.Config().P2P.EVNNodeIdsWhitelist,
ProxyedValidatorAddresses: stack.Config().P2P.ProxyedValidatorAddresses,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
}); err != nil {
return nil, err
}
Expand Down
131 changes: 66 additions & 65 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,33 @@ type votePool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
VotePool votePool
Network uint64 // Network identifier to adfvertise
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
EnableQuickBlockFetching bool
EnableEVNFeatures bool
EVNNodeIdsWhitelist []enode.ID
ProxyedValidatorNodeIDs []enode.ID
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
VotePool votePool
Network uint64 // Network identifier to adfvertise
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
EnableQuickBlockFetching bool
EnableEVNFeatures bool
EVNNodeIdsWhitelist []enode.ID
ProxyedValidatorAddresses []common.Address
}

type handler struct {
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool
enableEVNFeatures bool
evnNodeIdsWhitelistMap map[enode.ID]struct{}
proxyedValidatorNodeIDMap map[enode.ID]struct{}
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool
enableEVNFeatures bool
evnNodeIdsWhitelistMap map[enode.ID]struct{}
proxyedValidatorAddressMap map[common.Address]struct{}

snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
Expand Down Expand Up @@ -196,32 +196,32 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.PeerSet = newPeerSet() // Nicety initialization for tests
}
h := &handler{
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
votepool: config.VotePool,
chain: config.Chain,
peers: config.PeerSet,
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
enableEVNFeatures: config.EnableEVNFeatures,
evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}),
proxyedValidatorNodeIDMap: make(map[enode.ID]struct{}),
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
stopCh: make(chan struct{}),
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
votepool: config.VotePool,
chain: config.Chain,
peers: config.PeerSet,
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
enableEVNFeatures: config.EnableEVNFeatures,
evnNodeIdsWhitelistMap: make(map[enode.ID]struct{}),
proxyedValidatorAddressMap: make(map[common.Address]struct{}),
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
for _, nodeID := range config.EVNNodeIdsWhitelist {
h.evnNodeIdsWhitelistMap[nodeID] = struct{}{}
}
for _, nodeID := range config.ProxyedValidatorNodeIDs {
h.proxyedValidatorNodeIDMap[nodeID] = struct{}{}
for _, address := range config.ProxyedValidatorAddresses {
h.proxyedValidatorAddressMap[address] = struct{}{}
}
if config.Sync == ethconfig.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
Expand Down Expand Up @@ -333,6 +333,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
block := types.NewBlockWithHeader(item.Header).WithBody(types.Body{Transactions: item.Txs, Uncles: item.Uncles})
block = block.WithSidecars(item.Sidecars)
block.ReceivedAt = time.Now()
block.ReceivedFrom = p.ID()
if err := block.SanityCheck(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -399,7 +400,7 @@ func (h *handler) protoTracker() {
if h.enableEVNFeatures {
// add onchain validator p2p node list later, it will enable the direct broadcast + no tx broadcast feature
// here check & enable peer broadcast features periodically, and it's a simple way to handle the peer change and the list change scenarios.
h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap, h.proxyedValidatorNodeIDMap)
h.peers.enableEVNFeatures(h.queryValidatorNodeIDsMap(), h.evnNodeIdsWhitelistMap)
}
case <-h.quitSync:
// Wait for all active handlers to finish.
Expand Down Expand Up @@ -833,38 +834,34 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}

for _, peer := range transfer {
log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
log.Debug("broadcast block to peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
peer.AsyncSendNewBlock(block, td)
}

// check if the block should be broadcast to more peers in EVN
fullBroadcastInEVN := h.needFullBroadcastInEVN(block)
var morePeers []*ethPeer
for i := len(transfer); i < len(peers); i++ {
if peers[i].ProxyedValidatorFlag.Load() {
morePeers = append(morePeers, peers[i])
continue
if h.needFullBroadcastInEVN(block) {
for i := len(transfer); i < len(peers); i++ {
if peers[i].EVNPeerFlag.Load() {
morePeers = append(morePeers, peers[i])
}
}
if fullBroadcastInEVN && peers[i].EVNPeerFlag.Load() {
morePeers = append(morePeers, peers[i])
continue
for _, peer := range morePeers {
log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
peer.AsyncSendNewBlock(block, td)
}
}
for _, peer := range morePeers {
log.Debug("broadcast block to extra peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
peer.AsyncSendNewBlock(block, td)
}

log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
log.Debug("Propagated block", "hash", hash, "recipients", len(transfer), "extra", len(morePeers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in our own chain, announce it
if h.chain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
log.Debug("Announced block to peer", "hash", hash, "peer", peer.ID(), "ProxyedValidatorFlag", peer.ProxyedValidatorFlag.Load(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
log.Debug("Announced block to peer", "hash", hash, "peer", peer.ID(), "EVNPeerFlag", peer.EVNPeerFlag.Load())
peer.AsyncSendNewBlockHash(block)
}
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
log.Debug("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}

Expand All @@ -875,15 +872,19 @@ func (h *handler) needFullBroadcastInEVN(block *types.Block) bool {
if !h.enableEVNFeatures {
return false
}

parlia, ok := h.chain.Engine().(*parlia.Parlia)
if !ok {
return false
}
if parlia.ConsensusAddress() == block.Coinbase() {
coinbase := block.Coinbase()
// check whether the block is created by self
if parlia.ConsensusAddress() == coinbase {
log.Debug("full broadcast mined block to EVN", "coinbase", coinbase)
return true
}

return h.peers.isProxyedValidator(block.Coinbase(), h.proxyedValidatorNodeIDMap)
return h.peers.isProxyedValidator(coinbase, h.proxyedValidatorAddressMap)
}

func (h *handler) queryValidatorNodeIDsMap() map[common.Address][]enode.ID {
Expand Down
Loading