-
Notifications
You must be signed in to change notification settings - Fork 139
Composable packages: input name qualifiers and stream input refs
#3480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
1916c55
43e6f1b
6ddb76f
c27b5aa
734716a
82e93d0
4d5029d
b6b5e7d
8a9af94
1a62bba
2a34f0c
3a1c690
492bdcb
2c95d6d
b5abcfd
2b86b9e
648e22d
b8f6ccb
e1b1926
86fa6b5
22faa60
7b6c425
fe65c91
40a6d84
b1caf22
a226b44
5c807ec
64d7f53
ccc9dee
f3d6525
4a0e8cc
1f83a73
974de32
65b9c12
b2eff03
ee8d081
cbe6b1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -193,6 +193,7 @@ type Variable struct { | |
|
|
||
| // Input is a single input configuration. | ||
| type Input struct { | ||
| Name string `config:"name,omitempty" json:"name,omitempty" yaml:"name,omitempty"` | ||
| Type string `config:"type" json:"type" yaml:"type"` | ||
| Package string `config:"package,omitempty" json:"package,omitempty" yaml:"package,omitempty"` | ||
| Vars []Variable `config:"vars" json:"vars" yaml:"vars"` | ||
|
|
@@ -841,11 +842,15 @@ func (dsm *DataStreamManifest) indexTemplateNamePrefix() string { | |
| return "" | ||
| } | ||
|
|
||
| // FindInputByType returns the input for the provided type. | ||
| func (pt *PolicyTemplate) FindInputByType(inputType string) *Input { | ||
| for _, input := range pt.Inputs { | ||
| if input.Type == inputType { | ||
| return &input | ||
| // FindInput returns the first input matching identifier, checked against the | ||
| // input's Name qualifier first (when set) and then its Type. Use this when the | ||
| // identifier may be a name qualifier (set on inputs that share a type) rather | ||
| // than a bare type string. | ||
| func (pt *PolicyTemplate) FindInput(identifier string) *Input { | ||
|
jsoriano marked this conversation as resolved.
Outdated
|
||
| for i := range pt.Inputs { | ||
| input := &pt.Inputs[i] | ||
| if (input.Name != "" && input.Name == identifier) || input.Type == identifier { | ||
| return input | ||
| } | ||
| } | ||
|
Comment on lines
+850
to
+855
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q: Should this loop duplicated to first look for input.Name and if it does not find any , then look for input.Type ? Is it ok to search at the same time for both ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as defiened on spec https://github.com/elastic/package-spec/blob/61cee23beb811b6c93ddb71caf822914ee30afba/code/go/internal/validator/semantic/validate_integration_input_qualifier.go#L68 the propoerty name is required when inputs share the same type. so i think is safe to make this comparison, so when name is found, its due to the fact that type is the same. when name is not found (empty) then we can trust type |
||
| return nil | ||
|
|
@@ -906,8 +911,8 @@ func findPolicyTemplateForDataStream(pkg PackageManifest, ds DataStreamManifest, | |
|
|
||
| var matchedPolicyTemplates []string | ||
| for _, policyTemplate := range pkg.PolicyTemplates { | ||
| // Does this policy_template include this input type? | ||
| if policyTemplate.FindInputByType(inputName) == nil { | ||
| // Does this policy_template include an input for this identifier (name or type)? | ||
| if policyTemplate.FindInput(inputName) == nil { | ||
| continue | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ import ( | |
| // inputPkgInfo holds the resolved metadata from an input package needed to | ||
| // replace package: references in composable package manifests. | ||
| type inputPkgInfo struct { | ||
| pkgName string // manifest.name; used as the input name qualifier when disambiguation is needed | ||
|
mrodm marked this conversation as resolved.
Outdated
|
||
| identifier string // policy_templates[0].input; if several templates exist, only the first is used | ||
| pkgTitle string // manifest.title (fallback title) | ||
| pkgDescription string // manifest.description (fallback description) | ||
|
|
@@ -27,6 +28,12 @@ type inputPkgInfo struct { | |
| // data_stream/*/manifest.yml (streams[]) with the actual input type identifier | ||
| // from the referenced input package, then removes the package: key. | ||
| // | ||
| // When multiple inputs in the same policy template share the same type (e.g., two | ||
| // otelcol inputs from different required packages), it also sets a unique name on | ||
| // each such input (derived from the package name) so Fleet can distinguish them. | ||
| // In that case, data stream manifests use the name as their input reference instead | ||
| // of the type. | ||
| // | ||
| // This step must run last, after mergeVariables, because that step uses | ||
| // stream.Package and input.Package to identify which entries to process. | ||
| // It resolves metadata per required input via buildInputPkgInfoByName, then | ||
|
|
@@ -41,11 +48,44 @@ func (r *RequiredInputsResolver) resolveStreamInputTypes( | |
| return err | ||
| } | ||
|
|
||
| if err := applyInputTypesToComposableManifest(manifest, buildRoot, infoByPkg); err != nil { | ||
| streamInputRefs := buildStreamInputRefs(manifest, infoByPkg) | ||
|
jsoriano marked this conversation as resolved.
Outdated
|
||
|
|
||
| if err := applyInputTypesToComposableManifest(manifest, buildRoot, infoByPkg, streamInputRefs); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return applyInputTypesToDataStreamManifests(buildRoot, infoByPkg) | ||
| return applyInputTypesToDataStreamManifests(buildRoot, infoByPkg, streamInputRefs) | ||
| } | ||
|
|
||
| // buildStreamInputRefs computes what value to write to streams[].input per required package name. | ||
| // When two or more inputs of the same type coexist in any single policy template, each conflicting | ||
| // package is given its own package name as the qualifier (used as both the input name and the | ||
| // stream input reference). Packages with a unique type within every policy template get their type | ||
| // identifier. Once a package is marked as needing a qualifier it keeps that assignment even if it | ||
| // appears in other policy templates without a type conflict. | ||
| func buildStreamInputRefs(manifest *packages.PackageManifest, infoByPkg map[string]inputPkgInfo) map[string]string { | ||
| refs := make(map[string]string, len(infoByPkg)) | ||
| for _, pt := range manifest.PolicyTemplates { | ||
| typeCounts := make(map[string]int) | ||
| for _, input := range pt.Inputs { | ||
| if input.Package == "" { | ||
| continue | ||
| } | ||
| typeCounts[infoByPkg[input.Package].identifier]++ | ||
| } | ||
| for _, input := range pt.Inputs { | ||
| if input.Package == "" { | ||
| continue | ||
| } | ||
| info := infoByPkg[input.Package] | ||
| if typeCounts[info.identifier] > 1 { | ||
| refs[input.Package] = input.Package // package name as stable unique qualifier | ||
| } else if _, exists := refs[input.Package]; !exists { | ||
| refs[input.Package] = info.identifier | ||
| } | ||
| } | ||
| } | ||
| return refs | ||
| } | ||
|
|
||
| // buildInputPkgInfoByName loads inputPkgInfo for each downloaded required input package path. | ||
|
|
@@ -61,12 +101,15 @@ func buildInputPkgInfoByName(inputPkgPaths map[string]string) (map[string]inputP | |
| return infoByPkg, nil | ||
| } | ||
|
|
||
| // applyInputTypesToComposableManifest sets type (and optional title/description) on | ||
| // applyInputTypesToComposableManifest sets type (and optional title/description/name) on | ||
| // package-backed policy template inputs in manifest.yml and drops package:. | ||
| // When streamInputRefs maps a package to its own name (indicating a type conflict), it also | ||
| // sets name on that input so Fleet can distinguish multiple inputs of the same type. | ||
| func applyInputTypesToComposableManifest( | ||
| manifest *packages.PackageManifest, | ||
| buildRoot *os.Root, | ||
| infoByPkg map[string]inputPkgInfo, | ||
| streamInputRefs map[string]string, | ||
| ) error { | ||
| manifestBytes, err := buildRoot.ReadFile("manifest.yml") | ||
| if err != nil { | ||
|
|
@@ -93,6 +136,9 @@ func applyInputTypesToComposableManifest( | |
| } | ||
|
|
||
| upsertKey(inputNode, "type", strVal(info.identifier)) | ||
| if streamInputRefs[input.Package] == input.Package { | ||
| upsertKey(inputNode, "name", strVal(input.Package)) | ||
| } | ||
|
|
||
| if mappingValue(inputNode, "title") == nil && info.pkgTitle != "" { | ||
| upsertKey(inputNode, "title", strVal(info.pkgTitle)) | ||
|
|
@@ -116,8 +162,11 @@ func applyInputTypesToComposableManifest( | |
| } | ||
|
|
||
| // applyInputTypesToDataStreamManifests sets input on package-backed streams in each | ||
| // data_stream/*/manifest.yml and drops package:. | ||
| func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[string]inputPkgInfo) error { | ||
| // data_stream/*/manifest.yml and drops package:. The value written to streams[].input | ||
| // is taken from streamInputRefs: the package name when disambiguation is required | ||
| // (i.e. the corresponding policy template input carries a name qualifier), otherwise | ||
| // the type identifier. | ||
| func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[string]inputPkgInfo, streamInputRefs map[string]string) error { | ||
| dsManifestPaths, err := fs.Glob(buildRoot.FS(), "data_stream/*/manifest.yml") | ||
| if err != nil { | ||
| return fmt.Errorf("globbing data stream manifests: %w", err) | ||
|
|
@@ -153,7 +202,7 @@ func applyInputTypesToDataStreamManifests(buildRoot *os.Root, infoByPkg map[stri | |
| return fmt.Errorf("getting stream node at index %d in %q: %w", streamIdx, manifestPath, err) | ||
| } | ||
|
|
||
| upsertKey(streamNode, "input", strVal(info.identifier)) | ||
| upsertKey(streamNode, "input", strVal(streamInputRefs[stream.Package])) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should it be ensured that |
||
|
|
||
| if stream.Title == "" && info.pkgTitle != "" { | ||
| upsertKey(streamNode, "title", strVal(info.pkgTitle)) | ||
|
|
@@ -211,6 +260,7 @@ func loadInputPkgInfo(pkgPath string) (inputPkgInfo, error) { | |
| } | ||
|
|
||
| return inputPkgInfo{ | ||
| pkgName: m.Name, | ||
| identifier: pt.Input, | ||
| pkgTitle: m.Title, | ||
| pkgDescription: m.Description, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.