Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e090918
Add dumb scale test agent
michel-laterman Dec 31, 2025
229943d
Change scale test driver to use example agents
michel-laterman Jan 6, 2026
d10f4e3
Cleanup
michel-laterman Jan 6, 2026
6591f69
Server performance improvements
michel-laterman Jan 7, 2026
b2e6edd
Fix issues
michel-laterman Jan 7, 2026
11e0b80
fix
michel-laterman Jan 7, 2026
5dea625
Remove extra changes
michel-laterman Jan 7, 2026
f148d9a
Fix comments
michel-laterman Jan 7, 2026
72aaae6
Add connection count intstrumentation to example server
michel-laterman Jan 8, 2026
0225f00
Control scale agent verbosity
michel-laterman Jan 8, 2026
1aef6c8
Use internal NopLogger
michel-laterman Jan 9, 2026
4842f66
Merge remote-tracking branch 'origin/main' into enhancement/scale-test
michel-laterman Jan 9, 2026
853d54e
Cleanup scale test driver, provide dockerfile, adjust makefile target
michel-laterman Jan 13, 2026
35dedbd
Merge branch 'main' into enhancement/scale-test
michel-laterman Jan 13, 2026
3e3d5ac
Allow example agent to be started in scale-test mode
michel-laterman Jan 16, 2026
0fa4c86
Remove unused dockerfile
michel-laterman Jan 16, 2026
0fd21e7
fix fmt
michel-laterman Jan 16, 2026
529cb4f
Merge remote-tracking branch 'origin/main' into enhancement/scale-test
michel-laterman Jan 16, 2026
3d7d253
Review feedback
michel-laterman Jan 19, 2026
1797dae
Fix README typos
michel-laterman Jan 19, 2026
53be1ed
Remove metrics from example server
michel-laterman Jan 21, 2026
182cdd3
fix go.mod
michel-laterman Jan 21, 2026
16f7ad1
Review feedback
michel-laterman Feb 6, 2026
b3a3ded
Fix typo
michel-laterman Feb 6, 2026
8a1ba2c
Merge remote-tracking branch 'origin/main' into enhancement/scale-test
michel-laterman Feb 6, 2026
ded57ab
Fix merge
michel-laterman Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions internal/examples/agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# agent

Agent provides provides and example agent implementation for the OpAMP protocol.

Both HTTP and Websocket connections are supported by the agent, however the [examples/server](../examples/server) only supports Websocket.

The example agent can be in a normal mode; where the binary starts a single agent, or in scale mode (when `-run-scale` is passed or `AGENT_RUN_SCALE=true` is set).

When in scale mode the process will start multiple agents (up to `-agent-scale-count/AGENT_SCALE_COUNT`) in the same process.
All agents will use the same scheme when connection to the OpAMP server (HTTP/Websocket).

In scale mode, the agent orchestartor will log to stdout, and the agents to stderr.

## Usage

```
Usage of agent:
-endpoint string
OpAMP server endpoint URL (env var: AGENT_ENDPOINT). (default "wss://127.0.0.1:4320/v1/opamp")
-heartbeat duration
Heartbeat duration (env var: AGENT_HEARTBEAT). (default 30s)
-quite-agent
Disable agent logger (env var: AGENT_QUITE).
-run-scale
Run in scale-test mode (env var: AGENT_RUN_SCALE).
-scale-count uint
The number of agents to start in scale mode (env var: AGENT_SCALE_COUNT). (default 1000)
-t string
Agent Type String (env var: AGENT_TYPE). (default "io.opentelemetry.collector")
-tls-ca_file string
Path to the CA cert. It verifies the server certificate (env var: AGENT_TLS_CA_FILE).
-tls-cert_file string
Path to the TLS cert (env var: AGENT_TLS_CERT_FILE).
-tls-insecure
Disable the client transport security (env var: AGENT_TLS_INSECURE).
-tls-insecure_skip_verify
Will enable TLS but not verify the certificate (env var: AGENT_TLS_INSECURE_SKIP_VERIFY).
-tls-key_file string
Path to the TLS key (env var: AGENT_TLS_KEY_FILE).
-v string
Agent Version String (env var: AGENT_VERSION). (default "1.0.0")
```
152 changes: 102 additions & 50 deletions internal/examples/agent/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/pem"
"errors"
"fmt"
"log"
"net/http"
"os"
"runtime"
Expand All @@ -28,7 +29,7 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const localConfig = `
var localConfig = []byte(`
exporters:
otlp:
endpoint: localhost:1111
Expand All @@ -45,26 +46,30 @@ service:
receivers: [otlp]
processors: []
exporters: [otlp]
`
`)

// Agent identification constants
const (
agentType = "io.opentelemetry.collector"
agentVersion = "1.0.0"
)

const customCapability_Health = "io.opentelemetry.custom.health"

type Agent struct {
client client.OpAMPClient
logger types.Logger

agentType string
agentVersion string
instanceId uuid.UUID

agentConfig *config.AgentConfig

effectiveConfig string

instanceId uuid.UUID
effectiveConfig []byte

agentDescription *protobufs.AgentDescription

opampClient client.OpAMPClient

remoteConfigStatus *protobufs.RemoteConfigStatus

metricReporter *MetricReporter
Expand Down Expand Up @@ -92,31 +97,58 @@ func (p *proxySettings) Clone() *proxySettings {
}
}

func NewAgent(logger types.Logger, agentType, agentVersion string, agentConfig *config.AgentConfig) *Agent {
type Option func(agent *Agent)

// WithLogger is used to set an Agent's logger
func WithLogger(l types.Logger) Option {
return func(agent *Agent) {
agent.logger = l
}
}

// WithAgentType is used to set an Agent's type
func WithAgentType(s string) Option {
return func(agent *Agent) {
agent.agentType = s
}
}

// WithAgentVersion is used to set an Agent's version
func WithAgentVersion(s string) Option {
return func(agent *Agent) {
agent.agentVersion = s
}
}

// WithInstanceID is used to set an Agent's id
func WithInstanceID(id uuid.UUID) Option {
return func(agent *Agent) {
agent.instanceId = id
}
}

// WithNoClientCertRequest will ensure the agent does not request a client cert when initially connecting.
func WithNoClientCertRequest() Option {
return func(agent *Agent) {
agent.certRequested = true
}
}

func NewAgent(agentConfig *config.AgentConfig, options ...Option) *Agent {
agent := &Agent{
effectiveConfig: localConfig,
logger: logger,
logger: &Logger{Logger: log.Default()},
agentType: agentType,
agentVersion: agentVersion,
agentConfig: agentConfig,
effectiveConfig: localConfig,
}

agent.createAgentIdentity()
agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.",
agent.instanceId, agentType, agentVersion)

agent.loadLocalConfig()

tls, err := agentConfig.GetTLSConfig(context.Background())
if err != nil {
agent.logger.Errorf(context.Background(), "Cannot get the TLS config: %v", err)
return nil
for _, option := range options {
option(agent)
}

if err = agent.connect(withTLSConfig(tls)); err != nil {
agent.logger.Errorf(context.Background(), "Cannot connect OpAMP client: %v", err)
return nil
}
agent.createAgentIdentity()
agent.loadLocalConfig()

return agent
}
Expand All @@ -142,11 +174,18 @@ func withProxy(proxy *proxySettings) settingsOp {
}

func (agent *Agent) connect(ops ...settingsOp) error {
agent.opampClient = client.NewWebSocket(agent.logger)
if strings.HasPrefix(agent.agentConfig.Endpoint, "http") {
agent.client = client.NewHTTP(agent.logger)
} else if strings.HasPrefix(agent.agentConfig.Endpoint, "ws") {
agent.client = client.NewWebSocket(agent.logger)
} else {
return fmt.Errorf("server endpoint has unknown scheme: %s", agent.agentConfig.Endpoint)
}

settings := types.StartSettings{
OpAMPServerURL: agent.agentConfig.Endpoint,
InstanceUid: types.InstanceUid(agent.instanceId),
OpAMPServerURL: agent.agentConfig.Endpoint,
HeartbeatInterval: agent.agentConfig.HeartbeatInterval,
InstanceUid: types.InstanceUid(agent.instanceId),
Callbacks: types.Callbacks{
OnConnect: func(ctx context.Context) {
agent.logger.Debugf(ctx, "Connected to the server.")
Expand Down Expand Up @@ -178,7 +217,7 @@ func (agent *Agent) connect(ops ...settingsOp) error {
headers: settings.ProxyHeaders,
}

err := agent.opampClient.SetAgentDescription(agent.agentDescription)
err := agent.client.SetAgentDescription(agent.agentDescription)
if err != nil {
return err
}
Expand All @@ -189,7 +228,7 @@ func (agent *Agent) connect(ops ...settingsOp) error {
protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics |
protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings |
protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus
err = agent.opampClient.SetCapabilities(&supportedCapabilities)
err = agent.client.SetCapabilities(&supportedCapabilities)
if err != nil {
return err
}
Expand All @@ -199,7 +238,7 @@ func (agent *Agent) connect(ops ...settingsOp) error {
customCapability_Health,
},
}
err = agent.opampClient.SetCustomCapabilities(customCapabilities)
err = agent.client.SetCustomCapabilities(customCapabilities)
if err != nil {
return err
}
Expand All @@ -214,7 +253,7 @@ func (agent *Agent) connect(ops ...settingsOp) error {

agent.logger.Debugf(context.Background(), "Starting OpAMP client...")

err = agent.opampClient.Start(context.Background(), settings)
err = agent.client.Start(context.Background(), settings)
if err != nil {
return err
}
Expand All @@ -226,16 +265,19 @@ func (agent *Agent) connect(ops ...settingsOp) error {

func (agent *Agent) disconnect(ctx context.Context) {
agent.logger.Debugf(ctx, "Disconnecting from server...")
agent.opampClient.Stop(ctx)
agent.client.Stop(ctx)
}

// createAgentIdentity sets the instanceId if it is not already set and populates agentDescription.
func (agent *Agent) createAgentIdentity() {
// Generate instance id.
uid, err := uuid.NewV7()
if err != nil {
panic(err)
if agent.instanceId == uuid.Nil {
uid, err := uuid.NewV7()
if err != nil {
panic(err)
}
agent.instanceId = uid
}
agent.instanceId = uid

hostname, _ := os.Hostname()

Expand Down Expand Up @@ -288,23 +330,24 @@ func (agent *Agent) updateAgentIdentity(ctx context.Context, instanceId uuid.UUI
}
}

// loadLocalConfig sets effectiveConfig
func (agent *Agent) loadLocalConfig() {
k := koanf.New(".")
_ = k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser())
_ = k.Load(rawbytes.Provider(localConfig), yaml.Parser())

effectiveConfigBytes, err := k.Marshal(yaml.Parser())
if err != nil {
panic(err)
}

agent.effectiveConfig = string(effectiveConfigBytes)
agent.effectiveConfig = effectiveConfigBytes
}

func (agent *Agent) composeEffectiveConfig() *protobufs.EffectiveConfig {
return &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: []byte(agent.effectiveConfig)},
"": {Body: agent.effectiveConfig},
},
},
}
Expand Down Expand Up @@ -358,7 +401,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf

// Begin with local config. We will later merge received configs on top of it.
k := koanf.New(".")
if err := k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser()); err != nil {
if err := k.Load(rawbytes.Provider(localConfig), yaml.Parser()); err != nil {
return false, err
}

Expand Down Expand Up @@ -405,21 +448,30 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf
panic(err)
}

newEffectiveConfig := string(effectiveConfigBytes)
configChanged = false
if agent.effectiveConfig != newEffectiveConfig {
if !bytes.Equal(agent.effectiveConfig, effectiveConfigBytes) {
agent.logger.Debugf(context.Background(), "Effective config changed. Need to report to server.")
agent.effectiveConfig = newEffectiveConfig
agent.effectiveConfig = effectiveConfigBytes
configChanged = true
}

return configChanged, nil
}

func (agent *Agent) Start() error {
tls, err := agent.agentConfig.GetTLSConfig(context.Background())
if err != nil {
return err
}
agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.",
agent.instanceId, agent.agentType, agent.agentVersion)
return agent.connect(withTLSConfig(tls))
}

func (agent *Agent) Shutdown() {
agent.logger.Debugf(context.Background(), "Agent shutting down...")
if agent.opampClient != nil {
_ = agent.opampClient.Stop(context.Background())
if agent.client != nil {
_ = agent.client.Stop(context.Background())
}
}

Expand Down Expand Up @@ -483,7 +535,7 @@ func (agent *Agent) requestClientCertificate() {

// Send the request to the Server (immediately if already connected
// or upon next successful connection).
err = agent.opampClient.RequestConnectionSettings(
err = agent.client.RequestConnectionSettings(
&protobufs.ConnectionSettingsRequest{
Opamp: &protobufs.OpAMPConnectionSettingsRequest{
CertificateRequest: &protobufs.CertificateRequest{
Expand All @@ -506,15 +558,15 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
var err error
configChanged, err = agent.applyRemoteConfig(msg.RemoteConfig)
if err != nil {
agent.opampClient.SetRemoteConfigStatus(
agent.client.SetRemoteConfigStatus(
&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
ErrorMessage: err.Error(),
},
)
} else {
agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
agent.client.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{
LastRemoteConfigHash: msg.RemoteConfig.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
})
Expand All @@ -535,7 +587,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
}

if configChanged {
err := agent.opampClient.UpdateEffectiveConfig(ctx)
err := agent.client.UpdateEffectiveConfig(ctx)
if err != nil {
agent.logger.Errorf(ctx, err.Error())
}
Expand Down Expand Up @@ -583,7 +635,7 @@ func (agent *Agent) processCustomMessage(ctx context.Context, customMessage *pro

func (agent *Agent) sendCustomMessage(ctx context.Context, message *protobufs.CustomMessage) error {
for {
sendingChan, err := agent.opampClient.SendCustomMessage(message)
sendingChan, err := agent.client.SendCustomMessage(message)

switch {
case err == nil:
Expand Down
9 changes: 9 additions & 0 deletions internal/examples/agent/agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package agent
import (
"context"
"log"
"os"

"github.com/google/uuid"
"github.com/open-telemetry/opamp-go/client/types"
)

Expand All @@ -13,6 +15,13 @@ type Logger struct {
Logger *log.Logger
}

// NewScaleLogger returns a logger that prints to stderr with passed uid as a part of the prefix.
func NewScaleLogger(uid uuid.UUID) *Logger {
return &Logger{
Logger: log.New(os.Stderr, "agent-"+uid.String()+": ", log.Ldate|log.Lmicroseconds|log.Lmsgprefix),
}
}

func (l *Logger) Debugf(_ context.Context, format string, v ...interface{}) {
l.Logger.Printf(format, v...)
}
Expand Down
Loading
Loading