From 05d57eda7868cb3ae255352f044040bf09327d66 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 22 Nov 2024 09:56:12 -0800 Subject: [PATCH 1/4] rely on monitored reader for content length Signed-off-by: Kavindu Dodanduwa --- x-pack/filebeat/input/awss3/metrics.go | 7 +++++++ x-pack/filebeat/input/awss3/s3_objects.go | 9 +++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 3be07437a50..fe519a862c9 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -210,6 +210,8 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers type monitoredReader struct { reader io.Reader totalBytesRead *monitoring.Uint + + trackBytes int64 } func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader { @@ -219,5 +221,10 @@ func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader { func (m *monitoredReader) Read(p []byte) (int, error) { n, err := m.reader.Read(p) m.totalBytesRead.Add(uint64(n)) + m.trackBytes += int64(n) return n, err } + +func (m *monitoredReader) GetTrackedBytes() int64 { + return m.trackBytes +} diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 93219d9a640..9168eabed7a 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -51,7 +51,6 @@ type s3ObjectProcessor struct { type s3DownloadedObject struct { body io.ReadCloser - length int64 contentType string metadata map[string]interface{} } @@ -142,9 +141,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func defer s3Obj.body.Close() p.s3Metadata = s3Obj.metadata - p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length) - reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal)) + mReader := newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal) + reader, err := p.addGzipDecoderIfNeeded(mReader) if err != nil { return fmt.Errorf("failed checking for gzip content: %w", err) } @@ -213,6 +212,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func time.Since(start).Nanoseconds(), err) } + // finally obtain total bytes of the object through metered reader + p.metrics.s3ObjectSizeInBytes.Update(mReader.GetTrackedBytes()) + return nil } @@ -241,7 +243,6 @@ func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) { s := &s3DownloadedObject{ body: getObjectOutput.Body, - length: *getObjectOutput.ContentLength, contentType: ctType, metadata: meta, } From 7ccece000c3cc253ab8c2ddb1ca7b1a751313397 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 22 Nov 2024 11:06:22 -0800 Subject: [PATCH 2/4] add tests to validate metrics Signed-off-by: Kavindu Dodanduwa --- .../filebeat/input/awss3/s3_objects_test.go | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index d20d81ced6c..432c5209d25 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -289,6 +289,76 @@ func TestS3ObjectProcessor(t *testing.T) { }) } +func TestProcessObjectMetricCollection(t *testing.T) { + logger := logp.NewLogger("testing-s3-processor-metrics") + + tests := []struct { + name string + filename string + contentType string + objectSize int64 + }{ + { + name: "simple text - octet-stream", + filename: "testdata/log.txt", + contentType: "application/octet-stream", + objectSize: 18, + }, + { + name: "json text", + filename: "testdata/log.json", + contentType: "application/json", + objectSize: 199, + }, + { + name: "gzip with json text", + filename: "testdata/multiline.json.gz", + contentType: "application/x-gzip", + objectSize: 175, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + + s3Event, s3Resp := newS3Object(t, test.filename, test.contentType) + mockS3API := NewMockS3API(ctrl) + gomock.InOrder( + mockS3API.EXPECT(). + GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + Return(s3Resp, nil), + ) + + // metric recorder with zero workers + metricRecorder := newInputMetrics(test.name, nil, 0) + objFactory := newS3ObjectProcessorFactory(metricRecorder, mockS3API, nil, backupConfig{}) + objHandler := objFactory.Create(ctx, s3Event) + + // when + err := objHandler.ProcessS3Object(logger, func(_ beat.Event) {}) + + // then + require.NoError(t, err) + + require.Equal(t, uint64(1), metricRecorder.s3ObjectsRequestedTotal.Get()) + require.Equal(t, uint64(0), metricRecorder.s3ObjectsInflight.Get()) + + values := metricRecorder.s3ObjectSizeInBytes.Values() + require.Equal(t, 1, len(values)) + + // since we processed a single object, total and current process size is same + require.Equal(t, test.objectSize, values[0]) + require.Equal(t, uint64(test.objectSize), metricRecorder.s3BytesProcessedTotal.Get()) + }) + } +} + func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event { return _testProcessS3Object(t, file, contentType, numEvents, false, selectors) } From 04531f30a1bdeb93cd52056da85cc9395819fe90 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 22 Nov 2024 11:23:55 -0800 Subject: [PATCH 3/4] add changelog entry Signed-off-by: Kavindu Dodanduwa --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8d6cc7a96fa..d2096b3b100 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -178,6 +178,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Access Points in the `aws-s3` input. {pull}41495[41495] - Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664] - Fix missing key in streaming input logging. {pull}41600[41600] +- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755] *Heartbeat* From 3a1b4defe21895cc0a53688f4c4c8604662d33a9 Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Fri, 22 Nov 2024 11:50:57 -0800 Subject: [PATCH 4/4] fix lint - ignore ok values as we know the stored value type Signed-off-by: Kavindu Dodanduwa --- x-pack/filebeat/input/awss3/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index fe519a862c9..74ac2d14a9a 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -36,7 +36,7 @@ func init() { // currentTime returns the current time. This exists to allow unit tests // simulate the passage of time. func currentTime() time.Time { - clock := clockValue.Load().(clock) + clock, _ := clockValue.Load().(clock) return clock.Now() }