From bace775299f12c2aedf19c4d7fcd9e35d313405c Mon Sep 17 00:00:00 2001 From: pzl Date: Wed, 10 Dec 2025 15:03:47 -0500 Subject: [PATCH 1/4] file upload: do not refresh on every chunk --- ...1765396860-improve-upload-performance.yaml | 32 +++++++++++++++++++ internal/pkg/file/uploader/es.go | 9 +++++- internal/pkg/file/uploader/finalize.go | 6 ++++ 3 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 changelog/fragments/1765396860-improve-upload-performance.yaml diff --git a/changelog/fragments/1765396860-improve-upload-performance.yaml b/changelog/fragments/1765396860-improve-upload-performance.yaml new file mode 100644 index 0000000000..57cae66a5a --- /dev/null +++ b/changelog/fragments/1765396860-improve-upload-performance.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: improves file upload performance for large files + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: fleet-server + +# PR URL; optional; the PR number that added the changeset. 
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/file/uploader/es.go b/internal/pkg/file/uploader/es.go index 7da621d66a..fbe3fe281c 100644 --- a/internal/pkg/file/uploader/es.go +++ b/internal/pkg/file/uploader/es.go @@ -130,7 +130,6 @@ func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.Ch } req.Header.Set("Content-Type", "application/cbor") req.Header.Set("Accept", "application/json") - req.Refresh = "true" }) if err != nil { return err @@ -186,3 +185,11 @@ func DeleteAllChunksForFile(ctx context.Context, bulker bulk.Bulk, source string _, err = client.DeleteByQuery([]string{fmt.Sprintf(UploadDataIndexPattern, source)}, bytes.NewReader(q), client.DeleteByQuery.WithContext(ctx)) return err } + +func EnsureChunksIndexed(ctx context.Context, client *elasticsearch.Client, source string) error { + req := esapi.IndicesRefreshRequest{ + Index: []string{fmt.Sprintf(UploadDataIndexPattern, source)}, + } + _, err := req.Do(ctx, client) + return err +} diff --git a/internal/pkg/file/uploader/finalize.go b/internal/pkg/file/uploader/finalize.go index 0b04321d2a..717ddd6500 100644 --- a/internal/pkg/file/uploader/finalize.go +++ b/internal/pkg/file/uploader/finalize.go @@ -42,6 +42,12 @@ func (u *Uploader) Complete(ctx context.Context, id string, transitHash string) return info, ErrStatusNoUploads } + // complete may be called before most recent chunks are available for search yet + // this 
explicitly calls refresh once at the end, instead of refreshing on each chunk + if err := EnsureChunksIndexed(ctx, u.bulker.Client(), info.Source); err != nil { + return info, err + } + chunks, err := file.GetChunkInfos(ctx, u.bulker, UploadDataIndexPattern, info.DocID, file.GetChunkInfoOpt{IncludeSize: true, RequireHash: true}) if err != nil { return info, err From 438b01da7352a191807f257669c98226bfaa1c2a Mon Sep 17 00:00:00 2001 From: pzl Date: Fri, 12 Dec 2025 16:40:46 -0500 Subject: [PATCH 2/4] add context to refresh failures --- internal/pkg/file/uploader/finalize.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/pkg/file/uploader/finalize.go b/internal/pkg/file/uploader/finalize.go index 717ddd6500..b474674e84 100644 --- a/internal/pkg/file/uploader/finalize.go +++ b/internal/pkg/file/uploader/finalize.go @@ -9,6 +9,7 @@ import ( "crypto/sha256" "encoding/hex" "errors" + "fmt" "sort" "strings" @@ -45,7 +46,7 @@ func (u *Uploader) Complete(ctx context.Context, id string, transitHash string) // complete may be called before most recent chunks are available for search yet // this explicitly calls refresh once at the end, instead of refreshing on each chunk if err := EnsureChunksIndexed(ctx, u.bulker.Client(), info.Source); err != nil { - return info, err + return info, fmt.Errorf("unable to refresh chunk data index: %w", err) } chunks, err := file.GetChunkInfos(ctx, u.bulker, UploadDataIndexPattern, info.DocID, file.GetChunkInfoOpt{IncludeSize: true, RequireHash: true}) From 9369c1569b36333345b4c6606bdcb7bf525043dd Mon Sep 17 00:00:00 2001 From: pzl Date: Mon, 15 Dec 2025 12:34:12 -0500 Subject: [PATCH 3/4] log refresh call failures --- internal/pkg/file/uploader/es.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/pkg/file/uploader/es.go b/internal/pkg/file/uploader/es.go index fbe3fe281c..3ec344c5a9 100644 --- a/internal/pkg/file/uploader/es.go +++ b/internal/pkg/file/uploader/es.go @@ -190,6 +190,9 
@@ func EnsureChunksIndexed(ctx context.Context, client *elasticsearch.Client, sour req := esapi.IndicesRefreshRequest{ Index: []string{fmt.Sprintf(UploadDataIndexPattern, source)}, } - _, err := req.Do(ctx, client) + resp, err := req.Do(ctx, client) + if err == nil && (resp.StatusCode < 200 || resp.StatusCode >= 300) { + zerolog.Ctx(ctx).Warn().Int("status_code", resp.StatusCode).Msg("File Chunk Index refresh gave abnormal response") + } return err } From 67bf9bb9e61e2709f082281afff082f2733350be Mon Sep 17 00:00:00 2001 From: pzl Date: Tue, 16 Dec 2025 09:37:46 -0500 Subject: [PATCH 4/4] add tests around refresh usage on upload --- internal/pkg/file/uploader/es.go | 2 + internal/pkg/file/uploader/es_test.go | 42 +++++++ internal/pkg/file/uploader/finalize_test.go | 129 ++++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 internal/pkg/file/uploader/es_test.go create mode 100644 internal/pkg/file/uploader/finalize_test.go diff --git a/internal/pkg/file/uploader/es.go b/internal/pkg/file/uploader/es.go index 3ec344c5a9..20526ee5f0 100644 --- a/internal/pkg/file/uploader/es.go +++ b/internal/pkg/file/uploader/es.go @@ -123,6 +123,8 @@ func IndexChunk(ctx context.Context, client *elasticsearch.Client, body *cbor.Ch span, _ := apm.StartSpan(ctx, "createChunk", "create") defer span.End() chunkDocID := fmt.Sprintf("%s.%d", fileID, chunkNum) + // This create doc happens *many* times per file, so it must not use any expensive parameters (e.g. refresh) + // for a 5GB file, that is 1,280 create calls resp, err := client.Create(fmt.Sprintf(UploadDataIndexPattern, source), chunkDocID, body, func(req *esapi.CreateRequest) { req.DocumentID = chunkDocID if req.Header == nil { diff --git a/internal/pkg/file/uploader/es_test.go b/internal/pkg/file/uploader/es_test.go new file mode 100644 index 0000000000..4f251c9984 --- /dev/null +++ b/internal/pkg/file/uploader/es_test.go @@ -0,0 +1,42 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package uploader + +import ( + "bytes" + "io" + "net/http" + "strings" + "testing" + + "github.com/elastic/fleet-server/v7/internal/pkg/file/cbor" + "github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil" + "github.com/stretchr/testify/assert" +) + +func TestIndexChunkDoesNotUseExpensiveParams(t *testing.T) { + c, mockTX := esutil.MockESClient(t) + chunker := cbor.NewChunkWriter(bytes.NewReader([]byte{}), false, "", "", 100) + called := false + mockTX.RoundTripFn = func(req *http.Request) (*http.Response, error) { + if strings.Contains(req.URL.Path, "/_create") { + called = true + refr := req.URL.Query().Get("refresh") + assert.NotEqual(t, "true", refr, "Chunk Index operation must not use expensive refresh parameter") + } + respHeaders := make(http.Header) + respHeaders.Set("X-Elastic-Product", "Elasticsearch") + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(strings.NewReader(`{}`)), + Header: respHeaders, + }, nil + } + + err := IndexChunk(t.Context(), c, chunker, "mypkg", "sampleFileID", 0) + assert.NoError(t, err) + + assert.True(t, called, "_create API was not called") +} diff --git a/internal/pkg/file/uploader/finalize_test.go b/internal/pkg/file/uploader/finalize_test.go new file mode 100644 index 0000000000..051581b290 --- /dev/null +++ b/internal/pkg/file/uploader/finalize_test.go @@ -0,0 +1,129 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package uploader + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/elastic/fleet-server/v7/internal/pkg/cache" + "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/file" + itesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +/* +This test verifies that chunks are made available for search +before the Uploader performs that search and counts/verifies each chunk. +It specifically looks for an elasticsearch call to `../_refresh/` API, +to occur before a chunk-search call. This prevents the index-then-search +data race problem with final chunks *right before* completion API call. + +If alternative means are used to address the data-race race condition, +this test may be updated. +*/ +func TestUploadCompletePerformsRefreshBeforeChunkSearch(t *testing.T) { + + /** + * Setup & Mocking only + * avoid asserts here, since the setup path is involved, and executes + * a lot of code paths. 
Those paths are not under test here + **/ + refreshCalled := false + size := 200 + fakeBulk := itesting.NewMockBulk() + fakeIntegrationSrc := "endpoint" + // hash of null chunk, and then transithash OF that hash + nullHash := "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + transitHashNull := "5df6e0e2761359d30a8275058e299fcc0381534545f55cf43e41983f5d4c9456" + mockClient, mockTX := esutil.MockESClient(t) + + // populate mock info for a *metadata* search, which may occur during finalization + mockMeta, _ := json.Marshal(map[string]any{ + "action_id": "actionID", + "agent_id": "agentID", + "src": fakeIntegrationSrc, + "file": map[string]interface{}{ + "size": size, + "ChunkSize": file.MaxChunkSize, + "Status": file.StatusProgress, + }, + "upload_id": "some-id", + "upload_start": time.Now().UnixMilli(), + }) + fakeBulk.On("Search", + mock.MatchedBy(func(_ context.Context) bool { return true }), // match context.Context + ".fleet-fileds-fromhost-meta-*", // *metadata* index (NOT CHUNK/DATA!) 
+ mock.Anything, // query bytes + mock.Anything, // bulk opts + ).Return(&es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{{ + ID: "_sampledocID", + Source: mockMeta, + }}, + }, + }, nil).Maybe() // not under test, calling is not required + fakeBulk.On("Client").Return(mockClient).Maybe() // inject our mock client, if used + mockTX.RoundTripFn = func(req *http.Request) (*http.Response, error) { + if strings.Contains(req.URL.Path, fmt.Sprintf(UploadDataIndexPattern, fakeIntegrationSrc)+"/_refresh") { + refreshCalled = true + } + respHeaders := make(http.Header) + respHeaders.Set("X-Elastic-Product", "Elasticsearch") + return &http.Response{ + StatusCode: 200, + Body: io.NopCloser(strings.NewReader(`{}`)), + Header: respHeaders, + }, nil + } + + fakeBulk.On("Search", + mock.MatchedBy(func(_ context.Context) bool { return true }), // match context.Context + ".fleet-fileds-fromhost-data-*", // *DATA* (chunk) search + mock.Anything, // query bytes + mock.Anything, // bulk opts + + ).Run(func(args mock.Arguments) { + // runs during execution, before return + assert.True(t, refreshCalled, "Chunk finalization search occurred without refresh") + }).Return(&es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{{ + ID: "actionID.agentID.0", + Fields: map[string]any{ + file.FieldBaseID: []any{"actionID.agentID"}, + file.FieldSHA2: []any{nullHash}, + "size": []any{size}, + file.FieldLast: []any{true}, + }, + }}, + }, + }, nil) + + c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000}) + require.NoError(t, err) // panic-exit if prereq fails, not intentional testing + u := New(nil, fakeBulk, c, size_ptr(size), time.Hour) + + /** + * Begin actual execution & assertions + **/ + _, err = u.Complete(t.Context(), "actionID", transitHashNull) + assert.NoError(t, err) + + assert.True(t, refreshCalled, "_refresh API was not called during file finalization") +}