Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AWS] [S3] fix: improve object size metric calculation #41755

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Fix missing key in streaming input logging. {pull}41600[41600]
- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
// currentTime returns the current time. This exists to allow unit tests
// simulate the passage of time.
func currentTime() time.Time {
clock := clockValue.Load().(clock)
clock, _ := clockValue.Load().(clock)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - had to fix this to get lint correct

return clock.Now()
}

Expand Down Expand Up @@ -210,6 +210,8 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
type monitoredReader struct {
reader io.Reader
totalBytesRead *monitoring.Uint

trackBytes int64
Comment on lines 212 to +214
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the naming could be more clear. "trackBytes" does not mean much to me. Maybe...

	totalBytesReadMetric *monitoring.Uint
	totalBytesRead       int64

}

func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
Expand All @@ -219,5 +221,10 @@ func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
func (m *monitoredReader) Read(p []byte) (int, error) {
n, err := m.reader.Read(p)
m.totalBytesRead.Add(uint64(n))
m.trackBytes += int64(n)
return n, err
}

func (m *monitoredReader) GetTrackedBytes() int64 {
Copy link
Member

@andrewkroh andrewkroh Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The accessor seems unnecessary. The caller can read straight from the field.

return m.trackBytes
}
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type s3ObjectProcessor struct {

type s3DownloadedObject struct {
body io.ReadCloser
length int64
contentType string
metadata map[string]interface{}
}
Expand Down Expand Up @@ -142,9 +141,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
defer s3Obj.body.Close()

p.s3Metadata = s3Obj.metadata
p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)

reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
mReader := newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal)
reader, err := p.addGzipDecoderIfNeeded(mReader)
if err != nil {
return fmt.Errorf("failed checking for gzip content: %w", err)
}
Expand Down Expand Up @@ -213,6 +212,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
time.Since(start).Nanoseconds(), err)
}

// finally obtain total bytes of the object through metered reader
p.metrics.s3ObjectSizeInBytes.Update(mReader.GetTrackedBytes())

return nil
}

Expand Down Expand Up @@ -241,7 +243,6 @@ func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {

s := &s3DownloadedObject{
body: getObjectOutput.Body,
length: *getObjectOutput.ContentLength,
contentType: ctType,
metadata: meta,
}
Expand Down
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,76 @@ func TestS3ObjectProcessor(t *testing.T) {
})
}

func TestProcessObjectMetricCollection(t *testing.T) {
logger := logp.NewLogger("testing-s3-processor-metrics")

tests := []struct {
name string
filename string
contentType string
objectSize int64
}{
{
name: "simple text - octet-stream",
filename: "testdata/log.txt",
contentType: "application/octet-stream",
objectSize: 18,
},
{
name: "json text",
filename: "testdata/log.json",
contentType: "application/json",
objectSize: 199,
},
{
name: "gzip with json text",
filename: "testdata/multiline.json.gz",
contentType: "application/x-gzip",
objectSize: 175,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// given
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctrl, ctx := gomock.WithContext(ctx, t)
defer ctrl.Finish()

s3Event, s3Resp := newS3Object(t, test.filename, test.contentType)
mockS3API := NewMockS3API(ctrl)
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
)

// metric recorder with zero workers
metricRecorder := newInputMetrics(test.name, nil, 0)
objFactory := newS3ObjectProcessorFactory(metricRecorder, mockS3API, nil, backupConfig{})
objHandler := objFactory.Create(ctx, s3Event)

// when
err := objHandler.ProcessS3Object(logger, func(_ beat.Event) {})

// then
require.NoError(t, err)

require.Equal(t, uint64(1), metricRecorder.s3ObjectsRequestedTotal.Get())
require.Equal(t, uint64(0), metricRecorder.s3ObjectsInflight.Get())

values := metricRecorder.s3ObjectSizeInBytes.Values()
require.Equal(t, 1, len(values))

// since we processed a single object, total and current process size is same
require.Equal(t, test.objectSize, values[0])
require.Equal(t, uint64(test.objectSize), metricRecorder.s3BytesProcessedTotal.Get())
})
}
}

func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event {
return _testProcessS3Object(t, file, contentType, numEvents, false, selectors)
}
Expand Down
Loading