Merge branch 'endtoend'
# Conflicts:
#	docs/metrics.md
#	go.mod
#	go.sum
weeco committed Jun 7, 2021
2 parents 17ec4dd + 4948761 commit 423e1c0
Showing 37 changed files with 1,882 additions and 133 deletions.
44 changes: 44 additions & 0 deletions .github/workflows/image-on-push-to-branch.yaml
@@ -0,0 +1,44 @@
name: Build Docker image

on:
  push:
    tags:
      - "*"
    branches:
      # Order matters: the negation must come after the pattern it excludes,
      # otherwise "**" would re-include master.
      - "**"
      - "!master"
    paths-ignore:
      - "charts/**"

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@master

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1

      - name: Docker meta
        id: docker_meta
        uses: docker/metadata-action@v3
        with:
          images: quay.io/cloudhut/kminion
          tags: |
            type=sha,prefix={{branch}}-

      - name: Login to Quay
        uses: docker/login-action@v1
        with:
          registry: quay.io
          username: cloudhut+github_push
          password: ${{ secrets.QUAY_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v2
        with:
          push: true
          tags: ${{ steps.docker_meta.outputs.tags }}
          build-args: |
            KMINION_VERSION=sha-${{ github.sha }}
File renamed without changes.
7 changes: 6 additions & 1 deletion .gitignore
@@ -15,4 +15,9 @@ zk-multiple-kafka-multiple
.vscode
.idea

config
config

# go debug binary
__debug_bin

notes.md
4 changes: 2 additions & 2 deletions README.md
@@ -13,8 +13,8 @@ KMinion.
- **Consumer Group Lags:** Number of messages a consumer group is lagging behind the latest offset
- **Log dir sizes:** Metric for log dir sizes either grouped by broker or by topic
- **Broker info:** Metric for each broker with its address, broker id, controller and rack id
- **Configurable granularity:** Export metrics (e.g. consumer group lags) either per partition or per topic. This helps
to reduce the number of exported metric series
- **Configurable granularity:** Export metrics (e.g. consumer group lags) either per partition or per topic. Helps to reduce the number of exported metric series.
- **End to End Monitoring:** Sends messages to its own topic and consumes them, measuring a message's real-world "roundtrip" latency. Also provides ack latency and offset-commit latency. [More Info](/docs/end-to-end.md)
- **Configurable targets:** You can configure which topics or groups you'd like to export using regular expressions
- **Multiple config parsers:** It's possible to configure KMinion using YAML, environment variables, or a mix of both

128 changes: 128 additions & 0 deletions docs/end-to-end.md
@@ -0,0 +1,128 @@
# End-To-End Monitoring

This page describes the end-to-end monitoring feature in KMinion, how it works, and what metrics it provides.

## Motivation
> What is the issue? Why did we build this feature?

We can monitor metrics like CPU usage, free disk space, or even consumer group lag.
However, these metrics don't give us a good idea of the performance characteristics an actual, real-world client
experiences when connected to the cluster.

With the "classic" metrics lots of questions go unanswered:
- Can a client produce messages to the cluster?
- Can clients produce & consume messages as well as commit group offsets with an acceptable latency?
- Is the cluster in a healthy state from a client's perspective?

## Approach & Implementation
> How do we solve those issues? How does the feature work?

The most reliable way to get real-world performance and availability metrics is to actually run a producer and
consumer ourselves. This is exactly what the end-to-end monitoring feature does!

## High Level Overview
In order to determine whether the cluster is fully operational and its performance is within acceptable limits,
KMinion continuously produces and consumes messages to/from the cluster. That way we can measure things like ack latency,
commit latency, and roundtrip time.

KMinion creates and manages its own topic for the end-to-end test messages. The name of the topic can be configured.

**The first step** is to create a message and send it to the cluster.
- Every produced message is added to an internal tracker so we can recognize messages being "lost".
  A message is considered lost if it doesn't arrive back at the consumer within the configured time span (see the tracker sketch below).
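
For illustration, a minimal sketch of such a tracker in Go; the names, sweep interval, and bookkeeping details are assumptions for this example, not KMinion's actual implementation:

```go
package e2e

import (
	"sync"
	"time"
)

// tracker remembers produced messages by ID; anything not seen again within
// the roundtrip SLA is counted as lost.
type tracker struct {
	mu       sync.Mutex
	inFlight map[string]time.Time // message ID -> produce time
	sla      time.Duration
	lost     int
}

func newTracker(sla time.Duration) *tracker {
	t := &tracker{inFlight: map[string]time.Time{}, sla: sla}
	go t.sweep() // periodically expire messages that never came back
	return t
}

func (t *tracker) addProduced(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inFlight[id] = time.Now()
}

// markReceived returns the produce time if the message was still tracked.
func (t *tracker) markReceived(id string) (time.Time, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	sent, ok := t.inFlight[id]
	if ok {
		delete(t.inFlight, id)
	}
	return sent, ok
}

func (t *tracker) sweep() {
	for range time.Tick(time.Second) {
		t.mu.Lock()
		for id, sent := range t.inFlight {
			if time.Since(sent) > t.sla {
				delete(t.inFlight, id) // SLA exceeded: the message is lost
				t.lost++
			}
		}
		t.mu.Unlock()
	}
}
```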

**The second step** is to continuously consume the topic.
- As each message arrives, we calculate its roundtrip time: the time from when the message was created until KMinion received it again (see the consume-side sketch below).
- Consumer group offsets are committed periodically, and the time each commit takes is recorded.
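
A hedged sketch of the consume side, built on franz-go (the Kafka client KMinion uses). The broker address, topic name, and payload shape are illustrative assumptions:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// testMessage mirrors the idea of KMinion's test payload; the exact field
// names here are assumptions.
type testMessage struct {
	MinionID  string    `json:"minionID"`
	CreatedAt time.Time `json:"createdAt"`
}

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),       // assumed broker address
		kgo.ConsumeTopics("kminion-end-to-end"), // the default topic name
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			var msg testMessage
			if json.Unmarshal(r.Value, &msg) != nil {
				return // not a test message we can parse
			}
			// roundtrip = now minus the creation time embedded at produce time
			fmt.Printf("roundtrip for %s: %v\n", msg.MinionID, time.Since(msg.CreatedAt))
		})
	}
}
```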

### Topic Management
The topic KMinion uses is created and managed completely automatically (though the topic name can be configured).

KMinion continuously checks the topic and fixes issues/imbalances automatically:
- It adds partitions to the topic so that it has at least as many partitions as there are brokers.
- It reassigns partitions to ensure that every broker leads at least one partition, and that all partitions' replicas
  are distributed evenly across the brokers. KMinion tries to assign each partition to the broker whose broker ID matches the partition ID (see the sketch below).
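
A minimal sketch of that assignment heuristic, assuming contiguous broker IDs and treating the first replica as the preferred leader (an illustration, not KMinion's actual reassignment code):

```go
package main

import "fmt"

// desiredAssignment returns the ordered replica list per partition; the
// first entry is the preferred leader. Partition p is led by broker p
// (modulo the broker count), and followers continue round-robin from there.
func desiredAssignment(brokerIDs []int32, partitions, replicationFactor int) [][]int32 {
	assignments := make([][]int32, partitions)
	for p := 0; p < partitions; p++ {
		replicas := make([]int32, 0, replicationFactor)
		for r := 0; r < replicationFactor; r++ {
			replicas = append(replicas, brokerIDs[(p+r)%len(brokerIDs)])
		}
		assignments[p] = replicas
	}
	return assignments
}

func main() {
	// three brokers, three partitions, replication factor 2
	fmt.Println(desiredAssignment([]int32{0, 1, 2}, 3, 2)) // [[0 1] [1 2] [2 0]]
}
```

With this layout every broker leads exactly one partition and replicas are spread evenly, which is the balance the reconciliation described above aims for.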


### Consumer Group Management
On startup, each KMinion instance generates a unique identifier (UUID) and combines it with the shared prefix from the
config to form its own consumer group name.

That is necessary because:
- Offsets must not be shared among multiple instances.
- Each instance must always consume **all** partitions of the topic.

The instance's UUID is also embedded in every message, so each instance can easily filter out messages it didn't produce.
That's why it is perfectly fine to run multiple KMinion instances against the same cluster using the same topic, as the sketch below shows.
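
A minimal sketch of that naming scheme; whether KMinion uses `github.com/google/uuid` and these exact names is an assumption for illustration:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	const groupIDPrefix = "kminion-end-to-end" // default from the config below
	instanceID := uuid.NewString()             // generated once at startup
	groupID := fmt.Sprintf("%s-%s", groupIDPrefix, instanceID)
	fmt.Println("consumer group:", groupID)

	// consume side: messages carrying a different minionID are simply skipped
	isMine := func(msgMinionID string) bool { return msgMinionID == instanceID }
	fmt.Println(isMine("some-other-instance-id")) // false
}
```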

KMinion also monitors and deletes consumer groups that use its configured prefix.
That way, when an instance exits or restarts, its previous consumer group is cleaned up quickly (the check runs every 20s); see the sketch below.
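
A hedged sketch of that cleanup loop; `listGroups` and `deleteGroup` are hypothetical stand-ins for the corresponding Kafka admin requests (ListGroups / DeleteGroups), not KMinion's real API:

```go
package e2e

import (
	"context"
	"strings"
	"time"
)

// hypothetical stubs standing in for real Kafka admin calls
func listGroups(ctx context.Context) ([]string, error)    { return nil, nil }
func deleteGroup(ctx context.Context, group string) error { return nil }

// cleanupStaleGroups checks every 20s for consumer groups that carry the
// configured prefix but do not belong to this instance, and deletes them.
func cleanupStaleGroups(ctx context.Context, prefix, ownGroupID string) {
	for range time.Tick(20 * time.Second) {
		groups, err := listGroups(ctx)
		if err != nil {
			continue // transient admin error: retry on the next tick
		}
		for _, g := range groups {
			// in practice only groups that are empty (no live members)
			// would be deleted
			if strings.HasPrefix(g, prefix) && g != ownGroupID {
				_ = deleteGroup(ctx, g)
			}
		}
	}
}
```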


## Available Metrics
The end-to-end monitoring feature exports the following metrics.

### Counters
| Name | Description |
| --- | --- |
| `kminion_end_to_end_messages_produced_total` | Messages KMinion *tried* to send |
| `kminion_end_to_end_messages_acked_total` | Messages actually sent and acknowledged by the cluster |
| `kminion_end_to_end_messages_received_total` | Number of messages received (only counts those that match, i.e. that this instance actually produced itself) |
| `kminion_end_to_end_commits_total` | Number of successful offset commits |


### Histograms
| Name | Description |
| --- | --- |
| `kminion_end_to_end_produce_latency_seconds` | Duration until the cluster acknowledged a message |
| `kminion_end_to_end_commit_latency_seconds` | Duration of offset commits. Has a label for the coordinator broker ID that answered the commit request |
| `kminion_end_to_end_roundtrip_latency_seconds` | Duration from creation of a message until it was received/consumed again |

## Config Properties
All config properties related to this feature are located in `minion.endToEnd`.

```yaml
endToEnd:
  enabled: true
  probeInterval: 800ms # how often to send end-to-end test messages
  topicManagement:
    # You can disable topic management without disabling the testing feature.
    # Only makes sense if you have multiple KMinion instances and for some reason want only one of them to create/configure the topic.
    # It is strongly recommended to leave this enabled.
    enabled: true

    # Name of the topic KMinion uses to send its test messages.
    # You do *not* need to change this if you are running multiple KMinion instances on the same cluster.
    # Different instances are perfectly fine with sharing the same topic!
    name: kminion-end-to-end

    # How often KMinion checks its topic to validate configuration, partition count, and partition assignments
    reconciliationInterval: 10m

    # Useful for monitoring the performance of acks (if >1, this is best combined with 'producer.requiredAcks' set to 'all')
    replicationFactor: 1

    # Rarely makes sense to change this, but maybe if you want some sort of cheap load test?
    partitionsPerBroker: 1

  producer:
    # This defines the maximum time to wait for an ack response after producing a message,
    # and the upper bound for histogram buckets in "produce_latency_seconds".
    ackSla: 5s
    # Can be set to "all" (default) so Kafka only reports an end-to-end test message as acknowledged if
    # the message was written to all in-sync replicas of the partition.
    # Or can be set to "leader" to only require the partition leader to have written the message to its log.
    requiredAcks: all

  consumer:
    # Prefix KMinion uses when creating its consumer groups. The current KMinion instance ID is appended automatically.
    groupIdPrefix: kminion-end-to-end
    # Defines the time limit beyond which a message is considered "lost" (failed the roundtrip),
    # also used as the upper bound for histogram buckets in "roundtrip_latency_seconds".
    roundtripSla: 20s

    # Maximum time an offset commit is allowed to take before considering it failed,
    # also used as the upper bound for histogram buckets in "commit_latency_seconds".
    commitSla: 10s
```
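
The three SLA settings above double as the upper histogram bound. Here is a hedged sketch of how that can work with the Prometheus Go client; the bucket layout is an illustration, not necessarily KMinion's exact choice:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// latencyHistogram builds buckets that grow exponentially from 5ms and are
// capped at the configured SLA, so anything slower than the SLA falls into
// the +Inf bucket.
func latencyHistogram(name string, sla time.Duration) prometheus.Histogram {
	var buckets []float64
	for b := 0.005; b < sla.Seconds(); b *= 2 {
		buckets = append(buckets, b)
	}
	buckets = append(buckets, sla.Seconds()) // the SLA is the last finite bucket

	return prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    name,
		Help:    "end-to-end latency in seconds",
		Buckets: buckets,
	})
}

func main() {
	h := latencyHistogram("kminion_end_to_end_produce_latency_seconds", 5*time.Second)
	h.Observe(0.042) // record a 42ms ack
}
```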
35 changes: 31 additions & 4 deletions docs/metrics.md
@@ -82,13 +82,40 @@ kminion_kafka_consumer_group_topic_partition_lag{group_id="bigquery-sink",partit
# HELP kminion_kafka_consumer_group_topic_lag The number of messages a consumer group is lagging behind across all partitions in a topic
# TYPE kminion_kafka_consumer_group_topic_lag gauge
kminion_kafka_consumer_group_topic_lag{group_id="bigquery-sink",topic_name="shop-activity"} 147481
```
#### Offset Commits Metrics
The following metrics are only available when KMinion is configured to use `scrapeMode: offsetsTopic`.
```
# HELP kminion_kafka_consumer_group_offset_commits_total The number of offsets committed by a group
# TYPE kminion_kafka_consumer_group_offset_commits_total counter
kminion_kafka_consumer_group_offset_commits_total{group_id="bigquery-sink"} 1098
```

### End-to-End Metrics

```
# HELP kminion_end_to_end_messages_produced_total Number of messages that kminion's end-to-end test has tried to send to kafka
# TYPE kminion_end_to_end_messages_produced_total counter
kminion_end_to_end_messages_produced_total 384
# HELP kminion_end_to_end_commits_total Counts how many times kminion's end-to-end test has committed messages
# TYPE kminion_end_to_end_commits_total counter
kminion_end_to_end_commits_total 18
# HELP kminion_end_to_end_messages_acked_total Number of messages kafka acknowledged as produced
# TYPE kminion_end_to_end_messages_acked_total counter
kminion_end_to_end_messages_acked_total 383
# HELP kminion_end_to_end_messages_received_total Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it arrives within the configured roundtrip SLA (and it matches the minionID)
# TYPE kminion_end_to_end_messages_received_total counter
kminion_end_to_end_messages_received_total 383
# HELP kminion_end_to_end_produce_latency_seconds Time until we received an ack for a produced message
# TYPE kminion_end_to_end_produce_latency_seconds histogram
kminion_end_to_end_produce_latency_seconds_bucket{partitionId="0",le="0.005"} 0
# HELP kminion_end_to_end_commit_latency_seconds Time kafka took to respond to kminion's offset commit
# TYPE kminion_end_to_end_commit_latency_seconds histogram
kminion_end_to_end_commit_latency_seconds_bucket{groupCoordinatorBrokerId="0",le="0.005"} 0
# HELP kminion_end_to_end_roundtrip_latency_seconds Time it took between sending (producing) and receiving (consuming) a message
# TYPE kminion_end_to_end_roundtrip_latency_seconds histogram
kminion_end_to_end_roundtrip_latency_seconds_bucket{partitionId="0",le="0.005"} 0
```
83 changes: 83 additions & 0 deletions e2e/client_hooks.go
@@ -0,0 +1,83 @@
package e2e

import (
	"net"
	"sync/atomic"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
	"go.uber.org/zap"
)

// in e2e we use client hooks to log connect/disconnect events and to track
// which broker currently acts as the group coordinator (see OnRead)
type clientHooks struct {
	logger *zap.Logger

	lastCoordinatorUpdate time.Time
	currentCoordinator    *atomic.Value // kgo.BrokerMetadata
}

func newEndToEndClientHooks(logger *zap.Logger) *clientHooks {
	return &clientHooks{
		logger:             logger.Named("e2e-hooks"),
		currentCoordinator: &atomic.Value{},
	}
}

func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
	if err != nil {
		c.logger.Error("kafka connection failed", zap.String("broker_host", meta.Host), zap.Int32("broker_id", meta.NodeID), zap.Error(err))
		return
	}
	c.logger.Debug("kafka connection succeeded",
		zap.String("host", meta.Host), zap.Int32("broker_id", meta.NodeID),
		zap.Duration("dial_duration", dialDur))
}

func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
	c.logger.Warn("kafka broker disconnected", zap.Int32("broker_id", meta.NodeID),
		zap.String("host", meta.Host))
}

// OnWrite is passed the broker metadata, the key for the request that
// was written, the number of bytes written, how long the request
// waited before being written, how long it took to write the request,
// and any error.
//
// The bytes written do not count any TLS overhead.
// OnWrite is called after a write to a broker.
//
// OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
	keyName := kmsg.NameForKey(key)
	if keyName != "OffsetCommit" {
		return
	}

	// Currently a no-op; debug logging for offset-commit writes is kept
	// here for reference:
	// c.logger.Info("hooks onWrite",
	// 	zap.Duration("timeToWrite", timeToWrite),
	// 	zap.NamedError("err", err))
}

// OnRead is passed the broker metadata, the key for the response that
// was read, the number of bytes read, how long the Client waited
// before reading the response, how long it took to read the response,
// and any error.
//
// The bytes read do not count any TLS overhead.
// OnRead is called after a read from a broker.
// OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
func (c *clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
	keyName := kmsg.NameForKey(key)
	if keyName != "OffsetCommit" {
		return
	}

	// a successful OffsetCommit response tells us which broker is
	// currently acting as the group coordinator
	if err == nil {
		c.currentCoordinator.Store(meta)
		c.lastCoordinatorUpdate = time.Now()
	}
}
56 changes: 56 additions & 0 deletions e2e/config.go
@@ -0,0 +1,56 @@
package e2e

import (
	"fmt"
	"time"
)

type Config struct {
	Enabled         bool                   `koanf:"enabled"`
	TopicManagement EndToEndTopicConfig    `koanf:"topicManagement"`
	ProbeInterval   time.Duration          `koanf:"probeInterval"`
	Producer        EndToEndProducerConfig `koanf:"producer"`
	Consumer        EndToEndConsumerConfig `koanf:"consumer"`
}

func (c *Config) SetDefaults() {
	c.Enabled = false
	c.ProbeInterval = 2 * time.Second
	c.TopicManagement.SetDefaults()
	c.Producer.SetDefaults()
	c.Consumer.SetDefaults()
}

func (c *Config) Validate() error {
	if !c.Enabled {
		return nil
	}

	// Durations like "0s" or "0ms" all parse to 0, so a single zero check suffices
	if c.ProbeInterval == 0 {
		return fmt.Errorf("failed to validate probeInterval config, the duration can't be zero")
	}

	err := c.TopicManagement.Validate()
	if err != nil {
		return fmt.Errorf("failed to validate topicManagement config: %w", err)
	}

	err = c.Producer.Validate()
	if err != nil {
		return fmt.Errorf("failed to validate producer config: %w", err)
	}

	err = c.Consumer.Validate()
	if err != nil {
		return fmt.Errorf("failed to validate consumer config: %w", err)
	}

	return nil
}
