From 961beb206e0b1828217a8be926006b014a8143c8 Mon Sep 17 00:00:00 2001 From: Tere Date: Tue, 21 Apr 2026 12:06:51 +0200 Subject: [PATCH] [Fleet] Resolve streams and secrets by effective input name (#264366) ## Summary Completes propagation of **effective input names** (`name ?? type`, `getInputEffectiveName`) for places that still matched registry `streams[].input` against **`input.type`**. That mismatch caused package policy creation to fail with `Stream template not found, unable to find stream for input ` when integration manifests reference streams by **input name** (same agent type, disambiguated names), as exercised by composable/elastic-package dual-input fixtures. **Changes** - **`package_policy.ts`**: `_compilePackageStream` selects the registry stream using `getInputEffectiveName(input)`; error message includes effective name and type. - **`validate_package_policy.ts`**: stream var-def lookup key uses `${dataset}-${getInputEffectiveName(input)}` to match the registry map keyed by `stream.input`. - **`secrets/package_policies.ts`**: stream secret var-def lookup uses the same effective name in the dataset key; import `getInputEffectiveName`. - **`get_template_inputs.ts`**: `buildIndexedPackage` indexes streams and input ids using `getInputEffectiveName(packageInput)` so docs/template indexing matches named stream references. - **`package_policy.test.ts`**: Jest coverage for a manifest whose `streams[].input` is the input **name** while `input.type` remains the real type. Relates to [Fleet: Support setting explicit names for ambiguous inputs](https://github.com/elastic/kibana/pull/262138) and composable packages that set `streams[].input` to the input name qualifier. **Related issue:** This gap surfaced while testing work for [elastic/elastic-package#3465](https://github.com/elastic/elastic-package/issues/3465) (composable `requires.input`: input `name` and stream `input` per package-spec qualified inputs). The elastic-package change that implements that issue is still **WIP**; this Kibana PR closes the Fleet side so policies compile when manifests follow the spec (streams reference the input **name** while the policy input keeps the real agent **type**). **Release notes:** Follow-up to #262138; no additional user-facing release note beyond that change. Use label `release_note:skip`. ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [x] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md) - [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [ ] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker) - [x] This was checked for breaking HTTP API changes, and any breaking changes have been approved by the breaking-change committee. The `release_note:breaking` label should be applied in these situations. - [ ] [Flaky Test Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was used on any tests changed - [x] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) (`release_note:skip`, follow-up to #262138). - [ ] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. ### Identify risks Does this PR introduce any risks? For example, consider risks like hard to test bugs, performance regression, potential of data loss. Describe the risk, its severity, and mitigation for each identified risk. Invite stakeholders and evaluate how to proceed before merging. - [ ] [See some risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) **Risks** - **Wrong stream match if a package mis-declares `streams[].input`**: Low. Behavior is aligned with `getInputEffectiveName`; packages that still use plain `type` in manifests continue to work because `name` is absent. - **EPM template / comment indexing**: Low. `buildIndexedPackage` now keys by effective name; verify integration detail docs for packages with duplicate types and names if any UI relies on old `type`-only ids. (cherry picked from commit 170d52a1b934b921c81a4edc59130f54310d77e7) --- .../services/validate_package_policy.ts | 2 +- .../epm/packages/get_template_inputs.ts | 18 ++++--- .../epm/packages/get_templates_inputs.test.ts | 28 +++++++++++ .../server/services/package_policy.test.ts | 49 +++++++++++++++++++ .../fleet/server/services/package_policy.ts | 5 +- .../services/secrets/package_policies.ts | 5 +- 6 files changed, 96 insertions(+), 11 deletions(-) diff --git a/x-pack/platform/plugins/shared/fleet/common/services/validate_package_policy.ts b/x-pack/platform/plugins/shared/fleet/common/services/validate_package_policy.ts index f9da4aa5e1540..3cf68fd787ee6 100644 --- a/x-pack/platform/plugins/shared/fleet/common/services/validate_package_policy.ts +++ b/x-pack/platform/plugins/shared/fleet/common/services/validate_package_policy.ts @@ -481,7 +481,7 @@ export const validatePackagePolicy = ( if (input.streams.length) { input.streams.forEach((stream) => { const streamValidationResults: PackagePolicyConfigValidationResults = {}; - const streamKey = `${stream.data_stream.dataset}-${input.type}`; + const streamKey = `${stream.data_stream.dataset}-${getInputEffectiveName(input)}`; const streamVarDefs = streamVarDefsByDatasetAndInput[streamKey]; const streamVarGroups = streamVarGroupsByDatasetAndInput[streamKey]; diff --git a/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_template_inputs.ts b/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_template_inputs.ts index 8854d544f8dd6..dd7e206f76c9b 100644 --- a/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_template_inputs.ts +++ b/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_template_inputs.ts @@ -14,6 +14,7 @@ import { getNormalizedInputs, isIntegrationPolicyTemplate } from '../../../../co import { getStreamsForInputType, + getInputEffectiveName, packageToPackagePolicy, } from '../../../../common/services/package_to_package_policy'; import { _compilePackagePolicyInputs } from '../../package_policy'; @@ -64,9 +65,10 @@ export const templatePackagePolicyToFullInputStreams = ( packagePolicyInputs.forEach((input) => { const streamsIdsMap = new Map(); + const inputEffectiveName = getInputEffectiveName(input); const inputId = input.policy_template - ? `${input.policy_template}-${input.type}` - : `${input.type}`; + ? `${input.policy_template}-${inputEffectiveName}` + : inputEffectiveName; const fullInputStream = { // @ts-ignore-next-line the following id is actually one level above the one in fullInputStream, but the linter thinks it gets overwritten id: inputId, @@ -142,9 +144,10 @@ export async function getTemplateInputs( if (format === 'yml') { // Add a placeholder to all variables without default value for (const inputWithStreamIds of inputsWithStreamIds) { + const inputEffectiveName = getInputEffectiveName(inputWithStreamIds); const inputId = inputWithStreamIds.policy_template - ? `${inputWithStreamIds.policy_template}-${inputWithStreamIds.type}` - : inputWithStreamIds.type; + ? `${inputWithStreamIds.policy_template}-${inputEffectiveName}` + : inputEffectiveName; const packageInput = indexedInputsAndStreams[inputId]; if (!packageInput) { @@ -270,10 +273,11 @@ function buildIndexedPackage(packageInfo: PackageInfo): PackageWithInputAndStrea const inputs = getNormalizedInputs(policyTemplate); inputs.forEach((packageInput) => { - const inputId = `${policyTemplate.name}-${packageInput.type}`; + const inputEffectiveName = getInputEffectiveName(packageInput); + const inputId = `${policyTemplate.name}-${inputEffectiveName}`; const streams = getStreamsForInputType( - packageInput.type, + inputEffectiveName, packageInfo, isIntegrationPolicyTemplate(policyTemplate) && policyTemplate.data_streams ? policyTemplate.data_streams @@ -286,7 +290,7 @@ function buildIndexedPackage(packageInfo: PackageInfo): PackageWithInputAndStrea } > >((acc, stream) => { - const streamId = `${packageInput.type}-${stream.data_stream.dataset}`; + const streamId = `${inputEffectiveName}-${stream.data_stream.dataset}`; acc[streamId] = { ...stream, }; diff --git a/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_templates_inputs.test.ts b/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_templates_inputs.test.ts index 1180fa8d302b1..4b0a448fe2270 100644 --- a/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_templates_inputs.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/services/epm/packages/get_templates_inputs.test.ts @@ -221,6 +221,34 @@ describe('Fleet - templatePackagePolicyToFullInputStreams', () => { ]); }); + it('uses getInputEffectiveName for template input id when name differs from type', async () => { + expect( + await templatePackagePolicyToFullInputStreams([ + { + ...mockInput2, + name: 'custom-metrics', + type: 'test-metrics', + }, + ]) + ).toEqual([ + { + id: 'some-template-custom-metrics', + type: 'test-metrics', + streams: [ + { + data_stream: { + dataset: 'foo', + type: 'metrics', + }, + fooKey: 'fooValue1', + fooKey2: ['fooValue2'], + id: 'test-metrics-foo', + }, + ], + }, + ]); + }); + it('returns agent inputs without disabled streams', async () => { expect( await templatePackagePolicyToFullInputStreams([ diff --git a/x-pack/platform/plugins/shared/fleet/server/services/package_policy.test.ts b/x-pack/platform/plugins/shared/fleet/server/services/package_policy.test.ts index 10a1ad8f46262..3f69a2da79b12 100644 --- a/x-pack/platform/plugins/shared/fleet/server/services/package_policy.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/services/package_policy.test.ts @@ -2143,6 +2143,55 @@ describe('Package policy service', () => { ]); }); + it('should compile stream templates when stream.input references a named input (name ?? type)', async () => { + const inputs = await _compilePackagePolicyInputs( + { + name: 'test', + version: '1.0.0', + data_streams: [ + { + type: 'logs', + dataset: 'package.dataset1', + streams: [{ input: 'named_logfile', template_path: 'some_template_path.yml' }], + path: 'dataset1', + }, + ], + policy_templates: [ + { + inputs: [{ type: 'log', name: 'named_logfile' }], + }, + ], + } as unknown as PackageInfo, + {}, + [ + { + type: 'log', + name: 'named_logfile', + enabled: true, + streams: [ + { + id: 'datastream01', + data_stream: { dataset: 'package.dataset1', type: 'logs' }, + enabled: true, + vars: { + paths: { + value: ['/var/log/set.log'], + }, + }, + }, + ], + }, + ], + ASSETS_MAP_FIXTURES + ); + + expect(inputs[0].streams[0].compiled_stream).toEqual({ + metricset: ['dataset1'], + paths: ['/var/log/set.log'], + type: 'log', + }); + }); + it('should compile integration stream data_stream.dataset from package stream var default value', async () => { const pkgInfo = { name: 'dsvarpkg', diff --git a/x-pack/platform/plugins/shared/fleet/server/services/package_policy.ts b/x-pack/platform/plugins/shared/fleet/server/services/package_policy.ts index 56af343afa573..bde3b4fa4b48b 100644 --- a/x-pack/platform/plugins/shared/fleet/server/services/package_policy.ts +++ b/x-pack/platform/plugins/shared/fleet/server/services/package_policy.ts @@ -3808,12 +3808,13 @@ function _compilePackageStream( stream = _applyIndexPrivileges(packageDataStream, streamIn); + const effectiveInputName = getInputEffectiveName(input); const streamFromPkg = (packageDataStream.streams || []).find( - (pkgStream) => pkgStream.input === input.type + (pkgStream) => pkgStream.input === effectiveInputName ); if (!streamFromPkg) { throw new StreamNotFoundError( - `Stream template not found, unable to find stream for input ${input.type}` + `Stream template not found, unable to find stream for input ${effectiveInputName} (type: ${input.type})` ); } diff --git a/x-pack/platform/plugins/shared/fleet/server/services/secrets/package_policies.ts b/x-pack/platform/plugins/shared/fleet/server/services/secrets/package_policies.ts index 03e59698af77b..073fe9ee73ee7 100644 --- a/x-pack/platform/plugins/shared/fleet/server/services/secrets/package_policies.ts +++ b/x-pack/platform/plugins/shared/fleet/server/services/secrets/package_policies.ts @@ -13,6 +13,7 @@ import type { NewPackagePolicy, RegistryStream, UpdatePackagePolicy } from '../. import { SO_SEARCH_LIMIT } from '../../../common'; import { doesPackageHaveIntegrations, + getInputEffectiveName, getNormalizedDataStreams, getNormalizedInputs, } from '../../../common/services'; @@ -391,7 +392,9 @@ function _getInputSecretPaths( if (input.streams.length) { input.streams.forEach((stream, streamIndex) => { const streamVarDefs = - streamSecretVarDefsByDatasetAndInput[`${stream.data_stream.dataset}-${input.type}`]; + streamSecretVarDefsByDatasetAndInput[ + `${stream.data_stream.dataset}-${getInputEffectiveName(input)}` + ]; if (streamVarDefs && Object.keys(streamVarDefs).length) { Object.entries(stream.vars || {}).forEach(([name, configEntry]) => { if (streamVarDefs[name]) {