diff --git a/x-pack/agent/_meta/agent.docker.yml b/x-pack/agent/_meta/agent.docker.yml index 3a900a1927f2..491aaaf4408b 100644 --- a/x-pack/agent/_meta/agent.docker.yml +++ b/x-pack/agent/_meta/agent.docker.yml @@ -8,12 +8,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of management, the agent support two modes of operation: diff --git a/x-pack/agent/_meta/agent.yml b/x-pack/agent/_meta/agent.yml index 00b16718a3c7..21ed954f95cd 100644 --- a/x-pack/agent/_meta/agent.yml +++ b/x-pack/agent/_meta/agent.yml @@ -8,12 +8,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of management, the agent support two modes of operation: diff --git a/x-pack/agent/_meta/common.p2.yml b/x-pack/agent/_meta/common.p2.yml index 5a75ec2fc60a..b1dbecc8b00c 100644 --- a/x-pack/agent/_meta/common.p2.yml +++ b/x-pack/agent/_meta/common.p2.yml @@ -8,12 +8,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of magement, the agent support two modes of operation: diff --git a/x-pack/agent/_meta/common.reference.p2.yml b/x-pack/agent/_meta/common.reference.p2.yml index 48b735726217..22dd1f0619f5 100644 --- a/x-pack/agent/_meta/common.reference.p2.yml +++ b/x-pack/agent/_meta/common.reference.p2.yml @@ -8,12 +8,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of management, the agent currently only support the following mode: diff --git a/x-pack/agent/agent.docker.yml b/x-pack/agent/agent.docker.yml index 3a900a1927f2..491aaaf4408b 100644 --- a/x-pack/agent/agent.docker.yml +++ b/x-pack/agent/agent.docker.yml @@ -8,12 +8,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of management, the agent support two modes of operation: diff --git a/x-pack/agent/agent.reference.yml b/x-pack/agent/agent.reference.yml index dc425dc00b5b..264f92f8ca14 100644 --- a/x-pack/agent/agent.reference.yml +++ b/x-pack/agent/agent.reference.yml @@ -13,12 +13,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of management, the agent currently only support the following mode: diff --git a/x-pack/agent/agent.yml b/x-pack/agent/agent.yml index 07a58aed3691..bf7d3ea5a22c 100644 --- a/x-pack/agent/agent.yml +++ b/x-pack/agent/agent.yml @@ -13,12 +13,21 @@ outputs: username: elastic password: changeme -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - namespace: sample + use_output: default + inputs: + - type: logs + processors: + streams: + - dataset: sample.acccess + paths: /var/log/sample/access.log + - enabled: false + dataset: sample.error + paths: /var/log/sample/error.log + - type: system/metrics + streams: + - metricset: cpu management: # Mode of magement, the agent support two modes of operation: diff --git a/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json b/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json index ab1ff1b79c70..d76fe58aed25 100644 --- a/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json +++ b/x-pack/agent/dev-tools/cmd/fakewebapi/action_example.json @@ -3,9 +3,9 @@ "success": true, "actions": [ { - "type": "POLICY_CHANGE", + "type": "CONFIG_CHANGE", "data": { - "policy": { + "config": { "id": "default", "outputs": { "default": { diff --git a/x-pack/agent/pkg/agent/application/action_store.go b/x-pack/agent/pkg/agent/application/action_store.go index 7ca9e0f3efc0..22ccfbbdd1c2 100644 --- a/x-pack/agent/pkg/agent/application/action_store.go +++ b/x-pack/agent/pkg/agent/application/action_store.go @@ -33,14 +33,14 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { return &actionStore{log: log, store: store}, nil } - var action actionPolicyChangeSerializer + var action actionConfigChangeSerializer dec := yaml.NewDecoder(reader) if err := dec.Decode(&action); err != nil { return nil, err } - apc := fleetapi.ActionPolicyChange(action) + apc := fleetapi.ActionConfigChange(action) return &actionStore{ log: log, @@ -53,7 +53,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { // any other type of action will be silently ignored. func (s *actionStore) Add(a action) { switch v := a.(type) { - case *fleetapi.ActionPolicyChange: + case *fleetapi.ActionConfigChange: // Only persist the action if the action is different. if s.action != nil && s.action.ID() == v.ID() { return @@ -69,12 +69,12 @@ func (s *actionStore) Save() error { return nil } - apc, ok := s.action.(*fleetapi.ActionPolicyChange) + apc, ok := s.action.(*fleetapi.ActionConfigChange) if !ok { return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action) } - serialize := actionPolicyChangeSerializer(*apc) + serialize := actionConfigChangeSerializer(*apc) reader, err := yamlToReader(&serialize) if err != nil { @@ -98,7 +98,7 @@ func (s *actionStore) Actions() []action { return []action{s.action} } -// actionPolicyChangeSerializer is a struct that add YAML serialization, I don't think serialization +// actionConfigChangeSerializer is a struct that add YAML serialization, I don't think serialization // is a concern of the fleetapi package. I went this route so I don't have to do much refactoring. // // There are four ways to achieve the same results: @@ -108,14 +108,14 @@ func (s *actionStore) Actions() []action { // 4. We have two sets of type. // // This could be done in a refactoring. -type actionPolicyChangeSerializer struct { +type actionConfigChangeSerializer struct { ActionID string `yaml:"action_id"` ActionType string `yaml:"action_type"` - Policy map[string]interface{} `yaml:"policy"` + Config map[string]interface{} `yaml:"config"` } // Add a guards between the serializer structs and the original struct. -var _ actionPolicyChangeSerializer = actionPolicyChangeSerializer(fleetapi.ActionPolicyChange{}) +var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.ActionConfigChange{}) // actionStoreAcker wraps an existing acker and will send any acked event to the action store, // its up to the action store to decide if we need to persist the event for future replay or just diff --git a/x-pack/agent/pkg/agent/application/action_store_test.go b/x-pack/agent/pkg/agent/application/action_store_test.go index 33fc61d2424a..636882a2df96 100644 --- a/x-pack/agent/pkg/agent/application/action_store_test.go +++ b/x-pack/agent/pkg/agent/application/action_store_test.go @@ -57,10 +57,10 @@ func TestActionStore(t *testing.T) { t.Run("can save to disk known action type", withFile(func(t *testing.T, file string) { - actionPolicyChange := &fleetapi.ActionPolicyChange{ + actionConfigChange := &fleetapi.ActionConfigChange{ ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ + ActionType: "CONFIG_CHANGE", + Config: map[string]interface{}{ "hello": "world", }, } @@ -70,7 +70,7 @@ func TestActionStore(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, len(store.Actions())) - store.Add(actionPolicyChange) + store.Add(actionConfigChange) err = store.Save() require.NoError(t, err) require.Equal(t, 1, len(store.Actions())) @@ -82,12 +82,12 @@ func TestActionStore(t *testing.T) { actions := store1.Actions() require.Equal(t, 1, len(actions)) - require.Equal(t, actionPolicyChange, actions[0]) + require.Equal(t, actionConfigChange, actions[0]) })) t.Run("when we ACK we save to disk", withFile(func(t *testing.T, file string) { - actionPolicyChange := &fleetapi.ActionPolicyChange{ + actionConfigChange := &fleetapi.ActionConfigChange{ ActionID: "abc123", } @@ -98,7 +98,7 @@ func TestActionStore(t *testing.T) { acker := newActionStoreAcker(&testAcker{}, store) require.Equal(t, 0, len(store.Actions())) - require.NoError(t, acker.Ack(context.Background(), actionPolicyChange)) + require.NoError(t, acker.Ack(context.Background(), actionConfigChange)) require.Equal(t, 1, len(store.Actions())) })) } diff --git a/x-pack/agent/pkg/agent/application/fleet_gateway_test.go b/x-pack/agent/pkg/agent/application/fleet_gateway_test.go index 8ec33b1850ae..a3539e2ae1fe 100644 --- a/x-pack/agent/pkg/agent/application/fleet_gateway_test.go +++ b/x-pack/agent/pkg/agent/application/fleet_gateway_test.go @@ -191,10 +191,10 @@ func TestFleetGateway(t *testing.T) { { "actions": [ { - "type": "POLICY_CHANGE", + "type": "CONFIG_CHANGE", "id": "id1", "data": { - "policy": { + "config": { "id": "policy-id" } } diff --git a/x-pack/agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/agent/pkg/agent/application/handler_action_policy_change.go index eaecd5655a1e..868518e360c4 100644 --- a/x-pack/agent/pkg/agent/application/handler_action_policy_change.go +++ b/x-pack/agent/pkg/agent/application/handler_action_policy_change.go @@ -14,24 +14,24 @@ import ( "github.com/elastic/beats/x-pack/agent/pkg/fleetapi" ) -type handlerPolicyChange struct { +type handlerConfigChange struct { log *logger.Logger emitter emitterFunc } -func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error { - h.log.Debugf("HandlerPolicyChange: action '%+v' received", a) - action, ok := a.(*fleetapi.ActionPolicyChange) +func (h *handlerConfigChange) Handle(ctx context.Context, a action, acker fleetAcker) error { + h.log.Debugf("handlerConfigChange: action '%+v' received", a) + action, ok := a.(*fleetapi.ActionConfigChange) if !ok { - return fmt.Errorf("invalid type, expected ActionPolicyChange and received %T", a) + return fmt.Errorf("invalid type, expected ActionConfigChange and received %T", a) } - c, err := config.NewConfigFrom(action.Policy) + c, err := config.NewConfigFrom(action.Config) if err != nil { return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) } - h.log.Debugf("HandlerPolicyChange: emit configuration for action %+v", a) + h.log.Debugf("handlerConfigChange: emit configuration for action %+v", a) if err := h.emitter(c); err != nil { return err } diff --git a/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go b/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go index 63b482bc0356..c17a358b1ddc 100644 --- a/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go +++ b/x-pack/agent/pkg/agent/application/handler_action_policy_change_test.go @@ -32,35 +32,35 @@ func TestPolicyChange(t *testing.T) { log, _ := logger.New() ack := newNoopAcker() - t.Run("Receive a policy change and successfully emits a raw configuration", func(t *testing.T) { + t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) { emitter := &mockEmitter{} - policy := map[string]interface{}{"hello": "world"} - action := &fleetapi.ActionPolicyChange{ + conf := map[string]interface{}{"hello": "world"} + action := &fleetapi.ActionConfigChange{ ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: policy, + ActionType: "CONFIG_CHANGE", + Config: conf, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + handler := &handlerConfigChange{log: log, emitter: emitter.Emitter} err := handler.Handle(context.Background(), action, ack) require.NoError(t, err) - require.Equal(t, config.MustNewConfigFrom(policy), emitter.policy) + require.Equal(t, config.MustNewConfigFrom(conf), emitter.policy) }) - t.Run("Receive a policy and fail to emits a raw configuration", func(t *testing.T) { + t.Run("Receive a config and fail to emits a raw configuration", func(t *testing.T) { mockErr := errors.New("error returned") emitter := &mockEmitter{err: mockErr} - policy := map[string]interface{}{"hello": "world"} - action := &fleetapi.ActionPolicyChange{ + conf := map[string]interface{}{"hello": "world"} + action := &fleetapi.ActionConfigChange{ ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Policy: policy, + ActionType: "CONFIG_CHANGE", + Config: conf, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + handler := &handlerConfigChange{log: log, emitter: emitter.Emitter} err := handler.Handle(context.Background(), action, ack) require.Error(t, err) @@ -69,21 +69,21 @@ func TestPolicyChange(t *testing.T) { func TestPolicyAcked(t *testing.T) { log, _ := logger.New() - t.Run("Policy change should not ACK on error", func(t *testing.T) { + t.Run("Config change should not ACK on error", func(t *testing.T) { tacker := &testAcker{} mockErr := errors.New("error returned") emitter := &mockEmitter{err: mockErr} - policy := map[string]interface{}{"hello": "world"} + config := map[string]interface{}{"hello": "world"} actionID := "abc123" - action := &fleetapi.ActionPolicyChange{ + action := &fleetapi.ActionConfigChange{ ActionID: actionID, - ActionType: "POLICY_CHANGE", - Policy: policy, + ActionType: "CONFIG_CHANGE", + Config: config, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + handler := &handlerConfigChange{log: log, emitter: emitter.Emitter} err := handler.Handle(context.Background(), action, tacker) require.Error(t, err) @@ -92,20 +92,20 @@ func TestPolicyAcked(t *testing.T) { assert.EqualValues(t, 0, len(actions)) }) - t.Run("Policy change should ACK", func(t *testing.T) { + t.Run("Config change should ACK", func(t *testing.T) { tacker := &testAcker{} emitter := &mockEmitter{} - policy := map[string]interface{}{"hello": "world"} + config := map[string]interface{}{"hello": "world"} actionID := "abc123" - action := &fleetapi.ActionPolicyChange{ + action := &fleetapi.ActionConfigChange{ ActionID: actionID, - ActionType: "POLICY_CHANGE", - Policy: policy, + ActionType: "CONFIG_CHANGE", + Config: config, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + handler := &handlerConfigChange{log: log, emitter: emitter.Emitter} err := handler.Handle(context.Background(), action, tacker) require.NoError(t, err) diff --git a/x-pack/agent/pkg/agent/application/managed_mode.go b/x-pack/agent/pkg/agent/application/managed_mode.go index 1bf0e3334daa..9670ad3c88de 100644 --- a/x-pack/agent/pkg/agent/application/managed_mode.go +++ b/x-pack/agent/pkg/agent/application/managed_mode.go @@ -144,8 +144,8 @@ func newManaged( } actionDispatcher.MustRegister( - &fleetapi.ActionPolicyChange{}, - &handlerPolicyChange{ + &fleetapi.ActionConfigChange{}, + &handlerConfigChange{ log: log, emitter: emit, }, diff --git a/x-pack/agent/pkg/agent/application/monitoring_decorator.go b/x-pack/agent/pkg/agent/application/monitoring_decorator.go index a008e5a0a2e5..e354650666c1 100644 --- a/x-pack/agent/pkg/agent/application/monitoring_decorator.go +++ b/x-pack/agent/pkg/agent/application/monitoring_decorator.go @@ -5,19 +5,24 @@ package application import ( + "fmt" + "github.com/elastic/beats/x-pack/agent/pkg/agent/program" "github.com/elastic/beats/x-pack/agent/pkg/agent/transpiler" ) const ( - monitoringName = "FLEET_MONITORING" - programsKey = "programs" - monitoringKey = "monitoring" - monitoringOutputKey = "monitoring.elasticsearch" - enabledKey = "monitoring.enabled" - outputKey = "output" - outputsKey = "outputs" - typeKey = "type" + monitoringName = "FLEET_MONITORING" + programsKey = "programs" + monitoringKey = "settings.monitoring" + monitoringUseOutputKey = "settings.monitoring.use_output" + monitoringOutputFormatKey = "outputs.%s" + outputKey = "output" + + enabledKey = "settings.monitoring.enabled" + outputsKey = "outputs" + elasticsearchKey = "elasticsearch" + typeKey = "type" ) func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) { @@ -35,8 +40,21 @@ func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun config = make(map[string]interface{}) config[enabledKey] = false } else { + // get monitoring output name to be used + useOutputNode, found := transpiler.Lookup(rootAst, monitoringUseOutputKey) + if !found { + return programsToRun, nil + } + + monitoringOutputNameKey, ok := useOutputNode.Value().(*transpiler.StrVal) + if !ok { + return programsToRun, nil + } + + monitoringOutputName := monitoringOutputNameKey.String() + ast := rootAst.Clone() - if err := getMonitoringRule(outputGroup).Apply(ast); err != nil { + if err := getMonitoringRule(monitoringOutputName).Apply(ast); err != nil { return programsToRun, err } @@ -63,8 +81,10 @@ func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun } func getMonitoringRule(outputName string) *transpiler.RuleList { + monitoringOutputSelector := fmt.Sprintf(monitoringOutputFormatKey, outputName) return transpiler.NewRuleList( - transpiler.Copy(monitoringOutputKey, outputKey), + transpiler.Copy(monitoringOutputSelector, outputKey), + transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey), transpiler.Filter(monitoringKey, programsKey, outputKey), ) } diff --git a/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go b/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go index 32935eca8c9d..892983acafff 100644 --- a/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go +++ b/x-pack/agent/pkg/agent/application/monitoring_decorator_test.go @@ -83,16 +83,11 @@ GROUPLOOP: } var inputConfigMap = map[string]interface{}{ - "monitoring": map[string]interface{}{ - "enabled": true, - "logs": true, - "metrics": true, - "elasticsearch": map[string]interface{}{ - "index_name": "general", - "pass": "xxx", - "url": "xxxxx", - "username": "monitoring-uname", - }, + "settings.monitoring": map[string]interface{}{ + "enabled": true, + "logs": true, + "metrics": true, + "use_output": "monitoring", }, "outputs": map[string]interface{}{ "default": map[string]interface{}{ @@ -111,33 +106,39 @@ var inputConfigMap = map[string]interface{}{ "url": "xxxxx", "username": "xxx", }, + "monitoring": map[string]interface{}{ + "type": "elasticsearch", + "index_name": "general", + "pass": "xxx", + "url": "xxxxx", + "username": "monitoring-uname", + }, }, - "streams": []interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/xxxx", - "processors": []interface{}{ + "inputs": []map[string]interface{}{ map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "---", + "type": "log", + "streams": []map[string]interface{}{ + map[string]interface{}{"paths": "/xxxx"}, + }, + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "---", + }, + }, }, - }, - }, - "output": map[string]interface{}{ - "override": map[string]interface{}{ - "index_name": "my_service_logs", - "ingest_pipeline": "process_logs", }, }, }, map[string]interface{}{ - "type": "metric/system", - "username": "xxxx", - "pass": "yyy", - "output": map[string]interface{}{ - "index_name": "mysql_metrics", - "use_output": "infosec1", + "inputs": []map[string]interface{}{ + map[string]interface{}{ + "type": "metrics/system", + }, }, + "use_output": "infosec1", }, }, } diff --git a/x-pack/agent/pkg/agent/operation/config/config.go b/x-pack/agent/pkg/agent/operation/config/config.go index 3f141c1d020a..7b8ca81ae31c 100644 --- a/x-pack/agent/pkg/agent/operation/config/config.go +++ b/x-pack/agent/pkg/agent/operation/config/config.go @@ -18,5 +18,5 @@ type Config struct { DownloadConfig *artifact.Config `yaml:"download" config:"download"` - MonitoringConfig *monitoring.Config `yaml:"monitoring" config:"monitoring"` + MonitoringConfig *monitoring.Config `yaml:"settings.monitoring" config:"settings.monitoring"` } diff --git a/x-pack/agent/pkg/agent/operation/monitoring.go b/x-pack/agent/pkg/agent/operation/monitoring.go index 9358c667a8d1..32c50be4b9b4 100644 --- a/x-pack/agent/pkg/agent/operation/monitoring.go +++ b/x-pack/agent/pkg/agent/operation/monitoring.go @@ -16,6 +16,7 @@ import ( const ( monitoringName = "FLEET_MONITORING" + settingsKey = "settings" monitoringKey = "monitoring" outputKey = "output" monitoringEnabledSubkey = "enabled" @@ -97,15 +98,27 @@ func monitoringTags() map[app.Tag]string { } func isMonitoringEnabled(logger *logger.Logger, cfg map[string]interface{}) bool { - monitoringVal, found := cfg[monitoringKey] + settingsVal, found := cfg[settingsKey] if !found { - logger.Error("operator.isMonitoringEnabled: monitoring not found in config") + logger.Error("operator.isMonitoringEnabled: settings not found in config") + return false + } + + settingsMap, ok := settingsVal.(map[string]interface{}) + if !ok { + logger.Error("operator.isMonitoringEnabled: settings not a map") + return false + } + + monitoringVal, found := settingsMap[monitoringKey] + if !found { + logger.Error("operator.isMonitoringEnabled: settings.monitoring not found in config") return false } monitoringMap, ok := monitoringVal.(map[string]interface{}) if !ok { - logger.Error("operator.isMonitoringEnabled: monitoring not a map") + logger.Error("operator.isMonitoringEnabled: settings.monitoring not a map") return false } diff --git a/x-pack/agent/pkg/agent/program/program.go b/x-pack/agent/pkg/agent/program/program.go index 22f179fe43bf..f3231f76b124 100644 --- a/x-pack/agent/pkg/agent/program/program.go +++ b/x-pack/agent/pkg/agent/program/program.go @@ -115,7 +115,7 @@ func groupByOutputs(single *transpiler.AST) (map[string]*transpiler.AST, error) const ( outputsKey = "outputs" outputKey = "output" - streamsKey = "streams" + streamsKey = "datasources" typeKey = "type" ) @@ -227,23 +227,15 @@ func groupByOutputs(single *transpiler.AST) (map[string]*transpiler.AST, error) func findOutputName(m map[string]interface{}) string { const ( defaultOutputName = "default" - outputKey = "output" useOutputKey = "use_output" ) - output, ok := m[outputKey] + output, ok := m[useOutputKey] if !ok { return defaultOutputName } - o := output.(map[string]interface{}) - - name, ok := o[useOutputKey] - if !ok { - return defaultOutputName - } - - return name.(string) + return output.(string) } func cloneMap(m map[string]interface{}) map[string]interface{} { diff --git a/x-pack/agent/pkg/agent/program/program_test.go b/x-pack/agent/pkg/agent/program/program_test.go index 89e192345134..1602c6294fd1 100644 --- a/x-pack/agent/pkg/agent/program/program_test.go +++ b/x-pack/agent/pkg/agent/program/program_test.go @@ -36,141 +36,27 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ - map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", - "output": map[string]interface{}{ - "use_output": "special", - }, - }, - map[string]interface{}{ - "type": "metrics/system", - "output": map[string]interface{}{ - "use_output": "special", - }, - }, - map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", - }, - }, - }, - } - - ast, err := transpiler.NewAST(sConfig) - require.NoError(t, err) - - grouped, err := groupByOutputs(ast) - require.NoError(t, err) - require.Equal(t, 2, len(grouped)) - - c1 := transpiler.MustNewAST(map[string]interface{}{ - "output": map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": "xxx", - "username": "myusername", - "password": "mypassword", - }, - }, - "streams": []map[string]interface{}{ - map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", - "output": map[string]interface{}{ - "use_output": "special", - }, - }, - map[string]interface{}{ - "type": "metrics/system", - "output": map[string]interface{}{ - "use_output": "special", - }, - }, - }, - }) - - c2, _ := transpiler.NewAST(map[string]interface{}{ - "output": map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": "yyy", - "username": "anotherusername", - "password": "anotherpassword", - }, - }, - "streams": []map[string]interface{}{ - map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", - }, - }, - }, - }) - - defaultConfig, ok := grouped["special"] - require.True(t, ok) - require.Equal(t, c1.Hash(), defaultConfig.Hash()) - - infosec1Config, ok := grouped["infosec1"] - require.True(t, ok) - require.Equal(t, c2.Hash(), infosec1Config.Hash()) - }) - - t.Run("copy any top level configuration options to each configuration", func(t *testing.T) { - sConfig := map[string]interface{}{ - "monitoring": map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": "127.0.0.1", - }, - }, - "keystore": map[string]interface{}{ - "path": "${path.data}/keystore", - }, - "outputs": map[string]interface{}{ - "special": map[string]interface{}{ - "type": "elasticsearch", - "hosts": "xxx", - "username": "myusername", - "password": "mypassword", - }, - "infosec1": map[string]interface{}{ - "type": "elasticsearch", - "hosts": "yyy", - "username": "anotherusername", - "password": "anotherpassword", - }, - }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", - "output": map[string]interface{}{ - "use_output": "special", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, }, + "use_output": "special", }, map[string]interface{}{ - "type": "metrics/system", - "output": map[string]interface{}{ - "use_output": "special", + "inputs": map[string]interface{}{ + "type": "system/metrics", }, + "use_output": "special", }, map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, }, + "use_output": "infosec1", }, }, } @@ -190,29 +76,21 @@ func TestGroupBy(t *testing.T) { "password": "mypassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", - "output": map[string]interface{}{ - "use_output": "special", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, }, + "use_output": "special", }, map[string]interface{}{ - "type": "metrics/system", - "output": map[string]interface{}{ - "use_output": "special", + "inputs": map[string]interface{}{ + "type": "system/metrics", }, + "use_output": "special", }, }, - "monitoring": map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": "127.0.0.1", - }, - }, - "keystore": map[string]interface{}{ - "path": "${path.data}/keystore", - }, }) c2, _ := transpiler.NewAST(map[string]interface{}{ @@ -223,25 +101,15 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, }, + "use_output": "infosec1", }, }, - "monitoring": map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": "127.0.0.1", - }, - }, - "keystore": map[string]interface{}{ - "path": "${path.data}/keystore", - }, }) defaultConfig, ok := grouped["special"] @@ -275,22 +143,27 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ + + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, + }, + "use_output": "special", }, map[string]interface{}{ - "type": "metrics/system", + "inputs": map[string]interface{}{ + "type": "system/metrics", + }, + "use_output": "special", }, map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "donotexist", - "pipeline": "custompipeline", - "index_name": "myindex", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, }, + "use_output": "donotexist", }, }, } @@ -318,17 +191,23 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, + }, }, map[string]interface{}{ - "type": "metrics/system", + "inputs": map[string]interface{}{ + "type": "system/metrics", + }, }, map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, + }, }, }, } @@ -348,17 +227,23 @@ func TestGroupBy(t *testing.T) { "password": "mypassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, + }, }, map[string]interface{}{ - "type": "metrics/system", + "inputs": map[string]interface{}{ + "type": "system/metrics", + }, }, map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, + }, }, }, }) @@ -388,22 +273,24 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, + }, }, map[string]interface{}{ - "type": "metrics/system", + "inputs": map[string]interface{}{ + "type": "system/metrics", + }, }, map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, }, + "use_output": "infosec1", }, }, } @@ -423,13 +310,17 @@ func TestGroupBy(t *testing.T) { "password": "mypassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/hello.log", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/hello.log"}, + }, }, map[string]interface{}{ - "type": "metrics/system", + "inputs": map[string]interface{}{ + "type": "system/metrics", + }, }, }, }) @@ -442,15 +333,13 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{ + "datasources": []map[string]interface{}{ map[string]interface{}{ - "type": "log", - "path": "/var/log/infosec.log", - "output": map[string]interface{}{ - "use_output": "infosec1", - "pipeline": "custompipeline", - "index_name": "myindex", + "inputs": map[string]interface{}{ + "type": "log", + "streams": map[string]interface{}{"paths": "/var/log/infosec.log"}, }, + "use_output": "infosec1", }, }, }) @@ -481,7 +370,7 @@ func TestGroupBy(t *testing.T) { "password": "anotherpassword", }, }, - "streams": []map[string]interface{}{}, + "datasources": []map[string]interface{}{}, } ast, err := transpiler.NewAST(sConfig) diff --git a/x-pack/agent/pkg/agent/program/supported.go b/x-pack/agent/pkg/agent/program/supported.go index b0d2d68b6c61..0e87a0a0cdca 100644 --- a/x-pack/agent/pkg/agent/program/supported.go +++ b/x-pack/agent/pkg/agent/program/supported.go @@ -19,7 +19,7 @@ func init() { // Packed Files // spec/filebeat.yml // spec/metricbeat.yml - unpacked := packer.MustUnpack("eJyklU2TozYQhu/5GXNNKuFjmSpStQfDBAHD4DW2JaEbkhyBLQE1YDOQyn9PgT/GdmZ2D3uiaKTutxu9j/55aOoN++PvQm7oJmt/75V8+POBKq8lq0okSjYEhTLFi+fU0HfzwlEZepOpgjmf1TlTfJgXDg0K3QuKTgRlLLkPu0jJhi4tSZVXUAB33xDJqR/Lac392jKRFDtNihMZKbhPUdgQtLCJ8hpmrIvInRXR+vikyNuniEuK4J67VkuNRH7DomXA22a9riiAkrtBE7hBmyzHZ9imyMqJAVuCLO06P/dDnSxD7qr4QBWpiQn76duqEsQMu7Sre47eJBuq52jptCmePbrFTASu06U4qeaFM3A/lCnSh1P8sj5wx3dNcD/MaRlPvc1FJY7xmQh8J+dAPAZ+YjGwfr7E3bGO0xEcnGpd4pKrsWd7mv+8cBpqsKt9MxEAaEw9mrFGgNxP+kBS0+u67kwQTCQtFwcO4m5eOFpqzj7JA3fsqRIcJd2dlrH2gQB7mxmwnxfOjhrxK8HBXZ63A+ntnqCkZro9UGCbFNjlvHCOse5W1/gtMmOLGW8HMlSCmXLcU7qFJoiSDQewx+Z4juD7jP1YUgC3HNj9B//jleDd7Yz9u/2uJo41Jg1t4B9n9/4Od2x21Bktr2eliXP/72tDSYAcrnIde+mqNgDxgfm78/lRTNntB3o1OlRTr2fPRctxVlZO0fp+77sPXevDPBmyOo6T4Xp+5zPIDNgQFGvUDIdzLFpee/sSO3CcdBwvLn1nBrSYmRyYWj9zI5d0W4kl8IYlHn38UoW4pVc97M86MNbqwE1V8FeeM23sCQ6jb8lSlNSEGvOhFvadCA3YpDjWMhQPBHl9aogycmclU3CX4ZcymrTx1xSR13TJmsDlDTX4wIE3ZC6rXfH168NvR6ypTftasA/AtkJQY0puTyDbUrR4DIAuuR/WqXECHg5LNlQC9ReYDAQnOnOtmgJtP/6YVL1J8qSrFL0NZHkDnvNajSC9o8DTyI+AqPScKq8kSB8NuqfI3pGV/iXCTp4aTUvGWnjxPSC+58dJz9G6wIt6Sw1LZYjrbPq2fgwA7xl4eWYlbAh+efzQYLptZjipsBlKYsAv7OlkGNDKzaoS3JcdWZwOk1o/Bp7VEpz0GXo5REXzA3NqgiKvuxxG4GnZ02j4pCfIa89geNd3NlTSp8gaplnCT/S5N4DWNtiR1/C61/r8ZC++HQHxa1Q09fVabshJV3gDaU2w6RLRb4EOSE8N7SOgT2Yc/+V4OYy9p4bdbVbneZ73xRpTMKf45TS75DAvnJPOuCDI08a997VJGR7oasx/dZ4/AcJZA5uARGrqJ5Jt/w/SGzje+OTnoHCncby8dQrgz4HhCIPjhdJPMPg+GP795b8AAAD//2tuClk=") + unpacked := packer.MustUnpack("eJykVltzq7oVfu/PyHOnBRF7Dp05D4ZTbnHINk4koTckOYAtGRqwMXT63zsCgy9J9u6ZPuzxjpDW9Vvft/79UJUb9vf3XGzoJqn/1krx8I8HKp2avBZpJEVFUCBivHqKgb57yS2ZoJOIJcz4osyY5N1LblE/1x0/b1J/HwruwWYpRUXXM0Glk1MX7n4gklEvFP2d+7v7SFBsVTGOxFLCQ4yCiqCVSaRTMfCWL+1FvnwbfilyDjHigiJ44PaspiASP3BaM9fZJq0uqQsFt/3Kt/06WqvfoI7RLCMA1gTNtGv73At0sg64LcMjlaQkBmz7b69FSoygiZuy5egkWFc8LddWSWUpYiN6T9BsR3A6t/NF6tuWtsGWeMmtigLe2WlR+y585F6QqbvUFR13zJK7sFZ2fHuR+p6VcTed+26UcdfpqPILYHe21999ya2S7i2de8/nN9HxchZ23AtEjPSzPz1jf9zbFgfmQY0ZocYkzCge7QQ6dWH3khap+nu5trbUsGbYiI4YnEpmrM55Df8SHAn6WqTcEw1ZFdP5OZ5LHMOb2nfDI/PEO3fNIfcxLvsmt4ZJc0tw2FEj6K79cfe3ue9dYrZzLSU405h0ttwxqwSHGgZCI0ifcmCuoyV/FGmCZg3HUceMqCXIqVlT3ORwf977GuvU9D2uY7wY+9rEOCo+9+FT/VRsBwZOGXffbmo3xvWpdqpfniXY9ro26vx0JMbzvP9tru7bi5RgIuh+dSRqHl+LlLrm/tqXb1uCSzUTZsfdqKTbPt9dgh7vfEDQz4ERbZmKzQ2bb+zoxFvMfQ/u2OI2FuV7CaJjDGqVQ0pcc5sA2N7ZqShgRybhLsHhOwOnIwenI+mKdDh7/px7a3YbHKp3CgMz9YY0xYABKSruwhYbikeguMxSKKgLt9w120+9cmuxucMul07F0U3/lL9zLNpdzbSUo6iZsO1F27EWy7W1oyD8INi/vL3JS9kSXd+nfkZJSb1IMGGCGJ10gp9HnHVE8akRHVUtRwyfv30QvJv77sRZ55zvaqDiVFy26jHcz/OYH9kHR4X9O7sa7Yq+piO3X9d4misAK4JCTc3oVb2u+H86O3IcNRyvpngSAGdM5STfnjjIhMLj2nW6tZpF/FwEuKZX/g9jfBhrpW/H0v9nljFtllEEO8XtZJ3uqQE1NatB26QBgFWMQy1BYUeQ08Yg3S/txf7cg/2yj41/xIh8xGtW+Tbv+VnxbWKz0k5///3hr4P0yU39kbMvxO8VQY1JsT2L3Zai1dx3dcG9oIzBWRRxsGddkaJ2EpyO4Ehn9qykrnYgaJbF8iTIH7qM0akj6xtxGu8qQmuo62jkV6Ip9YxKZ0+QrgjhQJG5I6/64xJbWQyqmihfePUz0bzYx1HL0VuOV+WWgplMENdZ/+1t7ru8Ze7zE9vDagCrpkD0LwUIDMSBuPBxBEU/YIqUzvU6k6nYeFEb9/eVsETvydD8e4LdxTjKGDB1JkNxT9pfAfcLov8FeYd3QmVNwj4CNgZms4GmRvVBZMbcBiJRgqPNfS+aMfftWtD6eO6F5X8VVeYFRyX8DJgta34irn9qqfhG+Kcc7heC68Xi+elrsgrfORBa4pgtQVxsvKnOA0Fd18UL1DK0WebVccJDu/opWf+/BD/N2OJbktfYHopfLkvnHlwwPxF/G6NZ188nNI0ERwU2AkEAfGS3C86Iu34pvFkG5Nvcdx6LH/rit2FeHg9P7e5zjQY7vY+XfLG7Fq/l2mpVrhTdLhojyX+zpKklWacTbrWUIueDQDNj+yDbvH65oNUER21Prnj1Ce9TzOBSsyE2/cglfFfkfzs/Q5+u311qGwiKTECg2WPpZk6Undya+OpPvDlzCtSGnquldLxnSSbN7xbsMe+cIGd4q3wNenDGHGkp0O514PAVTylu7nlm4CclnBrB/mfh319h7U5fpp6dxe9yrxYb3IuzsNPSSFx4eMktP8Zh0PNCm4qNXt/ks5T6kXgK2yF/X+/SII97UX5V+eGwiVEonmw+inkvqASdMmZEZWyEIsbBNrFZpWIIFP5d0Q1iXGdE1tnwf7XEhlqMwyJod08P//nLfwMAAP//QEreuQ==") SupportedMap = make(map[string]bool) for f, v := range unpacked { diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml index 08ef6b70cd9d..c7c11a9017de 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_false.yml @@ -1,9 +1,12 @@ -streams: - - type: event/file - enabled: false - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - use_output: default + inputs: + - type: event/file + streams: + - enabled: false + paths: + - var/log/hello1.log + - var/log/hello2.log management: host: "localhost" config: diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml index 94dc21d8bc8c..1bb3a9896f95 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_output_false.yml @@ -1,8 +1,11 @@ -streams: - - type: event/file - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - use_output: default + inputs: + - type: event/file + streams: + - paths: + - /var/log/hello1.log + - /var/log/hello2.log management: host: "localhost" config: diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml index 59ea4790801d..2dcc22e14b89 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml @@ -4,6 +4,7 @@ filebeat: paths: - /var/log/hello1.log - /var/log/hello2.log + index: logs-generic-default output: elasticsearch: enabled: true diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml index 9b3fb3c065aa..a89d2cf0b7b6 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_output_true.yml @@ -1,8 +1,11 @@ -streams: - - type: event/file - paths: - - /var/log/hello1.log - - /var/log/hello2.log +datasources: + - use_output: default + inputs: + - type: event/file + streams: + - paths: + - /var/log/hello1.log + - /var/log/hello2.log management: host: "localhost" config: diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_true-filebeat.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_true-filebeat.yml index d5577f79cff0..141e9d95d6dc 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_true-filebeat.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_true-filebeat.yml @@ -5,6 +5,7 @@ filebeat: paths: - /var/log/hello1.log - /var/log/hello2.log + index: logs-generic-default output: elasticsearch: hosts: diff --git a/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml b/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml index ddbcb2929df5..91caed69c762 100644 --- a/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml +++ b/x-pack/agent/pkg/agent/program/testdata/enabled_true.yml @@ -1,9 +1,17 @@ -streams: - - type: event/file - enabled: true - paths: - - /var/log/hello1.log - - /var/log/hello2.log +name: Production Website DB Servers +fleet: + kibana_url: https://kibana.mydomain.com:5601 + ca_hash: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y= + checkin_interval: 5m +datasources: + - use_output: default + inputs: + - type: event/file + streams: + - enabled: true + paths: + - /var/log/hello1.log + - /var/log/hello2.log management: host: "localhost" config: diff --git a/x-pack/agent/pkg/agent/program/testdata/single_config-filebeat.yml b/x-pack/agent/pkg/agent/program/testdata/single_config-filebeat.yml index 257b38017f0d..c61497d34047 100644 --- a/x-pack/agent/pkg/agent/program/testdata/single_config-filebeat.yml +++ b/x-pack/agent/pkg/agent/program/testdata/single_config-filebeat.yml @@ -4,6 +4,7 @@ filebeat: paths: - /var/log/hello1.log - /var/log/hello2.log + index: logs-generic-default output: elasticsearch: hosts: @@ -11,3 +12,5 @@ output: - 127.0.0.1:9300 username: elastic password: changeme + api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA + ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y= diff --git a/x-pack/agent/pkg/agent/program/testdata/single_config-metricbeat.yml b/x-pack/agent/pkg/agent/program/testdata/single_config-metricbeat.yml index 8d091fcdd60d..e4508d88a393 100644 --- a/x-pack/agent/pkg/agent/program/testdata/single_config-metricbeat.yml +++ b/x-pack/agent/pkg/agent/program/testdata/single_config-metricbeat.yml @@ -1,11 +1,15 @@ metricbeat: modules: - module: docker - setting: one + metricsets: [status] + index: metrics-docker.status-default - module: apache - setting: two + metricsets: [info] + index: metrics-generic-testing output: elasticsearch: hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme + api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA + ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y= diff --git a/x-pack/agent/pkg/agent/program/testdata/single_config.yml b/x-pack/agent/pkg/agent/program/testdata/single_config.yml index 888a769b7f70..ad228c30591c 100644 --- a/x-pack/agent/pkg/agent/program/testdata/single_config.yml +++ b/x-pack/agent/pkg/agent/program/testdata/single_config.yml @@ -1,19 +1,49 @@ -streams: - - type: metric/docker - setting: one - - type: metric/apache - setting: two - - type: event/file - paths: - - /var/log/hello1.log - - /var/log/hello2.log -management: - host: "localhost" -config: - reload: 123 +name: Production Website DB Servers +fleet: + kibana_url: https://kibana.mydomain.com:5601 + ca_hash: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y= + checkin_interval: 5m + outputs: default: type: elasticsearch hosts: [127.0.0.1:9200, 127.0.0.1:9300] username: elastic password: changeme + api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA + ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y= + + monitoring: + type: elasticsearch + api_key: VuaCfGcBCdbkQm-e5aOx:ui2lp2axTNmsyakw9tvNnw + hosts: ["monitoring:9200"] + ca_sha256: "7lHLiyp4J8m9kw38SJ7SURJP4bXRZv/BNxyyXkCcE/M=" + +datasources: + - use_output: default + inputs: + - type: docker/metrics + streams: + - metricset: status + dataset: docker.status + - type: logs + streams: + - paths: + - /var/log/hello1.log + - /var/log/hello2.log + - namespace: testing + use_output: default + inputs: + - type: apache/metrics + streams: + - enabled: true + metricset: info + +settings.monitoring: + use_output: monitoring + +management: + host: "localhost" + +config: + reload: 123 diff --git a/x-pack/agent/pkg/agent/transpiler/rules.go b/x-pack/agent/pkg/agent/transpiler/rules.go index 713477a21d1c..a8b9948818ab 100644 --- a/x-pack/agent/pkg/agent/transpiler/rules.go +++ b/x-pack/agent/pkg/agent/transpiler/rules.go @@ -47,6 +47,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) { switch rule.(type) { case *CopyRule: name = "copy" + case *CopyToListRule: + name = "copy_to_list" case *RenameRule: name = "rename" case *TranslateRule: @@ -63,6 +65,12 @@ func (r *RuleList) MarshalYAML() (interface{}, error) { name = "filter_values_with_regexp" case *ExtractListItemRule: name = "extract_list_items" + case *InjectIndexRule: + name = "inject_index" + case *MakeArrayRule: + name = "make_array" + case *RemoveKeyRule: + name = "remove_key" default: return nil, fmt.Errorf("unknown rule of type %T", rule) @@ -113,6 +121,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { switch name { case "copy": r = &CopyRule{} + case "copy_to_list": + r = &CopyToListRule{} case "rename": r = &RenameRule{} case "translate": @@ -129,6 +139,12 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { r = &FilterValuesWithRegexpRule{} case "extract_list_items": r = &ExtractListItemRule{} + case "inject_index": + r = &InjectIndexRule{} + case "make_array": + r = &MakeArrayRule{} + case "remove_key": + r = &RemoveKeyRule{} default: return fmt.Errorf("unknown rule of type %s", name) } @@ -143,6 +159,214 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +// RemoveKeyRule removes key from a dict. +type RemoveKeyRule struct { + Key string +} + +// Apply applies remove key rule. +func (r *RemoveKeyRule) Apply(ast *AST) error { + sourceMap, ok := ast.root.(*Dict) + if !ok { + return nil + } + + for i, item := range sourceMap.value { + itemKey, ok := item.(*Key) + if !ok { + continue + } + + if itemKey.name != r.Key { + continue + } + + sourceMap.value = append(sourceMap.value[:i], sourceMap.value[i+1:]...) + return nil + } + return nil +} + +// RemoveKey creates a RemoveKeyRule +func RemoveKey(key string) *RemoveKeyRule { + return &RemoveKeyRule{ + Key: key, + } +} + +// MakeArrayRule transforms a single value into an array of length 1. +type MakeArrayRule struct { + Item Selector + To string +} + +// Apply applies make array rule. +func (r *MakeArrayRule) Apply(ast *AST) error { + sourceNode, found := Lookup(ast, r.Item) + if !found { + return nil + } + + newList := &List{ + value: make([]Node, 0, 1), + } + + sourceKey, ok := sourceNode.(*Key) + if !ok { + return nil + } + + newList.value = append(newList.value, sourceKey.value.Clone()) + return Insert(ast, newList, r.To) +} + +// MakeArray creates a MakeArrayRule +func MakeArray(item Selector, to string) *MakeArrayRule { + return &MakeArrayRule{ + Item: item, + To: to, + } +} + +// CopyToListRule is a rule which copies a specified +// node into every item in a provided list. +type CopyToListRule struct { + Item Selector + To string +} + +// Apply copies specified node into every item of the list. +func (r *CopyToListRule) Apply(ast *AST) error { + sourceNode, found := Lookup(ast, r.Item) + if !found { + // nothing to copy + return nil + } + + targetListNode, found := Lookup(ast, r.To) + if !found { + // nowhere to copy + return nil + } + + targetList, ok := targetListNode.Value().(*List) + if !ok { + return errors.New("target node is not a list") + } + + for _, listItem := range targetList.value { + listItemMap, ok := listItem.(*Dict) + if !ok { + continue + } + + listItemMap.value = append(listItemMap.value, sourceNode.Clone()) + } + + return nil +} + +// CopyToList creates a CopyToListRule +func CopyToList(item Selector, to string) *CopyToListRule { + return &CopyToListRule{ + Item: item, + To: to, + } +} + +// InjectIndexRule injects index to each input. +// Index is in form {type}-{namespace}-{dataset-type} +// type: is provided to the rule. +// namespace: is collected from streams[n].namespace. If not found used 'default'. +// dataset-type: is collected from streams[n].dataset.type. If not found used 'generic'. +type InjectIndexRule struct { + Type string +} + +// Apply injects index into input. +func (r *InjectIndexRule) Apply(ast *AST) error { + const defaultNamespace = "default" + const defaultDataset = "generic" + + datasourcesNode, found := Lookup(ast, "datasources") + if !found { + return nil + } + + datasourcesList, ok := datasourcesNode.Value().(*List) + if !ok { + return nil + } + + for _, datasourceNode := range datasourcesList.value { + namespace := defaultNamespace + nsNode, found := datasourceNode.Find("namespace") + if found { + nsKey, ok := nsNode.(*Key) + if ok { + namespace = nsKey.value.String() + } + } + + // get input + inputNode, found := datasourceNode.Find("inputs") + if !found { + continue + } + + inputsList, ok := inputNode.Value().(*List) + if !ok { + continue + } + + for _, inputNode := range inputsList.value { + streamsNode, ok := inputNode.Find("streams") + if !ok { + continue + } + + streamsList, ok := streamsNode.Value().(*List) + if !ok { + continue + } + + for _, streamNode := range streamsList.value { + streamMap, ok := streamNode.(*Dict) + if !ok { + continue + } + + dataset := defaultDataset + + dsNode, found := streamNode.Find("dataset") + if found { + dsKey, ok := dsNode.(*Key) + if ok { + dataset = dsKey.value.String() + } + + } + + streamMap.value = append(streamMap.value, &Key{ + name: "index", + value: &StrVal{value: fmt.Sprintf("%s-%s-%s", r.Type, dataset, namespace)}, + }) + } + + } + + } + + return nil +} + +// InjectIndex creates a InjectIndexRule +func InjectIndex(indexType string) *InjectIndexRule { + return &InjectIndexRule{ + Type: indexType, + } +} + // ExtractListItemRule extract items with specified name from a list of maps. // The result is store in a new array. // Example: @@ -188,6 +412,13 @@ func (r *ExtractListItemRule) Apply(ast *AST) error { continue } + if ln, ok := vn.(*List); ok { + for _, lnItem := range ln.value { + newList.value = append(newList.value, lnItem.Clone()) + } + continue + } + newList.value = append(newList.value, vn.Clone()) } diff --git a/x-pack/agent/pkg/agent/transpiler/rules_test.go b/x-pack/agent/pkg/agent/transpiler/rules_test.go index 9c2520ef5531..41dcb2d82cc3 100644 --- a/x-pack/agent/pkg/agent/transpiler/rules_test.go +++ b/x-pack/agent/pkg/agent/transpiler/rules_test.go @@ -22,6 +22,71 @@ func TestRules(t *testing.T) { expectedYAML string rule Rule }{ + "inject index": { + givenYAML: ` +datasources: + - name: All default + inputs: + - type: file + streams: + - paths: /var/log/mysql/error.log + - name: Specified namespace + namespace: nsns + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + - name: Specified dataset + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + dataset: dsds + - name: All specified + namespace: nsns + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + dataset: dsds +`, + expectedYAML: ` +datasources: + - name: All default + inputs: + - type: file + streams: + - paths: /var/log/mysql/error.log + index: mytype-generic-default + - name: Specified namespace + namespace: nsns + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + index: mytype-generic-nsns + - name: Specified dataset + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + dataset: dsds + index: mytype-dsds-default + - name: All specified + namespace: nsns + inputs: + - type: file + streams: + - paths: /var/log/mysql/access.log + dataset: dsds + index: mytype-dsds-nsns +`, + rule: &RuleList{ + Rules: []Rule{ + InjectIndex("mytype"), + }, + }, + }, "extract items from array": { givenYAML: ` @@ -383,6 +448,61 @@ inputs: }, }, }, + + "remove key": { + givenYAML: ` +key1: val1 +key2: val2 +`, + expectedYAML: ` +key1: val1 +`, + rule: &RuleList{ + Rules: []Rule{ + RemoveKey("key2"), + }, + }, + }, + + "copy item to list": { + givenYAML: ` +namespace: testing +inputs: + - type: metric/log + - type: metric/tcp +`, + expectedYAML: ` +namespace: testing +inputs: + - type: metric/log + namespace: testing + - type: metric/tcp + namespace: testing +`, + rule: &RuleList{ + Rules: []Rule{ + CopyToList("namespace", "inputs"), + }, + }, + }, + + "Make array": { + givenYAML: ` +sample: + log: "log value" +`, + expectedYAML: ` +sample: + log: "log value" +logs: + - "log value" +`, + rule: &RuleList{ + Rules: []Rule{ + MakeArray("sample.log", "logs"), + }, + }, + }, } for name, test := range testcases { @@ -440,6 +560,8 @@ func TestSerialization(t *testing.T) { FilterValues("select-v", "key-v", "v1", "v2"), FilterValuesWithRegexp("inputs", "type", regexp.MustCompile("^metric/.*")), ExtractListItem("path.p", "item", "target"), + InjectIndex("index-type"), + CopyToList("t1", "t2"), ) y := `- rename: @@ -484,6 +606,11 @@ func TestSerialization(t *testing.T) { path: path.p item: item to: target +- inject_index: + type: index-type +- copy_to_list: + item: t1 + to: t2 ` t.Run("serialize_rules", func(t *testing.T) { diff --git a/x-pack/agent/pkg/fleetapi/ack_cmd_test.go b/x-pack/agent/pkg/fleetapi/ack_cmd_test.go index 7495d60dd0e5..cd4f9b95555b 100644 --- a/x-pack/agent/pkg/fleetapi/ack_cmd_test.go +++ b/x-pack/agent/pkg/fleetapi/ack_cmd_test.go @@ -51,11 +51,11 @@ func TestAck(t *testing.T) { return mux }, withAPIKey, func(t *testing.T, client clienter) { - action := &ActionPolicyChange{ + action := &ActionConfigChange{ ActionID: "my-id", - ActionType: "POLICY_CHANGE", - Policy: map[string]interface{}{ - "id": "policy_id", + ActionType: "CONFIG_CHANGE", + Config: map[string]interface{}{ + "id": "config_id", }, } diff --git a/x-pack/agent/pkg/fleetapi/action.go b/x-pack/agent/pkg/fleetapi/action.go index 4245f767af7d..e2fc29435734 100644 --- a/x-pack/agent/pkg/fleetapi/action.go +++ b/x-pack/agent/pkg/fleetapi/action.go @@ -57,14 +57,14 @@ func (a *ActionUnknown) OriginalType() string { return a.originalType } -// ActionPolicyChange is a request to apply a new -type ActionPolicyChange struct { +// ActionConfigChange is a request to apply a new +type ActionConfigChange struct { ActionID string ActionType string - Policy map[string]interface{} `json:"policy"` + Config map[string]interface{} `json:"config"` } -func (a *ActionPolicyChange) String() string { +func (a *ActionConfigChange) String() string { var s strings.Builder s.WriteString("action_id: ") s.WriteString(a.ActionID) @@ -74,12 +74,12 @@ func (a *ActionPolicyChange) String() string { } // Type returns the type of the Action. -func (a *ActionPolicyChange) Type() string { +func (a *ActionConfigChange) Type() string { return a.ActionType } // ID returns the ID of the Action. -func (a *ActionPolicyChange) ID() string { +func (a *ActionConfigChange) ID() string { return a.ActionID } @@ -107,14 +107,14 @@ func (a *Actions) UnmarshalJSON(data []byte) error { for _, response := range responses { switch response.ActionType { - case "POLICY_CHANGE": - action = &ActionPolicyChange{ + case "CONFIG_CHANGE": + action = &ActionConfigChange{ ActionID: response.ActionID, ActionType: response.ActionType, } if err := json.Unmarshal(response.Data, action); err != nil { return errors.New(err, - "fail to decode POLICY_CHANGE action", + "fail to decode CONFIG_CHANGE action", errors.TypeConfig) } default: diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd.go b/x-pack/agent/pkg/fleetapi/checkin_cmd.go index aa8f63d1f424..a4500b0c6124 100644 --- a/x-pack/agent/pkg/fleetapi/checkin_cmd.go +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "net/http" "time" @@ -100,15 +101,17 @@ func (e *CheckinCmd) Execute(ctx context.Context, r *CheckinRequest) (*CheckinRe return nil, extract(resp.Body) } + rs, _ := ioutil.ReadAll(resp.Body) + checkinResponse := &CheckinResponse{} - decoder := json.NewDecoder(resp.Body) + decoder := json.NewDecoder(bytes.NewReader(rs)) if err := decoder.Decode(checkinResponse); err != nil { return nil, errors.New(err, "fail to decode checkin response", errors.TypeNetwork, errors.M(errors.MetaKeyURI, cp)) } - + fmt.Println(string(rs)) if err := checkinResponse.Validate(); err != nil { return nil, err } diff --git a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go index 3f92feac6921..b9eb9f3aa812 100644 --- a/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go +++ b/x-pack/agent/pkg/fleetapi/checkin_cmd_test.go @@ -53,33 +53,32 @@ Something went wrong func(t *testing.T) *http.ServeMux { raw := ` { - "actions": [ - { - "type": "POLICY_CHANGE", - "id": "id1", - "data": { - "policy": { - "id": "policy-id", - "outputs": { - "default": { - "hosts": "https://localhost:9200" - } - }, - "streams": [ - { - "id": "string", - "type": "logs", - "path": "/var/log/hello.log", - "output": { - "use_output": "default" - } - } - ] - } - } - } - ], - "success": true + "actions": [{ + "type": "CONFIG_CHANGE", + "id": "id1", + "data": { + "config": { + "id": "policy-id", + "outputs": { + "default": { + "hosts": "https://localhost:9200" + } + }, + "datasources": [{ + "id": "string", + "enabled": true, + "use_output": "default", + "inputs": [{ + "type": "logs", + "streams": [{ + "paths": ["/var/log/hello.log"] + }] + }] + }] + } + } + }], + "success": true } ` mux := http.NewServeMux() @@ -103,7 +102,7 @@ Something went wrong // ActionPolicyChange require.Equal(t, "id1", r.Actions[0].ID()) - require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) + require.Equal(t, "CONFIG_CHANGE", r.Actions[0].Type()) }, )) @@ -113,26 +112,27 @@ Something went wrong { "actions": [ { - "type": "POLICY_CHANGE", + "type": "CONFIG_CHANGE", "id": "id1", "data": { - "policy": { + "config": { "id": "policy-id", "outputs": { "default": { "hosts": "https://localhost:9200" } }, - "streams": [ - { - "id": "string", - "type": "logs", - "path": "/var/log/hello.log", - "output": { - "use_output": "default" - } - } - ] + "datasources": [{ + "id": "string", + "enabled": true, + "use_output": "default", + "inputs": [{ + "type": "logs", + "streams": [{ + "paths": ["/var/log/hello.log"] + }] + }] + }] } } }, @@ -165,7 +165,7 @@ Something went wrong // ActionPolicyChange require.Equal(t, "id1", r.Actions[0].ID()) - require.Equal(t, "POLICY_CHANGE", r.Actions[0].Type()) + require.Equal(t, "CONFIG_CHANGE", r.Actions[0].Type()) // UnknownAction require.Equal(t, "id2", r.Actions[1].ID()) diff --git a/x-pack/agent/spec/filebeat.yml b/x-pack/agent/spec/filebeat.yml index 0b098e0ac758..7b0858f54280 100644 --- a/x-pack/agent/spec/filebeat.yml +++ b/x-pack/agent/spec/filebeat.yml @@ -3,12 +3,36 @@ cmd: filebeat args: ["-E", "setup.ilm.enabled=false", "-E", "setup.template.enabled=false", "-E", "management.mode=x-pack-fleet", "-E", "management.enabled=true"] configurable: grpc rules: +- inject_index: + type: logs + +- extract_list_items: + path: datasources + item: inputs + to: inputsstreams + - map: - path: streams + path: inputsstreams + rules: + - copy_to_list: + item: type + to: streams + - copy_to_list: + item: processors + to: streams + +- extract_list_items: + path: inputsstreams + item: streams + to: inputs + +- map: + path: inputs rules: - translate: path: type mapper: + logs: log event/file: log event/stdin: stdin event/tcp: tcp @@ -16,8 +40,10 @@ rules: log/docker: docker log/redis_slowlog: redis log/syslog: syslog + + - filter_values: - selector: streams + selector: inputs key: type values: - log @@ -27,12 +53,17 @@ rules: - docker - redis - syslog + +- filter_values: + selector: inputs + key: enabled + values: + - true + - copy: - from: streams + from: inputs to: filebeat -- rename: - from: filebeat.streams - to: inputs + - filter: selectors: - filebeat diff --git a/x-pack/agent/spec/metricbeat.yml b/x-pack/agent/spec/metricbeat.yml index 59f1de49a083..178a797430f4 100644 --- a/x-pack/agent/spec/metricbeat.yml +++ b/x-pack/agent/spec/metricbeat.yml @@ -3,30 +3,74 @@ cmd: metricbeat args: ["-E", "setup.ilm.enabled=false", "-E", "setup.template.enabled=false", "-E", "management.mode=x-pack-fleet", "-E", "management.enabled=true"] configurable: grpc rules: +- inject_index: + type: metrics + +- extract_list_items: + path: datasources + item: inputs + to: inputsstreams + +- map: + path: inputsstreams + rules: + - copy_to_list: + item: type + to: streams + - copy_to_list: + item: processors + to: streams + +- extract_list_items: + path: inputsstreams + item: streams + to: inputs + + - filter_values_with_regexp: key: type - re: ^metric/.+ - selector: streams + re: ^.+/metrics$ + selector: inputs + +- filter_values: + selector: inputs + key: enabled + values: + - true + - map: - path: streams + path: inputs rules: - translate_with_regexp: path: type - re: ^metric/(?P.+) + re: ^(?P.+)/metrics$ with: $type - rename: from: type to: module + - make_array: + item: metricset + to: metricsets + - remove_key: + key: metricset + - remove_key: + key: enabled + - remove_key: + key: dataset + - copy: - from: streams + from: inputs to: metricbeat + - rename: - from: metricbeat.streams + from: metricbeat.inputs to: modules + - filter: selectors: - metricbeat - output - keystore + when: HasItems(%{[metricbeat.modules]}) && HasNamespace('output', 'elasticsearch', 'redis', 'kafka', 'logstash')