diff --git a/packages/kafka_log/_dev/build/docs/README.md b/packages/kafka_log/_dev/build/docs/README.md index 0148a95a816..529ad24b8b3 100644 --- a/packages/kafka_log/_dev/build/docs/README.md +++ b/packages/kafka_log/_dev/build/docs/README.md @@ -1,17 +1,107 @@ -# Custom Kafka Log integration +# Custom Kafka Integration -The custom Kafka log integration is used to read from topics in a Kafka cluster. +## Overview -To configure this integration, specify a list of one or more hosts in the cluster to bootstrap the connection with, a list of topics to track, and a group_id for the connection. +The Custom Kafka integration is an **input** package for Elastic Agent. It runs the Filebeat **Kafka** input so agents can **consume** records from Apache Kafka topics and ship them to Elasticsearch. Use it when applications or pipelines already publish logs or events to Kafka and you want Elastic to read from those topics without an intermediate forwarder. +### Compatibility -## Compatibility -This Integration works with all Kafka versions in between 0.11 and 2.8.0. Older versions might work as well, but are not supported. +This integration is intended for Kafka clusters where brokers speak the standard Kafka protocol. It is tested and supported for Kafka broker versions roughly between **0.11** and **2.8.0**. Earlier or later brokers can work but are not guaranteed. +### How it works -## Ingest Pipelines -Custom ingest pipelines may be added by adding the name to the pipeline configuration option, creating custom ingest pipelines can be done either through the API or the [Ingest Node Pipeline UI](/app/management/ingest/ingest_pipelines/). +Elastic Agent connects to your cluster using the **bootstrap hosts** you configure, joins a **consumer group** (`group_id`), and subscribes to one or more **topics**. Messages are read by the agent, enriched with Kafka metadata (for example topic, partition, offset), and written to Elasticsearch using the **dataset** name you choose (default `kafka_log.generic`). Optional **SASL**, **Kerberos**, and **TLS** settings secure the connection to the brokers. Optional **parsers** and **processors** adjust the payload on the agent before ingest. -**ECS Field Reference** +## What data does this integration collect? -Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. \ No newline at end of file +The integration collects **events** derived from Kafka messages: + +- **Message payload**: Typically stored in the `message` field (format depends on your producers—plain text, JSON, syslog, and so on). +- **Kafka metadata**: Fields such as `kafka.topic`, `kafka.partition`, `kafka.offset`, `kafka.key`, and `kafka.headers` where applicable. +- **Routing fields**: `data_stream.dataset`, `data_stream.type`, and `data_stream.namespace` follow your Fleet policy and dataset name. + +The default **dataset** is `kafka_log.generic`. Changing the **Dataset name** in the policy sends data to a different backing data stream. Dataset names must follow Elasticsearch naming rules (no `-` in the dataset segment). + +### Supported use cases + +- Ingest logs or telemetry already landed on Kafka by microservices or stream processors. +- Centralize topic data for search and observability in Kibana without maintaining a separate log shipper per producer. +- Apply a custom **Ingest Pipeline** in Elasticsearch when you need parsing or ECS normalization beyond agent-side parsers. 
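+
+To make the data flow concrete, the policy options map onto Filebeat's `kafka` input, which this package runs under the hood. The sketch below shows a rough standalone equivalent for orientation only; the broker address `kafka1:9092`, topic `app-logs`, and group `elastic-agent` are placeholders, not defaults of this package, and Fleet renders the actual input configuration from the policy for you:
+
+```yaml
+filebeat.inputs:
+  - type: kafka
+    hosts:
+      - "kafka1:9092"      # bootstrap broker (placeholder)
+    topics:
+      - "app-logs"         # topic to consume (placeholder)
+    group_id: "elastic-agent"
+    # Start from the oldest available offset when the group has no committed offset.
+    initial_offset: "oldest"
+```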
+
+## What do I need to use this integration?
+
+### Kafka prerequisites
+
+- **Network reachability** from the host running Elastic Agent to each configured bootstrap broker (hostnames and ports).
+- **Topic access**: ACLs or permissions that allow your consumer **group** to read the configured topics.
+- **Authentication details** if the cluster uses SASL (PLAIN, SCRAM), Kerberos, or TLS client certificates—match these to your broker configuration.
+
+### Elastic prerequisites
+
+- A stack version that satisfies the integration’s **Kibana** requirement (refer to the integration manifest in Kibana or this package’s `manifest.yml`).
+
+## How do I deploy this integration?
+
+### Agent-based deployment
+
+Elastic Agent runs the Kafka input and forwards events to Elasticsearch. Install the agent on a host that can reach your Kafka brokers (same VPC or routed network, with firewall rules that allow outbound connections to the broker listeners).
+
+### Set up steps for Kafka
+
+1. Identify **bootstrap broker addresses** (for example `kafka1:9092`) and the **topic names** to consume.
+2. Choose a unique **consumer group id** (`group_id`) for this integration policy. When multiple agents share the same group, Kafka divides the topic partitions among them.
+3. If the cluster uses TLS or SASL, gather certificates, credentials, or Kerberos configuration paths before editing the integration.
+
+### Set up steps in Kibana
+
+1. Go to **Management → Integrations**.
+2. Search for **Custom Kafka Logs** and open it.
+3. Click **Add Custom Kafka Logs** (or add the integration to an existing policy).
+4. Configure the main options:
+   - **Hosts**: Bootstrap servers for the Kafka cluster.
+   - **Topics**: Topics to subscribe to.
+   - **Group ID**: Consumer group for this input.
+   - **Dataset name**: Target dataset (default `kafka_log.generic`).
+   - **Client ID**, **Kafka protocol version**, **initial offset**, fetch/rebalance tuning: expand **Advanced options** when needed.
+5. Configure **SSL**, **SASL**, or **Kerberos** under advanced sections if your brokers require them.
+6. Optionally set **Parsers** (for example NDJSON) or **Processors**, and **Tags**.
+7. Optionally set **Ingest Pipeline** to an Elasticsearch pipeline ID for server-side processing.
+8. Save the policy and confirm the agent receives the updated configuration.
+
+### Validation
+
+1. Produce a test message to one of the configured topics (use your usual producer tooling or `kafka-console-producer`).
+2. In Kibana, open **Analytics → Discover** and select a logs-related data view (for example `logs-*`).
+3. Filter with KQL, for example `data_stream.dataset : "kafka_log.generic"`, adjusting the value to match your configured **Dataset name**.
+4. Confirm fields such as `message`, `kafka.topic`, `input.type` (`kafka`), and timestamps look correct.
+
+## Troubleshooting
+
+For help with Elastic ingest tools, refer to [Common problems](https://www.elastic.co/docs/troubleshoot/ingest/fleet/common-problems).
+
+### Common configuration issues
+
+- **Connection or timeout errors**: Verify broker addresses, ports, TLS (`ssl.enabled`), and that firewalls allow outbound traffic from the agent host to every bootstrap broker.
+- **Authentication failures**: Confirm that the SASL mechanism, username/password, or Kerberos settings match the broker configuration; check the broker logs for `Authentication failed` or similar errors.
+- **No documents in Discover**: Confirm the agent is healthy, the policy has been applied, and the **Dataset name** matches your Discover filter. Dataset names **must not** contain hyphens.
+- **Duplicate or competing consumers**: Using the same `group_id` on many agents splits partitions across them by design; use distinct groups if each agent needs its own full copy of the topics.
+- **Offset / replay behavior**: `initial_offset` (for example `oldest` vs `newest`) affects where consumption starts for new groups. Changing `group_id` creates a new consumer group with its own offset state.
+- **Parsing issues**: If JSON or multiline payloads look wrong, review **Parsers** and consider an Elasticsearch **Ingest Pipeline** for complex structures.
+
+## Performance and scaling
+
+For reference architectures that scale ingest, refer to [Ingest Architectures](https://www.elastic.co/docs/manage-data/ingest/ingest-reference-architectures).
+
+- **Throughput**: Kafka throughput scales with **partitions** and consumer parallelism; multiple agents with the **same** `group_id` share partitions (one consumer per partition per group).
+- **Fetch settings**: Tune **fetch** sizes and **max_wait_time** in advanced options if you need higher batching or lower latency—balance broker load and agent memory.
+- **Multiple integrations**: Separate policies or dataset names help isolate indices and retention for different topic groups.
+- **Elasticsearch**: Size your cluster for the volume of documents and consider ingest pipelines and index lifecycle policies for hot/warm tiers.
+
+## Reference
+
+Refer to the [ECS field reference](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for ECS fields.
+
+Additional documentation:
+
+- [Filebeat Kafka input](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-kafka.html)
+- [Filebeat SSL settings](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config)
diff --git a/packages/kafka_log/_dev/deploy/docker/docker-compose.yml b/packages/kafka_log/_dev/deploy/docker/docker-compose.yml
index 78839c34e2e..9e90ec628e4 100644
--- a/packages/kafka_log/_dev/deploy/docker/docker-compose.yml
+++ b/packages/kafka_log/_dev/deploy/docker/docker-compose.yml
@@ -1,21 +1,33 @@
-version: '2.3'
 services:
   kafka-service:
-    image: bashj79/kafka-kraft
-    healthcheck:
-      test: nc -z kafka-service 9094 || exit -1
-      interval: 10s
-      timeout: 5s
-      retries: 3
-      start_period: 10s
+    # Official Apache Kafka image (KRaft). 
See https://hub.docker.com/r/apache/kafka + image: apache/kafka:4.1.2 hostname: kafka-service - environment: - KAFKA_LISTENERS: "INTERNAL://kafka-service:9092,EXTERNAL://:9094, CONTROLLER://:9093" - KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:9092,EXTERNAL://kafka-service:9094" - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT" - KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL ports: - 9094 + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + # Client traffic on 9094; controller on 9093 (single-node KRaft quorum) + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:9094 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-service:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + healthcheck: + test: + [ + "CMD-SHELL", + "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9094 || exit 1", + ] + interval: 10s + timeout: 10s + retries: 6 + start_period: 45s kafka-generic: image: docker.elastic.co/observability/stream:v0.18.0 volumes: diff --git a/packages/kafka_log/_dev/test/system/test-kafka-config.yml b/packages/kafka_log/_dev/test/system/test-kafka-config.yml new file mode 100644 index 00000000000..75f7cdf430d --- /dev/null +++ b/packages/kafka_log/_dev/test/system/test-kafka-config.yml @@ -0,0 +1,9 @@ +service: kafka-service +input: kafka +vars: + data_stream.dataset: kafka_log.generic + topics: + - testTopic + hosts: + - "{{Hostname}}:{{Port}}" + group_id: system_test diff --git a/packages/kafka_log/data_stream/generic/agent/stream/kafka.yml.hbs b/packages/kafka_log/agent/input/kafka.yml.hbs similarity index 100% rename from packages/kafka_log/data_stream/generic/agent/stream/kafka.yml.hbs rename to packages/kafka_log/agent/input/kafka.yml.hbs diff --git a/packages/kafka_log/changelog.yml b/packages/kafka_log/changelog.yml index d901c6d6ce6..f1957c0a4ad 100644 --- a/packages/kafka_log/changelog.yml +++ b/packages/kafka_log/changelog.yml @@ -1,3 +1,8 @@ +- version: "2.0.0" + changes: + - description: Convert kafka_log to an input package. + type: enhancement + link: https://github.com/elastic/integrations/pull/18266 - version: "1.9.1" changes: - description: Updated the version field description to clarify default protocol versions. diff --git a/packages/kafka_log/data_stream/generic/_dev/test/system/test-kafka-config.yml b/packages/kafka_log/data_stream/generic/_dev/test/system/test-kafka-config.yml deleted file mode 100644 index e41586e3896..00000000000 --- a/packages/kafka_log/data_stream/generic/_dev/test/system/test-kafka-config.yml +++ /dev/null @@ -1,9 +0,0 @@ -service: kafka-service -input: kafka -data_stream: - vars: - topics: - - testTopic - hosts: - - "{{Hostname}}:{{Port}}" - group_id: system_test diff --git a/packages/kafka_log/data_stream/generic/manifest.yml b/packages/kafka_log/data_stream/generic/manifest.yml deleted file mode 100644 index 7095a2bdfca..00000000000 --- a/packages/kafka_log/data_stream/generic/manifest.yml +++ /dev/null @@ -1,274 +0,0 @@ -title: Custom Kafka Logs -type: logs -streams: - - input: kafka - description: Collect data from Kafka topic with Elastic Agent. 
- title: Custom Kafka Logs - template_path: kafka.yml.hbs - vars: - - name: hosts - type: text - title: Hosts - description: | - A list of Kafka bootstrapping hosts (brokers) for this cluster. - required: true - show_user: true - multi: true - - name: topics - type: text - title: Topics - description: | - A list of topics to read from. - required: true - show_user: true - multi: true - - name: data_stream.dataset - type: text - title: Dataset name - description: | - Dataset to write data to. Changing the dataset will send the data to a different index. You can't use `-` in the name of a dataset and only valid characters for [Elasticsearch index names](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html). - default: kafka_log.generic - required: true - show_user: true - - name: pipeline - type: text - title: Ingest Pipeline - description: | - The Ingest Node pipeline ID to be used by the integration. - required: false - show_user: true - - name: group_id - type: text - title: Group ID - description: The Kafka consumer group id. - required: true - show_user: true - - name: client_id - type: text - title: Client ID - description: The Kafka client id (optional). - required: false - show_user: true - - name: version - type: text - title: Version - description: The version of the Kafka protocol to use (defaults to "1.0.0" in 8.x and "2.1.0" in 9.0+). - required: false - show_user: true - - name: expand_event_list_from_field - type: text - title: Expand Event List from Field - description: Split a field that contains an array of JSON objects, the value would be the name of this field. - required: false - show_user: true - - name: parsers - type: yaml - title: Parsers - description: | - This option expects a list of parsers that the payload has to go through. For more information see [Parsers](https://www.elastic.co/guide/en/beats/filebeat/8.0/filebeat-input-kafka.html#_parsers_2) - required: false - show_user: true - multi: false - default: | - #- ndjson: - # keys_under_root: true - # message_key: msg - #- multiline: - # type: counter - # lines_count: 3 - - name: sasl_mechanism - title: SASL Mechanism - description: SASL Auth Mechanism - type: select - options: - - text: None - value: "" - - text: PLAIN - value: PLAIN - - text: SCRAM-SHA-256 - value: SCRAM-SHA-256 - - text: SCRAM-SHA-512 - value: SCRAM-SHA-512 - default: "" - required: false - show_user: true - - name: username - type: text - title: Username - description: Username used for SASL authentication. - required: false - show_user: true - - name: password - type: password - title: Password - secret: true - description: Password used for SASL authentication. - required: false - show_user: true - - name: kerberos_enabled - type: bool - title: Kerberos Enabled - description: The enabled setting can be used to enable the Kerberos configuration by setting it to true. The default value is false. - required: false - show_user: false - - name: kerberos_auth_type - type: text - title: Kerberos Auth Type - description: | - There are two options to authenticate with Kerberos KDC: password and keytab. - Password expects the principal name and its password. When choosing keytab, you have to specify a principal name and a path to a keytab. The keytab must contain the keys of the selected principal. Otherwise, authentication will fail. 
- required: false - show_user: false - - name: kerberos_config_path - type: text - title: Kerberos Config Path - description: You need to set the path to the krb5.conf, so Elastic Agent can find the Kerberos KDC to retrieve a ticket. - required: false - show_user: false - - name: kerberos_username - type: text - title: Kerberos Username - description: Name of the principal used to connect to the output. - required: false - show_user: false - - name: kerberos_password - type: password - title: Kerberos Password - secret: true - description: If you configured password for Auth Type, you have to provide a password for the selected principal. - required: false - show_user: false - - name: kerberos_keytab - type: text - title: Kerberos Keytab - description: If you configured keytab for Auth Type, you have to provide the path to the keytab of the selected principal. - required: false - show_user: false - - name: kerberos_service_name - type: text - title: Kerberos Service Name - description: This option can only be configured for Kafka. It is the name of the Kafka service, usually "kafka". - required: false - show_user: false - - name: kerberos_realm - type: text - title: Kerberos Realm - description: Name of the realm where the output resides. - required: false - show_user: false - - name: kerberos_enable_krb5_fast - type: bool - title: Kerberos KRB5 Fast - description: Enable Kerberos FAST authentication. This may conflict with some Active Directory installations. The default is false. - required: false - show_user: false - - name: initial_offset - type: text - title: Initial Offset - description: The initial offset to start reading, either "oldest" or "newest". Defaults to "oldest". - required: false - show_user: false - - name: connect_backoff - type: text - title: Connect Backoff - description: How long to wait before trying to reconnect to the kafka cluster after a fatal error. Default is 30s. - required: false - show_user: false - - name: consume_backoff - type: text - title: Consume Backoff - description: How long to wait before retrying a failed read. Default is 2s. - required: false - show_user: false - - name: max_wait_time - type: text - title: Max Wait Time - description: How long to wait for the minimum number of input bytes while reading. Default is 250ms. - required: false - show_user: false - - name: wait_close - type: text - title: Wait Close - description: When shutting down, how long to wait for in-flight messages to be delivered and acknowledged. - required: false - show_user: false - - name: isolation_level - type: text - title: Wait Close - description: This configures the Kafka group isolation level, supports the values "read_uncommitted" which returns all messages in the message channel and "read_committed" which hides messages that are part of an aborted transaction. The default is "read_uncommitted". - required: false - show_user: false - - name: fetch_min - type: text - title: Fetch Min - description: The minimum number of bytes to wait for. Defaults to 1. - required: false - show_user: false - - name: fetch_default - type: text - title: Fetch Default - description: The default number of bytes to read per request. Defaults to 1MB. - required: false - show_user: false - - name: fetch_max - type: text - title: Fetch Max - description: The maximum number of bytes to read per request. Defaults to 0 (no limit). - required: false - show_user: false - - name: rebalance_strategy - type: text - title: Rebalance Strategy - description: Either "range" or "roundrobin". 
Defaults to "range". - required: false - show_user: false - - name: rebalance_timeout - type: text - title: Rebalance Timeout - description: How long to wait for an attempted rebalance. Defaults to 60s. - required: false - show_user: false - - name: rebalance_max_retries - type: text - title: Rebalance Max Retries - description: How many times to retry if rebalancing fails. Defaults to 4. - required: false - show_user: false - - name: rebalance_retry_backoff - type: text - title: Rebalance Retry Backoff - description: How long to wait after an unsuccessful rebalance attempt. Defaults to 2s. - required: false - show_user: false - - name: ssl - type: yaml - title: SSL Configuration - description: SSL configuration options. See [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config) for details. - multi: false - required: false - show_user: false - default: | - enabled: false - certificate: "/etc/pki/client/cert.pem" - key: "/etc/pki/client/cert.key" - - name: processors - type: yaml - title: Processors - multi: false - required: false - show_user: false - description: | - Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details. - - name: tags - type: text - title: Tags - description: Tags to include in the published event - required: false - default: - - forwarded - multi: true - show_user: true -# Ensures agents have permissions to write data to `logs-*-*` -elasticsearch: - dynamic_dataset: true - dynamic_namespace: true diff --git a/packages/kafka_log/docs/README.md b/packages/kafka_log/docs/README.md index 0148a95a816..529ad24b8b3 100644 --- a/packages/kafka_log/docs/README.md +++ b/packages/kafka_log/docs/README.md @@ -1,17 +1,107 @@ -# Custom Kafka Log integration +# Custom Kafka Integration -The custom Kafka log integration is used to read from topics in a Kafka cluster. +## Overview -To configure this integration, specify a list of one or more hosts in the cluster to bootstrap the connection with, a list of topics to track, and a group_id for the connection. +The Custom Kafka integration is an **input** package for Elastic Agent. It runs the Filebeat **Kafka** input so agents can **consume** records from Apache Kafka topics and ship them to Elasticsearch. Use it when applications or pipelines already publish logs or events to Kafka and you want Elastic to read from those topics without an intermediate forwarder. +### Compatibility -## Compatibility -This Integration works with all Kafka versions in between 0.11 and 2.8.0. Older versions might work as well, but are not supported. +This integration is intended for Kafka clusters where brokers speak the standard Kafka protocol. It is tested and supported for Kafka broker versions roughly between **0.11** and **2.8.0**. Earlier or later brokers can work but are not guaranteed. +### How it works -## Ingest Pipelines -Custom ingest pipelines may be added by adding the name to the pipeline configuration option, creating custom ingest pipelines can be done either through the API or the [Ingest Node Pipeline UI](/app/management/ingest/ingest_pipelines/). +Elastic Agent connects to your cluster using the **bootstrap hosts** you configure, joins a **consumer group** (`group_id`), and subscribes to one or more **topics**. 
Messages are read by the agent, enriched with Kafka metadata (for example topic, partition, offset), and written to Elasticsearch using the **dataset** name you choose (default `kafka_log.generic`). Optional **SASL**, **Kerberos**, and **TLS** settings secure the connection to the brokers. Optional **parsers** and **processors** adjust the payload on the agent before ingest.
 
-**ECS Field Reference**
+## What data does this integration collect?
 
-Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.
\ No newline at end of file
+The integration collects **events** derived from Kafka messages:
+
+- **Message payload**: Typically stored in the `message` field (format depends on your producers—plain text, JSON, syslog, and so on).
+- **Kafka metadata**: Fields such as `kafka.topic`, `kafka.partition`, `kafka.offset`, `kafka.key`, and `kafka.headers` where applicable.
+- **Routing fields**: `data_stream.dataset`, `data_stream.type`, and `data_stream.namespace` follow your Fleet policy and dataset name.
+
+The default **dataset** is `kafka_log.generic`. Changing the **Dataset name** in the policy sends data to a different backing data stream. Dataset names must follow Elasticsearch naming rules (no `-` in the dataset segment).
+
+### Supported use cases
+
+- Ingest logs or telemetry already landed on Kafka by microservices or stream processors.
+- Centralize topic data for search and observability in Kibana without maintaining a separate log shipper per producer.
+- Apply a custom **Ingest Pipeline** in Elasticsearch when you need parsing or ECS normalization beyond agent-side parsers.
+
+## What do I need to use this integration?
+
+### Kafka prerequisites
+
+- **Network reachability** from the host running Elastic Agent to each configured bootstrap broker (hostnames and ports).
+- **Topic access**: ACLs or permissions that allow your consumer **group** to read the configured topics.
+- **Authentication details** if the cluster uses SASL (PLAIN, SCRAM), Kerberos, or TLS client certificates—match these to your broker configuration.
+
+### Elastic prerequisites
+
+- A stack version that satisfies the integration’s **Kibana** requirement (refer to the integration manifest in Kibana or this package’s `manifest.yml`).
+
+## How do I deploy this integration?
+
+### Agent-based deployment
+
+Elastic Agent runs the Kafka input and forwards events to Elasticsearch. Install the agent on a host that can reach your Kafka brokers (same VPC or routed network, with firewall rules that allow outbound connections to the broker listeners).
+
+### Set up steps for Kafka
+
+1. Identify **bootstrap broker addresses** (for example `kafka1:9092`) and the **topic names** to consume.
+2. Choose a unique **consumer group id** (`group_id`) for this integration policy. When multiple agents share the same group, Kafka divides the topic partitions among them.
+3. If the cluster uses TLS or SASL, gather certificates, credentials, or Kerberos configuration paths before editing the integration.
+
+### Set up steps in Kibana
+
+1. Go to **Management → Integrations**.
+2. Search for **Custom Kafka Logs** and open it.
+3. Click **Add Custom Kafka Logs** (or add the integration to an existing policy).
+4. Configure the main options:
+   - **Hosts**: Bootstrap servers for the Kafka cluster.
+   - **Topics**: Topics to subscribe to.
+   - **Group ID**: Consumer group for this input.
+   - **Dataset name**: Target dataset (default `kafka_log.generic`).
+   - **Client ID**, **Kafka protocol version**, **initial offset**, fetch/rebalance tuning: expand **Advanced options** when needed.
+5. Configure **SSL**, **SASL**, or **Kerberos** under advanced sections if your brokers require them.
+6. Optionally set **Parsers** (for example NDJSON) or **Processors**, and **Tags**.
+7. Optionally set **Ingest Pipeline** to an Elasticsearch pipeline ID for server-side processing.
+8. Save the policy and confirm the agent receives the updated configuration.
+
+### Validation
+
+1. Produce a test message to one of the configured topics (use your usual producer tooling or `kafka-console-producer`).
+2. In Kibana, open **Analytics → Discover** and select a logs-related data view (for example `logs-*`).
+3. Filter with KQL, for example `data_stream.dataset : "kafka_log.generic"`, adjusting the value to match your configured **Dataset name**.
+4. Confirm fields such as `message`, `kafka.topic`, `input.type` (`kafka`), and timestamps look correct.
+
+## Troubleshooting
+
+For help with Elastic ingest tools, refer to [Common problems](https://www.elastic.co/docs/troubleshoot/ingest/fleet/common-problems).
+
+### Common configuration issues
+
+- **Connection or timeout errors**: Verify broker addresses, ports, TLS (`ssl.enabled`), and that firewalls allow outbound traffic from the agent host to every bootstrap broker.
+- **Authentication failures**: Confirm that the SASL mechanism, username/password, or Kerberos settings match the broker configuration; check the broker logs for `Authentication failed` or similar errors.
+- **No documents in Discover**: Confirm the agent is healthy, the policy has been applied, and the **Dataset name** matches your Discover filter. Dataset names **must not** contain hyphens.
+- **Duplicate or competing consumers**: Using the same `group_id` on many agents splits partitions across them by design; use distinct groups if each agent needs its own full copy of the topics.
+- **Offset / replay behavior**: `initial_offset` (for example `oldest` vs `newest`) affects where consumption starts for new groups. Changing `group_id` creates a new consumer group with its own offset state.
+- **Parsing issues**: If JSON or multiline payloads look wrong, review **Parsers** and consider an Elasticsearch **Ingest Pipeline** for complex structures.
+
+## Performance and scaling
+
+For reference architectures that scale ingest, refer to [Ingest Architectures](https://www.elastic.co/docs/manage-data/ingest/ingest-reference-architectures).
+
+- **Throughput**: Kafka throughput scales with **partitions** and consumer parallelism; multiple agents with the **same** `group_id` share partitions (one consumer per partition per group).
+- **Fetch settings**: Tune **fetch** sizes and **max_wait_time** in advanced options if you need higher batching or lower latency—balance broker load and agent memory.
+- **Multiple integrations**: Separate policies or dataset names help isolate indices and retention for different topic groups.
+- **Elasticsearch**: Size your cluster for the volume of documents and consider ingest pipelines and index lifecycle policies for hot/warm tiers.
+
+## Reference
+
+Refer to the [ECS field reference](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for ECS fields.
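+
+If your producers publish newline-delimited JSON, the **Parsers** option accepts YAML such as the following sketch, which mirrors the commented example shipped with the policy; `msg` is a placeholder for whatever key your producers use:
+
+```yaml
+- ndjson:
+    # Lift the decoded JSON keys to the top level of the event.
+    keys_under_root: true
+    # Optional JSON key on which line filtering and multiline settings are applied.
+    message_key: msg
+```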
+ +Additional documentation: + +- [Filebeat Kafka input](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-kafka.html) +- [Filebeat SSL settings](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config) diff --git a/packages/kafka_log/data_stream/generic/fields/base-fields.yml b/packages/kafka_log/fields/base-fields.yml similarity index 95% rename from packages/kafka_log/data_stream/generic/fields/base-fields.yml rename to packages/kafka_log/fields/base-fields.yml index 5364b4535c1..44a636b5783 100644 --- a/packages/kafka_log/data_stream/generic/fields/base-fields.yml +++ b/packages/kafka_log/fields/base-fields.yml @@ -14,7 +14,6 @@ - name: event.dataset type: constant_keyword description: Event dataset - value: kafka_log.generic - name: "@timestamp" type: date description: Event timestamp. diff --git a/packages/kafka_log/data_stream/generic/fields/fields.yml b/packages/kafka_log/fields/fields.yml similarity index 100% rename from packages/kafka_log/data_stream/generic/fields/fields.yml rename to packages/kafka_log/fields/fields.yml diff --git a/packages/kafka_log/manifest.yml b/packages/kafka_log/manifest.yml index 51f25627701..62623d28f0a 100644 --- a/packages/kafka_log/manifest.yml +++ b/packages/kafka_log/manifest.yml @@ -1,9 +1,9 @@ -format_version: "3.0.2" +format_version: "3.3.2" name: kafka_log title: Custom Kafka Logs description: Collect data from kafka topic with Elastic Agent. -type: integration -version: "1.9.1" +type: input +version: "2.0.0" conditions: kibana: version: "^8.13.0 || ^9.0.0" @@ -14,12 +14,274 @@ categories: - custom policy_templates: - name: kafka_log + type: logs title: Custom Kafka Logs - description: Collect data from kafka topic with Elastic Agent. - inputs: - - type: kafka - title: Custom Kafka Logs - description: Collect data from kafka topic with Elastic Agent. + description: Collect data from Kafka topic with Elastic Agent. + input: kafka + template_path: kafka.yml.hbs + vars: + - name: hosts + type: text + title: Hosts + description: | + A list of Kafka bootstrapping hosts (brokers) for this cluster. + required: true + show_user: true + multi: true + - name: topics + type: text + title: Topics + description: | + A list of topics to read from. + required: true + show_user: true + multi: true + - name: data_stream.dataset + type: text + title: Dataset name + description: | + Dataset to write data to. Changing the dataset will send the data to a different index. You can't use `-` in the name of a dataset and only valid characters for [Elasticsearch index names](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html). + default: kafka_log.generic + required: true + show_user: true + - name: pipeline + type: text + title: Ingest Pipeline + description: | + The Ingest Node pipeline ID to be used by the integration. + required: false + show_user: true + - name: group_id + type: text + title: Group ID + description: The Kafka consumer group id. + required: true + show_user: true + - name: client_id + type: text + title: Client ID + description: The Kafka client id (optional). + required: false + show_user: true + - name: version + type: text + title: Version + description: The version of the Kafka protocol to use (defaults to "1.0.0" in 8.x and "2.1.0" in 9.0+). 
+ required: false + show_user: true + - name: expand_event_list_from_field + type: text + title: Expand Event List from Field + description: Split a field that contains an array of JSON objects, the value would be the name of this field. + required: false + show_user: true + - name: parsers + type: yaml + title: Parsers + description: | + This option expects a list of parsers that the payload has to go through. For more information see [Parsers](https://www.elastic.co/guide/en/beats/filebeat/8.0/filebeat-input-kafka.html#_parsers_2) + required: false + show_user: true + multi: false + default: | + #- ndjson: + # keys_under_root: true + # message_key: msg + #- multiline: + # type: counter + # lines_count: 3 + - name: sasl_mechanism + title: SASL Mechanism + description: SASL Auth Mechanism + type: select + options: + - text: None + value: "" + - text: PLAIN + value: PLAIN + - text: SCRAM-SHA-256 + value: SCRAM-SHA-256 + - text: SCRAM-SHA-512 + value: SCRAM-SHA-512 + default: "" + required: false + show_user: true + - name: username + type: text + title: Username + description: Username used for SASL authentication. + required: false + show_user: true + - name: password + type: password + title: Password + secret: true + description: Password used for SASL authentication. + required: false + show_user: true + - name: kerberos_enabled + type: bool + title: Kerberos Enabled + description: The enabled setting can be used to enable the Kerberos configuration by setting it to true. The default value is false. + required: false + show_user: false + - name: kerberos_auth_type + type: text + title: Kerberos Auth Type + description: | + There are two options to authenticate with Kerberos KDC: password and keytab. + Password expects the principal name and its password. When choosing keytab, you have to specify a principal name and a path to a keytab. The keytab must contain the keys of the selected principal. Otherwise, authentication will fail. + required: false + show_user: false + - name: kerberos_config_path + type: text + title: Kerberos Config Path + description: You need to set the path to the krb5.conf, so Elastic Agent can find the Kerberos KDC to retrieve a ticket. + required: false + show_user: false + - name: kerberos_username + type: text + title: Kerberos Username + description: Name of the principal used to connect to the output. + required: false + show_user: false + - name: kerberos_password + type: password + title: Kerberos Password + secret: true + description: If you configured password for Auth Type, you have to provide a password for the selected principal. + required: false + show_user: false + - name: kerberos_keytab + type: text + title: Kerberos Keytab + description: If you configured keytab for Auth Type, you have to provide the path to the keytab of the selected principal. + required: false + show_user: false + - name: kerberos_service_name + type: text + title: Kerberos Service Name + description: This option can only be configured for Kafka. It is the name of the Kafka service, usually "kafka". + required: false + show_user: false + - name: kerberos_realm + type: text + title: Kerberos Realm + description: Name of the realm where the output resides. + required: false + show_user: false + - name: kerberos_enable_krb5_fast + type: bool + title: Kerberos KRB5 Fast + description: Enable Kerberos FAST authentication. This may conflict with some Active Directory installations. The default is false. 
+ required: false + show_user: false + - name: initial_offset + type: text + title: Initial Offset + description: The initial offset to start reading, either "oldest" or "newest". Defaults to "oldest". + required: false + show_user: false + - name: connect_backoff + type: text + title: Connect Backoff + description: How long to wait before trying to reconnect to the kafka cluster after a fatal error. Default is 30s. + required: false + show_user: false + - name: consume_backoff + type: text + title: Consume Backoff + description: How long to wait before retrying a failed read. Default is 2s. + required: false + show_user: false + - name: max_wait_time + type: text + title: Max Wait Time + description: How long to wait for the minimum number of input bytes while reading. Default is 250ms. + required: false + show_user: false + - name: wait_close + type: text + title: Wait Close + description: When shutting down, how long to wait for in-flight messages to be delivered and acknowledged. + required: false + show_user: false + - name: isolation_level + type: text + title: Isolation Level + description: This configures the Kafka group isolation level, supports the values "read_uncommitted" which returns all messages in the message channel and "read_committed" which hides messages that are part of an aborted transaction. The default is "read_uncommitted". + required: false + show_user: false + - name: fetch_min + type: text + title: Fetch Min + description: The minimum number of bytes to wait for. Defaults to 1. + required: false + show_user: false + - name: fetch_default + type: text + title: Fetch Default + description: The default number of bytes to read per request. Defaults to 1MB. + required: false + show_user: false + - name: fetch_max + type: text + title: Fetch Max + description: The maximum number of bytes to read per request. Defaults to 0 (no limit). + required: false + show_user: false + - name: rebalance_strategy + type: text + title: Rebalance Strategy + description: Either "range" or "roundrobin". Defaults to "range". + required: false + show_user: false + - name: rebalance_timeout + type: text + title: Rebalance Timeout + description: How long to wait for an attempted rebalance. Defaults to 60s. + required: false + show_user: false + - name: rebalance_max_retries + type: text + title: Rebalance Max Retries + description: How many times to retry if rebalancing fails. Defaults to 4. + required: false + show_user: false + - name: rebalance_retry_backoff + type: text + title: Rebalance Retry Backoff + description: How long to wait after an unsuccessful rebalance attempt. Defaults to 2s. + required: false + show_user: false + - name: ssl + type: yaml + title: SSL Configuration + description: SSL configuration options. See [documentation](https://www.elastic.co/guide/en/beats/filebeat/current/configuration-ssl.html#ssl-common-config) for details. + multi: false + required: false + show_user: false + default: | + enabled: false + certificate: "/etc/pki/client/cert.pem" + key: "/etc/pki/client/cert.key" + - name: processors + type: yaml + title: Processors + multi: false + required: false + show_user: false + description: | + Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details. 
+ - name: tags + type: text + title: Tags + description: Tags to include in the published event + required: false + default: + - forwarded + multi: true + show_user: true icons: - src: "/img/logo_kafka.svg" type: "image/svg+xml" diff --git a/packages/kafka_log/data_stream/generic/sample_event.json b/packages/kafka_log/sample_event.json similarity index 100% rename from packages/kafka_log/data_stream/generic/sample_event.json rename to packages/kafka_log/sample_event.json diff --git a/packages/kafka_log/validation.yml b/packages/kafka_log/validation.yml new file mode 100644 index 00000000000..406e69f42b9 --- /dev/null +++ b/packages/kafka_log/validation.yml @@ -0,0 +1,3 @@ +docs_structure_enforced: + enabled: true + version: 1