From 2d35d50a60adb5bc5b5f68d58677c2a5c16ae386 Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Mon, 11 Jul 2016 11:28:13 +0200 Subject: [PATCH] Add OR/AND/NOT to the condition associated with the processors (#1983) --- CHANGELOG.asciidoc | 1 + filebeat/tests/system/test_processors.py | 27 ++ libbeat/docs/processors-config.asciidoc | 155 +++++++++--- libbeat/docs/processors.asciidoc | 5 + libbeat/processors/condition.go | 82 +++++++ libbeat/processors/condition_test.go | 230 ++++++++++++++++++ libbeat/processors/config.go | 11 +- .../{test_filtering.py => test_processors.py} | 30 ++- packetbeat/docs/packetbeat-filtering.asciidoc | 2 +- .../tests/system/test_0060_processors.py | 65 +++++ 10 files changed, 564 insertions(+), 44 deletions(-) rename metricbeat/tests/system/{test_filtering.py => test_processors.py} (88%) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9c8f4d310bf..25e2fa6fe08 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d - Periodically log internal metrics. {pull}1955[1955] - Add enable-setting to all output modules. {pull}1987[1987] - Command line flag -c can be used multiple times. {pull}1985[1985] +- Add OR/AND/NOT to the condition associated with the processors. {pull}1983[1983] *Metricbeat* diff --git a/filebeat/tests/system/test_processors.py b/filebeat/tests/system/test_processors.py index ead7b503da7..5c75b6ca5ee 100644 --- a/filebeat/tests/system/test_processors.py +++ b/filebeat/tests/system/test_processors.py @@ -79,3 +79,30 @@ def test_drop_event(self): assert "beat.name" in output assert "message" in output assert "test" in output["message"] + + def test_condition(self): + """ + Check condition in processors + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test*.log", + drop_event={ + "condition": "not.contains.source: test", + }, + ) + with open(self.working_dir + "/test1.log", "w") as f: + f.write("test1 message\n") + + with open(self.working_dir + "/test2.log", "w") as f: + f.write("test2 message\n") + + filebeat = self.start_beat() + self.wait_until(lambda: self.output_has(lines=2)) + filebeat.check_kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + assert "beat.name" in output + assert "message" in output + assert "test" in output["message"] diff --git a/libbeat/docs/processors-config.asciidoc b/libbeat/docs/processors-config.asciidoc index 98b6cfa8291..5c4f4c808c3 100644 --- a/libbeat/docs/processors-config.asciidoc +++ b/libbeat/docs/processors-config.asciidoc @@ -15,36 +15,23 @@ //TODO: Remove was Filters from the above title and remove extra sections that show the alpha4 configuration -coming[5.0.0-beta1,Filters are being renamed to "processors" to better reflect the capabilities they provide. If you are using 5.0.0-alpha4 or earlier, these are still called "filters" even though the documentation refers to "processors"] +include::../../libbeat/docs/processors.asciidoc[] -You can define a set of `processors` in the +{beatname_lc}.yml+ config file to reduce the number -of fields that are exported by the Beat. - -If multiple processors are defined, they are executed in the order they are defined. The initial event is passed to the -first processor and what results from it is passed to the second processor until all processors are applied. The -condition is checked against the event that is received as input and it might differ from the original event. - -[source,yaml] -------- -event -> processor 1 -> event1 -> processor 2 -> event2 ... -------- - -See <> for the full list of possible fields. - -Each processor receives a condition and optionally a set of arguments. The action is executed only if the condition +Each processor has associated an action with a set of parameters and optionally a condition. If the condition is +present, then the action is executed only if the condition is fulfilled. If no condition is passed then the action is always executed. -deprecated[5.0.0-alpha4,The `filters` section is being renamed to `processors` in 5.0.0-beta1. Therefore the following configuration is deprecated] +deprecated[5.0.0-alpha4,The `filters` configuration option is being renamed to `processors` in 5.0.0-beta1. Therefore the following configuration is deprecated] [source,yaml] ------ filters: - - action1: - condition1 - [arguments] - - action2: - condition2 - [arguments] + - : + + + - : + + ... ------ @@ -53,18 +40,21 @@ coming[5.0.0-beta1,Begin using the following configuration starting with 5.0.0-b [source,yaml] ------ processors: - - action1: + - : when: - condition1 - [arguments] - - action2: + + + - : when: - condition2 - [arguments] + + ... ------ +where can be a way to select the fields that are exported or a way to add meta data to the event , contains the definition of the condition. +and is the list of parameters passed along the . + See <> for specific {beatname_uc} examples. [[filtering-condition]] @@ -75,20 +65,17 @@ them. You can see a list of the <>. For each field, you can specify a simple field name or a nested map, for example `dns.question.name`. -[source,yaml] ----- -condition: - field1: value1 - [field2: value2] - ... ----- -Supported conditions are: +A condition can be: * <> * <> * <> * <> +* <> +* <> +* <> + [[condition-equals]] @@ -126,7 +113,7 @@ The `regexp` condition checks the field against a regular expression. The condit For example, the following condition checks if the process name contains `foo`: -[source,yaml]] +[source,yaml] ----- reqexp: proc.name: "foo.*" @@ -162,6 +149,97 @@ range: cpu.user_p: 0.8 ------ + +coming[5.0.0-beta1, You can combine multiple conditions with the `or`, `and` or `not` operators] + +[[condition-or]] +===== OR + +The `or` operator receives a list of conditions. + +[source,yaml] +------- +or: + - + - + - + ... + +------- + +For example the condition `http.code = 304 OR http.code = 404` translates to: + +[source,yaml] +------ +or: + - equals: + http.code: 304 + - equals: + http.code: 404 +------ + + +[[condition-and]] +===== AND + +The `and` operator receives a list of conditions. + +[source,yaml] +------- +and: + - + - + - + ... + +------- + +For example the condition `http.code = 200 AND status = OK` translates to: + +[source,yaml] +------ +and: + - equals: + http.code: 200 + - equals: + status: OK +------ + +To configure a condition like ` OR AND `: + +[source,yaml] +------ +or: + - + - and: + - + - + +------ + +[[condition-not]] +===== NOT + +The `not` operator receives the condition to negate. + +[source,yaml] +------- +not: + + +------- + +For example the condition `NOT status = OK` translates to: + +[source,yaml] +------ +not: + equals: + status: OK +------ + + + ==== Actions The supported filter actions are: @@ -170,6 +248,7 @@ The supported filter actions are: * <> * <> +See <> for the full list of possible fields. [[include-fields]] ===== include_fields diff --git a/libbeat/docs/processors.asciidoc b/libbeat/docs/processors.asciidoc index 655668ef456..353d9c65184 100644 --- a/libbeat/docs/processors.asciidoc +++ b/libbeat/docs/processors.asciidoc @@ -17,4 +17,9 @@ enhancing events with additional metadata. Each processor receives an event, app and returns the event. If you define a list of processors, they are executed in the order they are defined in the configuration file. +[source,yaml] +------- +event -> processor 1 -> event1 -> processor 2 -> event2 ... +------- + The processors are defined in the {beatname_uc} configuration file. diff --git a/libbeat/processors/condition.go b/libbeat/processors/condition.go index a182b04f6b4..5e940a79157 100644 --- a/libbeat/processors/condition.go +++ b/libbeat/processors/condition.go @@ -28,6 +28,9 @@ type Condition struct { contains map[string]string regexp map[string]*regexp.Regexp rangexp map[string]RangeValue + or []Condition + and []Condition + not *Condition } func NewCondition(config *ConditionConfig) (*Condition, error) { @@ -55,10 +58,33 @@ func NewCondition(config *ConditionConfig) (*Condition, error) { if err := c.setRange(config.Range); err != nil { return nil, err } + } else if len(config.OR) > 0 { + for _, cond_config := range config.OR { + cond, err := NewCondition(&cond_config) + if err != nil { + return nil, err + } + c.or = append(c.or, *cond) + } + } else if len(config.AND) > 0 { + for _, cond_config := range config.AND { + cond, err := NewCondition(&cond_config) + if err != nil { + return nil, err + } + c.and = append(c.and, *cond) + } + } else if config.NOT != nil { + cond, err := NewCondition(config.NOT) + if err != nil { + return nil, err + } + c.not = cond } else { return nil, fmt.Errorf("missing condition") } + logp.Debug("processors", "New condition %s", c) return &c, nil } @@ -166,6 +192,18 @@ func (c *Condition) setRange(cfg *ConditionFields) error { func (c *Condition) Check(event common.MapStr) bool { + if len(c.or) > 0 { + return c.checkOR(event) + } + + if len(c.and) > 0 { + return c.checkAND(event) + } + + if c.not != nil { + return c.checkNOT(event) + } + if !c.checkEquals(event) { return false } @@ -323,6 +361,34 @@ func (c *Condition) checkRange(event common.MapStr) bool { return true } +func (c *Condition) checkOR(event common.MapStr) bool { + + for _, cond := range c.or { + if cond.Check(event) { + return true + } + } + return false +} + +func (c *Condition) checkAND(event common.MapStr) bool { + + for _, cond := range c.and { + if !cond.Check(event) { + return false + } + } + return true +} + +func (c *Condition) checkNOT(event common.MapStr) bool { + + if c.not.Check(event) { + return false + } + return true +} + func (c Condition) String() string { s := "" @@ -339,6 +405,22 @@ func (c Condition) String() string { if len(c.rangexp) > 0 { s = s + fmt.Sprintf("range: %v", c.rangexp) } + if len(c.or) > 0 { + for _, cond := range c.or { + s = s + cond.String() + " or " + } + s = s[:len(s)-len(" or ")] //delete the last or + } + if len(c.and) > 0 { + for _, cond := range c.and { + s = s + cond.String() + " and " + } + s = s[:len(s)-len(" and ")] //delete the last and + } + if c.not != nil { + s = s + "not " + c.not.String() + } + return s } diff --git a/libbeat/processors/condition_test.go b/libbeat/processors/condition_test.go index a4183de3a54..ac963cd9766 100644 --- a/libbeat/processors/condition_test.go +++ b/libbeat/processors/condition_test.go @@ -304,3 +304,233 @@ func TestRangeCondition(t *testing.T) { assert.True(t, conds[3].Check(event1)) assert.False(t, conds[3].Check(event)) } + +func TestORCondition(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + configs := []ConditionConfig{ + ConditionConfig{ + OR: []ConditionConfig{ + ConditionConfig{ + Range: &ConditionFields{fields: map[string]interface{}{ + "http.code.gte": 400, + "http.code.lt": 500, + }}, + }, + ConditionConfig{ + Range: &ConditionFields{fields: map[string]interface{}{ + "http.code.gte": 200, + "http.code.lt": 300, + }}, + }, + }, + }, + } + + conds := GetConditions(t, configs) + for _, cond := range conds { + logp.Debug("test", "%s", cond) + } + + event := common.MapStr{ + "@timestamp": "2015-06-11T09:51:23.642Z", + "bytes_in": 126, + "bytes_out": 28033, + "client_ip": "127.0.0.1", + "client_port": 42840, + "client_proc": "", + "client_server": "mar.local", + "http": common.MapStr{ + "code": 404, + "content_length": 76985, + "phrase": "Not found", + }, + "ip": "127.0.0.1", + "method": "GET", + "params": "", + "path": "/jszip.min.js", + "port": 8000, + "proc": "", + "query": "GET /jszip.min.js", + "responsetime": 30, + "server": "mar.local", + "status": "OK", + "type": "http", + } + + assert.True(t, conds[0].Check(event)) + +} + +func TestANDCondition(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + configs := []ConditionConfig{ + ConditionConfig{ + AND: []ConditionConfig{ + ConditionConfig{ + Equals: &ConditionFields{fields: map[string]interface{}{ + "client_server": "mar.local", + }}, + }, + ConditionConfig{ + Range: &ConditionFields{fields: map[string]interface{}{ + "http.code.gte": 400, + "http.code.lt": 500, + }}, + }, + }, + }, + } + + conds := GetConditions(t, configs) + for _, cond := range conds { + logp.Debug("test", "%s", cond) + } + + event := common.MapStr{ + "@timestamp": "2015-06-11T09:51:23.642Z", + "bytes_in": 126, + "bytes_out": 28033, + "client_ip": "127.0.0.1", + "client_port": 42840, + "client_proc": "", + "client_server": "mar.local", + "http": common.MapStr{ + "code": 404, + "content_length": 76985, + "phrase": "Not found", + }, + "ip": "127.0.0.1", + "method": "GET", + "params": "", + "path": "/jszip.min.js", + "port": 8000, + "proc": "", + "query": "GET /jszip.min.js", + "responsetime": 30, + "server": "mar.local", + "status": "OK", + "type": "http", + } + + assert.True(t, conds[0].Check(event)) + +} + +func TestNOTCondition(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + configs := []ConditionConfig{ + ConditionConfig{ + NOT: &ConditionConfig{ + Equals: &ConditionFields{fields: map[string]interface{}{ + "method": "GET", + }}, + }, + }, + } + + conds := GetConditions(t, configs) + for _, cond := range conds { + logp.Debug("test", "%s", cond) + } + + event := common.MapStr{ + "@timestamp": "2015-06-11T09:51:23.642Z", + "bytes_in": 126, + "bytes_out": 28033, + "client_ip": "127.0.0.1", + "client_port": 42840, + "client_proc": "", + "client_server": "mar.local", + "http": common.MapStr{ + "code": 404, + "content_length": 76985, + "phrase": "Not found", + }, + "ip": "127.0.0.1", + "method": "GET", + "params": "", + "path": "/jszip.min.js", + "port": 8000, + "proc": "", + "query": "GET /jszip.min.js", + "responsetime": 30, + "server": "mar.local", + "status": "OK", + "type": "http", + } + + assert.False(t, conds[0].Check(event)) + +} + +func TestCombinedCondition(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + configs := []ConditionConfig{ + ConditionConfig{ + OR: []ConditionConfig{ + ConditionConfig{ + Range: &ConditionFields{fields: map[string]interface{}{ + "http.code.gte": 100, + "http.code.lt": 300, + }}, + }, + ConditionConfig{ + AND: []ConditionConfig{ + ConditionConfig{ + Equals: &ConditionFields{fields: map[string]interface{}{ + "status": 200, + }}, + }, + ConditionConfig{ + Equals: &ConditionFields{fields: map[string]interface{}{ + "type": "http", + }}, + }, + }, + }, + }, + }, + } + + conds := GetConditions(t, configs) + for _, cond := range conds { + logp.Debug("test", "%s", cond) + } + + event := common.MapStr{ + "@timestamp": "2015-06-11T09:51:23.642Z", + "bytes_in": 126, + "bytes_out": 28033, + "client_ip": "127.0.0.1", + "client_port": 42840, + "client_proc": "", + "client_server": "mar.local", + "http": common.MapStr{ + "code": 200, + "content_length": 76985, + "phrase": "OK", + }, + "ip": "127.0.0.1", + "method": "GET", + "params": "", + "path": "/jszip.min.js", + "port": 8000, + "proc": "", + "query": "GET /jszip.min.js", + "responsetime": 30, + "server": "mar.local", + "status": "OK", + "type": "http", + } + + assert.True(t, conds[0].Check(event)) + +} diff --git a/libbeat/processors/config.go b/libbeat/processors/config.go index 550203fe41b..3e651ca870e 100644 --- a/libbeat/processors/config.go +++ b/libbeat/processors/config.go @@ -9,10 +9,13 @@ import ( ) type ConditionConfig struct { - Equals *ConditionFields `config:"equals"` - Contains *ConditionFields `config:"contains"` - Regexp *ConditionFields `config:"regexp"` - Range *ConditionFields `config:"range"` + Equals *ConditionFields `config:"equals"` + Contains *ConditionFields `config:"contains"` + Regexp *ConditionFields `config:"regexp"` + Range *ConditionFields `config:"range"` + OR []ConditionConfig `config:"or"` + AND []ConditionConfig `config:"and"` + NOT *ConditionConfig `config:"not"` } type ConditionFields struct { diff --git a/metricbeat/tests/system/test_filtering.py b/metricbeat/tests/system/test_processors.py similarity index 88% rename from metricbeat/tests/system/test_filtering.py rename to metricbeat/tests/system/test_processors.py index 31b9b746e5f..d01fabc588a 100644 --- a/metricbeat/tests/system/test_filtering.py +++ b/metricbeat/tests/system/test_processors.py @@ -4,7 +4,7 @@ import unittest @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os") -class GlobalFiltering(metricbeat.BaseTest): +class TestProcessors(metricbeat.BaseTest): def test_drop_fields(self): @@ -101,6 +101,34 @@ def test_dropevent_with_condition(self): for event in output: assert float(event["system.process.cpu.total.pct"]) >= 0.001 + + def test_dropevent_with_complex_condition(self): + """ + Check drop_event action works when a complex condition is associated. + """ + self.render_config_template( + modules=[{ + "name": "system", + "metricsets": ["process"], + "period": "1s" + }], + drop_event={ + "condition": "not.contains.system.process.cmdline: metricbeat.test", + }, + ) + metricbeat = self.start_beat() + self.wait_until( + lambda: self.output_count(lambda x: x >= 1), + max_timeout=15) + + metricbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + ) + assert len(output) == 1 + + def test_include_fields(self): """ Check include_fields filtering action diff --git a/packetbeat/docs/packetbeat-filtering.asciidoc b/packetbeat/docs/packetbeat-filtering.asciidoc index a0d099c486b..db4c67dfe21 100644 --- a/packetbeat/docs/packetbeat-filtering.asciidoc +++ b/packetbeat/docs/packetbeat-filtering.asciidoc @@ -3,7 +3,7 @@ include::../../libbeat/docs/processors.asciidoc[] -For example, the following filters configuration includes a subset of the Packetbeat DNS fields so that only the +For example, the following configuration includes a subset of the Packetbeat DNS fields so that only the requests and their response codes are reported: [source, yaml] diff --git a/packetbeat/tests/system/test_0060_processors.py b/packetbeat/tests/system/test_0060_processors.py index c9a46107726..e926b06cd68 100644 --- a/packetbeat/tests/system/test_0060_processors.py +++ b/packetbeat/tests/system/test_0060_processors.py @@ -304,3 +304,68 @@ def test_drop_and_include_fields(self): assert "http.request_headers" in objs[1] assert "http.response_headers" in objs[1] + + def test_condition_and(self): + + self.render_config_template( + http_send_all_headers=True, + include_fields={ + "fields": ["http"], + "condition": """ + and: + - equals.type: http + - equals.http.code: 200 + """ + }, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed", "processors"]) + objs = self.read_output( + required_fields=["@timestamp", "type"], + ) + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert "method" not in objs[0] + + def test_condition_or(self): + + self.render_config_template( + http_send_all_headers=True, + drop_event={ + "condition": """ + or: + - equals.http.code: 404 + - equals.http.code: 200 + """ + }, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed", "processors"]) + objs = self.read_output( + required_fields=["@timestamp", "type"], + ) + + assert len(objs) == 1 + assert all([o["type"] == "http" for o in objs]) + + def test_condition_not(self): + + self.render_config_template( + http_send_all_headers=True, + drop_event={ + "condition": "not.equals.http.code: 200", + }, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed", "processors"]) + objs = self.read_output( + required_fields=["@timestamp", "type"], + ) + + assert len(objs) == 1 + assert all([o["type"] == "http" for o in objs])