108 changes: 99 additions & 9 deletions packages/kafka_log/_dev/build/docs/README.md
@@ -1,17 +1,107 @@
# Custom Kafka Log integration
# Custom Kafka Integration

The custom Kafka log integration is used to read from topics in a Kafka cluster.
## Overview

To configure this integration, specify a list of one or more hosts in the cluster to bootstrap the connection with, a list of topics to track, and a group_id for the connection.
The Custom Kafka integration is an **input** package for Elastic Agent. It runs the Filebeat **Kafka** input so agents can **consume** records from Apache Kafka topics and ship them to Elasticsearch. Use it when applications or pipelines already publish logs or events to Kafka and you want Elastic to read from those topics without an intermediate forwarder.

### Compatibility

## Compatibility
This Integration works with all Kafka versions in between 0.11 and 2.8.0. Older versions might work as well, but are not supported.
This integration is intended for Kafka clusters where brokers speak the standard Kafka protocol. It is tested and supported for Kafka broker versions roughly between **0.11** and **2.8.0**. Earlier or later brokers can work but are not guaranteed.

### How it works

## Ingest Pipelines
Custom ingest pipelines may be added by adding the name to the pipeline configuration option, creating custom ingest pipelines can be done either through the API or the [Ingest Node Pipeline UI](/app/management/ingest/ingest_pipelines/).
Elastic Agent connects to your cluster using the **bootstrap hosts** you configure, joins a **consumer group** (`group_id`), and subscribes to one or more **topics**. Messages are read by the agent, enriched with Kafka metadata (for example topic, partition, offset), and written to Elasticsearch using the **dataset** name you choose (default `kafka_log.generic`). Optional **SASL**, **Kerberos**, and **TLS** settings secure the connection to the brokers. Optional **parsers** and **processors** adjust the payload on the agent before ingest.
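For orientation, these settings map onto Filebeat's `kafka` input, which this package runs. A minimal sketch of the equivalent Filebeat-style configuration, with placeholder hosts, topic, and group id (the integration UI generates this for you):

```yaml
# Illustrative sketch of the equivalent Filebeat-style kafka input; all values are placeholders.
filebeat.inputs:
  - type: kafka
    hosts:
      - "kafka-1.example.com:9092"   # bootstrap brokers used to discover the cluster
      - "kafka-2.example.com:9092"
    topics:
      - "app-logs"
    group_id: "elastic-agent"        # consumer group this agent joins
    client_id: "elastic-agent"       # how this consumer identifies itself to brokers
    initial_offset: "oldest"         # where a new consumer group starts reading
```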

**ECS Field Reference**
## What data does this integration collect?

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.
The integration collects **events** derived from Kafka messages:

- **Message payload**: Typically stored in the `message` field (format depends on your producers—plain text, JSON, syslog, and so on).
- **Kafka metadata**: Fields such as `kafka.topic`, `kafka.partition`, `kafka.offset`, `kafka.key`, and `kafka.headers` where applicable.
- **Routing fields**: `data_stream.dataset`, `data_stream.type`, and `data_stream.namespace` follow your Fleet policy and dataset name.

The default **dataset** is `kafka_log.generic`. Changing the **Dataset name** in the policy sends data to a different backing data stream. Dataset names must follow Elasticsearch naming rules (no `-` in the dataset segment).
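For example, with the default `default` namespace, a dataset of `kafka_log.generic` typically lands in the `logs-kafka_log.generic-default` data stream, while a dataset of `kafka_log.myapp` lands in `logs-kafka_log.myapp-default`.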

### Supported use cases

- Ingest logs or telemetry already landed on Kafka by microservices or stream processors.
- Centralize topic data for search and observability in Kibana without maintaining a separate log shipper per producer.
- Apply a custom **Ingest Pipeline** in Elasticsearch when you need parsing or ECS normalization beyond agent-side parsers.

## What do I need to use this integration?

### Kafka prerequisites

- **Network reachability** from the host running Elastic Agent to each configured bootstrap broker (hostnames and ports).
- **Topic access**: ACLs or permissions that allow your consumer **group** to read the configured topics.
- **Authentication details** if the cluster uses SASL (PLAIN, SCRAM), Kerberos, or TLS client certificates—match these to your broker configuration.

### Elastic prerequisites

- A stack version that satisfies the integration’s **Kibana** requirement (refer to the integration manifest in Kibana or this package’s `manifest.yml`; an example of the constraint follows below).
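If you are reading the package source rather than Kibana, the Kibana constraint is declared under `conditions` in the manifest. A hypothetical excerpt (the version value below is illustrative, not this package’s actual requirement):

```yaml
# Hypothetical manifest.yml excerpt; check the package for the real constraint.
conditions:
  kibana:
    version: "^8.13.0"
```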

## How do I deploy this integration?

### Agent-based deployment

Elastic Agent runs the Kafka input and forwards events to Elasticsearch. Install the agent on a host that can reach your Kafka brokers (same VPC or routed network, firewall rules allowing outbound connections to broker listeners).

### Set up steps for Kafka

1. Identify **bootstrap broker addresses** (for example `kafka1:9092`) and the **topic names** to consume.
2. Choose a unique **consumer group id** (`group_id`) for this integration policy; agents that share the same group split partition assignments among themselves.
3. If the cluster uses TLS or SASL, gather certificates, credentials, or Kerberos configuration paths before editing the integration.

### Set up steps in Kibana

1. Go to **Management → Integrations**.
2. Search for **Custom Kafka Logs** and open it.
3. Click **Add Custom Kafka Logs** (or add the integration to an existing policy).
4. Configure the main options:
- **Hosts**: Bootstrap servers for the Kafka cluster.
- **Topics**: Topics to subscribe to.
- **Group ID**: Consumer group for this input.
- **Dataset name**: Target dataset (default `kafka_log.generic`).
- **Client ID**, **Kafka protocol version**, **initial offset**, and fetch/rebalance tuning are available under **Advanced options** when needed.
5. Configure **SSL**, **SASL**, or **Kerberos** under advanced sections if your brokers require them (a sketch of these settings follows the steps).
6. Optionally set **Parsers** (for example NDJSON) or **Processors**, and **Tags**.
7. Optionally set **Ingest Pipeline** to an Elasticsearch pipeline ID for server-side processing.
8. Save the policy and confirm the agent receives the updated configuration.
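For reference, the security options in step 5 correspond to settings on Filebeat's `kafka` input. A minimal sketch, assuming SCRAM-SHA-256 over TLS with a private CA (all values are placeholders):

```yaml
# Sketch of SASL/SCRAM over TLS; adapt the mechanism and paths to your brokers.
username: "svc-elastic"
password: "${KAFKA_PASSWORD}"    # prefer a secret reference over a literal password
sasl.mechanism: "SCRAM-SHA-256"  # PLAIN and SCRAM-SHA-512 are also common choices
ssl.enabled: true
ssl.certificate_authorities:
  - /etc/pki/kafka/ca.crt
```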

### Validation

1. Produce a test message to one of the configured topics (use your usual producer tooling or `kafka-console-producer`).
2. In Kibana, open **Analytics → Discover** and select a logs-related data view (for example `logs-*`).
3. Filter with KQL, for example `data_stream.dataset : "kafka_log.generic"`, adjusting the value to match your configured **Dataset name**.
4. Confirm fields such as `message`, `kafka.topic`, `input.type` (`kafka`), and timestamps look correct.

## Troubleshooting

For help with Elastic ingest tools, refer to [Common problems](https://www.elastic.co/docs/troubleshoot/ingest/fleet/common-problems).

### Common configuration issues

- **Connection or timeout errors**: Verify broker addresses, ports, TLS (`ssl.enabled`), and that firewalls allow outbound traffic from the agent host to every bootstrap broker.
- **Authentication failures**: Confirm that the SASL mechanism, username/password, or Kerberos settings align with the broker configuration; check broker logs for `Authentication failed` or similar messages.
- **No documents in Discover**: Confirm the agent is healthy, the policy applied, and the **Dataset name** matches your Discover filter. Dataset names **must not** contain hyphens.
- **Duplicate or competing consumers**: Using the same `group_id` on many agents splits partitions across them by design; use distinct groups if each agent should read every message.
- **Offset / replay behavior**: `initial_offset` (for example `oldest` vs `newest`) affects where consumption starts for new groups. Changing `group_id` starts a new consumer group offset state.
- **Parsing issues**: If JSON or multiline payloads look wrong, review **Parsers** (see the sketch after this list) and consider an Elasticsearch **Ingest Pipeline** for complex structures.
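If producers write one JSON document per Kafka message, an agent-side NDJSON parser can decode the payload before it reaches Elasticsearch. A minimal sketch, assuming the entire message value is JSON:

```yaml
# Sketch of the Parsers option; decodes the JSON payload into the event.
parsers:
  - ndjson:
      target: ""            # merge decoded keys into the event root
      add_error_key: true   # flag events whose payload is not valid JSON
```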

## Performance and scaling

For architectures used to scale ingest, refer to [Ingest Architectures](https://www.elastic.co/docs/manage-data/ingest/ingest-reference-architectures).

- **Throughput**: Kafka throughput scales with **partitions** and consumer parallelism; multiple agents with the **same** `group_id` share partitions (one consumer per partition per group).
- **Fetch settings**: Tune **fetch** sizes and **max_wait_time** in advanced options if you need higher batching or lower latency, balancing broker load against agent memory (see the sketch after this list).
- **Multiple integrations**: Separate policies or dataset names help isolate indices and retention for different topic groups.
- **Elasticsearch**: Size your cluster for the volume of documents and consider ingest pipelines and index lifecycle policies for hot/warm tiers.
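To illustrate the fetch trade-off, larger fetch sizes and a longer wait favor batching and throughput over latency. A hedged sketch of the advanced options (the values are arbitrary starting points, not recommendations):

```yaml
# Illustrative fetch tuning; measure before and after changing these.
fetch.min: 1            # bytes the broker should accumulate before responding
fetch.default: 1048576  # per-partition fetch size in bytes (1 MiB here)
max_wait_time: 500ms    # how long the broker may wait to satisfy fetch.min
```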

## Reference

Refer to the [ECS field reference](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for ECS fields.

Additional documentation:

- [Filebeat Kafka input](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-kafka.html)
- [Filebeat SSL settings](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config)
38 changes: 25 additions & 13 deletions packages/kafka_log/_dev/deploy/docker/docker-compose.yml
@@ -1,21 +1,33 @@
version: '2.3'
services:
  kafka-service:
    image: bashj79/kafka-kraft
    healthcheck:
      test: nc -z kafka-service 9094 || exit -1
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s
    # Official Apache Kafka image (KRaft). See https://hub.docker.com/r/apache/kafka
    image: apache/kafka:4.1.2
    hostname: kafka-service
    environment:
      KAFKA_LISTENERS: "INTERNAL://kafka-service:9092,EXTERNAL://:9094, CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:9092,EXTERNAL://kafka-service:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
      - 9094
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      # Client traffic on 9094; controller on 9093 (single-node KRaft quorum)
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:9094
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-service:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9094 || exit 1",
        ]
      interval: 10s
      timeout: 10s
      retries: 6
      start_period: 45s
  kafka-generic:
    image: docker.elastic.co/observability/stream:v0.18.0
    volumes:
9 changes: 9 additions & 0 deletions packages/kafka_log/_dev/test/system/test-kafka-config.yml
@@ -0,0 +1,9 @@
service: kafka-service
input: kafka
vars:
  data_stream.dataset: kafka_log.generic
  topics:
    - testTopic
  hosts:
    - "{{Hostname}}:{{Port}}"
  group_id: system_test
5 changes: 5 additions & 0 deletions packages/kafka_log/changelog.yml
@@ -1,3 +1,8 @@
- version: "2.0.0"
changes:
- description: Convert kafka_log to an input package.
type: enhancement
link: https://github.com/elastic/integrations/pull/18266
- version: "1.9.1"
changes:
- description: Updated the version field description to clarify default protocol versions.
