From 96396b8c5f035fd22f4e94a280d447e1a52964ea Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Wed, 16 Jul 2025 13:59:30 +0800 Subject: [PATCH 1/9] Add 9.2 to upgrade test --- .github/workflows/integration-server-test.yml | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/.github/workflows/integration-server-test.yml b/.github/workflows/integration-server-test.yml index 65f0bc589cf..4cb0cbdfe66 100644 --- a/.github/workflows/integration-server-test.yml +++ b/.github/workflows/integration-server-test.yml @@ -37,12 +37,10 @@ jobs: fail-fast: false matrix: upgrade-path: - # Latest 8.15 cannot upgrade to latest 8.17, it can only go to 8.17.3. - # With our current setup (only latest patch), we have to upgrade to intermediate latest 8.16 instead. - # TODO: Maybe add support for upgrading to latest upgradable instead of absolute latest? - - '8.15, 8.16, 8.17' - - '8.17, 8.18, 9.0' - - '8.17, 8.19, 9.1' + - '8.16, 8.17, 8.18' + - '8.18, 8.19, 9.2' + - '8.18, 9.0, 9.2' + - '8.19, 9.1, 9.2' scenario: - 'Default' - 'Reroute' From 35315de1bb53353e59c80aae7c558b6faf8ee700 Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Wed, 16 Jul 2025 14:04:48 +0800 Subject: [PATCH 2/9] Add inputs to workflow job to control which tests to run --- .github/workflows/integration-server-test.yml | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration-server-test.yml b/.github/workflows/integration-server-test.yml index 4cb0cbdfe66..763c1fe3fb5 100644 --- a/.github/workflows/integration-server-test.yml +++ b/.github/workflows/integration-server-test.yml @@ -3,7 +3,23 @@ name: integration-server-test run-name: Integration Server Test on: - workflow_dispatch: ~ + workflow_dispatch: + inputs: + run-upgrade-tests: + description: 'Run upgrade tests (SNAPSHOT)' + required: false + type: boolean + default: true + run-upgrade-bc-tests: + description: 'Run upgrade tests (BC)' + required: false + type: boolean + default: true + 
run-standalone-tests: + description: 'Run standalone-to-managed tests' + required: false + type: boolean + default: true schedule: - cron: '0 2 * * 1-5' @@ -31,7 +47,8 @@ jobs: uses: ./.github/workflows/generate-bc-upgrade-paths run-upgrade: - name: Upgrade tests (Snapshot) + if: ${{ inputs.run-upgrade-tests == true }} + name: Upgrade tests (SNAPSHOT) runs-on: ubuntu-latest strategy: fail-fast: false @@ -66,6 +83,7 @@ jobs: SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" SNAPSHOT=true make integration-server-test/upgrade run-upgrade-bc: + if: ${{ inputs.run-upgrade-bc-tests == true }} name: Upgrade tests (BC) runs-on: ubuntu-latest needs: prepare @@ -98,6 +116,7 @@ jobs: SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" make integration-server-test/upgrade run-standalone: + if: ${{ inputs.run-standalone-tests == true }} name: Standalone-to-managed tests runs-on: ubuntu-latest strategy: From ca874552b9fd2833c978bbd61c0f9dc502351b85 Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Wed, 16 Jul 2025 14:06:20 +0800 Subject: [PATCH 3/9] Add back 8.15 --- .github/workflows/integration-server-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-server-test.yml b/.github/workflows/integration-server-test.yml index 763c1fe3fb5..a18f5844124 100644 --- a/.github/workflows/integration-server-test.yml +++ b/.github/workflows/integration-server-test.yml @@ -54,7 +54,7 @@ jobs: fail-fast: false matrix: upgrade-path: - - '8.16, 8.17, 8.18' + - '8.15, 8.16, 8.17, 8.18' - '8.18, 8.19, 9.2' - '8.18, 9.0, 9.2' - '8.19, 9.1, 9.2' From ae0e79b06971102c3cbad88ce278792ce39e9de5 Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Wed, 16 Jul 2025 15:29:44 +0800 Subject: [PATCH 4/9] Merge branch 'main' into iservertest-add-9.2-test --- .mergify.yml | 2 +- NOTICE-fips.txt | 4 ++-- NOTICE.txt | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- integrationservertest/logs_filters.go | 5 +++++ 
integrationservertest/standalone_test.go | 3 +++ integrationservertest/upgrade_test.go | 4 +++- internal/beatcmd/beat_test.go | 2 +- 9 files changed, 20 insertions(+), 10 deletions(-) diff --git a/.mergify.yml b/.mergify.yml index 8cf35901f57..de06d48271d 100644 --- a/.mergify.yml +++ b/.mergify.yml @@ -9,7 +9,7 @@ queue_rules: merge_method: squash conditions: - check-success=test (macos-latest) - - check-success=test (windows-latest) + - check-success=test (windows-2025) - check-success=system-test - check-success=lint - check-success=CLA diff --git a/NOTICE-fips.txt b/NOTICE-fips.txt index b4f6d2f9b67..2ca9c0d0931 100644 --- a/NOTICE-fips.txt +++ b/NOTICE-fips.txt @@ -1555,11 +1555,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/gmux@v0.3.2/LIC -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-docappender/v2 -Version: v2.10.0 +Version: v2.11.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-docappender/v2@v2.10.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-docappender/v2@v2.11.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/NOTICE.txt b/NOTICE.txt index e4bbcfa44a9..af5aeaea97a 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1555,11 +1555,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/gmux@v0.3.2/LIC -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-docappender/v2 -Version: v2.10.0 +Version: v2.11.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-docappender/v2@v2.10.0/LICENSE: +Contents of probable licence file 
$GOMODCACHE/github.com/elastic/go-docappender/v2@v2.11.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 85d1e7831f6..a48ef4c79bb 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/elastic/elastic-agent-system-metrics v0.11.18 github.com/elastic/elastic-transport-go/v8 v8.7.0 github.com/elastic/gmux v0.3.2 - github.com/elastic/go-docappender/v2 v2.10.0 + github.com/elastic/go-docappender/v2 v2.11.0 github.com/elastic/go-freelru v0.16.0 github.com/elastic/go-sysinfo v1.15.3 github.com/elastic/go-ucfg v0.8.8 diff --git a/go.sum b/go.sum index 5b8781b23c5..1b5fb16593e 100644 --- a/go.sum +++ b/go.sum @@ -181,8 +181,8 @@ github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+P github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/gmux v0.3.2 h1:cb721R+fe/rt/jVNyBP5HDQsEwLD2wSKfPD2Sk6adDk= github.com/elastic/gmux v0.3.2/go.mod h1:OD6oYrno+SV3pyl1ArdWCjlExZ+FJOfoSaFqnFeldBQ= -github.com/elastic/go-docappender/v2 v2.10.0 h1:gK7embg8JCedjkrfojhhWNufoG5ui7edBklakGXHtbQ= -github.com/elastic/go-docappender/v2 v2.10.0/go.mod h1:uJH43KCtXmqFUhYj7Am1NiHv0MXf3vM6VxtNlA/Utks= +github.com/elastic/go-docappender/v2 v2.11.0 h1:Sr6vKHff26mceWoFjkHFcFOTI4N6lIpyhY6snidq7Pg= +github.com/elastic/go-docappender/v2 v2.11.0/go.mod h1:uSM4ZspehKGUjTEXaSRg5PmcOywMHJNECGbPSkINkJ0= github.com/elastic/go-elasticsearch/v8 v8.18.1 h1:lPsN2Wk6+QqBeD4ckmOax7G/Y8tAZgroDYG8j6/5Ce0= github.com/elastic/go-elasticsearch/v8 v8.18.1/go.mod h1:F3j9e+BubmKvzvLjNui/1++nJuJxbkhHefbaT0kFKGY= github.com/elastic/go-freelru v0.16.0 h1:gG2HJ1WXN2tNl5/p40JS/l59HjvjRhjyAa+oFTRArYs= diff --git a/integrationservertest/logs_filters.go b/integrationservertest/logs_filters.go index c14e9ac8176..f58bfc1ac71 100644 --- a/integrationservertest/logs_filters.go +++ b/integrationservertest/logs_filters.go @@ -75,6 +75,11 @@ var ( "message": {Query: "failed to populate sourcemap 
metadata: fetcher unavailable: 403 Forbidden:"}, }, }) + syncSourcemapFetcher403 = apmErrorLog(types.Query{ + MatchPhrasePrefix: map[string]types.MatchPhrasePrefixQuery{ + "message": {Query: "failed to sync sourcemaps metadata: fetcher unavailable: 403 Forbidden:"}, + }, + }) refreshCache403 = apmErrorLog(types.Query{ MatchPhrase: map[string]types.MatchPhraseQuery{ diff --git a/integrationservertest/standalone_test.go b/integrationservertest/standalone_test.go index 75ed002a0ff..72655bf6a56 100644 --- a/integrationservertest/standalone_test.go +++ b/integrationservertest/standalone_test.go @@ -164,6 +164,7 @@ func managed7Runner(fromVersion7, toVersion8, toVersion9 ech.Version, config upg waitServerReadyCtxCanceled, grpcServerStopped, populateSourcemapFetcher403, + syncSourcemapFetcher403, }, }, }, @@ -221,6 +222,7 @@ func managed8Runner(fromVersion7, toVersion8, toVersion9 ech.Version, config upg refreshCacheCtxDeadline, refreshCacheESConfigInvalid, populateSourcemapFetcher403, + syncSourcemapFetcher403, }, }, }, @@ -266,6 +268,7 @@ func managed9Runner(fromVersion7, toVersion8, toVersion9 ech.Version, config upg refreshCacheCtxDeadline, refreshCacheESConfigInvalid, populateSourcemapFetcher403, + syncSourcemapFetcher403, }, }, }, diff --git a/integrationservertest/upgrade_test.go b/integrationservertest/upgrade_test.go index fae75cc3b59..86bb7574c04 100644 --- a/integrationservertest/upgrade_test.go +++ b/integrationservertest/upgrade_test.go @@ -164,13 +164,15 @@ func buildTestSteps(t *testing.T, versions ech.Versions, config upgradeTestConfi APMErrorLogsIgnored: apmErrorLogs{ tlsHandshakeError, esReturnedUnknown503, + refreshCache403, refreshCache503, refreshCacheCtxCanceled, refreshCacheCtxDeadline, refreshCacheESConfigInvalid, preconditionFailed, - populateSourcemapFetcher403, populateSourcemapServerShuttingDown, + populateSourcemapFetcher403, + syncSourcemapFetcher403, initialSearchQueryContextCanceled, scrollSearchQueryContextCanceled, }, diff --git 
a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index 732bbd68367..b24cffc004f 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -215,7 +215,7 @@ func TestLibbeatMetrics(t *testing.T) { }, snapshot) assert.Eventually(t, func() bool { - return appender.Stats().IndexersActive > 1 + return appender.IndexersActive() > 1 }, 10*time.Second, 50*time.Millisecond) for i := 0; i < 4; i++ { From 603f302248e124092bde4453cd9e5ba1ccd97902 Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Thu, 17 Jul 2025 11:21:25 +0800 Subject: [PATCH 5/9] Merge branch 'main' into iservertest-add-9.2-test --- .github/ISSUE_TEMPLATE/test-plan.md | 34 ++++-- .../internal/gen/generator.go | 107 +++++++++++------- integrationservertest/steps.go | 1 - internal/beater/beater.go | 2 +- internal/publish/pub.go | 5 +- internal/publish/pub_test.go | 6 +- 6 files changed, 94 insertions(+), 61 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/test-plan.md b/.github/ISSUE_TEMPLATE/test-plan.md index fb5ee5f09e4..5de9b9b1849 100644 --- a/.github/ISSUE_TEMPLATE/test-plan.md +++ b/.github/ISSUE_TEMPLATE/test-plan.md @@ -2,27 +2,41 @@ name: Test Plan about: Create a new manual test plan meta issue labels: "test-plan" - --- # Manual Test Plan -When picking up a test case, please add your name to this overview beforehand and tick the checkbox when finished. -Testing can be started when the first build candidate (BC) is available in the CFT region. +- When picking up a test case, please add your name to this overview beforehand and tick the checkbox when finished. +- Testing can be started when the first build candidate (BC) is available in the CFT region. +- For each repository, update the compare version range to get the list of commits to review. + +## ES apm-data plugin + + -## Smoke Testing ESS setup +## apm-aggregation -Thanks to https://github.com/elastic/apm-server/issues/8303 further smoke tests are run automatically on ESS now. 
-**Consider extending the smoke tests to include more test cases which we'd like to cover** + -## go-docappender library +List of changes: https://github.com/elastic/apm-aggregation/compare/v1.2.0...v1.3.0 + +## go-docappender -## apm-data library +List of changes: https://github.com/elastic/go-docappender/compare/v2.4.0...v2.10.0 + +## apm-data +List of changes: https://github.com/elastic/apm-data/compare/v1.16.0...v1.19.2 + +## apm-server + + + +List of changes: https://github.com/elastic/apm-server/compare/v9.0.0...main ## Test cases from the GitHub board @@ -31,7 +45,3 @@ Thanks to https://github.com/elastic/apm-server/issues/8303 further smoke tests Add yourself as _assignee_ on the PR before you start testing. - -## Regressions - - diff --git a/integrationservertest/internal/gen/generator.go b/integrationservertest/internal/gen/generator.go index 8ed4801f17b..d521a0b5b07 100644 --- a/integrationservertest/internal/gen/generator.go +++ b/integrationservertest/internal/gen/generator.go @@ -19,7 +19,9 @@ package gen import ( "context" + "crypto/rand" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -54,14 +56,64 @@ func New(url, apikey string, kbc *kibana.Client, logger *zap.Logger) *Generator } } -func (g *Generator) waitForAPMToBePublishReady(ctx context.Context, maxWaitDuration time.Duration) error { +// RunBlockingWait runs the underlying generator in blocking mode and waits for all in-flight +// data to be flushed before proceeding. This allows the caller to ensure than 1m aggregation +// metrics are ingested immediately after raw data ingestion, without variable delays. +// This may lead to data loss if the final flush takes more than 30s, which may happen if the +// quantity of data ingested with runBlocking gets too big. The current quantity does not +// trigger this behavior. 
+func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, integrations bool) error { + g.logger.Info("wait for apm server to be ready") + if err := g.waitForAPMToBePublishReady(ctx); err != nil { + // If the APM server is not ready, we likely ran into an issue. + // For example, see https://github.com/elastic/apm-server/issues/17605. + // We can try to temporarily resolve it by re-applying the Elastic APM policy, + // and wait again. + // + // NOTE: This retry only works if there is integrations server, otherwise + // simply do nothing. + if !integrations { + return fmt.Errorf("failed to wait for apm server: %w", err) + } + if err = g.reapplyAPMPolicy(ctx, version); err != nil { + return fmt.Errorf("failed to re-apply apm policy: %w", err) + } + if err = g.waitForAPMToBePublishReady(ctx); err != nil { + return fmt.Errorf("failed to wait for apm server: %w", err) + } + } + + g.logger.Info("ingest data") + if err := g.runBlocking(ctx, version); err != nil { + return fmt.Errorf("cannot run generator: %w", err) + } + + // With Fleet managed APM server, we can trigger metrics flush. + if integrations { + g.logger.Info("flush apm metrics") + if err := g.flushAPMMetrics(ctx, version); err != nil { + return fmt.Errorf("cannot flush apm metrics: %w", err) + } + return nil + } + + // With standalone, we don't have Fleet, so simply just wait for some arbitrary time. + time.Sleep(180 * time.Second) + return nil +} + +// waitForAPMToBePublishReady waits for APM server to be publish-ready by querying the server. 
+func (g *Generator) waitForAPMToBePublishReady(ctx context.Context) error { + maxWaitDuration := 60 * time.Second timer := time.NewTimer(maxWaitDuration) defer timer.Stop() for { select { + case <-ctx.Done(): + return errors.New("apm server not ready but context done") case <-timer.C: - return fmt.Errorf("apm server not yet ready after %s", maxWaitDuration) + return fmt.Errorf("apm server not ready after %s", maxWaitDuration) default: info, err := queryAPMInfo(ctx, g.apmServerURL, g.apmAPIKey) if err != nil { @@ -72,7 +124,7 @@ func (g *Generator) waitForAPMToBePublishReady(ctx context.Context, maxWaitDurat return nil } - time.Sleep(1 * time.Second) + time.Sleep(10 * time.Second) } } } @@ -80,7 +132,6 @@ func (g *Generator) waitForAPMToBePublishReady(ctx context.Context, maxWaitDurat // runBlocking runs the underlying generator in blocking mode. func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error { eventRate := "1000/s" - cfg := telemetrygen.DefaultConfig() cfg.APIKey = g.apmAPIKey cfg.TargetStackVersion = supportedstacks.TargetStackVersionLatest @@ -104,53 +155,31 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error return fmt.Errorf("cannot create telemetrygen generator: %w", err) } - g.logger.Info("wait for apm server to be ready") - if err = g.waitForAPMToBePublishReady(ctx, 30*time.Second); err != nil { - return err - } - - g.logger.Info("ingest data") gen.Logger = g.logger return gen.RunBlocking(ctx) } -// RunBlockingWait runs the underlying generator in blocking mode and waits for all in-flight -// data to be flushed before proceeding. This allows the caller to ensure than 1m aggregation -// metrics are ingested immediately after raw data ingestion, without variable delays. -// This may lead to data loss if the final flush takes more than 30s, which may happen if the -// quantity of data ingested with runBlocking gets too big. The current quantity does not -// trigger this behavior. 
-func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, integrations bool) error { - if err := g.runBlocking(ctx, version); err != nil { - return fmt.Errorf("cannot run generator: %w", err) - } +func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) error { + policyID := "elastic-cloud-apm" + description := fmt.Sprintf("%s %s", version, rand.Text()[5:]) - // With Fleet managed APM server, we can trigger metrics flush. - if integrations { - if err := flushAPMMetrics(ctx, g.kbc, version); err != nil { - return fmt.Errorf("cannot flush apm metrics: %w", err) - } - return nil + if err := g.kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil { + return fmt.Errorf( + "cannot update %s package policy description: %w", + policyID, err, + ) } - // With standalone, we don't have Fleet, so simply just wait for some arbitrary time. - time.Sleep(180 * time.Second) return nil } // flushAPMMetrics sends an update to the Fleet APM package policy in order // to trigger the flushing of in-flight APM metrics. -func flushAPMMetrics(ctx context.Context, kbc *kibana.Client, version ech.Version) error { - policyID := "elastic-cloud-apm" - description := fmt.Sprintf("Integration server test %s", version) - - // Sending an update with modifying the description is enough to trigger - // final aggregations in APM Server and flush of in-flight metrics. - if err := kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil { - return fmt.Errorf( - "cannot update %s package policy description to flush aggregation metrics: %w", - policyID, err, - ) +func (g *Generator) flushAPMMetrics(ctx context.Context, version ech.Version) error { + // Re-applying the Elastic APM policy is enough to trigger final aggregations + // in APM Server and flush of in-flight metrics. 
+ if err := g.reapplyAPMPolicy(ctx, version); err != nil { + return err } // APM Server needs some time to flush all metrics, and we don't have any diff --git a/integrationservertest/steps.go b/integrationservertest/steps.go index b1b480dc88e..b78be7e061f 100644 --- a/integrationservertest/steps.go +++ b/integrationservertest/steps.go @@ -166,7 +166,6 @@ type ingestStep struct { } func (i ingestStep) Step(t *testing.T, ctx context.Context, e *testStepEnv) { - if e.currentVersion().Major < 8 { t.Fatal("ingest step should only be used for versions >= 8.0") } diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 3c8aff9c1d8..b20d3cac6f3 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -892,7 +892,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor( return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err) } pipelineConnector := pipetool.WithACKer(pipeline, acker) - publisher, err := publish.NewPublisher(pipelineConnector, tracer) + publisher, err := publish.NewPublisher(pipelineConnector) if err != nil { return nil, nil, err } diff --git a/internal/publish/pub.go b/internal/publish/pub.go index 5f9632d9fb3..c781f16dddb 100644 --- a/internal/publish/pub.go +++ b/internal/publish/pub.go @@ -25,7 +25,6 @@ import ( "time" "github.com/pkg/errors" - "go.elastic.co/apm/v2" "go.elastic.co/fastjson" "github.com/elastic/beats/v7/libbeat/beat" @@ -47,7 +46,6 @@ type Reporter func(context.Context, PendingReq) error // concurrent HTTP requests trying to publish at the same time is limited. type Publisher struct { stopped chan struct{} - tracer *apm.Tracer client beat.Client mu sync.RWMutex @@ -73,10 +71,9 @@ var ( // // GOMAXPROCS goroutines are started for forwarding events to libbeat. // Stop must be called to close the beat.Client and free resources. 
-func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer) (*Publisher, error) { +func NewPublisher(pipeline beat.Pipeline) (*Publisher, error) { processingCfg := beat.ProcessingConfig{} p := &Publisher{ - tracer: tracer, stopped: make(chan struct{}), // One request will be actively processed by the diff --git a/internal/publish/pub_test.go b/internal/publish/pub_test.go index 148f82f534b..a9be35daecf 100644 --- a/internal/publish/pub_test.go +++ b/internal/publish/pub_test.go @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.elastic.co/apm/v2/apmtest" "go.elastic.co/fastjson" "github.com/elastic/beats/v7/libbeat/beat" @@ -54,7 +53,7 @@ func TestPublisherStop(t *testing.T) { // Create a pipeline with a limited queue size and no outputs, // so we can simulate a pipeline that blocks indefinitely. pipeline, client := newBlockingPipeline(t) - publisher, err := publish.NewPublisher(pipeline, apmtest.DiscardTracer) + publisher, err := publish.NewPublisher(pipeline) require.NoError(t, err) defer func() { cancelledContext, cancel := context.WithCancel(context.Background()) @@ -90,7 +89,7 @@ func TestPublisherStop(t *testing.T) { func TestPublisherStopShutdownInactive(t *testing.T) { pipeline, _ := newBlockingPipeline(t) - publisher, err := publish.NewPublisher(pipeline, apmtest.DiscardTracer) + publisher, err := publish.NewPublisher(pipeline) require.NoError(t, err) // There are no active events, so the publisher should stop immediately @@ -192,7 +191,6 @@ func BenchmarkPublisher(b *testing.B) { acker.Open() publisher, err := publish.NewPublisher( pipetool.WithACKer(pipeline, acker), - apmtest.DiscardTracer, ) require.NoError(b, err) From 3c2684632b8e819079bf0ed85c65662c46591a7b Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Thu, 17 Jul 2025 15:27:06 +0800 Subject: [PATCH 6/9] Add log for re-apply policy --- integrationservertest/internal/gen/generator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) 
diff --git a/integrationservertest/internal/gen/generator.go b/integrationservertest/internal/gen/generator.go index d521a0b5b07..d5a74a31210 100644 --- a/integrationservertest/internal/gen/generator.go +++ b/integrationservertest/internal/gen/generator.go @@ -75,6 +75,7 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in if !integrations { return fmt.Errorf("failed to wait for apm server: %w", err) } + g.logger.Info("re-apply apm policy") if err = g.reapplyAPMPolicy(ctx, version); err != nil { return fmt.Errorf("failed to re-apply apm policy: %w", err) } @@ -161,7 +162,7 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) error { policyID := "elastic-cloud-apm" - description := fmt.Sprintf("%s %s", version, rand.Text()[5:]) + description := fmt.Sprintf("%s %s", version, rand.Text()[:10]) if err := g.kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil { return fmt.Errorf( From f194ac021a2659206be30ff54df757784895fca3 Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Thu, 17 Jul 2025 16:29:16 +0800 Subject: [PATCH 7/9] Remove flush --- .../internal/gen/generator.go | 27 +------------------ 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/integrationservertest/internal/gen/generator.go b/integrationservertest/internal/gen/generator.go index d5a74a31210..1b610684a1c 100644 --- a/integrationservertest/internal/gen/generator.go +++ b/integrationservertest/internal/gen/generator.go @@ -89,16 +89,7 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in return fmt.Errorf("cannot run generator: %w", err) } - // With Fleet managed APM server, we can trigger metrics flush. 
- if integrations { - g.logger.Info("flush apm metrics") - if err := g.flushAPMMetrics(ctx, version); err != nil { - return fmt.Errorf("cannot flush apm metrics: %w", err) - } - return nil - } - - // With standalone, we don't have Fleet, so simply just wait for some arbitrary time. + // Simply wait for some arbitrary time, for the data to be flushed. time.Sleep(180 * time.Second) return nil } @@ -174,22 +165,6 @@ func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) e return nil } -// flushAPMMetrics sends an update to the Fleet APM package policy in order -// to trigger the flushing of in-flight APM metrics. -func (g *Generator) flushAPMMetrics(ctx context.Context, version ech.Version) error { - // Re-applying the Elastic APM policy is enough to trigger final aggregations - // in APM Server and flush of in-flight metrics. - if err := g.reapplyAPMPolicy(ctx, version); err != nil { - return err - } - - // APM Server needs some time to flush all metrics, and we don't have any - // visibility on when this completes. - // NOTE: This value comes from empirical observations. 
- time.Sleep(120 * time.Second) - return nil -} - type apmInfoResp struct { Version string `json:"version"` PublishReady bool `json:"publish_ready"` From 5f544ffe2c266210ec469a2613917c50db8a7efb Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Thu, 17 Jul 2025 19:19:08 +0800 Subject: [PATCH 8/9] Add retry for generator in case of connection issues --- .../internal/gen/generator.go | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/integrationservertest/internal/gen/generator.go b/integrationservertest/internal/gen/generator.go index 1b610684a1c..d8106ec1dd3 100644 --- a/integrationservertest/internal/gen/generator.go +++ b/integrationservertest/internal/gen/generator.go @@ -85,12 +85,12 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in } g.logger.Info("ingest data") - if err := g.runBlocking(ctx, version); err != nil { + if err := g.retryRunBlocking(ctx, version, 2); err != nil { return fmt.Errorf("cannot run generator: %w", err) } // Simply wait for some arbitrary time, for the data to be flushed. - time.Sleep(180 * time.Second) + time.Sleep(200 * time.Second) return nil } @@ -151,6 +151,32 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error return gen.RunBlocking(ctx) } +// retryRunBlocking executes runBlocking. If it fails, it will retry up to retryTimes. +func (g *Generator) retryRunBlocking(ctx context.Context, version ech.Version, retryTimes int) error { + // No error, don't need to retry. + if err := g.runBlocking(ctx, version); err == nil { + return nil + } + + // Otherwise, retry until success or run out of attempts. + var finalErr error + for i := 0; i < retryTimes; i++ { + // Wait for some time before retrying. + time.Sleep(time.Duration(i) * 30 * time.Second) + + g.logger.Info(fmt.Sprintf("retrying ingest data attempt %d", i+1)) + err := g.runBlocking(ctx, version) + // Retry success, simply return. 
if err == nil { + return nil + } + + finalErr = err + } + + return finalErr +} + func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) error { policyID := "elastic-cloud-apm" description := fmt.Sprintf("%s %s", version, rand.Text()[:10]) From b40208bc73603ab92d58421bc34cd5eb9e5d571f Mon Sep 17 00:00:00 2001 From: Eric Yap Date: Fri, 18 Jul 2025 17:34:01 +0800 Subject: [PATCH 9/9] Update job conditional according to comment --- .github/workflows/integration-server-test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/integration-server-test.yml b/.github/workflows/integration-server-test.yml index a18f5844124..dec3af64d45 100644 --- a/.github/workflows/integration-server-test.yml +++ b/.github/workflows/integration-server-test.yml @@ -47,7 +47,7 @@ jobs: uses: ./.github/workflows/generate-bc-upgrade-paths run-upgrade: - if: ${{ inputs.run-upgrade-tests == true }} + if: ${{ !contains(inputs.run-upgrade-tests, 'false') }} name: Upgrade tests (SNAPSHOT) runs-on: ubuntu-latest strategy: @@ -83,7 +83,7 @@ jobs: SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" SNAPSHOT=true make integration-server-test/upgrade run-upgrade-bc: - if: ${{ inputs.run-upgrade-bc-tests == true }} + if: ${{ !contains(inputs.run-upgrade-bc-tests, 'false') }} name: Upgrade tests (BC) runs-on: ubuntu-latest needs: prepare @@ -116,7 +116,7 @@ jobs: SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" make integration-server-test/upgrade run-standalone: - if: ${{ inputs.run-standalone-tests == true }} + if: ${{ !contains(inputs.run-standalone-tests, 'false') }} name: Standalone-to-managed tests runs-on: ubuntu-latest strategy: