Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions client/cmd/testutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
t.Cleanup(cleanUp)

peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
Expand All @@ -106,13 +107,13 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
Return(&types.Settings{}, nil).
AnyTimes()

accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}

secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}
Expand Down
28 changes: 28 additions & 0 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func (e *Engine) Start() error {

e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveJobEvents()

// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
Expand Down Expand Up @@ -886,6 +887,33 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}

func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
},
},
}
})
if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
e.clientCancel()
return
}
log.Debugf("stopped receiving jobs from Management Service")
}()
log.Debugf("connecting to Management Service jobs stream")
}

// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
Expand Down
5 changes: 3 additions & 2 deletions client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
t.Cleanup(cleanUp)

peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
Expand All @@ -1568,13 +1569,13 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()

accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, "", err
}

secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}
Expand Down
5 changes: 3 additions & 2 deletions client/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
t.Cleanup(cleanUp)

peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
Expand All @@ -305,13 +306,13 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
permissionsManagerMock := permissions.NewMockManager(ctrl)
groupsManager := groups.NewManagerMock()

accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
return nil, "", err
}

secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}
Expand Down
2 changes: 1 addition & 1 deletion management/internals/server/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
}

gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
if err != nil {
log.Fatalf("failed to create management server: %v", err)
}
Expand Down
6 changes: 6 additions & 0 deletions management/internals/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ func (s *BaseServer) PeersUpdateManager() *server.PeersUpdateManager {
})
}

func (s *BaseServer) JobManager() *server.JobManager {
return Create(s, func() *server.JobManager {
return server.NewJobManager(s.Metrics(), s.Store())
})
}

func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValidator {
return Create(s, func() integrated_validator.IntegratedValidator {
integratedPeerValidator, err := integrations.NewIntegratedValidator(context.Background(), s.EventStore())
Expand Down
2 changes: 1 addition & 1 deletion management/internals/server/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *BaseServer) PeersManager() peers.Manager {

func (s *BaseServer) AccountManager() account.Manager {
return Create(s, func() account.Manager {
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
s.dnsDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.config.DisableDefaultPolicy)
if err != nil {
log.Fatalf("failed to create account manager: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions management/server/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type DefaultAccountManager struct {
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
cacheLoading map[string]chan struct{}
peersUpdateManager *PeersUpdateManager
jobManager *JobManager
idpManager idp.Manager
cacheManager *nbcache.AccountUserDataCache
externalCacheManager nbcache.UserDataCache
Expand Down Expand Up @@ -174,6 +175,7 @@ func BuildManager(
ctx context.Context,
store store.Store,
peersUpdateManager *PeersUpdateManager,
jobManager *JobManager,
idpManager idp.Manager,
singleAccountModeDomain string,
dnsDomain string,
Expand All @@ -196,6 +198,7 @@ func BuildManager(
Store: store,
geo: geo,
peersUpdateManager: peersUpdateManager,
jobManager: jobManager,
idpManager: idpManager,
ctx: context.Background(),
cacheMux: sync.Mutex{},
Expand Down
2 changes: 1 addition & 1 deletion management/server/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2891,7 +2891,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {

permissionsManager := permissions.NewManager(store)

manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion management/server/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
// return empty extra settings for expected calls to UpdateAccountPeers
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}

func createDNSStore(t *testing.T) (store.Store, error) {
Expand Down
Loading
Loading