diff --git a/libbeat/common/datetime.go b/libbeat/common/datetime.go index 7c5010d5d144..c692db298fbc 100644 --- a/libbeat/common/datetime.go +++ b/libbeat/common/datetime.go @@ -13,6 +13,11 @@ const TsLayout = "2006-01-02T15:04:05.000Z" // Time is an abstraction for the time.Time type type Time time.Time +// String returns a string representation of Time. +func (t Time) String() string { + return time.Time(t).UTC().String() +} + // MarshalJSON implements json.Marshaler interface. // The time is a quoted string in the JsTsLayout format. func (t Time) MarshalJSON() ([]byte, error) { diff --git a/libbeat/filter/rules/fingerprint.go b/libbeat/filter/rules/fingerprint.go new file mode 100644 index 000000000000..e077154b0e08 --- /dev/null +++ b/libbeat/filter/rules/fingerprint.go @@ -0,0 +1,152 @@ +package rules + +import ( + "bytes" + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "io" + "reflect" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/filter" + "github.com/pkg/errors" +) + +type Fingerprint struct { + cond *filter.Condition + hashName string // Name of the hash function. + hashFactory func() hash.Hash // Function to create a new Hash. + fields []string // List of input fields. + target string // Target field where hash is written. +} + +type FingerprintConfig struct { + filter.ConditionConfig `config:",inline"` + + // Hash function used for calculating the fingerprint. The accepted values + // are sha1, sha256, sha512, and md5. This value is case-insensitive. The + // default is sha1. + Hash string `config:"hash"` + + // Fields is a list of fields whose values are to be used as the input to + // the hash function. The field values are concatenated before the hashing + // is performed. All fields must be present in the event otherwise an error + // will be returned by the filter. The default is message. + Fields []string `config:"fields"` + + // Target field for the hash value. 
The value is hex encoded. The default + // is id. + Target string `config:"target"` +} + +var defaultFingerprintConfig = FingerprintConfig{ + Hash: "sha1", + Fields: []string{"message"}, + Target: "id", +} + +func init() { + if err := filter.RegisterPlugin("fingerprint", newFingerprint); err != nil { + panic(err) + } +} + +func newFingerprint(c common.Config) (filter.FilterRule, error) { + fc := defaultFingerprintConfig + err := c.Unpack(&fc) + if err != nil { + return nil, errors.Wrap(err, "failed to unpack fingerprint config") + } + + conditions, err := filter.NewCondition(fc.ConditionConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create new condition") + } + + hashName := strings.ToLower(fc.Hash) + var hashFactory func() hash.Hash + switch hashName { + case "sha1": + hashFactory = sha1.New + case "sha256": + hashFactory = sha256.New + case "sha512": + hashFactory = sha512.New + case "md5": + hashFactory = md5.New + default: + return nil, fmt.Errorf("unknown fingerprint hash function: %v", fc.Hash) + } + + return &Fingerprint{ + cond: conditions, + hashName: hashName, + hashFactory: hashFactory, + fields: fc.Fields, + target: fc.Target, + }, nil +} + +func (f *Fingerprint) Filter(event common.MapStr) (common.MapStr, error) { + if f.cond != nil && !f.cond.Check(event) { + return event, nil + } + + h := f.hashFactory() + for _, field := range f.fields { + v, err := event.GetValue(field) + if err != nil { + return event, err + } + writeValue(h, v) + } + + // Compute the hash and encode the value as hex. 
+ hashBytes := h.Sum(nil) + event[f.target] = fmt.Sprintf("%x", hashBytes) + + return event, nil +} + +func (f Fingerprint) String() string { + b := new(bytes.Buffer) + b.WriteString("fingerprint=[") + + b.WriteString("fields=") + b.WriteString(strings.Join(f.fields, ", ")) + + b.WriteString(", hash=") + b.WriteString(f.hashName) + + b.WriteString(", target=") + b.WriteString(f.target) + + if f.cond != nil { + b.WriteString(", condition=") + b.WriteString(f.cond.String()) + } + + b.WriteRune(']') + + return b.String() +} + +func writeValue(writer io.Writer, object interface{}) { + val := reflect.ValueOf(object) + + // Follow the pointer. + if val.Kind() == reflect.Ptr && !val.IsNil() { + val = reflect.ValueOf(val.Elem().Interface()) + } + + if val.IsValid() { + writer.Write([]byte(fmt.Sprintf("%v", val.Interface()))) + } else { + writer.Write([]byte("nil")) + } +} diff --git a/libbeat/filter/rules/fingerprint_test.go b/libbeat/filter/rules/fingerprint_test.go new file mode 100644 index 000000000000..f768e21fb6d6 --- /dev/null +++ b/libbeat/filter/rules/fingerprint_test.go @@ -0,0 +1,155 @@ +package rules + +import ( + "bytes" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +const message = "[Mon Mar 8 05:31:47 2004] [info] [client 64.242.88.10] " + + "(104)Connection reset by peer: client stopped connection before send " + + "body completed" + +func newTestFingerprint(t testing.TB, hash string, opts ...map[string]interface{}) *Fingerprint { + c, err := common.NewConfigFrom(map[string]interface{}{"hash": hash}) + if err != nil { + t.Fatal(err) + } + + if len(opts) == 1 { + err := c.Merge(opts[0]) + if err != nil { + t.Fatal(err) + } + } + + f, err := newFingerprint(*c) + if err != nil { + t.Fatal(err) + } + return f.(*Fingerprint) +} + +func TestFingerprintHashes(t *testing.T) { + var tests = []struct { + hash string + fingerprint string + }{ + {"sha1", "fe7b2aede2119f5508f466209a26a863d405c1ee"}, + 
{"sha256", "5c2736f2a1b8ec165ffe3e904b4171ad7581db825f492bca7bdfa0cca4e5630f"}, + {"sha512", "3142c7ff002141dba401d5cae154f4c534d5e3b8403699398c3e1dd20bcad764accccfab108188f14b01e11072c3a705d8d355473ebbea576f008920d6953b4e"}, + {"md5", "ffc2c1d636ac17a38860df03350be0e4"}, + } + + for _, testcase := range tests { + f := newTestFingerprint(t, testcase.hash) + event := common.MapStr{"message": message} + event, err := f.Filter(event) + if assert.NoError(t, err) { + assert.Equal(t, testcase.fingerprint, event["id"]) + } + } +} + +func TestFingerprintFieldConcat(t *testing.T) { + f := newTestFingerprint(t, "sha1", map[string]interface{}{ + "fields": []string{"@timestamp", "record_number", "beat.host", "message"}, + }) + event := common.MapStr{ + "@timestamp": common.Time(time.Unix(1091067890, 0)), + "record_number": 1888399992, + "beat": common.MapStr{ + "host": "example", + }, + "message": message, + } + + event, err := f.Filter(event) + if assert.NoError(t, err) { + assert.Equal(t, "ee89e405f814a308440c20f11adcc36e9e51c393", event["id"]) + } +} + +func TestFingerprintMissingField(t *testing.T) { + f := newTestFingerprint(t, "sha1", map[string]interface{}{ + "fields": []string{"other"}, + }) + event := common.MapStr{"message": message} + + event, err := f.Filter(event) + assert.Error(t, err) + assert.NotNil(t, event) +} + +func TestFingerprintString(t *testing.T) { + f := newTestFingerprint(t, "sha1") + assert.Equal(t, "fingerprint=[fields=message, hash=sha1, target=id]", f.String()) +} + +func TestWriteValue(t *testing.T) { + var tests = []struct { + in interface{} + out string + }{ + {nil, "nil"}, + {true, "true"}, + {8, "8"}, + {uint(10), "10"}, + {18.123, "18.123"}, + {"hello", "hello"}, + } + + b := new(bytes.Buffer) + for _, testcase := range tests { + b.Reset() + writeValue(b, testcase.in) + assert.Equal(t, testcase.out, b.String()) + + b.Reset() + writeValue(b, &testcase.in) + assert.Equal(t, testcase.out, b.String()) + } +} + +func 
BenchmarkFingerprintFilterSHA1(b *testing.B) { + f := newTestFingerprint(b, "sha1") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterSHA256(b *testing.B) { + f := newTestFingerprint(b, "sha256") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterSHA512(b *testing.B) { + f := newTestFingerprint(b, "sha512") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} + +func BenchmarkFingerprintFilterMD5(b *testing.B) { + f := newTestFingerprint(b, "md5") + event := common.MapStr{"message": message} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + f.Filter(event) + } +} diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 429c2b61e27f..3d7fe261d6d3 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -15,6 +15,7 @@ type bulkMeta struct { type bulkMetaIndex struct { Index string `json:"_index"` DocType string `json:"_type"` + ID string `json:"_id,omitempty"` } // MetaBuilder creates meta data for bulk requests diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 935ec7f48c7f..7607e3881396 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -256,6 +256,12 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta { DocType: event["type"].(string), }, } + if id, ok := event["id"]; ok { + meta.Index.ID, ok = id.(string) + if !ok { + logp.Err("id is not a string") + } + } return meta }