Skip to content

Commit

Permalink
[chore][cmd/opampsupervisor]: move supervisor start logic into separa…
Browse files Browse the repository at this point in the history
…te Start() function (#34509)

**Link to tracking Issue:** #34380

Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT authored Aug 8, 2024
1 parent 585269b commit 98ee06c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
34 changes: 34 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -281,6 +283,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector runs eventually by pinging the healthcheck extension
Expand Down Expand Up @@ -332,6 +336,8 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
})

require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector is running by checking the metrics endpoint
Expand Down Expand Up @@ -416,6 +422,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -492,6 +500,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
})

s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -547,6 +557,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
})

s := newSupervisor(t, "nocap", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -594,6 +606,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -701,6 +715,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {
})

s := newSupervisor(t, "agent_description", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -852,6 +868,8 @@ func TestSupervisorRestartCommand(t *testing.T) {
})

s := newSupervisor(t, "basic", map[string]string{"url": server.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -918,6 +936,8 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
server.ConnectionCallbacksStruct{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(initialServer.supervisorConnected, true)
Expand Down Expand Up @@ -978,6 +998,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {

s := newSupervisor(t, "persistence", map[string]string{"url": initialServer.addr, "storage_dir": tempDir})

require.Nil(t, s.Start())

waitForSupervisorConnection(initialServer.supervisorConnected, true)

cfg, hash, _, _ := createSimplePipelineCollectorConf(t)
Expand Down Expand Up @@ -1020,6 +1042,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
defer newServer.shutdown()

s1 := newSupervisor(t, "persistence", map[string]string{"url": newServer.addr, "storage_dir": tempDir})

require.Nil(t, s1.Start())
defer s1.Shutdown()

waitForSupervisorConnection(newServer.supervisorConnected, true)
Expand Down Expand Up @@ -1066,6 +1090,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down Expand Up @@ -1095,6 +1121,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1148,6 +1176,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down Expand Up @@ -1175,6 +1205,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
"url": server.addr,
"storage_dir": storageDir,
})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
Expand Down Expand Up @@ -1206,6 +1238,8 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
"storage_dir": storageDir,
})

require.Nil(t, s.Start())

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")
Expand Down
7 changes: 7 additions & 0 deletions cmd/opampsupervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func main() {
return
}

err = supervisor.Start()
if err != nil {
logger.Error(err.Error())
os.Exit(-1)
return
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
Expand Down
20 changes: 12 additions & 8 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,34 +165,38 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
return nil, fmt.Errorf("error creating storage dir: %w", err)
}

return s, nil
}

func (s *Supervisor) Start() error {
var err error
s.persistentState, err = loadOrCreatePersistentState(s.persistentStateFilePath())
if err != nil {
return nil, err
return err
}

if err = s.getBootstrapInfo(); err != nil {
return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
return fmt.Errorf("could not get bootstrap info from the Collector: %w", err)
}

healthCheckPort, err := s.findRandomPort()

if err != nil {
return nil, fmt.Errorf("could not find port for health check: %w", err)
return fmt.Errorf("could not find port for health check: %w", err)
}

s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort)

logger.Debug("Supervisor starting",
s.logger.Debug("Supervisor starting",
zap.String("id", s.persistentState.InstanceID.String()))

err = s.loadAndWriteInitialMergedConfig()
if err != nil {
return nil, fmt.Errorf("failed loading initial config: %w", err)
return fmt.Errorf("failed loading initial config: %w", err)
}

if err = s.startOpAMP(); err != nil {
return nil, fmt.Errorf("cannot start OpAMP client: %w", err)
return fmt.Errorf("cannot start OpAMP client: %w", err)
}

s.commander, err = commander.NewCommander(
Expand All @@ -202,7 +206,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
"--config", s.agentConfigFilePath(),
)
if err != nil {
return nil, err
return err
}

s.startHealthCheckTicker()
Expand All @@ -219,7 +223,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
s.forwardCustomMessagesToServerLoop()
}()

return s, nil
return nil
}

func (s *Supervisor) createTemplates() error {
Expand Down

0 comments on commit 98ee06c

Please sign in to comment.