From 539c7f0e50e16cf3a5be8e551d3078dc0a9ef887 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 15 Jun 2016 16:50:06 -0400 Subject: [PATCH] Add fingerprint filter for event deduplication The fingerprint filter uses a cryptographic hash function to calculate a hash value from specified fields in the event. The resulting hex encoded hash value is stored in the id field. The id field is used as the _id field when the event is sent through the elasticsearch output. Since there cannot be duplicate _id values stored in an elasticsearch index, this prevents an event from being duplicated. The fingerprint filter is not appropriate for all use cases. It should only be applied when the combination input fields are sufficiently unique. --- libbeat/common/datetime.go | 5 + libbeat/filter/rules/fingerprint.go | 152 ++++++++++++++++++++++ libbeat/filter/rules/fingerprint_test.go | 155 +++++++++++++++++++++++ libbeat/outputs/elasticsearch/bulkapi.go | 1 + libbeat/outputs/elasticsearch/client.go | 6 + 5 files changed, 319 insertions(+) create mode 100644 libbeat/filter/rules/fingerprint.go create mode 100644 libbeat/filter/rules/fingerprint_test.go diff --git a/libbeat/common/datetime.go b/libbeat/common/datetime.go index 7c5010d5d144..c692db298fbc 100644 --- a/libbeat/common/datetime.go +++ b/libbeat/common/datetime.go @@ -13,6 +13,11 @@ const TsLayout = "2006-01-02T15:04:05.000Z" // Time is an abstraction for the time.Time type type Time time.Time +// String returns a string representation of Time. +func (t Time) String() string { + return time.Time(t).UTC().String() +} + // MarshalJSON implements json.Marshaler interface. // The time is a quoted string in the JsTsLayout format. func (t Time) MarshalJSON() ([]byte, error) { diff --git a/libbeat/filter/rules/fingerprint.go b/libbeat/filter/rules/fingerprint.go new file mode 100644 index 000000000000..e077154b0e08 --- /dev/null +++ b/libbeat/filter/rules/fingerprint.go @@ -0,0 +1,152 @@ +package rules + +import ( + "bytes" + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "io" + "reflect" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/filter" + "github.com/pkg/errors" +) + +type Fingerprint struct { + cond *filter.Condition + hashName string // Name of the hash function. + hashFactory func() hash.Hash // Function to create a new Hash. + fields []string // List of input fields. + target string // Target field where hash is written. +} + +type FingerprintConfig struct { + filter.ConditionConfig `config:",inline"` + + // Hash function used for calculating the fingerprint. The accepted values + // are sha1, sha256, sha512, and md5. This value is case-insensitive. The + // default is sha1. + Hash string `config:"hash"` + + // Fields is a list of fields whose values are to be used as the input to + // the hash function. The field values are concatenated before the hashing + // is performed. All fields must be present in the event otherwise an error + // will be returned by the filter. The default is message. + Fields []string `config:"fields"` + + // Target field for the hash value. The value is hex encoded. The default + // is id. + Target string `config:"target"` +} + +var defaultFingerprintConfig = FingerprintConfig{ + Hash: "sha1", + Fields: []string{"message"}, + Target: "id", +} + +func init() { + if err := filter.RegisterPlugin("fingerprint", newFingerprint); err != nil { + panic(err) + } +} + +func newFingerprint(c common.Config) (filter.FilterRule, error) { + fc := defaultFingerprintConfig + err := c.Unpack(&fc) + if err != nil { + return nil, errors.Wrap(err, "failed to unpack fingerprint config") + } + + conditions, err := filter.NewCondition(fc.ConditionConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create new condition") + } + + hashName := strings.ToLower(fc.Hash) + var hashFactory func() hash.Hash + switch hashName { + case "sha1": + hashFactory = sha1.New + case "sha256": + hashFactory = sha256.New + case "sha512": + hashFactory = sha512.New + case "md5": + hashFactory = md5.New + default: + return nil, fmt.Errorf("unknown fingerprint hash function: %v", fc.Hash) + } + + return &Fingerprint{ + cond: conditions, + hashName: hashName, + hashFactory: hashFactory, + fields: fc.Fields, + target: fc.Target, + }, nil +} + +func (f *Fingerprint) Filter(event common.MapStr) (common.MapStr, error) { + if f.cond != nil && !f.cond.Check(event) { + return event, nil + } + + h := f.hashFactory() + for _, field := range f.fields { + v, err := event.GetValue(field) + if err != nil { + return event, err + } + writeValue(h, v) + } + + // Compute the hash and encode the value in base64. + hashBytes := h.Sum(nil) + event[f.target] = fmt.Sprintf("%x", hashBytes) + + return event, nil +} + +func (f Fingerprint) String() string { + b := new(bytes.Buffer) + b.WriteString("fingerprint=[") + + b.WriteString("fields=") + b.WriteString(strings.Join(f.fields, ", ")) + + b.WriteString(", hash=") + b.WriteString(f.hashName) + + b.WriteString(", target=") + b.WriteString(f.target) + + if f.cond != nil { + b.WriteString(", condition=") + b.WriteString(f.cond.String()) + } + + b.WriteRune(']') + + return b.String() +} + +func writeValue(writer io.Writer, object interface{}) { + val := reflect.ValueOf(object) + + // Follow the pointer. + if val.Kind() == reflect.Ptr && !val.IsNil() { + val = reflect.ValueOf(val.Elem().Interface()) + } + + if val.IsValid() { + writer.Write([]byte(fmt.Sprintf("%v", val.Interface()))) + } else { + writer.Write([]byte("nil")) + } +} diff --git a/libbeat/filter/rules/fingerprint_test.go b/libbeat/filter/rules/fingerprint_test.go new file mode 100644 index 000000000000..f768e21fb6d6 --- /dev/null +++ b/libbeat/filter/rules/fingerprint_test.go @@ -0,0 +1,155 @@ +package rules + +import ( + "bytes" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +const message = "[Mon Mar 8 05:31:47 2004] [info] [client 64.242.88.10] " + + "(104)Connection reset by peer: client stopped connection before send " + + "body completed" + +func newTestFingerprint(t testing.TB, hash string, opts ...map[string]interface{}) *Fingerprint { + c, err := common.NewConfigFrom(map[string]interface{}{"hash": hash}) + if err != nil { + t.Fatal(err) + } + + if len(opts) == 1 { + err := c.Merge(opts[0]) + if err != nil { + t.Fatal(err) + } + } + + f, err := newFingerprint(*c) + if err != nil { + t.Fatal(err) + } + return f.(*Fingerprint) +} + +func TestFingerprintHashes(t *testing.T) { + var tests = []struct { + hash string + fingerprint string + }{ + {"sha1", "fe7b2aede2119f5508f466209a26a863d405c1ee"}, + {"sha256", "5c2736f2a1b8ec165ffe3e904b4171ad7581db825f492bca7bdfa0cca4e5630f"}, + {"sha512", "3142c7ff002141dba401d5cae154f4c534d5e3b8403699398c3e1dd20bcad764accccfab108188f14b01e11072c3a705d8d355473ebbea576f008920d6953b4e"}, + {"md5", "ffc2c1d636ac17a38860df03350be0e4"}, + } + + for _, testcase := range tests { + f := newTestFingerprint(t, testcase.hash) + event := common.MapStr{"message": message} + event, err := f.Filter(event) + if assert.NoError(t, err) { + assert.Equal(t, testcase.fingerprint, event["id"]) + } + } +} + +func TestFingerprintFieldConcat(t *testing.T) { + f := newTestFingerprint(t, "sha1", map[string]interface{}{ + "fields": []string{"@timestamp", "record_number", "beat.host", "message"}, + }) + event := common.MapStr{ + "@timestamp": common.Time(time.Unix(1091067890, 0)), + "record_number": 1888399992, + "beat": common.MapStr{ + "host": "example", + }, + "message": message, + } + + event, err := f.Filter(event) + if assert.NoError(t, err) { + assert.Equal(t, "ee89e405f814a308440c20f11adcc36e9e51c393", event["id"]) + } +} + +func TestFingerprintMissingField(t *testing.T) { + f := newTestFingerprint(t, "sha1", map[string]interface{}{ + "fields": []string{"other"}, + }) + event := common.MapStr{"message": message} + + event, err := f.Filter(event) + assert.Error(t, err) + assert.NotNil(t, event) +} + +func TestFingerprintString(t *testing.T) { + f := newTestFingerprint(t, "sha1") + assert.Equal(t, "fingerprint=[fields=message, hash=sha1, target=id]", f.String()) +} + +func TestWriteValue(t *testing.T) { + var tests = []struct { + in interface{} + out string + }{ + {nil, "nil"}, + {true, "true"}, + {8, "8"}, + {uint(10), "10"}, + {18.123, "18.123"}, + {"hello", "hello"}, + } + + b := new(bytes.Buffer) + for _, testcase := range tests { + b.Reset() + writeValue(b, testcase.in) + assert.Equal(t, testcase.out, b.String()) + + b.Reset() + writeValue(b, &testcase.in) + assert.Equal(t, testcase.out, b.String()) + } +} + +func BenchmarkFingerprintFilterSHA1(b *testing.B) { + f := newTestFingerprint(b, "sha1") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterSHA256(b *testing.B) { + f := newTestFingerprint(b, "sha256") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterSHA512(b *testing.B) { + f := newTestFingerprint(b, "sha512") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterMD5(b *testing.B) { + f := newTestFingerprint(b, "md5") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 429c2b61e27f..3d7fe261d6d3 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -15,6 +15,7 @@ type bulkMeta struct { type bulkMetaIndex struct { Index string `json:"_index"` DocType string `json:"_type"` + ID string `json:"_id,omitempty"` } // MetaBuilder creates meta data for bulk requests diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 935ec7f48c7f..7607e3881396 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -256,6 +256,12 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta { DocType: event["type"].(string), }, } + if id, ok := event["id"]; ok { + meta.Index.ID, ok = id.(string) + if !ok { + logp.Err("id is not a string") + } + } return meta }