1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -854,6 +854,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927]
- Add HMAC signature validation support for http_endpoint input. {pull}24918[24918]
- Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]

*Heartbeat*

41 changes: 40 additions & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -43,6 +43,22 @@ call will be interrupted.
The default AWS API call timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.

[id="input-{type}-buffer_size"]
[float]
==== `buffer_size`

The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs.
The default is 16384.

[id="input-{type}-encoding"]
[float]
==== `encoding`

The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_5>>.
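
As an illustrative sketch only (the queue URL is a placeholder and the
values are not recommendations), the two options above can be combined for
plain-text objects:

["source", "yml"]
----
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
  encoding: utf-16be
  buffer_size: 32768
----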


[float]
==== `expand_event_list_from_field`

@@ -91,7 +107,9 @@ setting. If `file_selectors` is given, then any global
`expand_event_list_from_field` value is ignored in favor of the ones
specified in the `file_selectors`. Regex syntax is the same as the Go
language. Files that don't match one of the regexes won't be
processed.
processed. <<input-aws-s3-multiline>>, <<input-aws-s3-max_bytes>>,
<<input-aws-s3-buffer_size>> and <<input-aws-s3-encoding>> may also be
set for each file selector.

["source", "yml"]
----
@@ -106,12 +124,33 @@ file_selectors:

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.

[id="input-{type}-max_bytes"]
[float]
==== `max_bytes`

The maximum number of bytes that a single log message can have. All
bytes after `max_bytes` are discarded and not sent. This setting is
especially useful for multiline log messages, which can get
large. This only applies to non-JSON logs. The default is 10MB
(10485760).

[float]
==== `max_number_of_messages`
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned).
Valid values: 1 to 10. Default: 5.

[id="input-{type}-multiline"]
[float]
==== `multiline`

beta[]

Options that control how {beatname_uc} deals with log messages that
span multiple lines. This only applies to non-JSON logs. See
<<multiline-examples>> for more information about configuring
multiline options.
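
A minimal sketch, assuming timestamp-prefixed log lines (the pattern and
queue URL are illustrative, not defaults):

["source", "yml"]
----
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
  multiline.pattern: '^\d{4}-\d{2}-\d{2}'
  multiline.negate: true
  multiline.match: after
----

With this configuration, lines that do not begin with a date are appended to
the preceding event, and `max_bytes` still caps the size of the combined
message.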

[float]
==== `queue_url`

119 changes: 76 additions & 43 deletions x-pack/filebeat/input/awss3/collector.go
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
@@ -29,6 +30,10 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
"github.com/elastic/go-concert/unison"
)

@@ -50,6 +55,11 @@ type s3Info struct {
region string
arn string
expandEventListFromField string
maxBytes int
multiline *multiline.Config
lineTerminator readfile.LineTerminator
encoding string
bufferSize int
}

type bucket struct {
@@ -273,6 +283,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: c.config.ExpandEventListFromField,
maxBytes: c.config.MaxBytes,
multiline: c.config.Multiline,
lineTerminator: c.config.LineTerminator,
encoding: c.config.Encoding,
bufferSize: c.config.BufferSize,
})
continue
}
@@ -282,15 +297,30 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
info := s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
maxBytes: fs.MaxBytes,
multiline: fs.Multiline,
lineTerminator: fs.LineTerminator,
encoding: fs.Encoding,
bufferSize: fs.BufferSize,
}
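// Zero values mean the matching file selector did not set these options, so fall back to the input-level defaults.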
if info.bufferSize == 0 {
info.bufferSize = c.config.BufferSize
}
if info.maxBytes == 0 {
info.maxBytes = c.config.MaxBytes
}
if info.lineTerminator == 0 {
info.lineTerminator = c.config.LineTerminator
}
s3Infos = append(s3Infos, info)
}
break
}
}
return s3Infos, nil
@@ -348,67 +378,78 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,

defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)

isS3ObjGzipped, err := isStreamGzipped(reader)
isS3ObjGzipped, err := isStreamGzipped(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
gzipReader, err := gzip.NewReader(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
defer gzipReader.Close()
bodyReader = bufio.NewReader(gzipReader)
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
decoder := json.NewDecoder(bodyReader)
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
}
return nil
}

// handle s3 objects that are not json content-type
encodingFactory, ok := encoding.FindEncoding(info.encoding)
if !ok || encodingFactory == nil {
return fmt.Errorf("unable to find '%v' encoding", info.encoding)
}
enc, err := encodingFactory(bodyReader)
if err != nil {
return fmt.Errorf("failed to initialize encoding: %v", err)
}
var r reader.Reader
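// Allow the underlying line reader up to 4x max_bytes; the configured per-message limit is enforced by the LimitReader below.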
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: info.bufferSize,
Terminator: info.lineTerminator,
MaxBytes: info.maxBytes * 4,
})
if err != nil {
return fmt.Errorf("failed to create encode reader: %w", err)
}
r = readfile.NewStripNewline(r, info.lineTerminator)

if info.multiline != nil {
r, err = multiline.New(r, "\n", info.maxBytes, info.multiline)
if err != nil {
return fmt.Errorf("error setting up multiline: %v", err)
}
}

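// Enforce the configured max_bytes on each final (possibly multiline) message.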
r = readfile.NewLimitReader(r, info.maxBytes)

var offset int64
for {
log, err := readStringAndTrimDelimiter(reader)
message, err := r.Next()
if err == io.EOF {
// create event for last line
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
}
return nil
} else if err != nil {
c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
return err
}

if log == "" {
continue
// No more lines
break
}

// create event per log line
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
return fmt.Errorf("error reading message: %w", err)
}
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
offset += int64(message.Bytes)
if err = c.forwardEvent(event); err != nil {
return fmt.Errorf("forwardEvent failed: %w", err)
}
}
return nil
}

func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
@@ -540,14 +581,6 @@ func trimLogDelimiter(log string) string {
return strings.TrimSuffix(log, "\n")
}

func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
logOriginal, err := reader.ReadString('\n')
if err != nil {
return logOriginal, err
}
return trimLogDelimiter(logOriginal), nil
}

func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
s3Ctx.Inc()

37 changes: 30 additions & 7 deletions x-pack/filebeat/input/awss3/collector_test.go
@@ -23,6 +23,9 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)

// MockS3Client struct is used for unit tests.
@@ -237,23 +240,43 @@ func TestNewS3BucketReader(t *testing.T) {

resp, err := req.Send(ctx)
assert.NoError(t, err)
reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)
defer resp.Body.Close()

encFactory, ok := encoding.FindEncoding("plain")
if !ok {
t.Fatalf("unable to find 'plain' encoding")
}

enc, err := encFactory(bodyReader)
if err != nil {
t.Fatalf("failed to initialize encoding: %v", err)
}

var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r = readfile.NewStripNewline(r, readfile.LineFeed)

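// The test object contains two log lines; the third Next call returns io.EOF with empty content.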
for i := 0; i < 3; i++ {
msg, err := r.Next()
switch i {
case 0:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString1Trimmed, log)
assert.Equal(t, s3LogString1Trimmed, string(msg.Content))
case 1:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString2Trimmed, log)
assert.Equal(t, s3LogString2Trimmed, string(msg.Content))
case 2:
log, err := readStringAndTrimDelimiter(reader)
assert.Error(t, io.EOF, err)
assert.Equal(t, "", log)
assert.Equal(t, "", string(msg.Content))
}
}
}