Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927]
- Add HMAC signature validation support for http_endpoint input. {pull}24918[24918]
- Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616]
- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710]

*Heartbeat*

Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ call will be interrupted.
The default AWS API call timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.

[float]
==== `buffer_size`

The size in bytes of the buffer that each harvester uses when fetching a file.
This only applies to non-JSON logs.
The default is 16384.

[float]
==== `encoding`

The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_5>>.


[float]
==== `expand_event_list_from_field`

Expand Down Expand Up @@ -106,12 +120,31 @@ file_selectors:

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.

[float]
==== `max_bytes`

The maximum number of bytes that a single log message can have. All
bytes after `max_bytes` are discarded and not sent. This setting is
especially useful for multiline log messages, which can get
large. This only applies to non-JSON logs. The default is 10MB
(10485760).

[float]
==== `max_number_of_messages`

The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned).
Valid values: 1 to 10. Default: 5.

[float]
==== `multiline`

beta[]

Options that control how {beatname_uc} deals with log messages that
span multiple lines. This only applies to non-JSON logs. See
<<multiline-examples>> for more information about configuring
multiline options.

[float]
==== `queue_url`

Expand Down
119 changes: 76 additions & 43 deletions x-pack/filebeat/input/awss3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
Expand All @@ -29,6 +30,10 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
"github.com/elastic/go-concert/unison"
)

Expand All @@ -50,6 +55,11 @@ type s3Info struct {
region string
arn string
expandEventListFromField string
maxBytes int
multiline *multiline.Config
lineTerminator readfile.LineTerminator
encoding string
bufferSize int
}

type bucket struct {
Expand Down Expand Up @@ -273,6 +283,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: c.config.ExpandEventListFromField,
maxBytes: c.config.MaxBytes,
multiline: c.config.Multiline,
lineTerminator: c.config.LineTerminator,
encoding: c.config.Encoding,
bufferSize: c.config.BufferSize,
})
continue
}
Expand All @@ -282,15 +297,30 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) {
continue
}
if fs.Regex.MatchString(filename) {
s3Infos = append(s3Infos, s3Info{
info := s3Info{
region: record.AwsRegion,
name: record.S3.bucket.Name,
key: filename,
arn: record.S3.bucket.Arn,
expandEventListFromField: fs.ExpandEventListFromField,
})
break
maxBytes: fs.MaxBytes,
multiline: fs.Multiline,
lineTerminator: fs.LineTerminator,
encoding: fs.Encoding,
bufferSize: fs.BufferSize,
}
if info.bufferSize == 0 {
info.bufferSize = c.config.BufferSize
}
if info.maxBytes == 0 {
info.maxBytes = c.config.MaxBytes
}
if info.lineTerminator == 0 {
info.lineTerminator = c.config.LineTerminator
}
s3Infos = append(s3Infos, info)
}
break
}
}
return s3Infos, nil
Expand Down Expand Up @@ -348,67 +378,78 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,

defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)

isS3ObjGzipped, err := isStreamGzipped(reader)
isS3ObjGzipped, err := isStreamGzipped(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
return err
}

if isS3ObjGzipped {
gzipReader, err := gzip.NewReader(reader)
gzipReader, err := gzip.NewReader(bodyReader)
if err != nil {
c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
}
reader = bufio.NewReader(gzipReader)
gzipReader.Close()
defer gzipReader.Close()
bodyReader = bufio.NewReader(gzipReader)
}

// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" {
decoder := json.NewDecoder(reader)
decoder := json.NewDecoder(bodyReader)
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
return err
return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
}
return nil
}

// handle s3 objects that are not json content-type
encodingFactory, ok := encoding.FindEncoding(info.encoding)
if !ok || encodingFactory == nil {
return fmt.Errorf("unable to find '%v' encoding", info.encoding)
}
enc, err := encodingFactory(bodyReader)
if err != nil {
return fmt.Errorf("failed to initialize encoding: %v", err)
}
var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: info.bufferSize,
Terminator: info.lineTerminator,
MaxBytes: info.maxBytes * 4,
})
r = readfile.NewStripNewline(r, info.lineTerminator)

if info.multiline != nil {
r, err = multiline.New(r, "\n", info.maxBytes, info.multiline)
if err != nil {
return fmt.Errorf("error setting up multiline: %v", err)
}
}

r = readfile.NewLimitReader(r, info.maxBytes)

var offset int64
for {
log, err := readStringAndTrimDelimiter(reader)
message, err := r.Next()
if err == io.EOF {
// create event for last line
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
}
return nil
} else if err != nil {
c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
return err
}

if log == "" {
continue
// No more lines
break
}

// create event per log line
offset += int64(len(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
err = c.forwardEvent(event)
if err != nil {
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
return err
return fmt.Errorf("error reading message: %w", err)
}
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
offset += int64(message.Bytes)
if err = c.forwardEvent(event); err != nil {
return fmt.Errorf("forwardEvent failed: %w", err)
}
}
return nil
}

func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
Expand Down Expand Up @@ -540,14 +581,6 @@ func trimLogDelimiter(log string) string {
return strings.TrimSuffix(log, "\n")
}

func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
logOriginal, err := reader.ReadString('\n')
if err != nil {
return logOriginal, err
}
return trimLogDelimiter(logOriginal), nil
}

func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
s3Ctx.Inc()

Expand Down
37 changes: 30 additions & 7 deletions x-pack/filebeat/input/awss3/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)

// MockS3Client struct is used for unit tests.
Expand Down Expand Up @@ -237,23 +240,43 @@ func TestNewS3BucketReader(t *testing.T) {

resp, err := req.Send(ctx)
assert.NoError(t, err)
reader := bufio.NewReader(resp.Body)
bodyReader := bufio.NewReader(resp.Body)
defer resp.Body.Close()

encFactory, ok := encoding.FindEncoding("plain")
if !ok {
t.Fatalf("unable to find 'plain' encoding")
}

enc, err := encFactory(bodyReader)
if err != nil {
t.Fatalf("failed to initialize encoding: %v", err)
}

var r reader.Reader
r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{
Codec: enc,
BufferSize: 4096,
Terminator: readfile.LineFeed,
})
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r = readfile.NewStripNewline(r, readfile.LineFeed)

for i := 0; i < 3; i++ {
msg, err := r.Next()
switch i {
case 0:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString1Trimmed, log)
assert.Equal(t, s3LogString1Trimmed, string(msg.Content))
case 1:
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString2Trimmed, log)
assert.Equal(t, s3LogString2Trimmed, string(msg.Content))
case 2:
log, err := readStringAndTrimDelimiter(reader)
assert.Error(t, io.EOF, err)
assert.Equal(t, "", log)
assert.Equal(t, "", string(msg.Content))
}
}
}
Expand Down
38 changes: 27 additions & 11 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,38 @@ import (
"regexp"
"time"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
)

type config struct {
APITimeout time.Duration `config:"api_timeout"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
FipsEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
APITimeout time.Duration `config:"api_timeout"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
FipsEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think these options should be available in the FileSelectorCfg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm we could...

The use case would be a single S3 bucket that has a mix of multiline and non-multiline log files.

I'll add it, and we can see if we like it.

Copy link
Member

@andrewkroh andrewkroh May 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parser options could be combined into a struct that is embedded in both config and FileSelectorCfg to avoid having to duplicate the same config struct tags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of changing this so you always have a file_selector, the default is just to match any filename. That should make the config and code paths cleaner. I know we want to get a build out with multiline "soon". You OK with merging as is, with a new PR this week to clean this up?

LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
BufferSize int `config:"buffer_size"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
type FileSelectorCfg struct {
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
RegexString string `config:"regex"`
Regex *regexp.Regexp `config:",ignore"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Encoding string `config:"encoding"`
BufferSize int `config:"buffer_size"`
}

func defaultConfig() config {
Expand All @@ -36,6 +49,9 @@ func defaultConfig() config {
FipsEnabled: false,
MaxNumberOfMessages: 5,
VisibilityTimeout: 300 * time.Second,
LineTerminator: readfile.AutoLineTerminator,
MaxBytes: 10 * humanize.MiByte,
BufferSize: 16 * humanize.KiByte,
}
}

Expand Down
8 changes: 8 additions & 0 deletions x-pack/filebeat/input/awss3/ftest/sample2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Event><Data>
A
B
C</Data></Event>
<Event><Data>
D
E
F</Data</Event>
Loading