Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 95 additions & 8 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"

nberrors "github.com/netbirdio/netbird/client/errors"
Expand Down Expand Up @@ -53,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"

nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
Expand All @@ -62,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)

// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
Expand Down Expand Up @@ -887,19 +892,45 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}

func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}

return conn, nil
}

func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
switch params := msg.WorkloadParameters.(type) {
case *mgmProto.JobRequest_Bundle:
uploadKey, err := e.handleBundle(params.Bundle)
if err != nil {
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_failed,
Reason: []byte(err.Error()),
}
}
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_succeeded,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: uploadKey,
},

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you are creating unnecessary code duplication. The only generic part is WorkloadResults. The rest is identical and can be build after the switch

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generic part are

  1. Status could be succeeded or failed
  2. Reason could be nil or error message
  3. WorkloadResults could be nil or the result
    so the identical part is just job id

},
},
}
default:
return nil
}
})
if err != nil {
Expand All @@ -914,6 +945,62 @@ func (e *Engine) receiveJobEvents() {
log.Debugf("connecting to Management Service jobs stream")
}

func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
// todo: implement with real daemon address
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()

statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}

if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}

func (e *Engine) getStatusOutput(anon bool) (string, error) {
// todo: implement with real daemon address
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()

statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}

// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
Expand Down
10 changes: 5 additions & 5 deletions management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
}

// Start background response handler
s.startResponseReceiver(ctx, accountID, srv)
s.startResponseReceiver(ctx, srv)

// Prepare per-peer state
updates := s.jobManager.CreateJobChannel(peer.ID)
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
updates := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
log.WithContext(ctx).Debugf("Job: took %v", time.Since(reqStart))

// Main loop: forward jobs to client
return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe
return peerKey, nil
}

func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) {
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() {
for {
msg, err := srv.Recv()
Expand All @@ -280,7 +280,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string
continue
}

if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil {
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
}

Expand Down
63 changes: 40 additions & 23 deletions management/server/jobChannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
)

const jobChannelBuffer = 100
Expand All @@ -17,7 +18,6 @@ type JobEvent struct {
PeerID string
Request *proto.JobRequest
Response *proto.JobResponse
Done chan struct{} // closed when response arrives
}

type JobManager struct {
Expand All @@ -42,9 +42,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager
}

// CreateJobChannel creates or replaces a channel for a peer
func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
// TODO: all pending jobs stored in db for this peer should be failed
// jm.Store.MarkPendingJobsAsFailed(peerID)
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) chan *JobEvent {
// all pending jobs stored in db for this peer should be failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
log.WithContext(ctx).Error(err.Error())
}

jm.mu.Lock()
defer jm.mu.Unlock()
Expand All @@ -71,7 +73,6 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
event := &JobEvent{
PeerID: peerID,
Request: req,
Done: make(chan struct{}),
}

jm.mu.Lock()
Expand All @@ -80,40 +81,33 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req

select {
case ch <- event:
case <-time.After(5 * time.Second):
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", peerID)
}

select {
case <-event.Done:
return nil
case <-time.After(jm.responseWait):
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
return fmt.Errorf("job %s timed out", req.ID)
case <-ctx.Done():
jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error())
return ctx.Err()
}
return nil
}

// HandleResponse marks a job as finished and moves it to completed
func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp *proto.JobResponse) error {
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
jm.mu.Lock()
defer jm.mu.Unlock()

event, ok := jm.pending[string(resp.ID)]
if !ok {
return fmt.Errorf("job %s not found", resp.ID)
}
//update or create the store for job response
err := jm.Store.CompletePeerJob(ctx, resp)
if err == nil {
event.Response = resp
}

event.Response = resp
//TODO: update the store for job response
// jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason()))
close(event.Done)
delete(jm.pending, string(resp.ID))

return nil
return err
}

// CloseChannel closes a peer’s channel and cleans up its jobs
Expand All @@ -130,7 +124,9 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string
for jobID, ev := range jm.pending {
if ev.PeerID == peerID {
// if the client disconnect and there is pending job then marke it as failed
// jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ")
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out peer disconnected"); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
Expand All @@ -142,8 +138,29 @@ func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reas
defer jm.mu.Unlock()

if ev, ok := jm.pending[jobID]; ok {
close(ev.Done)
// jm.store.CompleteJob(ctx, accountID, jobID, "", reason)
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, ev.PeerID, reason); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}

func (jm *JobManager) IsPeerConnected(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()

_, ok := jm.jobChannels[peerID]
return ok
}

func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()

for _, ev := range jm.pending {
if ev.PeerID == peerID {
return true
}
}
return false
}
26 changes: 14 additions & 12 deletions management/server/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,22 +353,24 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p
}

// check if peer connected
// todo: implement jobManager.IsPeerConnected
// if !am.jobManager.IsPeerConnected(ctx, peerID) {
// return status.NewJobFailedError("peer not connected")
// }
if !am.jobManager.IsPeerConnected(peerID) {
return status.Errorf(status.BadRequest, "peer not connected")
}

// check if already has pending jobs
// todo: implement jobManager.GetPendingJobsByPeerID
// if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 {
// return status.NewJobAlreadyPendingError(peerID)
// }
if am.jobManager.IsPeerHasPendingJobs(peerID) {
return status.Errorf(status.BadRequest, "peer already hase pending job")
}

jobStream, err := job.ToStreamJobRequest()
if err != nil {
return status.Errorf(status.BadRequest, "invalid job request %v", err)
}

// try sending job first
// todo: implement am.jobManager.SendJob
// if err := am.jobManager.SendJob(ctx, peerID, job); err != nil {
// return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err))
// }
if err := am.jobManager.SendJob(ctx, accountID, peerID, jobStream); err != nil {
return status.Errorf(status.Internal, "failed to send job: %v", err)
}

var peer *nbpeer.Peer
var eventsToStore func()
Expand Down
Loading
Loading