diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 6033978f14f..ea83615c163 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -18,6 +18,8 @@ package diagnostics import ( "time" + + "golang.org/x/exp/maps" ) type SyncStageType string @@ -47,6 +49,25 @@ type PeerStatistics struct { TypeBytesOut map[string]uint64 } +func (p PeerStatistics) Clone() PeerStatistics { + p1 := p + p1.CapBytesIn = maps.Clone(p.CapBytesIn) + p1.CapBytesOut = maps.Clone(p.CapBytesOut) + p1.TypeBytesIn = maps.Clone(p.TypeBytesIn) + p1.TypeBytesOut = maps.Clone(p.TypeBytesOut) + return p1 +} + +func (p PeerStatistics) Equal(p2 PeerStatistics) bool { + return p.PeerType == p2.PeerType && + p.BytesIn == p2.BytesIn && + p.BytesOut == p2.BytesOut && + maps.Equal(p.CapBytesIn, p2.CapBytesIn) && + maps.Equal(p.CapBytesOut, p2.CapBytesOut) && + maps.Equal(p.TypeBytesIn, p2.TypeBytesIn) && + maps.Equal(p.TypeBytesOut, p2.TypeBytesOut) +} + type PeerDataUpdate struct { PeerID string ENR string diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index fe53667c961..e69e8e3c9ec 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -30,6 +30,7 @@ type PeerStats struct { recordsCount int lastUpdateMap map[string]time.Time limit int + mu sync.Mutex } func NewPeerStats(peerLimit int) *PeerStats { @@ -43,16 +44,23 @@ func NewPeerStats(peerLimit int) *PeerStats { func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { if value, ok := p.peersInfo.Load(peerID); ok { - p.UpdatePeer(peerID, peerInfo, value) + p.updatePeer(peerID, peerInfo, value) } else { - p.AddPeer(peerID, peerInfo) - if p.GetPeersCount() > p.limit { - p.RemovePeersWhichExceedLimit(p.limit) + p.addPeer(peerID, peerInfo) + if p.getPeersCount() > p.limit { + p.removePeersWhichExceedLimit(p.limit) } } } +// Deprecated - used in tests. non-thread-safe func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() + p.addPeer(peerID, peerInfo) +} + +func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) p.peersInfo.Store(peerID, pv) p.recordsCount++ @@ -60,6 +68,12 @@ func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { } func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { + p.mu.Lock() + defer p.mu.Unlock() + p.updatePeer(peerID, peerInfo, prevValue) +} + +func (p *PeerStats) updatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) p.peersInfo.Store(peerID, pv) @@ -104,23 +118,34 @@ func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) Peer } func (p *PeerStats) GetPeersCount() int { + p.mu.Lock() + defer p.mu.Unlock() + return p.getPeersCount() +} + +func (p *PeerStats) getPeersCount() int { return p.recordsCount } -func (p *PeerStats) GetPeers() map[string]*PeerStatistics { - stats := make(map[string]*PeerStatistics) +func (p *PeerStats) GetPeers() map[string]PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + stats := make(map[string]PeerStatistics) p.peersInfo.Range(func(key, value interface{}) bool { - if loadedKey, ok := key.(string); ok { - if loadedValue, ok := value.(PeerStatistics); ok { - stats[loadedKey] = &loadedValue - } else { - log.Debug("Failed to cast value to PeerStatistics struct", value) - } - } else { + loadedKey, ok := key.(string) + if !ok { log.Debug("Failed to cast key to string", key) + return true } + loadedValue, ok := value.(PeerStatistics) + if !ok { + log.Debug("Failed to cast value to PeerStatistics struct", value) + return true + } + + stats[loadedKey] = loadedValue.Clone() return true }) @@ -128,9 +153,12 @@ func (p *PeerStats) GetPeers() map[string]*PeerStatistics { } func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + if value, ok := p.peersInfo.Load(peerID); ok { if peerStats, ok := value.(PeerStatistics); ok { - return peerStats + return peerStats.Clone() } } @@ -138,6 +166,9 @@ func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { } func (p *PeerStats) GetLastUpdate(peerID string) time.Time { + p.mu.Lock() + defer p.mu.Unlock() + if lastUpdate, ok := p.lastUpdateMap[peerID]; ok { return lastUpdate } @@ -146,6 +177,9 @@ func (p *PeerStats) GetLastUpdate(peerID string) time.Time { } func (p *PeerStats) RemovePeer(peerID string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peersInfo.Delete(peerID) p.recordsCount-- delete(p.lastUpdateMap, peerID) @@ -157,7 +191,13 @@ type PeerUpdTime struct { } func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { - timeArray := make([]PeerUpdTime, 0, p.GetPeersCount()) + p.mu.Lock() + defer p.mu.Unlock() + return p.getOldestUpdatedPeersWithSize(size) +} + +func (p *PeerStats) getOldestUpdatedPeersWithSize(size int) []PeerUpdTime { + timeArray := make([]PeerUpdTime, 0, p.getPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) } @@ -174,9 +214,15 @@ func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { } func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { - peersToRemove := p.GetPeersCount() - limit + p.mu.Lock() + defer p.mu.Unlock() + p.removePeersWhichExceedLimit(limit) +} + +func (p *PeerStats) removePeersWhichExceedLimit(limit int) { + peersToRemove := p.getPeersCount() - limit if peersToRemove > 0 { - peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) + peers := p.getOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { p.RemovePeer(peer.PeerID) } @@ -204,6 +250,6 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { }() } -func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { +func (d *DiagnosticClient) Peers() map[string]PeerStatistics { return d.peersStats.GetPeers() } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index ec9525281b8..cce3c25bfbb 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -159,7 +159,7 @@ func TestGetPeers(t *testing.T) { peers := peerStats.GetPeers() require.Equal(t, 3, len(peers)) - require.Equal(t, &mockInboundPeerStats, peers["test1"]) + require.True(t, peers["test1"].Equal(mockInboundPeerStats)) } func TestLastUpdated(t *testing.T) { @@ -200,6 +200,7 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { pid := "test" + strconv.Itoa(i) peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } + require.Equal(t, 100, peerStats.GetPeersCount()) peerStats.RemovePeersWhichExceedLimit(limit)