diff --git a/metricbeat/docker-compose.yml b/metricbeat/docker-compose.yml index dfa56f08289c..c9c66bfb6389 100644 --- a/metricbeat/docker-compose.yml +++ b/metricbeat/docker-compose.yml @@ -153,15 +153,6 @@ services: ports: - 8778 - kafka: - image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-1 - build: - context: ./module/kafka/_meta - args: - KAFKA_VERSION: ${KAFKA_VERSION:-2.1.1} - ports: - - 9092 - kibana: image: docker.elastic.co/observability-ci/beats-integration-kibana:${KIBANA_VERSION:-7.4.0}-1 build: diff --git a/metricbeat/module/kafka/_meta/healthcheck.sh b/metricbeat/module/kafka/_meta/healthcheck.sh index 97d70b812edc..f77b2891a2d9 100755 --- a/metricbeat/module/kafka/_meta/healthcheck.sh +++ b/metricbeat/module/kafka/_meta/healthcheck.sh @@ -1,14 +1,16 @@ #!/bin/bash +ZOOKEEPER_HOST=${ZOOKEEPER_HOST:-zookeeper} + [ -f /tmp/.acls_loaded ] || exit 1 TOPIC="foo-`date '+%s-%N'`" -${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper=127.0.0.1:2181 --create --partitions 1 --topic "${TOPIC}" --replication-factor 1 +${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper=${ZOOKEEPER_HOST}:2181 --create --partitions 1 --topic "${TOPIC}" --replication-factor 1 rc=$? if [[ $rc != 0 ]]; then exit $rc fi -${KAFKA_HOME}/bin/kafka-topic.sh --zookeeper=127.0.0.1:2181 --delete --topic "${TOPIC}" +${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper=${ZOOKEEPER_HOST}:2181 --delete --topic "${TOPIC}" exit 0 diff --git a/metricbeat/module/kafka/_meta/run.sh b/metricbeat/module/kafka/_meta/run.sh index 68df3e8a4c6a..ef5709bb8478 100755 --- a/metricbeat/module/kafka/_meta/run.sh +++ b/metricbeat/module/kafka/_meta/run.sh @@ -1,5 +1,8 @@ #!/bin/bash +KAFKA_BROKER_NAME=${KAFKA_BROKER_NAME:-kafka} +ZOOKEEPER_HOST=${ZOOKEEPER_HOST:-zookeeper} + if [ -n "$KAFKA_ADVERTISED_HOST_AUTO" ]; then KAFKA_ADVERTISED_HOST=$(dig +short $HOSTNAME):9092 fi @@ -22,10 +25,11 @@ if [ -z "$KAFKA_ADVERTISED_HOST" ]; then done fi -wait_for_port() { +wait_for() { count=20 - port=$1 - while ! nc -z localhost $port && [[ $count -ne 0 ]]; do + host=$1 + port=$2 + while ! nc -z $host $port && [[ $count -ne 0 ]]; do count=$(( $count - 1 )) [[ $count -eq 0 ]] && return 1 sleep 0.5 @@ -35,14 +39,15 @@ wait_for_port() { } -echo "Starting ZooKeeper" -${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties & -wait_for_port 2181 +echo "Waiting for ZooKeeper" +wait_for $ZOOKEEPER_HOST 2181 echo "Starting Kafka broker" mkdir -p ${KAFKA_LOGS_DIR} export KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/server_jaas.conf ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ + --override host.name=$KAFKA_BROKER_NAME \ + --override zookeeper.connect=$ZOOKEEPER_HOST:2181 \ --override authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer \ --override super.users=User:admin \ --override sasl.enabled.mechanisms=PLAIN \ @@ -54,17 +59,17 @@ ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ --override inter.broker.listener.name=INSIDE \ --override logs.dir=${KAFKA_LOGS_DIR} & -wait_for_port 9092 +wait_for localhost 9092 echo "Kafka load status code $?" # ACLS used to prepare tests -${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation All --cluster --topic '*' --group '*' -${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation All --cluster --topic '*' --group '*' +${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZOOKEEPER_HOST}:2181 --add --allow-principal User:producer --operation All --cluster --topic '*' --group '*' +${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZOOKEEPER_HOST}:2181 --add --allow-principal User:consumer --operation All --cluster --topic '*' --group '*' # Minimal ACLs required by metricbeat. If this needs to be changed, please update docs too -${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:stats --operation Describe --group '*' -${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:stats --operation Read --topic '*' +${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZOOKEEPER_HOST}:2181 --add --allow-principal User:stats --operation Describe --group '*' +${KAFKA_HOME}/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=${ZOOKEEPER_HOST}:2181 --add --allow-principal User:stats --operation Read --topic '*' touch /tmp/.acls_loaded diff --git a/metricbeat/module/kafka/docker-compose.yml b/metricbeat/module/kafka/docker-compose.yml new file mode 100644 index 000000000000..a45f32832926 --- /dev/null +++ b/metricbeat/module/kafka/docker-compose.yml @@ -0,0 +1,52 @@ +version: '2.3' + +services: + zookeeper: + image: docker.elastic.co/observability-ci/beats-integration-zookeeper:${ZOOKEEPER_VERSION:-3.5.5}-1 + build: + context: ../zookeeper/_meta + args: + ZOOKEEPER_VERSION: ${ZOOKEEPER_VERSION:-3.5.5} + ports: + - 2181 + + kafka: + image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-2 + build: + context: ./_meta + args: + KAFKA_VERSION: ${KAFKA_VERSION:-2.1.1} + environment: + KAFKA_BROKER_NAME: kafka1 + ports: + - 9092 + depends_on: + - zookeeper + - kafka2 + - kafka3 + + kafka2: + image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-2 + build: + context: ./_meta + args: + KAFKA_VERSION: ${KAFKA_VERSION:-2.1.1} + environment: + KAFKA_BROKER_NAME: kafka2 + ports: + - 9092 + depends_on: + - zookeeper + + kafka3: + image: docker.elastic.co/observability-ci/beats-integration-kafka:${KAFKA_VERSION:-2.1.1}-2 + build: + context: ./_meta + args: + KAFKA_VERSION: ${KAFKA_VERSION:-2.1.1} + environment: + KAFKA_BROKER_NAME: kafka3 + ports: + - 9092 + depends_on: + - zookeeper diff --git a/metricbeat/tests/system/test_kafka.py b/metricbeat/module/kafka/test_kafka.py similarity index 95% rename from metricbeat/tests/system/test_kafka.py rename to metricbeat/module/kafka/test_kafka.py index f08d105eb579..a794a7c91b3b 100644 --- a/metricbeat/tests/system/test_kafka.py +++ b/metricbeat/module/kafka/test_kafka.py @@ -1,9 +1,12 @@ import os -import metricbeat +import sys import unittest from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest +sys.path.append(os.path.join(os.path.dirname(__file__), '../../tests/system')) +import metricbeat + class KafkaTest(metricbeat.BaseTest): COMPOSE_SERVICES = ['kafka']