Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ The list below covers the major changes between 6.3.0 and master only.
- Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392]
- Added more options to control required and optional fields in schema.Apply(), error returned is a plain nil if no error happened {pull}7335[7335]
- Packaging on MacOS now produces a .dmg file containing an installer (.pkg) and uninstaller for the Beat. {pull}7481[7481]
- New function `AddTagsWithKey` is added, so `common.MapStr` can be enriched with tags with an arbitrary key. {pull}7991[7991]
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]

*Filebeat*

- Add tag "truncated" to "log.flags" if incoming line is longer than configured limit. {pull}7991[7991]

*Heartbeat*

*Metricbeat*
Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
description: >
Logging level.

- name: log.flags
description: >
This field contains the flags of the event.

- name: event.created
type: date
description: >
Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,14 @@ type: keyword
Logging level.


--

*`log.flags`*::
+
--
This field contains the flags of the event.


--

*`event.created`*::
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions filebeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool {
return false
}

func (msg *Message) AddFields(fields common.MapStr) {
// AddFields adds fields to the message.
func (m *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
m.Fields.Update(fields)
}

// AddFlagsWithKey adds flags to the message with an arbitrary key.
// If the field does not exist, it is created.
func (m *Message) AddFlagsWithKey(key string, flags ...string) error {
if len(flags) == 0 {
return nil
}

if m.Fields == nil {
m.Fields = common.MapStr{}
}

return common.AddTagsWithKey(m.Fields, key, flags)
}
17 changes: 17 additions & 0 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Reader struct {
separator []byte
last []byte
numLines int
truncated int
err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
Expand Down Expand Up @@ -262,13 +263,19 @@ func (mlr *Reader) clear() {
mlr.message = reader.Message{}
mlr.last = nil
mlr.numLines = 0
mlr.truncated = 0
mlr.err = nil
}

// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Reader) finalize() reader.Message {
if mlr.truncated > 0 {
mlr.message.AddFlagsWithKey("log.flags", "truncated")
}

// Copy message from existing content
msg := mlr.message

mlr.clear()
return msg
}
Expand Down Expand Up @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) {
}
mlr.message.Content = append(tmp, m.Content[:space]...)
mlr.numLines++

// add number of truncated bytes to fields
diff := len(m.Content) - space
if diff > 0 {
mlr.truncated += diff
}
} else {
// increase the number of skipped bytes, if cannot add
mlr.truncated += len(m.Content)

}

mlr.last = m.Content
Expand Down
83 changes: 83 additions & 0 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) {
)
}

func TestMultilineAfterTruncated(t *testing.T) {
pattern := match.MustCompile(`^[ ]`) // next line is indented a space
maxLines := 2
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
true,
[]string{
"line1\n line1.1\n line1.2\n",
"line2\n line2.1\n line2.2\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
false,
[]string{
"line1\n line1.1\n",
"line2\n line2.1\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
Expand Down Expand Up @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
}
}

func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) {
_, buf := createLineBuffer(input...)
r := createMultilineTestReader(t, buf, cfg)

var messages []reader.Message
for {
message, err := r.Next()
if err != nil {
break
}

messages = append(messages, message)
}

if len(messages) != events {
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages))
}

for _, message := range messages {
found := false
statusFlags, err := message.Fields.GetValue("log.flags")
if err != nil {
if !truncated {
assert.False(t, found)
return
}
t.Fatalf("error while getting log.status field: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}

func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader {
encFactory, ok := encoding.FindEncoding("plain")
if !ok {
Expand Down
1 change: 1 addition & 0 deletions filebeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
}
88 changes: 88 additions & 0 deletions filebeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package readfile

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
)

type mockReader struct {
line []byte
}

func (m *mockReader) Next() (reader.Message, error) {
return reader.Message{
Content: m.line,
}, nil
}

var limitTests = []struct {
line string
maxBytes int
truncated bool
}{
{"long-long-line", 5, true},
{"long-long-line", 3, true},
{"long-long-line", len("long-long-line"), false},
}

func TestLimitReader(t *testing.T) {
for _, test := range limitTests {
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes)

msg, err := r.Next()
if err != nil {
t.Fatalf("Error reading from mock reader: %v", err)
}

assert.Equal(t, test.maxBytes, len(msg.Content))

found := false
statusFlags, err := msg.Fields.GetValue("log.flags")
if err != nil {
if !test.truncated {
assert.False(t, found)
return
}
t.Fatalf("Error getting truncated value: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if test.truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}
27 changes: 20 additions & 7 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error {
// exist then it will be created. If the tags field exists and is not a []string
// then an error will be returned. It does not deduplicate the list of tags.
func AddTags(ms MapStr, tags []string) error {
return AddTagsWithKey(ms, TagsKey, tags)
}

// AddTagsWithKey appends a tag to the key field of ms. If the field does not
// exist then it will be created. If the field exists and is not a []string
// then an error will be returned. It does not deduplicate the list.
func AddTagsWithKey(ms MapStr, key string, tags []string) error {
if ms == nil || len(tags) == 0 {
return nil
}
eventTags, exists := ms[TagsKey]
if !exists {
ms[TagsKey] = tags

k, subMap, oldTags, present, err := mapFind(key, ms, true)
if err != nil {
return err
}

if !present {
subMap[k] = tags
return nil
}

switch arr := eventTags.(type) {
switch arr := oldTags.(type) {
case []string:
ms[TagsKey] = append(arr, tags...)
subMap[k] = append(arr, tags...)
case []interface{}:
for _, tag := range tags {
arr = append(arr, tag)
}
ms[TagsKey] = arr
subMap[k] = arr
default:
return errors.Errorf("expected string array by type is %T", eventTags)
return errors.Errorf("expected string array by type is %T", oldTags)

}
return nil
}
Expand Down
Loading