From af31291285feaf56ba33fb5f51fe00eeed9b6b2f Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 13 Jul 2024 11:34:42 +0700 Subject: [PATCH 1/6] save --- erigon-lib/diagnostics/entities.go | 11 +++++++ erigon-lib/diagnostics/network.go | 53 +++++++++++++++++++++++------- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 6033978f14f..9e528f8b9e1 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -18,6 +18,8 @@ package diagnostics import ( "time" + + "golang.org/x/exp/maps" ) type SyncStageType string @@ -47,6 +49,15 @@ type PeerStatistics struct { TypeBytesOut map[string]uint64 } +func (p PeerStatistics) Clone() PeerStatistics { + p1 := p + p1.CapBytesIn = maps.Clone(p.CapBytesIn) + p1.CapBytesOut = maps.Clone(p.CapBytesOut) + p1.TypeBytesIn = maps.Clone(p.TypeBytesIn) + p1.TypeBytesOut = maps.Clone(p.TypeBytesOut) + return p1 +} + type PeerDataUpdate struct { PeerID string ENR string diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index fe53667c961..65aa62ee5d8 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -30,6 +30,7 @@ type PeerStats struct { recordsCount int lastUpdateMap map[string]time.Time limit int + mu sync.Mutex } func NewPeerStats(peerLimit int) *PeerStats { @@ -42,6 +43,8 @@ func NewPeerStats(peerLimit int) *PeerStats { } func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() if value, ok := p.peersInfo.Load(peerID); ok { p.UpdatePeer(peerID, peerInfo, value) } else { @@ -53,6 +56,9 @@ func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpda } func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() + pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) p.peersInfo.Store(peerID, pv) p.recordsCount++ @@ -60,6 +66,9 @@ func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { } func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { + p.mu.Lock() + defer p.mu.Unlock() + pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) p.peersInfo.Store(peerID, pv) @@ -104,23 +113,30 @@ func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) Peer } func (p *PeerStats) GetPeersCount() int { + p.mu.Lock() + defer p.mu.Unlock() return p.recordsCount } -func (p *PeerStats) GetPeers() map[string]*PeerStatistics { - stats := make(map[string]*PeerStatistics) +func (p *PeerStats) GetPeers() map[string]PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + stats := make(map[string]PeerStatistics) p.peersInfo.Range(func(key, value interface{}) bool { - if loadedKey, ok := key.(string); ok { - if loadedValue, ok := value.(PeerStatistics); ok { - stats[loadedKey] = &loadedValue - } else { - log.Debug("Failed to cast value to PeerStatistics struct", value) - } - } else { + loadedKey, ok := key.(string) + if !ok { log.Debug("Failed to cast key to string", key) + return true + } + + loadedValue, ok := value.(PeerStatistics) + if !ok { + log.Debug("Failed to cast value to PeerStatistics struct", value) + return true } + stats[loadedKey] = loadedValue.Clone() return true }) @@ -128,9 +144,12 @@ func (p *PeerStats) GetPeers() map[string]*PeerStatistics { } func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { + p.mu.Lock() + defer p.mu.Unlock() + if value, ok := p.peersInfo.Load(peerID); ok { if peerStats, ok := value.(PeerStatistics); ok { - return peerStats + return peerStats.Clone() } } @@ -138,6 +157,9 @@ func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { } func (p *PeerStats) GetLastUpdate(peerID string) time.Time { + p.mu.Lock() + defer p.mu.Unlock() + if lastUpdate, ok := p.lastUpdateMap[peerID]; ok { return lastUpdate } @@ -146,6 +168,9 @@ func (p *PeerStats) GetLastUpdate(peerID string) time.Time { } func (p *PeerStats) RemovePeer(peerID string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peersInfo.Delete(peerID) p.recordsCount-- delete(p.lastUpdateMap, peerID) @@ -157,6 +182,9 @@ type PeerUpdTime struct { } func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { + p.mu.Lock() + defer p.mu.Unlock() + timeArray := make([]PeerUpdTime, 0, p.GetPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) @@ -174,6 +202,9 @@ func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { } func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { + p.mu.Lock() + defer p.mu.Unlock() + peersToRemove := p.GetPeersCount() - limit if peersToRemove > 0 { peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) @@ -204,6 +235,6 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { }() } -func (d *DiagnosticClient) Peers() map[string]*PeerStatistics { +func (d *DiagnosticClient) Peers() map[string]PeerStatistics { return d.peersStats.GetPeers() } From d84ee4be1b57d7d13ce77d5f90a085ae59313fb3 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 13 Jul 2024 15:08:32 +0700 Subject: [PATCH 2/6] save --- erigon-lib/diagnostics/network.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 65aa62ee5d8..4086e88ac9d 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -48,17 +48,19 @@ func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpda if value, ok := p.peersInfo.Load(peerID); ok { p.UpdatePeer(peerID, peerInfo, value) } else { - p.AddPeer(peerID, peerInfo) - if p.GetPeersCount() > p.limit { + p.addPeer(peerID, peerInfo) + if p.getPeersCount() > p.limit { p.RemovePeersWhichExceedLimit(p.limit) } } } +// Deprecated - used in tests. non-thread-safe func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { - p.mu.Lock() - defer p.mu.Unlock() + p.addPeer(peerID, peerInfo) +} +func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) p.peersInfo.Store(peerID, pv) p.recordsCount++ @@ -115,6 +117,10 @@ func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) Peer func (p *PeerStats) GetPeersCount() int { p.mu.Lock() defer p.mu.Unlock() + return p.getPeersCount() +} + +func (p *PeerStats) getPeersCount() int { return p.recordsCount } @@ -185,7 +191,7 @@ func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { p.mu.Lock() defer p.mu.Unlock() - timeArray := make([]PeerUpdTime, 0, p.GetPeersCount()) + timeArray := make([]PeerUpdTime, 0, p.getPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) } @@ -205,7 +211,7 @@ func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { p.mu.Lock() defer p.mu.Unlock() - peersToRemove := p.GetPeersCount() - limit + peersToRemove := p.getPeersCount() - limit if peersToRemove > 0 { peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { From 84d782914c4bc009b4cd696665ec3697076b0af1 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 13 Jul 2024 15:09:34 +0700 Subject: [PATCH 3/6] save --- erigon-lib/diagnostics/network.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 4086e88ac9d..7b121caefe7 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -46,11 +46,11 @@ func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpda p.mu.Lock() defer p.mu.Unlock() if value, ok := p.peersInfo.Load(peerID); ok { - p.UpdatePeer(peerID, peerInfo, value) + p.updatePeer(peerID, peerInfo, value) } else { p.addPeer(peerID, peerInfo) if p.getPeersCount() > p.limit { - p.RemovePeersWhichExceedLimit(p.limit) + p.removePeersWhichExceedLimit(p.limit) } } } @@ -70,7 +70,10 @@ func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { p.mu.Lock() defer p.mu.Unlock() + p.updatePeer(peerID, peerInfo, prevValue) +} +func (p *PeerStats) updatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) p.peersInfo.Store(peerID, pv) @@ -210,7 +213,10 @@ func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { p.mu.Lock() defer p.mu.Unlock() + p.removePeersWhichExceedLimit(limit) +} +func (p *PeerStats) removePeersWhichExceedLimit(limit int) { peersToRemove := p.getPeersCount() - limit if peersToRemove > 0 { peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) From 87adb176c8dda8dbb9c600373bf1416288544dba Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 13 Jul 2024 15:09:54 +0700 Subject: [PATCH 4/6] save --- erigon-lib/diagnostics/network.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 7b121caefe7..949f602f10e 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -57,6 +57,8 @@ func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpda // Deprecated - used in tests. non-thread-safe func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() p.addPeer(peerID, peerInfo) } From acb8adef79292ea99ef8849f46f09ba840b9d5b8 Mon Sep 17 00:00:00 2001 From: dvovk Date: Sat, 13 Jul 2024 11:14:41 +0200 Subject: [PATCH 5/6] added compare method --- erigon-lib/diagnostics/entities.go | 10 ++++++++++ erigon-lib/diagnostics/network_test.go | 6 +++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 9e528f8b9e1..ea83615c163 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -58,6 +58,16 @@ func (p PeerStatistics) Clone() PeerStatistics { return p1 } +func (p PeerStatistics) Equal(p2 PeerStatistics) bool { + return p.PeerType == p2.PeerType && + p.BytesIn == p2.BytesIn && + p.BytesOut == p2.BytesOut && + maps.Equal(p.CapBytesIn, p2.CapBytesIn) && + maps.Equal(p.CapBytesOut, p2.CapBytesOut) && + maps.Equal(p.TypeBytesIn, p2.TypeBytesIn) && + maps.Equal(p.TypeBytesOut, p2.TypeBytesOut) +} + type PeerDataUpdate struct { PeerID string ENR string diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index ec9525281b8..6cbdfff90f5 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -159,7 +159,7 @@ func TestGetPeers(t *testing.T) { peers := peerStats.GetPeers() require.Equal(t, 3, len(peers)) - require.Equal(t, &mockInboundPeerStats, peers["test1"]) + require.True(t, peers["test1"].Equal(mockInboundPeerStats)) } func TestLastUpdated(t *testing.T) { @@ -201,12 +201,12 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } - peerStats.RemovePeersWhichExceedLimit(limit) + //peerStats.RemovePeersWhichExceedLimit(limit) require.Equal(t, limit, peerStats.GetPeersCount()) limit = 1000 - peerStats.RemovePeersWhichExceedLimit(limit) + //peerStats.RemovePeersWhichExceedLimit(limit) require.Equal(t, 100, peerStats.GetPeersCount()) } From 25367d7afbbc1f9c7d1a5bd398ae6e60822a92a1 Mon Sep 17 00:00:00 2001 From: dvovk Date: Sat, 13 Jul 2024 11:27:37 +0200 Subject: [PATCH 6/6] fix lock --- erigon-lib/diagnostics/network.go | 7 ++++--- erigon-lib/diagnostics/network_test.go | 5 +++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 949f602f10e..e69e8e3c9ec 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -43,8 +43,6 @@ func NewPeerStats(peerLimit int) *PeerStats { } func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { - p.mu.Lock() - defer p.mu.Unlock() if value, ok := p.peersInfo.Load(peerID); ok { p.updatePeer(peerID, peerInfo, value) } else { @@ -195,7 +193,10 @@ type PeerUpdTime struct { func (p *PeerStats) GetOldestUpdatedPeersWithSize(size int) []PeerUpdTime { p.mu.Lock() defer p.mu.Unlock() + return p.getOldestUpdatedPeersWithSize(size) +} +func (p *PeerStats) getOldestUpdatedPeersWithSize(size int) []PeerUpdTime { timeArray := make([]PeerUpdTime, 0, p.getPeersCount()) for k, v := range p.lastUpdateMap { timeArray = append(timeArray, PeerUpdTime{k, v}) @@ -221,7 +222,7 @@ func (p *PeerStats) RemovePeersWhichExceedLimit(limit int) { func (p *PeerStats) removePeersWhichExceedLimit(limit int) { peersToRemove := p.getPeersCount() - limit if peersToRemove > 0 { - peers := p.GetOldestUpdatedPeersWithSize(peersToRemove) + peers := p.getOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { p.RemovePeer(peer.PeerID) } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index 6cbdfff90f5..cce3c25bfbb 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -200,13 +200,14 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { pid := "test" + strconv.Itoa(i) peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) } + require.Equal(t, 100, peerStats.GetPeersCount()) - //peerStats.RemovePeersWhichExceedLimit(limit) + peerStats.RemovePeersWhichExceedLimit(limit) require.Equal(t, limit, peerStats.GetPeersCount()) limit = 1000 - //peerStats.RemovePeersWhichExceedLimit(limit) + peerStats.RemovePeersWhichExceedLimit(limit) require.Equal(t, 100, peerStats.GetPeersCount()) }