diff --git a/internal/examples/agent/README.md b/internal/examples/agent/README.md new file mode 100644 index 00000000..ebf7bef9 --- /dev/null +++ b/internal/examples/agent/README.md @@ -0,0 +1,42 @@ +# agent + +Agent provides provides and example agent implementation for the OpAMP protocol. + +Both HTTP and Websocket connections are supported by the agent, however the [examples/server](../examples/server) only supports Websocket. + +The example agent can be in a normal mode; where the binary starts a single agent, or in scale mode (when `-run-scale` is passed or `AGENT_RUN_SCALE=true` is set). + +When in scale mode the process will start multiple agents (up to `-agent-scale-count/AGENT_SCALE_COUNT`) in the same process. +All agents will use the same scheme when connection to the OpAMP server (HTTP/Websocket). + +In scale mode, the agent orchestartor will log to stdout, and the agents to stderr. + +## Usage + +``` +Usage of agent: + -endpoint string + OpAMP server endpoint URL (env var: AGENT_ENDPOINT). (default "wss://127.0.0.1:4320/v1/opamp") + -heartbeat duration + Heartbeat duration (env var: AGENT_HEARTBEAT). (default 30s) + -quite-agent + Disable agent logger (env var: AGENT_QUITE). + -run-scale + Run in scale-test mode (env var: AGENT_RUN_SCALE). + -scale-count uint + The number of agents to start in scale mode (env var: AGENT_SCALE_COUNT). (default 1000) + -t string + Agent Type String (env var: AGENT_TYPE). (default "io.opentelemetry.collector") + -tls-ca_file string + Path to the CA cert. It verifies the server certificate (env var: AGENT_TLS_CA_FILE). + -tls-cert_file string + Path to the TLS cert (env var: AGENT_TLS_CERT_FILE). + -tls-insecure + Disable the client transport security (env var: AGENT_TLS_INSECURE). + -tls-insecure_skip_verify + Will enable TLS but not verify the certificate (env var: AGENT_TLS_INSECURE_SKIP_VERIFY). + -tls-key_file string + Path to the TLS key (env var: AGENT_TLS_KEY_FILE). + -v string + Agent Version String (env var: AGENT_VERSION). (default "1.0.0") +``` diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index 8f400646..c830b36d 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -11,6 +11,7 @@ import ( "encoding/pem" "errors" "fmt" + "log" "net/http" "os" "runtime" @@ -28,7 +29,7 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) -const localConfig = ` +var localConfig = []byte(` exporters: otlp: endpoint: localhost:1111 @@ -45,26 +46,30 @@ service: receivers: [otlp] processors: [] exporters: [otlp] -` +`) + +// Agent identification constants +const ( + agentType = "io.opentelemetry.collector" + agentVersion = "1.0.0" +) const customCapability_Health = "io.opentelemetry.custom.health" type Agent struct { + client client.OpAMPClient logger types.Logger agentType string agentVersion string + instanceId uuid.UUID agentConfig *config.AgentConfig - effectiveConfig string - - instanceId uuid.UUID + effectiveConfig []byte agentDescription *protobufs.AgentDescription - opampClient client.OpAMPClient - remoteConfigStatus *protobufs.RemoteConfigStatus metricReporter *MetricReporter @@ -92,31 +97,58 @@ func (p *proxySettings) Clone() *proxySettings { } } -func NewAgent(logger types.Logger, agentType, agentVersion string, agentConfig *config.AgentConfig) *Agent { +type Option func(agent *Agent) + +// WithLogger is used to set an Agent's logger +func WithLogger(l types.Logger) Option { + return func(agent *Agent) { + agent.logger = l + } +} + +// WithAgentType is used to set an Agent's type +func WithAgentType(s string) Option { + return func(agent *Agent) { + agent.agentType = s + } +} + +// WithAgentVersion is used to set an Agent's version +func WithAgentVersion(s string) Option { + return func(agent *Agent) { + agent.agentVersion = s + } +} + +// WithInstanceID is used to set an Agent's id +func WithInstanceID(id uuid.UUID) Option { + return func(agent *Agent) { + agent.instanceId = id + } +} + +// WithNoClientCertRequest will ensure the agent does not request a client cert when initially connecting. +func WithNoClientCertRequest() Option { + return func(agent *Agent) { + agent.certRequested = true + } +} + +func NewAgent(agentConfig *config.AgentConfig, options ...Option) *Agent { agent := &Agent{ - effectiveConfig: localConfig, - logger: logger, + logger: &Logger{Logger: log.Default()}, agentType: agentType, agentVersion: agentVersion, agentConfig: agentConfig, + effectiveConfig: localConfig, } - agent.createAgentIdentity() - agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.", - agent.instanceId, agentType, agentVersion) - - agent.loadLocalConfig() - - tls, err := agentConfig.GetTLSConfig(context.Background()) - if err != nil { - agent.logger.Errorf(context.Background(), "Cannot get the TLS config: %v", err) - return nil + for _, option := range options { + option(agent) } - if err = agent.connect(withTLSConfig(tls)); err != nil { - agent.logger.Errorf(context.Background(), "Cannot connect OpAMP client: %v", err) - return nil - } + agent.createAgentIdentity() + agent.loadLocalConfig() return agent } @@ -142,11 +174,18 @@ func withProxy(proxy *proxySettings) settingsOp { } func (agent *Agent) connect(ops ...settingsOp) error { - agent.opampClient = client.NewWebSocket(agent.logger) + if strings.HasPrefix(agent.agentConfig.Endpoint, "http") { + agent.client = client.NewHTTP(agent.logger) + } else if strings.HasPrefix(agent.agentConfig.Endpoint, "ws") { + agent.client = client.NewWebSocket(agent.logger) + } else { + return fmt.Errorf("server endpoint has unknown scheme: %s", agent.agentConfig.Endpoint) + } settings := types.StartSettings{ - OpAMPServerURL: agent.agentConfig.Endpoint, - InstanceUid: types.InstanceUid(agent.instanceId), + OpAMPServerURL: agent.agentConfig.Endpoint, + HeartbeatInterval: agent.agentConfig.HeartbeatInterval, + InstanceUid: types.InstanceUid(agent.instanceId), Callbacks: types.Callbacks{ OnConnect: func(ctx context.Context) { agent.logger.Debugf(ctx, "Connected to the server.") @@ -178,7 +217,7 @@ func (agent *Agent) connect(ops ...settingsOp) error { headers: settings.ProxyHeaders, } - err := agent.opampClient.SetAgentDescription(agent.agentDescription) + err := agent.client.SetAgentDescription(agent.agentDescription) if err != nil { return err } @@ -189,7 +228,7 @@ func (agent *Agent) connect(ops ...settingsOp) error { protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics | protobufs.AgentCapabilities_AgentCapabilities_AcceptsOpAMPConnectionSettings | protobufs.AgentCapabilities_AgentCapabilities_ReportsConnectionSettingsStatus - err = agent.opampClient.SetCapabilities(&supportedCapabilities) + err = agent.client.SetCapabilities(&supportedCapabilities) if err != nil { return err } @@ -199,7 +238,7 @@ func (agent *Agent) connect(ops ...settingsOp) error { customCapability_Health, }, } - err = agent.opampClient.SetCustomCapabilities(customCapabilities) + err = agent.client.SetCustomCapabilities(customCapabilities) if err != nil { return err } @@ -214,7 +253,7 @@ func (agent *Agent) connect(ops ...settingsOp) error { agent.logger.Debugf(context.Background(), "Starting OpAMP client...") - err = agent.opampClient.Start(context.Background(), settings) + err = agent.client.Start(context.Background(), settings) if err != nil { return err } @@ -226,16 +265,19 @@ func (agent *Agent) connect(ops ...settingsOp) error { func (agent *Agent) disconnect(ctx context.Context) { agent.logger.Debugf(ctx, "Disconnecting from server...") - agent.opampClient.Stop(ctx) + agent.client.Stop(ctx) } +// createAgentIdentity sets the instanceId if it is not already set and populates agentDescription. func (agent *Agent) createAgentIdentity() { // Generate instance id. - uid, err := uuid.NewV7() - if err != nil { - panic(err) + if agent.instanceId == uuid.Nil { + uid, err := uuid.NewV7() + if err != nil { + panic(err) + } + agent.instanceId = uid } - agent.instanceId = uid hostname, _ := os.Hostname() @@ -288,23 +330,24 @@ func (agent *Agent) updateAgentIdentity(ctx context.Context, instanceId uuid.UUI } } +// loadLocalConfig sets effectiveConfig func (agent *Agent) loadLocalConfig() { k := koanf.New(".") - _ = k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser()) + _ = k.Load(rawbytes.Provider(localConfig), yaml.Parser()) effectiveConfigBytes, err := k.Marshal(yaml.Parser()) if err != nil { panic(err) } - agent.effectiveConfig = string(effectiveConfigBytes) + agent.effectiveConfig = effectiveConfigBytes } func (agent *Agent) composeEffectiveConfig() *protobufs.EffectiveConfig { return &protobufs.EffectiveConfig{ ConfigMap: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ - "": {Body: []byte(agent.effectiveConfig)}, + "": {Body: agent.effectiveConfig}, }, }, } @@ -358,7 +401,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf // Begin with local config. We will later merge received configs on top of it. k := koanf.New(".") - if err := k.Load(rawbytes.Provider([]byte(localConfig)), yaml.Parser()); err != nil { + if err := k.Load(rawbytes.Provider(localConfig), yaml.Parser()); err != nil { return false, err } @@ -405,21 +448,30 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf panic(err) } - newEffectiveConfig := string(effectiveConfigBytes) configChanged = false - if agent.effectiveConfig != newEffectiveConfig { + if !bytes.Equal(agent.effectiveConfig, effectiveConfigBytes) { agent.logger.Debugf(context.Background(), "Effective config changed. Need to report to server.") - agent.effectiveConfig = newEffectiveConfig + agent.effectiveConfig = effectiveConfigBytes configChanged = true } return configChanged, nil } +func (agent *Agent) Start() error { + tls, err := agent.agentConfig.GetTLSConfig(context.Background()) + if err != nil { + return err + } + agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.", + agent.instanceId, agent.agentType, agent.agentVersion) + return agent.connect(withTLSConfig(tls)) +} + func (agent *Agent) Shutdown() { agent.logger.Debugf(context.Background(), "Agent shutting down...") - if agent.opampClient != nil { - _ = agent.opampClient.Stop(context.Background()) + if agent.client != nil { + _ = agent.client.Stop(context.Background()) } } @@ -483,7 +535,7 @@ func (agent *Agent) requestClientCertificate() { // Send the request to the Server (immediately if already connected // or upon next successful connection). - err = agent.opampClient.RequestConnectionSettings( + err = agent.client.RequestConnectionSettings( &protobufs.ConnectionSettingsRequest{ Opamp: &protobufs.OpAMPConnectionSettingsRequest{ CertificateRequest: &protobufs.CertificateRequest{ @@ -506,7 +558,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { var err error configChanged, err = agent.applyRemoteConfig(msg.RemoteConfig) if err != nil { - agent.opampClient.SetRemoteConfigStatus( + agent.client.SetRemoteConfigStatus( &protobufs.RemoteConfigStatus{ LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, @@ -514,7 +566,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { }, ) } else { - agent.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ + agent.client.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, }) @@ -535,7 +587,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { } if configChanged { - err := agent.opampClient.UpdateEffectiveConfig(ctx) + err := agent.client.UpdateEffectiveConfig(ctx) if err != nil { agent.logger.Errorf(ctx, err.Error()) } @@ -583,7 +635,7 @@ func (agent *Agent) processCustomMessage(ctx context.Context, customMessage *pro func (agent *Agent) sendCustomMessage(ctx context.Context, message *protobufs.CustomMessage) error { for { - sendingChan, err := agent.opampClient.SendCustomMessage(message) + sendingChan, err := agent.client.SendCustomMessage(message) switch { case err == nil: diff --git a/internal/examples/agent/agent/logger.go b/internal/examples/agent/agent/logger.go index 2e5c5ef5..44d746a2 100644 --- a/internal/examples/agent/agent/logger.go +++ b/internal/examples/agent/agent/logger.go @@ -3,7 +3,9 @@ package agent import ( "context" "log" + "os" + "github.com/google/uuid" "github.com/open-telemetry/opamp-go/client/types" ) @@ -13,6 +15,13 @@ type Logger struct { Logger *log.Logger } +// NewScaleLogger returns a logger that prints to stderr with passed uid as a part of the prefix. +func NewScaleLogger(uid uuid.UUID) *Logger { + return &Logger{ + Logger: log.New(os.Stderr, "agent-"+uid.String()+": ", log.Ldate|log.Lmicroseconds|log.Lmsgprefix), + } +} + func (l *Logger) Debugf(_ context.Context, format string, v ...interface{}) { l.Logger.Printf(format, v...) } diff --git a/internal/examples/agent/main.go b/internal/examples/agent/main.go index 7c6d9e88..9a221a05 100644 --- a/internal/examples/agent/main.go +++ b/internal/examples/agent/main.go @@ -1,60 +1,269 @@ package main import ( + "context" + "errors" "flag" + "fmt" "log" + "net/url" "os" "os/signal" + "strconv" + "time" + opampinternal "github.com/open-telemetry/opamp-go/internal" "github.com/open-telemetry/opamp-go/internal/examples/agent/agent" "github.com/open-telemetry/opamp-go/internal/examples/config" + + "github.com/google/uuid" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/otel" ) -func main() { - var agentType string - flag.StringVar(&agentType, "t", "io.opentelemetry.collector", "Agent Type String") +// nopErrorHandler is used to turn any otel errors generated by scale agents into a nop. +type nopErrorHandler struct{} + +func (l *nopErrorHandler) Handle(err error) {} + +// flagConfig are all the flags/env vars that can be used to configure the agent process +type flagConfig struct { + // Agent config options + agentType string + agentVersion string + tlsInsecure bool + tlsInsecureSkipVerify bool + tlsCertFile string + tlsKeyFile string + tlsCAFile string + endpoint string + heartbeat time.Duration + quietAgent bool + // scaleCount = 1 runs a normal agent + // scaleCount > 1 runs scale test agents (pre-assigned IDs, no initial cert request) + scaleCount uint64 +} + +func (cfg flagConfig) verifyArgs() error { + if cfg.tlsCertFile != "" { + fi, err := os.Stat(cfg.tlsCertFile) + if err != nil { + return fmt.Errorf("tls-cert_file stat failed: %w", err) + } + if fi.IsDir() { + return fmt.Errorf("tls-cert_file: %s is a directory", cfg.tlsCertFile) + } + } + + if cfg.tlsKeyFile != "" { + fi, err := os.Stat(cfg.tlsKeyFile) + if err != nil { + return fmt.Errorf("tls-key_file stat failed: %w", err) + } + if fi.IsDir() { + return fmt.Errorf("tls-key_file: %s is a directory", cfg.tlsKeyFile) + } + } + + if cfg.tlsCAFile != "" { + fi, err := os.Stat(cfg.tlsCAFile) + if err != nil { + return fmt.Errorf("tls-ca_file stat failed: %w", err) + } + if fi.IsDir() { + return fmt.Errorf("tls-ca_file: %s is a directory", cfg.tlsCAFile) + } + } + + parsedURL, err := url.Parse(cfg.endpoint) + if err != nil { + return fmt.Errorf("endpoint failed to parse: %w", err) + } + switch parsedURL.Scheme { + case "http", "https": + case "ws", "wss": + default: + return fmt.Errorf("endpoint has an unknown scheme: %s", parsedURL.Scheme) + } + + if cfg.heartbeat < 0 { + return fmt.Errorf("heartbeat must be non-negative, got: %s", cfg.heartbeat) + } + + if cfg.scaleCount == 0 { + return errors.New("scale count must not be zero") + } + return nil +} + +// loadEnv will attempt to load config options from environment variables. +// used to specifiy options when running the agent in a container. +func loadEnv(cfg *flagConfig) { + if s, ok := os.LookupEnv("AGENT_TYPE"); ok { + cfg.agentType = s + } + + if s, ok := os.LookupEnv("AGENT_VERSION"); ok { + cfg.agentVersion = s + } + + if s, ok := os.LookupEnv("AGENT_TLS_INSECURE"); ok { + b, err := strconv.ParseBool(s) + if err == nil { + cfg.tlsInsecure = b + } + } + + if s, ok := os.LookupEnv("AGENT_TLS_INSECURE_SKIP_VERIFY"); ok { + b, err := strconv.ParseBool(s) + if err == nil { + cfg.tlsInsecureSkipVerify = b + } + } - var agentVersion string - flag.StringVar(&agentVersion, "v", "1.0.0", "Agent Version String") + if s, ok := os.LookupEnv("AGENT_TLS_CERT_FILE"); ok { + cfg.tlsCertFile = s + } - var tlsInsecure bool - flag.BoolVar(&tlsInsecure, "tls-insecure", false, "Disable the client transport security.") + if s, ok := os.LookupEnv("AGENT_TLS_KEY_FILE"); ok { + cfg.tlsKeyFile = s + } - var tlsInsecureSkipVerify bool - flag.BoolVar(&tlsInsecureSkipVerify, "tls-insecure_skip_verify", false, "Will enable TLS but not verify the certificate.") + if s, ok := os.LookupEnv("AGENT_TLS_CA_FILE"); ok { + cfg.tlsCAFile = s + } - var tlsCertFile string - flag.StringVar(&tlsCertFile, "tls-cert_file", "", "Path to the TLS cert") + if s, ok := os.LookupEnv("AGENT_ENDPOINT"); ok { + cfg.endpoint = s + } - var tlsKeyFile string - flag.StringVar(&tlsKeyFile, "tls-key_file", "", "Path to the TLS key") + if s, ok := os.LookupEnv("AGENT_HEARTBEAT"); ok { + dur, err := time.ParseDuration(s) + if err == nil { + cfg.heartbeat = dur + } + } - var tlsCAFile string - flag.StringVar(&tlsCAFile, "tls-ca_file", "", "Path to the CA cert. It verifies the server certificate") + if s, ok := os.LookupEnv("AGENT_QUIET"); ok { + b, err := strconv.ParseBool(s) + if err == nil { + cfg.quietAgent = b + } + } - var endpoint string - flag.StringVar(&endpoint, "endpoint", "wss://127.0.0.1:4320/v1/opamp", "OpAMP server endpoint URL") + if s, ok := os.LookupEnv("AGENT_SCALE_COUNT"); ok { + count, err := strconv.ParseUint(s, 10, 64) + if err == nil { + cfg.scaleCount = count + } + } +} + +func main() { + var cfg flagConfig + flag.StringVar(&cfg.agentType, "t", "io.opentelemetry.collector", "Agent Type String (env var: AGENT_TYPE).") + flag.StringVar(&cfg.agentVersion, "v", "1.0.0", "Agent Version String (env var: AGENT_VERSION).") + flag.BoolVar(&cfg.tlsInsecure, "tls-insecure", false, "Disable the client transport security (env var: AGENT_TLS_INSECURE).") + flag.BoolVar(&cfg.tlsInsecureSkipVerify, "tls-insecure_skip_verify", false, "Will enable TLS but not verify the certificate (env var: AGENT_TLS_INSECURE_SKIP_VERIFY).") + flag.StringVar(&cfg.tlsCertFile, "tls-cert_file", "", "Path to the TLS cert (env var: AGENT_TLS_CERT_FILE).") + flag.StringVar(&cfg.tlsKeyFile, "tls-key_file", "", "Path to the TLS key (env var: AGENT_TLS_KEY_FILE).") + flag.StringVar(&cfg.tlsCAFile, "tls-ca_file", "", "Path to the CA cert. It verifies the server certificate (env var: AGENT_TLS_CA_FILE).") + flag.StringVar(&cfg.endpoint, "endpoint", "wss://127.0.0.1:4320/v1/opamp", "OpAMP server endpoint URL (env var: AGENT_ENDPOINT).") + flag.DurationVar(&cfg.heartbeat, "heartbeat", time.Second*30, "Heartbeat duration (env var: AGENT_HEARTBEAT).") + flag.BoolVar(&cfg.quietAgent, "quite-agent", false, "Disable agent logger (env var: AGENT_QUIET).") + flag.Uint64Var(&cfg.scaleCount, "scale-count", 1, "The number of agents to start in scale mode (env var: AGENT_SCALE_COUNT).") flag.Parse() + loadEnv(&cfg) + + logger := log.Default() + if err := cfg.verifyArgs(); err != nil { + logger.Fatalf("Arg verification error: %v", err) + } - config := &config.AgentConfig{ - Endpoint: endpoint, + if cfg.scaleCount > 1 { + logger = log.New(os.Stdout, "scale-test: ", log.Ldate|log.Lmicroseconds|log.Lmsgprefix) + } + + if cfg.quietAgent { + // Silence the otel errors agents can generate. + // i.e.: failed to upload metrics: ... + otel.SetErrorHandler(&nopErrorHandler{}) + } + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + agents, err := runScale(ctx, cfg) + if err != nil { + logger.Printf("Error starting agents: %v", err) + } + logger.Printf("%d agents started", len(agents)) + + <-ctx.Done() + for _, a := range agents { + a.Shutdown() + } + logger.Println("All agents stopped") +} + +// runScale starts and returns the configured amount of agents. +// If an error is encountered when starting an agent, it is return along with all started agents. +func runScale(ctx context.Context, cfg flagConfig) ([]*agent.Agent, error) { + nopLogger := &opampinternal.NopLogger{} + agentConfig := &config.AgentConfig{ + Endpoint: cfg.endpoint, + HeartbeatInterval: &cfg.heartbeat, TLSSetting: configtls.ClientConfig{ - Insecure: tlsInsecure, - InsecureSkipVerify: tlsInsecureSkipVerify, + Insecure: cfg.tlsInsecure, + InsecureSkipVerify: cfg.tlsInsecureSkipVerify, Config: configtls.Config{ - KeyFile: tlsKeyFile, - CertFile: tlsCertFile, - CAFile: tlsCAFile, + KeyFile: cfg.tlsKeyFile, + CertFile: cfg.tlsCertFile, + CAFile: cfg.tlsCAFile, }, }, } - agent := agent.NewAgent(&agent.Logger{Logger: log.Default()}, agentType, agentVersion, config) + // Use of slice instead of a concurrent goroutine to reduce memory usage. + agents := make([]*agent.Agent, 0, cfg.scaleCount) + var err error + for range cfg.scaleCount { + select { + case <-ctx.Done(): + return agents, err + default: + } - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - <-interrupt - agent.Shutdown() + opts := []agent.Option{ + agent.WithAgentType(cfg.agentType), + agent.WithAgentVersion(cfg.agentVersion), + } + if cfg.quietAgent { + opts = append(opts, agent.WithLogger(nopLogger)) + } + + // Only pass in id and logger assocaited with id when running more than one agent. + if cfg.scaleCount > 1 { + id, err := uuid.NewV7() + if err != nil { + return nil, err + } + opts = append(opts, + agent.WithNoClientCertRequest(), + agent.WithInstanceID(id), + ) + if !cfg.quietAgent { + opts = append(opts, agent.WithLogger(agent.NewScaleLogger(id))) + } + } + + a := agent.NewAgent(agentConfig, opts...) + if startErr := a.Start(); err != nil { + err = errors.Join(err, startErr) + continue + } + agents = append(agents, a) + } + return agents, err } diff --git a/internal/examples/config/agent.go b/internal/examples/config/agent.go index 3e3cf9d4..5f15766e 100644 --- a/internal/examples/config/agent.go +++ b/internal/examples/config/agent.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "net/url" + "time" "github.com/open-telemetry/opamp-go/internal/examples/certs" "go.opentelemetry.io/collector/config/configopaque" @@ -12,8 +13,9 @@ import ( ) type AgentConfig struct { - Endpoint string `mapstructure:"endpoint"` - TLSSetting configtls.ClientConfig `mapstructure:"tls,omitempty"` + Endpoint string `mapstructure:"endpoint"` + HeartbeatInterval *time.Duration `mapstructure:"heartbeat_interval"` + TLSSetting configtls.ClientConfig `mapstructure:"tls,omitempty"` } func (a *AgentConfig) GetTLSConfig(ctx context.Context) (*tls.Config, error) { diff --git a/internal/examples/docker-compose.yml b/internal/examples/docker-compose.yml index 5018047d..eed7724a 100644 --- a/internal/examples/docker-compose.yml +++ b/internal/examples/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.8' - services: opamp-server: build: @@ -17,7 +15,7 @@ services: build: context: ../.. dockerfile: internal/examples/Dockerfile.agent - entrypoint: + entrypoint: - "./agent" - "--endpoint" - "wss://opamp-server:4320/v1/opamp" diff --git a/internal/examples/makefile b/internal/examples/makefile index 78fe17fb..20815b1d 100644 --- a/internal/examples/makefile +++ b/internal/examples/makefile @@ -57,7 +57,7 @@ run-docker: build-docker-server build-docker-agent .PHONY: docker-compose-up docker-compose-up: - cd $(SRC_ROOT)/internal/examples && docker compose up --build + cd $(SRC_ROOT)/internal/examples && docker compose up --build -d .PHONY: docker-compose-down docker-compose-down: