diff --git a/common/format.go b/common/format.go
index 7af41f52d5..fef1447018 100644
--- a/common/format.go
+++ b/common/format.go
@@ -80,3 +80,29 @@ func (t PrettyAge) String() string {
 	}
 	return result
 }
+
+// FormatMilliTime renders n, a Unix timestamp in milliseconds, as a local-time
+// "2006-01-02 15:04:05.000" string. It returns "" when n is zero (no timestamp
+// recorded) and "invalid" when n is negative.
+func FormatMilliTime(n int64) string {
+	if n < 0 {
+		return "invalid"
+	}
+	if n == 0 {
+		return ""
+	}
+	return time.UnixMilli(n).Format("2006-01-02 15:04:05.000")
+}
+
+// FormatUnixTime renders n, a Unix timestamp in seconds, with the same layout
+// and the same "" / "invalid" sentinels as FormatMilliTime. The millisecond
+// field is always ".000" because the input only has second resolution.
+func FormatUnixTime(n int64) string {
+	if n < 0 {
+		return "invalid"
+	}
+	if n == 0 {
+		return ""
+	}
+	return time.Unix(n, 0).Format("2006-01-02 15:04:05.000")
+}
diff --git a/core/blockchain.go b/core/blockchain.go
index 6a39dcad42..1da15bd0f2 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -292,6 +292,7 @@ type BlockChain struct {
 	bodyRLPCache  *lru.Cache[common.Hash, rlp.RawValue]
 	receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
 	blockCache    *lru.Cache[common.Hash, *types.Block]
+	recvTimeCache *lru.Cache[common.Hash, int64] // block hash -> local receive time in Unix milliseconds
 
 	txLookupLock  sync.RWMutex
 	txLookupCache *lru.Cache[common.Hash, txLookup]
@@ -383,6 +384,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
 		receiptsCache:  lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
 		sidecarsCache:  lru.NewCache[common.Hash, types.BlobSidecars](sidecarsCacheLimit),
 		blockCache:     lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
+		recvTimeCache:  lru.NewCache[common.Hash, int64](blockCacheLimit),
 		txLookupCache:  lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
 		futureBlocks:   lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
 		diffLayerCache: diffLayerCache,
@@ -1140,6 +1142,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
 	bc.receiptsCache.Purge()
 	bc.sidecarsCache.Purge()
 	bc.blockCache.Purge()
+	bc.recvTimeCache.Purge()
 	bc.txLookupCache.Purge()
 	bc.futureBlocks.Purge()
 
@@ -1778,6 +1781,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 	defer wg.Wait()
 	wg.Add(1)
 	go func() {
+		// Stamp a receive time only when none is cached, so an earlier one is kept.
+		if _, ok := bc.recvTimeCache.Get(block.Hash()); !ok {
+			bc.recvTimeCache.Add(block.Hash(), time.Now().UnixMilli())
+		}
 		blockBatch := bc.db.BlockStore().NewBatch()
 		rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
 		rawdb.WriteBlock(blockBatch, block)
@@ -2072,6 +2079,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
 	// Start the parallel header verifier
 	headers := make([]*types.Header, len(chain))
 	for i, block := range chain {
+		// Keep the first receive time: re-inserting an already seen block must not
+		// overwrite it (writeBlockWithState applies the same rule).
+		if _, ok := bc.recvTimeCache.Get(block.Hash()); !ok {
+			bc.recvTimeCache.Add(block.Hash(), time.Now().UnixMilli())
+		}
 		headers[i] = block.Header()
 	}
 	abort, results := bc.engine.VerifyHeaders(bc, headers)
diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go
index eb0832509d..5ec5cac982 100644
--- a/core/blockchain_reader.go
+++ b/core/blockchain_reader.go
@@ -199,6 +199,16 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
 	return block
 }
 
+// GetRecvTime returns the cached local receive time of the block with the
+// given hash as Unix milliseconds, or 0 when no receive time is cached for it.
+func (bc *BlockChain) GetRecvTime(hash common.Hash) int64 {
+	t, ok := bc.recvTimeCache.Get(hash)
+	if !ok {
+		return 0
+	}
+	return t
+}
+
 // GetBlockByHash retrieves a block from the database by hash, caching it if found.
 func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
 	number := bc.hc.GetBlockNumber(hash)
diff --git a/core/vote/vote_pool.go b/core/vote/vote_pool.go
index 3b64e2cea3..e1f240874b 100644
--- a/core/vote/vote_pool.go
+++ b/core/vote/vote_pool.go
@@ -3,6 +3,7 @@ package vote
 import (
 	"container/heap"
 	"sync"
+	"time"
 
 	mapset "github.com/deckarep/golang-set/v2"
 
@@ -25,6 +26,8 @@ const (
 	upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist
 
 	highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent.
+
+	defaultMajorityThreshold = 15 // approximate vote count treated as a majority; only used to timestamp it for metrics
 )
 
 var (
@@ -38,8 +41,31 @@ var (
 )
 
 type VoteBox struct {
-	blockNumber  uint64
-	voteMessages []*types.VoteEnvelope
+	blockNumber      uint64
+	voteMessages     []*types.VoteEnvelope
+	majorityVoteTime int64 // Unix ms at which a majority of votes was observed; 0 while unset
+}
+
+// trySetMajorityVoteTime records, in Unix milliseconds, when the box is
+// considered to have gathered a majority of votes. A positive oldTime (a
+// timestamp carried over from another box) is adopted when no time is set yet
+// or when it is earlier than the stored one. With oldTime <= 0 and no time set,
+// the current time is stamped once len(voteMessages) reaches
+// defaultMajorityThreshold. Once set, the time is only ever moved earlier.
+func (v *VoteBox) trySetMajorityVoteTime(oldTime int64) {
+	if v.majorityVoteTime > 0 {
+		if oldTime > 0 && v.majorityVoteTime > oldTime {
+			v.majorityVoteTime = oldTime
+		}
+		return
+	}
+	if oldTime > 0 {
+		v.majorityVoteTime = oldTime
+		return
+	}
+	if len(v.voteMessages) >= defaultMajorityThreshold {
+		v.majorityVoteTime = time.Now().UnixMilli()
+	}
 }
 
 type VotePool struct {
@@ -197,6 +223,8 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority
 
 	// Put into corresponding votes map.
 	m[targetHash].voteMessages = append(m[targetHash].voteMessages, vote)
+	// 0 means "no inherited timestamp": stamp now if the threshold was just reached.
+	m[targetHash].trySetMajorityVoteTime(0)
 	// Add into received vote to avoid future duplicated vote comes.
 	pool.receivedVotes.Add(voteHash)
 	log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)
@@ -269,10 +297,15 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
 	// may len(curVotes[blockHash].voteMessages) extra maxCurVoteAmountPerBlock, but it doesn't matter
 	if _, ok := curVotes[blockHash]; !ok {
 		heap.Push(curPq, voteData)
-		curVotes[blockHash] = &VoteBox{voteBox.blockNumber, validVotes}
+		curVotes[blockHash] = &VoteBox{
+			blockNumber:      voteBox.blockNumber,
+			voteMessages:     validVotes,
+			majorityVoteTime: voteBox.majorityVoteTime,
+		}
 		localCurVotesPqGauge.Update(int64(curPq.Len()))
 	} else {
 		curVotes[blockHash].voteMessages = append(curVotes[blockHash].voteMessages, validVotes...)
+		curVotes[blockHash].trySetMajorityVoteTime(voteBox.majorityVoteTime)
 	}
 
 	delete(futureVotes, blockHash)
@@ -322,6 +355,20 @@ func (pool *VotePool) GetVotes() []*types.VoteEnvelope {
 	return votesRes
 }
 
+// GetMajorityVoteTime returns the time, in Unix milliseconds, at which the
+// current-vote box for the given block hash was marked as holding a majority
+// of votes. It returns 0 when the pool has no such box or no time is recorded.
+func (pool *VotePool) GetMajorityVoteTime(hash common.Hash) int64 {
+	pool.mu.RLock()
+	defer pool.mu.RUnlock()
+
+	vb := pool.curVotes[hash]
+	if vb == nil {
+		return 0
+	}
+	return vb.majorityVoteTime
+}
+
 func (pool *VotePool) FetchVoteByBlockHash(blockHash common.Hash) []*types.VoteEnvelope {
 	pool.mu.RLock()
 	defer pool.mu.RUnlock()
diff --git a/eth/backend.go b/eth/backend.go
index 8214984b59..cadcb558bf 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -26,6 +26,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/metrics"
+
 	"github.com/ethereum/go-ethereum/accounts"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/hexutil"
@@ -117,7 +119,8 @@ type Ethereum struct {
 
 	shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully
 
-	votePool *vote.VotePool
+	votePool     *vote.VotePool
+	stopReportCh chan struct{} // signals reportRecentBlocksLoop to exit
 }
 
 // New creates a new Ethereum object (including the initialisation of the common Ethereum object),
@@ -240,6 +243,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		p2pServer:       stack.Server(),
 		discmix:         enode.NewFairMix(0),
 		shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
+		stopReportCh:    make(chan struct{}, 1),
 	}
 
 	eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
@@ -599,6 +603,8 @@ func (s *Ethereum) Start() error {
 
 	// Start the networking layer
 	s.handler.Start(s.p2pServer.MaxPeers, s.p2pServer.MaxPeersPerIP)
+
+	go s.reportRecentBlocksLoop()
 	return nil
 }
 
@@ -676,5 +682,44 @@ func (s *Ethereum) Stop() error {
 	s.chainDb.Close()
 	s.eventMux.Stop()
 
+	// Stop the report loop. The send is non-blocking: the channel has a buffer
+	// of one, so the signal is kept for the loop, and a repeated Stop cannot hang.
+	select {
+	case s.stopReportCh <- struct{}{}:
+	default:
+	}
 	return nil
 }
+
+// reportRecentBlocksLoop publishes, once per second, a "report-blocks" metrics
+// label describing the current head block: its number, coinbase, header time,
+// the local receive time and, when a vote pool is configured, the time a
+// majority of votes was observed. It returns when Stop signals stopReportCh.
+func (s *Ethereum) reportRecentBlocksLoop() {
+	reportCnt := uint64(2) // heads at or below this height are not reported
+	reportTicker := time.NewTicker(time.Second)
+	defer reportTicker.Stop() // release the ticker when the loop exits
+	for {
+		select {
+		case <-reportTicker.C:
+			cur := s.blockchain.CurrentBlock()
+			if cur == nil || cur.Number.Uint64() <= reportCnt {
+				continue
+			}
+			num := cur.Number.Uint64()
+			hash := cur.Hash()
+			records := make(map[string]interface{})
+			records["BlockNum"] = num
+			records["RecvBlockAt"] = common.FormatMilliTime(s.blockchain.GetRecvTime(hash))
+			records["Coinbase"] = cur.Coinbase.String()
+			records["BlockTime"] = common.FormatUnixTime(int64(cur.Time))
+			if s.votePool != nil {
+				records["MajorityVotesAt"] = common.FormatMilliTime(s.votePool.GetMajorityVoteTime(hash))
+			}
+			metrics.GetOrRegisterLabel("report-blocks", nil).Mark(records)
+		case <-s.stopReportCh:
+			return
+		}
+	}
+}
diff --git a/p2p/metrics.go b/p2p/metrics.go
index d713a2fc8d..493acbf035 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -68,6 +68,9 @@ var (
 	serveUnexpectedIdentity  = metrics.NewRegisteredMeter("p2p/serves/error/id/unexpected", nil)
 	serveEncHandshakeError   = metrics.NewRegisteredMeter("p2p/serves/error/rlpx/enc", nil)
 	serveProtoHandshakeError = metrics.NewRegisteredMeter("p2p/serves/error/rlpx/proto", nil)
+
+	// peerLatencyStat tracks the one-way peer latency estimated from ping/pong round trips.
+	peerLatencyStat = metrics.NewRegisteredTimer("p2p/peers/latency", nil)
 )
 
 // markDialError matches errors that occur while setting up a dial connection
diff --git a/p2p/peer.go b/p2p/peer.go
index bd45dba5cf..1244bc5ca7 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -23,6 +23,7 @@ import (
 	"net"
 	"slices"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/ethereum/go-ethereum/common/mclock"
@@ -113,12 +114,15 @@ type Peer struct {
 	protoErr chan error
 	closed   chan struct{}
 	pingRecv chan struct{}
+	pongRecv chan struct{}
 	disc     chan DiscReason
 
 	// events receives message send / receive events if set
 	events         *event.Feed
 	testPipe       *MsgPipeRW // for testing
 	testRemoteAddr string     // for testing
+
+	latency atomic.Int64 // estimated one-way latency in milliseconds, derived from ping/pong round trips
 }
 
 // NewPeer returns a peer for testing purposes.
@@ -268,6 +272,7 @@ func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
 		protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
 		closed:   make(chan struct{}),
 		pingRecv: make(chan struct{}, 16),
+		pongRecv: make(chan struct{}, 16),
 		log:      log.New("id", conn.node.ID(), "conn", conn.flags),
 	}
 	return p
@@ -333,9 +338,13 @@ func (p *Peer) pingLoop() {
 	ping := time.NewTimer(pingInterval)
 	defer ping.Stop()
 
+	// pingSentAt is when the most recent ping still awaiting its pong was sent;
+	// it is the zero Time while no ping is outstanding.
+	var pingSentAt time.Time
 	for {
 		select {
 		case <-ping.C:
+			pingSentAt = time.Now()
 			if err := SendItems(p.rw, pingMsg); err != nil {
 				p.protoErr <- err
 				return
@@ -345,6 +354,21 @@ func (p *Peer) pingLoop() {
 		case <-p.pingRecv:
 			SendItems(p.rw, pongMsg)
 
+		case <-p.pongRecv:
+			if pingSentAt.IsZero() {
+				// Unsolicited or duplicate pong: there is no ping to measure against.
+				continue
+			}
+			// Estimate the one-way latency as half the round trip. The figure also
+			// includes message encode/decode and I/O wait time. time.Since uses the
+			// monotonic clock, so wall-clock adjustments cannot skew it.
+			latency := time.Since(pingSentAt) / 2
+			pingSentAt = time.Time{}
+			p.latency.Store(latency.Milliseconds())
+			// Timer.Update takes a time.Duration, so pass the duration itself rather
+			// than a bare millisecond count (which would be read as nanoseconds).
+			peerLatencyStat.Update(latency)
+
 		case <-p.closed:
 			return
 		}
@@ -375,6 +399,13 @@ func (p *Peer) handle(msg Msg) error {
 		case p.pingRecv <- struct{}{}:
 		case <-p.closed:
 		}
+	case msg.Code == pongMsg:
+		// Hand the pong to pingLoop so it can estimate the peer latency.
+		msg.Discard()
+		select {
+		case p.pongRecv <- struct{}{}:
+		case <-p.closed:
+		}
 	case msg.Code == discMsg:
 		// This is the last message. We don't need to discard or
 		// check errors because, the connection will be closed after it.
@@ -557,6 +588,7 @@ type PeerInfo struct {
 		Static        bool   `json:"static"`
 	} `json:"network"`
 	Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
+	Latency   int64                  `json:"latency"`   // Estimated one-way latency in milliseconds, from ping/pong timing
 }
 
 // Info gathers and returns a collection of metadata known about a peer.
@@ -573,6 +605,7 @@ func (p *Peer) Info() *PeerInfo {
 		Name:      p.Fullname(),
 		Caps:      caps,
 		Protocols: make(map[string]interface{}, len(p.running)),
+		Latency:   p.latency.Load(),
 	}
 	if p.Node().Seq() > 0 {
 		info.ENR = p.Node().String()