From e2cf26de1ba1473c8b68f41fb10f43496669836f Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 13 May 2021 08:26:09 -0500 Subject: [PATCH 1/7] Add multiline support to awss3 input - only applies to non JSON logstash Closes #25249 --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-s3.asciidoc | 31 +++++++ x-pack/filebeat/input/awss3/collector.go | 85 ++++++++++--------- x-pack/filebeat/input/awss3/collector_test.go | 37 ++++++-- x-pack/filebeat/input/awss3/config.go | 27 ++++-- 5 files changed, 128 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 16345c981c0d..931549bd22d7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -854,6 +854,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update PanOS module to parse Global Protect & User ID logs. {issue}24722[24722] {issue}24724[24724] {pull}24927[24927] - Add HMAC signature validation support for http_endpoint input. {pull}24918[24918] - Add new grok pattern for iptables module for Ubiquiti UDM {issue}25615[25615] {pull}25616[25616] +- Add multiline support to aws-s3 input. {issue}25249[25249] {pull}25710[25710] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index e757ad6faead..903f370c8d5e 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -43,6 +43,20 @@ call will be interrupted. The default AWS API call timeout for a message is 120 seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout value. +[float] +==== `buffer_size` + +The size in bytes of the buffer that each harvester uses when fetching a file. +This only applies to non JSON logs. +The default is 16384. + +[float] +==== `encoding` + +The file encoding to use for reading data that contains international +characters. This only applies to non JSON logs. 
See <<_encoding_5>>. + + [float] ==== `expand_event_list_from_field` @@ -106,12 +120,29 @@ file_selectors: Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`. +[float] +==== `max_bytes` + +The maximum number of bytes that a single log message can have. All +bytes after `max_bytes` are discarded and not sent. This setting is +especially useful for multiline log messages, which can get +large. This only applies to non JSON logs. The default is 10MB +(10485760). + [float] ==== `max_number_of_messages` The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 5. +[float] +==== `multiline` + +Options that control how {beatname_uc} deals with log messages that +span multiple lines. This only applies to non JSON logs. See +<<multiline-examples>> for more information about configuring +multiline options. 
+ [float] ==== `queue_url` diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index ecc381abf739..5d89b125d2ac 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -13,6 +13,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "net/url" "strings" @@ -29,6 +30,10 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" "github.com/elastic/go-concert/unison" ) @@ -348,67 +353,79 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, defer resp.Body.Close() - reader := bufio.NewReader(resp.Body) + bodyReader := bufio.NewReader(resp.Body) - isS3ObjGzipped, err := isStreamGzipped(reader) + isS3ObjGzipped, err := isStreamGzipped(bodyReader) if err != nil { c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err)) return err } if isS3ObjGzipped { - gzipReader, err := gzip.NewReader(reader) + gzipReader, err := gzip.NewReader(bodyReader) if err != nil { c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) return err } - reader = bufio.NewReader(gzipReader) - gzipReader.Close() + defer gzipReader.Close() + bodyReader = bufio.NewReader(gzipReader) } // Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config if resp.ContentType != nil && *resp.ContentType == "application/json" || info.expandEventListFromField != "" { - decoder := json.NewDecoder(reader) + decoder := json.NewDecoder(bodyReader) err := c.decodeJSON(decoder, objectHash, info, s3Ctx) if err != nil { - 
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)) - return err + return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err) } return nil } // handle s3 objects that are not json content-type + encodingFactory, ok := encoding.FindEncoding(c.config.Encoding) + if !ok || encodingFactory == nil { + return fmt.Errorf("unable to find '%v' encoding", c.config.Encoding) + } + enc, err := encodingFactory(bodyReader) + if err != nil { + return fmt.Errorf("failed to initialize encoding: %v", err) + } + var r reader.Reader + r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ + Codec: enc, + BufferSize: c.config.BufferSize, + Terminator: c.config.LineTerminator, + MaxBytes: c.config.MaxBytes * 4, + }) + r = readfile.NewStripNewline(r, c.config.LineTerminator) + + if c.config.Multiline != nil { + r, err = multiline.New(r, "\n", c.config.MaxBytes, c.config.Multiline) + if err != nil { + return fmt.Errorf("error setting up multiline: %v", err) + } + } + + r = readfile.NewLimitReader(r, c.config.MaxBytes) + var offset int64 for { - log, err := readStringAndTrimDelimiter(reader) + message, err := r.Next() if err == io.EOF { - // create event for last line - offset += int64(len(log)) - event := createEvent(log, offset, info, objectHash, s3Ctx) - err = c.forwardEvent(event) - if err != nil { - c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) - return err - } - return nil - } else if err != nil { - c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err)) - return err + // No more lines + break } - - if log == "" { - continue + if err != nil { + return fmt.Errorf("Error reading message: %v", err) } - - // create event per log line - offset += int64(len(log)) - event := createEvent(log, offset, info, objectHash, s3Ctx) + event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx) + offset += 
int64(message.Bytes) err = c.forwardEvent(event) if err != nil { - c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err)) - return err + return fmt.Errorf("forwardEvent failed: %v", err) } } + return nil } func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { @@ -540,14 +557,6 @@ func trimLogDelimiter(log string) string { return strings.TrimSuffix(log, "\n") } -func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) { - logOriginal, err := reader.ReadString('\n') - if err != nil { - return logOriginal, err - } - return trimLogDelimiter(logOriginal), nil -} - func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { s3Ctx.Inc() diff --git a/x-pack/filebeat/input/awss3/collector_test.go b/x-pack/filebeat/input/awss3/collector_test.go index e2a8bfca53e5..f05309bd53e5 100644 --- a/x-pack/filebeat/input/awss3/collector_test.go +++ b/x-pack/filebeat/input/awss3/collector_test.go @@ -23,6 +23,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/reader" + "github.com/elastic/beats/v7/libbeat/reader/readfile" + "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) // MockS3Client struct is used for unit tests. 
@@ -237,23 +240,43 @@ func TestNewS3BucketReader(t *testing.T) { resp, err := req.Send(ctx) assert.NoError(t, err) - reader := bufio.NewReader(resp.Body) + bodyReader := bufio.NewReader(resp.Body) defer resp.Body.Close() + encFactory, ok := encoding.FindEncoding("plain") + if !ok { + t.Fatalf("unable to find 'plain' encoding") + } + + enc, err := encFactory(bodyReader) + if err != nil { + t.Fatalf("failed to initialize encoding: %v", err) + } + + var r reader.Reader + r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ + Codec: enc, + BufferSize: 4096, + Terminator: readfile.LineFeed, + }) + if err != nil { + t.Fatalf("Failed to initialize line reader: %v", err) + } + + r = readfile.NewStripNewline(r, readfile.LineFeed) + for i := 0; i < 3; i++ { + msg, err := r.Next() switch i { case 0: - log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString1Trimmed, log) + assert.Equal(t, s3LogString1Trimmed, string(msg.Content)) case 1: - log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString2Trimmed, log) + assert.Equal(t, s3LogString2Trimmed, string(msg.Content)) case 2: - log, err := readStringAndTrimDelimiter(reader) assert.Error(t, io.EOF, err) - assert.Equal(t, "", log) + assert.Equal(t, "", string(msg.Content)) } } } diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index c40a493b8c79..3f7294fb29f9 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -9,18 +9,26 @@ import ( "regexp" "time" + "github.com/dustin/go-humanize" + "github.com/elastic/beats/v7/libbeat/reader/multiline" + "github.com/elastic/beats/v7/libbeat/reader/readfile" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" ) type config struct { - APITimeout time.Duration `config:"api_timeout"` - ExpandEventListFromField string `config:"expand_event_list_from_field"` - FileSelectors 
[]FileSelectorCfg `config:"file_selectors"` - FipsEnabled bool `config:"fips_enabled"` - MaxNumberOfMessages int `config:"max_number_of_messages"` - QueueURL string `config:"queue_url" validate:"nonzero,required"` - VisibilityTimeout time.Duration `config:"visibility_timeout"` - AwsConfig awscommon.ConfigAWS `config:",inline"` + APITimeout time.Duration `config:"api_timeout"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` + FileSelectors []FileSelectorCfg `config:"file_selectors"` + FipsEnabled bool `config:"fips_enabled"` + MaxNumberOfMessages int `config:"max_number_of_messages"` + QueueURL string `config:"queue_url" validate:"nonzero,required"` + VisibilityTimeout time.Duration `config:"visibility_timeout"` + AwsConfig awscommon.ConfigAWS `config:",inline"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *multiline.Config `config:"multiline"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + Encoding string `config:"encoding"` + BufferSize int `config:"buffer_size"` } // FileSelectorCfg defines type and configuration of FileSelectors @@ -36,6 +44,9 @@ func defaultConfig() config { FipsEnabled: false, MaxNumberOfMessages: 5, VisibilityTimeout: 300 * time.Second, + LineTerminator: readfile.AutoLineTerminator, + MaxBytes: 10 * humanize.MiByte, + BufferSize: 16 * humanize.KiByte, } } From d90b4b72f3224d1aeffd52eb6cecbfd742c3942a Mon Sep 17 00:00:00 2001 From: Lee Hinman <57081003+leehinman@users.noreply.github.com> Date: Thu, 13 May 2021 15:49:37 -0500 Subject: [PATCH 2/7] Update x-pack/filebeat/input/awss3/collector.go Co-authored-by: Andrew Kroh --- x-pack/filebeat/input/awss3/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index 5d89b125d2ac..336c26fe18e7 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -416,7 +416,7 @@ func 
(c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, break } if err != nil { - return fmt.Errorf("Error reading message: %v", err) + return fmt.Errorf("error reading message: %w", err) } event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx) offset += int64(message.Bytes) From 8558f4974fe59fb8ddb69c6131c254dacb98c9a6 Mon Sep 17 00:00:00 2001 From: Lee Hinman <57081003+leehinman@users.noreply.github.com> Date: Thu, 13 May 2021 15:50:11 -0500 Subject: [PATCH 3/7] Update x-pack/filebeat/input/awss3/collector.go Co-authored-by: Andrew Kroh --- x-pack/filebeat/input/awss3/collector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index 336c26fe18e7..75a742b977e9 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -422,7 +422,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, offset += int64(message.Bytes) err = c.forwardEvent(event) if err != nil { - return fmt.Errorf("forwardEvent failed: %v", err) + return fmt.Errorf("forwardEvent failed: %w", err) } } return nil From 423cf7dfa275850fd5bd0a6a8c62ef96b1af404e Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 13 May 2021 16:12:27 -0500 Subject: [PATCH 4/7] Add feedback --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 10 ++++++---- x-pack/filebeat/input/awss3/collector.go | 3 +-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 903f370c8d5e..505e987b4259 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -47,14 +47,14 @@ is 0 seconds. The maximum is half of the visibility timeout value. ==== `buffer_size` The size in bytes of the buffer that each harvester uses when fetching a file. 
-This only applies to non JSON logs. +This only applies to non-JSON logs. The default is 16384. [float] ==== `encoding` The file encoding to use for reading data that contains international -characters. This only applies to non JSON logs. See <<_encoding_5>>. +characters. This only applies to non-JSON logs. See <<_encoding_5>>. [float] @@ -126,7 +126,7 @@ Enabling this option changes the service name from `s3` to `s3-fips` for connect The maximum number of bytes that a single log message can have. All bytes after `max_bytes` are discarded and not sent. This setting is especially useful for multiline log messages, which can get -large. This only applies to non JSON logs. The default is 10MB +large. This only applies to non-JSON logs. The default is 10MB (10485760). [float] @@ -138,8 +138,10 @@ Valid values: 1 to 10. Default: 5. [float] ==== `multiline` +beta[] + Options that control how {beatname_uc} deals with log messages that -span multiple lines. This only applies to non JSON logs. See +span multiple lines. This only applies to non-JSON logs. See <<multiline-examples>> for more information about configuring multiline options. diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index 75a742b977e9..d8d353e4b927 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -420,8 +420,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx) offset += int64(message.Bytes) - err = c.forwardEvent(event) - if err != nil { + if err = c.forwardEvent(event); err != nil { return fmt.Errorf("forwardEvent failed: %w", err) } } From 85c7985ffd0bea8aa1ca7f53ac7e331c1d242d8d Mon Sep 17 00:00:00 2001 From: "Lee E. 
Hinman" Date: Fri, 14 May 2021 10:20:54 -0500 Subject: [PATCH 5/7] allow multiline to be turned on/off per file selector --- x-pack/filebeat/input/awss3/collector.go | 33 +++++++++++++++++------- x-pack/filebeat/input/awss3/config.go | 11 +++++--- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index d8d353e4b927..ff2521e85bcc 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -55,6 +55,11 @@ type s3Info struct { region string arn string expandEventListFromField string + maxBytes int + multiline *multiline.Config + lineTerminator readfile.LineTerminator + encoding string + bufferSize int } type bucket struct { @@ -278,6 +283,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) { key: filename, arn: record.S3.bucket.Arn, expandEventListFromField: c.config.ExpandEventListFromField, + maxBytes: c.config.MaxBytes, + multiline: c.config.Multiline, + lineTerminator: c.config.LineTerminator, + encoding: c.config.Encoding, + bufferSize: c.config.BufferSize, }) continue } @@ -293,6 +303,11 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) { key: filename, arn: record.S3.bucket.Arn, expandEventListFromField: fs.ExpandEventListFromField, + maxBytes: fs.MaxBytes, + multiline: fs.Multiline, + lineTerminator: fs.LineTerminator, + encoding: fs.Encoding, + bufferSize: fs.BufferSize, }) break } @@ -382,9 +397,9 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, } // handle s3 objects that are not json content-type - encodingFactory, ok := encoding.FindEncoding(c.config.Encoding) + encodingFactory, ok := encoding.FindEncoding(info.encoding) if !ok || encodingFactory == nil { - return fmt.Errorf("unable to find '%v' encoding", c.config.Encoding) + return fmt.Errorf("unable to find '%v' encoding", info.encoding) } enc, err := encodingFactory(bodyReader) 
if err != nil { @@ -393,20 +408,20 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, var r reader.Reader r, err = readfile.NewEncodeReader(ioutil.NopCloser(bodyReader), readfile.Config{ Codec: enc, - BufferSize: c.config.BufferSize, - Terminator: c.config.LineTerminator, - MaxBytes: c.config.MaxBytes * 4, + BufferSize: info.bufferSize, + Terminator: info.lineTerminator, + MaxBytes: info.maxBytes * 4, }) - r = readfile.NewStripNewline(r, c.config.LineTerminator) + r = readfile.NewStripNewline(r, info.lineTerminator) - if c.config.Multiline != nil { - r, err = multiline.New(r, "\n", c.config.MaxBytes, c.config.Multiline) + if info.multiline != nil { + r, err = multiline.New(r, "\n", info.maxBytes, info.multiline) if err != nil { return fmt.Errorf("error setting up multiline: %v", err) } } - r = readfile.NewLimitReader(r, c.config.MaxBytes) + r = readfile.NewLimitReader(r, info.maxBytes) var offset int64 for { diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 3f7294fb29f9..695fb887be78 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -33,9 +33,14 @@ type config struct { // FileSelectorCfg defines type and configuration of FileSelectors type FileSelectorCfg struct { - RegexString string `config:"regex"` - Regex *regexp.Regexp `config:",ignore"` - ExpandEventListFromField string `config:"expand_event_list_from_field"` + RegexString string `config:"regex"` + Regex *regexp.Regexp `config:",ignore"` + ExpandEventListFromField string `config:"expand_event_list_from_field"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *multiline.Config `config:"multiline"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + Encoding string `config:"encoding"` + BufferSize int `config:"buffer_size"` } func defaultConfig() config { From da105634849826e850987bc933d1d0aefefbc0e8 Mon Sep 17 00:00:00 2001 From: "Lee E. 
Hinman" Date: Fri, 14 May 2021 15:13:43 -0500 Subject: [PATCH 6/7] Add multiline test --- x-pack/filebeat/input/awss3/collector.go | 16 ++++- x-pack/filebeat/input/awss3/ftest/sample2.txt | 8 +++ .../input/awss3/s3_integration_test.go | 59 +++++++++++++------ 3 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/ftest/sample2.txt diff --git a/x-pack/filebeat/input/awss3/collector.go b/x-pack/filebeat/input/awss3/collector.go index ff2521e85bcc..1d5b749bf193 100644 --- a/x-pack/filebeat/input/awss3/collector.go +++ b/x-pack/filebeat/input/awss3/collector.go @@ -297,7 +297,7 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) { continue } if fs.Regex.MatchString(filename) { - s3Infos = append(s3Infos, s3Info{ + info := s3Info{ region: record.AwsRegion, name: record.S3.bucket.Name, key: filename, @@ -308,9 +308,19 @@ func (c *s3Collector) handleSQSMessage(m sqs.Message) ([]s3Info, error) { lineTerminator: fs.LineTerminator, encoding: fs.Encoding, bufferSize: fs.BufferSize, - }) - break + } + if info.bufferSize == 0 { + info.bufferSize = c.config.BufferSize + } + if info.maxBytes == 0 { + info.maxBytes = c.config.MaxBytes + } + if info.lineTerminator == 0 { + info.lineTerminator = c.config.LineTerminator + } + s3Infos = append(s3Infos, info) } + break } } return s3Infos, nil diff --git a/x-pack/filebeat/input/awss3/ftest/sample2.txt b/x-pack/filebeat/input/awss3/ftest/sample2.txt new file mode 100644 index 000000000000..431f86fa9a80 --- /dev/null +++ b/x-pack/filebeat/input/awss3/ftest/sample2.txt @@ -0,0 +1,8 @@ + + A + B + C + + D + E + F diff --git a/x-pack/filebeat/input/awss3/s3_integration_test.go b/x-pack/filebeat/input/awss3/s3_integration_test.go index f2eca42787e0..0c99686aebf5 100644 --- a/x-pack/filebeat/input/awss3/s3_integration_test.go +++ b/x-pack/filebeat/input/awss3/s3_integration_test.go @@ -11,7 +11,7 @@ import ( "context" "net/http" "os" - "path/filepath" + "strings" "sync" 
"testing" "time" @@ -33,12 +33,11 @@ import ( ) const ( - fileName = "sample1.txt" + fileName1 = "sample1.txt" + fileName2 = "sample2.txt" visibilityTimeout = 300 * time.Second ) -var filePath = filepath.Join("ftest", fileName) - // GetConfigForTest function gets aws credentials for integration tests. func getConfigForTest(t *testing.T) config { t.Helper() @@ -77,8 +76,23 @@ func getConfigForTest(t *testing.T) config { } func defaultTestConfig() *common.Config { - return common.MustNewConfigFrom(map[string]interface{}{ + return common.MustNewConfigFrom(common.MapStr{ "queue_url": os.Getenv("QUEUE_URL"), + "file_selectors": []common.MapStr{ + { + "regex": strings.Replace(fileName1, ".", "\\.", -1), + "max_bytes": 4096, + }, + { + "regex": strings.Replace(fileName2, ".", "\\.", -1), + "max_bytes": 4096, + "multiline": common.MapStr{ + "pattern": "^") + assert.Contains(t, message, "") + default: + t.Fatalf("object key %s is unknown", objectKey) + } + } }) } From 8a3b4087445b1323dab3c6e090060d1ec2ade484 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 17 May 2021 10:12:50 -0500 Subject: [PATCH 7/7] update docs and fmt code --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 8 +++++++- x-pack/filebeat/input/awss3/config.go | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 505e987b4259..894a0aff926a 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -43,6 +43,7 @@ call will be interrupted. The default AWS API call timeout for a message is 120 seconds. The minimum is 0 seconds. The maximum is half of the visibility timeout value. +[id="input-{type}-buffer_size"] [float] ==== `buffer_size` @@ -50,6 +51,7 @@ The size in bytes of the buffer that each harvester uses when fetching a file. This only applies to non-JSON logs. The default is 16384. 
+[id="input-{type}-encoding"] [float] ==== `encoding` @@ -105,7 +107,9 @@ setting. If `file_selectors` is given, then any global `expand_event_list_from_field` value is ignored in favor of the ones specified in the `file_selectors`. Regex syntax is the same as the Go language. Files that don't match one of the regexes won't be -processed. +processed. <<input-{type}-multiline,`multiline`>>, <<input-{type}-max_bytes,`max_bytes`>>, +<<input-{type}-buffer_size,`buffer_size`>> and <<input-{type}-encoding,`encoding`>> may also be +set for each file selector. ["source", "yml"] ---- @@ -120,6 +124,7 @@ file_selectors: Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`. +[id="input-{type}-max_bytes"] [float] ==== `max_bytes` @@ -135,6 +140,7 @@ The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 5. +[id="input-{type}-multiline"] [float] ==== `multiline` diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 695fb887be78..2552095a1050 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -10,6 +10,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/elastic/beats/v7/libbeat/reader/multiline" "github.com/elastic/beats/v7/libbeat/reader/readfile" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"