Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/cmd/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *program) Start(svc service.Service) error {
}
}

serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled)
serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled, daemonAddr)
if err := serverInstance.Start(); err != nil {
log.Fatalf("failed to start daemon: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func startClientDaemon(
s := grpc.NewServer()

server := client.New(ctx,
"", "", false, false)
"", "", false, false, "")
if err := server.Start(); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr
r := peer.NewRecorder(config.ManagementURL.String())
r.GetFullStatus()

connectClient := internal.NewConnectClient(ctx, config, r)
connectClient := internal.NewConnectClient(ctx, config, r, daemonAddr)
SetupDebugHandler(ctx, config, r, connectClient, "")

return connectClient.Run(nil)
Expand Down
6 changes: 4 additions & 2 deletions client/internal/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ type ConnectClient struct {
engineMutex sync.Mutex

persistSyncResponse bool
daemonAddress string
}

func NewConnectClient(
ctx context.Context,
config *profilemanager.Config,
statusRecorder *peer.Status,

daemonAddress string,
) *ConnectClient {
return &ConnectClient{
ctx: ctx,
config: config,
statusRecorder: statusRecorder,
daemonAddress: daemonAddress,
engineMutex: sync.Mutex{},
}
}
Expand Down Expand Up @@ -270,7 +272,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
checks := loginResp.GetChecks()

c.engineMutex.Lock()
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, c.daemonAddress)
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engineMutex.Unlock()

Expand Down
106 changes: 98 additions & 8 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"

nberrors "github.com/netbirdio/netbird/client/errors"
Expand Down Expand Up @@ -53,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"

nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
Expand All @@ -62,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)

// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
Expand Down Expand Up @@ -194,6 +199,8 @@ type Engine struct {
latestSyncResponse *mgmProto.SyncResponse
connSemaphore *semaphoregroup.SemaphoreGroup
flowManager nftypes.FlowManager

daemonAddress string
}

// Peer is an instance of the Connection Peer
Expand All @@ -217,6 +224,7 @@ func NewEngine(
mobileDep MobileDependency,
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
daemonAddress string,
) *Engine {
engine := &Engine{
clientCtx: clientCtx,
Expand All @@ -236,6 +244,7 @@ func NewEngine(
statusRecorder: statusRecorder,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
daemonAddress: daemonAddress,
}

sm := profilemanager.NewServiceManager("")
Expand Down Expand Up @@ -887,19 +896,45 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}

func (e *Engine) getPeerClient() (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(e.daemonAddress, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}

return conn, nil
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the way to go. The engine is part of the daemon so we are basically connecting to ourself.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i understand , that is why i needed help to understand what is the best why to call bundle ,
is passing grpc client and call it from it or passing address string and call the bundle or without the grpc and call the debugging directly ?
and also considering the other os

func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
switch params := msg.WorkloadParameters.(type) {
case *mgmProto.JobRequest_Bundle:
uploadKey, err := e.handleBundle(params.Bundle)
if err != nil {
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_failed,
Reason: []byte(err.Error()),
}
}
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_succeeded,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: uploadKey,
},

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you are creating unnecessary code duplication. The only generic part is WorkloadResults. The rest is identical and can be build after the switch

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generic part are

  1. Status could be succeeded or failed
  2. Reason could be nil or error message
  3. WorkloadResults could be nil or the result
    so the identical part is just job id

},
},
}
default:
return nil
}
})
if err != nil {
Expand All @@ -914,6 +949,61 @@ func (e *Engine) receiveJobEvents() {
log.Debugf("connecting to Management Service jobs stream")
}

func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient()
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()

statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}

if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}

func (e *Engine) getStatusOutput(anon bool) (string, error) {
// todo: implement with real daemon address
conn, err := e.getPeerClient()
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()

statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}

// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
Expand Down
11 changes: 6 additions & 5 deletions client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func TestEngine_SSH(t *testing.T) {
MobileDependency{},
peer.NewRecorder("https://mgm"),
nil,
"",
)

engine.dnsServer = &dns.MockServer{
Expand Down Expand Up @@ -377,7 +378,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
},
MobileDependency{},
peer.NewRecorder("https://mgm"),
nil)
nil, "")

wgIface := &MockWGIface{
NameFunc: func() string { return "utun102" },
Expand Down Expand Up @@ -595,7 +596,7 @@ func TestEngine_Sync(t *testing.T) {
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
engine.ctx = ctx

engine.dnsServer = &dns.MockServer{
Expand Down Expand Up @@ -759,7 +760,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
engine.ctx = ctx
newNet, err := stdnet.NewNet()
if err != nil {
Expand Down Expand Up @@ -960,7 +961,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
engine.ctx = ctx

newNet, err := stdnet.NewNet()
Expand Down Expand Up @@ -1484,7 +1485,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
}

relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, ""), nil
e.ctx = ctx
return e, err
}
Expand Down
6 changes: 4 additions & 2 deletions client/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Server struct {
profileManager *profilemanager.ServiceManager
profilesDisabled bool
updateSettingsDisabled bool
daemonAddress string
}

type oauthAuthFlow struct {
Expand All @@ -88,7 +89,7 @@ type oauthAuthFlow struct {
}

// New server instance constructor.
func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool) *Server {
func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool, daemonAddress string) *Server {
return &Server{
rootCtx: ctx,
logFile: logFile,
Expand All @@ -97,6 +98,7 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
profileManager: profilemanager.NewServiceManager(configFile),
profilesDisabled: profilesDisabled,
updateSettingsDisabled: updateSettingsDisabled,
daemonAddress: daemonAddress,
}
}

Expand Down Expand Up @@ -235,7 +237,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *profilemanage

runOperation := func() error {
log.Tracef("running client connection")
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, s.daemonAddress)
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)

err := s.connectClient.Run(runningChan)
Expand Down
6 changes: 3 additions & 3 deletions client/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestConnectWithRetryRuns(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}

s := New(ctx, "debug", "", false, false)
s := New(ctx, "debug", "", false, false, "")

s.config = config

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestServer_Up(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}

s := New(ctx, "console", "", false, false)
s := New(ctx, "console", "", false, false, "")

err = s.Start()
require.NoError(t, err)
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestServer_SubcribeEvents(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}

s := New(ctx, "console", "", false, false)
s := New(ctx, "console", "", false, false, "")

err = s.Start()
require.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions management/server/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
}

// Start background response handler
s.startResponseReceiver(ctx, accountID, srv)
s.startResponseReceiver(ctx, srv)

// Prepare per-peer state
updates := s.jobManager.CreateJobChannel(peer.ID)
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
updates := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
log.WithContext(ctx).Debugf("Job: took %v", time.Since(reqStart))

// Main loop: forward jobs to client
return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv)
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe
return peerKey, nil
}

func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) {
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() {
for {
msg, err := srv.Recv()
Expand All @@ -280,7 +280,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string
continue
}

if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil {
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
}

Expand Down
Loading
Loading