diff --git a/docs/root/start/sandboxes/kafka.rst b/docs/root/start/sandboxes/kafka.rst index ae12f4dd525c6..6debfe8788db6 100644 --- a/docs/root/start/sandboxes/kafka.rst +++ b/docs/root/start/sandboxes/kafka.rst @@ -49,7 +49,7 @@ Start by creating a Kafka topic with the name ``envoy-kafka-broker``: .. code-block:: console $ export TOPIC="envoy-kafka-broker" - $ docker-compose run --rm kafka-client --bootstrap-server proxy:10000 --create --topic $TOPIC + $ docker-compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --create --topic $TOPIC Step 3: Check the Kafka topic @@ -92,17 +92,23 @@ Step 6: Check admin ``kafka_broker`` stats When you proxy to the Kafka broker, Envoy records various stats. -You can check the broker stats by querying the Envoy admin interface: +You can check the broker stats by querying the Envoy admin interface +(the numbers might differ a little as the kafka-client does not expose precise control over its network traffic): .. code-block:: console - $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -v ": 0" - kafka.kafka_broker.request.api_versions_request: 4 - kafka.kafka_broker.request.find_coordinator_request: 1 - kafka.kafka_broker.request.metadata_request: 4 - kafka.kafka_broker.response.api_versions_response: 4 - kafka.kafka_broker.response.find_coordinator_response: 1 - kafka.kafka_broker.response.metadata_response: 4 + $ curl -s "http://localhost:8001/stats?filter=kafka.kafka_broker" | grep -v ": 0" | grep "_request:" + kafka.kafka_broker.request.api_versions_request: 9 + kafka.kafka_broker.request.create_topics_request: 1 + kafka.kafka_broker.request.fetch_request: 2 + kafka.kafka_broker.request.find_coordinator_request: 8 + kafka.kafka_broker.request.join_group_request: 2 + kafka.kafka_broker.request.leave_group_request: 1 + kafka.kafka_broker.request.list_offsets_request: 1 + kafka.kafka_broker.request.metadata_request: 12 + kafka.kafka_broker.request.offset_fetch_request: 1 + kafka.kafka_broker.request.produce_request: 1 + kafka.kafka_broker.request.sync_group_request: 1 Step 7: Check admin ``kafka_service`` cluster stats diff --git a/examples/kafka/docker-compose.yaml b/examples/kafka/docker-compose.yaml index 64882996a75d7..2c06cc79f329f 100644 --- a/examples/kafka/docker-compose.yaml +++ b/examples/kafka/docker-compose.yaml @@ -27,9 +27,10 @@ services: environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - # Listener value needs to be equal to cluster value in Envoy config - # (will receive payloads from Envoy). - KAFKA_LISTENER: INTERNAL://kafka-server:9092,EXTERNAL://proxy:10000 + # This Kafka server instance sets up two listener sockets: + # - external one for client traffic (this traffic will go through Envoy proxy), + # - internal one for cluster traffic (if we add more brokers). + KAFKA_LISTENERS: INTERNAL://kafka-server:9092,EXTERNAL://kafka-server:10000 # Advertised listener value needs to be equal to Envoy's listener # (will make clients discovering this broker talk to it through Envoy). KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-server:9092,EXTERNAL://proxy:10000 diff --git a/examples/kafka/envoy.yaml b/examples/kafka/envoy.yaml index a758285d33b81..f6d762e594fa8 100644 --- a/examples/kafka/envoy.yaml +++ b/examples/kafka/envoy.yaml @@ -27,8 +27,9 @@ static_resources: - endpoint: address: socket_address: + # Kafka server's listener for client traffic ('EXTERNAL'). address: kafka-server - port_value: 9092 + port_value: 10000 admin: address: diff --git a/examples/kafka/verify.sh b/examples/kafka/verify.sh index 0652b13025c72..b12876bc385dc 100755 --- a/examples/kafka/verify.sh +++ b/examples/kafka/verify.sh @@ -30,18 +30,38 @@ run_log "Receive a message using the Kafka consumer" kafka_client kafka-console-consumer --bootstrap-server proxy:10000 --topic $TOPIC --from-beginning --max-messages 1 | grep "$MESSAGE" run_log "Check admin kafka_broker stats" + +# This function verifies whether a given metric exists and has a value > 0. +has_metric_with_at_least_1 () { + local stat response value + stat="$1" + shift + response=$(_curl "http://localhost:${PORT_ADMIN}/stats?filter=${stat}") + # Extract number from rows like 'kafka.kafka_broker.request.api_versions_request: 123'. + value=$(echo "${response}" | grep "${stat}:" | cut -f2 -d':' | tr -d ' ') + re='^[0-9]+$' + [[ ${value} =~ ${re} && ${value} -gt 0 ]] || { + echo "ERROR: metric check for [${stat}]" >&2 + echo "EXPECTED: numeric value greater than 0" >&2 + echo "RECEIVED:" >&2 + echo "${response}" >&2 + return 1 + } +} + EXPECTED_BROKER_STATS=( - "kafka.kafka_broker.request.api_versions_request: 4" - "kafka.kafka_broker.request.find_coordinator_request: 1" - "kafka.kafka_broker.request.metadata_request: 4" - "kafka.kafka_broker.response.api_versions_response: 4" - "kafka.kafka_broker.response.find_coordinator_response: 1" - "kafka.kafka_broker.response.metadata_response: 4") + "kafka.kafka_broker.request.api_versions_request" + "kafka.kafka_broker.request.metadata_request" + "kafka.kafka_broker.request.create_topics_request" + "kafka.kafka_broker.request.produce_request" + "kafka.kafka_broker.request.fetch_request" + "kafka.kafka_broker.response.api_versions_response" + "kafka.kafka_broker.response.metadata_response" + "kafka.kafka_broker.response.create_topics_response" + "kafka.kafka_broker.response.produce_response" + "kafka.kafka_broker.response.fetch_response") for stat in "${EXPECTED_BROKER_STATS[@]}"; do - filter="$(echo "$stat" | cut -d: -f1)" - responds_with \ - "$stat" \ - "http://localhost:${PORT_ADMIN}/stats?filter=${filter}" + has_metric_with_at_least_1 "${stat}" done run_log "Check admin kafka_service stats"