Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/feat_dakotapaasman-supervisor-heartbeats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "opampsupervisor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for client initiated OpAMP heartbeats in the supervisor."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42533]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
63 changes: 62 additions & 1 deletion cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
require.Eventually(t, func() bool {
caps := capabilities.Load()

return caps == uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus)
return caps == uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus|protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat)
}, 5*time.Second, 250*time.Millisecond)
}

Expand Down Expand Up @@ -2501,3 +2501,64 @@ func TestSupervisorEmitBootstrapTelemetry(t *testing.T) {
require.Truef(t, gotSpan, "expected to find span '%s', but did not find it", expectedSpan)
}
}

func TestSupervisorReportsHeartbeat(t *testing.T) {
var heartbeatReport atomic.Bool
server := newOpAMPServer(
t,
defaultConnectingHandler,
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if isHeartbeatMessage(message) {
heartbeatReport.Store(true)
}
return &protobufs.ServerToAgent{}
},
},
)
s, _ := newSupervisor(t, "reports_heartbeat", map[string]string{"url": server.addr})

require.Nil(t, s.Start(t.Context()))
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

// Set the heartbeat interval to 1 seconds
server.sendToSupervisor(&protobufs.ServerToAgent{
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Opamp: &protobufs.OpAMPConnectionSettings{
DestinationEndpoint: "ws://" + server.addr + "/v1/opamp",
HeartbeatIntervalSeconds: 1,
},
},
})

// supervisor disconnects from the server
waitForSupervisorConnection(server.supervisorConnected, false)

// supervisor reconnects to the server
waitForSupervisorConnection(server.supervisorConnected, true)

require.Eventually(t, func() bool {
return heartbeatReport.Load()
}, 3*time.Second, 250*time.Millisecond)
}

// isHeartbeatMessage returns true if all fields of the message are nil.
func isHeartbeatMessage(message *protobufs.AgentToServer) bool {
empty := true

empty = empty && message.AgentDescription == nil
empty = empty && message.Health == nil
empty = empty && message.EffectiveConfig == nil
empty = empty && message.RemoteConfigStatus == nil
empty = empty && message.PackageStatuses == nil
empty = empty && message.AgentDisconnect == nil
empty = empty && message.ConnectionSettingsRequest == nil
empty = empty && message.CustomCapabilities == nil
empty = empty && message.CustomMessage == nil
empty = empty && message.AvailableComponents == nil
empty = empty && message.Flags == 0

return empty
}
13 changes: 13 additions & 0 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ capabilities:
# The Collector will report Health.
reports_health: # true if unspecified

# The supervisor will report OpAMP heartbeats to the Server.
reports_heartbeat: # true if unspecified

storage:
# A writable directory where the Supervisor can store data
# (e.g. cached remote config).
Expand Down Expand Up @@ -504,6 +507,16 @@ the next Collector start (at the minimum the version number to be
included in AgentDescription is expected to change after the executable
is updated).

### OpAMP Heartbeats

OpAMP heartbeats are enabled by default in the Supervisor. They can be
disabled by setting `capabilities.reports_heartbeat` to `false`. The
default interval is 30 seconds, but this can be changed by the OpAMP
server sending a ServerToAgent message with the appropriate field set.
This causes the Supervisor to periodically send an empty OpAMP
AgentToServer message in order to keep the connection alive.
For more information see the [OpAMP specification](https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#opampconnectionsettingsheartbeat_interval_seconds).

### Addons Management

The Collector currently does not have a concept of addons so this OpAMP
Expand Down
5 changes: 5 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Capabilities struct {
ReportsHealth bool `mapstructure:"reports_health"`
ReportsRemoteConfig bool `mapstructure:"reports_remote_config"`
ReportsAvailableComponents bool `mapstructure:"reports_available_components"`
ReportsHeartbeat bool `mapstructure:"reports_heartbeat"`
}

func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
Expand Down Expand Up @@ -155,6 +156,9 @@ func (c Capabilities) SupportedCapabilities() protobufs.AgentCapabilities {
if c.ReportsAvailableComponents {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents
}
if c.ReportsHeartbeat {
supportedCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat
}

return supportedCapabilities
}
Expand Down Expand Up @@ -339,6 +343,7 @@ func DefaultSupervisor() Supervisor {
ReportsHealth: true,
ReportsRemoteConfig: false,
ReportsAvailableComponents: false,
ReportsHeartbeat: true,
},
Storage: Storage{
Directory: defaultStorageDir,
Expand Down
9 changes: 7 additions & 2 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics |
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth,
protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth |
protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat,
},
{
name: "Empty capabilities",
Expand All @@ -534,6 +535,7 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
ReportsHealth: true,
ReportsRemoteConfig: true,
ReportsAvailableComponents: true,
ReportsHeartbeat: true,
},
expectedAgentCapabilities: protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus |
protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig |
Expand All @@ -545,7 +547,8 @@ func TestCapabilities_SupportedCapabilities(t *testing.T) {
protobufs.AgentCapabilities_AgentCapabilities_ReportsRemoteConfig |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents,
protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents |
protobufs.AgentCapabilities_AgentCapabilities_ReportsHeartbeat,
},
}

Expand Down Expand Up @@ -620,6 +623,7 @@ capabilities:
reports_remote_config: true
accepts_restart_command: true
accepts_opamp_connection_settings: true
reports_heartbeat: true

storage:
directory: %s
Expand Down Expand Up @@ -662,6 +666,7 @@ telemetry:
ReportsRemoteConfig: true,
AcceptsRestartCommand: true,
AcceptsOpAMPConnectionSettings: true,
ReportsHeartbeat: true,
},
Storage: Storage{
Directory: filepath.Join(tmpDir, "storage"),
Expand Down
16 changes: 16 additions & 0 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ type Supervisor struct {

featureGates map[string]struct{}
metrics *supervisorTelemetry.Metrics

// heartbeatInterval is the interval the OpAMP client is configured to send heartbeats.
// Default is 30 seconds but can be overridden by the OpAMP server with an OpAMPConnectionSettings message.
heartbeatIntervalSeconds uint64
Comment thread
evan-bradley marked this conversation as resolved.
}

func NewSupervisor(ctx context.Context, logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
Expand All @@ -204,6 +208,7 @@ func NewSupervisor(ctx context.Context, logger *zap.Logger, cfg config.Superviso
agentReady: atomic.Bool{},
agentReadyChan: make(chan struct{}, 1),
metrics: &supervisorTelemetry.Metrics{},
heartbeatIntervalSeconds: 30,
}

s.runCtx, s.runCtxCancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -713,6 +718,12 @@ func (s *Supervisor) startOpAMPClient() error {
return err
}

// Set heartbeat interval if the agent supports it
if s.config.Capabilities.ReportsHeartbeat {
d := time.Duration(s.heartbeatIntervalSeconds) * time.Second
settings.HeartbeatInterval = &d
}

s.telemetrySettings.Logger.Debug("Starting OpAMP client...")
if err := s.opampClient.Start(s.runCtx, settings); err != nil {
return err
Expand Down Expand Up @@ -1027,6 +1038,11 @@ func (s *Supervisor) onOpampConnectionSettings(_ context.Context, settings *prot
return err
}

// Update the heartbeat interval if the agent supports it
if s.config.Capabilities.ReportsHeartbeat {
s.heartbeatIntervalSeconds = settings.HeartbeatIntervalSeconds
}

if err := s.stopOpAMPClient(); err != nil {
s.telemetrySettings.Logger.Error("Cannot stop the OpAMP client", zap.Error(err))
return err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
server:
endpoint: ws://{{.url}}/v1/opamp

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_own_logs: true
reports_own_traces: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_opamp_connection_settings: true
reports_heartbeat: true

storage:
directory: "{{.storage_dir}}"

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
Loading