-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[cmd/opampsupervisor] Handle OpAMP connection settings #30237
Changes from all commits
826f613
99a1bda
3add507
68a633f
45d1533
5e06efa
1f7a776
0a8c0bb
e64019c
e910234
c9570df
2c025cf
a51fbe6
6a10718
c40d622
a9a1e85
eeba0e3
7429102
27d3e0b
a0b4ef1
aab7432
36a16cd
3ba9be9
7d8e3e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: cmd/opampsupervisor | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Handle OpAMP connection settings. | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [21043] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,8 @@ import ( | |
"github.com/open-telemetry/opamp-go/protobufs" | ||
"github.com/open-telemetry/opamp-go/server" | ||
serverTypes "github.com/open-telemetry/opamp-go/server/types" | ||
"go.opentelemetry.io/collector/config/configopaque" | ||
"go.opentelemetry.io/collector/config/configtls" | ||
semconv "go.opentelemetry.io/collector/semconv/v1.21.0" | ||
"go.uber.org/zap" | ||
|
||
|
@@ -105,6 +107,8 @@ type Supervisor struct { | |
|
||
agentHasStarted bool | ||
agentStartHealthCheckAttempts int | ||
|
||
connectedToOpAMPServer chan struct{} | ||
} | ||
|
||
func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { | ||
|
@@ -114,6 +118,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { | |
effectiveConfigFilePath: "effective.yaml", | ||
agentConfigOwnMetricsSection: &atomic.Value{}, | ||
effectiveConfig: &atomic.Value{}, | ||
connectedToOpAMPServer: make(chan struct{}), | ||
} | ||
|
||
if err := s.createTemplates(); err != nil { | ||
|
@@ -152,6 +157,10 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { | |
return nil, fmt.Errorf("cannot start OpAMP client: %w", err) | ||
} | ||
|
||
if connErr := s.waitForOpAMPConnection(); connErr != nil { | ||
return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) | ||
} | ||
|
||
s.commander, err = commander.NewCommander( | ||
s.logger, | ||
s.config.Agent, | ||
|
@@ -341,6 +350,10 @@ func (s *Supervisor) Capabilities() protobufs.AgentCapabilities { | |
if c.ReportsRemoteConfig != nil && *c.ReportsRemoteConfig { | ||
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig | ||
} | ||
|
||
if c.AcceptsOpAMPConnectionSettings != nil && *c.AcceptsOpAMPConnectionSettings { | ||
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings | ||
} | ||
} | ||
return supportedCapabilities | ||
} | ||
|
@@ -353,12 +366,15 @@ func (s *Supervisor) startOpAMP() error { | |
return err | ||
} | ||
|
||
s.logger.Debug("Connecting to OpAMP server...", zap.String("endpoint", s.config.Server.Endpoint), zap.Any("headers", s.config.Server.Headers)) | ||
settings := types.StartSettings{ | ||
OpAMPServerURL: s.config.Server.Endpoint, | ||
Header: s.config.Server.Headers, | ||
TLSConfig: tlsConfig, | ||
InstanceUid: s.instanceID.String(), | ||
Callbacks: types.CallbacksStruct{ | ||
OnConnectFunc: func(_ context.Context) { | ||
s.connectedToOpAMPServer <- struct{}{} | ||
s.logger.Debug("Connected to the server.") | ||
}, | ||
OnConnectFailedFunc: func(_ context.Context, err error) { | ||
|
@@ -368,9 +384,9 @@ func (s *Supervisor) startOpAMP() error { | |
s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage)) | ||
}, | ||
OnMessageFunc: s.onMessage, | ||
OnOpampConnectionSettingsFunc: func(_ context.Context, _ *protobufs.OpAMPConnectionSettings) error { | ||
// TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21043 | ||
s.logger.Debug("Received ConnectionSettings request") | ||
OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { | ||
//nolint:errcheck | ||
go s.onOpampConnectionSettings(ctx, settings) | ||
return nil | ||
}, | ||
OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { | ||
|
@@ -412,6 +428,88 @@ func (s *Supervisor) startOpAMP() error { | |
return nil | ||
} | ||
|
||
func (s *Supervisor) stopOpAMP() error { | ||
evan-bradley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
s.logger.Debug("Stopping OpAMP client...") | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() | ||
err := s.opampClient.Stop(ctx) | ||
// TODO(srikanthccv): remove context.DeadlineExceeded after https://github.com/open-telemetry/opamp-go/pull/213 | ||
if err != nil && !errors.Is(err, context.DeadlineExceeded) { | ||
evan-bradley marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return err | ||
} | ||
s.logger.Debug("OpAMP client stopped.") | ||
return nil | ||
} | ||
|
||
func (s *Supervisor) getHeadersFromSettings(protoHeaders *protobufs.Headers) http.Header { | ||
headers := make(http.Header) | ||
for _, header := range protoHeaders.Headers { | ||
headers.Add(header.Key, header.Value) | ||
} | ||
return headers | ||
} | ||
|
||
func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *protobufs.OpAMPConnectionSettings) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we try to perform this operation in a separate goroutine so that we don't block here in this callback for the entire duration of switching the connections? Let the callback return quickly and initiate the re-establishing of the connection in a separate goroutine. This would ensure that if the currently received message contains more data and not just connection settings all that data will be processed. With the current approach I am not sure what exactly will happen. I think callbacks generally should avoid doing long-lasting blocking operations since they then block other callbacks and the entire opamp operation. I also think the comment here is completely misleading. We should either implement what the comment says (i.e. the caller should do the reconnection) or fix the comment to say the callback implementation should do the reconnection. @andykellr @evan-bradley Any thoughts on what you would prefer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our agent does not currently support
We should clearly state who is responsible for establishing a new connection. I could imagine this being fully implemented in the client library and only have If we expect the implementer of the callback to do this, we should describe the process we expect to take place and implement that in tests and the example agent.
It is not possible to confirm success until the client is started and an initial status message is sent.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Right, the returned value from this callback indicates the reject/accept status of the connection settings. I don't see how we can make this async without changing how the client is expected to handle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think this was the original intent. However, I am not sure this is the best approach. It is also not how the example implementation works and I don't see good arguments against how the example is implemented today. To summarize this is what the example is supposed to do:
The example implementation of steps 4 and 5 is not complete today (e.g not waiting for OnConnect), but can be modified to match what I described. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created an issue for this open-telemetry/opamp-go#261 Let's also discuss today in our call. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI, open-telemetry/opamp-go#266 removes OnOpampConnectionSettingsAccepted callback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the implementation to run in a separate goroutine and use OnConnect to determine the connection status. |
||
if settings == nil { | ||
s.logger.Debug("Received ConnectionSettings request with nil settings") | ||
return nil | ||
} | ||
|
||
newServerConfig := &config.OpAMPServer{} | ||
|
||
if settings.DestinationEndpoint != "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if there is any value in using some of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not very familiar with |
||
newServerConfig.Endpoint = settings.DestinationEndpoint | ||
} | ||
if settings.Headers != nil { | ||
newServerConfig.Headers = s.getHeadersFromSettings(settings.Headers) | ||
} | ||
if settings.Certificate != nil { | ||
if len(settings.Certificate.CaPublicKey) != 0 { | ||
newServerConfig.TLSSetting.CAPem = configopaque.String(settings.Certificate.CaPublicKey) | ||
} | ||
if len(settings.Certificate.PublicKey) != 0 { | ||
newServerConfig.TLSSetting.CertPem = configopaque.String(settings.Certificate.PublicKey) | ||
} | ||
if len(settings.Certificate.PrivateKey) != 0 { | ||
newServerConfig.TLSSetting.KeyPem = configopaque.String(settings.Certificate.PrivateKey) | ||
} | ||
} else { | ||
newServerConfig.TLSSetting = configtls.ClientConfig{Insecure: true} | ||
} | ||
|
||
if err := s.stopOpAMP(); err != nil { | ||
s.logger.Error("Cannot stop the OpAMP client", zap.Error(err)) | ||
return err | ||
} | ||
|
||
// take a copy of the current OpAMP server config | ||
oldServerConfig := s.config.Server | ||
// update the OpAMP server config | ||
s.config.Server = newServerConfig | ||
|
||
if err := s.startOpAMP(); err != nil { | ||
s.logger.Error("Cannot connect to the OpAMP server using the new settings", zap.Error(err)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. startOpAMP() initiates but does not wait for successful connection. I think we need to wait for it before we consider the new opamp settings successful. It can be done in a future PR, but would be good to add a TODO here. |
||
// revert the OpAMP server config | ||
s.config.Server = oldServerConfig | ||
// start the OpAMP client with the old settings | ||
if err := s.startOpAMP(); err != nil { | ||
s.logger.Error("Cannot reconnect to the OpAMP server after restoring old settings", zap.Error(err)) | ||
return err | ||
} | ||
} | ||
return s.waitForOpAMPConnection() | ||
} | ||
|
||
func (s *Supervisor) waitForOpAMPConnection() error { | ||
// wait for the OpAMP client to connect to the server or timeout | ||
select { | ||
case <-s.connectedToOpAMPServer: | ||
return nil | ||
case <-time.After(10 * time.Second): | ||
return errors.New("timed out waiting for the server to connect") | ||
} | ||
} | ||
|
||
// TODO: Persist instance ID. https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21073 | ||
func (s *Supervisor) createInstanceID() (ulid.ULID, error) { | ||
entropy := ulid.Monotonic(rand.New(rand.NewSource(0)), 0) | ||
|
@@ -779,7 +877,7 @@ func (s *Supervisor) Shutdown() { | |
s.logger.Error("Could not report health to OpAMP server", zap.Error(err)) | ||
} | ||
|
||
err = s.opampClient.Stop(context.Background()) | ||
err = s.stopOpAMP() | ||
|
||
if err != nil { | ||
s.logger.Error("Could not stop the OpAMP client", zap.Error(err)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
server: | ||
endpoint: ws://{{.url}}/v1/opamp | ||
tls: | ||
insecure: true | ||
|
||
capabilities: | ||
reports_effective_config: true | ||
reports_own_metrics: true | ||
reports_health: true | ||
accepts_remote_config: true | ||
reports_remote_config: true | ||
accepts_opamp_connection_settings: true | ||
|
||
agent: | ||
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I had missed this earlier, and it isn't strictly relevant for the PR, but I am curious: what does a pointer value add here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the zero-value for `bool` is `false` but the default value may be `true`, using a pointer instead allows us to use a nil pointer value as a way to represent when the user didn't pass a value in and a default should be used.