diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index a57cad8f5b82..75bb82ea39eb 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -314,6 +314,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add `keep_null` setting to allow Beats to publish null values in events. {issue}5522[5522] {pull}13928[13928]
 - Add shared_credential_file option in aws related config for specifying credential file directory. {issue}14157[14157] {pull}14178[14178]
 - GA the `script` processor. {pull}14325[14325]
+- Add `fingerprint` processor. {issue}11173[11173] {pull}14205[14205]
 
 *Auditbeat*
 
diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc
index 3321f4a2fbb9..97a866f8c15a 100644
--- a/libbeat/docs/processors-using.asciidoc
+++ b/libbeat/docs/processors-using.asciidoc
@@ -1184,6 +1184,27 @@ The following settings are supported:
 empty array (`[]`) or an empty object (`{}`) are considered empty values.
 Default is `false`.
 
+[[fingerprint]]
+=== Generate a fingerprint of an event
+
+The `fingerprint` processor generates a fingerprint of an event based on a
+specified subset of its fields.
+
+[source,yaml]
+-----------------------------------------------------
+processors:
+  - fingerprint:
+      fields: ["field1", "field2", ...]
+-----------------------------------------------------
+
+The following settings are supported:
+
+`fields`:: List of fields to use as the source for the fingerprint. At least one field must be specified.
+`ignore_missing`:: (Optional) Whether to ignore missing fields. Default is `false`.
+`target_field`:: (Optional) Field in which the generated fingerprint should be stored. Default is `fingerprint`.
+`method`:: (Optional) Algorithm to use for computing the fingerprint. Must be one of `md5`, `sha1`, `sha256`, `sha384`, or `sha512`. Default is `sha256`.
+`encoding`:: (Optional) Encoding to use on the fingerprint value. Must be one of `hex`, `base32`, or `base64`. Default is `hex`.
+
 [[include-fields]]
 === Keep fields from events
 
diff --git a/libbeat/processors/fingerprint/config.go b/libbeat/processors/fingerprint/config.go
new file mode 100644
index 000000000000..dc36b6bceffb
--- /dev/null
+++ b/libbeat/processors/fingerprint/config.go
@@ -0,0 +1,45 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+// Config holds the settings accepted by the fingerprint processor.
+type Config struct {
+	// Method is the hash function to use for fingerprinting.
+	Method hashMethod `config:"method"`
+	// Fields lists the source fields to compute the fingerprint from.
+	Fields []string `config:"fields" validate:"required"`
+	// TargetField is the field the fingerprint is written to.
+	TargetField string `config:"target_field"`
+	// Encoding is the encoding applied to the target field value.
+	Encoding encodingMethod `config:"encoding"`
+	// IgnoreMissing controls whether missing source fields are ignored.
+	IgnoreMissing bool `config:"ignore_missing"`
+}
+
+// defaultConfig returns the settings used for every option the user does
+// not set: SHA-256 hashing, hex encoding, the "fingerprint" target field,
+// and missing source fields treated as errors.
+func defaultConfig() Config {
+	cfg := Config{
+		TargetField:   "fingerprint",
+		IgnoreMissing: false,
+	}
+	cfg.Method = hashes["sha256"]
+	cfg.Encoding = encodings["hex"]
+	return cfg
+}
diff --git a/libbeat/processors/fingerprint/encode.go b/libbeat/processors/fingerprint/encode.go
new file mode 100644
index 000000000000..843c7bd5d293
--- /dev/null
+++ b/libbeat/processors/fingerprint/encode.go
@@ -0,0 +1,46 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+import (
+	"encoding/base32"
+	"encoding/base64"
+	"encoding/hex"
+	"strings"
+)
+
+// encodingMethod turns a raw digest into its textual representation.
+type encodingMethod func([]byte) string
+
+// encodings maps each supported encoding name to its implementation.
+var encodings = map[string]encodingMethod{
+	"hex":    hex.EncodeToString,
+	"base32": base32.StdEncoding.EncodeToString,
+	"base64": base64.StdEncoding.EncodeToString,
+}
+
+// Unpack sets e to the encoding registered under str, matched
+// case-insensitively; unknown names yield an errUnknownEncoding.
+func (e *encodingMethod) Unpack(str string) error {
+	name := strings.ToLower(str)
+	if encode, ok := encodings[name]; ok {
+		*e = encode
+		return nil
+	}
+	return makeErrUnknownEncoding(name)
+}
diff --git a/libbeat/processors/fingerprint/errors.go b/libbeat/processors/fingerprint/errors.go
new file mode 100644
index 000000000000..e025015883b6
--- /dev/null
+++ b/libbeat/processors/fingerprint/errors.go
@@ -0,0 +1,79 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+import (
+	"errors"
+	"fmt"
+)
+
+var errNoFields = errors.New("must specify at least one field")
+
+type (
+	errUnknownEncoding    struct{ encoding string }
+	errUnknownMethod      struct{ method string }
+	errConfigUnpack       struct{ cause error }
+	errComputeFingerprint struct{ cause error }
+	errMissingField       struct {
+		field string
+		cause error
+	}
+	errNonScalarField struct{ field string }
+)
+
+func makeErrUnknownEncoding(encoding string) errUnknownEncoding {
+	return errUnknownEncoding{encoding}
+}
+func (e errUnknownEncoding) Error() string {
+	return fmt.Sprintf("invalid encoding [%s]", e.encoding)
+}
+
+func makeErrUnknownMethod(method string) errUnknownMethod {
+	return errUnknownMethod{method}
+}
+func (e errUnknownMethod) Error() string {
+	return fmt.Sprintf("invalid fingerprinting method [%s]", e.method)
+}
+
+func makeErrConfigUnpack(cause error) errConfigUnpack {
+	return errConfigUnpack{cause}
+}
+func (e errConfigUnpack) Error() string {
+	return fmt.Sprintf("failed to unpack %v processor configuration: %v", processorName, e.cause)
+}
+
+func makeErrComputeFingerprint(cause error) errComputeFingerprint {
+	return errComputeFingerprint{cause}
+}
+func (e errComputeFingerprint) Error() string {
+	return fmt.Sprintf("failed to compute fingerprint: %v", e.cause)
+}
+
+func makeErrMissingField(field string, cause error) errMissingField {
+	return errMissingField{field, cause}
+}
+func (e errMissingField) Error() string {
+	return fmt.Sprintf("failed to find field [%v] in event: %v", e.field, e.cause)
+}
+
+func makeErrNonScalarField(field string) errNonScalarField {
+	return errNonScalarField{field}
+}
+func (e errNonScalarField) Error() string {
+	return fmt.Sprintf("cannot compute fingerprint using non-scalar field [%v]", e.field)
+}
diff --git a/libbeat/processors/fingerprint/fingerprint.go b/libbeat/processors/fingerprint/fingerprint.go
new file mode 100644
index 000000000000..b0f877dfeb32
--- /dev/null
+++ b/libbeat/processors/fingerprint/fingerprint.go
@@ -0,0 +1,128 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+import (
+	"fmt"
+	"hash"
+	"io"
+	"sort"
+	"time"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/processors"
+	jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor"
+)
+
+func init() {
+	processors.RegisterPlugin("fingerprint", New)
+	jsprocessor.RegisterPlugin("Fingerprint", New)
+}
+
+// processorName is the processor's name as used in String and in error messages.
+const processorName = "fingerprint"
+
+// fingerprint is a processor that hashes selected event fields and stores the
+// encoded digest in the event.
+type fingerprint struct {
+	config Config
+	fields []string // unique source field names, in sorted order
+}
+
+// New constructs a new fingerprint processor from cfg.
+//
+// Duplicate entries in the configured fields are dropped and the remaining
+// names are sorted, so the fingerprint does not depend on the order in which
+// the fields are listed in the configuration.
+func New(cfg *common.Config) (processors.Processor, error) {
+	config := defaultConfig()
+	if err := cfg.Unpack(&config); err != nil {
+		return nil, makeErrConfigUnpack(err)
+	}
+
+	// Sort explicitly rather than relying on the order in which the set returns
+	// its elements: a fixed order is what makes fingerprints reproducible.
+	fields := common.MakeStringSet(config.Fields...).ToSlice()
+	sort.Strings(fields)
+
+	return &fingerprint{config: config, fields: fields}, nil
+}
+
+// Run computes the fingerprint of the configured source fields of event and
+// stores the encoded digest in the configured target field. Failures to read
+// a source field or to write the target field are returned as an
+// errComputeFingerprint.
+func (p *fingerprint) Run(event *beat.Event) (*beat.Event, error) {
+	// Use a fresh hash for every event. A hash.Hash carries internal state, so
+	// sharing one instance would corrupt digests if Run is called concurrently.
+	hashFn := p.newHash()
+
+	if err := p.writeFields(hashFn, event.Fields); err != nil {
+		return nil, makeErrComputeFingerprint(err)
+	}
+
+	encodedHash := p.config.Encoding(hashFn.Sum(nil))
+	if _, err := event.PutValue(p.config.TargetField, encodedHash); err != nil {
+		return nil, makeErrComputeFingerprint(err)
+	}
+
+	return event, nil
+}
+
+// newHash returns a new, empty instance of the configured hash function.
+func (p *fingerprint) newHash() hash.Hash {
+	return p.config.Method()
+}
+
+// String returns a short description of the processor.
+func (p *fingerprint) String() string {
+	return fmt.Sprintf("%v=[method=[%v]]", processorName, p.config.Method)
+}
+
+// writeFields writes every configured source field found in eventFields to
+// to as "|<name>|<value>", followed by a closing "|". Missing fields are
+// skipped when ignore_missing is set and reported otherwise; maps and
+// []interface{} slices are rejected as non-scalar values.
+func (p *fingerprint) writeFields(to io.Writer, eventFields common.MapStr) error {
+	for _, k := range p.fields {
+		v, err := eventFields.GetValue(k)
+		if err != nil {
+			if p.config.IgnoreMissing {
+				continue
+			}
+			return makeErrMissingField(k, err)
+		}
+
+		i := v
+		switch vv := v.(type) {
+		case map[string]interface{}, []interface{}, common.MapStr:
+			return makeErrNonScalarField(k)
+		case time.Time:
+			// Ensure we consistently hash times in UTC.
+			i = vv.UTC()
+		}
+
+		if _, err := fmt.Fprintf(to, "|%v|%v", k, i); err != nil {
+			return err
+		}
+	}
+
+	_, err := io.WriteString(to, "|")
+	return err
+}
diff --git a/libbeat/processors/fingerprint/fingerprint_test.go b/libbeat/processors/fingerprint/fingerprint_test.go
new file mode 100644
index 000000000000..95e13669a090
--- /dev/null
+++ b/libbeat/processors/fingerprint/fingerprint_test.go
@@ -0,0 +1,378 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+import (
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+)
+
+func TestHashMethods(t *testing.T) {
+	testFields := common.MapStr{
+		"field1":       "foo",
+		"field2":       "bar",
+		"unused_field": "baz",
+	}
+	// Expected fingerprints of field1 and field2, keyed by hash method name.
+	tests := map[string]struct {
+		expected string
+	}{
+		"md5":    {"4c45df4792f3ef850c928ec5f5232538"},
+		"sha1":   {"22f76427d626516d3f7a05785165b99617683b22"},
+		"sha256": {"1208288932231e313b369bae587ff574cd3016a408e52e7128d7bee752674003"},
+		"sha384": {"295adfe0bc03908948e4b0b6a54f441767867e426dda590430459c8a147fbba242a38cba282adee78335b9e08877b86c"},
+		"sha512": {"f50ad51b63c92a0ed0c910527119b81806f3110f0afaa1dcb93506a78371ea761e50c0fc09b08c441d832dd2da1b45e5d8361adfb240e1fffc2695122a23e183"},
+	}
+
+	for method, test := range tests {
+		t.Run(method, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields": []string{"field1", "field2"},
+				"method": method,
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+
+			newEvent, err := p.Run(testEvent)
+			assert.NoError(t, err)
+
+			v, err := newEvent.GetValue("fingerprint")
+			assert.NoError(t, err)
+			assert.Equal(t, test.expected, v)
+		})
+	}
+}
+
+func TestSourceFields(t *testing.T) {
+	testFields := common.MapStr{
+		"field1": "foo",
+		"field2": "bar",
+		"nested": common.MapStr{
+			"field": "qux",
+		},
+		"unused_field": "baz",
+	}
+	expectedFingerprint := "3d51237d384215a6e731f2cc67ead6d7d9a5138377897c8f542a915be3c25bcf"
+	// Every case must yield expectedFingerprint: neither the listing order nor duplicates may matter.
+	tests := map[string]struct {
+		fields []string
+	}{
+		"order_1":            {[]string{"field1", "nested.field"}},
+		"order_2":            {[]string{"nested.field", "field1"}},
+		"duplicates_ignored": {[]string{"nested.field", "field1", "nested.field"}},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields": test.fields,
+				"method": "sha256",
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+			newEvent, err := p.Run(testEvent)
+			assert.NoError(t, err)
+
+			v, err := newEvent.GetValue("fingerprint")
+			assert.NoError(t, err)
+			assert.Equal(t, expectedFingerprint, v)
+		})
+	}
+}
+
+func TestEncoding(t *testing.T) {
+	testFields := common.MapStr{
+		"field1": "foo",
+		"field2": "bar",
+		"nested": common.MapStr{
+			"field": "qux",
+		},
+		"unused_field": "baz",
+	}
+	// Expected md5 fingerprint of field2 and nested.field, keyed by encoding name.
+	tests := map[string]struct {
+		expectedFingerprint string
+	}{
+		"hex":    {"8934ca639027aab1ee9f3944d4d6bd1e"},
+		"base32": {"RE2MUY4QE6VLD3U7HFCNJVV5DY======"},
+		"base64": {"iTTKY5AnqrHunzlE1Na9Hg=="},
+	}
+
+	for encoding, test := range tests {
+		t.Run(encoding, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields":   []string{"field2", "nested.field"},
+				"method":   "md5",
+				"encoding": encoding,
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+			newEvent, err := p.Run(testEvent)
+			assert.NoError(t, err)
+
+			v, err := newEvent.GetValue("fingerprint")
+			assert.NoError(t, err)
+			assert.Equal(t, test.expectedFingerprint, v)
+		})
+	}
+}
+
+func TestConsistentHashingTimeFields(t *testing.T) {
+	tzUTC := time.UTC
+	tzPST := time.FixedZone("Pacific Standard Time", int((-8 * time.Hour).Seconds()))
+	tzIST := time.FixedZone("Indian Standard Time", int((5*time.Hour + 30*time.Minute).Seconds()))
+	// The three events below describe the same instant, so they must share one fingerprint.
+	expectedFingerprint := "4534d56a673c2da41df32db5da87cf47e639e84fe82907f2c015c8dfcac5d4f5"
+
+	tests := map[string]struct {
+		event common.MapStr
+	}{
+		"UTC": {
+			common.MapStr{
+				"timestamp": time.Date(2019, 10, 29, 0, 0, 0, 0, tzUTC),
+			},
+		},
+		"PST": {
+			common.MapStr{
+				"timestamp": time.Date(2019, 10, 28, 16, 0, 0, 0, tzPST),
+			},
+		},
+		"IST": {
+			common.MapStr{
+				"timestamp": time.Date(2019, 10, 29, 5, 30, 0, 0, tzIST),
+			},
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields": []string{"timestamp"},
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields: test.event,
+			}
+			newEvent, err := p.Run(testEvent)
+			assert.NoError(t, err)
+
+			v, err := newEvent.GetValue("fingerprint")
+			assert.NoError(t, err)
+			assert.Equal(t, expectedFingerprint, v)
+		})
+	}
+}
+
+func TestTargetField(t *testing.T) {
+	testFields := common.MapStr{
+		"field1": "foo",
+		"nested": common.MapStr{
+			"field": "bar",
+		},
+		"unused_field": "baz",
+	}
+	expectedFingerprint := "4cf8b768ad20266c348d63a6d1ff5d6f6f9ed0f59f5c68ae031b78e3e04c5144"
+	// The fingerprint must be stored under the configured target field, at the root or nested.
+	tests := map[string]struct {
+		targetField string
+	}{
+		"root":   {"target_field"},
+		"nested": {"nested.target_field"},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields":       []string{"field1"},
+				"target_field": test.targetField,
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+			newEvent, err := p.Run(testEvent)
+			assert.NoError(t, err)
+
+			v, err := newEvent.GetValue(test.targetField)
+			assert.NoError(t, err)
+			assert.Equal(t, expectedFingerprint, v)
+			// The default target field must stay unset when a custom one is configured.
+			_, err = newEvent.GetValue("fingerprint")
+			assert.EqualError(t, err, common.ErrKeyNotFound.Error())
+		})
+	}
+}
+
+func TestSourceFieldErrors(t *testing.T) {
+	testFields := common.MapStr{
+		"field1": "foo",
+		"field2": "bar",
+		"complex_field": map[string]interface{}{
+			"child": "qux",
+		},
+		"unused_field": "baz",
+	}
+	// Both a missing and a non-scalar source field must make Run fail.
+	tests := map[string]struct {
+		fields []string
+	}{
+		"missing": {
+			[]string{"field1", "missing_field"},
+		},
+		"non-scalar": {
+			[]string{"field1", "complex_field"},
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields": test.fields,
+				"method": "sha256",
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+			_, err = p.Run(testEvent)
+			assert.IsType(t, errComputeFingerprint{}, err)
+		})
+	}
+}
+
+func TestInvalidConfig(t *testing.T) {
+	tests := map[string]struct {
+		config common.MapStr
+	}{
+		"no fields": {
+			common.MapStr{
+				"fields": []string{},
+				"method": "sha256",
+			},
+		},
+		"invalid fingerprinting method": {
+			common.MapStr{
+				"fields": []string{"doesnt", "matter"},
+				"method": "non_existent",
+			},
+		},
+		"invalid encoding": {
+			common.MapStr{
+				"fields":   []string{"doesnt", "matter"},
+				"encoding": "non_existent",
+			},
+		},
+	}
+	// Every configuration above must be rejected by New with an errConfigUnpack.
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			testConfig, err := common.NewConfigFrom(test.config)
+			assert.NoError(t, err)
+
+			_, err = New(testConfig)
+			assert.IsType(t, errConfigUnpack{}, err)
+		})
+	}
+}
+
+func TestIgnoreMissing(t *testing.T) {
+	testFields := common.MapStr{
+		"field1": "foo",
+	}
+	// Each test name doubles as the ignore_missing value (parsed with strconv.ParseBool).
+	tests := map[string]struct {
+		assertErr           assert.ErrorAssertionFunc
+		expectedFingerprint string
+	}{
+		"true": {
+			assert.NoError,
+			"4cf8b768ad20266c348d63a6d1ff5d6f6f9ed0f59f5c68ae031b78e3e04c5144",
+		},
+		"false": {
+			assertErr: assert.Error,
+		},
+	}
+
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			ignoreMissing, _ := strconv.ParseBool(name)
+			testConfig, err := common.NewConfigFrom(common.MapStr{
+				"fields":         []string{"field1", "missing_field"},
+				"ignore_missing": ignoreMissing,
+			})
+			assert.NoError(t, err)
+
+			p, err := New(testConfig)
+			assert.NoError(t, err)
+
+			testEvent := &beat.Event{
+				Fields:    testFields.Clone(),
+				Timestamp: time.Now(),
+			}
+			newEvent, err := p.Run(testEvent)
+			test.assertErr(t, err)
+
+			if err == nil {
+				v, err := newEvent.GetValue("fingerprint")
+				assert.NoError(t, err)
+				assert.Equal(t, test.expectedFingerprint, v)
+			}
+		})
+	}
+}
diff --git a/libbeat/processors/fingerprint/hash.go b/libbeat/processors/fingerprint/hash.go
new file mode 100644
index 000000000000..50ff51894ea5
--- /dev/null
+++ b/libbeat/processors/fingerprint/hash.go
@@ -0,0 +1,50 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fingerprint
+
+import (
+	"crypto/md5"
+	"crypto/sha1"
+	"crypto/sha256"
+	"crypto/sha512"
+	"hash"
+	"strings"
+)
+
+// hashMethod is a constructor for a hash function.
+type hashMethod func() hash.Hash
+
+// hashes maps each supported method name to its hash constructor.
+var hashes = map[string]hashMethod{
+	"md5":    md5.New,
+	"sha1":   sha1.New,
+	"sha256": sha256.New,
+	"sha384": sha512.New384,
+	"sha512": sha512.New,
+}
+
+// Unpack sets f to the hash method registered under str, matched
+// case-insensitively; unknown names yield an errUnknownMethod.
+func (f *hashMethod) Unpack(str string) error {
+	name := strings.ToLower(str)
+	if newHash, ok := hashes[name]; ok {
+		*f = newHash
+		return nil
+	}
+	return makeErrUnknownMethod(name)
+}