Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/howto/policy_testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ In these configuration files it is possible to define:
- Values for the variables of the data stream manifests (only used for
integration packages).
- Input to use, for packages supporting multiple input types.
- Overrides for the data stream routing (only used for input packages). See [Overriding data stream type and dataset for input packages](./system_testing.md#overriding-data-stream-type-and-dataset-for-input-packages) in the system testing documentation.

For example, the following configuration tells Fleet to create a policy using an
specific input, and some variables at the package level:
Expand Down
28 changes: 28 additions & 0 deletions docs/howto/system_testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,34 @@ for system tests.
| wait_for_data_timeout | duration | | Amount of time to wait for data to be present in Elasticsearch. Defaults to 10m. |
| wait_for_dynamic_streams_stable | duration | | For packages with dynamic data streams (e.g. `dynamic_signal_types`), minimum time the discovered data stream count must stay unchanged after the first stream appears before discovery completes. Defaults to 10s. |

#### Overriding data stream type and dataset for input packages

For **input packages**, the top-level `vars` field also accepts two special variables that
override the data stream routing set by Fleet:

- `data_stream.dataset` — overrides the dataset portion of the data stream name
(e.g. `sql_input.test` instead of the default `sql_input.sql_query`).
- `data_stream.type` — overrides the signal type (`logs`, `metrics`, or `traces`).
Requires Kibana 8.18.3 / 8.19.0 / 9.1.0 or later (Fleet PR [#214216](https://github.com/elastic/kibana/pull/214216)).
Because the Kibana simplified Fleet API does not yet accept `data_stream.type` as a
stream-level variable (it rejects unknown vars), tests that use this override must also
set `policy_api_format: legacy` until a Kibana fix is available ([#266321](https://github.com/elastic/kibana/issues/266321)).

```yaml
policy_api_format: legacy
vars:
hosts:
- root:test@tcp({{Hostname}}:{{Port}})/
sql_query: "SHOW GLOBAL STATUS LIKE 'Innodb_data%';"
data_stream.type: logs
data_stream.dataset: sql_input.test
```

When `data_stream.type` is set, `elastic-package` passes it to Fleet as a stream-level variable
and uses it when computing the expected Elasticsearch data stream name
(`<type>-<dataset>-<namespace>`). Without this override, the type defaults to the `type` field
of the policy template in the package manifest.

For example, the `apache/access` data stream's `test-access-log-config.yml` is
shown below.

Expand Down
28 changes: 25 additions & 3 deletions internal/kibana/packagepolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,18 @@ func BuildInputPackagePolicy(

vars := SetKibanaVariables(policyTemplate.Vars, varValues)
ensureDatasetVar(vars, policyTemplate, varValues)
ensureDataStreamTypeVar(vars, varValues)
// For otelcol, data_stream.dataset is the base name only (do not include a ".otel" suffix).
// Elastic Agent appends ".otel" at ingest; Fleet and this builder do not.
if policyTemplate.Input == "otelcol" {
ensureUseAPMVar(vars, varValues)
}
dsType := policyTemplate.Type
if v, found := vars["data_stream.type"]; found && v.fromUser {
if s, ok := v.Value.Value().(string); ok && s != "" {
dsType = s
}
}
inputEntry := PackagePolicyInput{
Enabled: enabled,
Streams: map[string]PackagePolicyStream{
Expand All @@ -178,9 +185,7 @@ func BuildInputPackagePolicy(
Vars: vars.ToMapStr(),
legacyVars: vars,
dataStreamDataset: streamDataset,
// dataStreamType is intentionally empty: input packages
// require Kibana >= 7.16 (simplified API), so legacy
// conversion is not needed.
dataStreamType: dsType,
},
},
inputType: streamInput,
Expand Down Expand Up @@ -279,6 +284,23 @@ func ensureUseAPMVar(vars Vars, varValues common.MapStr) {
}
}

// ensureDataStreamTypeVar injects data_stream.type into vars with fromUser=true so
// that the simplified API includes it, overriding the default from policy_template.type.
// Only applies when the user explicitly provides a value in varValues; when absent,
// Fleet uses policy_template.type as the default (no injection needed).
// This mirrors the override introduced in Kibana PR #214216 for input packages.
func ensureDataStreamTypeVar(vars Vars, varValues common.MapStr) {
raw, err := varValues.GetValue("data_stream.type")
if err != nil {
return
}
var val packages.VarValue
if err := val.Unpack(raw); err != nil {
return
}
setVarFromUser(vars, "data_stream.type", "text", val)
}

// setVarFromUser sets vars[name] with fromUser=true so that the variable is included
// in simplified API requests (ToMapStr). It is a no-op when the var is already
// user-set, preserving any value previously established by SetKibanaVariables.
Expand Down
15 changes: 15 additions & 0 deletions internal/kibana/packagepolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,21 @@ func TestBuildInputPackagePolicy(t *testing.T) {
goldenSimplified: "testdata/log_custom_logs.json",
goldenLegacy: "testdata/log_custom_logs_legacy.json",
},
{
// Override data_stream.type from the default "logs" to "metrics".
// Mirrors the Kibana Fleet feature (PR #214216) for input packages.
name: "log_custom_logs_metrics_type",
packageRoot: "testdata/packages/log_input",
policyTemplateName: "logs",
policyName: "log-logs-test",
varValues: common.MapStr{
"paths": []string{"/tmp/test.log"},
"data_stream.dataset": "log.custom",
"data_stream.type": "metrics",
},
goldenSimplified: "testdata/log_custom_logs_metrics_type.json",
goldenLegacy: "testdata/log_custom_logs_metrics_type_legacy.json",
},
{
name: "sql_input_custom_dataset",
packageRoot: "../../test/packages/parallel/sql_input",
Expand Down
2 changes: 1 addition & 1 deletion internal/kibana/testdata/input_with_pkg_vars_legacy.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
{
"enabled": true,
"data_stream": {
"type": "",
"type": "logs",
"dataset": "input_with_pkg_vars.logs"
},
"vars": {
Expand Down
2 changes: 1 addition & 1 deletion internal/kibana/testdata/log_custom_logs_legacy.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
{
"enabled": true,
"data_stream": {
"type": "",
"type": "logs",
"dataset": "log.logs"
},
"vars": {
Expand Down
28 changes: 28 additions & 0 deletions internal/kibana/testdata/log_custom_logs_metrics_type.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "log-logs-test",
"description": "",
"namespace": "test",
"policy_id": "test-policy-id",
"package": {
"name": "log",
"version": "2.4.4"
},
"inputs": {
"logs-logfile": {
"enabled": true,
"streams": {
"log.logs": {
"enabled": true,
"vars": {
"data_stream.dataset": "log.custom",
"data_stream.type": "metrics",
"paths": [
"/tmp/test.log"
]
}
}
}
}
},
"force": false
}
94 changes: 94 additions & 0 deletions internal/kibana/testdata/log_custom_logs_metrics_type_legacy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
{
"name": "log-logs-test",
"description": "",
"namespace": "test",
"policy_id": "test-policy-id",
"enabled": true,
"package": {
"name": "log",
"title": "Custom Logs (Deprecated)",
"version": "2.4.4"
},
"inputs": [
{
"policy_template": "logs",
"type": "logfile",
"enabled": true,
"vars": {
"custom": {
"value": "",
"type": "yaml"
},
"data_stream.dataset": {
"value": "log.custom",
"type": "text"
},
"data_stream.type": {
"value": "metrics",
"type": "text"
},
"exclude_files": {
"value": [],
"type": "text"
},
"ignore_older": {
"value": "72h",
"type": "text"
},
"paths": {
"value": [
"/tmp/test.log"
],
"type": "text"
},
"tags": {
"value": [],
"type": "text"
}
},
"streams": [
{
"enabled": true,
"data_stream": {
"type": "metrics",
"dataset": "log.logs"
},
"vars": {
"custom": {
"value": "",
"type": "yaml"
},
"data_stream.dataset": {
"value": "log.custom",
"type": "text"
},
"data_stream.type": {
"value": "metrics",
"type": "text"
},
"exclude_files": {
"value": [],
"type": "text"
},
"ignore_older": {
"value": "72h",
"type": "text"
},
"paths": {
"value": [
"/tmp/test.log"
],
"type": "text"
},
"tags": {
"value": [],
"type": "text"
}
}
}
]
}
],
"output_id": "",
"force": false
}
2 changes: 1 addition & 1 deletion internal/kibana/testdata/otel_traces_use_apm_legacy.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
{
"enabled": true,
"data_stream": {
"type": "",
"type": "traces",
"dataset": "otel_traces_input.receiver"
},
"vars": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
{
"enabled": true,
"data_stream": {
"type": "",
"type": "metrics",
"dataset": "sql_input.sql_query"
},
"vars": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
{
"enabled": true,
"data_stream": {
"type": "",
"type": "metrics",
"dataset": "sql_input.sql_query"
},
"vars": {
Expand Down
28 changes: 27 additions & 1 deletion internal/testrunner/runners/system/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2171,7 +2171,7 @@ func CreatePackagePolicy(
*pkg, policyTemplate, cfgVars, true,
)
fallbackDataset := fmt.Sprintf("%s.%s", pkg.Name, policyTemplate.Name)
return p, policyTemplate.Type, datasetFromPolicy(p, fallbackDataset), nil
return p, dataStreamTypeFromPolicy(p, policyTemplate.Type), datasetFromPolicy(p, fallbackDataset), nil
}
if ds == nil {
return kibana.PackagePolicy{}, "", "", fmt.Errorf("data stream manifest is required for integration packages")
Expand Down Expand Up @@ -2230,6 +2230,32 @@ func datasetFromPolicy(policy kibana.PackagePolicy, fallback string) string {
return fallback
}

// dataStreamTypeFromPolicy returns the data stream type stored in the enabled stream's
// data_stream.type var (set when the user overrides it via the test config). Falls back
// to the supplied fallback (normally policyTemplate.Type) when not explicitly set.
func dataStreamTypeFromPolicy(policy kibana.PackagePolicy, fallback string) string {
for _, input := range policy.Inputs {
if !input.Enabled {
continue
}
for _, stream := range input.Streams {
if !stream.Enabled {
continue
}

v, _ := common.MapStr(stream.Vars).GetValue("data_stream.type")
t, _ := v.(string)
if t == "" {
continue
}

return t
}
}

return fallback
}

func (r *tester) checkTransforms(ctx context.Context, config *testConfig, pkgManifest *packages.PackageManifest, dataStream, policyTemplateInput string, syntheticEnabled bool) error {
if config.SkipTransformValidation {
return nil
Expand Down
Loading