Skip to content
Merged
33 changes: 25 additions & 8 deletions .github/workflows/integration-server-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,23 @@ name: integration-server-test
run-name: Integration Server Test

on:
workflow_dispatch: ~
workflow_dispatch:
inputs:
run-upgrade-tests:
description: 'Run upgrade tests (SNAPSHOT)'
required: false
type: boolean
default: true
run-upgrade-bc-tests:
description: 'Run upgrade tests (BC)'
required: false
type: boolean
default: true
run-standalone-tests:
description: 'Run standalone-to-managed tests'
required: false
type: boolean
default: true
schedule:
- cron: '0 2 * * 1-5'

Expand Down Expand Up @@ -31,18 +47,17 @@ jobs:
uses: ./.github/workflows/generate-bc-upgrade-paths

run-upgrade:
name: Upgrade tests (Snapshot)
if: ${{ !contains(inputs.should_run, 'false') }}
name: Upgrade tests (SNAPSHOT)
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
upgrade-path:
# Latest 8.15 cannot upgrade to latest 8.17, it can only go to 8.17.3.
# With our current setup (only latest patch), we have to upgrade to intermediate latest 8.16 instead.
# TODO: Maybe add support for upgrading to latest upgradable instead of absolute latest?
- '8.15, 8.16, 8.17'
- '8.17, 8.18, 9.0'
- '8.17, 8.19, 9.1'
- '8.15, 8.16, 8.17, 8.18'
- '8.18, 8.19, 9.2'
- '8.18, 9.0, 9.2'
- '8.19, 9.1, 9.2'
scenario:
- 'Default'
- 'Reroute'
Expand All @@ -68,6 +83,7 @@ jobs:
SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" SNAPSHOT=true make integration-server-test/upgrade

run-upgrade-bc:
if: ${{ !contains(inputs.should_run, 'false') }}
name: Upgrade tests (BC)
runs-on: ubuntu-latest
needs: prepare
Expand Down Expand Up @@ -100,6 +116,7 @@ jobs:
SCENARIO="${{ matrix.scenario }}" UPGRADE_PATH="${{ matrix.upgrade-path }}" make integration-server-test/upgrade

run-standalone:
if: ${{ !contains(inputs.should_run, 'false') }}
name: Standalone-to-managed tests
runs-on: ubuntu-latest
strategy:
Expand Down
60 changes: 31 additions & 29 deletions integrationservertest/internal/gen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in
if !integrations {
return fmt.Errorf("failed to wait for apm server: %w", err)
}
g.logger.Info("re-apply apm policy")
if err = g.reapplyAPMPolicy(ctx, version); err != nil {
return fmt.Errorf("failed to re-apply apm policy: %w", err)
}
Expand All @@ -84,21 +85,12 @@ func (g *Generator) RunBlockingWait(ctx context.Context, version ech.Version, in
}

g.logger.Info("ingest data")
if err := g.runBlocking(ctx, version); err != nil {
if err := g.retryRunBlocking(ctx, version, 2); err != nil {
return fmt.Errorf("cannot run generator: %w", err)
}

// With Fleet managed APM server, we can trigger metrics flush.
if integrations {
g.logger.Info("flush apm metrics")
if err := g.flushAPMMetrics(ctx, version); err != nil {
return fmt.Errorf("cannot flush apm metrics: %w", err)
}
return nil
}

// With standalone, we don't have Fleet, so simply just wait for some arbitrary time.
time.Sleep(180 * time.Second)
// Simply wait for some arbitrary time to allow the data to be flushed.
time.Sleep(200 * time.Second)
return nil
}

Expand Down Expand Up @@ -159,9 +151,35 @@ func (g *Generator) runBlocking(ctx context.Context, version ech.Version) error
return gen.RunBlocking(ctx)
}

// retryRunBlocking executes runBlocking. If it fails, it will retry up to
// retryTimes additional attempts. The first retry happens immediately; each
// subsequent retry waits 30s longer than the previous one (0s, 30s, 60s, ...).
// The wait honors context cancellation. Returns nil on the first success,
// otherwise the error from the last attempt.
func (g *Generator) retryRunBlocking(ctx context.Context, version ech.Version, retryTimes int) error {
	var lastErr error
	// One initial attempt plus up to retryTimes retries.
	for attempt := 0; attempt <= retryTimes; attempt++ {
		if attempt > 0 {
			// Back off before retrying, but abort promptly if ctx is cancelled
			// instead of blocking for the full backoff duration.
			backoff := time.Duration(attempt-1) * 30 * time.Second
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(backoff):
			}
			g.logger.Info(fmt.Sprintf("retrying ingest data attempt %d", attempt))
		}

		err := g.runBlocking(ctx, version)
		if err == nil {
			return nil
		}
		lastErr = err
	}
	return lastErr
}

func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) error {
policyID := "elastic-cloud-apm"
description := fmt.Sprintf("%s %s", version, rand.Text()[5:])
description := fmt.Sprintf("%s %s", version, rand.Text()[:10])

if err := g.kbc.UpdatePackagePolicyDescriptionByID(ctx, policyID, version, description); err != nil {
return fmt.Errorf(
Expand All @@ -173,22 +191,6 @@ func (g *Generator) reapplyAPMPolicy(ctx context.Context, version ech.Version) e
return nil
}

// flushAPMMetrics sends an update to the Fleet APM package policy in order
// to trigger the flushing of in-flight APM metrics.
func (g *Generator) flushAPMMetrics(ctx context.Context, version ech.Version) error {
	// Re-applying the Elastic APM policy is sufficient to trigger final
	// aggregations in APM Server and flush the in-flight metrics.
	err := g.reapplyAPMPolicy(ctx, version)
	if err != nil {
		return err
	}

	// There is no visibility into when the flush completes, so allow an
	// empirically-chosen grace period for APM Server to finish flushing.
	time.Sleep(120 * time.Second)
	return nil
}

type apmInfoResp struct {
Version string `json:"version"`
PublishReady bool `json:"publish_ready"`
Expand Down
Loading