Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions common/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,23 @@ func (t PrettyAge) String() string {
}
return result
}

func FormatMilliTime(n int64) string {
if n < 0 {
return "invalid"
}
if n == 0 {
return ""
}
return time.UnixMilli(n).Format("2006-01-02 15:04:05.000")
}

func FormatUnixTime(n int64) string {
if n < 0 {
return "invalid"
}
if n == 0 {
return ""
}
return time.Unix(n, 0).Format("2006-01-02 15:04:05.000")
}
7 changes: 7 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ type BlockChain struct {
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
recvTimeCache *lru.Cache[common.Hash, int64]

txLookupLock sync.RWMutex
txLookupCache *lru.Cache[common.Hash, txLookup]
Expand Down Expand Up @@ -383,6 +384,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
sidecarsCache: lru.NewCache[common.Hash, types.BlobSidecars](sidecarsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
recvTimeCache: lru.NewCache[common.Hash, int64](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
diffLayerCache: diffLayerCache,
Expand Down Expand Up @@ -1140,6 +1142,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.receiptsCache.Purge()
bc.sidecarsCache.Purge()
bc.blockCache.Purge()
bc.recvTimeCache.Purge()
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

Expand Down Expand Up @@ -1778,6 +1781,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
defer wg.Wait()
wg.Add(1)
go func() {
if _, ok := bc.recvTimeCache.Get(block.Hash()); !ok {
bc.recvTimeCache.Add(block.Hash(), time.Now().UnixMilli())
}
blockBatch := bc.db.BlockStore().NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
Expand Down Expand Up @@ -2072,6 +2078,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
for i, block := range chain {
bc.recvTimeCache.Add(block.Hash(), time.Now().UnixMilli())
headers[i] = block.Header()
}
abort, results := bc.engine.VerifyHeaders(bc, headers)
Expand Down
8 changes: 8 additions & 0 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
return block
}

func (bc *BlockChain) GetRecvTime(hash common.Hash) int64 {
t, ok := bc.recvTimeCache.Get(hash)
if !ok {
return 0
}
return t
}

// GetBlockByHash retrieves a block from the database by hash, caching it if found.
func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
number := bc.hc.GetBlockNumber(hash)
Expand Down
43 changes: 40 additions & 3 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package vote
import (
"container/heap"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"

Expand All @@ -25,6 +26,8 @@ const (
upperLimitOfVoteBlockNumber = 11 // refer to fetcher.maxUncleDist

highestVerifiedBlockChanSize = 10 // highestVerifiedBlockChanSize is the size of channel listening to HighestVerifiedBlockEvent.

defaultMajorityThreshold = 15 // this is an inaccurate value, mainly used for metric acquisition
)

var (
Expand All @@ -38,8 +41,25 @@ var (
)

type VoteBox struct {
blockNumber uint64
voteMessages []*types.VoteEnvelope
blockNumber uint64
voteMessages []*types.VoteEnvelope
majorityVoteTime int64
}

func (v *VoteBox) trySetMajorityVoteTime(oldTime int64) {
if v.majorityVoteTime > 0 {
if oldTime > 0 && v.majorityVoteTime > oldTime {
v.majorityVoteTime = oldTime
}
return
}
if oldTime > 0 {
v.majorityVoteTime = oldTime
return
}
if len(v.voteMessages) >= defaultMajorityThreshold {
v.majorityVoteTime = time.Now().UnixMilli()
}
}

type VotePool struct {
Expand Down Expand Up @@ -197,6 +217,7 @@ func (pool *VotePool) putVote(m map[common.Hash]*VoteBox, votesPq *votesPriority

// Put into corresponding votes map.
m[targetHash].voteMessages = append(m[targetHash].voteMessages, vote)
m[targetHash].trySetMajorityVoteTime(0)
// Add into received vote to avoid future duplicated vote comes.
pool.receivedVotes.Add(voteHash)
log.Debug("VoteHash put into votepool is:", "voteHash", voteHash)
Expand Down Expand Up @@ -269,10 +290,15 @@ func (pool *VotePool) transfer(blockHash common.Hash) {
// may len(curVotes[blockHash].voteMessages) extra maxCurVoteAmountPerBlock, but it doesn't matter
if _, ok := curVotes[blockHash]; !ok {
heap.Push(curPq, voteData)
curVotes[blockHash] = &VoteBox{voteBox.blockNumber, validVotes}
curVotes[blockHash] = &VoteBox{
blockNumber: voteBox.blockNumber,
voteMessages: validVotes,
majorityVoteTime: voteBox.majorityVoteTime,
}
localCurVotesPqGauge.Update(int64(curPq.Len()))
} else {
curVotes[blockHash].voteMessages = append(curVotes[blockHash].voteMessages, validVotes...)
curVotes[blockHash].trySetMajorityVoteTime(voteBox.majorityVoteTime)
}

delete(futureVotes, blockHash)
Expand Down Expand Up @@ -322,6 +348,17 @@ func (pool *VotePool) GetVotes() []*types.VoteEnvelope {
return votesRes
}

func (pool *VotePool) GetMajorityVoteTime(hash common.Hash) int64 {
pool.mu.RLock()
defer pool.mu.RUnlock()

vb := pool.curVotes[hash]
if vb == nil {
return 0
}
return vb.majorityVoteTime
}

func (pool *VotePool) FetchVoteByBlockHash(blockHash common.Hash) []*types.VoteEnvelope {
pool.mu.RLock()
defer pool.mu.RUnlock()
Expand Down
37 changes: 36 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/metrics"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -117,7 +119,8 @@ type Ethereum struct {

shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully

votePool *vote.VotePool
votePool *vote.VotePool
stopReportCh chan struct{}
}

// New creates a new Ethereum object (including the initialisation of the common Ethereum object),
Expand Down Expand Up @@ -240,6 +243,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
p2pServer: stack.Server(),
discmix: enode.NewFairMix(0),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
stopReportCh: make(chan struct{}, 1),
}

eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
Expand Down Expand Up @@ -599,6 +603,8 @@ func (s *Ethereum) Start() error {

// Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers, s.p2pServer.MaxPeersPerIP)

go s.reportRecentBlocksLoop()
return nil
}

Expand Down Expand Up @@ -676,5 +682,34 @@ func (s *Ethereum) Stop() error {
s.chainDb.Close()
s.eventMux.Stop()

// stop report loop
s.stopReportCh <- struct{}{}
return nil
}

func (s *Ethereum) reportRecentBlocksLoop() {
reportCnt := uint64(2)
reportTicker := time.NewTicker(time.Second)
for {
select {
case <-reportTicker.C:
cur := s.blockchain.CurrentBlock()
if cur == nil || cur.Number.Uint64() <= reportCnt {
continue
}
num := cur.Number.Uint64()
hash := cur.Hash()
records := make(map[string]interface{})
records["BlockNum"] = num
records["RecvBlockAt"] = common.FormatMilliTime(s.blockchain.GetRecvTime(hash))
records["Coinbase"] = cur.Coinbase.String()
records["BlockTime"] = common.FormatUnixTime(int64(cur.Time))
if s.votePool != nil {
records["MajorityVotesAt"] = common.FormatMilliTime(s.votePool.GetMajorityVoteTime(hash))
}
metrics.GetOrRegisterLabel("report-blocks", nil).Mark(records)
case <-s.stopReportCh:
return
}
}
}
2 changes: 2 additions & 0 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ var (
serveUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/serves/error/id/unexpected", nil)
serveEncHandshakeError = metrics.NewRegisteredMeter("p2p/serves/error/rlpx/enc", nil)
serveProtoHandshakeError = metrics.NewRegisteredMeter("p2p/serves/error/rlpx/proto", nil)

peerLatencyStat = metrics.NewRegisteredTimer("p2p/peers/latency", nil)
)

// markDialError matches errors that occur while setting up a dial connection
Expand Down
22 changes: 22 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
Expand Down Expand Up @@ -113,12 +114,15 @@ type Peer struct {
protoErr chan error
closed chan struct{}
pingRecv chan struct{}
pongRecv chan struct{}
disc chan DiscReason

// events receives message send / receive events if set
events *event.Feed
testPipe *MsgPipeRW // for testing
testRemoteAddr string // for testing

latency atomic.Int64 // mill second latency, estimated by ping msg
}

// NewPeer returns a peer for testing purposes.
Expand Down Expand Up @@ -268,6 +272,7 @@ func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
closed: make(chan struct{}),
pingRecv: make(chan struct{}, 16),
pongRecv: make(chan struct{}, 16),
log: log.New("id", conn.node.ID(), "conn", conn.flags),
}
return p
Expand Down Expand Up @@ -333,9 +338,11 @@ func (p *Peer) pingLoop() {
ping := time.NewTimer(pingInterval)
defer ping.Stop()

var startPing atomic.Int64
for {
select {
case <-ping.C:
startPing.Store(time.Now().UnixMilli())
if err := SendItems(p.rw, pingMsg); err != nil {
p.protoErr <- err
return
Expand All @@ -345,6 +352,13 @@ func (p *Peer) pingLoop() {
case <-p.pingRecv:
SendItems(p.rw, pongMsg)

case <-p.pongRecv:
// estimate latency here, it also includes tiny msg encode/decode, io wait time
latency := (time.Now().UnixMilli() - startPing.Load()) / 2
if latency > 0 {
p.latency.Store(latency)
peerLatencyStat.Update(time.Duration(latency))
}
case <-p.closed:
return
}
Expand Down Expand Up @@ -375,6 +389,12 @@ func (p *Peer) handle(msg Msg) error {
case p.pingRecv <- struct{}{}:
case <-p.closed:
}
case msg.Code == pongMsg:
msg.Discard()
select {
case p.pongRecv <- struct{}{}:
case <-p.closed:
}
case msg.Code == discMsg:
// This is the last message. We don't need to discard or
// check errors because, the connection will be closed after it.
Expand Down Expand Up @@ -557,6 +577,7 @@ type PeerInfo struct {
Static bool `json:"static"`
} `json:"network"`
Protocols map[string]interface{} `json:"protocols"` // Sub-protocol specific metadata fields
Latency int64 `json:"latency"` // the estimate latency from ping msg
}

// Info gathers and returns a collection of metadata known about a peer.
Expand All @@ -573,6 +594,7 @@ func (p *Peer) Info() *PeerInfo {
Name: p.Fullname(),
Caps: caps,
Protocols: make(map[string]interface{}, len(p.running)),
Latency: p.latency.Load(),
}
if p.Node().Seq() > 0 {
info.ENR = p.Node().String()
Expand Down