Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
- Fix install command for Fleet Server bootstrap, remove need for --enrollment-token when using --fleet-server {pull}24981[24981]
- Respect host configuration for exposed processes endpoint {pull}25114[25114]
- Set --inscure in container when FLEET_SERVER_ENABLE and FLEET_INSECURE set {pull}25137[25137]

- Fixed: limit for retries to Kibana configurable {issue}25063[25063]
==== New features

- Prepare packaging for endpoint and asc files {pull}20186[20186]
Expand Down
98 changes: 73 additions & 25 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
Expand All @@ -40,9 +41,11 @@ import (
)

const (
requestRetrySleep = 1 * time.Second // sleep 1 sec between retries for HTTP requests
maxRequestRetries = 30 // maximum number of retries for HTTP requests
defaultStateDirectory = "/usr/share/elastic-agent/state" // directory that will hold the state data
requestRetrySleepEnv = "KIBANA_REQUEST_RETRY_SLEEP"
maxRequestRetriesEnv = "KIBANA_REQUEST_RETRY_COUNT"
defaultRequestRetrySleep = "1s" // sleep 1 sec between retries for HTTP requests
defaultMaxRequestRetries = "30" // maximum number of retries for HTTP requests
defaultStateDirectory = "/usr/share/elastic-agent/state" // directory that will hold the state data
)

var (
Expand Down Expand Up @@ -70,6 +73,8 @@ The following actions are possible and grouped based on the actions.
KIBANA_FLEET_USERNAME - kibana username to enable Fleet [$KIBANA_USERNAME]
KIBANA_FLEET_PASSWORD - kibana password to enable Fleet [$KIBANA_PASSWORD]
KIBANA_FLEET_CA - path to certificate authority to use with communicate with Kibana [$KIBANA_CA]
KIBANA_REQUEST_RETRY_SLEEP - specifies sleep duration taken when agent performs a request to kibana [default 1s]
KIBANA_REQUEST_RETRY_COUNT - specifies number of retries agent performs when executing a request to kibana [default 30]
Comment thread
simitt marked this conversation as resolved.

* Bootstrapping Fleet Server
This bootstraps the Fleet Server to be run by this Elastic Agent. At least one Fleet Server is required in a Fleet
Expand Down Expand Up @@ -167,7 +172,11 @@ func containerCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
// if not in cloud mode, always run the agent
runAgent := !elasticCloud
// create access configuration from ENV and config files
cfg := defaultAccessConfig()
cfg, err := defaultAccessConfig()
if err != nil {
return err
}

for _, f := range []string{"fleet-setup.yml", "credentials.yml"} {
c, err := config.LoadFile(filepath.Join(paths.Config(), f))
if err != nil && !os.IsNotExist(err) {
Expand Down Expand Up @@ -230,7 +239,6 @@ func containerCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
}
}

var err error
if runAgent {
// run the main elastic-agent container command
err = runContainerCmd(streams, cmd, cfg)
Expand Down Expand Up @@ -260,7 +268,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
return err
}
logInfo(streams, "Performing setup of Fleet in Kibana\n")
err = kibanaSetup(client, streams)
err = kibanaSetup(cfg, client, streams)
if err != nil {
return err
}
Expand All @@ -275,11 +283,11 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
return err
}
}
policy, err = kibanaFetchPolicy(client, cfg, streams)
policy, err = kibanaFetchPolicy(cfg, client, streams)
if err != nil {
return err
}
token, err = kibanaFetchToken(client, policy, streams, cfg.Fleet.TokenName)
token, err = kibanaFetchToken(cfg, client, policy, streams, cfg.Fleet.TokenName)
if err != nil {
return err
}
Expand Down Expand Up @@ -391,30 +399,30 @@ func buildFleetServerConnStr(cfg fleetServerConfig) (string, error) {
return fmt.Sprintf("%s://%s:%s@%s%s", u.Scheme, cfg.Elasticsearch.Username, cfg.Elasticsearch.Password, u.Host, path), nil
}

func kibanaSetup(client *kibana.Client, streams *cli.IOStreams) error {
err := performPOST(client, "/api/fleet/setup", streams.Err, "Kibana Fleet setup")
func kibanaSetup(cfg setupConfig, client *kibana.Client, streams *cli.IOStreams) error {
err := performPOST(cfg, client, "/api/fleet/setup", streams.Err, "Kibana Fleet setup")
if err != nil {
return err
}
err = performPOST(client, "/api/fleet/agents/setup", streams.Err, "Kibana Fleet Agents setup")
err = performPOST(cfg, client, "/api/fleet/agents/setup", streams.Err, "Kibana Fleet Agents setup")
if err != nil {
return err
}
return nil
}

func kibanaFetchPolicy(client *kibana.Client, cfg setupConfig, streams *cli.IOStreams) (*kibanaPolicy, error) {
func kibanaFetchPolicy(cfg setupConfig, client *kibana.Client, streams *cli.IOStreams) (*kibanaPolicy, error) {
var policies kibanaPolicies
err := performGET(client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy")
err := performGET(cfg, client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy")
if err != nil {
return nil, err
}
return findPolicy(cfg, policies.Items)
}

func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams, tokenName string) (string, error) {
func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams, tokenName string) (string, error) {
var keys kibanaAPIKeys
err := performGET(client, "/api/fleet/enrollment-api-keys", &keys, streams.Err, "Kibana fetch token")
err := performGET(cfg, client, "/api/fleet/enrollment-api-keys", &keys, streams.Err, "Kibana fetch token")
if err != nil {
return "", err
}
Expand All @@ -423,7 +431,7 @@ func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.
return "", err
}
var keyDetail kibanaAPIKeyDetail
err = performGET(client, fmt.Sprintf("/api/fleet/enrollment-api-keys/%s", key.ID), &keyDetail, streams.Err, "Kibana fetch token detail")
err = performGET(cfg, client, fmt.Sprintf("/api/fleet/enrollment-api-keys/%s", key.ID), &keyDetail, streams.Err, "Kibana fetch token detail")
if err != nil {
return "", err
}
Expand Down Expand Up @@ -513,15 +521,15 @@ func isTrue(val string) bool {
return false
}

func performGET(client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error {
func performGET(cfg setupConfig, client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error {
var lastErr error
for i := 0; i < maxRequestRetries; i++ {
for i := 0; i < cfg.Kibana.RetryMaxCount; i++ {
code, result, err := client.Connection.Request("GET", path, nil, nil, nil)
if err != nil || code != 200 {
err = fmt.Errorf("http GET request to %s%s fails: %v. Response: %s",
client.Connection.URL, path, err, truncateString(result))
fmt.Fprintf(writer, "%s failed: %s\n", msg, err)
<-time.After(requestRetrySleep)
<-time.After(cfg.Kibana.RetrySleepDuration)
continue
}
if response == nil {
Expand All @@ -532,16 +540,16 @@ func performGET(client *kibana.Client, path string, response interface{}, writer
return lastErr
}

func performPOST(client *kibana.Client, path string, writer io.Writer, msg string) error {
func performPOST(cfg setupConfig, client *kibana.Client, path string, writer io.Writer, msg string) error {
var lastErr error
for i := 0; i < maxRequestRetries; i++ {
for i := 0; i < cfg.Kibana.RetryMaxCount; i++ {
code, result, err := client.Connection.Request("POST", path, nil, nil, nil)
if err != nil || code >= 400 {
err = fmt.Errorf("http POST request to %s%s fails: %v. Response: %s",
client.Connection.URL, path, err, truncateString(result))
lastErr = err
fmt.Fprintf(writer, "%s failed: %s\n", msg, err)
<-time.After(requestRetrySleep)
<-time.After(cfg.Kibana.RetrySleepDuration)
continue
}
return nil
Expand Down Expand Up @@ -778,7 +786,9 @@ type fleetServerConfig struct {
}

type kibanaConfig struct {
Fleet kibanaFleetConfig `config:"fleet"`
Fleet kibanaFleetConfig `config:"fleet"`
RetrySleepDuration time.Duration `config:"retry_sleep_duration"`
RetryMaxCount int `config:"retry_max_count"`
}

type kibanaFleetConfig struct {
Expand All @@ -789,7 +799,17 @@ type kibanaFleetConfig struct {
Username string `config:"username"`
}

func defaultAccessConfig() setupConfig {
func defaultAccessConfig() (setupConfig, error) {
retrySleepDuration, err := envDurationWithDefault(defaultRequestRetrySleep, requestRetrySleepEnv)
if err != nil {
return setupConfig{}, err
}

retryMaxCount, err := envIntWithDefault(defaultMaxRequestRetries, maxRequestRetriesEnv)
if err != nil {
return setupConfig{}, err
}

cfg := setupConfig{
Fleet: fleetConfig{
CA: envWithDefault("", "FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
Expand Down Expand Up @@ -829,7 +849,35 @@ func defaultAccessConfig() setupConfig {
Password: envWithDefault("changeme", "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD"),
CA: envWithDefault("", "KIBANA_FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
},
RetrySleepDuration: retrySleepDuration,
RetryMaxCount: retryMaxCount,
},
}
return cfg
return cfg, nil
}

func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
valStr := defVal
for _, key := range keys {
val, ok := os.LookupEnv(key)
if ok {
valStr = val
break
}
}

return time.ParseDuration(valStr)
}

func envIntWithDefault(defVal string, keys ...string) (int, error) {
valStr := defVal
for _, key := range keys {
val, ok := os.LookupEnv(key)
if ok {
valStr = val
break
}
}

return strconv.Atoi(valStr)
}