Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package diagnostics

import (
"time"

"golang.org/x/exp/maps"
)

type SyncStageType string
Expand Down Expand Up @@ -47,6 +49,25 @@ type PeerStatistics struct {
TypeBytesOut map[string]uint64
}

func (p PeerStatistics) Clone() PeerStatistics {
p1 := p
p1.CapBytesIn = maps.Clone(p.CapBytesIn)
p1.CapBytesOut = maps.Clone(p.CapBytesOut)
p1.TypeBytesIn = maps.Clone(p.TypeBytesIn)
p1.TypeBytesOut = maps.Clone(p.TypeBytesOut)
return p1
}

func (p PeerStatistics) Equal(p2 PeerStatistics) bool {
return p.PeerType == p2.PeerType &&
p.BytesIn == p2.BytesIn &&
p.BytesOut == p2.BytesOut &&
maps.Equal(p.CapBytesIn, p2.CapBytesIn) &&
maps.Equal(p.CapBytesOut, p2.CapBytesOut) &&
maps.Equal(p.TypeBytesIn, p2.TypeBytesIn) &&
maps.Equal(p.TypeBytesOut, p2.TypeBytesOut)
}

type PeerDataUpdate struct {
PeerID string
ENR string
Expand Down
82 changes: 64 additions & 18 deletions erigon-lib/diagnostics/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type PeerStats struct {
recordsCount int
lastUpdateMap map[string]time.Time
limit int
mu sync.Mutex
}

func NewPeerStats(peerLimit int) *PeerStats {
Expand All @@ -43,23 +44,36 @@ func NewPeerStats(peerLimit int) *PeerStats {

func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
if value, ok := p.peersInfo.Load(peerID); ok {
p.UpdatePeer(peerID, peerInfo, value)
p.updatePeer(peerID, peerInfo, value)
} else {
p.AddPeer(peerID, peerInfo)
if p.GetPeersCount() > p.limit {
p.RemovePeersWhichExceedLimit(p.limit)
p.addPeer(peerID, peerInfo)
if p.getPeersCount() > p.limit {
p.removePeersWhichExceedLimit(p.limit)
}
}
}

// Deprecated - used in tests. non-thread-safe
func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
p.mu.Lock()
defer p.mu.Unlock()
p.addPeer(peerID, peerInfo)
}

func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) {
pv := PeerStatisticsFromMsgUpdate(peerInfo, nil)
p.peersInfo.Store(peerID, pv)
p.recordsCount++
p.lastUpdateMap[peerID] = time.Now()
}

func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) {
p.mu.Lock()
defer p.mu.Unlock()
p.updatePeer(peerID, peerInfo, prevValue)
}

func (p *PeerStats) updatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) {
pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue)

p.peersInfo.Store(peerID, pv)
Expand Down Expand Up @@ -104,40 +118,57 @@ func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) Peer
}

func (p *PeerStats) GetPeersCount() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.getPeersCount()
}

func (p *PeerStats) getPeersCount() int {
return p.recordsCount
}

func (p *PeerStats) GetPeers() map[string]*PeerStatistics {
stats := make(map[string]*PeerStatistics)
func (p *PeerStats) GetPeers() map[string]PeerStatistics {
p.mu.Lock()
defer p.mu.Unlock()

stats := make(map[string]PeerStatistics)
p.peersInfo.Range(func(key, value interface{}) bool {
if loadedKey, ok := key.(string); ok {
if loadedValue, ok := value.(PeerStatistics); ok {
stats[loadedKey] = &loadedValue
} else {
log.Debug("Failed to cast value to PeerStatistics struct", value)
}
} else {
loadedKey, ok := key.(string)
if !ok {
log.Debug("Failed to cast key to string", key)
return true
}

loadedValue, ok := value.(PeerStatistics)
if !ok {
log.Debug("Failed to cast value to PeerStatistics struct", value)
return true
}

stats[loadedKey] = loadedValue.Clone()
return true
})

return stats
}

func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics {
p.mu.Lock()
defer p.mu.Unlock()

if value, ok := p.peersInfo.Load(peerID); ok {
if peerStats, ok := value.(PeerStatistics); ok {
return peerStats
return peerStats.Clone()
}
}

return PeerStatistics{}
}

func (p *PeerStats) GetLastUpdate(peerID string) time.Time {
p.mu.Lock()
defer p.mu.Unlock()

if lastUpdate, ok := p.lastUpdateMap[peerID]; ok {
return lastUpdate
}
Expand All @@ -146,6 +177,9 @@ func (p *PeerStats) GetLastUpdate(peerID string) time.Time {
}

func (p *PeerStats) RemovePeer(peerID string) {
p.mu.Lock()
defer p.mu.Unlock()

p.peersInfo.Delete(peerID)
p.recordsCount--
delete(p.lastUpdateMap, peerID)
Expand All @@ -157,7 +191,13 @@ type PeerUpdTime struct {
}

func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime {
timeArray := make([]PeerUpdTime, 0, p.GetPeersCount())
p.mu.Lock()
defer p.mu.Unlock()
return p.getOldestUpdatedPeersWithSize(size)
}

func (p *PeerStats) getOldestUpdatedPeersWithSize(size int) []PeerUpdTime {
timeArray := make([]PeerUpdTime, 0, p.getPeersCount())
for k, v := range p.lastUpdateMap {
timeArray = append(timeArray, PeerUpdTime{k, v})
}
Expand All @@ -174,9 +214,15 @@ func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime {
}

func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) {
peersToRemove := p.GetPeersCount() - limit
p.mu.Lock()
defer p.mu.Unlock()
p.removePeersWhichExceedLimit(limit)
}

func (p *PeerStats) removePeersWhichExceedLimit(limit int) {
peersToRemove := p.getPeersCount() - limit
if peersToRemove > 0 {
peers := p.GetOldestUpdatedPeersWithSize(peersToRemove)
peers := p.getOldestUpdatedPeersWithSize(peersToRemove)
for _, peer := range peers {
p.RemovePeer(peer.PeerID)
}
Expand Down Expand Up @@ -204,6 +250,6 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) {
}()
}

func (d *DiagnosticClient) Peers() map[string]*PeerStatistics {
func (d *DiagnosticClient) Peers() map[string]PeerStatistics {
return d.peersStats.GetPeers()
}
3 changes: 2 additions & 1 deletion erigon-lib/diagnostics/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestGetPeers(t *testing.T) {

peers := peerStats.GetPeers()
require.Equal(t, 3, len(peers))
require.Equal(t, &mockInboundPeerStats, peers["test1"])
require.True(t, peers["test1"].Equal(mockInboundPeerStats))
}

func TestLastUpdated(t *testing.T) {
Expand Down Expand Up @@ -200,6 +200,7 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) {
pid := "test" + strconv.Itoa(i)
peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg)
}
require.Equal(t, 100, peerStats.GetPeersCount())

peerStats.RemovePeersWhichExceedLimit(limit)

Expand Down