Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libbeat/common/datetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const TsLayout = "2006-01-02T15:04:05.000Z"
// Time is an abstraction for the time.Time type
type Time time.Time

// String returns a string representation of Time.
func (t Time) String() string {
return time.Time(t).UTC().String()
}

// MarshalJSON implements json.Marshaler interface.
// The time is a quoted string in the JsTsLayout format.
func (t Time) MarshalJSON() ([]byte, error) {
Expand Down
152 changes: 152 additions & 0 deletions libbeat/filter/rules/fingerprint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package rules

import (
"bytes"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"crypto/sha512"
"fmt"
"hash"
"io"
"reflect"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/filter"
"github.com/pkg/errors"
)

type Fingerprint struct {
cond *filter.Condition
hashName string // Name of the hash function.
hashFactory func() hash.Hash // Function to create a new Hash.
fields []string // List of input fields.
target string // Target field where hash is written.
}

type FingerprintConfig struct {
filter.ConditionConfig `config:",inline"`

// Hash function used for calculating the fingerprint. The accepted values
// are sha1, sha256, sha512, and md5. This value is case-insensitive. The
// default is sha1.
Hash string `config:"hash"`

// Fields is a list of fields whose values are to be used as the input to
// the hash function. The field values are concatenated before the hashing
// is performed. All fields must be present in the event otherwise an error
// will be returned by the filter. The default is message.
Fields []string `config:"fields"`

// Target field for the hash value. The value is hex encoded. The default
// is id.
Target string `config:"target"`
}

var defaultFingerprintConfig = FingerprintConfig{
Hash: "sha1",
Fields: []string{"message"},
Target: "id",
}

func init() {
if err := filter.RegisterPlugin("fingerprint", newFingerprint); err != nil {
panic(err)
}
}

func newFingerprint(c common.Config) (filter.FilterRule, error) {
fc := defaultFingerprintConfig
err := c.Unpack(&fc)
if err != nil {
return nil, errors.Wrap(err, "failed to unpack fingerprint config")
}

conditions, err := filter.NewCondition(fc.ConditionConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create new condition")
}

hashName := strings.ToLower(fc.Hash)
var hashFactory func() hash.Hash
switch hashName {
case "sha1":
hashFactory = sha1.New
case "sha256":
hashFactory = sha256.New
case "sha512":
hashFactory = sha512.New
case "md5":
hashFactory = md5.New
default:
return nil, fmt.Errorf("unknown fingerprint hash function: %v", fc.Hash)
}

return &Fingerprint{
cond: conditions,
hashName: hashName,
hashFactory: hashFactory,
fields: fc.Fields,
target: fc.Target,
}, nil
}

func (f *Fingerprint) Filter(event common.MapStr) (common.MapStr, error) {
if f.cond != nil && !f.cond.Check(event) {
return event, nil
}

h := f.hashFactory()
for _, field := range f.fields {
v, err := event.GetValue(field)
if err != nil {
return event, err
}
writeValue(h, v)
}

// Compute the hash and encode the value in base64.
hashBytes := h.Sum(nil)
event[f.target] = fmt.Sprintf("%x", hashBytes)

return event, nil
}

func (f Fingerprint) String() string {
b := new(bytes.Buffer)
b.WriteString("fingerprint=[")

b.WriteString("fields=")
b.WriteString(strings.Join(f.fields, ", "))

b.WriteString(", hash=")
b.WriteString(f.hashName)

b.WriteString(", target=")
b.WriteString(f.target)

if f.cond != nil {
b.WriteString(", condition=")
b.WriteString(f.cond.String())
}

b.WriteRune(']')

return b.String()
}

func writeValue(writer io.Writer, object interface{}) {
val := reflect.ValueOf(object)

// Follow the pointer.
if val.Kind() == reflect.Ptr && !val.IsNil() {
val = reflect.ValueOf(val.Elem().Interface())
}

if val.IsValid() {
writer.Write([]byte(fmt.Sprintf("%v", val.Interface())))
} else {
writer.Write([]byte("nil"))
}
}
155 changes: 155 additions & 0 deletions libbeat/filter/rules/fingerprint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package rules

import (
"bytes"
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

const message = "[Mon Mar 8 05:31:47 2004] [info] [client 64.242.88.10] " +
"(104)Connection reset by peer: client stopped connection before send " +
"body completed"

func newTestFingerprint(t testing.TB, hash string, opts ...map[string]interface{}) *Fingerprint {
c, err := common.NewConfigFrom(map[string]interface{}{"hash": hash})
if err != nil {
t.Fatal(err)
}

if len(opts) == 1 {
err := c.Merge(opts[0])
if err != nil {
t.Fatal(err)
}
}

f, err := newFingerprint(*c)
if err != nil {
t.Fatal(err)
}
return f.(*Fingerprint)
}

func TestFingerprintHashes(t *testing.T) {
var tests = []struct {
hash string
fingerprint string
}{
{"sha1", "fe7b2aede2119f5508f466209a26a863d405c1ee"},
{"sha256", "5c2736f2a1b8ec165ffe3e904b4171ad7581db825f492bca7bdfa0cca4e5630f"},
{"sha512", "3142c7ff002141dba401d5cae154f4c534d5e3b8403699398c3e1dd20bcad764accccfab108188f14b01e11072c3a705d8d355473ebbea576f008920d6953b4e"},
{"md5", "ffc2c1d636ac17a38860df03350be0e4"},
}

for _, testcase := range tests {
f := newTestFingerprint(t, testcase.hash)
event := common.MapStr{"message": message}
event, err := f.Filter(event)
if assert.NoError(t, err) {
assert.Equal(t, testcase.fingerprint, event["id"])
}
}
}

func TestFingerprintFieldConcat(t *testing.T) {
f := newTestFingerprint(t, "sha1", map[string]interface{}{
"fields": []string{"@timestamp", "record_number", "beat.host", "message"},
})
event := common.MapStr{
"@timestamp": common.Time(time.Unix(1091067890, 0)),
"record_number": 1888399992,
"beat": common.MapStr{
"host": "example",
},
"message": message,
}

event, err := f.Filter(event)
if assert.NoError(t, err) {
assert.Equal(t, "ee89e405f814a308440c20f11adcc36e9e51c393", event["id"])
}
}

func TestFingerprintMissingField(t *testing.T) {
f := newTestFingerprint(t, "sha1", map[string]interface{}{
"fields": []string{"other"},
})
event := common.MapStr{"message": message}

event, err := f.Filter(event)
assert.Error(t, err)
assert.NotNil(t, event)
}

func TestFingerprintString(t *testing.T) {
f := newTestFingerprint(t, "sha1")
assert.Equal(t, "fingerprint=[fields=message, hash=sha1, target=id]", f.String())
}

func TestWriteValue(t *testing.T) {
var tests = []struct {
in interface{}
out string
}{
{nil, "nil"},
{true, "true"},
{8, "8"},
{uint(10), "10"},
{18.123, "18.123"},
{"hello", "hello"},
}

b := new(bytes.Buffer)
for _, testcase := range tests {
b.Reset()
writeValue(b, testcase.in)
assert.Equal(t, testcase.out, b.String())

b.Reset()
writeValue(b, &testcase.in)
assert.Equal(t, testcase.out, b.String())
}
}

func BenchmarkFingerprintFilterSHA1(b *testing.B) {
f := newTestFingerprint(b, "sha1")
event := common.MapStr{"message": message}

b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Filter(event)
}
}

func BenchmarkFingerprintFilterSHA256(b *testing.B) {
f := newTestFingerprint(b, "sha256")
event := common.MapStr{"message": message}

b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Filter(event)
}
}

func BenchmarkFingerprintFilterSHA512(b *testing.B) {
f := newTestFingerprint(b, "sha512")
event := common.MapStr{"message": message}

b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Filter(event)
}
}

func BenchmarkFingerprintFilterMD5(b *testing.B) {
f := newTestFingerprint(b, "md5")
event := common.MapStr{"message": message}

b.ResetTimer()
for i := 0; i < b.N; i++ {
f.Filter(event)
}
}
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type bulkMeta struct {
type bulkMetaIndex struct {
Index string `json:"_index"`
DocType string `json:"_type"`
ID string `json:"_id,omitempty"`
}

// MetaBuilder creates meta data for bulk requests
Expand Down
6 changes: 6 additions & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta {
DocType: event["type"].(string),
},
}
if id, ok := event["id"]; ok {
meta.Index.ID, ok = id.(string)
if !ok {
logp.Err("id is not a string")
}
}
return meta
}

Expand Down