-
Notifications
You must be signed in to change notification settings - Fork 139
Composable packages: input name qualifiers and stream input refs
#3480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
teresaromero
merged 37 commits into
elastic:main
from
teresaromero:composable/input-name
May 7, 2026
Merged
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
1916c55
Set input name qualifier and stream input ref for composable duplicat…
teresaromero 43e6f1b
Refactor input handling for composable packages to support name quali…
teresaromero 6ddb76f
Update go.mod and go.sum to replace package-spec dependency with pre-…
teresaromero c27b5aa
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero 734716a
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero 82e93d0
chore: update go.mod and go.sum to use stable version of package-spec…
teresaromero 4d5029d
chore: add stack version files and update Kibana version in manifests…
teresaromero b6b5e7d
chore: enhance composable package testing script with environment var…
teresaromero 8a9af94
chore: refactor composable package testing script to handle multiple …
teresaromero 1a62bba
Add composable test package for nginx
jsoriano 2a34f0c
Enhance Nginx composable data stream configuration by adding variable…
teresaromero 3a1c690
chore: update Makefile and test-composable-packages.sh for composable…
teresaromero 492bdcb
refactor: remove FindInputByType method from PolicyTemplate
teresaromero 2c95d6d
chore: remove deprecated stack version files and update test-composab…
teresaromero b5abcfd
docs: clarify inputType comments in PackagePolicyInput and BuildInteg…
teresaromero 2b86b9e
chore: add stack version file and update nginx composable data stream…
teresaromero 648e22d
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero b8f6ccb
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero e1b1926
fix: always build package policy from built tree to resolve composabl…
teresaromero 86fa6b5
chore: update nginx composable stack version to 9.4.0-SNAPSHOT
teresaromero 22faa60
refactor: introduce ReadBuiltPackageManifest to streamline package ma…
teresaromero 7b6c425
feat: add sample event JSON files for nginx composable access and stu…
teresaromero fe65c91
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero 40a6d84
chore: update nginx composable manifest and add validation file
teresaromero b1caf22
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero a226b44
Refactor addPackagePolicy to improve input resolution logic for polic…
teresaromero 5c807ec
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero 64d7f53
Update test-composable-packages.sh to use Elastic stack version 9.4.0…
teresaromero ccc9dee
Merge branch 'main' of github.com:elastic/elastic-package into compos…
teresaromero f3d6525
Add nginx_composable.stack_version file with version 9.4.0-SNAPSHOT
teresaromero 4a0e8cc
Refactor input handling in package policy to use effective names
teresaromero 1f83a73
Refactor inputPkgInfo structure to remove pkgName field
teresaromero 974de32
Update inputPkgInfo to use effectiveName instead of identifier
teresaromero 65b9c12
Refactor input package metadata handling to use input field
teresaromero b2eff03
Enhance buildStreamInputRefs documentation for clarity and detail
teresaromero ee8d081
Improve error handling for missing stream input references in applyIn…
teresaromero cbe6b1c
Refactor CreatePackagePolicy and buildIntegrationPackagePolicyFromBui…
teresaromero File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,10 +14,10 @@ import ( | |
| "github.com/elastic/elastic-package/internal/packages" | ||
| ) | ||
|
|
||
| // inputPkgInfo holds the resolved metadata from an input package needed to | ||
| // replace package: references in composable package manifests. | ||
| type inputPkgInfo struct { | ||
| identifier string // policy_templates[0].input; if several templates exist, only the first is used | ||
| // inputPolicyTemplateInfo holds the resolved metadata from an input package needed to | ||
| // replace package: references in composable (integrations) package manifests. | ||
| type inputPolicyTemplateInfo struct { | ||
| input string // policy_templates[0].input; if several templates exist, only the first is used | ||
| pkgTitle string // manifest.title (fallback title) | ||
| pkgDescription string // manifest.description (fallback description) | ||
| } | ||
|
|
@@ -27,6 +27,12 @@ type inputPkgInfo struct { | |
| // data_stream/*/manifest.yml (streams[]) with the actual input type identifier | ||
| // from the referenced input package, then removes the package: key. | ||
| // | ||
| // When multiple inputs in the same policy template share the same type (e.g., two | ||
| // otelcol inputs from different required packages), it also sets a unique name on | ||
| // each such input (derived from the package name) so Fleet can distinguish them. | ||
| // In that case, data stream manifests use the name as their input reference instead | ||
| // of the type. | ||
| // | ||
| // This step must run last, after mergeVariables, because that step uses | ||
| // stream.Package and input.Package to identify which entries to process. | ||
| // It resolves metadata per required input via buildInputPkgInfoByName, then | ||
|
|
@@ -41,32 +47,80 @@ func (r *RequiredInputsResolver) resolveStreamInputTypes( | |
| return err | ||
| } | ||
|
|
||
| if err := applyInputTypesToComposableManifest(manifest, buildRoot, infoByPkg); err != nil { | ||
| streamInputEffectveNames := buildStreamInputRefs(manifest, infoByPkg) | ||
|
|
||
| if err := applyInputTypesToComposableManifest(manifest, buildRoot, infoByPkg, streamInputEffectveNames); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return applyInputTypesToDataStreamManifests(buildRoot, infoByPkg) | ||
| return applyInputTypesToDataStreamManifests(buildRoot, infoByPkg, streamInputEffectveNames) | ||
| } | ||
|
|
||
| // buildInputPkgInfoByName loads inputPkgInfo for each downloaded required input package path. | ||
| func buildInputPkgInfoByName(inputPkgPaths map[string]string) (map[string]inputPkgInfo, error) { | ||
| infoByPkg := make(map[string]inputPkgInfo, len(inputPkgPaths)) | ||
| // buildStreamInputRefs computes the value to write to streams[].input for each | ||
| // required input package referenced by the integration. | ||
| // | ||
| // By default the ref is the resolved input type (e.g. "logfile", "otelcol"). | ||
| // If a policy template contains multiple required input packages that resolve to | ||
| // the same type, those packages use their package name as a stable qualifier; | ||
| // streams[].input references that qualifier (and the corresponding input gets | ||
| // name: <package>) so Fleet can distinguish same-type inputs. | ||
| // | ||
| // Duplicate detection is per policy template (not integration-wide) because only | ||
| // inputs that coexist within a single policy template must be unique. When | ||
| // building the Fleet package policy, inputs are keyed as | ||
| // "{policyTemplate.Name}-{effectiveName}" where effectiveName is input Name when | ||
| // set, otherwise Type. | ||
| // Once a package is qualified, it keeps that ref across policy templates. | ||
| func buildStreamInputRefs(manifest *packages.PackageManifest, infoByInputPkg map[string]inputPolicyTemplateInfo) map[string]string { | ||
| refs := make(map[string]string, len(infoByInputPkg)) | ||
| // iterate over all policy templates from composable package manifest | ||
| for _, pt := range manifest.PolicyTemplates { | ||
| // first count the number of inputs of each type | ||
| typeCounts := make(map[string]int) | ||
| for _, input := range pt.Inputs { | ||
| if input.Package == "" { | ||
| continue | ||
| } | ||
| // integration input type is equivalent to the policy template input identifier in the required input package | ||
| typeCounts[infoByInputPkg[input.Package].input]++ | ||
| } | ||
| for _, input := range pt.Inputs { | ||
| if input.Package == "" { | ||
| continue | ||
| } | ||
| importedInput := infoByInputPkg[input.Package] | ||
| if typeCounts[importedInput.input] > 1 { | ||
| refs[input.Package] = input.Package // package name as stable unique qualifier | ||
| } else if _, exists := refs[input.Package]; !exists { | ||
| refs[input.Package] = importedInput.input | ||
| } | ||
| } | ||
| } | ||
| return refs | ||
| } | ||
|
|
||
| // buildInputPkgInfoByName loads inputPolicyTemplateInfo for each downloaded required input package path. | ||
| func buildInputPkgInfoByName(inputPkgPaths map[string]string) (map[string]inputPolicyTemplateInfo, error) { | ||
| infoByInputPkg := make(map[string]inputPolicyTemplateInfo, len(inputPkgPaths)) | ||
| for pkgName, pkgPath := range inputPkgPaths { | ||
| info, err := loadInputPkgInfo(pkgPath) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("loading input package info for %q: %w", pkgName, err) | ||
| } | ||
| infoByPkg[pkgName] = info | ||
| infoByInputPkg[pkgName] = *info | ||
| } | ||
| return infoByPkg, nil | ||
| return infoByInputPkg, nil | ||
| } | ||
|
|
||
| // applyInputTypesToComposableManifest sets type (and optional title/description) on | ||
| // package-backed policy template inputs in manifest.yml and drops package:. | ||
| // applyInputTypesToComposableManifest sets type (and optional title/description/name) on | ||
| // package-backed policy template inputs in manifest.yml and drops package field. | ||
| // When streamInputRefs maps a package to its own name (indicating a type conflict), it also | ||
| // sets name on that input so Fleet can distinguish multiple inputs of the same type. | ||
| func applyInputTypesToComposableManifest( | ||
| manifest *packages.PackageManifest, | ||
| buildRoot *os.Root, | ||
| infoByPkg map[string]inputPkgInfo, | ||
| infoByInputPkg map[string]inputPolicyTemplateInfo, | ||
| streamInputRefs map[string]string, | ||
| ) error { | ||
| manifestBytes, err := buildRoot.ReadFile("manifest.yml") | ||
| if err != nil { | ||
|
|
@@ -82,7 +136,7 @@ func applyInputTypesToComposableManifest( | |
| if input.Package == "" { | ||
| continue | ||
| } | ||
| info, ok := infoByPkg[input.Package] | ||
| info, ok := infoByInputPkg[input.Package] | ||
| if !ok { | ||
| return fmt.Errorf("input package %q referenced in policy_templates[%d].inputs[%d] not found in required inputs", input.Package, ptIdx, inputIdx) | ||
| } | ||
|
|
@@ -92,7 +146,10 @@ func applyInputTypesToComposableManifest( | |
| return fmt.Errorf("getting input node at pt[%d].inputs[%d]: %w", ptIdx, inputIdx, err) | ||
| } | ||
|
|
||
| upsertKey(inputNode, "type", strVal(info.identifier)) | ||
| upsertKey(inputNode, "type", strVal(info.input)) | ||
| if streamInputRefs[input.Package] == input.Package { | ||
| upsertKey(inputNode, "name", strVal(input.Package)) | ||
| } | ||
|
|
||
| if mappingValue(inputNode, "title") == nil && info.pkgTitle != "" { | ||
| upsertKey(inputNode, "title", strVal(info.pkgTitle)) | ||
|
|
@@ -116,8 +173,11 @@ func applyInputTypesToComposableManifest( | |
| } | ||
|
|
||
| // applyInputTypesToDataStreamManifests sets input on package-backed streams in each | ||
| // data_stream/*/manifest.yml and drops package:. | ||
| func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[string]inputPkgInfo) error { | ||
| // data_stream/*/manifest.yml and drops package:. The value written to streams[].input | ||
| // is taken from streamInputRefs: the package name when disambiguation is required | ||
| // (i.e. the corresponding policy template input carries a name qualifier), otherwise | ||
| // the type identifier. | ||
| func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByInputPkg map[string]inputPolicyTemplateInfo, streamInputRefs map[string]string) error { | ||
| dsManifestPaths, err := fs.Glob(buildRoot.FS(), "data_stream/*/manifest.yml") | ||
| if err != nil { | ||
| return fmt.Errorf("globbing data stream manifests: %w", err) | ||
|
|
@@ -143,7 +203,7 @@ func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[stri | |
| if stream.Package == "" { | ||
| continue | ||
| } | ||
| info, ok := infoByPkg[stream.Package] | ||
| info, ok := infoByInputPkg[stream.Package] | ||
| if !ok { | ||
| return fmt.Errorf("input package %q referenced in %q streams[%d] not found in required inputs", stream.Package, path.Dir(manifestPath), streamIdx) | ||
| } | ||
|
|
@@ -153,7 +213,7 @@ func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[stri | |
| return fmt.Errorf("getting stream node at index %d in %q: %w", streamIdx, manifestPath, err) | ||
| } | ||
|
|
||
| upsertKey(streamNode, "input", strVal(info.identifier)) | ||
| upsertKey(streamNode, "input", strVal(streamInputRefs[stream.Package])) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should it be ensured that |
||
|
|
||
| if stream.Title == "" && info.pkgTitle != "" { | ||
| upsertKey(streamNode, "title", strVal(info.pkgTitle)) | ||
|
|
@@ -181,37 +241,37 @@ func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[stri | |
| // needed to replace package: references in composable packages. When the input | ||
| // package has several policy templates, only the first template's input id is | ||
| // used and a warning is logged. | ||
| func loadInputPkgInfo(pkgPath string) (inputPkgInfo, error) { | ||
| func loadInputPkgInfo(pkgPath string) (*inputPolicyTemplateInfo, error) { | ||
| pkgFS, closeFn, err := openPackageFS(pkgPath) | ||
| if err != nil { | ||
| return inputPkgInfo{}, fmt.Errorf("opening package: %w", err) | ||
| return nil, fmt.Errorf("opening package: %w", err) | ||
| } | ||
| defer func() { _ = closeFn() }() | ||
|
|
||
| manifestBytes, err := fs.ReadFile(pkgFS, packages.PackageManifestFile) | ||
| if err != nil { | ||
| return inputPkgInfo{}, fmt.Errorf("reading manifest: %w", err) | ||
| return nil, fmt.Errorf("reading manifest: %w", err) | ||
| } | ||
|
|
||
| m, err := packages.ReadPackageManifestBytes(manifestBytes) | ||
| if err != nil { | ||
| return inputPkgInfo{}, fmt.Errorf("parsing manifest: %w", err) | ||
| return nil, fmt.Errorf("parsing manifest: %w", err) | ||
| } | ||
|
|
||
| if len(m.PolicyTemplates) == 0 { | ||
| return inputPkgInfo{}, fmt.Errorf("input package %q has no policy templates", m.Name) | ||
| return nil, fmt.Errorf("input package %q has no policy templates", m.Name) | ||
| } | ||
| if len(m.PolicyTemplates) > 1 { | ||
| logger.Warnf("Input package %q has multiple policy templates; using input identifier %q from first policy template only", m.Name, m.PolicyTemplates[0].Input) | ||
| } | ||
|
|
||
| pt := m.PolicyTemplates[0] | ||
| if pt.Input == "" { | ||
| return inputPkgInfo{}, fmt.Errorf("input package %q policy template %q has no input identifier", m.Name, pt.Name) | ||
| return nil, fmt.Errorf("input package %q policy template %q has no input identifier", m.Name, pt.Name) | ||
| } | ||
|
|
||
| return inputPkgInfo{ | ||
| identifier: pt.Input, | ||
| return &inputPolicyTemplateInfo{ | ||
| input: pt.Input, | ||
| pkgTitle: m.Title, | ||
| pkgDescription: m.Description, | ||
| }, nil | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Should this loop duplicated to first look for input.Name and if it does not find any , then look for input.Type ?
Is it ok to search at the same time for both ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as defiened on spec https://github.com/elastic/package-spec/blob/61cee23beb811b6c93ddb71caf822914ee30afba/code/go/internal/validator/semantic/validate_integration_input_qualifier.go#L68 the propoerty name is required when inputs share the same type. so i think is safe to make this comparison, so when name is found, its due to the fact that type is the same. when name is not found (empty) then we can trust type