-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[management] Refactor network map controller #4789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
03b599a
1d25cf8
092fc24
da1ffe4
febde70
ee45532
51ad8d7
e6aa119
d35f904
71684e4
addb4df
bfc84c0
abb8782
04209c5
41744aa
1908cfa
705fd84
f2bb181
83b8216
0294c04
a31e293
a0b7a8f
513ab7b
3fb4a94
67460a2
46f0e6c
596820b
40808a0
df87122
9eaeb09
78d7de6
9530738
88d8835
c443de7
ab8b0d9
6d2f609
7829659
bcd9ca1
4fc1b4d
822e0c7
7b80e00
2b89747
e25f01d
ad87fbc
d2916da
0d7aa44
b5d3a1e
fbf92bb
e6c11d1
8afe24a
fd0cc43
3fc8c8b
7b3b588
f2a5e25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ import ( | |
| nbdns "github.com/netbirdio/netbird/dns" | ||
| "github.com/netbirdio/netbird/management/internals/controllers/network_map" | ||
| "github.com/netbirdio/netbird/management/internals/controllers/network_map/controller/cache" | ||
| "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral" | ||
| "github.com/netbirdio/netbird/management/internals/server/config" | ||
| "github.com/netbirdio/netbird/management/internals/shared/grpc" | ||
| "github.com/netbirdio/netbird/management/server/account" | ||
|
|
@@ -42,6 +43,7 @@ type Controller struct { | |
| accountManagerMetrics *telemetry.AccountManagerMetrics | ||
| peersUpdateManager network_map.PeersUpdateManager | ||
| settingsManager settings.Manager | ||
| EphemeralPeersManager ephemeral.Manager | ||
|
|
||
| accountUpdateLocks sync.Map | ||
| sendAccountUpdateLocks sync.Map | ||
|
|
@@ -70,7 +72,7 @@ type bufferUpdate struct { | |
|
|
||
| var _ network_map.Controller = (*Controller)(nil) | ||
|
|
||
| func NewController(ctx context.Context, store store.Store, metrics telemetry.AppMetrics, peersUpdateManager network_map.PeersUpdateManager, requestBuffer account.RequestBuffer, integratedPeerValidator integrated_validator.IntegratedValidator, settingsManager settings.Manager, dnsDomain string, proxyController port_forwarding.Controller, config *config.Config) *Controller { | ||
| func NewController(ctx context.Context, store store.Store, metrics telemetry.AppMetrics, peersUpdateManager network_map.PeersUpdateManager, requestBuffer account.RequestBuffer, integratedPeerValidator integrated_validator.IntegratedValidator, settingsManager settings.Manager, dnsDomain string, proxyController port_forwarding.Controller, ephemeralPeersManager ephemeral.Manager, config *config.Config) *Controller { | ||
| nMetrics, err := newMetrics(metrics.UpdateChannelMetrics()) | ||
| if err != nil { | ||
| log.Fatal(fmt.Errorf("error creating metrics: %w", err)) | ||
|
|
@@ -99,14 +101,40 @@ func NewController(ctx context.Context, store store.Store, metrics telemetry.App | |
| dnsDomain: dnsDomain, | ||
| config: config, | ||
|
|
||
| proxyController: proxyController, | ||
| proxyController: proxyController, | ||
| EphemeralPeersManager: ephemeralPeersManager, | ||
|
|
||
| holder: types.NewHolder(), | ||
| expNewNetworkMap: newNetworkMapBuilder, | ||
| expNewNetworkMapAIDs: expIDs, | ||
| } | ||
| } | ||
|
|
||
| func (c *Controller) OnPeerConnected(ctx context.Context, accountID string, peerID string) (chan *network_map.UpdateMessage, error) { | ||
| peer, err := c.repo.GetPeerByID(ctx, accountID, peerID) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to get peer %s: %v", peerID, err) | ||
| } | ||
|
|
||
| c.EphemeralPeersManager.OnPeerConnected(ctx, peer) | ||
|
|
||
| return c.peersUpdateManager.CreateChannel(ctx, peerID), nil | ||
| } | ||
|
|
||
| func (c *Controller) OnPeerDisconnected(ctx context.Context, accountID string, peerID string) { | ||
| c.peersUpdateManager.CloseChannel(ctx, peerID) | ||
| peer, err := c.repo.GetPeerByID(ctx, accountID, peerID) | ||
| if err != nil { | ||
| log.WithContext(ctx).Errorf("failed to get peer %s: %v", peerID, err) | ||
| return | ||
| } | ||
| c.EphemeralPeersManager.OnPeerDisconnected(ctx, peer) | ||
| } | ||
|
|
||
| func (c *Controller) CountStreams() int { | ||
| return c.peersUpdateManager.CountStreams() | ||
| } | ||
|
|
||
| func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID string) error { | ||
| log.WithContext(ctx).Tracef("updating peers for account %s from %s", accountID, util.GetCallerName()) | ||
| var ( | ||
|
|
@@ -366,38 +394,6 @@ func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID str | |
| return nil | ||
| } | ||
|
|
||
| func (c *Controller) DeletePeer(ctx context.Context, accountId string, peerId string) error { | ||
| network, err := c.repo.GetAccountNetwork(ctx, accountId) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| peers, err := c.repo.GetAccountPeers(ctx, accountId) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| dnsFwdPort := computeForwarderPort(peers, network_map.DnsForwarderPortMinVersion) | ||
| c.peersUpdateManager.SendUpdate(ctx, peerId, &network_map.UpdateMessage{ | ||
| Update: &proto.SyncResponse{ | ||
| RemotePeers: []*proto.RemotePeerConfig{}, | ||
| RemotePeersIsEmpty: true, | ||
| NetworkMap: &proto.NetworkMap{ | ||
| Serial: network.CurrentSerial(), | ||
| RemotePeers: []*proto.RemotePeerConfig{}, | ||
| RemotePeersIsEmpty: true, | ||
| FirewallRules: []*proto.FirewallRule{}, | ||
| FirewallRulesIsEmpty: true, | ||
| DNSConfig: &proto.DNSConfig{ | ||
| ForwarderPort: dnsFwdPort, | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| c.peersUpdateManager.CloseChannel(ctx, peerId) | ||
| return nil | ||
| } | ||
|
|
||
| func (c *Controller) GetValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) { | ||
| if isRequiresApproval { | ||
| network, err := c.repo.GetAccountNetwork(ctx, accountID) | ||
|
|
@@ -698,35 +694,83 @@ func isPeerInPolicySourceGroups(account *types.Account, peerID string, policy *t | |
| return false, nil | ||
| } | ||
|
|
||
| func (c *Controller) OnPeerUpdated(accountId string, peer *nbpeer.Peer) { | ||
| c.UpdatePeerInNetworkMapCache(accountId, peer) | ||
| _ = c.bufferSendUpdateAccountPeers(context.Background(), accountId) | ||
| func (c *Controller) OnPeersUpdated(ctx context.Context, accountID string, peerIDs []string) error { | ||
| peers, err := c.repo.GetPeersByIDs(ctx, accountID, peerIDs) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get peers by ids: %w", err) | ||
| } | ||
|
|
||
| for _, peer := range peers { | ||
| c.UpdatePeerInNetworkMapCache(accountID, peer) | ||
| } | ||
|
|
||
| err = c.bufferSendUpdateAccountPeers(ctx, accountID) | ||
| if err != nil { | ||
| log.WithContext(ctx).Errorf("failed to buffer update account peers for peer update in account %s: %v", accountID, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
Comment on lines
+697
to
713
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OnPeersUpdated swallows buffer errors, diverging from other handlers
err = c.bufferSendUpdateAccountPeers(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to buffer update account peers for peer update in account %s: %v", accountID, err)
}
return nilBoth Consider aligning behavior with the other methods by returning the error (you can keep logging if you want): - err = c.bufferSendUpdateAccountPeers(ctx, accountID)
- if err != nil {
- log.WithContext(ctx).Errorf("failed to buffer update account peers for peer update in account %s: %v", accountID, err)
- }
-
- return nil
+ if err := c.bufferSendUpdateAccountPeers(ctx, accountID); err != nil {
+ log.WithContext(ctx).Errorf("failed to buffer update account peers for peer update in account %s: %v", accountID, err)
+ return fmt.Errorf("buffer update account peers for account %s: %w", accountID, err)
+ }
+ return nil🤖 Prompt for AI Agents |
||
|
|
||
| func (c *Controller) OnPeerAdded(ctx context.Context, accountID string, peerID string) error { | ||
| if c.experimentalNetworkMap(accountID) { | ||
| account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| func (c *Controller) OnPeersAdded(ctx context.Context, accountID string, peerIDs []string) error { | ||
| for _, peerID := range peerIDs { | ||
| if c.experimentalNetworkMap(accountID) { | ||
| account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = c.onPeerAddedUpdNetworkMapCache(account, peerID) | ||
| if err != nil { | ||
| return err | ||
| err = c.onPeerAddedUpdNetworkMapCache(account, peerID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| return c.bufferSendUpdateAccountPeers(ctx, accountID) | ||
| } | ||
|
|
||
| func (c *Controller) OnPeerDeleted(ctx context.Context, accountID string, peerID string) error { | ||
| if c.experimentalNetworkMap(accountID) { | ||
| account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = c.onPeerDeletedUpdNetworkMapCache(account, peerID) | ||
| if err != nil { | ||
| return err | ||
| func (c *Controller) OnPeersDeleted(ctx context.Context, accountID string, peerIDs []string) error { | ||
| network, err := c.repo.GetAccountNetwork(ctx, accountID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| peers, err := c.repo.GetAccountPeers(ctx, accountID) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| dnsFwdPort := computeForwarderPort(peers, network_map.DnsForwarderPortMinVersion) | ||
| for _, peerID := range peerIDs { | ||
| c.peersUpdateManager.SendUpdate(ctx, peerID, &network_map.UpdateMessage{ | ||
| Update: &proto.SyncResponse{ | ||
| RemotePeers: []*proto.RemotePeerConfig{}, | ||
| RemotePeersIsEmpty: true, | ||
| NetworkMap: &proto.NetworkMap{ | ||
| Serial: network.CurrentSerial(), | ||
| RemotePeers: []*proto.RemotePeerConfig{}, | ||
| RemotePeersIsEmpty: true, | ||
| FirewallRules: []*proto.FirewallRule{}, | ||
| FirewallRulesIsEmpty: true, | ||
| DNSConfig: &proto.DNSConfig{ | ||
| ForwarderPort: dnsFwdPort, | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| c.peersUpdateManager.CloseChannel(ctx, peerID) | ||
|
|
||
| if c.experimentalNetworkMap(accountID) { | ||
| account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) | ||
| if err != nil { | ||
| log.WithContext(ctx).Errorf("failed to get account %s: %v", accountID, err) | ||
| continue | ||
| } | ||
| err = c.onPeerDeletedUpdNetworkMapCache(account, peerID) | ||
| if err != nil { | ||
| log.WithContext(ctx).Errorf("failed to update network map cache for deleted peer %s in account %s: %v", peerID, accountID, err) | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -778,10 +822,6 @@ func (c *Controller) GetNetworkMap(ctx context.Context, peerID string) (*types.N | |
| return networkMap, nil | ||
| } | ||
|
|
||
| func (c *Controller) DisconnectPeers(ctx context.Context, peerIDs []string) { | ||
| func (c *Controller) DisconnectPeers(ctx context.Context, accountId string, peerIDs []string) { | ||
| c.peersUpdateManager.CloseChannels(ctx, peerIDs) | ||
| } | ||
|
|
||
| func (c *Controller) IsConnected(peerID string) bool { | ||
| return c.peersUpdateManager.HasChannel(peerID) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Guard ephemeral manager usage in OnPeerConnected/OnPeerDisconnected
The overall flow of the new lifecycle hooks looks good:
(accountID, peerID).EphemeralPeersManagerof connect/disconnect.PeersUpdateManager.chan *UpdateMessagefromOnPeerConnected.Two points to tighten:
EphemeralPeersManagerAssuming the field can be nil in any configuration or tests, both methods will panic. A lightweight guard would make this safer:
func (c *Controller) OnPeerConnected(ctx context.Context, accountID string, peerID string) (chan *network_map.UpdateMessage, error) { peer, err := c.repo.GetPeerByID(ctx, accountID, peerID) if err != nil { return nil, fmt.Errorf("failed to get peer %s: %v", peerID, err) } - c.EphemeralPeersManager.OnPeerConnected(ctx, peer) + if c.EphemeralPeersManager != nil { + c.EphemeralPeersManager.OnPeerConnected(ctx, peer) + } return c.peersUpdateManager.CreateChannel(ctx, peerID), nil } func (c *Controller) OnPeerDisconnected(ctx context.Context, accountID string, peerID string) { c.peersUpdateManager.CloseChannel(ctx, peerID) peer, err := c.repo.GetPeerByID(ctx, accountID, peerID) if err != nil { log.WithContext(ctx).Errorf("failed to get peer %s: %v", peerID, err) return } - c.EphemeralPeersManager.OnPeerDisconnected(ctx, peer) + if c.EphemeralPeersManager != nil { + c.EphemeralPeersManager.OnPeerDisconnected(ctx, peer) + } }OnPeerConnectedfails fast on repo lookup errors, which is good.OnPeerDisconnectedlogs and returns on lookup failure, which is also reasonable because the channel is already closed and the peer may legitimately be gone. The behavior is consistent; just be aware that the ephemeral manager may not see a disconnect in that corner case.I'll verify this review comment by inspecting the codebase to determine if
EphemeralPeersManagercan actually be nil and whether nil guards are necessary.Length of output: 71
🏁 Script executed:
Length of output: 543
🏁 Script executed:
Length of output: 7165
🏁 Script executed:
Length of output: 1985
Length of output: 372
🏁 Script executed:
Length of output: 26310
🏁 Script executed:
Length of output: 0
🏁 Script executed:
Length of output: 168
Length of output: 3585
🏁 Script executed:
Length of output: 0
🏁 Script executed:
Length of output: 798
Add nil guards for EphemeralPeersManager in lifecycle methods
The review comment is valid. While all current code paths initialize
EphemeralPeersManageras non-nil, the field is exported and can theoretically become nil if:BaseServer.EphemeralManager()returns nilWithout guards, both
OnPeerConnected(line 119) andOnPeerDisconnected(line 131) will panic. Adding nil checks is sound defensive programming for exported interface fields.The suggested diffs in the review are appropriate:
OnPeerConnectedshould guard before callingc.EphemeralPeersManager.OnPeerConnected()OnPeerDisconnectedshould guard before callingc.EphemeralPeersManager.OnPeerDisconnected()🤖 Prompt for AI Agents