This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Merge pull request #1110 from grafana/deprecate-offset-manager
Deprecate kafka offset manager
Dieterbe authored Oct 24, 2018
2 parents e407846 + fd6274e commit acbcdc1
Showing 70 changed files with 48 additions and 14,558 deletions.
24 changes: 0 additions & 24 deletions Gopkg.lock

The diff for this generated file is not rendered by default.

4 changes: 0 additions & 4 deletions Gopkg.toml
@@ -145,10 +145,6 @@ unused-packages = true
name = "github.com/smartystreets/goconvey"
branch = "master"

[[constraint]]
name = "github.com/syndtr/goleveldb"
branch = "master"

[[constraint]]
name = "github.com/tinylib/msgp"
version = "1.0.0-beta"
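The dropped goleveldb constraint appears to have been needed only by the offset manager's on-disk offset index; with that index removed, the dependency goes with it.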
2 changes: 0 additions & 2 deletions cmd/mt-kafka-mdm-sniff-out-of-order/main.go
@@ -204,8 +204,6 @@ func main() {

// config may have had it disabled
inKafkaMdm.Enabled = true
// important: we don't want to share the same offset tracker as the mdm input of MT itself
inKafkaMdm.DataDir = "/tmp/" + instance

inKafkaMdm.ConfigProcess(instance)

2 changes: 0 additions & 2 deletions cmd/mt-kafka-mdm-sniff/main.go
@@ -151,8 +151,6 @@ func main() {

// config may have had it disabled
inKafkaMdm.Enabled = true
// important: we don't want to share the same offset tracker as the mdm input of MT itself
inKafkaMdm.DataDir = "/tmp/" + instance

inKafkaMdm.ConfigProcess(instance)

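The `/tmp/<instance>` data-dir override in both sniff tools existed only to keep their offset index separate from the one used by metrictank's own kafka-mdm input (as the removed comment notes); with the offset manager gone there is nothing to isolate, so the line is simply dropped.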
18 changes: 4 additions & 14 deletions docker/docker-chaos/metrictank.ini
@@ -228,17 +228,12 @@ brokers = kafka:9092
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
offset = newest
# kafka partitions to consume. use '*' or a comma separated list of id's
partitions = *
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
# The number of metrics to buffer in internal and external channels
channel-buffer-size = 1000
# The minimum number of message bytes to fetch in a request
@@ -330,17 +325,12 @@ kafka-version = 0.10.0.0
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
offset = newest
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank

### nsq as transport for clustering messages
[nsq-cluster]
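For existing deployments, the practical change in these configs is the offset setting itself: `last` is no longer a valid value because the locally persisted offsets it relied on are gone. A minimal sketch of the updated kafka-mdm-in block, using only keys that remain after this change (values mirror the example configs above):

```ini
[kafka-mdm-in]
brokers = kafka:9092
kafka-version = 0.10.0.0
topics = mdm
# "last" is no longer accepted: use oldest, newest, or a duration such as 6h
# (if the duration reaches further back than Kafka retains, consumption falls back to oldest)
offset = newest
partitions = *
```

The same substitution applies to the kafka-cluster notifier section, whose offset should match the kafka-mdm-in setting.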
14 changes: 2 additions & 12 deletions docker/docker-cluster/metrictank.ini
@@ -228,17 +228,12 @@ brokers = kafka:9092
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = oldest
# kafka partitions to consume. use '*' or a comma separated list of id's
partitions = *
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
# The number of metrics to buffer in internal and external channels
channel-buffer-size = 1000
# The minimum number of message bytes to fetch in a request
@@ -330,17 +325,12 @@ kafka-version = 0.10.0.0
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = oldest
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank

### nsq as transport for clustering messages
[nsq-cluster]
14 changes: 2 additions & 12 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -228,17 +228,12 @@ brokers = kafka:9092
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = oldest
# kafka partitions to consume. use '*' or a comma separated list of id's
partitions = *
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank
# The number of metrics to buffer in internal and external channels
channel-buffer-size = 1000
# The minimum number of message bytes to fetch in a request
@@ -330,17 +325,12 @@ kafka-version = 0.10.0.0
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = oldest
# save interval for offsets
offset-commit-interval = 5s
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir = /var/lib/metrictank

### nsq as transport for clustering messages
[nsq-cluster]
18 changes: 4 additions & 14 deletions docs/config.md
@@ -281,17 +281,12 @@ brokers = kafka:9092
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# the further back in time you go, the more old data you can load into metrictank, but the longer it takes to catch up to realtime data
offset = last
offset = newest
# kafka partitions to consume. use '*' or a comma separated list of id's
partitions = *
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir =
# The number of metrics to buffer in internal and external channels
channel-buffer-size = 1000
# The minimum number of message bytes to fetch in a request
@@ -392,17 +387,12 @@ kafka-version = 0.10.0.0
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
partitions = *
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
# offset to start consuming from. Can be oldest, newest or a time duration
# When using a duration but the offset request fails (e.g. Kafka doesn't have data so far back), metrictank falls back to `oldest`.
# Should match your kafka-mdm-in setting
offset = last
# save interval for offsets
offset-commit-interval = 5s
offset = newest
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if not existing.
data-dir =
```

### nsq as transport for clustering messages
33 changes: 2 additions & 31 deletions input/kafkamdm/kafkamdm.go
@@ -55,17 +55,14 @@ var topics []string
var partitionStr string
var partitions []int32
var offsetStr string
var DataDir string
var config *sarama.Config
var channelBufferSize int
var consumerFetchMin int
var consumerFetchDefault int
var consumerMaxWaitTime time.Duration
var consumerMaxProcessingTime time.Duration
var netMaxOpenRequests int
var offsetMgr *kafka.OffsetMgr
var offsetDuration time.Duration
var offsetCommitInterval time.Duration
var partitionOffset map[int32]*stats.Gauge64
var partitionLogSize map[int32]*stats.Gauge64
var partitionLag map[int32]*stats.Gauge64
@@ -77,10 +74,8 @@ func ConfigSetup() {
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&kafkaVersionStr, "kafka-version", "0.10.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
inKafkaMdm.StringVar(&offsetStr, "offset", "newest", "Set the offset to start consuming from. Can be oldest, newest or a time duration")
inKafkaMdm.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's")
inKafkaMdm.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
inKafkaMdm.StringVar(&DataDir, "data-dir", "", "Directory to store partition offsets index")
inKafkaMdm.IntVar(&channelBufferSize, "channel-buffer-size", 1000, "The number of metrics to buffer in internal and external channels")
inKafkaMdm.IntVar(&consumerFetchMin, "consumer-fetch-min", 1, "The minimum number of message bytes to fetch in a request")
inKafkaMdm.IntVar(&consumerFetchDefault, "consumer-fetch-default", 32768, "The default number of message bytes to fetch in a request")
@@ -100,9 +95,6 @@ func ConfigProcess(instance string) {
log.Fatalf("kafkamdm: invalid kafka-version. %s", err)
}

if offsetCommitInterval == 0 {
log.Fatal("kafkamdm: offset-commit-interval must be greater then 0")
}
if consumerMaxWaitTime == 0 {
log.Fatal("kafkamdm: consumer-max-wait-time must be greater then 0")
}
@@ -111,7 +103,6 @@ func ConfigProcess(instance string) {
}

switch offsetStr {
case "last":
case "oldest":
case "newest":
default:
@@ -121,10 +112,6 @@ func ConfigProcess(instance string) {
}
}

offsetMgr, err = kafka.NewOffsetMgr(DataDir)
if err != nil {
log.Fatalf("kafkamdm: couldnt create offsetMgr. %s", err)
}
brokers = strings.Split(brokerStr, ",")
topics = strings.Split(topicStr, ",")

@@ -222,12 +209,6 @@ func (k *KafkaMdm) Start(handler input.Handler, cancel context.CancelFunc) error
offset = sarama.OffsetOldest
case "newest":
offset = sarama.OffsetNewest
case "last":
offset, err = offsetMgr.Last(topic, partition)
if err != nil {
log.Errorf("kafkamdm: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
return err
}
default:
offset, err = k.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
if err != nil {
@@ -315,26 +296,20 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
return
}
messages := pc.Messages()
ticker := time.NewTicker(offsetCommitInterval)
ticker := time.NewTicker(5 * time.Second)
for {
select {
case msg, ok := <-messages:
// https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions#why-am-i-getting-a-nil-message-from-the-sarama-consumer
if !ok {
log.Errorf("kafkamdm: kafka consumer for %s:%d has shutdown. stop consuming", topic, partition)
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
}
k.cancel()
return
}
log.Debugf("kafkamdm: received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
k.handleMsg(msg.Value, partition)
currentOffset = msg.Offset
case ts := <-ticker.C:
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
}
k.lagMonitor.StoreOffset(partition, currentOffset, ts)
newest, err := k.tryGetOffset(topic, partition, sarama.OffsetNewest, 1, 0)
if err != nil {
@@ -351,9 +326,6 @@ func (k *KafkaMdm) consumePartition(topic string, partition int32, currentOffset
}
case <-k.stopConsuming:
pc.Close()
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Errorf("kafkamdm: failed to commit offset for %s:%d, %s", topic, partition, err)
}
log.Infof("kafkamdm: consumer for %s:%d ended.", topic, partition)
return
}
@@ -391,7 +363,6 @@ func (k *KafkaMdm) Stop() {
close(k.stopConsuming)
k.wg.Wait()
k.client.Close()
offsetMgr.Close()
}

func (k *KafkaMdm) MaintainPriority() {
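With `last` gone, the only offset mode that still needs broker interaction at startup is a relative duration, which is resolved via a timestamp-based offset lookup (the default branch kept in the switch above). A standalone sketch of that lookup, assuming Shopify/sarama as used by this package; the broker address, topic, and the 6h duration are illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// resolveOffset turns an offset setting into a concrete Kafka offset:
// "oldest"/"newest" map to sarama sentinels, anything else is parsed as a
// duration and resolved to the offset of the first message at or after
// (now - duration).
func resolveOffset(client sarama.Client, topic string, partition int32, offsetStr string) (int64, error) {
	switch offsetStr {
	case "oldest":
		return sarama.OffsetOldest, nil
	case "newest":
		return sarama.OffsetNewest, nil
	default:
		d, err := time.ParseDuration(offsetStr)
		if err != nil {
			return 0, err
		}
		// Kafka expects the timestamp in milliseconds since the epoch.
		ts := time.Now().Add(-d).UnixNano() / int64(time.Millisecond)
		return client.GetOffset(topic, partition, ts)
	}
}

func main() {
	client, err := sarama.NewClient([]string{"kafka:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	offset, err := resolveOffset(client, "mdm", 0, "6h")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("starting consumption at offset %d", offset)
}
```

Per the config comments above, metrictank itself additionally falls back to `oldest` when the broker no longer has data that far back.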
4 changes: 1 addition & 3 deletions input/kafkamdm/lag_monitor.go
@@ -22,9 +22,7 @@ func newLagLogger(size int) *lagLogger {
// Store saves the current value, potentially overwriting an old value
// if needed.
// Note: negative values are ignored. We rely on previous data - if any - in such case.
// negative values can happen when:
// - kafka had to recover, and a previous offset loaded from offsetMgr was bigger than current offset
// - a rollover of the offset counter
// negative values can happen upon a rollover of the offset counter
func (l *lagLogger) Store(lag int) {
if lag < 0 {
return