From 826f6133ba087631b140e0c283bbc8e0e0f7a66f Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Tue, 12 Dec 2023 15:45:22 -0500 Subject: [PATCH 01/15] Implement Collector bootstrapping in the Supervisor --- cmd/opampsupervisor/e2e_test.go | 92 +++++- cmd/opampsupervisor/go.mod | 1 + cmd/opampsupervisor/go.sum | 2 + cmd/opampsupervisor/specification/README.md | 4 +- cmd/opampsupervisor/supervisor/supervisor.go | 285 ++++++++++++------ .../supervisor/templates/bootstrap.yaml | 24 ++ .../supervisor/templates/extraconfig.yaml | 18 ++ .../supervisor/templates/owntelemetry.yaml | 21 ++ 8 files changed, 342 insertions(+), 105 deletions(-) create mode 100644 cmd/opampsupervisor/supervisor/templates/bootstrap.yaml create mode 100644 cmd/opampsupervisor/supervisor/templates/extraconfig.yaml create mode 100644 cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index a9be00a7e4d1..a569ac747b65 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -15,6 +15,7 @@ import ( "net/http" "net/http/httptest" "os" + "os/exec" "path" "runtime" "strings" @@ -23,12 +24,18 @@ import ( "text/template" "time" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/providers/rawbytes" + "github.com/knadh/koanf/v2" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" "github.com/open-telemetry/opamp-go/server/types" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" ) @@ -122,6 +129,14 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca } func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { + cfgFile := getSupervisorConfig(t, configType, extraConfigData) + s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + require.NoError(t, err) + + return s +} + +func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[string]string) *os.File { tpl, err := os.ReadFile(path.Join("testdata", "supervisor", "supervisor_"+configType+".yaml")) require.NoError(t, err) @@ -148,10 +163,7 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s _, err = cfgFile.Write(buf.Bytes()) require.NoError(t, err) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) - require.NoError(t, err) - - return s + return cfgFile } func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { @@ -323,6 +335,78 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { }, 5*time.Second, 250*time.Millisecond) } +func TestSupervisorBootstrapsCollector(t *testing.T) { + agentDescription := atomic.Value{} + + // Load the Supervisor config so we can get the location of + // the Collector that will be run. + var cfg config.Supervisor + cfgFile := getSupervisorConfig(t, "nocap", map[string]string{}) + k := koanf.New("::") + err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser()) + require.NoError(t, err) + err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{ + Tag: "mapstructure", + }) + require.NoError(t, err) + + // Get the binary name and version from the Collector binary + // using the `components` command that prints a YAML-encoded + // map of information about the Collector build. Some of this + // information will be used as defaults for the telemetry + // attributes. + agentPath := cfg.Agent.Executable + componentsInfo, err := exec.Command(agentPath, "components").Output() + require.NoError(t, err) + k = koanf.New("::") + err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser()) + require.NoError(t, err) + buildinfo := k.StringMap("buildinfo") + command := buildinfo["command"] + version := buildinfo["version"] + + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.AgentDescription != nil { + agentDescription.Store(message.AgentDescription) + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + require.Eventually(t, func() bool { + ad, ok := agentDescription.Load().(*protobufs.AgentDescription) + if !ok { + return false + } + + var agentName, agentVersion string + identAttr := ad.IdentifyingAttributes + for _, attr := range identAttr { + switch attr.Key { + case semconv.AttributeServiceName: + agentName = attr.Value.GetStringValue() + case semconv.AttributeServiceVersion: + agentVersion = attr.Value.GetStringValue() + } + } + + // By default the Collector should report its name and version + // from the component.BuildInfo struct built into the Collector + // binary. + return agentName == command && agentVersion == version + }, 5*time.Second, 250*time.Millisecond) +} + // Creates a Collector config that reads and writes logs to files and provides // file descriptors for I/O operations to those files. The files are placed // in a unique temp directory that is cleaned up after the test's completion. diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 93ebc0eb675c..c91c034eef8c 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -12,6 +12,7 @@ require ( github.com/open-telemetry/opamp-go v0.10.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/config/configtls v0.91.0 + go.opentelemetry.io/collector/semconv v0.91.0 go.uber.org/zap v1.26.0 ) diff --git a/cmd/opampsupervisor/go.sum b/cmd/opampsupervisor/go.sum index 33d68d57e4d1..91d45ccc5331 100644 --- a/cmd/opampsupervisor/go.sum +++ b/cmd/opampsupervisor/go.sum @@ -41,6 +41,8 @@ go.opentelemetry.io/collector/config/configopaque v0.91.0 h1:bQgJPyARbuXAsU2p6h2 go.opentelemetry.io/collector/config/configopaque v0.91.0/go.mod h1:TPCHaU+QXiEV+JXbgyr6mSErTI9chwQyasDVMdJr3eY= go.opentelemetry.io/collector/config/configtls v0.91.0 h1:lZromNeOslPwyVlTPMOzF2q++SY+VONvfH3cDqA0kKk= go.opentelemetry.io/collector/config/configtls v0.91.0/go.mod h1:E+CW5gZoH8V3z5aSlZxwiof7GAcayzn1HRM+uRILLEI= +go.opentelemetry.io/collector/semconv v0.91.0 h1:TRd+yDDfKQl+aNtS24wmEbJp1/QE/xAFV9SB5zWGxpE= +go.opentelemetry.io/collector/semconv v0.91.0/go.mod h1:j/8THcqVxFna1FpvA2zYIsUperEtOaRaqoLYIN4doWw= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index 909ca86f58e5..e72656b0a9ae 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -220,8 +220,8 @@ configuration. To overcome this problem the Supervisor starts the Collector with an "noop" configuration that collects nothing but allows the opamp extension to be started. The "noop" configuration is a single pipeline -with a filelog receiver that points to a non-existing file and a logging -exporter and the opamp extension. The purpose of the "noop" +with an OTLP receiver that listens on a random port and a debug +exporter, and the opamp extension. The purpose of the "noop" configuration is to make sure the Collector starts and the opamp extension communicates with the Supervisor. diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3e8355040c4d..2b1506aeb0d8 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -4,15 +4,18 @@ package supervisor import ( + "bytes" "context" + _ "embed" "errors" "fmt" "math/rand" "net" + "net/http" "os" - "runtime" "sort" "sync/atomic" + "text/template" "time" "github.com/cenkalti/backoff/v4" @@ -24,6 +27,9 @@ import ( "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + serverTypes "github.com/open-telemetry/opamp-go/server/types" + semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander" @@ -31,8 +37,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/healthchecker" ) -// This Supervisor is developed specifically for the OpenTelemetry Collector. -const agentType = "io.opentelemetry.collector" +var ( + //go:embed templates/bootstrap.yaml + bootstrapConfTpl string + + //go:embed templates/extraconfig.yaml + extraConfigTpl string + + //go:embed templates/owntelemetry.yaml + ownTelemetryTpl string +) // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient // to work with an OpAMP Server. @@ -51,12 +65,21 @@ type Supervisor struct { // Supervisor's own config. config config.Supervisor + agentDescription *protobufs.AgentDescription + // Agent's instance id. instanceID ulid.ULID + // The name of the agent. + agentName string + // The version of the agent. agentVersion string + bootstrapTemplate *template.Template + extraConfigTemplate *template.Template + ownTelemetryTemplate *template.Template + // A config section to be added to the Collector's config to fetch its own metrics. // TODO: store this persistently so that when starting we can compose the effective // config correctly. @@ -97,12 +120,24 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { effectiveConfig: &atomic.Value{}, } + if err := s.createTemplates(); err != nil { + return nil, err + } + if err := s.loadConfig(configFile); err != nil { return nil, fmt.Errorf("error loading config: %w", err) } + id, err := s.createInstanceID() + + if err != nil { + return nil, err + } + + s.instanceID = id + if err := s.getBootstrapInfo(); err != nil { - s.logger.Error("Couldn't get agent version", zap.Error(err)) + return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } port, err := s.findRandomPort() @@ -113,16 +148,8 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port) - id, err := s.createInstanceID() - - if err != nil { - return nil, err - } - - s.instanceID = id - logger.Debug("Supervisor starting", - zap.String("id", s.instanceID.String()), zap.String("type", agentType), zap.String("version", s.agentVersion)) + zap.String("id", s.instanceID.String()), zap.String("name", s.agentName), zap.String("version", s.agentVersion)) s.loadAgentEffectiveConfig() @@ -145,6 +172,26 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return s, nil } +func (s *Supervisor) createTemplates() error { + var err error + s.bootstrapTemplate, err = template.New("bootstrap").Parse(bootstrapConfTpl) + if err != nil { + return err + } + + s.extraConfigTemplate, err = template.New("bootstrap").Parse(extraConfigTpl) + if err != nil { + return err + } + + s.ownTelemetryTemplate, err = template.New("bootstrap").Parse(ownTelemetryTpl) + if err != nil { + return err + } + + return nil +} + func (s *Supervisor) loadConfig(configFile string) error { if configFile == "" { return errors.New("path to config file cannot be empty") @@ -166,10 +213,102 @@ func (s *Supervisor) loadConfig(configFile string) error { return nil } -// TODO: Implement bootstrapping https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21071 -// nolint: unparam func (s *Supervisor) getBootstrapInfo() (err error) { - s.agentVersion = "1.0.0" + port, err := s.findRandomPort() + + if err != nil { + return err + } + + supervisorPort, err := s.findRandomPort() + + if err != nil { + return err + } + + var cfg bytes.Buffer + + s.bootstrapTemplate.Execute(&cfg, map[string]any{ + "EndpointPort": port, + "InstanceUid": s.instanceID.String(), + "SupervisorPort": supervisorPort, + }) + + s.writeEffectiveConfigToFile(cfg.String(), s.effectiveConfigFilePath) + + srv := server.New(s.logger.Sugar()) + + done := make(chan struct{}, 1) + var connected bool + + srv.Start(server.StartSettings{ + Settings: server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse { + connected = true + return serverTypes.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: server.ConnectionCallbacksStruct{ + OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.AgentDescription != nil { + s.agentDescription = message.AgentDescription + identAttr := s.agentDescription.IdentifyingAttributes + + for _, attr := range identAttr { + switch attr.Key { + case semconv.AttributeServiceName: + s.agentName = attr.Value.GetStringValue() + case semconv.AttributeServiceVersion: + s.agentVersion = attr.Value.GetStringValue() + } + } + + done <- struct{}{} + } + + return &protobufs.ServerToAgent{} + }, + }, + } + }, + }, + }, + ListenEndpoint: fmt.Sprintf("localhost:%d", supervisorPort), + }) + + cmd, err := commander.NewCommander( + s.logger, + s.config.Agent, + "--config", s.effectiveConfigFilePath, + ) + + if err != nil { + return err + } + + cmd.Start(context.Background()) + + select { + case <-time.After(3 * time.Second): + if connected { + return errors.New("collector connected but never responded with an AgentDescription message") + } else { + return errors.New("collector's OpAMP client never connected to the Supervisor") + } + case <-done: + } + + err = cmd.Stop(context.Background()) + + if err != nil { + return err + } + + err = srv.Stop(context.Background()) + + if err != nil { + return err + } return nil } @@ -252,7 +391,7 @@ func (s *Supervisor) startOpAMP() error { }, Capabilities: s.Capabilities(), } - err = s.opampClient.SetAgentDescription(s.createAgentDescription()) + err = s.opampClient.SetAgentDescription(s.agentDescription) if err != nil { return err } @@ -287,57 +426,23 @@ func (s *Supervisor) createInstanceID() (ulid.ULID, error) { } -func keyVal(key, val string) *protobufs.KeyValue { - return &protobufs.KeyValue{ - Key: key, - Value: &protobufs.AnyValue{ - Value: &protobufs.AnyValue_StringValue{StringValue: val}, - }, - } -} - -func (s *Supervisor) createAgentDescription() *protobufs.AgentDescription { - hostname, _ := os.Hostname() - - return &protobufs.AgentDescription{ - IdentifyingAttributes: []*protobufs.KeyValue{ - keyVal("service.name", agentType), - keyVal("service.version", s.agentVersion), - keyVal("service.instance.id", s.instanceID.String()), - }, - NonIdentifyingAttributes: []*protobufs.KeyValue{ - keyVal("os.family", runtime.GOOS), - keyVal("host.name", hostname), +func (s *Supervisor) composeExtraLocalConfig() []byte { + var cfg bytes.Buffer + err := s.extraConfigTemplate.Execute( + &cfg, + map[string]any{ + "ServiceName": s.agentName, + "ServiceVersion": s.agentVersion, + "InstanceId": s.instanceID.String(), + "Healthcheck": s.agentHealthCheckEndpoint, }, + ) + if err != nil { + s.logger.Error("Could not compose local config", zap.Error(err)) + return nil } -} -func (s *Supervisor) composeExtraLocalConfig() string { - return fmt.Sprintf(` -service: - telemetry: - logs: - # Enables JSON log output for the Agent. - encoding: json - resource: - # Set resource attributes required by OpAMP spec. - # See https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agentdescriptionidentifying_attributes - service.name: %s - service.version: %s - service.instance.id: %s - - # Enable extension to allow the Supervisor to check health. - extensions: [health_check] - -extensions: - health_check: - endpoint: %s -`, - agentType, - s.agentVersion, - s.instanceID.String(), - s.agentHealthCheckEndpoint, - ) + return cfg.Bytes() } func (s *Supervisor) loadAgentEffectiveConfig() { @@ -349,7 +454,7 @@ func (s *Supervisor) loadAgentEffectiveConfig() { effectiveConfigBytes = effFromFile } else { // No effective config file, just use the initial config. - effectiveConfigBytes = []byte(s.composeExtraLocalConfig()) + effectiveConfigBytes = s.composeExtraLocalConfig() } s.effectiveConfig.Store(string(effectiveConfigBytes)) @@ -375,11 +480,10 @@ func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig { } func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.TelemetryConnectionSettings) (configChanged bool) { - var cfg string + var cfg bytes.Buffer if settings.DestinationEndpoint == "" { // No destination. Disable metric collection. s.logger.Debug("Disabling own metrics pipeline in the config") - cfg = "" } else { s.logger.Debug("Enabling own metrics pipeline in the config") @@ -390,37 +494,20 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele return } - cfg = fmt.Sprintf( - ` -receivers: - # Collect own metrics - prometheus/own_metrics: - config: - scrape_configs: - - job_name: 'otel-collector' - scrape_interval: 10s - static_configs: - - targets: ['0.0.0.0:%d'] -exporters: - otlphttp/own_metrics: - metrics_endpoint: %s - -service: - telemetry: - metrics: - address: :%d - pipelines: - metrics/own_metrics: - receivers: [prometheus/own_metrics] - exporters: [otlphttp/own_metrics] -`, - port, - settings.DestinationEndpoint, - port, + err = s.ownTelemetryTemplate.Execute( + &cfg, + map[string]any{ + "PrometheusPort": port, + "MetricsEndpoint": settings.DestinationEndpoint, + }, ) - } + if err != nil { + s.logger.Error("Could not setup own metrics", zap.Error(err)) + return + } - s.agentConfigOwnMetricsSection.Store(cfg) + } + s.agentConfigOwnMetricsSection.Store(cfg.String()) // Need to recalculate the Agent config so that the metric config is included in it. configChanged, err := s.recalcEffectiveConfig() @@ -481,7 +568,7 @@ func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) } // Merge local config last since it has the highest precedence. - if err = k.Load(rawbytes.Provider([]byte(s.composeExtraLocalConfig())), yaml.Parser()); err != nil { + if err = k.Load(rawbytes.Provider(s.composeExtraLocalConfig()), yaml.Parser()); err != nil { return false, err } @@ -737,7 +824,7 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { zap.String("old_id", s.instanceID.String()), zap.String("new_id", newInstanceID.String())) s.instanceID = newInstanceID - err = s.opampClient.SetAgentDescription(s.createAgentDescription()) + err = s.opampClient.SetAgentDescription(s.agentDescription) if err != nil { s.logger.Error("Failed to send agent description to OpAMP server") } diff --git a/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml b/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml new file mode 100644 index 000000000000..983b62c4af59 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml @@ -0,0 +1,24 @@ +receivers: + otlp: + protocols: + http: + endpoint: "localhost:{{.EndpointPort}}" +exporters: + debug: + verbosity: basic + +extensions: + opamp: + instance_uid: "{{.InstanceUid}}" + server: + ws: + endpoint: "ws://localhost:{{.SupervisorPort}}/v1/opamp" + tls: + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] + extensions: [opamp] diff --git a/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml b/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml new file mode 100644 index 000000000000..87b6d5535b68 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml @@ -0,0 +1,18 @@ +service: + telemetry: + logs: + # Enables JSON log output for the Agent. + encoding: json + resource: + # Set resource attributes required by OpAMP spec. + # See https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agentdescriptionidentifying_attributes + service.name: "{{.ServiceName}}" + service.version: "{{.ServiceVersion}}" + service.instance.id: "{{.InstanceId}}" + + # Enable extension to allow the Supervisor to check health. + extensions: [health_check] + +extensions: + health_check: + endpoint: "{{.Healthcheck}}" diff --git a/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml b/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml new file mode 100644 index 000000000000..f46851298780 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml @@ -0,0 +1,21 @@ +receivers: + # Collect own metrics + prometheus/own_metrics: + config: + scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 10s + static_configs: + - targets: ['0.0.0.0:{{.PrometheusPort}}'] +exporters: + otlphttp/own_metrics: + metrics_endpoint: "{{.MetricsEndpoint}}" + +service: + telemetry: + metrics: + address: ":{{.PrometheusPort}}" + pipelines: + metrics/own_metrics: + receivers: [prometheus/own_metrics] + exporters: [otlphttp/own_metrics] From 99a1bdac11e69d7f196670fde3bbd1ca88051f9b Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:06:57 -0500 Subject: [PATCH 02/15] Address PR feedback --- cmd/opampsupervisor/supervisor/server.go | 45 +++++++++++++ cmd/opampsupervisor/supervisor/supervisor.go | 68 ++++++++++---------- 2 files changed, 78 insertions(+), 35 deletions(-) create mode 100644 cmd/opampsupervisor/supervisor/server.go diff --git a/cmd/opampsupervisor/supervisor/server.go b/cmd/opampsupervisor/supervisor/server.go new file mode 100644 index 000000000000..efc7ba5e154d --- /dev/null +++ b/cmd/opampsupervisor/supervisor/server.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package supervisor + +import ( + "net/http" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + serverTypes "github.com/open-telemetry/opamp-go/server/types" +) + +type flattenedSettings struct { + onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer) + onConnectingFunc func(request *http.Request) + endpoint string +} + +func newServerSettings(fs flattenedSettings) server.StartSettings { + return server.StartSettings{ + Settings: server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse { + if fs.onConnectingFunc != nil { + fs.onConnectingFunc(request) + } + return serverTypes.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: server.ConnectionCallbacksStruct{ + OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if fs.onMessageFunc != nil { + fs.onMessageFunc(conn, message) + } + + return &protobufs.ServerToAgent{} + }, + }, + } + }, + }, + }, + ListenEndpoint: fs.endpoint, + } +} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 2b1506aeb0d8..155314350add 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -239,42 +239,40 @@ func (s *Supervisor) getBootstrapInfo() (err error) { srv := server.New(s.logger.Sugar()) done := make(chan struct{}, 1) - var connected bool - - srv.Start(server.StartSettings{ - Settings: server.Settings{ - Callbacks: server.CallbacksStruct{ - OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse { - connected = true - return serverTypes.ConnectionResponse{ - Accept: true, - ConnectionCallbacks: server.ConnectionCallbacksStruct{ - OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { - if message.AgentDescription != nil { - s.agentDescription = message.AgentDescription - identAttr := s.agentDescription.IdentifyingAttributes - - for _, attr := range identAttr { - switch attr.Key { - case semconv.AttributeServiceName: - s.agentName = attr.Value.GetStringValue() - case semconv.AttributeServiceVersion: - s.agentVersion = attr.Value.GetStringValue() - } - } - - done <- struct{}{} - } - - return &protobufs.ServerToAgent{} - }, - }, + var connected atomic.Bool + + srv.Start(newServerSettings(flattenedSettings{ + endpoint: fmt.Sprintf("localhost:%d", supervisorPort), + onConnectingFunc: func(request *http.Request) { + connected.Store(true) + + }, + onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { + if message.AgentDescription != nil { + s.agentDescription = message.AgentDescription + identAttr := s.agentDescription.IdentifyingAttributes + + for _, attr := range identAttr { + switch attr.Key { + case semconv.AttributeServiceInstanceID: + if attr.Value.GetStringValue() != s.instanceID.String() { + s.logger.Warn( + "Client's instance ID does not match with the instance ID set by the Supervisor", + zap.String("expected", s.instanceID.String()), + zap.String("saw", attr.Value.GetStringValue()), + ) + } + case semconv.AttributeServiceName: + s.agentName = attr.Value.GetStringValue() + case semconv.AttributeServiceVersion: + s.agentVersion = attr.Value.GetStringValue() } - }, - }, + } + + done <- struct{}{} + } }, - ListenEndpoint: fmt.Sprintf("localhost:%d", supervisorPort), - }) + })) cmd, err := commander.NewCommander( s.logger, @@ -290,7 +288,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { select { case <-time.After(3 * time.Second): - if connected { + if connected.Load() { return errors.New("collector connected but never responded with an AgentDescription message") } else { return errors.New("collector's OpAMP client never connected to the Supervisor") From 3add5078d2d562cfdae4b3cddcfb8bd1904d847f Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Wed, 13 Dec 2023 16:03:33 -0500 Subject: [PATCH 03/15] Add issue todo --- cmd/opampsupervisor/supervisor/supervisor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 155314350add..f1ab5fc6defc 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -255,8 +255,10 @@ func (s *Supervisor) getBootstrapInfo() (err error) { for _, attr := range identAttr { switch attr.Key { case semconv.AttributeServiceInstanceID: + // TODO Consider this a critical error + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29864 if attr.Value.GetStringValue() != s.instanceID.String() { - s.logger.Warn( + s.logger.Error( "Client's instance ID does not match with the instance ID set by the Supervisor", zap.String("expected", s.instanceID.String()), zap.String("saw", attr.Value.GetStringValue()), From 68a633fe59b8cba387d30543262578bd82679a6d Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Wed, 13 Dec 2023 16:09:00 -0500 Subject: [PATCH 04/15] Add a changelog --- .chloggen/supervisor-bootstrapping.yaml | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100755 .chloggen/supervisor-bootstrapping.yaml diff --git a/.chloggen/supervisor-bootstrapping.yaml b/.chloggen/supervisor-bootstrapping.yaml new file mode 100755 index 000000000000..962ee4799413 --- /dev/null +++ b/.chloggen/supervisor-bootstrapping.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use a bootstrapping flow to get the Collector's agent description. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21071] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 45d153390aae1223de5a7855b48e7f0028d11ac6 Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Thu, 14 Dec 2023 15:18:49 -0500 Subject: [PATCH 05/15] Fix lint errors --- cmd/opampsupervisor/supervisor/supervisor.go | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index f1ab5fc6defc..422d0d5df7ce 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -129,14 +129,13 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { } id, err := s.createInstanceID() - if err != nil { return nil, err } s.instanceID = id - if err := s.getBootstrapInfo(); err != nil { + if err = s.getBootstrapInfo(); err != nil { return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } @@ -215,24 +214,25 @@ func (s *Supervisor) loadConfig(configFile string) error { func (s *Supervisor) getBootstrapInfo() (err error) { port, err := s.findRandomPort() - if err != nil { return err } supervisorPort, err := s.findRandomPort() - if err != nil { return err } var cfg bytes.Buffer - s.bootstrapTemplate.Execute(&cfg, map[string]any{ + err = s.bootstrapTemplate.Execute(&cfg, map[string]any{ "EndpointPort": port, "InstanceUid": s.instanceID.String(), "SupervisorPort": supervisorPort, }) + if err != nil { + return err + } s.writeEffectiveConfigToFile(cfg.String(), s.effectiveConfigFilePath) @@ -241,7 +241,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { done := make(chan struct{}, 1) var connected atomic.Bool - srv.Start(newServerSettings(flattenedSettings{ + err = srv.Start(newServerSettings(flattenedSettings{ endpoint: fmt.Sprintf("localhost:%d", supervisorPort), onConnectingFunc: func(request *http.Request) { connected.Store(true) @@ -275,18 +275,22 @@ func (s *Supervisor) getBootstrapInfo() (err error) { } }, })) + if err != nil { + return err + } cmd, err := commander.NewCommander( s.logger, s.config.Agent, "--config", s.effectiveConfigFilePath, ) - if err != nil { return err } - cmd.Start(context.Background()) + if err = cmd.Start(context.Background()); err != nil { + return err + } select { case <-time.After(3 * time.Second): @@ -298,15 +302,11 @@ func (s *Supervisor) getBootstrapInfo() (err error) { case <-done: } - err = cmd.Stop(context.Background()) - - if err != nil { + if err = cmd.Stop(context.Background()); err != nil { return err } - err = srv.Stop(context.Background()) - - if err != nil { + if err = srv.Stop(context.Background()); err != nil { return err } From 5e06efa2e41e4ce7da266a5f469fd6cc117a8234 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 19 Dec 2023 22:10:07 +0530 Subject: [PATCH 06/15] Handle OpAMP connection settings Part of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043 --- .../supervisor/config/config.go | 3 + cmd/opampsupervisor/supervisor/supervisor.go | 69 +++++++++++++++++-- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index ccbd523d4e6f..7bbd475a126d 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -4,6 +4,8 @@ package config import ( + "net/http" + "go.opentelemetry.io/collector/config/configtls" ) @@ -25,6 +27,7 @@ type Capabilities struct { type OpAMPServer struct { Endpoint string + Headers http.Header TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` } diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 422d0d5df7ce..3de8b0b6aadb 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -29,6 +29,7 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" serverTypes "github.com/open-telemetry/opamp-go/server/types" + "go.opentelemetry.io/collector/config/configopaque" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" @@ -364,12 +365,8 @@ func (s *Supervisor) startOpAMP() error { OnErrorFunc: func(err *protobufs.ServerErrorResponse) { s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage)) }, - OnMessageFunc: s.onMessage, - OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { - // TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043 - s.logger.Debug("Received ConnectionSettings request") - return nil - }, + OnMessageFunc: s.onMessage, + OnOpampConnectionSettingsFunc: s.onOpampConnectionSettings, OnOpampConnectionSettingsAcceptedFunc: func(settings *protobufs.OpAMPConnectionSettings) { // TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043 s.logger.Debug("ConnectionSettings accepted") @@ -413,6 +410,66 @@ func (s *Supervisor) startOpAMP() error { return nil } +func (s *Supervisor) stopOpAMP() error { + s.logger.Debug("Stopping OpAMP client...") + return s.opampClient.Stop(context.Background()) +} + +func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header { + var headers http.Header + for _, header := range protoHeaders.Headers { + headers.Add(header.Key, header.Value) + } + return headers +} + +func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { + if settings == nil { + s.logger.Debug("Received ConnectionSettings request with nil settings") + return nil + } + + newServerConfig := &config.OpAMPServer{} + + if settings.DestinationEndpoint != "" { + newServerConfig.Endpoint = settings.DestinationEndpoint + } + if settings.Headers != nil { + newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers) + } + if settings.Certificate != nil { + if len(settings.Certificate.CaPublicKey) != 0 { + newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey) + } + if len(settings.Certificate.PublicKey) != 0 { + newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey) + } + if len(settings.Certificate.PrivateKey) != 0 { + newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey) + } + } + + s.stopOpAMP() + + // take a copy of the current OpAMP server config + oldServerConfig := s.config.Server + // update the OpAMP server config + s.config.Server = newServerConfig + + if err := s.startOpAMP(); err != nil { + s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err)) + // revert the OpAMP server config + s.config.Server = oldServerConfig + // start the OpAMP client with the old settings + if err := s.startOpAMP(); err != nil { + s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err)) + return err + } + } + + return nil +} + // TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073 func (s *Supervisor) createInstanceID() (ulid.ULID, error) { entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) From 1f7a776b95b435e46321b603a7a92b4986bb3d09 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 31 Dec 2023 03:13:07 +0530 Subject: [PATCH 07/15] Add capability to supervisor and e2e test --- cmd/opampsupervisor/e2e_test.go | 55 +++++++++++++++++-- cmd/opampsupervisor/go.mod | 2 +- .../supervisor/config/config.go | 11 ++-- cmd/opampsupervisor/supervisor/supervisor.go | 13 ++++- .../supervisor/supervisor_accepts_conn.yaml | 15 +++++ 5 files changed, 84 insertions(+), 12 deletions(-) create mode 100644 cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index a569ac747b65..5ab7458f24b4 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -79,12 +79,12 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca s := server.New(testLogger{t: t}) onConnectedFunc := callbacks.OnConnectedFunc callbacks.OnConnectedFunc = func(conn types.Connection) { - agentConn.Store(conn) - isAgentConnected.Store(true) - connectedChan <- true if onConnectedFunc != nil { onConnectedFunc(conn) } + agentConn.Store(conn) + isAgentConnected.Store(true) + connectedChan <- true } onConnectionCloseFunc := callbacks.OnConnectionCloseFunc callbacks.OnConnectionCloseFunc = func(conn types.Connection) { @@ -130,7 +130,9 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { cfgFile := getSupervisorConfig(t, configType, extraConfigData) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + logger, err := zap.NewDevelopment() + require.NoError(t, err) + s, err := supervisor.NewSupervisor(logger, cfgFile.Name()) require.NoError(t, err) return s @@ -470,3 +472,48 @@ func waitForSupervisorConnection(connection chan bool, connected bool) { } } } + +func TestSupervisorOpAMPConnectionSettings(t *testing.T) { + var connectedToNewServer atomic.Bool + initialServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{}) + + s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(initialServer.supervisorConnected, true) + + newServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(_ types.Connection) { + connectedToNewServer.Store(true) + }, + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + return &protobufs.ServerToAgent{} + }, + }) + + initialServer.sendToSupervisor(&protobufs.ServerToAgent{ + ConnectionSettings: &protobufs.ConnectionSettingsOffers{ + Opamp: &protobufs.OpAMPConnectionSettings{ + DestinationEndpoint: "ws://" + newServer.addr + "/v1/opamp", + Headers: &protobufs.Headers{ + Headers: []*protobufs.Header{ + { + Key: "x-foo", + Value: "bar", + }, + }, + }, + }, + }, + }) + + require.Eventually(t, func() bool { + return connectedToNewServer.Load() == true + }, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server") +} diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index c91c034eef8c..3d742a81ba00 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -11,6 +11,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.10.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector/config/configopaque v0.91.0 go.opentelemetry.io/collector/config/configtls v0.91.0 go.opentelemetry.io/collector/semconv v0.91.0 go.uber.org/zap v1.26.0 @@ -25,7 +26,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.14.0 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 7bbd475a126d..b24d76b51e27 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -18,11 +18,12 @@ type Supervisor struct { // Capabilities is the set of capabilities that the Supervisor supports. type Capabilities struct { - AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` - ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"` - ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"` - ReportsHealth *bool `mapstructure:"reports_health"` - ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"` + AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"` + AcceptsOpAMPConnectionSettings *bool `mapstructure:"accepts_opamp_connection_settings"` + ReportsEffectiveConfig *bool `mapstructure:"reports_effective_config"` + ReportsOwnMetrics *bool `mapstructure:"reports_own_metrics"` + ReportsHealth *bool `mapstructure:"reports_health"` + ReportsRemoteConfig *bool `mapstructure:"reports_remote_config"` } type OpAMPServer struct { diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3de8b0b6aadb..82aead5da231 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -30,6 +30,7 @@ import ( "github.com/open-telemetry/opamp-go/server" serverTypes "github.com/open-telemetry/opamp-go/server/types" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" @@ -339,6 +340,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities { if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig { supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig } + + if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings { + supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings + } } return supportedCapabilities } @@ -351,6 +356,7 @@ func (s *Supervisor) startOpAMP() error { return err } + s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint)) settings := types.StartSettings{ OpAMPServerURL: s.config.Server.Endpoint, TLSConfig: tlsConfig, @@ -412,11 +418,12 @@ func (s *Supervisor) startOpAMP() error { func (s *Supervisor) stopOpAMP() error { s.logger.Debug("Stopping OpAMP client...") - return s.opampClient.Stop(context.Background()) + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + return s.opampClient.Stop(ctx) } func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header { - var headers http.Header + headers := make(http.Header) for _, header := range protoHeaders.Headers { headers.Add(header.Key, header.Value) } @@ -447,6 +454,8 @@ func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *pr if len(settings.Certificate.PrivateKey) != 0 { newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey) } + } else { + newServerConfig.TLSSetting = configtls.TLSClientSetting{Insecure: true} } s.stopOpAMP() diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml new file mode 100644 index 000000000000..0282577b252a --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_accepts_conn.yaml @@ -0,0 +1,15 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_opamp_connection_settings: true + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} From 0a8c0bbcb6a29f67e2d40df35c179c1ad50013f0 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 31 Dec 2023 19:06:04 +0530 Subject: [PATCH 08/15] fix lint and pass headers --- cmd/opampsupervisor/supervisor/supervisor.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 82aead5da231..ccfec8ab080f 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -359,6 +359,7 @@ func (s *Supervisor) startOpAMP() error { s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint)) settings := types.StartSettings{ OpAMPServerURL: s.config.Server.Endpoint, + Header: s.config.Server.Headers, TLSConfig: tlsConfig, InstanceUid: s.instanceID.String(), Callbacks: types.CallbacksStruct{ @@ -418,8 +419,14 @@ func (s *Supervisor) startOpAMP() error { func (s *Supervisor) stopOpAMP() error { s.logger.Debug("Stopping OpAMP client...") - ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) - return s.opampClient.Stop(ctx) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := s.opampClient.Stop(ctx) + cancel() + if err != nil { + return err + } + s.logger.Debug("OpAMP client stopped.") + return nil } func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header { From e910234cd6f35624ada890bc4fc1a0c61c7c49d8 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 13 Jan 2024 15:11:12 +0530 Subject: [PATCH 09/15] Fix conflicts --- cmd/opampsupervisor/e2e_test.go | 4 +++- cmd/opampsupervisor/supervisor/supervisor.go | 8 +++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 2559feff4d51..5ab7458f24b4 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -130,7 +130,9 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { cfgFile := getSupervisorConfig(t, configType, extraConfigData) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + logger, err := zap.NewDevelopment() + require.NoError(t, err) + s, err := supervisor.NewSupervisor(logger, cfgFile.Name()) require.NoError(t, err) return s diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index eb58e61f990f..10124f675942 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -29,6 +29,8 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" serverTypes "github.com/open-telemetry/opamp-go/server/types" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" @@ -74,10 +76,6 @@ type Supervisor struct { extraConfigTemplate *template.Template ownTelemetryTemplate *template.Template - bootstrapTemplate *template.Template - extraConfigTemplate *template.Template - ownTelemetryTemplate *template.Template - // A config section to be added to the Collector's config to fetch its own metrics. // TODO: store this persistently so that when starting we can compose the effective // config correctly. @@ -354,7 +352,7 @@ func (s *Supervisor) startOpAMP() error { return err } - s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint)) + s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers)) settings := types.StartSettings{ OpAMPServerURL: s.config.Server.Endpoint, Header: s.config.Server.Headers, From c9570dfc4216596910d22a9eab96793326da03b4 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 13 Jan 2024 15:31:06 +0530 Subject: [PATCH 10/15] Fix checks --- .chloggen/supervisor-accepts-conn.yaml | 27 ++++++++++++++++++++ cmd/opampsupervisor/go.mod | 2 +- cmd/opampsupervisor/supervisor/supervisor.go | 7 +++-- 3 files changed, 33 insertions(+), 3 deletions(-) create mode 100644 .chloggen/supervisor-accepts-conn.yaml diff --git a/.chloggen/supervisor-accepts-conn.yaml b/.chloggen/supervisor-accepts-conn.yaml new file mode 100644 index 000000000000..60dc5d6bc7e4 --- /dev/null +++ b/.chloggen/supervisor-accepts-conn.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Handle OpAMP connection settings. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21043] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index ff63704e1762..03ea18ab51fd 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -11,6 +11,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.10.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/semconv v0.92.1-0.20240112172857-83d463ceba06 go.uber.org/zap v1.26.0 @@ -25,7 +26,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240112172857-83d463ceba06 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.14.0 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 10124f675942..7ee0e66ab5ce 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -433,7 +433,7 @@ func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) htt return headers } -func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { +func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error { if settings == nil { s.logger.Debug("Received ConnectionSettings request with nil settings") return nil @@ -461,7 +461,10 @@ func (s *Supervisor) onOpampConnectionSettings(ctx context.Context, settings *pr newServerConfig.TLSSetting = configtls.TLSClientSetting{Insecure: true} } - s.stopOpAMP() + if err := s.stopOpAMP(); err != nil { + s.logger.Error("Cannot stop the OpAMP client", zap.Error(err)) + return err + } // take a copy of the current OpAMP server config oldServerConfig := s.config.Server From a51fbe603def1bbfe2bb6e27d1a2b757efb58eb6 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sun, 21 Jan 2024 00:30:53 +0530 Subject: [PATCH 11/15] mod tidy --- cmd/opampsupervisor/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index acf7c2598c83..dfe1cc2246d4 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -11,6 +11,7 @@ require ( github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.10.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240118172122-8131d31601b8 go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240118172122-8131d31601b8 go.opentelemetry.io/collector/semconv v0.92.1-0.20240118172122-8131d31601b8 go.uber.org/goleak v1.3.0 @@ -26,7 +27,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240118172122-8131d31601b8 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.14.0 // indirect google.golang.org/protobuf v1.31.0 // indirect From c40d622581a64bd46968cb2c4a416282014afef9 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 26 Jan 2024 00:29:43 +0530 Subject: [PATCH 12/15] Update to latest main and temporarily skip DeadlineExceeded --- cmd/opampsupervisor/e2e_test.go | 16 ++++++++-------- cmd/opampsupervisor/go.mod | 4 ++-- cmd/opampsupervisor/go.sum | 4 ++-- cmd/opampsupervisor/supervisor/server.go | 3 ++- cmd/opampsupervisor/supervisor/supervisor.go | 5 +++-- 5 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 7fdad70c4232..c77924066578 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -81,9 +81,9 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca connectedChan := make(chan bool) s := server.New(testLogger{t: t}) onConnectedFunc := callbacks.OnConnectedFunc - callbacks.OnConnectedFunc = func(conn types.Connection) { + callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) { if onConnectedFunc != nil { - onConnectedFunc(conn) + onConnectedFunc(ctx, conn) } agentConn.Store(conn) isAgentConnected.Store(true) @@ -177,7 +177,7 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -237,7 +237,7 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.Health != nil { healthReport.Store(message.Health) } @@ -321,7 +321,7 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { capabilities.Store(message.Capabilities) return &protobufs.ServerToAgent{} @@ -374,7 +374,7 @@ func TestSupervisorBootstrapsCollector(t *testing.T) { t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.AgentDescription != nil { agentDescription.Store(message.AgentDescription) } @@ -492,10 +492,10 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnConnectedFunc: func(_ types.Connection) { + OnConnectedFunc: func(_ context.Context, _ types.Connection) { connectedToNewServer.Store(true) }, - OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { return &protobufs.ServerToAgent{} }, }) diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 479c02bc96d8..98faaf3c2ed2 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -9,8 +9,9 @@ require ( github.com/knadh/koanf/providers/rawbytes v0.1.0 github.com/knadh/koanf/v2 v2.0.1 github.com/oklog/ulid/v2 v2.1.0 - github.com/open-telemetry/opamp-go v0.11.0 + github.com/open-telemetry/opamp-go v0.11.1-0.20240123204604-4d07a6af062f github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector/config/configopaque v0.93.1-0.20240124123350-9047c0e373f9 go.opentelemetry.io/collector/config/configtls v0.93.1-0.20240124123350-9047c0e373f9 go.opentelemetry.io/collector/semconv v0.93.1-0.20240124123350-9047c0e373f9 go.uber.org/goleak v1.3.0 @@ -27,7 +28,6 @@ require ( github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - go.opentelemetry.io/collector/config/configopaque v0.93.1-0.20240124123350-9047c0e373f9 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.16.0 // indirect diff --git a/cmd/opampsupervisor/go.sum b/cmd/opampsupervisor/go.sum index 38048198f462..15e47f6203a9 100644 --- a/cmd/opampsupervisor/go.sum +++ b/cmd/opampsupervisor/go.sum @@ -28,8 +28,8 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= -github.com/open-telemetry/opamp-go v0.11.0 h1:+fZrbbQlJNsdpe574vd4CI9ss8CmWpOOiq5Mz6DVSoM= -github.com/open-telemetry/opamp-go v0.11.0/go.mod h1:bk3WZ4RjbVdzsHT3gaPZscUdGvoz9Bi2+AvG8/5X824= +github.com/open-telemetry/opamp-go v0.11.1-0.20240123204604-4d07a6af062f h1:KgShO6synyGBqGXPfZJ6Jfs8YyQsdHYjsZq7ol+Bm0s= +github.com/open-telemetry/opamp-go v0.11.1-0.20240123204604-4d07a6af062f/go.mod h1:bk3WZ4RjbVdzsHT3gaPZscUdGvoz9Bi2+AvG8/5X824= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/cmd/opampsupervisor/supervisor/server.go b/cmd/opampsupervisor/supervisor/server.go index efc7ba5e154d..6e4c00697097 100644 --- a/cmd/opampsupervisor/supervisor/server.go +++ b/cmd/opampsupervisor/supervisor/server.go @@ -4,6 +4,7 @@ package supervisor import ( + "context" "net/http" "github.com/open-telemetry/opamp-go/protobufs" @@ -28,7 +29,7 @@ func newServerSettings(fs flattenedSettings) server.StartSettings { return serverTypes.ConnectionResponse{ Accept: true, ConnectionCallbacks: server.ConnectionCallbacksStruct{ - OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessageFunc: func(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if fs.onMessageFunc != nil { fs.onMessageFunc(conn, message) } diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 0814e9a23787..5c232d0fd2c0 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -416,9 +416,10 @@ func (s *Supervisor) startOpAMP() error { func (s *Supervisor) stopOpAMP() error { s.logger.Debug("Stopping OpAMP client...") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() err := s.opampClient.Stop(ctx) - cancel() - if err != nil { + // TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213 + if err != nil && !errors.Is(err, context.DeadlineExceeded) { return err } s.logger.Debug("OpAMP client stopped.") From eeba0e36af917d141f85900c26e449cc6bec8d18 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 21 Feb 2024 22:30:58 +0530 Subject: [PATCH 13/15] Add review comments --- cmd/opampsupervisor/e2e_test.go | 4 +--- cmd/opampsupervisor/supervisor/supervisor.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index c77924066578..6119ed1c5490 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -133,9 +133,7 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { cfgFile := getSupervisorConfig(t, configType, extraConfigData) - logger, err := zap.NewDevelopment() - require.NoError(t, err) - s, err := supervisor.NewSupervisor(logger, cfgFile.Name()) + s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) require.NoError(t, err) return s diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 7efe4785878a..ab2684251e53 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -860,7 +860,7 @@ func (s *Supervisor) Shutdown() { s.logger.Error("Could not report health to OpAMP server", zap.Error(err)) } - err = s.opampClient.Stop(context.Background()) + err = s.stopOpAMP() if err != nil { s.logger.Error("Could not stop the OpAMP client", zap.Error(err)) From aab743295d3d967c27bbb57b169b94e91fd408cc Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 3 Apr 2024 23:23:07 +0530 Subject: [PATCH 14/15] Rely on OnConnectFunc for conn status --- cmd/opampsupervisor/supervisor/supervisor.go | 26 +++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index d22777fdf25d..1ca186e89153 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -107,6 +107,8 @@ type Supervisor struct { agentHasStarted bool agentStartHealthCheckAttempts int + + connectedToOpAMPServer chan struct{} } func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { @@ -116,6 +118,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { effectiveConfigFilePath: "effective.yaml", agentConfigOwnMetricsSection: &atomic.Value{}, effectiveConfig: &atomic.Value{}, + connectedToOpAMPServer: make(chan struct{}), } if err := s.createTemplates(); err != nil { @@ -154,6 +157,10 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("cannot start OpAMP client: %w", err) } + if err := s.waitForOpAMPConnection(); err != nil { + return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) + } + s.commander, err = commander.NewCommander( s.logger, s.config.Agent, @@ -367,6 +374,7 @@ func (s *Supervisor) startOpAMP() error { InstanceUid: s.instanceID.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { + s.connectedToOpAMPServer <- struct{}{} s.logger.Debug("Connected to the server.") }, OnConnectFailedFunc: func(_ context.Context, err error) { @@ -375,8 +383,11 @@ func (s *Supervisor) startOpAMP() error { OnErrorFunc: func(_ context.Context, err *protobufs.ServerErrorResponse) { s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage)) }, - OnMessageFunc: s.onMessage, - OnOpampConnectionSettingsFunc: s.onOpampConnectionSettings, + OnMessageFunc: s.onMessage, + OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { + go s.onOpampConnectionSettings(ctx, settings) + return nil + }, OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { cmdType := command.GetType() if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart { @@ -485,8 +496,17 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot return err } } + return s.waitForOpAMPConnection() +} - return nil +func (s *Supervisor) waitForOpAMPConnection() error { + // wait for the OpAMP client to connect to the server or timeout + select { + case <-s.connectedToOpAMPServer: + return nil + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for the server to connect") + } } // TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073 From 36a16cdba416dd75ed97d953b6c6c04565588f0f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 3 Apr 2024 23:30:00 +0530 Subject: [PATCH 15/15] Fix lint --- cmd/opampsupervisor/supervisor/supervisor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 1ca186e89153..2c4d132fc88f 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -157,7 +157,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("cannot start OpAMP client: %w", err) } - if err := s.waitForOpAMPConnection(); err != nil { + if connErr := s.waitForOpAMPConnection(); connErr != nil { return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) } @@ -385,6 +385,7 @@ func (s *Supervisor) startOpAMP() error { }, OnMessageFunc: s.onMessage, OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { + //nolint:errcheck go s.onOpampConnectionSettings(ctx, settings) return nil },