diff --git a/agent/kibana/config.go b/agent/kibana/config.go index adb8aa0e32e3..ee2fba539761 100644 --- a/agent/kibana/config.go +++ b/agent/kibana/config.go @@ -26,14 +26,14 @@ import ( // Config is the configuration for the Kibana client. type Config struct { - Protocol Protocol `config:"protocol"` - SpaceID string `config:"space.id"` - Username string `config:"username"` - Password string `config:"password"` - Path string `config:"path"` - Host string `config:"host"` - Timeout time.Duration `config:"timeout"` - TLS *tlscommon.Config `config:"ssl"` + Protocol Protocol `config:"protocol" yaml:"protocol"` + SpaceID string `config:"space.id" yaml:"space.id,omitempty"` + Username string `config:"username" yaml:"username,omitempty"` + Password string `config:"password" yaml:"password,omitempty"` + Path string `config:"path" yaml:"path,omitempty"` + Host string `config:"host" yaml:"host,omitempty"` + Timeout time.Duration `config:"timeout" yaml:"timeout,omitempty"` + TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty"` } // Protocol define the protocol to use to make the connection. (Either HTTPS or HTTP) diff --git a/x-pack/agent/_meta/agent.fleet.yml b/x-pack/agent/_meta/agent.fleet.yml new file mode 100644 index 000000000000..826d29624684 --- /dev/null +++ b/x-pack/agent/_meta/agent.fleet.yml @@ -0,0 +1,52 @@ +#================================ General ===================================== +# Beats is configured under Fleet, you can define most settings +# from the Kibana UI. You can update this file to configure the settings that +# are not supported by Fleet. +management: + # + mode: "fleet" + +# reattach collection path is a way of tracking started processes by agent. +reattach_collection_path: "/home/elastic/reattach" + +download: + # operating system [linux, windows, darwin] + operating_system: "linux" + # target architecture [32, 64] + arch: "32" + # source of the artifacts, requires elastic like structure and naming of the binaries + # e.g /windows-x86.zip + sourceURI: "https://artifacts.elastic.co/downloads/beats/" + # path to the directory containing downloaded packages + target_directory: "/home/elastic/downloads" + # timeout for downloading package + timeout: 30s + # file path to a public key used for verifying downloaded artifacts + # if not file is present agent will try to load public key from elastic.co website. + pgpfile: "/home/elastic/elastic.pgp"# install describes the location of installed packages/programs. It is also used + # for reading program specifications. + install_path: "/" + +process: + # minimal port number for spawned processes + min_port: 10000 + # maximum port number for spawned processes + max_port: 30000 + # timeout for creating new processes. when process is not successfully created by this timeout + # start operation is considered a failure + spawn_timeout: 30s + +retry: + # enabled determines whether retry is possible. Default is false. + enabled: true + # retries_count specifies number of retries. Default is 3. + # Retry count of 1 means it will be retried one time after one failure. + retries_count: 3 + # delay specifies delay in ms between retries. Default is 30s + delay: 30s + # max_delay specifies maximum delay in ms between retries. Default is 300s + max_delay: 5m + # Exponential determines whether delay is treated as exponential. + # With 30s delay and 3 retries: 30, 60, 120s + # Default is false + exponential: false diff --git a/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go b/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go new file mode 100644 index 000000000000..14a72e760915 --- /dev/null +++ b/x-pack/agent/dev-tools/cmd/buildfleetcfg/buildfleetcfg.go @@ -0,0 +1,111 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "bytes" + "flag" + "fmt" + "go/format" + "io/ioutil" + "os" + "text/template" + + "github.com/elastic/beats/licenses" + "github.com/elastic/beats/x-pack/agent/pkg/packer" +) + +var ( + input string + output string + license string +) + +func init() { + flag.StringVar(&input, "in", "", "config to embed") + flag.StringVar(&output, "out", "-", "Output path. \"-\" means writing to stdout") + flag.StringVar(&license, "license", "Elastic", "License header for generated file.") +} + +var tmpl = template.Must(template.New("cfg").Parse(` +{{ .License }} +// Code generated by dev-tools/cmd/buildfleetcfg/buildfleetcfg.go - DO NOT EDIT. + +package application + +import "github.com/elastic/beats/x-pack/agent/pkg/packer" + +// DefaultAgentFleetConfig is the content of the default configuration when we enroll a beat, the agent.yml +// will be replaced with this variables. +var DefaultAgentFleetConfig []byte + +func init() { + // Packed File + {{ range $i, $f := .Files -}} + // {{ $f }} + {{ end -}} + unpacked := packer.MustUnpack("{{ .Pack }}") + raw, ok := unpacked["_meta/agent.fleet.yml"] + if !ok { + // ensure we have something loaded. + panic("agent.fleet.yml is not included in the binary") + } + DefaultAgentFleetConfig = raw +} +`)) + +func main() { + flag.Parse() + + if len(input) == 0 { + fmt.Fprintln(os.Stderr, "Invalid input source") + os.Exit(1) + } + + l, err := licenses.Find(license) + if err != nil { + fmt.Fprintf(os.Stderr, "problem to retrieve the license, error: %+v", err) + os.Exit(1) + } + + data, err := gen(input, l) + if err != nil { + fmt.Fprintf(os.Stderr, "Error while generating the file, err: %+v\n", err) + os.Exit(1) + } + + if output == "-" { + os.Stdout.Write(data) + return + } + + ioutil.WriteFile(output, data, 0640) + + return +} + +func gen(path string, l string) ([]byte, error) { + pack, files, err := packer.Pack(input) + if err != nil { + return nil, err + } + + if len(files) > 1 { + return nil, fmt.Errorf("Can only embed a single configuration file") + } + + var buf bytes.Buffer + tmpl.Execute(&buf, struct { + Pack string + Files []string + License string + }{ + Pack: pack, + Files: files, + License: l, + }) + + return format.Source(buf.Bytes()) +} diff --git a/x-pack/agent/magefile.go b/x-pack/agent/magefile.go index aa92950d6d82..b76d488f56bc 100644 --- a/x-pack/agent/magefile.go +++ b/x-pack/agent/magefile.go @@ -315,7 +315,7 @@ func flags() string { // Update is an alias for executing fields, dashboards, config, includes. func Update() { - mg.SerialDeps(Config, BuildSpec) + mg.SerialDeps(Config, BuildSpec, BuildFleetCfg) } // CrossBuild cross-builds the beat for all target platforms. @@ -405,3 +405,13 @@ func UnitTest() { func IntegTest() { os.Create(filepath.Join("build", "TEST-go-integration.out")) } + +// BuildFleetCfg embed the default fleet configuration as part of the binary. +func BuildFleetCfg() error { + goF := filepath.Join("dev-tools", "cmd", "buildfleetcfg", "buildfleetcfg.go") + in := filepath.Join("_meta", "agent.fleet.yml") + out := filepath.Join("pkg", "agent", "application", "configuration_embed.go") + + fmt.Printf(">> BuildFleetCfg %s to %s\n", in, out) + return RunGo("run", goF, "--in", in, "--out", out) +} diff --git a/x-pack/agent/pkg/agent/application/application.go b/x-pack/agent/pkg/agent/application/application.go index 316dd334f904..dd7ca228b438 100644 --- a/x-pack/agent/pkg/agent/application/application.go +++ b/x-pack/agent/pkg/agent/application/application.go @@ -9,10 +9,6 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" - "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" - reporting "github.com/elastic/beats/x-pack/agent/pkg/reporter" - fleetreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/fleet" - logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) // Application is the application interface implemented by the different running mode. @@ -53,89 +49,18 @@ func createApplication( return nil, errors.Wrap(err, "initiating application") } - client, err := getKibanaClient(log, mgmt) - if err != nil { - return nil, errors.Wrap(err, "initiating application") - } - - reporter, err := getReporter(c.Management, log, getAgentID(), client) - if err != nil { - return nil, err - } - - router, err := newRouter(log, streamFactory(config, client, reporter)) - if err != nil { - return nil, errors.Wrap(err, "initiating application") - } - switch mgmt.Mode { case localMode: log.Info("Agent is managed locally") - return newLocal(log, pathConfigFile, c.Management, router) + return newLocal(log, pathConfigFile, c.Management) case fleetMode: log.Info("Agent is managed by Fleet") - return newManaged(log, c.Management, router, client) + return newManaged(log, c.Management) default: return nil, ErrInvalidMgmtMode } } -func getKibanaClient(log *logger.Logger, c *ManagementConfig) (_ sender, err error) { - if c.Mode == localMode { - // fleet not configured client not needed - return nil, nil - } - - defer func() { - if err != nil { - err = errors.Wrap(err, "fail to create the API client") - } - }() - - if c.Fleet == nil { - return nil, errors.New("fleet mode enabled but management.fleet not specified") - } - - kibanaConfig := c.Fleet.Kibana - if kibanaConfig == nil { - return nil, errors.New("fleet mode enabled but management.fleet.kibana not specified") - } - - if c.Fleet.AccessAPIKey != "" { - rawConfig, err := config.NewConfigFrom(kibanaConfig) - if err != nil { - return nil, err - } - - return fleetapi.NewAuthWithConfig(log, rawConfig, c.Fleet.AccessAPIKey) - } - - return fleetapi.NewWithConfig(log, kibanaConfig) -} - -func getReporter(cfg *config.Config, log *logger.Logger, id string, client sender) (reporter, error) { - c := defaultManagementConfig() - if err := cfg.Unpack(&c); err != nil { - return nil, err - } - - backends := make([]reporting.Backend, 0, 2) - backends = append(backends, logreporter.NewReporter(log, c.Reporting.LogReporting)) - - if c.Mode == fleetMode && c.Reporting.FleetManagement != nil && c.Reporting.FleetManagement.Enabled { - agentID := getAgentID() - - fr, err := fleetreporter.NewReporter(agentID, log, c.Reporting.FleetManagement, client) - if err != nil { - return nil, err - } - - backends = append(backends, fr) - } - - return reporting.NewReporter(log, id, backends...), nil -} - func getAgentID() string { // TODO: implement correct way of acquiring agent ID return "default" diff --git a/x-pack/agent/pkg/agent/application/config.go b/x-pack/agent/pkg/agent/application/config.go index c898ddecef39..2741b84e4e42 100644 --- a/x-pack/agent/pkg/agent/application/config.go +++ b/x-pack/agent/pkg/agent/application/config.go @@ -8,11 +8,19 @@ import ( "fmt" "time" + "github.com/pkg/errors" + + "github.com/elastic/beats/agent/kibana" "github.com/elastic/beats/x-pack/agent/pkg/config" fleetreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/fleet" logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) +// TODO(ph) correctly setup global path. +func fleetAgentConfigPath() string { + return "fleet.yml" +} + // Config define the configuration of the Agent. type Config struct { Management *config.Config `config:"management"` @@ -49,29 +57,20 @@ func (m *managementMode) Unpack(v string) error { // ManagementConfig defines the options for the running of the beats. type ManagementConfig struct { - Mode managementMode `config:"mode"` - Fleet *fleetConfig `config:"fleet"` - Reporting *reportingConfig `config:"reporting"` -} - -type reportingConfig struct { - LogReporting *logreporter.Config `config:"log"` - FleetManagement *fleetreporter.ManagementConfig `config:"fleet"` + Mode managementMode `config:"mode"` + Reporting *logreporter.Config `config:"reporting.log"` } func defaultManagementConfig() *ManagementConfig { return &ManagementConfig{ Mode: localMode, - Reporting: &reportingConfig{ - LogReporting: logreporter.DefaultLogConfig(), - FleetManagement: fleetreporter.DefaultFleetManagementConfig(), - }, } } type localConfig struct { - Reload *reloadConfig `config:"reload"` - Path string `config:"path"` + Reload *reloadConfig `config:"reload"` + Path string `config:"path"` + Reporting *logreporter.Config `config:"reporting"` } type reloadConfig struct { @@ -94,5 +93,57 @@ func localConfigDefault() *localConfig { Enabled: true, Period: 10 * time.Second, }, + Reporting: logreporter.DefaultLogConfig(), } } + +// FleetAgentConfig is the internal configuration of the agent after the enrollment is done, +// this configuration is not exposed in anyway in the agent.yml and is only internal configuration. +type FleetAgentConfig struct { + API *APIAccess `config:"api" yaml:"api"` + Reporting *LogReporting `config:"reporting" yaml:"reporting"` +} + +// APIAccess contains the required details to connect to the Kibana endpoint. +type APIAccess struct { + AccessAPIKey string `config:"access_api_key" yaml:"access_api_key"` + Kibana *kibana.Config `config:"kibana" yaml:"kibana"` +} + +// LogReporting define the fleet options for log reporting. +type LogReporting struct { + Log *logreporter.Config `config:"log" yaml:"log"` + Fleet *fleetreporter.ManagementConfig `config:"fleet" yaml:"fleet"` +} + +// Validate validates the required fields for accessing the API. +func (e *APIAccess) Validate() error { + if len(e.AccessAPIKey) == 0 { + return errors.New("empty access token") + } + + if e.Kibana == nil || len(e.Kibana.Host) == 0 { + return errors.New("missing Kibana host configuration") + } + + return nil +} + +func defaultFleetAgentConfig() *FleetAgentConfig { + return &FleetAgentConfig{ + Reporting: &LogReporting{ + Log: logreporter.DefaultLogConfig(), + Fleet: fleetreporter.DefaultFleetManagementConfig(), + }, + } +} + +func createFleetConfigFromEnroll(access *APIAccess) (*FleetAgentConfig, error) { + if err := access.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid enrollment options") + } + + cfg := defaultFleetAgentConfig() + cfg.API = access + return cfg, nil +} diff --git a/x-pack/agent/pkg/agent/application/config_storage.go b/x-pack/agent/pkg/agent/application/config_storage.go deleted file mode 100644 index 59b52666b80b..000000000000 --- a/x-pack/agent/pkg/agent/application/config_storage.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package application - -import "github.com/elastic/beats/agent/kibana" - -type fleetConfig struct { - AccessAPIKey string `config:"access_api_key"` - Kibana *kibana.Config `config:"kibana"` -} - -type store interface { - Save(fleetConfig) error -} - -// NullStore this is only use to split the work into multiples PRs. -// TODO(ph) make real implementation this is just to make test green and iterate. -type NullStore struct{} - -// Save takes the fleetConfig and persist it, will return an errors on failure. -func (m *NullStore) Save(_ fleetConfig) error { - return nil -} diff --git a/x-pack/agent/pkg/agent/application/configuration_embed.go b/x-pack/agent/pkg/agent/application/configuration_embed.go new file mode 100644 index 000000000000..89c78089bb73 --- /dev/null +++ b/x-pack/agent/pkg/agent/application/configuration_embed.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by dev-tools/cmd/buildfleetcfg/buildfleetcfg.go - DO NOT EDIT. + +package application + +import "github.com/elastic/beats/x-pack/agent/pkg/packer" + +// DefaultAgentFleetConfig is the content of the default configuration when we enroll a beat, the agent.yml +// will be replaced with this variables. +var DefaultAgentFleetConfig []byte + +func init() { + // Packed File + // _meta/agent.fleet.yml + unpacked := packer.MustUnpack("eJyMVt1yozoavN/HmAc4y0+YGvbOsoOQY8gaT9DPzRaSHIEtYerYYMPWvvuWhOPMnJo6NReuVEDS93V/3S3+++U/Zn+p/lmpfXv5413v95c/RqO//OsLmrz4399/74eepeamHCm+qt/d8/htTwf0vNaUFBOCekIwH7hhHQvLkeGtkjg6MoJUYW6ake0ZpXqQO3Cg+EkxWJoKRxpBfxDhVomg9CTUPQuzFzQCI0x8QWlxYjuwqfC65uZZlW/HHiWfZ0gCjpQU2q6rSKaY0We2Ax4fwYEHkamw9IV5UxLWGqW5lmnRcSMnu56Sra1T2/fcxB5Kc1+kYBBtodkSNPsdgByWWi6fXjhOeoql5rjs5er0gpaLw7JZKI7jI/t+UsiwM8Olh9TJnjkynHgSJodqafu4nRnOvQrHPUpBLaFStle6AyElR8UDqmS6rmlw6bgRSoRFfe/hKkx8YCSfmF3fHpXrod32y9Zx7lGcn0iQDxzeNA2LjgfRu4CJV61OCjXxiQe+3gTlmRJbPxuEKWuZFjUNVLNU3ZGHsucwrtkdE4LxlZF17eYwgmlPco9hT+Hg1vG2fNosQVjhyO6bNkvL/dr+Lx0XIdDCJF6Fo5aEeSTCQnPHzbwXqZNCy0xJmIwsKD0Ek5EGdSdheZBpObId4NmEzmhFPbI9qfm9+oqWaMoaNPeX5oMk6wPbgYE1wKvgm6Jk7VWY1TQsbE8jI4lfkbUWI9AcJpOE+oDgrauCN8vtKHHuSbLWCCY9W4KeYt/x7uZgdQLXHTfJWOFysrjQCPTGCLUJZcdNMcgwu+xXKti0XScWJ/XoqSx3O4e39mQKptfm2/BTb+Yxh54G3wYG45Cb20BxMW2C2UOb8QPnrBMJv917KjphyoOE8bjfOW17FOveco1gMcggOvMgOVrPCau7IGkZyV7QEniUrFtGincG9Wh1yMN19Nosmk1QDxy/DQzfahEWHR3jTz2kWXPH7lXY1zy082KDaMDnmlkjV4rzP60u7XoJ9YXh2Jerk8om8MGfqfBN/4iJ7sBV4vW5wpmqgjJCaTnZ3pmJR5QyLYw2e+zO/6FeebSakFAbinNPXGc9VZjO/oWs4/DNeUukay2CspdLULP5b1jh2xmlheXP5oPjC6XAp+bW0RH8ychRsXY98N2nbjYmH1AqNW3zTsK33uIRUF5dznz/tcc++YyuLFw0aAQdb3OP2vqw0CLIxwo7fTr9chgfKLHefXIarHA0SZicucWbApsLFsO0CcHIAzlS7E+bBqzlEnRiBDVPbY+Ov7uH3ZxsBh2dPtKPfZ4SIdA00KbCufX4wNvMYXrUTOIrJcXJ6WNEL8vW7s21CLOv91leKhx1HCdn5LJyq3hbXqgpx3vdScAk5Mb1ft+bazcr6Hdcx1cerr3XZnHLVotrtphnyHHyVGHf5zvg3iMY+dxy1ADDQ2T7rmUQ/TUTrb4vlNTvAsbjrLnFNVst7h4qOo7LQZK7rmDucnn2TKTl+NNZPUrlieEnq52BBuUkRsdvz+3dFJYH9yxgPoe36OMsq1mbyxLWnfjBK3euLKejXIJBwHL8mLHL/iDuRaCPjKwtphpBVlf4Zu8qi2nmUMeP816bxZSl2YvNfZmuo9d7ljIc1dTerUtwZKTQwtgsKyeHhRQne/cKU3qiPc6+gPEkAt1w+Naj50Izk/g83ar57kzOIpg1/jh3dbJ3k8+2cz13ltFa+PGBh85Xk4DlocKss3fUpxZig1LXa+e4/UutbHyaZ5TMeOx3g8SR5/atnhXHZc3bTFVkq2Sgz3wJGraz2V7YXD4iGPf2nrf8IJgYCV1N+8xQrM823zeWo0cP+TsNYp+3269old19UlifRyjNrwznHbO4RnC0/t3vQMcbcBEjaBgpQobL/m/x3LOOQZsDR1vj+pFPVtsk+GUtq92Ok/Ly6AXqHkF/QtDWktrp8YPz8UkVsDSUlOfZ+4spW811f6zx2ix8fp9XQeorDyLN26Kz33cMlh4ja+thWz+sYOlVjrtHfftt9Kltq4ufz+jv2iYVKU4O5ydnNTdblY2POU1Ot8uryg+LM1o9j1bDjvtfa89qX+9TMHBT9hLqmq9Oj3df/veP/wcAAP//Snq6Uw==") + raw, ok := unpacked["_meta/agent.fleet.yml"] + if !ok { + // ensure we have something loaded. + panic("agent.fleet.yml is not included in the binary") + } + DefaultAgentFleetConfig = raw +} diff --git a/x-pack/agent/pkg/agent/application/enroll_cmd.go b/x-pack/agent/pkg/agent/application/enroll_cmd.go index 3b522d00e38a..a57de4174a7e 100644 --- a/x-pack/agent/pkg/agent/application/enroll_cmd.go +++ b/x-pack/agent/pkg/agent/application/enroll_cmd.go @@ -5,19 +5,26 @@ package application import ( + "bytes" "io" "net/http" "net/url" "runtime" "github.com/pkg/errors" + "gopkg.in/yaml.v2" "github.com/elastic/beats/agent/kibana" "github.com/elastic/beats/agent/release" + "github.com/elastic/beats/x-pack/agent/pkg/agent/storage" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" ) +type store interface { + Save(io.Reader) error +} + type clienter interface { Send( method string, @@ -48,7 +55,37 @@ func NewEnrollCmd( enrollAPIKey string, id string, userProvidedMetadata map[string]interface{}, - configStore store, + configPath string, +) (*EnrollCmd, error) { + + store := storage.NewReplaceOnSuccessStore( + configPath, + DefaultAgentFleetConfig, + storage.NewEncryptedDiskStore(fleetAgentConfigPath(), []byte("")), + ) + + return NewEnrollCmdWithStore( + log, + url, + CAs, + enrollAPIKey, + id, + userProvidedMetadata, + configPath, + store, + ) +} + +//NewEnrollCmdWithStore creates an new enrollment and accept a custom store. +func NewEnrollCmdWithStore( + log *logger.Logger, + url string, + CAs []string, + enrollAPIKey string, + id string, + userProvidedMetadata map[string]interface{}, + configPath string, + store store, ) (*EnrollCmd, error) { cfg, err := kibana.NewConfigFromURL(url, CAs) @@ -74,7 +111,7 @@ func NewEnrollCmd( id: id, userProvidedMetadata: userProvidedMetadata, kibanaConfig: cfg, - configStore: configStore, + configStore: store, }, nil } @@ -97,11 +134,18 @@ func (c *EnrollCmd) Execute() error { return errors.Wrap(err, "fail to execute request to Kibana") } - if err := c.configStore.Save(fleetConfig{ + fleetConfig, err := createFleetConfigFromEnroll(&APIAccess{ AccessAPIKey: resp.Item.AccessAPIKey, Kibana: c.kibanaConfig, - }); err != nil { - return errors.Wrap(err, "could not save credentials") + }) + + reader, err := yamlToReader(fleetConfig) + if err != nil { + return err + } + + if err := c.configStore.Save(reader); err != nil { + return errors.Wrap(err, "could not save enroll credentials") } return nil @@ -113,3 +157,11 @@ func metadata() map[string]interface{} { "version": release.Version(), } } + +func yamlToReader(in interface{}) (io.Reader, error) { + data, err := yaml.Marshal(in) + if err != nil { + return nil, errors.Wrap(err, "could not marshal to YAML") + } + return bytes.NewReader(data), nil +} diff --git a/x-pack/agent/pkg/agent/application/enroll_cmd_test.go b/x-pack/agent/pkg/agent/application/enroll_cmd_test.go index 37a6f90f9c79..c8c2ff0d07ca 100644 --- a/x-pack/agent/pkg/agent/application/enroll_cmd_test.go +++ b/x-pack/agent/pkg/agent/application/enroll_cmd_test.go @@ -5,7 +5,9 @@ package application import ( + "bytes" "crypto/tls" + "io" "io/ioutil" "net" "net/http" @@ -13,33 +15,88 @@ import ( "strconv" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/require" - "github.com/elastic/beats/agent/kibana" + "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/authority" ) type mockStore struct { - Config fleetConfig - Err error - Called bool + Err error + Called bool + Content []byte } -func (m *mockStore) Save(c fleetConfig) error { +func (m *mockStore) Save(in io.Reader) error { m.Called = true if m.Err != nil { return m.Err } - m.Config = c + buf := new(bytes.Buffer) + io.Copy(buf, in) + m.Content = buf.Bytes() return nil } func TestEnroll(t *testing.T) { log, _ := logger.New() - t.Run("succesfully enroll with TLS and save access token in the store", withTLSServer( + t.Run("fail to save is propagated", withTLSServer( + func(t *testing.T) *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("/api/fleet/agents/enroll", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(` +{ + "action": "created", + "success": true, + "item": { + "id": "a9328860-ec54-11e9-93c4-d72ab8a69391", + "active": true, + "policy_id": "69f3f5a0-ec52-11e9-93c4-d72ab8a69391", + "type": "PERMANENT", + "enrolled_at": "2019-10-11T18:26:37.158Z", + "user_provided_metadata": { + "custom": "customize" + }, + "local_metadata": { + "platform": "linux", + "version": "8.0.0" + }, + "actions": [], + "access_api_key": "my-access-token" + } +}`)) + }) + return mux + }, func(t *testing.T, caBytes []byte, host string) { + caFile, err := bytesToTMPFile(caBytes) + require.NoError(t, err) + defer os.Remove(caFile) + + url := "https://" + host + store := &mockStore{Err: errors.New("fail to save")} + cmd, err := NewEnrollCmdWithStore( + log, + url, + []string{caFile}, + "my-enrollment-token", + "my-id", + map[string]interface{}{"custom": "customize"}, + "", + store, + ) + require.NoError(t, err) + + err = cmd.Execute() + require.Error(t, err) + }, + )) + + t.Run("succesfully enroll with TLS and save access api key in the store", withTLSServer( func(t *testing.T) *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("/api/fleet/agents/enroll", func(w http.ResponseWriter, r *http.Request) { @@ -74,13 +131,14 @@ func TestEnroll(t *testing.T) { url := "https://" + host store := &mockStore{} - cmd, err := NewEnrollCmd( + cmd, err := NewEnrollCmdWithStore( log, url, []string{caFile}, "my-enrollment-api-key", "my-id", map[string]interface{}{"custom": "customize"}, + "", store, ) require.NoError(t, err) @@ -88,16 +146,17 @@ func TestEnroll(t *testing.T) { err = cmd.Execute() require.NoError(t, err) - require.True(t, store.Called) - require.Equal(t, store.Config.AccessAPIKey, "my-access-api-key") - require.Equal(t, store.Config.Kibana.Host, host) - require.Equal(t, store.Config.Kibana.Protocol, kibana.Protocol("https")) - require.Equal(t, store.Config.Kibana.Username, "") - require.Equal(t, store.Config.Kibana.Password, "") + config, err := readConfig(store.Content) + + require.NoError(t, err) + require.Equal(t, "my-access-api-key", config.API.AccessAPIKey) + require.Equal(t, host, config.API.Kibana.Host) + require.Equal(t, "", config.API.Kibana.Username) + require.Equal(t, "", config.API.Kibana.Password) }, )) - t.Run("succesfully enroll without TLS and save access token in the store", withServer( + t.Run("succesfully enroll without TLS and save access api key in the store", withServer( func(t *testing.T) *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("/api/fleet/agents/enroll", func(w http.ResponseWriter, r *http.Request) { @@ -128,13 +187,14 @@ func TestEnroll(t *testing.T) { }, func(t *testing.T, host string) { url := "http://" + host store := &mockStore{} - cmd, err := NewEnrollCmd( + cmd, err := NewEnrollCmdWithStore( log, url, make([]string, 0), "my-enrollment-api-key", "my-id", map[string]interface{}{"custom": "customize"}, + "", store, ) require.NoError(t, err) @@ -143,11 +203,14 @@ func TestEnroll(t *testing.T) { require.NoError(t, err) require.True(t, store.Called) - require.Equal(t, store.Config.AccessAPIKey, "my-access-api-key") - require.Equal(t, store.Config.Kibana.Host, host) - require.Equal(t, store.Config.Kibana.Protocol, kibana.Protocol("http")) - require.Equal(t, store.Config.Kibana.Username, "") - require.Equal(t, store.Config.Kibana.Password, "") + + config, err := readConfig(store.Content) + + require.NoError(t, err) + require.Equal(t, "my-access-api-key", config.API.AccessAPIKey) + require.Equal(t, host, config.API.Kibana.Host) + require.Equal(t, "", config.API.Kibana.Username) + require.Equal(t, "", config.API.Kibana.Password) }, )) @@ -166,13 +229,14 @@ func TestEnroll(t *testing.T) { }, func(t *testing.T, host string) { url := "http://" + host store := &mockStore{} - cmd, err := NewEnrollCmd( + cmd, err := NewEnrollCmdWithStore( log, url, make([]string, 0), "my-enrollment-token", "my-id", map[string]interface{}{"custom": "customize"}, + "", store, ) require.NoError(t, err) @@ -247,3 +311,17 @@ func bytesToTMPFile(b []byte) (string, error) { return f.Name(), nil } + +func readConfig(raw []byte) (*FleetAgentConfig, error) { + r := bytes.NewReader(raw) + config, err := config.NewConfigFrom(r) + if err != nil { + return nil, err + } + + cfg := &FleetAgentConfig{} + if err := config.Unpack(cfg); err != nil { + return nil, err + } + return cfg, nil +} diff --git a/x-pack/agent/pkg/agent/application/local_mode.go b/x-pack/agent/pkg/agent/application/local_mode.go index 33808903fa53..98fe2315a0ff 100644 --- a/x-pack/agent/pkg/agent/application/local_mode.go +++ b/x-pack/agent/pkg/agent/application/local_mode.go @@ -11,6 +11,8 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" "github.com/elastic/beats/x-pack/agent/pkg/dir" + reporting "github.com/elastic/beats/x-pack/agent/pkg/reporter" + logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) type emitterFunc func([]string) error @@ -42,7 +44,6 @@ func newLocal( log *logger.Logger, pathConfigFile string, config *config.Config, - router *router, ) (*Local, error) { var err error if log == nil { @@ -52,11 +53,22 @@ func newLocal( } } + agentID := getAgentID() + c := localConfigDefault() if err := config.Unpack(c); err != nil { return nil, errors.Wrap(err, "initialize local mode") } + logR := logreporter.NewReporter(log, c.Reporting) + + reporter := reporting.NewReporter(log, agentID, logR) + + router, err := newRouter(log, streamFactory(config, nil, reporter)) + if err != nil { + return nil, errors.Wrap(err, "fail to initialize pipeline router") + } + discover := discoverer(pathConfigFile, c.Path) emit := emitter(log, router, injectMonitoring) diff --git a/x-pack/agent/pkg/agent/application/managed_mode.go b/x-pack/agent/pkg/agent/application/managed_mode.go index 378bced59b1b..875735ce2940 100644 --- a/x-pack/agent/pkg/agent/application/managed_mode.go +++ b/x-pack/agent/pkg/agent/application/managed_mode.go @@ -5,39 +5,113 @@ package application import ( + "io" + "net/http" + "net/url" + "github.com/pkg/errors" + "github.com/elastic/beats/x-pack/agent/pkg/agent/storage" "github.com/elastic/beats/x-pack/agent/pkg/config" "github.com/elastic/beats/x-pack/agent/pkg/core/logger" -) + "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" -// Errors generated by the application. -var ( - ErrManagedIsNotImplemented = errors.New("fleet managed mode is not implemented in the agent") + reporting "github.com/elastic/beats/x-pack/agent/pkg/reporter" + fleetreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/fleet" + logreporter "github.com/elastic/beats/x-pack/agent/pkg/reporter/log" ) +type apiClient interface { + Send( + method string, + path string, + params url.Values, + headers http.Header, + body io.Reader, + ) (*http.Response, error) +} + // Managed application, when the application is run in managed mode, most of the configuration are // coming from the Fleet App. type Managed struct { log *logger.Logger - client sender + Config FleetAgentConfig + api apiClient } func newManaged( log *logger.Logger, rawConfig *config.Config, - router *router, - client sender, ) (*Managed, error) { - return nil, ErrManagedIsNotImplemented + + agentID := getAgentID() + + path := fleetAgentConfigPath() + + // TODO(ph): Define the encryption password. + store := storage.NewEncryptedDiskStore(path, []byte("")) + reader, err := store.Load() + if err != nil { + return nil, errors.Wrap(err, "could not initialize config store") + } + + config, err := config.NewConfigFrom(reader) + if err != nil { + return nil, errors.Wrapf(err, "fail to read configuration %s for the agent", path) + } + + cfg := defaultFleetAgentConfig() + if err := config.Unpack(cfg); err != nil { + return nil, errors.Wrapf(err, "fail to unpack configuration from %s", path) + } + + client, err := fleetapi.NewAuthWithConfig(log, cfg.API.AccessAPIKey, cfg.API.Kibana) + if err != nil { + return nil, errors.Wrap(err, "fail to create API client") + } + + reporter, err := createFleetReporters(log, cfg, agentID, client) + if err != nil { + return nil, errors.Wrap(err, "fail to create reporters") + } + + // TODO(michal, ph) Link router with configuration + _, err = newRouter(log, streamFactory(config, client, reporter)) + if err != nil { + return nil, errors.Wrap(err, "fail to initialize pipeline router") + } + + return &Managed{ + log: log, + api: client, + }, nil } // Start starts a managed agent. func (m *Managed) Start() error { - return ErrManagedIsNotImplemented + m.log.Info("Agent is starting") + defer m.log.Info("Agent is stopped") + return nil } // Stop stops a managed agent. func (m *Managed) Stop() error { - return ErrManagedIsNotImplemented + return nil +} + +func createFleetReporters( + log *logger.Logger, + cfg *FleetAgentConfig, + agentID string, + client apiClient, +) (reporter, error) { + + logR := logreporter.NewReporter(log, cfg.Reporting.Log) + + fleetR, err := fleetreporter.NewReporter(agentID, log, cfg.Reporting.Fleet, client) + if err != nil { + return nil, err + } + + return reporting.NewReporter(log, agentID, logR, fleetR), nil } diff --git a/x-pack/agent/pkg/agent/cmd/enroll.go b/x-pack/agent/pkg/agent/cmd/enroll.go index e92e40685077..df7a77b9d5b4 100644 --- a/x-pack/agent/pkg/agent/cmd/enroll.go +++ b/x-pack/agent/pkg/agent/cmd/enroll.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" + c "github.com/elastic/beats/libbeat/common/cli" "github.com/elastic/beats/x-pack/agent/pkg/agent/application" "github.com/elastic/beats/x-pack/agent/pkg/cli" "github.com/elastic/beats/x-pack/agent/pkg/config" @@ -24,25 +25,37 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr Long: "This will enroll the Agent into Fleet.", Args: cobra.ExactArgs(2), Run: func(c *cobra.Command, args []string) { - if err := enroll(c, flags, args); err != nil { + if err := enroll(streams, c, flags, args); err != nil { fmt.Fprintf(streams.Err, "%v\n", err) os.Exit(1) } - fmt.Fprintf(streams.Out, "Successfully enrolled the Agent.\n") }, } - cmd.Flags().StringP("ca", "", "", "Comma separated list of root certificate for server verifications") + cmd.Flags().StringP("certificate-authorities", "a", "", "Comma separated list of root certificate for server verifications") + cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation") return cmd } -func enroll(cmd *cobra.Command, flags *globalFlags, args []string) error { +func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error { config, err := config.LoadYAML(flags.PathConfigFile) if err != nil { return errors.Wrapf(err, "could not read configuration file %s", flags.PathConfigFile) } + force, _ := cmd.Flags().GetBool("force") + if !force { + confirm, err := c.Confirm("This will replace your current settings. Do you want to continue?", true) + if err != nil { + return errors.Wrap(err, "problem reading prompt response") + } + if !confirm { + fmt.Fprintln(streams.Out, "Enrollment was canceled by the user") + return nil + } + } + logger, err := logger.NewFromConfig(config) if err != nil { return err @@ -51,7 +64,7 @@ func enroll(cmd *cobra.Command, flags *globalFlags, args []string) error { url := args[0] enrollmentToken := args[1] - caStr, _ := cmd.Flags().GetString("ca") + caStr, _ := cmd.Flags().GetString("certificate-authorities") CAs := cli.StringToSlice(caStr) c, err := application.NewEnrollCmd( @@ -61,11 +74,17 @@ func enroll(cmd *cobra.Command, flags *globalFlags, args []string) error { enrollmentToken, "", nil, - &application.NullStore{}, // TODO(ph): persist to local file. + flags.PathConfigFile, ) if err != nil { return err } - return c.Execute() + err = c.Execute() + if err != nil { + return errors.Wrap(err, "fail to enroll") + } + + fmt.Fprintln(streams.Out, "Successfully enrolled the Agent.") + return nil } diff --git a/x-pack/agent/pkg/agent/storage/storage.go b/x-pack/agent/pkg/agent/storage/storage.go new file mode 100644 index 000000000000..70d54e831381 --- /dev/null +++ b/x-pack/agent/pkg/agent/storage/storage.go @@ -0,0 +1,224 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package storage + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common/file" + "github.com/elastic/beats/x-pack/agent/pkg/crypto" +) + +const perms = 0600 + +type store interface { + Save(io.Reader) error +} + +type load interface { + Load() (io.ReadCloser, error) +} + +// NullStore this is only use to split the work into multiples PRs. +type NullStore struct{} + +// Save takes the fleetConfig and persist it, will return an errors on failure. +func (m *NullStore) Save(_ io.Reader) error { + return nil +} + +type handlerFunc func(io.Reader) error + +// HandlerStore take a function handler and wrap it into the store interface. +type HandlerStore struct { + fn handlerFunc +} + +// NewHandlerStore takes a function and wrap it into an handlerStore. +func NewHandlerStore(fn handlerFunc) *HandlerStore { + return &HandlerStore{fn: fn} +} + +// Save calls the handler. +func (h *HandlerStore) Save(in io.Reader) error { + return h.fn(in) +} + +// ReplaceOnSuccessStore takes a target file, a replacement content and a wrapped store. This +// store is useful if you want to trigger an action to replace another file when the wrapped store save method +// is succesful. This store will take care of making a backup copy of the target file and will not +// override the content of the target if the target has already the same content. If an error happen, +// we will not replace the file. +type ReplaceOnSuccessStore struct { + target string + replaceWith []byte + + wrapped store +} + +// NewReplaceOnSuccessStore takes a target file and a replacement content and will replace the target +// file content if the wrapped store execution is done without any error. +func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped store) *ReplaceOnSuccessStore { + return &ReplaceOnSuccessStore{ + target: target, + replaceWith: replaceWith, + wrapped: wrapped, + } +} + +// Save will replace a target file with new content if the wrapped store is successful. +func (r *ReplaceOnSuccessStore) Save(in io.Reader) error { + // get original permission + s, err := os.Stat(r.target) + + // Ensure we can read the target files before delegating any call to the wrapped store. + target, err := ioutil.ReadFile(r.target) + if err != nil { + return errors.Wrapf(err, "fail to read content of %s", r.target) + } + + err = r.wrapped.Save(in) + if err != nil { + return err + } + + if bytes.Equal(target, r.replaceWith) { + return nil + } + + // Windows is tricky with the characters permitted for the path and filename, so we have + // to remove any colon from the string. We are using nanosec precision here because of automated + // tools. + const fsSafeTs = "2006-01-02T15-04-05.9999" + + ts := time.Now() + backFilename := r.target + "." + ts.Format(fsSafeTs) + ".bak" + if err := file.SafeFileRotate(backFilename, r.target); err != nil { + return errors.Wrapf(err, "could not backup %s", r.target) + } + + fd, err := os.OpenFile(r.target, os.O_CREATE|os.O_WRONLY, s.Mode()) + if err != nil { + // Rollback on any errors to minimize non working state. + if err := file.SafeFileRotate(r.target, backFilename); err != nil { + return errors.Wrapf(err, "could not rollback %s to %s", backFilename, r.target) + } + } + + if _, err := fd.Write(r.replaceWith); err != nil { + if err := file.SafeFileRotate(r.target, backFilename); err != nil { + return errors.Wrapf(err, "could not rollback %s to %s", backFilename, r.target) + } + } + + return nil +} + +// DiskStore takes a persistedConfig and save it to a temporary files and replace the target file. +type DiskStore struct { + target string +} + +// NewDiskStore creates an unencrypted disk store. +func NewDiskStore(target string) *DiskStore { + return &DiskStore{target: target} +} + +// Save accepts a persistedConfig and saved it to a target file, to do so we will +// make a temporary files if the write is successful we are replacing the target file with the +// original content. +func (d *DiskStore) Save(in io.Reader) error { + tmpFile := d.target + ".tmp" + + fd, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, perms) + if err != nil { + return errors.Wrapf(err, "could not save to %s", tmpFile) + } + defer fd.Close() + + // Always clean up the temporary file and ignore errors. + defer os.Remove(tmpFile) + + if _, err := io.Copy(fd, in); err != nil { + return errors.Wrap(err, "could not save content on disk") + } + + if err := file.SafeFileRotate(d.target, tmpFile); err != nil { + return errors.Wrapf(err, "could not replace target file %s", d.target) + } + + return nil +} + +// Load return a io.ReadCloser for the target file. +func (d *DiskStore) Load() (io.ReadCloser, error) { + return os.OpenFile(d.target, os.O_RDONLY, perms) +} + +// EncryptedDiskStore save the persisted configuration and encrypt the data on disk. +type EncryptedDiskStore struct { + target string + password []byte +} + +// NewEncryptedDiskStore creates an encrypted disk store. +func NewEncryptedDiskStore(target string, password []byte) *EncryptedDiskStore { + return &EncryptedDiskStore{target: target, password: password} +} + +// Save accepts a persistedConfig, encrypt it and saved it to a target file, to do so we will +// make a temporary files if the write is successful we are replacing the target file with the +// original content. +func (d *EncryptedDiskStore) Save(in io.Reader) error { + const perms = 0600 + + tmpFile := d.target + ".tmp" + + fd, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, perms) + if err != nil { + return errors.Wrapf(err, "could not save to %s", tmpFile) + } + defer fd.Close() + + // Always clean up the temporary file and ignore errors. + defer os.Remove(tmpFile) + + w, err := crypto.NewWriterWithDefaults(fd, d.password) + if err != nil { + return errors.Wrap(err, "could not encrypt the data to disk") + } + + if _, err := io.Copy(w, in); err != nil { + return errors.Wrap(err, "could not save content on disk") + } + + if err := file.SafeFileRotate(d.target, tmpFile); err != nil { + return errors.Wrapf(err, "could not replace target file %s", d.target) + } + + return nil +} + +// Load return a io.ReadCloser that will take care on unencrypting the data. +func (d *EncryptedDiskStore) Load() (io.ReadCloser, error) { + fd, err := os.OpenFile(d.target, os.O_RDONLY, perms) + if err != nil { + return nil, errors.Wrapf(err, "could not open %s", d.target) + } + + r, err := crypto.NewReaderWithDefaults(fd, d.password) + if err != nil { + fd.Close() + return nil, errors.Wrapf(err, "could not decode file %s", d.target) + } + + return r, nil +} diff --git a/x-pack/agent/pkg/agent/storage/storage_test.go b/x-pack/agent/pkg/agent/storage/storage_test.go new file mode 100644 index 000000000000..5bb3792007b8 --- /dev/null +++ b/x-pack/agent/pkg/agent/storage/storage_test.go @@ -0,0 +1,227 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package storage + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestReplaceOrRollbackStore(t *testing.T) { + in := bytes.NewReader([]byte{}) + + replaceWith := []byte("new content") + oldContent := []byte("old content") + + success := NewHandlerStore(func(_ io.Reader) error { return nil }) + failure := NewHandlerStore(func(_ io.Reader) error { return errors.New("fail") }) + + t.Run("when the save is successful with target and source don't match", func(t *testing.T) { + target, err := genFile(oldContent) + require.NoError(t, err) + dir := filepath.Dir(target) + defer os.RemoveAll(dir) + + requireFilesCount(t, dir, 1) + + s := NewReplaceOnSuccessStore( + target, + replaceWith, + success, + ) + + err = s.Save(in) + require.NoError(t, err) + + writtenContent, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.True(t, bytes.Equal(writtenContent, replaceWith)) + requireFilesCount(t, dir, 2) + }) + + t.Run("when save is not successful", func(t *testing.T) { + target, err := genFile(oldContent) + require.NoError(t, err) + dir := filepath.Dir(target) + defer os.RemoveAll(dir) + + requireFilesCount(t, dir, 1) + + s := NewReplaceOnSuccessStore( + target, + replaceWith, + failure, + ) + + err = s.Save(in) + require.Error(t, err) + + writtenContent, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.True(t, bytes.Equal(writtenContent, oldContent)) + requireFilesCount(t, dir, 1) + }) + + t.Run("when save is successful with target and source content match", func(t *testing.T) { + target, err := genFile(replaceWith) + require.NoError(t, err) + dir := filepath.Dir(target) + defer os.RemoveAll(dir) + + requireFilesCount(t, dir, 1) + + s := NewReplaceOnSuccessStore( + target, + replaceWith, + failure, + ) + + err = s.Save(in) + require.Error(t, err) + + writtenContent, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.True(t, bytes.Equal(writtenContent, replaceWith)) + requireFilesCount(t, dir, 1) + }) + + t.Run("when target file do not exist", func(t *testing.T) { + s := NewReplaceOnSuccessStore( + fmt.Sprintf("%s/%d", os.TempDir(), time.Now().Unix()), + replaceWith, + success, + ) + err := s.Save(in) + require.Error(t, err) + }) +} + +func TestDiskStore(t *testing.T) { + t.Run("when the target file already exists", func(t *testing.T) { + target, err := genFile([]byte("hello world")) + require.NoError(t, err) + defer os.Remove(target) + d := &DiskStore{target: target} + + msg := []byte("bonjour la famille") + err = d.Save(bytes.NewReader(msg)) + require.NoError(t, err) + + content, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("when the target do no exist", func(t *testing.T) { + dir, err := ioutil.TempDir("", "configs") + require.NoError(t, err) + defer os.Remove(dir) + + target := filepath.Join(dir, "hello.txt") + d := &DiskStore{target: target} + + msg := []byte("bonjour la famille") + err = d.Save(bytes.NewReader(msg)) + require.NoError(t, err) + + content, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("return an io.ReadCloser to the target file", func(t *testing.T) { + msg := []byte("bonjour la famille") + target, err := genFile(msg) + require.NoError(t, err) + + d := &DiskStore{target: target} + r, err := d.Load() + require.NoError(t, err) + defer r.Close() + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + require.Equal(t, msg, content) + }) +} + +func TestEncryptedDiskStore(t *testing.T) { + t.Run("when the target file already exists", func(t *testing.T) { + target, err := genFile([]byte("hello world")) + require.NoError(t, err) + defer os.Remove(target) + d := &EncryptedDiskStore{target: target} + + msg := []byte("bonjour la famille") + err = d.Save(bytes.NewReader(msg)) + require.NoError(t, err) + + // lets read the file + nd := &EncryptedDiskStore{target: target} + r, err := nd.Load() + require.NoError(t, err) + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("when the target do no exist", func(t *testing.T) { + dir, err := ioutil.TempDir("", "configs") + require.NoError(t, err) + defer os.Remove(dir) + + target := filepath.Join(dir, "hello.txt") + d := &DiskStore{target: target} + + msg := []byte("bonjour la famille") + err = d.Save(bytes.NewReader(msg)) + require.NoError(t, err) + + content, err := ioutil.ReadFile(target) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) +} + +func genFile(b []byte) (string, error) { + dir, err := ioutil.TempDir("", "configs") + if err != nil { + return "", err + } + + f, err := ioutil.TempFile(dir, "config-") + if err != nil { + return "", err + } + f.Write(b) + if err := f.Close(); err != nil { + return "", err + } + + return f.Name(), nil +} + +func requireFilesCount(t *testing.T, dir string, l int) { + files, err := ioutil.ReadDir(dir) + require.NoError(t, err) + require.Equal(t, l, len(files)) +} diff --git a/x-pack/agent/pkg/config/config.go b/x-pack/agent/pkg/config/config.go index 7b6ff3bf77a1..e8845840137c 100644 --- a/x-pack/agent/pkg/config/config.go +++ b/x-pack/agent/pkg/config/config.go @@ -6,6 +6,8 @@ package config import ( "fmt" + "io" + "io/ioutil" "github.com/elastic/go-ucfg" "github.com/elastic/go-ucfg/cfgutil" @@ -46,6 +48,15 @@ func NewConfigFrom(from interface{}) (*Config, error) { return newConfigFrom(c), err } + if in, ok := from.(io.Reader); ok { + content, err := ioutil.ReadAll(in) + if err != nil { + return nil, err + } + c, err := yaml.NewConfig(content, DefaultOptions...) + return newConfigFrom(c), err + } + c, err := ucfg.NewFrom(from, DefaultOptions...) return newConfigFrom(c), err } diff --git a/x-pack/agent/pkg/core/plugin/app/configure.go b/x-pack/agent/pkg/core/plugin/app/configure.go index cb1fe1a56789..3b49854d5864 100644 --- a/x-pack/agent/pkg/core/plugin/app/configure.go +++ b/x-pack/agent/pkg/core/plugin/app/configure.go @@ -9,13 +9,13 @@ import ( "net" "time" + "gopkg.in/yaml.v2" + "github.com/elastic/beats/libbeat/common/backoff" "github.com/elastic/beats/x-pack/agent/pkg/agent/errors" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/retry" "github.com/elastic/beats/x-pack/agent/pkg/core/plugin/state" "github.com/elastic/beats/x-pack/agent/pkg/core/remoteconfig" - - "gopkg.in/yaml.v2" ) const ( diff --git a/x-pack/agent/pkg/crypto/io.go b/x-pack/agent/pkg/crypto/io.go new file mode 100644 index 000000000000..fca7cb6b1883 --- /dev/null +++ b/x-pack/agent/pkg/crypto/io.go @@ -0,0 +1,378 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package crypto + +import ( + "bytes" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "crypto/sha512" + "encoding/binary" + "fmt" + "io" + + "github.com/pkg/errors" + "golang.org/x/crypto/pbkdf2" +) + +// Option is the default options used to generate the encrypt and decrypt writer. +// NOTE: the defined options need to be same for both the Reader and the writer. +type Option struct { + IterationsCount int + KeyLength int + SaltLength int + IVLength int + Generator bytesGen + + // BlockSize must be a factor of aes.BlockSize + BlockSize int +} + +// Validate the options for encoding and decoding values. +func (o *Option) Validate() error { + if o.IVLength == 0 { + return errors.New("IV length must be superior to 0") + } + + if o.SaltLength == 0 { + return errors.New("Salt length must be superior to 0") + } + + if o.IterationsCount == 0 { + return errors.New("IterationsCount must be superior to 0") + } + + if o.KeyLength == 0 { + return errors.New("KeyLength must be superior to 0") + } + + return nil +} + +// DefaultOptions is the default options to use when creating the writer, changing might decrease +// the efficacity of the encryption. +var DefaultOptions = &Option{ + IterationsCount: 10000, + KeyLength: 32, + SaltLength: 64, + IVLength: 12, + Generator: randomBytes, + BlockSize: bytes.MinRead, +} + +// versionMagicHeader is the format version that will be added at the begining of the header and +// can be used to change how the decryption work in future version. +var versionMagicHeader = []byte("v2") + +// Writer is an io.Writer implementation that will encrypt any data that it need to write, before +// writing any data to the wrapped writer it will lazy write an header with the necessary information +// to be able to decrypt the data. +type Writer struct { + option *Option + password []byte + writer io.Writer + generator bytesGen + + // internal + wroteHeader bool + err error + gcm cipher.AEAD + salt []byte +} +type bytesGen func(int) ([]byte, error) + +// NewWriter returns a new encrypted Writer. +func NewWriter(writer io.Writer, password []byte, option *Option) (*Writer, error) { + if err := option.Validate(); err != nil { + return nil, err + } + + var g bytesGen + if option.Generator == nil { + g = randomBytes + } else { + g = option.Generator + } + + salt, err := g(option.SaltLength) + if err != nil { + return nil, errors.Wrap(err, "fail to generate random password salt") + } + + return &Writer{ + option: option, + password: password, + generator: g, + writer: writer, + salt: salt, + }, nil +} + +// NewWriterWithDefaults create a new encryption writer with the defaults options. +func NewWriterWithDefaults(writer io.Writer, password []byte) (*Writer, error) { + return NewWriter(writer, password, DefaultOptions) +} + +// Write takes a byte slice and encrypt to the destination writer, it will return any errors when +// generating the header information or when we try to encode the data. +func (w *Writer) Write(b []byte) (int, error) { + if w.err != nil { + return 0, w.err + } + + if !w.wroteHeader { + w.wroteHeader = true + + // Stretch the user provided key. + passwordBytes := stretchPassword( + w.password, + w.salt, + w.option.IterationsCount, + w.option.KeyLength, + ) + + // Select AES-256: because len(passwordBytes) == 32 bytes. + block, err := aes.NewCipher(passwordBytes) + if err != nil { + w.err = errors.Wrap(err, "could not create the cipher to encrypt") + return 0, w.err + } + + aesgcm, err := cipher.NewGCM(block) + if err != nil { + w.err = errors.Wrap(err, "could not create the GCM to encrypt") + return 0, w.err + } + + w.gcm = aesgcm + + // Write headers + // VERSION|SALT|IV|PAYLOAD + header := new(bytes.Buffer) + header.Write(versionMagicHeader) + header.Write(w.salt) + + n, err := w.writer.Write(header.Bytes()) + if err != nil { + w.err = errors.Wrap(err, "fail to write encoding information header") + return 0, w.err + } + + if n != len(header.Bytes()) { + w.err = errors.New("written bytes do not match header size") + } + + if err := w.writeBlock(b); err != nil { + return 0, errors.Wrap(err, "fail to write block") + } + + return len(b), err + } + + if err := w.writeBlock(b); err != nil { + return 0, errors.Wrap(err, "fail to write block") + } + + return len(b), nil +} + +func (w *Writer) writeBlock(b []byte) error { + + // randomly generate the salt and the initialization vector, this information will be saved + // on disk in the file as part of the header + iv, err := w.generator(w.option.IVLength) + if err != nil { + w.err = errors.Wrap(err, "fail to generate random IV") + return w.err + } + + w.writer.Write(iv) + + encodedBytes := w.gcm.Seal(nil, iv, b, nil) + + l := make([]byte, 4) + binary.LittleEndian.PutUint32(l, uint32(len(encodedBytes))) + w.writer.Write(l) + + _, err = w.writer.Write(encodedBytes) + if err != nil { + return errors.Wrap(err, "fail to encode data") + } + + return nil +} + +// Reader implements the io.Reader interface and allow to decrypt bytes from the Writer. The reader +// will lazy read the header from the wrapper reader to initialize everything required to decrypt +// the data. +type Reader struct { + option *Option + password []byte + reader io.Reader + + // internal + err error + readHeader bool + gcm cipher.AEAD + iv []byte + buf []byte + eof bool +} + +// NewReader returns a new decryption reader. +func NewReader(reader io.Reader, password []byte, option *Option) (*Reader, error) { + if reader == nil { + return nil, errors.New("missing reader") + } + + return &Reader{ + option: option, + password: password, + reader: reader, + }, nil +} + +// NewReaderWithDefaults create a decryption io.Reader with the default options. +func NewReaderWithDefaults(reader io.Reader, password []byte) (*Reader, error) { + return NewReader(reader, password, DefaultOptions) +} + +// Read reads the bytes from a wrapped io.Reader and will decrypt the content on the fly. +func (r *Reader) Read(b []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + + // Lets read the header. + if !r.readHeader { + r.readHeader = true + vLen := len(versionMagicHeader) + buf := make([]byte, vLen+r.option.SaltLength) + n, err := io.ReadAtLeast(r.reader, buf, len(buf)) + if err != nil { + r.err = errors.Wrap(err, "fail to read encoding header") + return n, err + } + + v := buf[0:vLen] + if !bytes.Equal(versionMagicHeader, v) { + return 0, fmt.Errorf("unknown version %s (%+v)", string(v), v) + } + + salt := buf[vLen : vLen+r.option.SaltLength] + + // Stretch the user provided key. + passwordBytes := stretchPassword( + r.password, + salt, + r.option.IterationsCount, + r.option.KeyLength, + ) + + block, err := aes.NewCipher(passwordBytes) + if err != nil { + r.err = errors.Wrap(err, "could not create the cipher to decrypt the data") + return 0, r.err + } + + aesgcm, err := cipher.NewGCM(block) + if err != nil { + r.err = errors.Wrap(err, "could not create the GCM to decrypt the data") + return 0, r.err + } + r.gcm = aesgcm + } + + return r.readTo(b) +} + +func (r *Reader) readTo(b []byte) (int, error) { + if r.err != nil { + return 0, r.err + } + + if !r.eof { + if err := r.consumeBlock(); err != nil { + // We read all the blocks + if err == io.EOF || err == io.ErrUnexpectedEOF { + r.eof = true + } else { + r.err = err + return 0, err + } + } + } + + n := copy(b, r.buf) + r.buf = r.buf[n:] + + if r.eof && len(r.buf) == 0 { + r.err = io.EOF + } + + return n, r.err +} + +func (r *Reader) consumeBlock() error { + // Retrieve block information: + // - Initialization vector + // - Length of the block + iv, l, err := r.readBlockInfo() + if err != nil { + return err + } + + encodedBytes := make([]byte, l) + _, err = io.ReadAtLeast(r.reader, encodedBytes, int(l)) + if err != nil { + r.err = errors.Wrapf(err, "fail read the block of %d bytes", l) + } + + decodedBytes, err := r.gcm.Open(nil, iv, encodedBytes, nil) + if err != nil { + return errors.Wrap(err, "fail to decode bytes") + } + r.buf = append(r.buf[:], decodedBytes...) + + return nil +} + +func (r *Reader) readBlockInfo() ([]byte, int, error) { + buf := make([]byte, r.option.IVLength+4) + _, err := io.ReadAtLeast(r.reader, buf, len(buf)) + if err != nil { + return nil, 0, err + } + + iv := buf[0:r.option.IVLength] + l := binary.LittleEndian.Uint32(buf[r.option.IVLength:]) + + return iv, int(l), nil +} + +// Close will propagate the Close call to the wrapped reader. +func (r *Reader) Close() error { + a, ok := r.reader.(io.ReadCloser) + if ok { + return a.Close() + } + return nil +} + +func randomBytes(length int) ([]byte, error) { + r := make([]byte, length) + _, err := rand.Read(r) + + if err != nil { + return nil, err + } + + return r, nil +} + +func stretchPassword(password, salt []byte, c, kl int) []byte { + return pbkdf2.Key(password, salt, c, kl, sha512.New) +} diff --git a/x-pack/agent/pkg/crypto/io_test.go b/x-pack/agent/pkg/crypto/io_test.go new file mode 100644 index 000000000000..468476430105 --- /dev/null +++ b/x-pack/agent/pkg/crypto/io_test.go @@ -0,0 +1,195 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package crypto + +import ( + "bufio" + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIO(t *testing.T) { + t.Run("encode and decode with the right password", func(t *testing.T) { + passwd := []byte("hello") + msg := []byte("bonjour la famille") + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + n, err := w.Write(msg) + require.NoError(t, err) + require.Equal(t, len(msg), n) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), msg) == -1) + + r, err := NewReaderWithDefaults(dest, passwd) + require.NoError(t, err) + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("Large single write", func(t *testing.T) { + passwd := []byte("hello") + msg, err := randomBytes(1327) + + require.NoError(t, err) + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + n, err := io.Copy(w, bytes.NewBuffer(msg)) + require.NoError(t, err) + require.Equal(t, int64(len(msg)), n) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), msg) == -1) + + r, err := NewReaderWithDefaults(dest, passwd) + require.NoError(t, err) + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("try to decode with the wrong password", func(t *testing.T) { + passwd := []byte("hello") + msg := []byte("bonjour la famille") + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + n, err := w.Write(msg) + require.NoError(t, err) + require.Equal(t, len(msg), n) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), msg) == -1) + + r, err := NewReaderWithDefaults(dest, []byte("bad password")) + require.NoError(t, err) + + _, err = ioutil.ReadAll(r) + require.Error(t, err) + }) + + t.Run("Make sure that buffered IO works with the encoder", func(t *testing.T) { + passwd := []byte("hello") + msg, err := randomBytes(2048) + require.NoError(t, err) + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + b := bufio.NewWriterSize(w, 100) + n, err := b.Write(msg) + require.NoError(t, err) + require.Equal(t, 2048, n) + // err = b.Flush() //force flush + require.NoError(t, err) + + require.True(t, len(dest.Bytes()) > 0) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), msg) == -1) + + r, err := NewReaderWithDefaults(dest, passwd) + require.NoError(t, err) + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("Make sure that buffered IO works with the decoder", func(t *testing.T) { + passwd := []byte("hello") + msg, err := randomBytes(2048) + require.NoError(t, err) + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + n, err := w.Write(msg) + require.NoError(t, err) + require.True(t, n == 2048) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), msg) == -1) + + r, err := NewReaderWithDefaults(dest, passwd) + require.NoError(t, err) + + b := bufio.NewReaderSize(r, 100) + + content, err := ioutil.ReadAll(b) + require.NoError(t, err) + + require.Equal(t, msg, content) + }) + + t.Run("Missing explicit version", func(t *testing.T) { + raw, err := randomBytes(2048) + c := bytes.NewBuffer(raw) + + r, err := NewReaderWithDefaults(c, []byte("bad password")) + require.NoError(t, err) + + b := bufio.NewReaderSize(r, 100) + + _, err = ioutil.ReadAll(b) + require.Error(t, err) + }) + + t.Run("works with multiple writes", func(t *testing.T) { + passwd := []byte("hello") + + expected := []byte("hello world bonjour la famille") + + dest := new(bytes.Buffer) + + // Encode + w, err := NewWriterWithDefaults(dest, passwd) + require.NoError(t, err) + + n, err := w.Write([]byte("hello world")) + require.NoError(t, err) + require.Equal(t, 11, n) + + n, err = w.Write([]byte(" bonjour la famille")) + require.NoError(t, err) + require.Equal(t, 19, n) + + // Guard to make sure we have not the same bytes. + require.True(t, bytes.Index(dest.Bytes(), expected) == -1) + + r, err := NewReaderWithDefaults(dest, passwd) + require.NoError(t, err) + + content, err := ioutil.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, expected, content) + }) +} diff --git a/x-pack/agent/pkg/fleetapi/client.go b/x-pack/agent/pkg/fleetapi/client.go index 724f780a05ba..5b4670fe58e5 100644 --- a/x-pack/agent/pkg/fleetapi/client.go +++ b/x-pack/agent/pkg/fleetapi/client.go @@ -62,8 +62,8 @@ func init() { // - Send the API Key on every HTTP request. // - Ensure a minimun version of Kibana is required. // - Send the Fleet User Agent on every HTTP request. -func NewAuthWithConfig(log *logger.Logger, config *config.Config, apiKey string) (*kibana.Client, error) { - return kibana.NewWithRawConfig(log, config, func(rt http.RoundTripper) (http.RoundTripper, error) { +func NewAuthWithConfig(log *logger.Logger, apiKey string, cfg *kibana.Config) (*kibana.Client, error) { + return kibana.NewWithConfig(log, cfg, func(rt http.RoundTripper) (http.RoundTripper, error) { rt, err := baseRoundTrippers(rt) if err != nil { return nil, err diff --git a/x-pack/agent/pkg/fleetapi/helper_test.go b/x-pack/agent/pkg/fleetapi/helper_test.go index 7c56d6e99560..65e6982bb2a5 100644 --- a/x-pack/agent/pkg/fleetapi/helper_test.go +++ b/x-pack/agent/pkg/fleetapi/helper_test.go @@ -13,7 +13,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/elastic/beats/x-pack/agent/pkg/config" + "github.com/elastic/beats/agent/kibana" + "github.com/elastic/beats/x-pack/agent/pkg/core/logger" ) func authHandler(handler http.HandlerFunc, apiKey string) http.HandlerFunc { @@ -46,15 +47,17 @@ func withServer(m func(t *testing.T) *http.ServeMux, test func(t *testing.T, hos func withServerWithAuthClient( m func(t *testing.T) *http.ServeMux, - accessToken string, + apiKey string, test func(t *testing.T, client clienter), ) func(t *testing.T) { return withServer(m, func(t *testing.T, host string) { - cfg := config.MustNewConfigFrom(map[string]interface{}{ - "host": host, - }) - client, err := NewAuthWithConfig(nil, cfg, accessToken) + log, _ := logger.New() + cfg := &kibana.Config{ + Host: host, + } + + client, err := NewAuthWithConfig(log, apiKey, cfg) require.NoError(t, err) test(t, client) }) diff --git a/x-pack/agent/pkg/reporter/fleet/config.go b/x-pack/agent/pkg/reporter/fleet/config.go index e742e0bec56f..c088b3abf4d4 100644 --- a/x-pack/agent/pkg/reporter/fleet/config.go +++ b/x-pack/agent/pkg/reporter/fleet/config.go @@ -6,15 +6,13 @@ package fleet // ManagementConfig is a configuration describing fleet connected parts type ManagementConfig struct { - Enabled bool `config:"enabled"` - Threshold int `config:"threshold" validate:"min=1"` - ReportingCheckFrequency int `config:"check_frequency_sec" validate:"min=1"` + Threshold int `config:"threshold" validate:"min=1"` + ReportingCheckFrequency int `config:"check_frequency_sec" validate:"min=1"` } // DefaultFleetManagementConfig initiates FleetManagementConfig with default values func DefaultFleetManagementConfig() *ManagementConfig { return &ManagementConfig{ - Enabled: false, Threshold: 10000, ReportingCheckFrequency: 30, } diff --git a/x-pack/agent/pkg/reporter/log/config.go b/x-pack/agent/pkg/reporter/log/config.go index 558c381d2946..ef69afe9bbe2 100644 --- a/x-pack/agent/pkg/reporter/log/config.go +++ b/x-pack/agent/pkg/reporter/log/config.go @@ -6,7 +6,7 @@ package log // Config is a configuration describing log reporter behavior type Config struct { - Format Format `config:"format"` + Format Format `config:"format" yaml:"format"` } // DefaultLogConfig initiates LogConfig with default values diff --git a/x-pack/agent/pkg/reporter/log/format.go b/x-pack/agent/pkg/reporter/log/format.go index 0d82ed7ed15c..dcb08f5b1c47 100644 --- a/x-pack/agent/pkg/reporter/log/format.go +++ b/x-pack/agent/pkg/reporter/log/format.go @@ -30,15 +30,30 @@ var formatMap = map[string]Format{ "json": JSONFormat, } +var reverseMap = map[bool]string{ + true: "default", + false: "json", +} + // Unpack enables using of string values in config func (m *Format) Unpack(v string) error { mgt, ok := formatMap[v] if !ok { return fmt.Errorf( - "unknown format, received '%s' and valid values are local or fleet", + "unknown format, received '%s' and valid values are default or json", v, ) } *m = mgt return nil } + +// MarshalYAML marshal into a string. +func (m Format) MarshalYAML() (interface{}, error) { + s, ok := reverseMap[bool(m)] + if !ok { + return nil, fmt.Errorf("cannot marshal value of %+v", m) + } + + return s, nil +}