
[kafka_log] Update integration to input type to provide multi-signal support#18266

Merged
adrianchen-es merged 16 commits into elastic:main from adrianchen-es:18264-ac-kafka-multi_signal on May 1, 2026

Conversation

@adrianchen-es
Contributor

@adrianchen-es adrianchen-es commented Apr 8, 2026

Proposed commit message

Enhance the Kafka log integration to allow ingestion of metrics.

The pattern implemented aligns with other integrations that allow both log and metric ingestion.
A dynamic type is not used because it would require existing users to recreate the integration, whereas a separate data stream reduces potential complications and lets existing installations enable metrics or switch over.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Closes #18264

Screenshots

@adrianchen-es adrianchen-es added enhancement New feature or request >enhancement labels Apr 8, 2026
@github-actions
Contributor

github-actions Bot commented Apr 8, 2026

Vale Linting Results

Summary: 3 suggestions found

💡 Suggestions (3)

| File | Line | Rule | Message |
|------|------|------|---------|
| packages/kafka_input/docs/README.md | 13 | Elastic.WordChoice | Consider using 'refer to if it's a document, view if it's a UI element' instead of 'See', unless the term is in the UI. |
| packages/kafka_input/docs/README.md | 19 | Elastic.Ellipses | In general, don't use an ellipsis. |
| packages/kafka_input/docs/README.md | 19 | Elastic.Ellipses | In general, don't use an ellipsis. |

The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to Elastic style guide for Vale.

@adrianchen-es adrianchen-es self-assigned this Apr 8, 2026
@adrianchen-es adrianchen-es marked this pull request as ready for review April 8, 2026 04:51
@adrianchen-es adrianchen-es requested a review from a team as a code owner April 8, 2026 04:51
@agithomas
Contributor

Could you also add the recommended pipeline tests as part of the PR?

++ @stefans-elastic for reviewing the PR.

cc @lalit-satapathy

@adrianchen-es adrianchen-es changed the title [kafka log] Initial commit to enable metric ingestion for Kafka [kafka log] Initial commit to enable custom metric ingestion for Kafka Apr 8, 2026
@adrianchen-es adrianchen-es requested a review from a team as a code owner April 8, 2026 09:54
Comment thread packages/kafka_log/_dev/deploy/docker/docker-compose.yml Outdated
@andrewkroh andrewkroh added documentation Improvements or additions to documentation. Applied to PRs that modify *.md files. Integration:kafka_log Custom Kafka Logs Team:Obs-InfraObs Observability Infrastructure Monitoring team [elastic/obs-infraobs-integrations] labels Apr 8, 2026
@elastic-vault-github-plugin-prod

elastic-vault-github-plugin-prod Bot commented Apr 8, 2026

🚀 Benchmarks report

To see the full report comment with /test benchmark fullreport

@adrianchen-es
Contributor Author

/test

@elasticmachine

elasticmachine commented Apr 9, 2026

@adrianchen-es
Contributor Author

I am not 100% clear on the test failures in the kafka_log integration test (https://buildkite.com/elastic/integrations/builds/41142#019d7164-02ba-44b9-96d7-9e5ed4848465), as a local test succeeds.

@adrianchen-es
Contributor Author

/test Check integrations kafka_log

@adrianchen-es
Contributor Author

/test stack 9.3.2

@adrianchen-es
Contributor Author

@stefans-elastic, any pointers on the integration test error or other changes that are required to progress this?
Just re-ran elastic-package test system -v within the kafka_log package locally, and it passes.


@stefans-elastic
Contributor

@adrianchen-es

Unfortunately I wasn't able to test it locally (because "The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested"), and I don't see an obvious reason for the failure. However, I had an AI look at it and here is the result (I hope it is helpful):

Root Cause: kafka_log.metrics CI test failure

In CI, all data stream tests in a package share the same Kafka broker, while locally each test gets its own fresh broker. The generic test runs first and consumes all messages from testTopic under group_id: system_test, committing the offset to the end of the log. When the metrics test starts with the identical topics: [testTopic] + group_id: system_test, Kafka returns the committed offset — nothing to read — and the test times out after 642s.

Fix: At minimum, change group_id: system_test → group_id: system_test_metrics in data_stream/metrics/_dev/test/system/test-kafka-config.yml. Ideally, also add a dedicated kafka-metrics docker service writing metric-shaped JSON ({"metric":{"name":"cpu.usage","value":0.42}}) to a separate testTopicMetrics topic to fully isolate the two tests.
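The committed-offset behaviour described above can be sketched with a toy in-memory model (illustrative only; a real Kafka broker persists committed offsets per group, topic, and partition):

```python
# Toy model of Kafka consumer-group offset semantics (illustrative only).
# A real broker tracks committed offsets per (group, topic, partition).

class TopicLog:
    def __init__(self):
        self.messages = []      # append-only message log
        self.committed = {}     # group_id -> next offset to read

    def produce(self, msg):
        self.messages.append(msg)

    def consume(self, group_id):
        """Read from the group's committed offset, then commit to the end."""
        start = self.committed.get(group_id, 0)
        batch = self.messages[start:]
        self.committed[group_id] = len(self.messages)
        return batch

topic = TopicLog()
for i in range(5):
    topic.produce(f"msg-{i}")

# The first test run drains the topic under group "system_test".
first = topic.consume("system_test")

# A second test reusing the same group_id sees nothing: the offset
# was committed to the end of the log, so there is nothing to read.
second = topic.consume("system_test")

# A distinct group_id starts from offset 0 and reads everything again.
fresh = topic.consume("system_test_metrics")
```

This is why renaming the consumer group (or isolating the topic) lets the second test see data again.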

@adrianchen-es
Contributor Author


@stefans-elastic, that was already done, but the results were the same - c787fbb
I reverted that as it made no material difference (at least for the CI run).

Both local logs and the CI test logs suggest new containers were created.


I will nevertheless update the consumer group and see if the CI improves.

@adrianchen-es
Contributor Author

adrianchen-es commented Apr 10, 2026

Tried again with a different consumer group, with the same CI results.

Comment thread packages/kafka_input/manifest.yml Outdated
Comment thread packages/kafka_log/data_stream/metrics/_dev/deploy/docker/docker-compose.yml Outdated
@adrianchen-es
Contributor Author

@stefans-elastic, any insight? The test container is now at the data stream level, the test data is tailored, and there is a different topic and consumer group. I've reduced wait_for_data_timeout since successes complete within 10-15s.

@stefans-elastic
Contributor

@adrianchen-es not much yet. But I've figured out a way to run the tests locally the way they are run in CI (and they indeed fail). I'm trying to troubleshoot.
I can share the setup in case you want to reproduce this locally:
File `local-test.env`:

```sh
export YQ_VERSION=v4.35.2
export SETUP_GVM_VERSION=v0.6.0
export JQ_VERSION=1.7
export GH_CLI_VERSION=2.29.0
export KIND_VERSION=v0.27.0
export K8S_VERSION=v1.33.0
export SERVERLESS=false
export STACK_VERSION=8.17.0
export UPLOAD_SAFE_LOGS=0
```

Then run the command:

```sh
source local-test.env && .buildkite/scripts/test_one_package.sh kafka_log
```

@stefans-elastic
Contributor

@adrianchen-es
I've managed to get it working locally. Here is the patch:

```diff
diff --git a/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml b/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
index c428ce9a9b..451896f0c2 100644
--- a/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
+++ b/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
@@ -1,6 +1,6 @@
 service: kafka-svc
 input: kafka
-wait_for_data_timeout: 30s
+wait_for_data_timeout: 5m
 assert:
   min_count: 2
 data_stream:
diff --git a/packages/kafka_log/data_stream/metrics/fields/fields.yml b/packages/kafka_log/data_stream/metrics/fields/fields.yml
index d97c400b0d..1be49ebe18 100644
--- a/packages/kafka_log/data_stream/metrics/fields/fields.yml
+++ b/packages/kafka_log/data_stream/metrics/fields/fields.yml
@@ -6,11 +6,9 @@
   fields:
     - name: name
       type: keyword
-      dimension: true
       description: Metric name extracted from the JSON payload.
     - name: value
       type: double
-      metric_type: gauge
       description: Metric value extracted from the JSON payload.
 - name: kafka.headers
   description: Included Kafka headers
diff --git a/packages/kafka_log/data_stream/metrics/manifest.yml b/packages/kafka_log/data_stream/metrics/manifest.yml
index ab31ed01e0..14ffa3fc3d 100644
--- a/packages/kafka_log/data_stream/metrics/manifest.yml
+++ b/packages/kafka_log/data_stream/metrics/manifest.yml
@@ -1,5 +1,5 @@
 title: Custom Kafka Metrics
-type: metrics
+type: logs
 streams:
   - input: kafka
     description: Collect metric data from Kafka topic with Elastic Agent.
@@ -272,5 +272,3 @@ streams:
 elasticsearch:
   dynamic_dataset: true
   dynamic_namespace: true
-  source_mode: synthetic
-  index_mode: time_series
```

Comments on the changes (AI generated):

manifest.yml: type: metrics → type: logs

The Filebeat kafka input is classified by Fleet as a logs-type input regardless of what the manifest declares. Fleet's processor always injects data_stream.type: logs into every event. Meanwhile, elastic-package generates a component template from the manifest's declared type, setting data_stream.type as a constant_keyword with value: metrics. When a document arrives with type: logs but the index template expects type: metrics, Elasticsearch rejects it with a 400. Because data streams are created lazily on the first successful document, no document ever lands, the data stream is never created, and the test sees 0 hits. Changing the declared type to logs aligns the component template with what Fleet actually sends.


manifest.yml: remove source_mode: synthetic and index_mode: time_series

index_mode: time_series (TSDB) requires every document to contain all routing dimension fields; documents missing dimensions are rejected. source_mode: synthetic reconstructs _source from stored fields rather than storing it verbatim, which requires strict mapping compatibility. Both are advanced modes intended for purpose-built metrics pipelines (counters, gauges with fixed cardinality). A kafka log input that relays arbitrary JSON messages is not that pipeline; these modes only add rejection surface without benefit.
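The TSDB rejection surface amounts to one rule: every field marked as a routing dimension must be present in every document. A minimal sketch of that rule (field names taken from the fields.yml in the patch above):

```python
# Sketch of time_series index-mode routing validation: a document missing
# any field declared `dimension: true` cannot be routed to a time series
# and is rejected by the index.

DIMENSIONS = {"metric.name"}   # fields marked dimension: true in fields.yml

def accepts(doc, dimensions=DIMENSIONS):
    """A TSDB index accepts a document only if all dimensions are present."""
    return all(dim in doc for dim in dimensions)

# A well-shaped metric document carries the dimension field.
ok = accepts({"metric.name": "cpu.usage", "metric.value": 0.42})

# Arbitrary relayed JSON from a Kafka topic often does not.
bad = accepts({"message": '{"free-form": "json"}'})
```

This is why TSDB only makes sense for pipelines that guarantee the dimension fields on every event.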


fields.yml: remove dimension: true and metric_type: gauge

dimension: true is the TSDB routing-path annotation; it only makes sense when index_mode: time_series is active. With TSDB removed, the field is just a plain keyword. metric_type: gauge is an ES mapping parameter that only has meaning (and causes issues) inside a TSDB index. Removing both keeps the field definitions clean and avoids mapping conflicts if the package is ever installed against a stack that enforces stricter TSDB validation.


test-kafka_metric-config.yml: wait_for_data_timeout: 30s → 5m

On a fresh stack, the test environment needs to: start Kafka, produce messages, have Kibana finish uploading the package (component templates + ingest pipeline), have Fleet push the policy to the agent, have the agent connect to Kafka and consume, and have Elasticsearch index at least 2 documents. 30 seconds is not enough headroom for all of that, especially on CI where Kibana's Fleet upload rate-limiter can delay package installation. 5 minutes gives the stack enough time to stabilize before the assertion fires.
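The wait behaves like a poll-until-deadline loop over the query results. A rough sketch of the semantics (hypothetical helper names, not elastic-package internals):

```python
import time

def wait_for_data(fetch_hits, min_count, timeout_s, poll_interval_s=0.01):
    """Poll fetch_hits() until it returns at least min_count documents,
    or raise once the deadline passes. Hypothetical model of the
    wait_for_data_timeout / assert.min_count behaviour."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        hits = fetch_hits()
        if len(hits) >= min_count:
            return hits
        time.sleep(poll_interval_s)
    raise TimeoutError(f"fewer than {min_count} documents within {timeout_s}s")

# Simulated slow pipeline: documents only appear after a few polls,
# standing in for Kafka startup + Fleet policy rollout + indexing lag.
calls = {"n": 0}
def fetch_hits():
    calls["n"] += 1
    return ["doc1", "doc2"] if calls["n"] >= 3 else []

hits = wait_for_data(fetch_hits, min_count=2, timeout_s=1.0)
```

The timeout only bounds the worst case; a passing run still returns as soon as enough documents are visible, which is why raising it to 5m does not slow down healthy tests.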

@adrianchen-es
Contributor Author

Hey @stefans-elastic.
Doesn't changing it to the logs type defeat the purpose of the PR? Or would the data stream type remain?

If the failure is due to hardcoding in Filebeat, shouldn't it also fail in my local environment, given it isn't customised?

I'll tweak it to align with your patch and see what the CI test returns.

@adrianchen-es adrianchen-es changed the title [kafka] Initial commit to enable metric ingestion via Kafka input [kafka_log] Update integration to input type to provide multi-signal support Apr 30, 2026
@andrewkroh andrewkroh added the Integration:kafka_log Custom Kafka Logs label Apr 30, 2026
Comment thread packages/kafka_log/manifest.yml Outdated
Comment thread packages/kafka_log/docs/README.md Outdated
Review comment on the quoted manifest lines:

```yaml
show_user: false
- name: isolation_level
  type: text
  title: Isolation Level
```

Contributor

I validated that this config is an exact copy-paste of the original data stream manifest, and it is, with the exception of this one line, which was an error in the original 😄. Nice!

@ishleenk17
Member

One thing we should verify is that the upgrade scenario works, since the input config file moved from the data stream level to the root level of the package folder.

@stefans-elastic: could you please help with this validation?

Contributor

@tommyers-elastic left a comment

I'm generally happy with this change. We just need to validate that existing package users can upgrade to the new input package with no breaking changes or weird behaviour.

The only other thing that's a potential blocker for me are the Kibana compatibility changes.

@stefans-elastic
Contributor

I've tested the "Upgrade" scenario locally. Everything went well - no errors occurred.
I ingested 32 events before the upgrade and 33 events after the upgrade, and all 65 documents are present in ES.

cc @ishleenk17 @tommyers-elastic

@elasticmachine

💚 Build Succeeded

History

cc @adrianchen-es

@adrianchen-es
Contributor Author

/test stack 8.19.14

Comment thread packages/kafka_log/_dev/build/docs/README.md Outdated
@ishleenk17
Member

1 nit, else looks good!

@adrianchen-es
Contributor Author

adrianchen-es commented May 1, 2026

@ishleenk17 , @tommyers-elastic or @stefans-elastic
Thank you for the review and approval.

Could I please get a final look before I merge?
I updated the README per the last comment.

@adrianchen-es
Contributor Author

Thanks @ishleenk17
Renamed it to Custom Kafka Integration

Comment thread packages/kafka_log/_dev/build/docs/README.md Outdated
adrianchen-es and others added 2 commits May 1, 2026 15:30
Co-authored-by: Ishleen Kaur <102962586+ishleenk17@users.noreply.github.com>
@elasticmachine

💚 Build Succeeded

History

cc @adrianchen-es

@adrianchen-es adrianchen-es merged commit 742e7ec into elastic:main May 1, 2026
12 checks passed
@elastic-vault-github-plugin-prod

Package kafka_log - 2.0.0 containing this change is available at https://epr.elastic.co/package/kafka_log/2.0.0/


Labels

documentation Improvements or additions to documentation. Applied to PRs that modify *.md files. enhancement New feature or request >enhancement Integration:kafka_log Custom Kafka Logs Team:Obs-InfraObs Observability Infrastructure Monitoring team [elastic/obs-infraobs-integrations]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Kafka Log]: Configurable data_stream type

7 participants