Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions management/internals/shared/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
}
s.syncSem.Add(1)

reqStart := time.Now()

ctx := srv.Context()

syncReq := &proto.SyncRequest{}
Expand Down Expand Up @@ -261,6 +263,10 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S

s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)

if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID)
}
Comment on lines +266 to +268

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Duration metric not recorded on error paths.

The duration is only recorded on the happy path (Line 267), but multiple error paths (Lines 182, 197, 214-218, 245-246, 252-253, 259-261) exit early without recording the metric. This creates incomplete telemetry data that only reflects successful syncs. The Login function avoids this issue by using a defer statement (Lines 558-562), ensuring the duration is always captured.

Apply this diff to ensure duration is always recorded:

 func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
 	if s.syncSem.Load() >= s.syncLim {
 		return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later")
 	}
 	s.syncSem.Add(1)
 
 	reqStart := time.Now()
+
+	ctx := srv.Context()
+
+	// Declare accountID early so defer can access it
+	var accountID string
+
+	defer func() {
+		if s.appMetrics != nil {
+			s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID)
+		}
+	}()
 
-	ctx := srv.Context()
 
 	syncReq := &proto.SyncRequest{}
 	peerKey, err := s.parseRequest(ctx, req, syncReq)
 	if err != nil {
 		s.syncSem.Add(-1)
 		return err
 	}
 	realIP := getRealIP(ctx)
 	sRealIP := realIP.String()
 	peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
 	metahashed := metaHash(peerMeta, sRealIP)
 	if !s.loginFilter.allowLogin(peerKey.String(), metahashed) {
 		if s.appMetrics != nil {
 			s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
 		}
 		if s.logBlockedPeers {
 			log.WithContext(ctx).Tracef("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
 		}
 		if s.blockPeersWithSameConfig {
 			s.syncSem.Add(-1)
 			return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
 		}
 	}
 
 	if s.appMetrics != nil {
 		s.appMetrics.GRPCMetrics().CountSyncRequest()
 	}
 
 	// nolint:staticcheck
 	ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
 
-	accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
+	accountID, err = s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
 	if err != nil {
 		// nolint:staticcheck
 		ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
+		accountID = "UNKNOWN"
 		log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
 		if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
 			s.syncSem.Add(-1)
 			return status.Errorf(codes.PermissionDenied, "peer is not registered")
 		}
 		s.syncSem.Add(-1)
 		return err
 	}
 
 	// nolint:staticcheck
 	ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
 
 	start := time.Now()
 	unlock := s.acquirePeerLockByUID(ctx, peerKey.String())
 	defer func() {
 		if unlock != nil {
 			unlock()
 		}
 	}()
 	log.WithContext(ctx).Tracef("acquired peer lock for peer %s took %v", peerKey.String(), time.Since(start))
 
 	log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP)
 
 	if syncReq.GetMeta() == nil {
 		log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
 	}
 
 	metahash := metaHash(peerMeta, realIP.String())
 	s.loginFilter.addLogin(peerKey.String(), metahash)
 
 	peer, netMap, postureChecks, dnsFwdPort, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
 	if err != nil {
 		log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
 		s.syncSem.Add(-1)
 		return mapError(ctx, err)
 	}
 
 	err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv, dnsFwdPort)
 	if err != nil {
 		log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
 		s.syncSem.Add(-1)
 		return err
 	}
 
 	updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID)
 	if err != nil {
 		log.WithContext(ctx).Debugf("error while notify peer connected for %s: %v", peerKey.String(), err)
 		s.syncSem.Add(-1)
 		s.cancelPeerRoutines(ctx, accountID, peer)
 		return err
 	}
 
 	s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
 
-	if s.appMetrics != nil {
-		s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID)
-	}
-
 	unlock()
 	unlock = nil
 
 	s.syncSem.Add(-1)
 
 	return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In management/internals/shared/grpc/server.go around lines 266-268, the sync
duration metric is only recorded on the happy path; add a defer immediately
after reqStart is set that calls
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart),
accountID) (guarded by s.appMetrics != nil) so the duration is recorded on all
exits, and remove the existing inline happy-path-only metric call to avoid
double-reporting.


unlock()
unlock = nil

Expand Down
5 changes: 5 additions & 0 deletions management/server/telemetry/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration
}
}

// CountSyncRequestDuration counts the duration of the sync gRPC requests
func (grpcMetrics *GRPCMetrics) CountSyncRequestDuration(duration time.Duration, accountID string) {
grpcMetrics.syncRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
}

// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.
func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error {
_, err := grpcMetrics.meter.RegisterCallback(
Expand Down
Loading