diff --git a/.github/workflows/image-on-push-to-branch.yaml b/.github/workflows/image-on-push-to-branch.yaml new file mode 100644 index 0000000..20d9e35 --- /dev/null +++ b/.github/workflows/image-on-push-to-branch.yaml @@ -0,0 +1,44 @@ +name: Build Docker image + +on: + push: + tags: + - "*" + branches: + - "!master" + - "**" + paths-ignore: + - "charts/**" + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@master + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Docker meta + id: docker_meta + uses: docker/metadata-action@v3 + with: + images: quay.io/cloudhut/kminion + tags: | + type=sha,prefix={{branch}}- + + - name: Login to Quay + uses: docker/login-action@v1 + with: + registry: quay.io + username: cloudhut+github_push + password: ${{ secrets.QUAY_TOKEN }} + + - name: Build and push + uses: docker/build-push-action@v2 + with: + push: true + tags: ${{ steps.docker_meta.outputs.tags }} + build-args: | + KMINION_VERSION=sha-${{ github.sha }} diff --git a/.github/workflows/image-on-push.yaml b/.github/workflows/image-on-push-to-master.yaml similarity index 100% rename from .github/workflows/image-on-push.yaml rename to .github/workflows/image-on-push-to-master.yaml diff --git a/.gitignore b/.gitignore index 48f3e68..ef61953 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,9 @@ zk-multiple-kafka-multiple .vscode .idea -config \ No newline at end of file +config + +# go debug binary +__debug_bin + +notes.md \ No newline at end of file diff --git a/README.md b/README.md index f4445d4..082958f 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,8 @@ KMinion. - **Consumer Group Lags:** Number of messages a consumer group is lagging behind the latest offset - **Log dir sizes:** Metric for log dir sizes either grouped by broker or by topic - **Broker info:** Metric for each broker with its address, broker id, controller and rack id -- **Configurable granularity:** Export metrics (e.g. consumer group lags) either per partition or per topic. This helps - to reduce the number of exported metric series +- **Configurable granularity:** Export metrics (e.g. consumer group lags) either per partition or per topic. Helps to reduce the number of exported metric series. +- **End to End Monitoring:** Sends messages to its own topic and consumes them, measuring a messages real-world "roundtrip" latency. Also provides ack-latency and offset-commit-latency. [More Info](/docs/end-to-end.md) - **Configurable targets:** You can configure what topics or groups you'd like to export using regex expressions - **Multiple config parsers:** It's possible to configure KMinion using YAML, Environment variables or a mix of both diff --git a/docs/end-to-end.md b/docs/end-to-end.md new file mode 100644 index 0000000..fe7dd10 --- /dev/null +++ b/docs/end-to-end.md @@ -0,0 +1,128 @@ +# End-To-End Monitoring + +This page describes the end-to-end monitoring feature in KMinion, how it works, and what metrics it provides. + +## Motivation +> What is the issue? Why did we build this feature? + +We can monitor metrics like CPU usage, free disk space, or even consumer group lag. +However, these metrics don't give us a good idea of the performance characteristics an actual, real-world, client +experiences when connected to the cluster. + +With the "classic" metrics lots of questions go unanswered: +- Can a client produce messages to the cluster? +- Can clients produce & consume messages as well as commit group offsets with an acceptable latency? 
+- Is the cluster in a healthy state from a client's perspective? + +## Approach & Implementation +> How do we solve those issues? How does the feature work? + +The most reliably way to get real-world performance and availability metrics is to actually run a producer/consumer +ourselves. This is exactly what the end-to-end monitoring feature does! + +## High Level Overview +In order to determine if the cluster is fully operational, and it's performance is within acceptable limits, +KMinion continuously produces and consumes messages to/from the cluster. That way we can measure things like ack-latency, +commit-latency, and roundtrip-time. + +KMinion creates and manages its own topic for the end-to-end test messages. The name of the topic can be configured. + +**The first step** is to create a message and send it to the cluster. +- Every produced message is added to an internal tracker, so we can recognize messages being "lost". + A message is considered lost if it doesn't arrive back at the consumer within the configured time span. + +**The second step** is to continuously consume the topic. +- As each message arrives, we calculate its roundtrip time (time from the point the message was created, until KMinion received it again) +- Consumer group offsets are committed periodically, while also recording the time each commit takes. + +### Topic Management +The topic KMinion uses, is created and managed completely automatically (the topic name can be configured though). + +KMinion continuously checks the topic and fixes issues/imbalances automatically: +- Add partitions to the topic, so it has at least as many partitions as there are brokers. +- Will reassign partitions to ensure every broker leads at least one partition, and that all partitions' replicas + are distributed evenly across the brokers. KMinion tries to assign partitionIDs to brokers that have the same broker id. + + +### Consumer Group Management +On startup each KMinion instance generates a unique identifier (UUID) that is used to create its own consumer group. +It incorporates the shared prefix from the config. + +That is necessary because: +- Offsets must not be shared among multiple instances. +- Each instance must always consume **all** partitions of the topic. + +The instances' UUID is also embedded in every message, so each instance can easily filter out messages it didn't produce. +That's why it is perfectly fine to run multiple KMinion instances against the same cluster, using the same topic. + +KMinion also monitors and deletes consumer groups that use it's configured prefix. +That way, when an instance exits/restarts, previous consumer groups will be cleaned up quickly (check happens every 20s). + + +## Available Metrics +The end-to-end monitoring feature exports the following metrics. + +### Counters +| Name | Description | +| --- | --- | +| `kminion_end_to_end_messages_produced_total ` | Messages KMinion *tried* to send | +| `kminion_end_to_end_messages_acked_total ` | Messages actually sent and acknowledged by the cluster | +| `kminion_end_to_end_messages_received_total ` | Number of messages received (only counts those that match, i.e. that this instance actually produced itself) | +| `kminion_end_to_end_commits_total` | Number of successful offset commits | + + +### Histograms +| Name | Description | +| --- | --- | +| `kminion_end_to_end_produce_latency_seconds ` | Duration until the cluster acknowledged a message. | +| `kminion_end_to_end_commit_latency_seconds` | Duration of offset commits. 
Has a label for coordinator brokerID that answered the commit request | +| `kminion_end_to_end_roundtrip_latency_seconds ` | Duration from creation of a message, until it was received/consumed again. | + +## Config Properties +All config properties related to this feature are located in `minion.endToEnd`. + +```yaml + endToEnd: + enabled: true + probeInterval: 800ms # how often to send end-to-end test messages + topicManagement: + # You can disable topic management, without disabling the testing feature. + # Only makes sense if you have multiple kminion instances, and for some reason only want one of them to create/configure the topic. + # It is strongly recommended to leave this enabled. + enabled: true + + # Name of the topic kminion uses to send its test messages + # You do *not* need to change this if you are running multiple kminion instances on the same cluster. + # Different instances are perfectly fine with sharing the same topic! + name: kminion-end-to-end + + # How often kminion checks its topic to validate configuration, partition count, and partition assignments + reconciliationInterval: 10m + + # Useful for monitoring the performance of acks (if >1 this is best combined with 'producer.requiredAcks' set to 'all') + replicationFactor: 1 + + # Rarely makes sense to change this, but maybe if you want some sort of cheap load test? + partitionsPerBroker: 1 + + producer: + # This defines the maximum time to wait for an ack response after producing a message, + # and the upper bound for histogram buckets in "produce_latency_seconds" + ackSla: 5s + # Can be to "all" (default) so kafka only reports an end-to-end test message as acknowledged if + # the message was written to all in-sync replicas of the partition. + # Or can be set to "leader" to only require to have written the message to its log. + requiredAcks: all + + consumer: + # Prefix kminion uses when creating its consumer groups. Current kminion instance id will be appended automatically + groupIdPrefix: kminion-end-to-end + # Defines the time limit beyond which a message is considered "lost" (failed the roundtrip), + # also used as the upper bound for histogram buckets in "roundtrip_latency" + roundtripSla: 20s + + # Maximum time an offset commit is allowed to take before considering it failed, + # also used as the upper bound for histogram buckets in "commit_latency_seconds" + commitSla: 10s +``` + diff --git a/docs/metrics.md b/docs/metrics.md index e573332..ea29791 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -82,13 +82,40 @@ kminion_kafka_consumer_group_topic_partition_lag{group_id="bigquery-sink",partit # HELP kminion_kafka_consumer_group_topic_lag The number of messages a consumer group is lagging behind across all partitions in a topic # TYPE kminion_kafka_consumer_group_topic_lag gauge kminion_kafka_consumer_group_topic_lag{group_id="bigquery-sink",topic_name="shop-activity"} 147481 -``` -#### Offset Commits Metrics -The following metrics are only available when KMinion is configured to use `scrapeMode: offsetsTopic`. 
-``` # HELP kminion_kafka_consumer_group_offset_commits_total The number of offsets committed by a group # TYPE kminion_kafka_consumer_group_offset_commits_total counter kminion_kafka_consumer_group_offset_commits_total{group_id="bigquery-sink"} 1098 ``` +### End-to-End Metrics + +``` +# HELP kminion_end_to_end_messages_produced_total Number of messages that kminion's end-to-end test has tried to send to kafka +# TYPE kminion_end_to_end_messages_produced_total counter +kminion_end_to_end_messages_produced_total 384 + +# HELP kminion_end_to_end_commits_total Counts how many times kminions end-to-end test has committed messages +# TYPE kminion_end_to_end_commits_total counter +kminion_end_to_end_commits_total 18 + +# HELP kminion_end_to_end_messages_acked_total Number of messages kafka acknowledged as produced +# TYPE kminion_end_to_end_messages_acked_total counter +kminion_end_to_end_messages_acked_total 383 + +# HELP kminion_end_to_end_messages_received_total Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID) +# TYPE kminion_end_to_end_messages_received_total counter +kminion_end_to_end_messages_received_total 383 + +# HELP kminion_end_to_end_produce_latency_seconds Time until we received an ack for a produced message +# TYPE kminion_end_to_end_produce_latency_seconds histogram +kminion_end_to_end_produce_latency_seconds_bucket{partitionId="0",le="0.005"} 0 + +# HELP kminion_end_to_end_commit_latency_seconds Time kafka took to respond to kminion's offset commit +# TYPE kminion_end_to_end_commit_latency_seconds histogram +kminion_end_to_end_commit_latency_seconds_bucket{groupCoordinatorBrokerId="0",le="0.005"} 0 + +# HELP kminion_end_to_end_roundtrip_latency_seconds Time it took between sending (producing) and receiving (consuming) a message +# TYPE kminion_end_to_end_roundtrip_latency_seconds histogram +kminion_end_to_end_roundtrip_latency_seconds_bucket{partitionId="0",le="0.005"} 0 +``` diff --git a/e2e/client_hooks.go b/e2e/client_hooks.go new file mode 100644 index 0000000..7c80d00 --- /dev/null +++ b/e2e/client_hooks.go @@ -0,0 +1,83 @@ +package e2e + +import ( + "net" + "sync/atomic" + "time" + + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" +) + +// in e2e we only use client hooks for logging connect/disconnect messages +type clientHooks struct { + logger *zap.Logger + + lastCoordinatorUpdate time.Time + currentCoordinator *atomic.Value // kgo.BrokerMetadata +} + +func newEndToEndClientHooks(logger *zap.Logger) *clientHooks { + + return &clientHooks{ + logger: logger.Named("e2e-hooks"), + currentCoordinator: &atomic.Value{}, + } +} + +func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) { + if err != nil { + c.logger.Error("kafka connection failed", zap.String("broker_host", meta.Host), zap.Int32("broker_id", meta.NodeID), zap.Error(err)) + return + } + c.logger.Debug("kafka connection succeeded", + zap.String("host", meta.Host), zap.Int32("broker_id", meta.NodeID), + zap.Duration("dial_duration", dialDur)) +} + +func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { + c.logger.Warn("kafka broker disconnected", zap.Int32("broker_id", meta.NodeID), + zap.String("host", meta.Host)) +} + +// OnWrite is passed the broker metadata, the key for the request that +// was 
written, the number of bytes written, how long the request +// waited before being written, how long it took to write the request, +// and any error. +// +// The bytes written does not count any tls overhead. +// OnWrite is called after a write to a broker. +// +// OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) +func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { + keyName := kmsg.NameForKey(key) + if keyName != "OffsetCommit" { + return + } + + // c.logger.Info("hooks onWrite", + // zap.Duration("timeToWrite", timeToWrite), + // zap.NamedError("err", err)) +} + +// OnRead is passed the broker metadata, the key for the response that +// was read, the number of bytes read, how long the Client waited +// before reading the response, how long it took to read the response, +// and any error. +// +// The bytes written does not count any tls overhead. +// OnRead is called after a read from a broker. +// OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) +func (c *clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) { + + keyName := kmsg.NameForKey(key) + if keyName != "OffsetCommit" { + return + } + + if err == nil { + c.currentCoordinator.Store(meta) + c.lastCoordinatorUpdate = time.Now() + } +} diff --git a/e2e/config.go b/e2e/config.go new file mode 100644 index 0000000..8daca75 --- /dev/null +++ b/e2e/config.go @@ -0,0 +1,56 @@ +package e2e + +import ( + "fmt" + "time" +) + +type Config struct { + Enabled bool `koanf:"enabled"` + TopicManagement EndToEndTopicConfig `koanf:"topicManagement"` + ProbeInterval time.Duration `koanf:"probeInterval"` + Producer EndToEndProducerConfig `koanf:"producer"` + Consumer EndToEndConsumerConfig `koanf:"consumer"` +} + +func (c *Config) SetDefaults() { + c.Enabled = false + c.ProbeInterval = 2 * time.Second + c.TopicManagement.SetDefaults() + c.Producer.SetDefaults() + c.Consumer.SetDefaults() +} + +func (c *Config) Validate() error { + + if !c.Enabled { + return nil + } + + // If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0 + if c.ProbeInterval == 0 { + return fmt.Errorf("failed to validate probeInterval config, the duration can't be zero") + } + + err := c.TopicManagement.Validate() + if err != nil { + return fmt.Errorf("failed to validate topicManagement config: %w", err) + } + + _, err = time.ParseDuration(c.ProbeInterval.String()) + if err != nil { + return fmt.Errorf("failed to parse '%s' to time.Duration: %v", c.ProbeInterval.String(), err) + } + + err = c.Producer.Validate() + if err != nil { + return fmt.Errorf("failed to validate producer config: %w", err) + } + + err = c.Consumer.Validate() + if err != nil { + return fmt.Errorf("failed to validate consumer config: %w", err) + } + + return nil +} diff --git a/e2e/config_consumer.go b/e2e/config_consumer.go new file mode 100644 index 0000000..960577a --- /dev/null +++ b/e2e/config_consumer.go @@ -0,0 +1,32 @@ +package e2e + +import ( + "fmt" + "time" +) + +type EndToEndConsumerConfig struct { + GroupIdPrefix string `koanf:"groupIdPrefix"` + + RoundtripSla time.Duration `koanf:"roundtripSla"` + CommitSla time.Duration `koanf:"commitSla"` +} + +func (c *EndToEndConsumerConfig) SetDefaults() { + c.GroupIdPrefix = "kminion-end-to-end" + c.RoundtripSla = 20 * time.Second + c.CommitSla = 10 * time.Second // no idea 
what to use as a good default value +} + +func (c *EndToEndConsumerConfig) Validate() error { + + if c.RoundtripSla <= 0 { + return fmt.Errorf("consumer.roundtripSla must be greater than zero") + } + + if c.CommitSla <= 0 { + return fmt.Errorf("consumer.commitSla must be greater than zero") + } + + return nil +} diff --git a/e2e/config_producer.go b/e2e/config_producer.go new file mode 100644 index 0000000..c29796f --- /dev/null +++ b/e2e/config_producer.go @@ -0,0 +1,29 @@ +package e2e + +import ( + "fmt" + "time" +) + +type EndToEndProducerConfig struct { + AckSla time.Duration `koanf:"ackSla"` + RequiredAcks string `koanf:"requiredAcks"` +} + +func (c *EndToEndProducerConfig) SetDefaults() { + c.AckSla = 5 * time.Second + c.RequiredAcks = "all" +} + +func (c *EndToEndProducerConfig) Validate() error { + + if c.RequiredAcks != "all" && c.RequiredAcks != "leader" { + return fmt.Errorf("producer.requiredAcks must be 'all' or 'leader") + } + + if c.AckSla <= 0 { + return fmt.Errorf("producer.ackSla must be greater than zero") + } + + return nil +} diff --git a/e2e/config_topic.go b/e2e/config_topic.go new file mode 100644 index 0000000..27bc12d --- /dev/null +++ b/e2e/config_topic.go @@ -0,0 +1,38 @@ +package e2e + +import ( + "fmt" + "time" +) + +type EndToEndTopicConfig struct { + Enabled bool `koanf:"enabled"` + Name string `koanf:"name"` + ReplicationFactor int `koanf:"replicationFactor"` + PartitionsPerBroker int `koanf:"partitionsPerBroker"` + ReconciliationInterval time.Duration `koanf:"reconciliationInterval"` +} + +func (c *EndToEndTopicConfig) SetDefaults() { + c.Enabled = true + c.Name = "kminion-end-to-end" + c.ReconciliationInterval = 10 * time.Minute +} + +func (c *EndToEndTopicConfig) Validate() error { + + if c.ReplicationFactor < 1 { + return fmt.Errorf("failed to parse replicationFactor, it should be more than 1, retrieved value %v", c.ReplicationFactor) + } + + if c.PartitionsPerBroker < 1 { + return fmt.Errorf("failed to parse partitionsPerBroker, it should be more than 1, retrieved value %v", c.ReplicationFactor) + } + + // If the timeduration is 0s or 0ms or its variation of zero, it will be parsed as 0 + if c.ReconciliationInterval == 0 { + return fmt.Errorf("failed to validate topic.ReconciliationInterval config, the duration can't be zero") + } + + return nil +} diff --git a/e2e/consumer.go b/e2e/consumer.go new file mode 100644 index 0000000..1c04c64 --- /dev/null +++ b/e2e/consumer.go @@ -0,0 +1,102 @@ +package e2e + +import ( + "context" + "encoding/json" + "time" + + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" +) + +func (s *Service) startConsumeMessages(ctx context.Context) { + client := s.client + topicName := s.config.TopicManagement.Name + topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName) + client.AssignPartitions(topic) + + // Create our own consumer group + client.AssignGroup(s.groupId, + kgo.GroupTopics(topicName), + kgo.Balancers(kgo.CooperativeStickyBalancer()), + kgo.DisableAutoCommit(), + ) + s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId)) + + for { + select { + case <-ctx.Done(): + return + default: + fetches := client.PollFetches(ctx) + receiveTimestamp := time.Now() + + // Log all errors and continue afterwards as we might get errors and still have some fetch results + errors := fetches.Errors() + for _, err := range errors { + s.logger.Error("kafka fetch error", + zap.String("topic", err.Topic), + 
zap.Int32("partition", err.Partition), + zap.Error(err.Err)) + } + + // Process messages + fetches.EachRecord(func(record *kgo.Record) { + if record != nil { + s.processMessage(record, receiveTimestamp) + } + }) + } + } +} + +func (s *Service) commitOffsets(ctx context.Context) { + client := s.client + uncommittedOffset := client.UncommittedOffsets() + if uncommittedOffset == nil { + return + } + + startCommitTimestamp := time.Now() + client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) { + // Got commit response + latency := time.Since(startCommitTimestamp) + + if s.logCommitErrors(r, err) > 0 { + return + } + + // only report commit latency if the coordinator wasn't set too long ago + if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second { + coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata) + s.onOffsetCommit(coordinator.NodeID, latency) + } + }) +} + +// processMessage: +// - deserializes the message +// - checks if it is from us, or from another kminion process running somewhere else +// - hands it off to the service, which then reports metrics on it +func (s *Service) processMessage(record *kgo.Record, receiveTimestamp time.Time) { + var msg EndToEndMessage + if jerr := json.Unmarshal(record.Value, &msg); jerr != nil { + return // maybe older version + } + + if msg.MinionID != s.minionID { + return // not from us + } + + // restore partition, which was not serialized + msg.partition = int(record.Partition) + + created := msg.creationTime() + latency := receiveTimestamp.Sub(created) + + s.onRoundtrip(record.Partition, latency) + + // notify the tracker that the message arrived + s.messageTracker.onMessageArrived(&msg) +} diff --git a/e2e/group_tracker.go b/e2e/group_tracker.go new file mode 100644 index 0000000..83d59d4 --- /dev/null +++ b/e2e/group_tracker.go @@ -0,0 +1,197 @@ +package e2e + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" +) + +const ( + oldGroupCheckInterval = 5 * time.Second // how often to check for old kminion groups + oldGroupMaxAge = 20 * time.Second // maximum age after which an old group should be deleted +) + +// groupTracker keeps checking for empty consumerGroups matching the kminion prefix. +// When a group was seen empty for some time, we delete it. +// Why? +// Whenever a kminion instance starts up it creates a consumer-group for itself in order to not "collide" with other kminion instances. +// When an instance restarts (for whatever reason), it creates a new group again, so we'd end up with a lot of unused groups. +type groupTracker struct { + svc *Service // used to obtain stuff like logger, kafka client, ... 
+ logger *zap.Logger + ctx context.Context // cancellation context + + client *kgo.Client // kafka client + + groupId string // our own groupId + potentiallyEmptyGroups map[string]time.Time // groupName -> utc timestamp when the group was first seen + + isNotAuthorized bool // if we get a not authorized response while trying to delete old groups, this will be set to true, essentially disabling the tracker +} + +func newGroupTracker(svc *Service, ctx context.Context) *groupTracker { + + tracker := groupTracker{ + svc: svc, + logger: svc.logger.Named("groupTracker"), + ctx: ctx, + + client: svc.client, + + groupId: svc.groupId, + potentiallyEmptyGroups: make(map[string]time.Time), + + isNotAuthorized: false, + } + + return &tracker +} + +func (g *groupTracker) start() { + g.logger.Debug("starting group tracker") + + deleteOldGroupsTicker := time.NewTicker(oldGroupCheckInterval) + // stop ticker when context is cancelled + go func() { + <-g.ctx.Done() + g.logger.Debug("stopping group tracker, context was cancelled") + deleteOldGroupsTicker.Stop() + }() + + // look for old consumer groups and delete them + go func() { + for range deleteOldGroupsTicker.C { + err := g.checkAndDeleteOldConsumerGroups() + if err != nil { + g.logger.Error("failed to check for old consumer groups: %w", zap.Error(err)) + } + } + }() +} + +func (g *groupTracker) checkAndDeleteOldConsumerGroups() error { + if g.isNotAuthorized { + return nil + } + + groupsRq := kmsg.NewListGroupsRequest() + groupsRq.StatesFilter = []string{"Empty"} + + g.logger.Debug("checking for empty kminion consumer groups...") + + shardedResponse := g.client.RequestSharded(g.ctx, &groupsRq) + + // find groups that start with the kminion prefix + matchingGroups := make([]string, 0, 10) + for _, shard := range shardedResponse { + if shard.Err != nil { + g.logger.Error("error in response to ListGroupsRequest", zap.Error(shard.Err)) + continue + } + + r, ok := shard.Resp.(*kmsg.ListGroupsResponse) + if !ok { + g.logger.Error("cannot cast responseShard.Resp to kmsg.ListGroupsResponse") + continue + } + + for _, group := range r.Groups { + name := group.Group + + if name == g.groupId { + continue // skip our own consumer group + } + + if strings.HasPrefix(name, g.svc.config.Consumer.GroupIdPrefix) { + matchingGroups = append(matchingGroups, name) + } + } + } + + // save new (previously unseen) groups to tracker + g.logger.Debug(fmt.Sprintf("found %v matching kminion consumer groups", len(matchingGroups)), zap.Strings("groups", matchingGroups)) + for _, name := range matchingGroups { + _, exists := g.potentiallyEmptyGroups[name] + if !exists { + // add it with the current timestamp + now := time.Now() + g.potentiallyEmptyGroups[name] = now + g.logger.Debug("new empty kminion group, adding to tracker", zap.String("group", name), zap.Time("firstSeen", now)) + } + } + + // go through saved groups: + // - don't track the ones we don't see anymore (bc they got deleted or are not empty anymore) + // - mark the ones that are too old (have been observed as empty for too long) + groupsToDelete := make([]string, 0) + for name, firstSeen := range g.potentiallyEmptyGroups { + exists, _ := containsStr(matchingGroups, name) + if exists { + // still there, check age and maybe delete it + age := time.Since(firstSeen) + if age > oldGroupMaxAge { + // group was unused for too long, delete it + groupsToDelete = append(groupsToDelete, name) + delete(g.potentiallyEmptyGroups, name) + } + } else { + // does not exist anymore, it must have been deleted, or is in use now (no 
longer empty) + // don't track it anymore + delete(g.potentiallyEmptyGroups, name) + } + } + + // actually delete the groups we've decided to delete + if len(groupsToDelete) == 0 { + return nil + } + + deleteRq := kmsg.NewDeleteGroupsRequest() + deleteRq.Groups = groupsToDelete + deleteResp := g.client.RequestSharded(g.ctx, &deleteRq) + + // done, now just errors + // if we get a not authorized error we'll disable deleting groups + foundNotAuthorizedError := false + deletedGroups := make([]string, 0) + for _, shard := range deleteResp { + if shard.Err != nil { + g.logger.Error("sharded consumer group delete request failed", zap.Error(shard.Err)) + continue + } + + resp, ok := shard.Resp.(*kmsg.DeleteGroupsResponse) + if !ok { + g.logger.Error("failed to cast shard response to DeleteGroupsResponse while handling an error for deleting groups", zap.String("shardHost", shard.Meta.Host), zap.Int32("broker_id", shard.Meta.NodeID), zap.NamedError("shardError", shard.Err)) + continue + } + + for _, groupResp := range resp.Groups { + err := kerr.ErrorForCode(groupResp.ErrorCode) + if err != nil { + g.logger.Error("failed to delete consumer group", zap.String("shard", shard.Meta.Host), zap.Int32("broker_id", shard.Meta.NodeID), zap.String("group", groupResp.Group), zap.Error(err)) + + if groupResp.ErrorCode == kerr.GroupAuthorizationFailed.Code { + foundNotAuthorizedError = true + } + + } else { + deletedGroups = append(deletedGroups, groupResp.Group) + } + } + } + g.logger.Info("deleted old consumer groups", zap.Strings("deletedGroups", deletedGroups)) + + if foundNotAuthorizedError { + g.logger.Info("disabling trying to delete old kminion consumer-groups since one of the last delete results had an 'GroupAuthorizationFailed' error") + g.isNotAuthorized = true + } + + return nil +} diff --git a/e2e/message_tracker.go b/e2e/message_tracker.go new file mode 100644 index 0000000..c26c6e9 --- /dev/null +++ b/e2e/message_tracker.go @@ -0,0 +1,104 @@ +package e2e + +import ( + "time" + + goCache "github.com/patrickmn/go-cache" + "go.uber.org/zap" +) + +// messageTracker keeps track of messages (wow) +// +// When we successfully send a mesasge, it will be added to this tracker. +// Later, when we receive the message back in the consumer, the message is marked as completed and removed from the tracker. +// If the message does not arrive within the configured `consumer.roundtripSla`, it is counted as lost. +// A lost message is reported in the `roundtrip_latency_seconds` metric with infinite duration, +// but it would probably be a good idea to also have a metric that reports the number of lost messages. +// +// When we fail to send a message, it isn't tracked. +// +// todo: We should probably report that in the roundtrip metric as infinite duration. +// since, if one broker is offline, we can't produce to the partition it leads, +// but we are still able to produce to other partitions led by other brokers. +// This should add at least a little protection against people who only alert on messages_produced and messages_received. +// +// Alternatively, maybe some sort of "failed count" metric could be a good idea? 
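+// Implementation note (see newMessageTracker below): the tracker is backed by a go-cache instance whose
+// default expiration equals 'consumer.roundtripSla' (cleanup runs every second). If an entry expires
+// before the consumer marks it as arrived, the cache's OnEvicted callback fires and the message is
+// treated as lost.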
+// +type messageTracker struct { + svc *Service + logger *zap.Logger + cache *goCache.Cache +} + +func newMessageTracker(svc *Service) *messageTracker { + + defaultExpirationTime := svc.config.Consumer.RoundtripSla + cleanupInterval := 1 * time.Second + + t := &messageTracker{ + svc: svc, + logger: svc.logger.Named("message-tracker"), + cache: goCache.New(defaultExpirationTime, cleanupInterval), + } + + t.cache.OnEvicted(func(key string, item interface{}) { + t.onMessageExpired(key, item.(*EndToEndMessage)) + }) + + return t +} + +func (t *messageTracker) addToTracker(msg *EndToEndMessage) { + t.cache.SetDefault(msg.MessageID, msg) +} + +func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) { + cachedMessageInterface, _, found := t.cache.GetWithExpiration(arrivedMessage.MessageID) + if !found { + // message expired and was removed from the cache + // it arrived too late, nothing to do here... + return + } + + actualExpireTime := arrivedMessage.creationTime().Add(t.svc.config.Consumer.RoundtripSla) + if time.Now().Before(actualExpireTime) { + // message arrived early enough + + // timeUntilExpire := time.Until(actualExpireTime) + // t.logger.Debug("message arrived", + // zap.Duration("timeLeft", timeUntilExpire), + // zap.Duration("age", ), + // zap.Int("partition", msg.partition), + // zap.String("messageId", msg.MessageID), + // ) + } else { + // Message arrived late, but was still in cache. + // Maybe we could log something like "message arrived after the sla"... + // + // But for now we don't report it as "lost" in the log (because it actually *did* arrive just now, just too late). + // The metrics will report it as 'duration infinite' anyway. + } + + // Set it as arrived, so we don't log it as lost in 'onMessageExpired' and remove it from the tracker + msg := cachedMessageInterface.(*EndToEndMessage) + msg.hasArrived = true + t.cache.Delete(msg.MessageID) +} + +func (t *messageTracker) onMessageExpired(key string, msg *EndToEndMessage) { + + if msg.hasArrived { + // message did, in fact, arrive (doesn't matter here if soon enough of barely expired) + // don't log anything + return + } + + created := msg.creationTime() + age := time.Since(created) + + t.logger.Debug("message lost/expired", + zap.Int64("ageMilliseconds", age.Milliseconds()), + zap.Int("partition", msg.partition), + zap.String("messageId", msg.MessageID), + ) +} diff --git a/e2e/partitioner.go b/e2e/partitioner.go new file mode 100644 index 0000000..7516a5c --- /dev/null +++ b/e2e/partitioner.go @@ -0,0 +1,76 @@ +package e2e + +import ( + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" +) + +/* + This file defines a custom partitioner for use in franz-go. + + Why do we need one? + Because we want to have control over which partition exactly a message is sent to, + and the built-in partitioners in franz-go don't support that. + + Why do we want to do that? + We want to test all brokers with our "end-to-end test" test (sending message, receiving it again, measuring latency). + To do that, we need to ensure we send a message to each broker. 
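+
+    How does it work?
+    The producer sets the target partition directly on each record (record.Partition) before producing,
+    and Partition() below simply returns that value, so every partition - and therefore every
+    partition leader - receives its own probe message.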
+*/ + +// Partitioner: Creates a TopicPartitioner for a given topic name +type customPartitioner struct { + logger *zap.Logger + expectedPartitionCount int +} + +func (c *customPartitioner) ForTopic(topicName string) kgo.TopicPartitioner { + return &customTopicPartitioner{ + logger: c.logger, + expectedPartitionCount: c.expectedPartitionCount, + } +} + +// TopicPartitioner: Determines which partition to produce a message to +type customTopicPartitioner struct { + logger *zap.Logger + expectedPartitionCount int +} + +// OnNewBatch is called when producing a record if that record would +// trigger a new batch on its current partition. +func (c *customTopicPartitioner) OnNewBatch() { + // Not interesting for us +} + +// RequiresConsistency returns true if a record must hash to the same +// partition even if a partition is down. +// If true, a record may hash to a partition that cannot be written to +// and will error until the partition comes back. +func (c *customTopicPartitioner) RequiresConsistency(_ *kgo.Record) bool { + // We must always return true, only then will we get the correct 'n' in the 'Partition()' call. + return true +} + +// Partition determines, among a set of n partitions, which index should +// be chosen to use for the partition for r. +func (c *customTopicPartitioner) Partition(r *kgo.Record, n int) int { + // We expect n to be equal to the partition count of the topic + // If, for whatever reason, that is false, we print a warning + if c.expectedPartitionCount != n { + // todo: maybe this should be an error? + // we can probably fix ourselves by just restarting... + c.logger.Warn("end-to-end TopicPartitioner expected a different number of partitions. This means that kminion has either too many or too few producers to produce to all partitions of the topic.", + zap.Int("expectedPartitionCount", c.expectedPartitionCount), + zap.Int("givenPartitionCount", n), + ) + } + + // If the message wants to be produced to a partitionID higher than what is available, immediately error out by returning -1 + // This should never happen since it would mean that the topics partition count has been changed (!?) 
+ p := int(r.Partition) + if p >= n { + return -1 // partition doesn't exist + } + + return p +} diff --git a/e2e/producer.go b/e2e/producer.go new file mode 100644 index 0000000..b74ecb8 --- /dev/null +++ b/e2e/producer.go @@ -0,0 +1,105 @@ +package e2e + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" +) + +type EndToEndMessage struct { + MinionID string `json:"minionID"` // unique for each running kminion instance + MessageID string `json:"messageID"` // unique for each message + Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds + + partition int // used in message tracker + hasArrived bool // used in tracker +} + +func (m *EndToEndMessage) creationTime() time.Time { + return time.Unix(0, m.Timestamp) +} + +// Sends a EndToEndMessage to every partition +func (s *Service) produceLatencyMessages(ctx context.Context) { + + for i := 0; i < s.partitionCount; i++ { + err := s.produceSingleMessage(ctx, i) + if err != nil { + s.logger.Error("failed to produce to end-to-end topic", + zap.String("topicName", s.config.TopicManagement.Name), + zap.Int("partition", i), + zap.Error(err)) + } + } + +} + +func (s *Service) produceSingleMessage(ctx context.Context, partition int) error { + + topicName := s.config.TopicManagement.Name + record, msg := createEndToEndRecord(s.minionID, topicName, partition) + + for { + select { + case <-ctx.Done(): + return nil + default: + startTime := time.Now() + s.endToEndMessagesProduced.Inc() + + errCh := make(chan error) + s.client.Produce(ctx, record, func(r *kgo.Record, err error) { + ackDuration := time.Since(startTime) + + errCh <- err + + // only notify ack if it is successful + if err == nil { + // notify service about ack + s.onAck(r.Partition, ackDuration) + + // add to tracker + s.messageTracker.addToTracker(msg) + } + }) + + err := <-errCh + if err != nil { + s.logger.Error("error producing record", zap.Error(err)) + return err + } + return nil + } + } + +} + +func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) { + + message := &EndToEndMessage{ + MinionID: minionID, + MessageID: uuid.NewString(), + Timestamp: time.Now().UnixNano(), + + partition: partition, + } + + mjson, err := json.Marshal(message) + if err != nil { + // Should never happen since the struct is so simple, + // but if it does, something is completely broken anyway + panic("cannot serialize EndToEndMessage") + } + + record := &kgo.Record{ + Topic: topicName, + Value: []byte(mjson), + Partition: int32(partition), // we set partition for producing so our customPartitioner can make use of it + } + + return record, message +} diff --git a/e2e/service.go b/e2e/service.go new file mode 100644 index 0000000..133b644 --- /dev/null +++ b/e2e/service.go @@ -0,0 +1,234 @@ +package e2e + +import ( + "context" + "fmt" + "time" + + "github.com/cloudhut/kminion/v2/kafka" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" +) + +type Service struct { + // General + config Config + logger *zap.Logger + + kafkaSvc *kafka.Service // creates kafka client for us + client *kgo.Client + + // Service + minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time + groupId string // our own consumer group + groupTracker *groupTracker // tracks 
consumer groups starting with the kminion prefix and deletes them if they are unused for some time + messageTracker *messageTracker // tracks successfully produced messages, + clientHooks *clientHooks // logs broker events, tracks the coordinator (i.e. which broker last responded to our offset commit) + partitioner *customPartitioner // takes care of sending our end-to-end messages to the right partition + partitionCount int // number of partitions of our test topic, used to send messages to all partitions + + // Metrics + endToEndMessagesProduced prometheus.Counter + endToEndMessagesAcked prometheus.Counter + endToEndMessagesReceived prometheus.Counter + endToEndCommits prometheus.Counter + + endToEndAckLatency *prometheus.HistogramVec + endToEndRoundtripLatency *prometheus.HistogramVec + endToEndCommitLatency *prometheus.HistogramVec +} + +// NewService creates a new instance of the e2e moinitoring service (wow) +func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string, ctx context.Context) (*Service, error) { + + client, hooks, partitioner, err := createKafkaClient(cfg, logger, kafkaSvc, ctx) + if err != nil { + return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err) + } + + minionId := uuid.NewString() + + svc := &Service{ + config: cfg, + logger: logger.Named("e2e"), + kafkaSvc: kafkaSvc, + client: client, + + minionID: minionId, + groupId: fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionId), + clientHooks: hooks, + partitioner: partitioner, + } + + svc.groupTracker = newGroupTracker(svc, ctx) + svc.messageTracker = newMessageTracker(svc) + + makeCounter := func(name string, help string) prometheus.Counter { + return promauto.NewCounter(prometheus.CounterOpts{ + Namespace: metricNamespace, + Subsystem: "end_to_end", + Name: name, + Help: help, + }) + } + makeHistogramVec := func(name string, maxLatency time.Duration, labelNames []string, help string) *prometheus.HistogramVec { + return promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricNamespace, + Subsystem: "end_to_end", + Name: name, + Help: help, + Buckets: createHistogramBuckets(maxLatency), + }, labelNames) + } + + // Low-level info + // Users can construct alerts like "can't produce messages" themselves from those + svc.endToEndMessagesProduced = makeCounter("messages_produced_total", "Number of messages that kminion's end-to-end test has tried to send to kafka") + svc.endToEndMessagesAcked = makeCounter("messages_acked_total", "Number of messages kafka acknowledged as produced") + svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. 
Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)") + svc.endToEndCommits = makeCounter("commits_total", "Counts how many times kminions end-to-end test has committed messages") + + // Latency Histograms + // More detailed info about how long stuff took + // Since histograms also have an 'infinite' bucket, they can be used to detect small hickups "lost" messages + svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partitionId"}, "Time until we received an ack for a produced message") + svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partitionId"}, "Time it took between sending (producing) and receiving (consuming) a message") + svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"groupCoordinatorBrokerId"}, "Time kafka took to respond to kminion's offset commit") + + return svc, nil +} + +func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, ctx context.Context) (*kgo.Client, *clientHooks, *customPartitioner, error) { + + // Add RequiredAcks, as options can't be altered later + kgoOpts := []kgo.Opt{} + + if cfg.Producer.RequiredAcks == "all" { + kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks())) + } else { + kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck())) + kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite()) + } + + // produce request timeout + kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(cfg.Producer.AckSla)) + + // Prepare hooks + e2eHooks := newEndToEndClientHooks(logger) + kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks)) + + // Use a custom partitioner that uses the 'PartitionID' of a record to directly assign the right partition + partitioner := &customPartitioner{ + logger: logger.Named("e2e-partitioner"), + expectedPartitionCount: 0, // not yet known, will be set before we start producing + } + kgoOpts = append(kgoOpts, kgo.RecordPartitioner(partitioner)) + + // Create kafka service and check if client can successfully connect to Kafka cluster + client, err := kafkaSvc.CreateAndTestClient(logger, kgoOpts, ctx) + return client, e2eHooks, partitioner, err +} + +// Start starts the service (wow) +func (s *Service) Start(ctx context.Context) error { + + // Ensure topic exists and is configured correctly + if err := s.validateManagementTopic(ctx); err != nil { + return fmt.Errorf("could not validate end-to-end topic: %w", err) + } + + // Get up-to-date metadata and inform our custom partitioner about the partition count + topicMetadata, err := s.getTopicMetadata(ctx) + if err != nil { + return fmt.Errorf("could not get topic metadata after validation: %w", err) + } + partitions := len(topicMetadata.Topics[0].Partitions) + s.partitioner.expectedPartitionCount = partitions + s.partitionCount = partitions + + // finally start everything else (producing, consuming, continous validation, consumer group tracking) + go s.initEndToEnd(ctx) + + return nil +} + +func (s *Service) initEndToEnd(ctx context.Context) { + + validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval) + produceTicker := time.NewTicker(s.config.ProbeInterval) + commitTicker := time.NewTicker(5 * time.Second) + // stop tickers when context is cancelled + go func() { + <-ctx.Done() + produceTicker.Stop() + validateTopicTicker.Stop() + commitTicker.Stop() + }() + + // keep checking end-to-end topic + 
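+	// (this periodically re-runs validateManagementTopic, so partition count and
+	// replica assignments are corrected automatically if the broker set changes)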
go func() { + for range validateTopicTicker.C { + err := s.validateManagementTopic(ctx) + if err != nil { + s.logger.Error("failed to validate end-to-end topic", zap.Error(err)) + } + } + }() + + // keep track of groups, delete old unused groups + go s.groupTracker.start() + + // start consuming topic + go s.startConsumeMessages(ctx) + + // start comitting offsets + go func() { + for range commitTicker.C { + s.commitOffsets(ctx) + } + }() + + // start producing to topic + go func() { + for range produceTicker.C { + s.produceLatencyMessages(ctx) + } + }() + +} + +// called from e2e when a message is acknowledged +func (s *Service) onAck(partitionId int32, duration time.Duration) { + s.endToEndMessagesAcked.Inc() + s.endToEndAckLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds()) +} + +// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again) +func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) { + if duration > s.config.Consumer.RoundtripSla { + return // message is too old + } + + s.endToEndMessagesReceived.Inc() + s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds()) +} + +// called from e2e when an offset commit is confirmed +func (s *Service) onOffsetCommit(brokerId int32, duration time.Duration) { + + // todo: + // if the commit took too long, don't count it in 'commits' but add it to the histogram? + // and how do we want to handle cases where we get an error?? + // should we have another metric that tells us about failed commits? or a label on the counter? + brokerIdStr := fmt.Sprintf("%v", brokerId) + s.endToEndCommitLatency.WithLabelValues(brokerIdStr).Observe(duration.Seconds()) + + if duration > s.config.Consumer.CommitSla { + return + } + + s.endToEndCommits.Inc() +} diff --git a/e2e/topic.go b/e2e/topic.go new file mode 100644 index 0000000..bdc5cd6 --- /dev/null +++ b/e2e/topic.go @@ -0,0 +1,328 @@ +package e2e + +import ( + "context" + "fmt" + "time" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" +) + +// Check our end-to-end test topic +// - does it exist? +// - is it configured correctly? +// - does it have enough partitions? +// - is the replicationFactor correct? +// - are assignments good? +// - is each broker leading at least one partition? +// - are replicas distributed correctly? +func (s *Service) validateManagementTopic(ctx context.Context) error { + s.logger.Debug("validating end-to-end topic...") + + meta, err := s.getTopicMetadata(ctx) + if err != nil { + return fmt.Errorf("validateManagementTopic cannot get metadata of e2e topic: %w", err) + } + + // Create topic if it doesn't exist + if len(meta.Topics) == 0 { + if err = s.createManagementTopic(ctx, meta); err != nil { + return err + } + } + + // Ensure the topic has enough partitions + if err = s.ensureEnoughPartitions(ctx, meta); err != nil { + return err + } + + // Validate assignments + if err = s.validatePartitionAssignments(ctx, meta); err != nil { + return err + } + + return nil +} + +func (s *Service) validatePartitionAssignments(ctx context.Context, meta *kmsg.MetadataResponse) error { + // We use a very simple strategy to distribute all partitions and its replicas to the brokers + // + // For example if we had: + // - 5 brokers + // - 5 partitions (partitionsPerBroker = 1) + // - replicationFactor of 5 + // then our assignments would look like the table below. + // Why does this example use 5 partitions? 
+ // Because for end-to-end testing to make sense, we'd like to report the message roundtrip latency for each broker. + // That's why we ensure that we always have at least as many partitions as there are brokers, so every broker + // can be the leader of at least one partition. + // + // The numbers after each partition are the brokerIds (0, 1, 2, 3, 4) + // The first broker in an assignment array is the leader for that partition, + // the following broker ids are hosting the partitions' replicas. + // + // Partition 0: [0, 1, 2, 3, 4] + // Partition 1: [1, 2, 3, 4, 0] + // Partition 2: [2, 3, 4, 0, 1] + // Partition 3: [3, 4, 0, 1, 2] + // Partition 4: [4, 0, 1, 2, 3] + // + // In addition to being very simple, this also has the benefit that each partitionID neatly corresponds + // its leaders brokerID - at least most of the time. + // When a broker suddenly goes offline, or a new one is added to the cluster, etc the assignments + // might be off for a short period of time, but the assignments will be fixed automatically + // in the next, periodic, topic validation (configured by 'topicManagement.reconciliationInterval') + // + + topicName := s.config.TopicManagement.Name + topicMeta := meta.Topics[0] + realPartitionCount := len(topicMeta.Partitions) + realReplicationFactor := len(topicMeta.Partitions[0].Replicas) + + // 1. Calculate the expected assignments + allPartitionAssignments := make([][]int32, realPartitionCount) + for i := range allPartitionAssignments { + allPartitionAssignments[i] = make([]int32, realReplicationFactor) + } + + // simple helper function that just keeps returning values from 0 to brokerCount forever. + brokerIterator := func(start int32) func() int32 { + brokerIndex := start + return func() int32 { + result := brokerIndex // save result we'll return now + brokerIndex = (brokerIndex + 1) % int32(len(meta.Brokers)) // prepare for next call: add one, and wrap around + return result // return current result + } + } + + startBroker := brokerIterator(0) + for _, partitionAssignments := range allPartitionAssignments { + // determine the leader broker: first partition will get 0, next one will get 1, ... + start := startBroker() + // and create an iterator that goes over all brokers, starting at our leader + nextReplica := brokerIterator(start) + + for i := range partitionAssignments { + partitionAssignments[i] = nextReplica() + } + } + + // 2. Check + // Now that every partition knows which brokers are hosting its replicas, and which broker is the leader (i.e. is hosting the primary "replica"), + // we just have to check if the current/real assignments are equal to our desired assignments (and then apply them if not). + assignmentsAreEqual := true + for partitionId := 0; partitionId < realPartitionCount; partitionId++ { + expectedAssignments := allPartitionAssignments[partitionId] + actualAssignments := topicMeta.Partitions[partitionId].Replicas + + // Check if any replica is assigned to the wrong broker + for i := 0; i < realReplicationFactor; i++ { + expectedBroker := expectedAssignments[i] + actualBroker := actualAssignments[i] + if expectedBroker != actualBroker { + assignmentsAreEqual = false + } + } + } + if assignmentsAreEqual { + // assignments are already exactly as they are supposed to be + return nil + } + + // 3. Apply + // Some partitions have their replicas hosted on the wrong brokers! 
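+	// (this is corrected by sending a single AlterPartitionAssignments request that
+	// contains the desired replica list for every partition of the topic)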
+ // Apply our desired replica configuration + partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, realPartitionCount) + for i := range partitionReassignments { + partitionReassignments[i] = kmsg.NewAlterPartitionAssignmentsRequestTopicPartition() + partitionReassignments[i].Partition = int32(i) + partitionReassignments[i].Replicas = allPartitionAssignments[i] + } + + topicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic() + topicReassignment.Topic = topicName + topicReassignment.Partitions = partitionReassignments + + reassignRq := kmsg.NewAlterPartitionAssignmentsRequest() + reassignRq.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{topicReassignment} + + reassignRes, err := reassignRq.RequestWith(ctx, s.client) + if err != nil { + // error while sending + return fmt.Errorf("topic reassignment request failed: %w", err) + } + reassignErr := kerr.ErrorForCode(reassignRes.ErrorCode) + if reassignErr != nil || (reassignRes.ErrorMessage != nil && *reassignRes.ErrorMessage != "") { + // global error + return fmt.Errorf(fmt.Sprintf("topic reassignment failed with ErrorMessage=\"%v\": %v", + *reassignRes.ErrorMessage, + safeUnwrap(reassignErr), + )) + } + + // errors for individual partitions + for _, t := range reassignRes.Topics { + for _, p := range t.Partitions { + pErr := kerr.ErrorForCode(p.ErrorCode) + if pErr != nil || (p.ErrorMessage != nil && *p.ErrorMessage != "") { + return fmt.Errorf(fmt.Sprintf("topic reassignment failed on partition %v with ErrorMessage=\"%v\": %v", + p.Partition, + safeUnwrap(pErr), + *p.ErrorMessage), + ) + } + } + } + + return nil +} + +func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.MetadataResponse) error { + partitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker + expectedPartitions := partitionsPerBroker * len(meta.Brokers) + + if len(meta.Topics[0].Partitions) >= expectedPartitions { + return nil // no need to add more + } + + partitionsToAdd := expectedPartitions - len(meta.Topics[0].Partitions) + s.logger.Warn("e2e test topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...", + zap.Int("expectedPartitionCount", expectedPartitions), + zap.Int("actualPartitionCount", len(meta.Topics[0].Partitions)), + zap.Int("brokerCount", len(meta.Brokers)), + zap.Int("config.partitionsPerBroker", s.config.TopicManagement.PartitionsPerBroker), + zap.Int("partitionsToAdd", partitionsToAdd), + ) + + topic := kmsg.NewCreatePartitionsRequestTopic() + topic.Topic = s.config.TopicManagement.Name + topic.Count = int32(expectedPartitions) + + // For each partition we're about to add, we need to define its replicas + for i := 0; i < partitionsToAdd; i++ { + assignment := kmsg.NewCreatePartitionsRequestTopicAssignment() + // In order to keep the code as simple as possible, just copy the assignments from the first partition. + // After the topic is created, there is another validation step that will take care of bad assignments! 
+ assignment.Replicas = meta.Topics[0].Partitions[0].Replicas + topic.Assignment = append(topic.Assignment, assignment) + } + + // Send request + create := kmsg.NewCreatePartitionsRequest() + create.Topics = []kmsg.CreatePartitionsRequestTopic{topic} + createPartitionsResponse, err := create.RequestWith(ctx, s.client) + + // Check for errors + if err != nil { + return fmt.Errorf("request to create more partitions for e2e topic failed: %w", err) + } + nestedErrors := 0 + for _, topicResponse := range createPartitionsResponse.Topics { + tErr := kerr.ErrorForCode(topicResponse.ErrorCode) + if tErr != nil || (topicResponse.ErrorMessage != nil && *topicResponse.ErrorMessage != "") { + s.logger.Error("error in createPartitionsResponse", + zap.String("topic", topicResponse.Topic), + zap.Stringp("errorMessage", topicResponse.ErrorMessage), + zap.NamedError("topicError", tErr), + ) + nestedErrors++ + } + } + if nestedErrors > 0 { + return fmt.Errorf("request to add more partitions to e2e topic had some nested errors, see the %v log lines above", nestedErrors) + } + + return nil +} + +func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.MetadataResponse) error { + topicCfg := s.config.TopicManagement + brokerCount := len(allMeta.Brokers) + totalPartitions := brokerCount * topicCfg.PartitionsPerBroker + + s.logger.Info("e2e topic does not exist, creating it...", + zap.String("topicName", topicCfg.Name), + zap.Int("partitionsPerBroker", topicCfg.PartitionsPerBroker), + zap.Int("replicationFactor", topicCfg.ReplicationFactor), + zap.Int("brokerCount", brokerCount), + zap.Int("totalPartitions", totalPartitions), + ) + + topic := kmsg.NewCreateTopicsRequestTopic() + topic.Topic = topicCfg.Name + topic.NumPartitions = int32(totalPartitions) + topic.ReplicationFactor = int16(topicCfg.ReplicationFactor) + topic.Configs = createTopicConfig(topicCfg) + + req := kmsg.NewCreateTopicsRequest() + req.Topics = []kmsg.CreateTopicsRequestTopic{topic} + + res, err := req.RequestWith(ctx, s.client) + if err != nil { + return fmt.Errorf("failed to create e2e topic: %w", err) + } + if len(res.Topics) > 0 { + if res.Topics[0].ErrorMessage != nil && *res.Topics[0].ErrorMessage != "" { + return fmt.Errorf("failed to create e2e topic: %s", *res.Topics[0].ErrorMessage) + } + } + + return nil +} + +func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse, error) { + topicReq := kmsg.NewMetadataRequestTopic() + topicName := s.config.TopicManagement.Name + topicReq.Topic = &topicName + + req := kmsg.NewMetadataRequest() + req.Topics = []kmsg.MetadataRequestTopic{topicReq} + + return req.RequestWith(ctx, s.client) +} + +func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) { + req := kmsg.NewDescribeConfigsRequest() + req.IncludeDocumentation = false + req.IncludeSynonyms = false + req.Resources = []kmsg.DescribeConfigsRequestResource{ + { + ResourceType: kmsg.ConfigResourceTypeTopic, + ResourceName: s.config.TopicManagement.Name, + ConfigNames: configNames, + }, + } + + return req.RequestWith(ctx, s.client) +} + +func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig { + + topicConfig := func(name string, value interface{}) kmsg.CreateTopicsRequestTopicConfig { + prop := kmsg.NewCreateTopicsRequestTopicConfig() + prop.Name = name + valStr := fmt.Sprintf("%v", value) + prop.Value = &valStr + return prop + } + + minISR := 1 + if cfgTopic.ReplicationFactor >= 3 { + // Only with 3+ replicas 
does it make sense to require acks from 2 brokers + // todo: think about if we should change how 'producer.requiredAcks' works. + // we probably don't even need this configured on the topic directly... + minISR = 2 + } + + // Even though kminion's end-to-end feature actually does not require any + // real persistence beyond a few minutes; it might be good too keep messages + // around a bit for debugging. + return []kmsg.CreateTopicsRequestTopicConfig{ + topicConfig("cleanup.policy", "delete"), + topicConfig("segment.ms", (time.Hour * 12).Milliseconds()), // new segment every 12h + topicConfig("retention.ms", (time.Hour * 24).Milliseconds()), // discard segments older than 24h + topicConfig("min.insync.replicas", minISR), + } +} diff --git a/e2e/utils.go b/e2e/utils.go new file mode 100644 index 0000000..642e6b9 --- /dev/null +++ b/e2e/utils.go @@ -0,0 +1,78 @@ +package e2e + +import ( + "math" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" +) + +// create histogram buckets for metrics reported by 'end-to-end' +// todo: +/* +- custom, much simpler, exponential buckets + we know: + - we want to go from 5ms to 'max' + - we want to double each time + - doubling 5ms might not get us to 'max' exactly + questions: + - can we slightly adjust the factor so we hit 'max' exactly? + - or can we adjust 'max'? + (and if so, better to overshoot or undershoot?) + - or should we just set the last bucket to 'max' exactly? +*/ +func createHistogramBuckets(maxLatency time.Duration) []float64 { + // Since this is an exponential bucket we need to take Log base2 or binary as the upper bound + // Divide by 10 for the argument because the base is counted as 20ms and we want to normalize it as base 2 instead of 20 + // +2 because it starts at 5ms or 0.005 sec, to account 5ms and 10ms before it goes to the base which in this case is 0.02 sec or 20ms + // and another +1 to account for decimal points on int parsing + latencyCount := math.Logb(float64(maxLatency.Milliseconds() / 10)) + count := int(latencyCount) + 3 + bucket := prometheus.ExponentialBuckets(0.005, 2, count) + + return bucket +} + +func containsStr(ar []string, x string) (bool, int) { + for i, item := range ar { + if item == x { + return true, i + } + } + return false, -1 +} + +// logs all errors, returns number of errors +func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) int { + if err != nil { + s.logger.Error("offset commit failed", zap.Error(err)) + return 1 + } + + errCount := 0 + for _, t := range r.Topics { + for _, p := range t.Partitions { + err := kerr.ErrorForCode(p.ErrorCode) + if err != nil { + s.logger.Error("error committing partition offset", + zap.String("topic", t.Topic), + zap.Int32("partitionId", p.Partition), + zap.Error(err), + ) + errCount++ + } + } + } + + return errCount +} + +func safeUnwrap(err error) string { + if err == nil { + return "" + } + return err.Error() +} diff --git a/go.mod b/go.mod index 2080a5f..9713de2 100644 --- a/go.mod +++ b/go.mod @@ -8,23 +8,25 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.2 github.com/knadh/koanf v0.16.0 github.com/kr/text v0.2.0 // indirect - github.com/mitchellh/copystructure v1.1.2 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.4.1 - github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 - github.com/pelletier/go-toml v1.8.1 // indirect + github.com/orcaman/concurrent-map 
v0.0.0-20210501183033-44dafcb38ecc + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pelletier/go-toml v1.9.1 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.10.0 - github.com/prometheus/common v0.20.0 // indirect - github.com/stretchr/testify v1.7.0 // indirect - github.com/twmb/franz-go v0.8.0 + github.com/prometheus/common v0.24.0 // indirect + github.com/twmb/franz-go v0.7.1 go.uber.org/atomic v1.7.0 - go.uber.org/multierr v1.6.0 // indirect + go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.16.0 - golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect - golang.org/x/mod v0.4.1 // indirect + golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect + golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect + golang.org/x/mod v0.4.2 // indirect + golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - honnef.co/go/tools v0.1.1 // indirect + honnef.co/go/tools v0.1.4 // indirect ) diff --git a/go.sum b/go.sum index ad2ff34..02b333b 100644 --- a/go.sum +++ b/go.sum @@ -188,8 +188,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs= -github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/knadh/koanf v0.16.0 h1:qQqGvE8hs/y5pZTG5kT354vqUqsDKQcXX8IOq2Rg11Y= github.com/knadh/koanf v0.16.0/go.mod h1:DMZ6jQlhA3PqxnKR63luVaBtDemi/m8v/FpXI7B5Ez8= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -215,8 +215,8 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/copystructure v1.1.1/go.mod h1:EBArHfARyrSWO/+Wyr9zwEkc6XMFB9XyNgFNmRkZZU4= -github.com/mitchellh/copystructure v1.1.2 h1:Th2TIvG1+6ma3e/0/bopBKohOTY7s4dA8V2q4EUcBJ0= -github.com/mitchellh/copystructure v1.1.2/go.mod h1:EBArHfARyrSWO/+Wyr9zwEkc6XMFB9XyNgFNmRkZZU4= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= @@ -230,8 +230,9 @@ github.com/mitchellh/mapstructure v1.2.2/go.mod 
h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE= github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -260,21 +261,23 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= -github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU= -github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= -github.com/pelletier/go-toml v1.8.1 h1:1Nf83orprkJyknT6h7zbuEGUEjcyVlCxSUGTENmNCRM= -github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= +github.com/pelletier/go-toml v1.9.1 h1:a6qW1EVNZWH9WGI6CsYdD8WAylkoXBS5yv0XHlh17Tc= +github.com/pelletier/go-toml v1.9.1/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.7 h1:UDV9geJWhFIufAliH7HQlz9wP3JA0t748w+RwbWMLow= -github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= 
+github.com/pierrec/lz4/v4 v4.1.6 h1:ueMTcBBFrbT8K4uGDNNZPa8Z7LtPV7Cl0TDjaeHxP44= +github.com/pierrec/lz4/v4 v4.1.6/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -302,8 +305,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.20.0 h1:pfeDeUdQcIxOMutNjCejsEFp7qeP+/iltHSSmLpE+hU= -github.com/prometheus/common v0.20.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= +github.com/prometheus/common v0.24.0 h1:aIycr3wRFxPUq8XlLQlGQ9aNXV3dFi5y62pe/SB262k= +github.com/prometheus/common v0.24.0/go.mod h1:H6QK/N6XVT42whUeIdI3dp36w49c+/iMDk7UAI2qm7Q= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -345,8 +348,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/twmb/franz-go v0.8.0 h1:DFe9ptohEBtzuFyDKpUM1d39h+jkuEg/fEudDHqKhyw= -github.com/twmb/franz-go v0.8.0/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0= +github.com/twmb/franz-go v0.7.1 h1:oUS5zLzqX00NWnv8XmJKMjGpHwcjbMyUZVng7YLighI= +github.com/twmb/franz-go v0.7.1/go.mod h1:StwVC7bQkTM3I6DJyNGvmgpnza7Tz11YfLACXwMvQ0k= github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg= github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -366,8 +369,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= +go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= @@ -381,6 +384,7 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto 
v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -389,13 +393,13 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= -golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= -golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -416,8 +420,9 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -458,8 +463,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E= +golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -552,7 +558,7 @@ honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= -honnef.co/go/tools v0.1.1 h1:EVDuO03OCZwpV2t/tLLxPmPiomagMoBOgfPt0FM+4IY= -honnef.co/go/tools v0.1.1/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +honnef.co/go/tools v0.1.4 h1:SadWOkti5uVN1FAMgxn165+Mw00fuQKyk4Gyn/inxNQ= +honnef.co/go/tools v0.1.4/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= diff --git a/kafka/client_config_helper.go b/kafka/client_config_helper.go index 06b77be..d29f4d2 100644 --- a/kafka/client_config_helper.go +++ b/kafka/client_config_helper.go @@ -5,6 +5,10 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "io/ioutil" + "net" + "time" + "github.com/jcmturner/gokrb5/v8/client" "github.com/jcmturner/gokrb5/v8/keytab" "github.com/twmb/franz-go/pkg/kgo" @@ -14,16 +18,14 @@ import ( "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sasl/scram" "go.uber.org/zap" - "io/ioutil" - "net" - "time" krbconfig "github.com/jcmturner/gokrb5/v8/config" ) // NewKgoConfig creates a new Config for the Kafka Client as exposed by the franz-go library. // If TLS certificates can't be read an error will be returned. -func NewKgoConfig(cfg Config, logger *zap.Logger, hooks kgo.Hook) ([]kgo.Opt, error) { +// logger is only used to print warnings about TLS. 
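// An illustrative sketch, not part of this change: with the kgo.Hook parameter removed
// from this function, callers attach hooks themselves through the opts slice they pass to
// Service.CreateAndTestClient. The minion service in this diff does roughly:
//
//	hooks := newMinionClientHooks(logger, metricsNamespace)
//	client, err := kafkaSvc.CreateAndTestClient(logger, []kgo.Opt{kgo.WithHooks(hooks)}, ctx)
//	if err != nil {
//		return nil, fmt.Errorf("failed to create kafka client: %w", err)
//	}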
+func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { opts := []kgo.Opt{ kgo.SeedBrokers(cfg.Brokers...), kgo.MaxVersions(kversion.V2_7_0()), @@ -34,13 +36,10 @@ func NewKgoConfig(cfg Config, logger *zap.Logger, hooks kgo.Hook) ([]kgo.Opt, er // Create Logger kgoLogger := KgoZapLogger{ - logger: logger.With(zap.String("source", "kafka_client")).Sugar(), + logger: logger.Sugar(), } opts = append(opts, kgo.WithLogger(kgoLogger)) - // Attach hooks - opts = append(opts, kgo.WithHooks(hooks)) - // Add Rack Awareness if configured if cfg.RackID != "" { opts = append(opts, kgo.Rack(cfg.RackID)) diff --git a/kafka/service.go b/kafka/service.go index 964112e..3f4fc32 100644 --- a/kafka/service.go +++ b/kafka/service.go @@ -3,59 +3,73 @@ package kafka import ( "context" "fmt" + "strings" + "time" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "go.uber.org/zap" - "strings" ) type Service struct { cfg Config - Client *kgo.Client logger *zap.Logger } -func NewService(cfg Config, logger *zap.Logger) (*Service, error) { - // Create Kafka Client - hooksChildLogger := logger.With(zap.String("source", "kafka_client_hooks")) - clientHooks := newClientHooks(hooksChildLogger, "") +func NewService(cfg Config, logger *zap.Logger) *Service { + return &Service{ + cfg: cfg, + logger: logger.Named("kafka-service"), + } +} - kgoOpts, err := NewKgoConfig(cfg, logger, clientHooks) +// Create a client with the services default settings +// logger: will be used to log connections, errors, warnings about tls config, ... +func (s *Service) CreateAndTestClient(logger *zap.Logger, opts []kgo.Opt, ctx context.Context) (*kgo.Client, error) { + // Config with default options + kgoOpts, err := NewKgoConfig(s.cfg, logger) if err != nil { return nil, fmt.Errorf("failed to create a valid kafka Client config: %w", err) } + // Append user (the service calling this method) provided options + kgoOpts = append(kgoOpts, opts...) - kafkaClient, err := kgo.NewClient(kgoOpts...) + // Create kafka client + client, err := kgo.NewClient(kgoOpts...) if err != nil { return nil, fmt.Errorf("failed to create kafka Client: %w", err) } - return &Service{ - cfg: cfg, - Client: kafkaClient, - logger: logger, - }, nil + // Test connection + connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + err = s.testConnection(client, connectCtx) + if err != nil { + logger.Fatal("failed to test connectivity to Kafka cluster", zap.Error(err)) + } + + return client, nil } -// TestConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be +// testConnection tries to fetch Broker metadata and prints some information if connection succeeds. An error will be // returned if connecting fails. 
-func (s *Service) TestConnection(ctx context.Context) error { +func (s *Service) testConnection(client *kgo.Client, ctx context.Context) error { s.logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata", zap.String("seed_brokers", strings.Join(s.cfg.Brokers, ","))) req := kmsg.MetadataRequest{ Topics: nil, } - res, err := req.RequestWith(ctx, s.Client) + res, err := req.RequestWith(ctx, client) if err != nil { return fmt.Errorf("failed to request metadata: %w", err) } // Request versions in order to guess Kafka Cluster version versionsReq := kmsg.NewApiVersionsRequest() - versionsRes, err := versionsReq.RequestWith(ctx, s.Client) + versionsRes, err := versionsReq.RequestWith(ctx, client) if err != nil { return fmt.Errorf("failed to request api versions: %w", err) } diff --git a/main.go b/main.go index 189c63c..f206aef 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,13 @@ package main import ( "context" "fmt" + "net" + "net/http" + "os" + "os/signal" + "strconv" + + "github.com/cloudhut/kminion/v2/e2e" "github.com/cloudhut/kminion/v2/kafka" "github.com/cloudhut/kminion/v2/logging" "github.com/cloudhut/kminion/v2/minion" @@ -10,12 +17,6 @@ import ( promclient "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" - "net" - "net/http" - "os" - "os/signal" - "strconv" - "time" ) func main() { @@ -53,27 +54,39 @@ func main() { } }() - // Create kafka service and check if client can successfully connect to Kafka cluster - kafkaSvc, err := kafka.NewService(cfg.Kafka, logger) - if err != nil { - logger.Fatal("failed to setup kafka service", zap.Error(err)) - } - connectCtx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - err = kafkaSvc.TestConnection(connectCtx) - if err != nil { - logger.Fatal("failed to test connectivity to Kafka cluster", zap.Error(err)) - } + // Create kafka service + kafkaSvc := kafka.NewService(cfg.Kafka, logger) - // Create minion service that does most of the work. The Prometheus exporter only talks to the minion service - // which issues all the requests to Kafka and wraps the interface accordingly. - minionSvc, err := minion.NewService(cfg.Minion, logger, kafkaSvc) + // Create minion service + // Prometheus exporter only talks to the minion service which + // issues all the requests to Kafka and wraps the interface accordingly. 
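// Illustrative sketch of that separation (the methods shown exist on minion.Service in this
// change): the exporter never talks to Kafka directly, it only asks the minion service, e.g.
//
//	meta, err := minionSvc.GetMetadataCached(ctx)
//	if err != nil {
//		logger.Warn("failed to get metadata", zap.Error(err))
//	}
//	groups, err := minionSvc.DescribeConsumerGroups(ctx)
//	// ... results are then turned into Prometheus metrics by the exporter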
+ minionSvc, err := minion.NewService(cfg.Minion, logger, kafkaSvc, cfg.Exporter.Namespace, ctx) if err != nil { logger.Fatal("failed to setup minion service", zap.Error(err)) } - err = minionSvc.Start(ctx) - if err != nil { - logger.Fatal("failed to start minion service", zap.Error(err)) + if false { + err = minionSvc.Start(ctx) + if err != nil { + logger.Fatal("failed to start minion service", zap.Error(err)) + } + } + + // Create end to end testing service + if cfg.Minion.EndToEnd.Enabled { + e2eService, err := e2e.NewService( + cfg.Minion.EndToEnd, + logger, + kafkaSvc, + cfg.Exporter.Namespace, + ctx, + ) + if err != nil { + logger.Fatal("failed to create end-to-end monitoring service: %w", zap.Error(err)) + } + + if err = e2eService.Start(ctx); err != nil { + logger.Fatal("failed to start end-to-end monitoring service: %w", zap.Error(err)) + } } // The Prometheus exporter that implements the Prometheus collector interface diff --git a/kafka/client_hooks.go b/minion/client_hooks.go similarity index 96% rename from kafka/client_hooks.go rename to minion/client_hooks.go index 84e1a21..1c179fd 100644 --- a/kafka/client_hooks.go +++ b/minion/client_hooks.go @@ -1,12 +1,13 @@ -package kafka +package minion import ( + "net" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" - "net" - "time" ) // clientHooks implements the various hook interfaces from the franz-go (kafka) library. We can use these hooks to @@ -21,7 +22,7 @@ type clientHooks struct { bytesReceived prometheus.Counter } -func newClientHooks(logger *zap.Logger, metricsNamespace string) *clientHooks { +func newMinionClientHooks(logger *zap.Logger, metricsNamespace string) *clientHooks { requestSentCount := promauto.NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "kafka", diff --git a/minion/config.go b/minion/config.go index 154e878..3b26a76 100644 --- a/minion/config.go +++ b/minion/config.go @@ -1,17 +1,23 @@ package minion -import "fmt" +import ( + "fmt" + + "github.com/cloudhut/kminion/v2/e2e" +) type Config struct { ConsumerGroups ConsumerGroupConfig `koanf:"consumerGroups"` Topics TopicConfig `koanf:"topics"` LogDirs LogDirsConfig `koanf:"logDirs"` + EndToEnd e2e.Config `koanf:"endToEnd"` } func (c *Config) SetDefaults() { c.ConsumerGroups.SetDefaults() c.Topics.SetDefaults() c.LogDirs.SetDefaults() + c.EndToEnd.SetDefaults() } func (c *Config) Validate() error { @@ -30,5 +36,10 @@ func (c *Config) Validate() error { return fmt.Errorf("failed to validate log dirs config: %w", err) } + err = c.EndToEnd.Validate() + if err != nil { + return fmt.Errorf("failed to validate endToEnd config: %w", err) + } + return nil } diff --git a/minion/consumer_group_offsets.go b/minion/consumer_group_offsets.go index 87525b7..650ded9 100644 --- a/minion/consumer_group_offsets.go +++ b/minion/consumer_group_offsets.go @@ -3,10 +3,11 @@ package minion import ( "context" "fmt" + "sync" + "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "sync" ) // ListAllConsumerGroupOffsetsInternal returns a map from the in memory storage. 
The map value is the offset commit @@ -69,7 +70,7 @@ func (s *Service) listConsumerGroupOffsets(ctx context.Context, group string) (* req := kmsg.NewOffsetFetchRequest() req.Group = group req.Topics = nil - res, err := req.RequestWith(ctx, s.kafkaSvc.Client) + res, err := req.RequestWith(ctx, s.client) if err != nil { return nil, fmt.Errorf("failed to request group offsets for group '%v': %w", group, err) } diff --git a/minion/describe_consumer_groups.go b/minion/describe_consumer_groups.go index a44ff17..1e48ae9 100644 --- a/minion/describe_consumer_groups.go +++ b/minion/describe_consumer_groups.go @@ -3,11 +3,12 @@ package minion import ( "context" "fmt" + "time" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" - "time" ) type DescribeConsumerGroupsResponse struct { @@ -40,7 +41,7 @@ func (s *Service) listConsumerGroupsCached(ctx context.Context) (*kmsg.ListGroup func (s *Service) listConsumerGroups(ctx context.Context) (*kmsg.ListGroupsResponse, error) { listReq := kmsg.NewListGroupsRequest() - res, err := listReq.RequestWith(ctx, s.kafkaSvc.Client) + res, err := listReq.RequestWith(ctx, s.client) if err != nil { return nil, fmt.Errorf("failed to list consumer groups: %w", err) } @@ -66,7 +67,7 @@ func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsume describeReq := kmsg.NewDescribeGroupsRequest() describeReq.Groups = groupIDs describeReq.IncludeAuthorizedOperations = false - shardedResp := s.kafkaSvc.Client.RequestSharded(ctx, &describeReq) + shardedResp := s.client.RequestSharded(ctx, &describeReq) describedGroups := make([]DescribeConsumerGroupsResponse, 0) for _, kresp := range shardedResp { diff --git a/minion/describe_topic_config.go b/minion/describe_topic_config.go index ae6323e..bb7cc77 100644 --- a/minion/describe_topic_config.go +++ b/minion/describe_topic_config.go @@ -3,6 +3,7 @@ package minion import ( "context" "fmt" + "github.com/pkg/errors" "github.com/twmb/franz-go/pkg/kmsg" ) @@ -23,7 +24,7 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes req.Resources = append(req.Resources, resourceReq) } - res, err := req.RequestWith(ctx, s.kafkaSvc.Client) + res, err := req.RequestWith(ctx, s.client) if err != nil { return nil, fmt.Errorf("failed to request metadata: %w", err) } diff --git a/minion/list_offsets.go b/minion/list_offsets.go index 597dd2a..4a85909 100644 --- a/minion/list_offsets.go +++ b/minion/list_offsets.go @@ -3,9 +3,10 @@ package minion import ( "context" "fmt" - "github.com/twmb/franz-go/pkg/kmsg" "strconv" "time" + + "github.com/twmb/franz-go/pkg/kmsg" ) func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error) { @@ -59,5 +60,5 @@ func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListO req := kmsg.NewListOffsetsRequest() req.Topics = topicReqs - return req.RequestWith(ctx, s.kafkaSvc.Client) + return req.RequestWith(ctx, s.client) } diff --git a/minion/log_dirs.go b/minion/log_dirs.go index 1bb56cd..42c79e9 100644 --- a/minion/log_dirs.go +++ b/minion/log_dirs.go @@ -2,6 +2,7 @@ package minion import ( "context" + "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" ) @@ -15,7 +16,7 @@ type LogDirResponseShard struct { func (s *Service) DescribeLogDirs(ctx context.Context) []LogDirResponseShard { req := kmsg.NewDescribeLogDirsRequest() req.Topics = nil // Describe all topics - responses := s.kafkaSvc.Client.RequestSharded(ctx, 
&req) + responses := s.client.RequestSharded(ctx, &req) res := make([]LogDirResponseShard, len(responses)) for i, responseShard := range responses { diff --git a/minion/metadata.go b/minion/metadata.go index 57661f1..774f030 100644 --- a/minion/metadata.go +++ b/minion/metadata.go @@ -3,8 +3,9 @@ package minion import ( "context" "fmt" - "github.com/twmb/franz-go/pkg/kmsg" "time" + + "github.com/twmb/franz-go/pkg/kmsg" ) func (s *Service) GetMetadataCached(ctx context.Context) (*kmsg.MetadataResponse, error) { @@ -36,7 +37,7 @@ func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, erro req := kmsg.NewMetadataRequest() req.Topics = nil - res, err := req.RequestWith(ctx, s.kafkaSvc.Client) + res, err := req.RequestWith(ctx, s.client) if err != nil { return nil, fmt.Errorf("failed to request metadata: %w", err) } diff --git a/minion/offset_consumer.go b/minion/offset_consumer.go index 21b2cc8..06216af 100644 --- a/minion/offset_consumer.go +++ b/minion/offset_consumer.go @@ -3,18 +3,19 @@ package minion import ( "context" "fmt" + "time" + "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" - "time" ) // startConsumingOffsets consumes the __consumer_offsets topic and forwards the kafka messages to their respective // methods where they'll be decoded and further processed. func (s *Service) startConsumingOffsets(ctx context.Context) { - client := s.kafkaSvc.Client + client := s.client topic := kgo.ConsumeTopics(kgo.NewOffset().AtStart(), "__consumer_offsets") client.AssignPartitions(topic) @@ -66,7 +67,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) { topic.Topic = &topicName req.Topics = []kmsg.MetadataRequestTopic{topic} - res, err := req.RequestWith(ctx, s.kafkaSvc.Client) + res, err := req.RequestWith(ctx, s.client) if err != nil { s.logger.Warn("failed to check if consumer lag on offsets topic is caught up because metadata request failed", zap.Error(err)) @@ -91,7 +92,7 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) { } offsetReq := kmsg.NewListOffsetsRequest() offsetReq.Topics = topicReqs - highMarksRes, err := offsetReq.RequestWith(ctx, s.kafkaSvc.Client) + highMarksRes, err := offsetReq.RequestWith(ctx, s.client) if err != nil { s.logger.Warn("failed to check if consumer lag on offsets topic is caught up because high watermark request failed", zap.Error(err)) @@ -108,7 +109,13 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) { consumedOffsets := s.storage.getConsumedOffsets() topicRes := highMarksRes.Topics[0] isReady := true - partitionsLagging := 0 + + type laggingParition struct { + Name string + Id int32 + Lag int64 + } + var partitionsLagging []laggingParition totalLag := int64(0) for _, partition := range topicRes.Partitions { err := kerr.ErrorForCode(partition.ErrorCode) @@ -126,7 +133,11 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) { } if partitionLag > 0 { - partitionsLagging++ + partitionsLagging = append(partitionsLagging, laggingParition{ + Name: topicRes.Topic, + Id: partition.Partition, + Lag: partitionLag, + }) totalLag += partitionLag s.logger.Debug("consumer_offsets topic lag has not been caught up yet", zap.Int32("partition_id", partition.Partition), @@ -143,7 +154,8 @@ func (s *Service) checkIfConsumerLagIsCaughtUp(ctx context.Context) { return } else { s.logger.Info("catching up the message lag on consumer offsets", - 
zap.Int("lagging_partitions", partitionsLagging), + zap.Int("lagging_partitions_count", len(partitionsLagging)), + zap.Any("lagging_partitions", partitionsLagging), zap.Int64("total_lag", totalLag)) } } diff --git a/minion/service.go b/minion/service.go index c811f56..b147bb1 100644 --- a/minion/service.go +++ b/minion/service.go @@ -3,21 +3,23 @@ package minion import ( "context" "fmt" + "regexp" + "sync" + "time" + "github.com/cloudhut/kminion/v2/kafka" + "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "go.uber.org/zap" "golang.org/x/sync/singleflight" - "regexp" - "sync" - "time" ) type Service struct { Cfg Config logger *zap.Logger - // requestGroup is used to cache responses and deduplicate multiple concurrent inflight requests + // requestGroup is used to deduplicate multiple concurrent requests to kafka requestGroup *singleflight.Group cache map[string]interface{} cacheLock sync.RWMutex @@ -27,23 +29,33 @@ type Service struct { AllowedTopicsExpr []*regexp.Regexp IgnoredTopicsExpr []*regexp.Regexp - kafkaSvc *kafka.Service - storage *Storage + client *kgo.Client + storage *Storage } -func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service) (*Service, error) { +func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error) { storage, err := newStorage(logger) if err != nil { return nil, fmt.Errorf("failed to create storage: %w", err) } + // Kafka client + hooksChildLogger := logger.With(zap.String("source", "minion_kafka_client")) + minionHooks := newMinionClientHooks(hooksChildLogger, metricsNamespace) + kgoOpts := []kgo.Opt{kgo.WithHooks(minionHooks)} + + client, err := kafkaSvc.CreateAndTestClient(logger, kgoOpts, ctx) + if err != nil { + return nil, fmt.Errorf("failed to create kafka client: %w", err) + } + // Compile regexes. We can ignore the errors because valid compilation has been validated already allowedGroupIDsExpr, _ := compileRegexes(cfg.ConsumerGroups.AllowedGroupIDs) ignoredGroupIDsExpr, _ := compileRegexes(cfg.ConsumerGroups.IgnoredGroupIDs) allowedTopicsExpr, _ := compileRegexes(cfg.Topics.AllowedTopics) ignoredTopicsExpr, _ := compileRegexes(cfg.Topics.IgnoredTopics) - return &Service{ + service := &Service{ Cfg: cfg, logger: logger, @@ -56,9 +68,11 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service) (*Servi AllowedTopicsExpr: allowedTopicsExpr, IgnoredTopicsExpr: ignoredTopicsExpr, - kafkaSvc: kafkaSvc, - storage: storage, - }, nil + client: client, + storage: storage, + } + + return service, nil } func (s *Service) Start(ctx context.Context) error { diff --git a/minion/storage.go b/minion/storage.go index 9bb704e..caa037e 100644 --- a/minion/storage.go +++ b/minion/storage.go @@ -2,20 +2,23 @@ package minion import ( "fmt" - "github.com/orcaman/concurrent-map" + "strconv" + "time" + + cmap "github.com/orcaman/concurrent-map" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/atomic" "go.uber.org/zap" - "strconv" - "time" ) // Storage stores the current state of all consumer group information that has been consumed using the offset consumer. type Storage struct { logger *zap.Logger - // offsetCommits is a map of all consumer offsets. A unique key in the format "group:topic:partition" is used as map key. + // offsetCommits is a map of all consumer offsets. + // A unique key in the format "group:topic:partition" is used as map key. 
+ // Value is of type OffsetCommit offsetCommits cmap.ConcurrentMap // progressTracker is a map that tracks what offsets in each partition have already been consumed diff --git a/minion/versions.go b/minion/versions.go index 7c25cdc..d6b596e 100644 --- a/minion/versions.go +++ b/minion/versions.go @@ -3,6 +3,7 @@ package minion import ( "context" "fmt" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" @@ -22,7 +23,7 @@ func (s *Service) GetAPIVersions(ctx context.Context) (*kmsg.ApiVersionsResponse versionsReq := kmsg.NewApiVersionsRequest() versionsReq.ClientSoftwareName = "kminion" versionsReq.ClientSoftwareVersion = "v2" - res, err := versionsReq.RequestWith(ctx, s.kafkaSvc.Client) + res, err := versionsReq.RequestWith(ctx, s.client) if err != nil { return nil, fmt.Errorf("failed to request api versions: %w", err) } diff --git a/prometheus/exporter.go b/prometheus/exporter.go index 404adc1..b0bcf98 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -2,12 +2,13 @@ package prometheus import ( "context" + "os" + "time" + "github.com/cloudhut/kminion/v2/minion" uuid2 "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "os" - "time" ) // Exporter is the Prometheus exporter that implements the prometheus.Collector interface
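// Satisfying prometheus.Collector means providing Describe and Collect. A minimal sketch,
// assuming a placeholder exporterUpDesc field (not necessarily a real Exporter field):
//
//	func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
//		ch <- e.exporterUpDesc
//	}
//
//	func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
//		ch <- prometheus.MustNewConstMetric(e.exporterUpDesc, prometheus.GaugeValue, 1)
//	}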