From 8119e2a2c1f06e4f249ddab26cc445cda511e343 Mon Sep 17 00:00:00 2001
From: Robert Casey
Date: Sun, 7 Jan 2018 14:11:27 -0600
Subject: [PATCH] Big commit this time.

**New functionality:**

- (Experimental) Implemented Java-based in-memory data storage. For now it is only lightly integrated into the front-end, but over time it will replace the current implementation. This enabled some new features:
  - Report whether a consumer-group is currently active
    - This will eventually allow us to report on inactive consumer-groups
- (Experimental) Report Burrow-like consumer-group status calculation via a REST endpoint (/consumergroup), with Burrow's rules updated a bit (a sketch of the status roll-up appears at the end of this message). The rules implemented here are:
  - Evaluate each consumer-group topic-partition:
    - Rule 0: If there are no committed offsets, then there is nothing to calculate and the period is OK
    - Rule 1: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the consumer has stopped committing offsets for that partition (error)
    - Rule 2: If the consumer offset decreases from one interval to the next, the partition is marked as a rewind (error)
    - Rule 3: If, over the stored period, the lag is ever zero for the partition, the period is OK
    - Rule 4: If the consumer offset does not change and the lag is non-zero, it's an error (partition is stalled)
    - Rule 5: If the consumer offsets are moving but the lag is consistently increasing, it's a warning (consumer is slow)
  - Roll up all topic-partitions per consumer-group and report a consumer-group status:
    - Set consumer-group status to ERROR if any topic-partition status is STOP
    - Set consumer-group status to ERROR if any topic-partition status is REWIND
    - Set consumer-group status to ERROR if any topic-partition status is STALL
    - Set consumer-group status to WARN if any topic-partition status is WARN
    - Set consumer-group status to OK if none of the above rules match

**Of course, some of the bugs you were seeing were fixed as well:**

- Synchronized around all SQLite DB activity; SQLite allows only one operation at a time on the DB file.
  - This fixed all DB create/update/delete issues, at the expense of sometimes blocking a DB operation while another one is in progress. This is unavoidable with SQLite; the long-term fix is to replace SQLite with a more appropriate DB engine.
- Fixed an issue where LogEndOffset and Lag could display incorrect values.
- Added retry logic around building the ZkUtils object. This fixed the issue where we would not reconnect to ZooKeeper if the ZK service went down and was later restored.
- Updated some dependency versions.
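For reference, a minimal sketch of the group-level roll-up described above (illustrative only, not part of this patch): the actual logic lives in KafkaConsumerGroup.updateStatus() and KafkaTopicPartition.calcStatus(); the class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Mirrors com.morningstar.kafka.Status from this patch.
enum Status { NOTFOUND, OK, WARN, ERR, STOP, STALL, REWIND }

final class StatusRollupSketch {

    // Roll per-partition statuses up to a single consumer-group status:
    // any STOP/REWIND/STALL -> ERR, else any WARN -> WARN, else OK.
    static Status rollUp(List<Status> partitionStatuses) {
        Status group = Status.OK;
        for (Status s : partitionStatuses) {
            if (s == Status.STOP || s == Status.REWIND || s == Status.STALL) {
                return Status.ERR;   // a hard failure on any partition makes the group ERR
            }
            if (s == Status.WARN) {
                group = Status.WARN; // remember the warning, keep scanning for hard failures
            }
        }
        return group;
    }

    public static void main(String[] args) {
        System.out.println(rollUp(Arrays.asList(Status.OK, Status.WARN)));  // WARN
        System.out.println(rollUp(Arrays.asList(Status.OK, Status.STALL))); // ERR
        System.out.println(rollUp(Arrays.asList(Status.OK, Status.OK)));    // OK
    }
}
```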
--- README.md | 8 +- build.sbt | 32 ++- project/build.properties | 2 +- .../kafka/KafkaCommittedOffset.java | 50 ++++ .../kafka/KafkaCommittedOffsetMetadata.java | 44 ++++ .../morningstar/kafka/KafkaConsumerGroup.java | 168 ++++++++++++ .../kafka/KafkaOffsetMetadata.java | 63 +++++ .../morningstar/kafka/KafkaOffsetStorage.java | 133 ++++++++++ .../kafka/KafkaTopicPartition.java | 245 ++++++++++++++++++ .../KafkaTopicPartitionLogEndOffset.java | 49 ++++ .../java/com/morningstar/kafka/Status.java | 12 + .../offsetapp/scripts/cluster-viz.js | 8 +- .../offsetapp/scripts/controllers.js | 4 +- .../resources/offsetapp/scripts/directives.js | 4 +- .../resources/offsetapp/views/grouplist.html | 2 +- .../resources/offsetapp/views/topiclist.html | 28 +- .../com/quantifind/kafka/OffsetGetter.scala | 62 ++++- .../kafka/core/KafkaOffsetGetter.scala | 214 +++++++++++---- .../kafka/core/StormOffsetGetter.scala | 12 +- .../kafka/core/ZKOffsetGetter.scala | 8 + .../quantifind/kafka/offsetapp/OffsetDB.scala | 229 ++++++++-------- .../kafka/offsetapp/OffsetGetterApp.scala | 3 +- .../kafka/offsetapp/OffsetGetterWeb.scala | 74 ++++-- .../kafka/core/KafkaOffsetGetterSpec.scala | 13 +- 24 files changed, 1245 insertions(+), 222 deletions(-) create mode 100644 src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaTopicPartition.java create mode 100644 src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java create mode 100644 src/main/java/com/morningstar/kafka/Status.java diff --git a/README.md b/README.md index d7a2bb0..d0c207f 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ This is a small web app, you can run it locally or on a server, as long as you h ``` java -Djava.security.auth.login.config=conf/server-client-jaas.conf \ - -cp KafkaOffsetMonitor-assembly-0.4.0.jar \ + -cp KafkaOffsetMonitor-assembly-0.4.6.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \ @@ -126,7 +126,7 @@ As long as this is true you will need to use local maven repo and just publish K Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running app: ``` -java -cp KafkaOffsetMonitor-assembly-0.3.0.jar:kafka-offset-monitor-another-db-reporter.jar \ +java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk zkserver01,zkserver02 \ --port 8080 \ @@ -141,7 +141,3 @@ Contributing ============ The KafkaOffsetMonitor is released under the Apache License and we **welcome any contributions** within this license. Any pull request is welcome and will be reviewed and merged as quickly as possible. 
- -Because this open source tool is released by [Quantifind](http://www.quantifind.com) as a company, if you want to submit a pull request, you will have to sign the following simple contributors agreement: -- If you are an individual, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1RS7qEjq3cCmJ1665UhoCMK8541Ms7KyU3kVFoO4CR_I/) and send it back to contributors@quantifind.com -- If you are contributing changes that you did as part of your work, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1kNwLT4qG3G0Ct2mEuNdBGmKDYuApN1CpQtZF8TSVTjE/) and send it back to contributors@quantifind.com diff --git a/build.sbt b/build.sbt index caf5f9e..02589f8 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "KafkaOffsetMonitor" -version := "0.4.1-SNAPSHOT" -scalaVersion := "2.11.8" +version := "0.4.6-SNAPSHOT" +scalaVersion := "2.11.11" organization := "com.quantifind" scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature") @@ -8,18 +8,22 @@ scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature") mainClass in Compile := Some("com.quantifind.kafka.offsetapp.OffsetGetterWeb") libraryDependencies ++= Seq( - "log4j" % "log4j" % "1.2.17", - "net.databinder" %% "unfiltered-filter" % "0.8.4", - "net.databinder" %% "unfiltered-jetty" % "0.8.4", - "net.databinder" %% "unfiltered-json4s" % "0.8.4", - "com.quantifind" %% "sumac" % "0.3.0", - "org.apache.kafka" %% "kafka" % "0.9.0.1", - "org.reflections" % "reflections" % "0.9.10", - "com.twitter" %% "util-core" % "6.40.0", - "com.typesafe.slick" %% "slick" % "2.1.0", - "org.xerial" % "sqlite-jdbc" % "3.7.2", - "org.mockito" % "mockito-all" % "1.10.19" % "test", - "org.scalatest" %% "scalatest" % "2.2.6" % "test") + "log4j" % "log4j" % "1.2.17", + "net.databinder" %% "unfiltered-filter" % "0.8.4", + "net.databinder" %% "unfiltered-jetty" % "0.8.4", + "net.databinder" %% "unfiltered-json4s" % "0.8.4", + "com.quantifind" %% "sumac" % "0.3.0", + "org.apache.kafka" %% "kafka" % "0.9.0.1", + "org.reflections" % "reflections" % "0.9.11", + "com.twitter" %% "util-core" % "7.1.0", + "com.typesafe.slick" %% "slick" % "2.1.0", + "org.xerial" % "sqlite-jdbc" % "3.18.0", + "com.google.code.gson" % "gson" % "2.8.2", + "com.google.guava" % "guava" % "20.0", + "javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16", + "org.glassfish.jersey.core" % "jersey-client" % "2.25.1", + "org.mockito" % "mockito-all" % "1.10.19" % "test", + "org.scalatest" %% "scalatest" % "2.2.6" % "test") assemblyMergeStrategy in assembly := { case "about.html" => MergeStrategy.discard diff --git a/project/build.properties b/project/build.properties index 34412c6..6220d2e 100644 --- a/project/build.properties +++ b/project/build.properties @@ -2,4 +2,4 @@ project.organization=com.quantifind project.name=KafkaOffsetMonitor project.version=0.4 -sbt.version=0.13.13 +sbt.version=0.13.16 diff --git a/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java b/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java new file mode 100644 index 0000000..7f1ef7d --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java @@ -0,0 +1,50 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class KafkaCommittedOffset { + + private String groupName; + private boolean groupIsActive; + private String topicName; + private int partitionId; + private KafkaCommittedOffsetMetadata 
committedOffset; + + + public KafkaCommittedOffset(String groupName, boolean groupIsActive, String topicName, int partitionId, long committedOffset, long committedMillis) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName must not be NULL or empty."); + Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty."); + Preconditions.checkArgument(partitionId > -1, "partitionId must be greater than or equal to 0."); + Preconditions.checkArgument(committedOffset > -1, "committedOffset must be greater than or equal to 0."); + Preconditions.checkArgument(committedMillis > -1, "committedMillis must be greater than or equal to 0."); + + this.groupName = groupName; + this.groupIsActive = groupIsActive; + this.topicName = topicName; + this.partitionId = partitionId; + this.committedOffset = new KafkaCommittedOffsetMetadata(committedOffset, committedMillis); + } + + + public String getGroupName() { + return groupName; + } + + public boolean getGroupIsActive() { + return groupIsActive; + } + + public String getTopicName() { + return topicName; + } + + public int getPartitionId() { + return partitionId; + } + + public KafkaCommittedOffsetMetadata getCommittedOffset() { + return committedOffset; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java b/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java new file mode 100644 index 0000000..cb299ba --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java @@ -0,0 +1,44 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.Expose; + +public class KafkaCommittedOffsetMetadata extends KafkaOffsetMetadata { + + @Expose private long lag = -1; + + + public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata, long lag) { + super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp()); + verifyParameters(lag); + this.lag = lag; + } + + public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp, long lag) { + super(committedOffset, timestamp); + verifyParameters(lag); + this.lag = lag; + } + + public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata) { + super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp()); + } + + public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp) { + super(committedOffset, timestamp); + } + + private void verifyParameters(long lag) { + + Preconditions.checkArgument(lag > -2, "lag must not be less than -1."); + } + + + public long getLag() { + return lag; + } + + public void setLag(long lag) { + this.lag = lag; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java b/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java new file mode 100644 index 0000000..6925fe6 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java @@ -0,0 +1,168 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Sets; +import com.google.gson.annotations.Expose; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + + +public class KafkaConsumerGroup { + + private final long COMPLETE_THRESHOLD = 10; + + @Expose private String consumerGroupName; + @Expose private boolean isActive; + @Expose private boolean complete; + @Expose private long mostRecentCommittedMillis; + @Expose private Status 
status; + @Expose private Set topicPartitions; + + + public KafkaConsumerGroup(String consumerGroupName) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(consumerGroupName), "consumerGroupName cannot be NULL or empty."); + + this.consumerGroupName = consumerGroupName; + this.isActive = false; + this.complete = false; + this.mostRecentCommittedMillis = -1; + this.status = Status.OK; + this.topicPartitions = Sets.newConcurrentHashSet(); + } + + + public String getConsumerGroupName() { + return consumerGroupName; + } + + public boolean getComplete() { return complete; } + + public long getMaxCommitedMillis() { + return mostRecentCommittedMillis; + } + + public boolean isActive() { + return isActive; + } + + public Set getTopicPartitions() { + + return topicPartitions; + } + + public Set getTopics() { + + return topicPartitions.stream() + .map(KafkaTopicPartition::getTopicName) + .collect(Collectors.toSet()); + } + + public synchronized void updateStatus() { + + if (!isActive) { + this.status = Status.ERR; + return; + } + + Status newStatus = Status.OK; + + for (KafkaTopicPartition topicPartition : topicPartitions) { + + // Set group status to ERROR if any topicPartition's status is STOP + if (Status.STOP == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to ERROR if any topicPartition's status is REWIND + if (Status.REWIND == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to ERROR if any topicPartition's status is STALL + if (Status.STALL == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to WARN if any topicPartition's status is WARN + if (Status.WARN == topicPartition.getStatus()) { + newStatus = Status.WARN; + break; + } + } + + this.status = newStatus; + } + + + private Optional getTopicPartition(String topic, int partitionId) { + + //return committedOffsets.keySet().stream().filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId)).findFirst(); + return topicPartitions.stream() + .filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId)) + .findFirst(); + } + + private void upsertTopicPartition(KafkaCommittedOffset kafkaCommittedOffset) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(kafkaCommittedOffset.getTopicName()), "topic cannot be NULL or empty."); + Preconditions.checkArgument(kafkaCommittedOffset.getPartitionId() >= 0, "partitionId must be greater-than or equal-to zero."); + + String incomingTopicName = kafkaCommittedOffset.getTopicName(); + int incomingPartitionId = kafkaCommittedOffset.getPartitionId(); + + Optional existingTopicPartition = getTopicPartition(incomingTopicName, incomingPartitionId); + + if (existingTopicPartition.isPresent()) { + // Append committed offset info to existing set item + existingTopicPartition.get().addCommittedOffset(kafkaCommittedOffset.getCommittedOffset()); + } else { + // Add a new entry to the map + KafkaTopicPartition newTopicPartition = new KafkaTopicPartition(incomingTopicName, incomingPartitionId); + newTopicPartition.addCommittedOffset(kafkaCommittedOffset.getCommittedOffset()); + topicPartitions.add(newTopicPartition); + } + } + + private void setMostRecentCommittedMillis(long mostRecentCommittedMillis) { + if (this.mostRecentCommittedMillis < mostRecentCommittedMillis) { + this.mostRecentCommittedMillis = mostRecentCommittedMillis; + } + } + + private void updateCompleteFlag() { + + this.complete = topicPartitions.stream() + 
.noneMatch(f -> f.getCommittedOffsets().size() < COMPLETE_THRESHOLD); + } + + public void addCommittedOffsetInfo(KafkaCommittedOffset kafkaCommittedOffset) { + + setMostRecentCommittedMillis(kafkaCommittedOffset.getCommittedOffset().getTimestamp()); + this.isActive = kafkaCommittedOffset.getGroupIsActive(); + upsertTopicPartition(kafkaCommittedOffset); + updateCompleteFlag(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaConsumerGroup that = (KafkaConsumerGroup) o; + + return getConsumerGroupName().equals(that.getConsumerGroupName()); + } + + @Override + public int hashCode() { + return getConsumerGroupName().hashCode(); + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java b/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java new file mode 100644 index 0000000..7336485 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java @@ -0,0 +1,63 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.Expose; + + +public class KafkaOffsetMetadata { + + private final long KAFKA_OFFSET_METADATA_OFFSET_DEFAULT_VALUE = -1; + private final long KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE = -1; + + @Expose private long offset; + @Expose private long timestamp; + + + public KafkaOffsetMetadata(long offset, long timestamp) { + createObject(offset, timestamp); + } + + public KafkaOffsetMetadata(long offset) { + createObject(offset, KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE); + } + + public KafkaOffsetMetadata() { + createObject(KAFKA_OFFSET_METADATA_OFFSET_DEFAULT_VALUE, KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE); + } + + private void createObject(long offset, long timestamp) { + Preconditions.checkArgument(offset >= -1, "offset must be greater than or equal to -1."); + Preconditions.checkArgument(timestamp >= -1, "timestamp must be greater than or equal to -1."); + + this.offset = offset; + this.timestamp = timestamp; + } + + + public long getOffset() { + return offset; + } + + public long getTimestamp() { + return timestamp; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaOffsetMetadata that = (KafkaOffsetMetadata) o; + + if (getOffset() != that.getOffset()) return false; + return getTimestamp() == that.getTimestamp(); + } + + @Override + public int hashCode() { + int result = (int) (getOffset() ^ (getOffset() >>> 32)); + result = 31 * result + (int) (getTimestamp() ^ (getTimestamp() >>> 32)); + return result; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java b/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java new file mode 100644 index 0000000..0cc696b --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java @@ -0,0 +1,133 @@ +package com.morningstar.kafka; + + +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.util.Optional; +import java.util.Set; + + +public class KafkaOffsetStorage { + + private Set consumerGroups; + private Set logEndOffsets; + + + public KafkaOffsetStorage() { + + this.consumerGroups = Sets.newConcurrentHashSet(); + this.logEndOffsets = Sets.newConcurrentHashSet(); + } + + private Optional getConsumerGroup(String consumerGroupName) { + + return consumerGroups.stream().filter(cg -> 
cg.getConsumerGroupName().equals(consumerGroupName)).findFirst(); + } + + private void upsertConsumerGroup(KafkaCommittedOffset kafkaCommittedOffset) { + + // Store committedOffset + Optional consumerGroup = getConsumerGroup(kafkaCommittedOffset.getGroupName()); + + if (consumerGroup.isPresent()) { + // Update existing group + consumerGroup.get().addCommittedOffsetInfo(kafkaCommittedOffset); + } else { + // Create new group + KafkaConsumerGroup newConsumerGroup = new KafkaConsumerGroup(kafkaCommittedOffset.getGroupName()); + newConsumerGroup.addCommittedOffsetInfo(kafkaCommittedOffset); + consumerGroups.add(newConsumerGroup); + } + } + + private void addTopicPartition(String topicName, int partitionId) { + + KafkaTopicPartitionLogEndOffset incomingTopicPartition = new KafkaTopicPartitionLogEndOffset( + new KafkaTopicPartition(topicName, partitionId)); + + if (!logEndOffsets.contains(incomingTopicPartition)) { + logEndOffsets.add(incomingTopicPartition); + } + } + + /* Calculate lag based on logEndOffset and committedOffset + * Rule 1: If logEndOffset == -1, or committedOffset == -1, then lag is -1. + * This is a timing issue and we will get caught up on the next go-around. + * Rule 2: If committedOffset > logEndOffset, then lag is 0. + * This is a timing issue where a committedOffset was reported before logEndOffset was updated. + * We will get caught up on the next go-around. + * Rule 3: Lag is logEndOffset - committedOffset + */ + private long calculateLag(long logEndOffset, long committedOffset) { + + if (logEndOffset == -1 || committedOffset == -1) { + return -1; + } else if (committedOffset > logEndOffset) { + return 0; + } else { + return logEndOffset - committedOffset; + } + } + + private Optional getTopicPartitionLogEndOffset(String topicName, int partitionId) { + + return logEndOffsets.stream() + .filter(leo -> (leo.getTopicPartition().getTopicName().equals(topicName) && leo.getTopicPartition().getPartitionId() == partitionId)) + .findFirst(); + } + + private KafkaOffsetMetadata getLogEndOffset(String topicName, int partitionId) { + + Optional topicPartitionLogEndOffset = getTopicPartitionLogEndOffset(topicName, partitionId); + + if (!topicPartitionLogEndOffset.isPresent()) { + return new KafkaOffsetMetadata(); + } else { + return topicPartitionLogEndOffset.get().getLogEndOffset(); + } + } + + + public String getConsumerGroups() { + + for (KafkaConsumerGroup consumerGroup : consumerGroups) { + consumerGroup.updateStatus(); + } + + Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create(); + return gson.toJson(consumerGroups); + } + + public Set getLogEndOffsets() { + return logEndOffsets; + } + + public void addCommittedOffset(KafkaCommittedOffset kafkaCommittedOffset) { + + addTopicPartition(kafkaCommittedOffset.getTopicName(), kafkaCommittedOffset.getPartitionId()); + + // Calculate lag + KafkaOffsetMetadata logEndOffsetMetadata = getLogEndOffset(kafkaCommittedOffset.getTopicName(), kafkaCommittedOffset.getPartitionId()); + long lag = calculateLag(logEndOffsetMetadata.getOffset(), kafkaCommittedOffset.getCommittedOffset().getOffset()); + + // Add lag to committedOffset object + kafkaCommittedOffset.getCommittedOffset().setLag(lag); + + // Store committedOffset + upsertConsumerGroup(kafkaCommittedOffset); + } + + public void addLogEndOffset(KafkaTopicPartitionLogEndOffset kafkaTopicLogEndOffset) { + + if (logEndOffsets.contains(kafkaTopicLogEndOffset)) { + logEndOffsets.stream() + .filter(leo -> leo.equals(kafkaTopicLogEndOffset)) + .findFirst().get() + 
.setLogEndOffset(kafkaTopicLogEndOffset.getLogEndOffset()); + } else { + logEndOffsets.add(kafkaTopicLogEndOffset); + } + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java b/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java new file mode 100644 index 0000000..237316c --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java @@ -0,0 +1,245 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.EvictingQueue; +import com.google.gson.annotations.Expose; + +import java.util.Comparator; +import java.util.Optional; +import java.util.stream.Stream; + + +public class KafkaTopicPartition { + + private final int MAX_HISTORICAL_CONSUMER_OFFSETS = 10; + + private final Comparator committedOffsetLagCompare = Comparator.comparing(KafkaCommittedOffsetMetadata::getLag); + private final Comparator committedOffsetTimestampCompare = Comparator.comparing(KafkaCommittedOffsetMetadata::getTimestamp); + + @Expose private String topicName; + @Expose private long partitionId; + @Expose private long currentLag; + @Expose private Status status; + + @Expose private EvictingQueue committedOffsets; + + + public KafkaTopicPartition(String topicName, long partitionId) { + createObject(topicName, partitionId, -1, System.currentTimeMillis()); + } + + private void createObject(String topicName, long partitionId, long logEndOffset, long logEndOffsetMillis) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty."); + Preconditions.checkArgument(partitionId > -1, "partitionId must not be less than 0."); + + this.topicName = topicName; + this.partitionId = partitionId; + this.status = Status.OK; + this.committedOffsets = EvictingQueue.create(MAX_HISTORICAL_CONSUMER_OFFSETS); + } + + + public String getTopicName() { + return topicName; + } + + public long getPartitionId() { + return partitionId; + } + + public Status getStatus() { + return status; + } + + public EvictingQueue getCommittedOffsets() { + return this.committedOffsets; + } + + private Optional getMaxLagOffset() { + return committedOffsets.stream().max(committedOffsetLagCompare); + } + + private Optional getMinLagOffset() { + return committedOffsets.stream().min(committedOffsetLagCompare); + } + + private Optional getMaxTimestampOffset() { + return committedOffsets.stream().max(committedOffsetTimestampCompare); + } + + private Optional getMinTimestampOffset() { + return committedOffsets.stream().min(committedOffsetTimestampCompare); + } + + private Stream getOffsetsSortedByTimestamp() { + return committedOffsets.stream().sorted(committedOffsetTimestampCompare); + } + + private boolean hasOffsets() { + return (committedOffsets.size() > 0 ); + } + + /* Check if all offsets have a lag == -1, meaning we likely could not calculate lag due to lack of logEndOffset */ + private boolean hasOnlyOffsetsWithNegativeLag() { + + long committedOffsetSize = committedOffsets.size(); + long committedOffsetSizeWithNegativeLag = committedOffsets.stream().filter(o -> o.getLag() == -1).count(); + + return (committedOffsetSize == committedOffsetSizeWithNegativeLag); + } + + /* Check if there is at least one offset with no lag */ + private boolean hasOffsetWithNoLag() { + return (committedOffsets.stream().filter(o -> o.getLag() == 0).findFirst().isPresent()); + } + + /* Check if offsets have not been committed in a while */ + /* Will return false if there are no offsets */ + 
private boolean areOffsetsTimely() { + + if (!hasOffsets()) { return false; } + + long maxOffsetTimestamp = getMaxLagOffset().get().getTimestamp(); + long minOffsetTimestamp = getMinLagOffset().get().getTimestamp(); + long diffNowAndMaxCommitted = System.currentTimeMillis() - maxOffsetTimestamp; + + return (diffNowAndMaxCommitted < (maxOffsetTimestamp - minOffsetTimestamp)); + } + + /* Check if the minimum offset in the queue is smaller than the first (oldest) offset */ + /* Returns false if there are no offsets in the queue */ + private boolean didConsumerOffsetsRewind() { + + if (!hasOffsets()) { return false; } + + long minLag = getMinLagOffset().get().getLag(); + long minTimestampLag = getOffsetsSortedByTimestamp().findFirst().get().getLag(); + + return (minLag < minTimestampLag); + } + + /* Check if consumer offsets do not change, and the lag is non-zero */ + /* Returns false if there are no offsets in the queue */ + private boolean isTopicPartitionStalled() { + + /* Returns false if there are no offsets in the queue */ + if (!hasOffsets()) { return false; } + + long minTimestampOffset = getMinTimestampOffset().get().getOffset(); + long maxTimestampOffset = getMaxTimestampOffset().get().getOffset(); + + return ((minTimestampOffset == maxTimestampOffset) && (minTimestampOffset != 0)); + } + + /* Check if consumer offsets are changing, but lag is increasing over every commit */ + private boolean isConsumerLagGrowing() { + + /* Returns false if there are no offsets in the queue */ + if (!hasOffsets()) { return false; } + + boolean lagWentDown = false; + + KafkaCommittedOffsetMetadata[] offsetArray = getOffsetsSortedByTimestamp().toArray(KafkaCommittedOffsetMetadata[]::new); + long previousLag = offsetArray[0].getLag(); + + for (KafkaCommittedOffsetMetadata committedOffset : offsetArray) { + + long iteratingLag = committedOffset.getLag(); + + // This is a Rule 3 shortcut + if (iteratingLag == 0) { + lagWentDown = true; + break; + } + + // Shortcut because lag went down + if (iteratingLag < previousLag) { + lagWentDown = true; + break; + } + + // Lag went up this iteration, so continue to the next one and check again + previousLag = iteratingLag; + } + + return lagWentDown; + } + + /* Evaluate a consumer's topic-partition based on the following rules: + * Rule 0: If there are no committed offsets, then there is nothing to calculate and the period is OK. + * Rule 1: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the + * consumer has stopped committing offsets for that partition (error) + * Rule 2: If the consumer offset decreases from one interval to the next the partition is marked as a rewind (error) + * Rule 3: If over the stored period, the lag is ever zero for the partition, the period is OK + * Rule 4: If the consumer offset does not change, and the lag is non-zero, it's an error (partition is stalled) + * Rule 5: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow) + */ + private void calcStatus() { + + // Rule 0: If the lag is -1, this is a special value that means there are no broker offsets yet. 
+ // Consider it good (will get caught up in the next refresh of topics) + if (hasOnlyOffsetsWithNegativeLag()) { + status = Status.OK; + return; + } + + // Rule 1: Offsets have not been committed in a while and lag is not zero + if (!areOffsetsTimely() && currentLag > 0) { + status = Status.STOP; + return; + } + + // Rule 2: Did the consumer offsets rewind at any point? + if (didConsumerOffsetsRewind()) { + status = Status.REWIND; + return; + } + + // Rule 3: Is there ever an offset with no lag? + if (hasOffsetWithNoLag()) { + status = Status.OK; + return; + } + + // Rule 4: Is there no change in lag when lag is non-zero? + if (isTopicPartitionStalled()) { + status = Status.STALL; + return; + } + + // Rule 5: Is the consumer not keeping up? + if (isConsumerLagGrowing()) { + status = Status.WARN; + return; + } + + status = Status.OK; + } + + public void addCommittedOffset(KafkaCommittedOffsetMetadata committedOffsetMetadata) { + this.committedOffsets.add(committedOffsetMetadata); + this.currentLag = getMaxTimestampOffset().get().getLag(); + calcStatus(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaTopicPartition that = (KafkaTopicPartition) o; + + if (getPartitionId() != that.getPartitionId()) return false; + return getTopicName().equals(that.getTopicName()); + } + + @Override + public int hashCode() { + int result = getTopicName().hashCode(); + result = 31 * result + (int) (getPartitionId() ^ (getPartitionId() >>> 32)); + return result; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java b/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java new file mode 100644 index 0000000..7db0df8 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java @@ -0,0 +1,49 @@ +package com.morningstar.kafka; + +public class KafkaTopicPartitionLogEndOffset { + + private KafkaTopicPartition topicPartition; + private KafkaOffsetMetadata logEndOffset; + + + public KafkaTopicPartitionLogEndOffset(KafkaTopicPartition topicPartition, KafkaOffsetMetadata logEndOffset) { + createObject(topicPartition, logEndOffset); + } + + public KafkaTopicPartitionLogEndOffset(KafkaTopicPartition topicPartition) { + createObject(topicPartition, new KafkaOffsetMetadata(-1)); + } + + private void createObject(KafkaTopicPartition topicPartition, KafkaOffsetMetadata offsetInfo) { + this.topicPartition = topicPartition; + this.logEndOffset = offsetInfo; + } + + public KafkaTopicPartition getTopicPartition() { + return topicPartition; + } + + public KafkaOffsetMetadata getLogEndOffset() { + return logEndOffset; + } + + public void setLogEndOffset(KafkaOffsetMetadata logEndOffset) { + this.logEndOffset = logEndOffset; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaTopicPartitionLogEndOffset that = (KafkaTopicPartitionLogEndOffset) o; + + return topicPartition.equals(that.topicPartition); + } + + @Override + public int hashCode() { + return topicPartition.hashCode(); + } +} diff --git a/src/main/java/com/morningstar/kafka/Status.java b/src/main/java/com/morningstar/kafka/Status.java new file mode 100644 index 0000000..0df1e35 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/Status.java @@ -0,0 +1,12 @@ +package com.morningstar.kafka; + +public enum Status { + + NOTFOUND, + OK, + WARN, + ERR, + STOP, + STALL, + REWIND +} 
diff --git a/src/main/resources/offsetapp/scripts/cluster-viz.js b/src/main/resources/offsetapp/scripts/cluster-viz.js index ab82594..9a41e28 100644 --- a/src/main/resources/offsetapp/scripts/cluster-viz.js +++ b/src/main/resources/offsetapp/scripts/cluster-viz.js @@ -2,8 +2,8 @@ function loadViz(load_to_id, data_path) { var m = [0, 120, 0, 120], - w = 1400 - m[1] - m[3], - h = 500 - m[0] - m[2], + w = 1200 - m[1] - m[3], + h = 900 - m[0] - m[2], i = 0, root; @@ -47,8 +47,8 @@ function loadViz(load_to_id, data_path) { function updateClusterViz(source, tree, vis, diagonal) { var m = [0, 120, 0, 120], - w = 1400 - m[1] - m[3], - h = 800 - m[0] - m[2], + w = 1200 - m[1] - m[3], + h = 900 - m[0] - m[2], i = 0; diff --git a/src/main/resources/offsetapp/scripts/controllers.js b/src/main/resources/offsetapp/scripts/controllers.js index 39fee71..3b4ba25 100644 --- a/src/main/resources/offsetapp/scripts/controllers.js +++ b/src/main/resources/offsetapp/scripts/controllers.js @@ -36,7 +36,7 @@ angular.module('offsetapp.controllers',["offsetapp.services"]) $scope.topic = $routeParams.topic; }]) - .controller("TopicConsumersCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", + .controller("TopicConsumersCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { offsetinfo.topicConsumers($routeParams.topic, function(d) { $scope.info = d; @@ -53,7 +53,7 @@ angular.module('offsetapp.controllers',["offsetapp.services"]) offsetinfo.loadClusterViz($routeParams.group, function(d) { }); }]) - .controller("ActiveTopicsVizCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", + .controller("ActiveTopicsVizCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { $scope.loading = true; offsetinfo.loadTopicConsumerViz($routeParams.group, function(d) { diff --git a/src/main/resources/offsetapp/scripts/directives.js b/src/main/resources/offsetapp/scripts/directives.js index d3bca2d..8a995ce 100644 --- a/src/main/resources/offsetapp/scripts/directives.js +++ b/src/main/resources/offsetapp/scripts/directives.js @@ -209,7 +209,7 @@ angular.module("offsetapp.directives", []) opposite: true }], series : [{ - name: "log size", + name: "log-end offset", data:d[0], yAxis: 0, color: '#088CFE', @@ -218,7 +218,7 @@ angular.module("offsetapp.directives", []) radius : 3 }}, { - name: "offset", + name: "committed offset", data:d[1], color: '#B9E6D9', yAxis: 0, diff --git a/src/main/resources/offsetapp/views/grouplist.html b/src/main/resources/offsetapp/views/grouplist.html index ed71f73..bde3c5f 100644 --- a/src/main/resources/offsetapp/views/grouplist.html +++ b/src/main/resources/offsetapp/views/grouplist.html @@ -1,5 +1,5 @@
diff --git a/src/main/resources/offsetapp/views/topiclist.html b/src/main/resources/offsetapp/views/topiclist.html index 9d4340a..fa2a40f 100644 --- a/src/main/resources/offsetapp/views/topiclist.html +++ b/src/main/resources/offsetapp/views/topiclist.html @@ -1,11 +1,35 @@
Loading ...
+    [added table markup: columns Topic | Partition | Log-End Offset; rows bind {{t.topic}}, {{t.sumLogEndOffset}}, {{p.partition}}, {{p.logEndOffset}}]
+ + \ No newline at end of file diff --git a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala index 7ed9244..10c3489 100644 --- a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala @@ -1,13 +1,13 @@ package com.quantifind.kafka -import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.Executors -import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo} +import com.twitter.util.Time import com.quantifind.kafka.core._ import com.quantifind.kafka.offsetapp.OffsetGetterArgs +import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo} import com.quantifind.utils.ZkUtilsWrapper -import com.twitter.util.Time import kafka.common.BrokerNotAvailableException import kafka.consumer.{ConsumerConnector, SimpleConsumer} import kafka.utils.{Json, Logging, ZkUtils} @@ -41,8 +41,12 @@ trait OffsetGetter extends Logging { def getGroups: Seq[String] + def getActiveGroups: Seq[String] + def getTopicMap: Map[String, Seq[String]] + def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] + def getActiveTopicMap: Map[String, Seq[String]] def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] @@ -113,6 +117,9 @@ trait OffsetGetter extends Logging { // get list of all topics def getTopics: Seq[String] = { try { + val unsortedTopics: Seq[String] = zkUtils.getChildren(ZkUtils.BrokerTopicsPath) + + zkUtils.getChildren(ZkUtils.BrokerTopicsPath).sortWith(_ < _) } catch { case NonFatal(t) => @@ -185,9 +192,23 @@ trait OffsetGetter extends Logging { } }.toSeq) } + + def getConsumerGroupStatus: String + + def close() { + // TODO: What is going on here? This code is broken + /* + for (consumerOpt <- consumerMap.values) { + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } + } + */ + } } -object OffsetGetter { +object OffsetGetter extends Logging { case class KafkaInfo(name: String, brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo]) @@ -210,10 +231,39 @@ object OffsetGetter { var newKafkaConsumer: KafkaConsumer[String, String] = null def createZkUtils(args: OffsetGetterArgs): ZkUtils = { - ZkUtils(args.zk, + + val sleepAfterFailedZkUtilsInstantiation: Int = 30000 + var zkUtils: ZkUtils = null + + while (null == zkUtils) { + + try { + + info("Creating new ZkUtils object."); + zkUtils = ZkUtils(args.zk, args.zkSessionTimeout.toMillis.toInt, args.zkConnectionTimeout.toMillis.toInt, JaasUtils.isZkSecurityEnabled()) + } + + catch { + + case e: Throwable => + + if (null != zkUtils) { + + zkUtils.close() + zkUtils = null + } + + val errorMsg = "Error creating ZkUtils. 
Will attempt to re-create in %d seconds".format(sleepAfterFailedZkUtilsInstantiation) + error(errorMsg, e) + Thread.sleep(sleepAfterFailedZkUtilsInstantiation) + } + } + + info("Created zkUtils object: "+ zkUtils) + zkUtils } def getInstance(args: OffsetGetterArgs): OffsetGetter = { @@ -250,4 +300,4 @@ object OffsetGetter { new ZKOffsetGetter(zkUtils) } } -} \ No newline at end of file +} diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala index 6f6a62e..7ea1c1b 100644 --- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala @@ -2,8 +2,15 @@ package com.quantifind.kafka.core import java.nio.{BufferUnderflowException, ByteBuffer} import java.util +import java.util.concurrent.atomic.AtomicReference import java.util.{Arrays, Properties} +import com.morningstar.kafka.KafkaCommittedOffset +import com.morningstar.kafka.KafkaOffsetMetadata +import com.morningstar.kafka.KafkaOffsetStorage +import com.morningstar.kafka.KafkaTopicPartition +import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset + import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.kafka.offsetapp.OffsetGetterArgs import com.quantifind.kafka.{Node, OffsetGetter} @@ -17,14 +24,14 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.{PartitionInfo, TopicPartition} -import scala.collection._ +import scala.collection.{mutable, _} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future, duration} /** - * Created by rcasey on 11/16/2016. - */ + * Created by rcasey on 11/16/2016. + */ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) extends OffsetGetter { import KafkaOffsetGetter._ @@ -37,27 +44,55 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) val topicPartition = new TopicPartition(topic, partitionId) val topicAndPartition = TopicAndPartition(topic, partitionId) + val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) - committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData => - - // BIT O'HACK to deal with timing: - // Due to thread and processing timing, it is possible that the value we have for the topicPartition's - // logEndOffset has not yet been updated to match the value on the broker. 
When this happens, report the - // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* - // this value - val logEndOffset: Option[Long] = logEndOffsetsMap.get(topicPartition) - val committedOffset: Long = offsetMetaData.offset - val lag: Long = logEndOffset.get - committedOffset - val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get - - // Get client information if we can find an associated client - var clientString: Option[String] = Option("NA") - val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicPartition))) - if (!filteredClients.isEmpty) { - val client: ClientGroup = filteredClients.head - clientString = Option(client.clientId + client.clientHost) - } + if (!optionalOffsetMetaData.isDefined) { + error(s"processPartition: Could not find group-topic-partition in committedOffsetsMap, g:$group,t:$topic,p:$partitionId") + return None + } + + val offsetMetaData: OffsetAndMetadata = optionalOffsetMetaData.get; + val isActiveGroup: Boolean = isGroupActive(group) + + kafkaOffsetStorage.addCommittedOffset( + new KafkaCommittedOffset( + group, + isActiveGroup, + topic, + partitionId, + offsetMetaData.offset, + offsetMetaData.commitTimestamp + ) + ) + + if (!isActiveGroup) { + info(s"processPartition: Not reporting offset because group is not active, g:$group,t:$topic,p:$partitionId") + return None + } + + val logEndOffset: Long = logEndOffsetsMap.get(topicPartition).getOrElse(-1) + + if (logEndOffset == -1) { + info(s"processPartition: Not reporting this offset as we do not yet have logEndOffset to calc lag, g:$group,t:$topic,p:$partitionId") + return None + } + + // BIT O'HACK to deal with timing: + // Due to thread and processing timing, it is possible that the value we have for the topicPartition's + // logEndOffset has not yet been updated to match the value on the broker. 
When this happens, report the + // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* + // this value + val committedOffset: Long = offsetMetaData.offset + val logEndOffsetReported: Long = if (committedOffset > logEndOffset) committedOffset else logEndOffset + + // Get client information if we can find an associated client + var clientString: Option[String] = Option("NA") + val filteredClient = clients.get().filter(c => (c.group == group && c.topicPartitions.contains(topicPartition))).headOption + if (filteredClient.isDefined) { + clientString = Option(filteredClient.get.clientId + filteredClient.get.clientHost) + } + Some( OffsetInfo(group = group, topic = topic, partition = partitionId, @@ -66,47 +101,95 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) owner = clientString, creation = Time.fromMilliseconds(offsetMetaData.commitTimestamp), modified = Time.fromMilliseconds(offsetMetaData.expireTimestamp)) - } + ) } override def getGroups: Seq[String] = { - topicAndGroups.groupBy(_.group).keySet.toSeq.sorted + committedOffsetMap.keys.map(_.group).toSet.toSeq.sorted + //topicAndGroups.get().groupBy(_.group).keySet.toSeq.sorted + } + + override def getActiveGroups: Seq[String] = { + activeTopicAndGroups.get() + .map(_.group) + .toSeq } override def getTopicList(group: String): List[String] = { - topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList.sorted + committedOffsetMap.keySet + .filter(_.group.equals(group)) + .map(_.topicPartition.topic) + .toSeq + .toList + //topicAndGroups.get().filter(_.group == group).groupBy(_.topic).keySet.toList.sorted } override def getTopicMap: Map[String, scala.Seq[String]] = { - topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq) + committedOffsetMap.keySet + .groupBy(_.topicPartition.topic) + .mapValues(_.map(_.group).toSeq) + + //topicAndGroups.get().groupBy(_.topic).mapValues(_.map(_.group).toSeq) } override def getActiveTopicMap: Map[String, Seq[String]] = { - getTopicMap + activeTopicAndGroups.get().groupBy(_.topic).mapValues(_.map(_.group).toSeq) + //getTopicMap } override def getTopics: Seq[String] = { - topicPartitionsMap.keys.toSeq.sorted + activeTopicPartitionsMap.get().keys.toSeq.sorted + } + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = { + var topicsAndLogEndOffsets: Set[TopicLogEndOffsetInfo] = mutable.HashSet() + + for (topic: String <- activeTopicPartitionsMap.get().keys) { + + var topicPartitionOffsetSet: Set[TopicPartitionLogEndOffset] = new mutable.HashSet[TopicPartitionLogEndOffset] + var sumLogEndOffset: Long = 0; + + val topicPartitionOffset: immutable.Map[TopicPartition, Long] = logEndOffsetsMap.filter(p => p._1.topic() == topic).toMap + for (topicPartition: TopicPartition <- topicPartitionOffset.keys.toSeq.sortWith(_.partition() > _.partition())) { + + val partition: Int = topicPartition.partition() + val logEndOffset: Long = logEndOffsetsMap.get(topicPartition).getOrElse(0) + + sumLogEndOffset += logEndOffset + topicPartitionOffsetSet += TopicPartitionLogEndOffset(topic, partition, logEndOffset) + } + + topicsAndLogEndOffsets += TopicLogEndOffsetInfo(topic, sumLogEndOffset, topicPartitionOffsetSet.toSeq.sorted) + } + + topicsAndLogEndOffsets.toSeq.sorted } override def getClusterViz: Node = { - val clusterNodes = topicPartitionsMap.values.map(partition => { + val clusterNodes = activeTopicPartitionsMap.get().values.map(partition => { Node(partition.get(0).leader().host() + ":" + 
partition.get(0).leader().port(), Seq()) }).toSet.toSeq.sortWith(_.name < _.name) Node("KafkaCluster", clusterNodes) } + + def isGroupActive(group: String): Boolean = getActiveGroups.exists(_.equals(group)) + + override def getConsumerGroupStatus: String = { + kafkaOffsetStorage.getConsumerGroups() + } } object KafkaOffsetGetter extends Logging { val committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata] = concurrent.TrieMap() val logEndOffsetsMap: concurrent.Map[TopicPartition, Long] = concurrent.TrieMap() + val kafkaOffsetStorage: KafkaOffsetStorage = new KafkaOffsetStorage(); // Swap the object on update - var activeTopicPartitions: immutable.Set[TopicAndPartition] = immutable.HashSet() - var clients: immutable.Set[ClientGroup] = immutable.HashSet() - var topicAndGroups: immutable.Set[TopicAndGroup] = immutable.HashSet() - var topicPartitionsMap: immutable.Map[String, util.List[PartitionInfo]] = immutable.HashMap() + var activeTopicPartitions: AtomicReference[immutable.Set[TopicAndPartition]] = new AtomicReference(immutable.HashSet()) + var clients: AtomicReference[immutable.Set[ClientGroup]] = new AtomicReference(immutable.HashSet()) + var activeTopicAndGroups: AtomicReference[immutable.Set[TopicAndGroup]] = new AtomicReference(immutable.HashSet()) + var activeTopicPartitionsMap: AtomicReference[immutable.Map[String, util.List[PartitionInfo]]] = new AtomicReference(immutable.HashMap()) private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = { @@ -162,13 +245,13 @@ object KafkaOffsetGetter extends Logging { } /** - * Attempts to parse a kafka message as an offset message. - * - * @author Robert Casey (rcasey212@gmail.com) - * @param message message retrieved from the kafka client's poll() method - * @return key-value of GroupTopicPartition and OffsetAndMetadata if the message was a valid offset message, - * otherwise None - */ + * Attempts to parse a kafka message as an offset message. 
+ * + * @author Robert Casey (rcasey212@gmail.com) + * @param message message retrieved from the kafka client's poll() method + * @return key-value of GroupTopicPartition and OffsetAndMetadata if the message was a valid offset message, + * otherwise None + */ def tryParseOffsetMessage(message: ConsumerRecord[Array[Byte], Array[Byte]]): Option[(GroupTopicPartition, OffsetAndMetadata)] = { try { @@ -359,9 +442,9 @@ object KafkaOffsetGetter extends Logging { }) }) - activeTopicPartitions = newActiveTopicPartitions.toSet - clients = newClients.toSet - topicAndGroups = newTopicAndGroups.toSet + activeTopicPartitions.set(newActiveTopicPartitions.toSet) + clients.set(newClients.toSet) + activeTopicAndGroups.set(newTopicAndGroups.toSet) } catch { @@ -404,7 +487,6 @@ object KafkaOffsetGetter extends Logging { def startLogEndOffsetGetter(args: OffsetGetterArgs) = { - val group: String = "kafka-monitor-LogEndOffsetGetter" val sleepOnDataRetrieval: Int = 10000 val sleepOnError: Int = 30000 var logEndOffsetGetter: KafkaConsumer[Array[Byte], Array[Byte]] = null @@ -415,12 +497,13 @@ object KafkaOffsetGetter extends Logging { while (null == logEndOffsetGetter) { + val group: String = "kafka-monitor-LogEndOffsetGetter" logEndOffsetGetter = createNewKafkaConsumer(args, group, false) } // Get topic-partitions - topicPartitionsMap = JavaConversions.mapAsScalaMap(logEndOffsetGetter.listTopics()).toMap - val distinctPartitionInfo: Seq[PartitionInfo] = (topicPartitionsMap.values).flatten(listPartitionInfo => JavaConversions.asScalaBuffer(listPartitionInfo)).toSeq + activeTopicPartitionsMap.set(JavaConversions.mapAsScalaMap(logEndOffsetGetter.listTopics()).toMap) + val distinctPartitionInfo: Seq[PartitionInfo] = (activeTopicPartitionsMap.get().values).flatten(listPartitionInfo => JavaConversions.asScalaBuffer(listPartitionInfo)).toSeq // Iterate over each distinct PartitionInfo distinctPartitionInfo.foreach(partitionInfo => { @@ -431,6 +514,12 @@ object KafkaOffsetGetter extends Logging { logEndOffsetGetter.seekToEnd(topicPartition) val logEndOffset: Long = logEndOffsetGetter.position(topicPartition) + // Update KafkaOffsetStorage + kafkaOffsetStorage.addLogEndOffset( + new KafkaTopicPartitionLogEndOffset( + new KafkaTopicPartition(partitionInfo.topic, partitionInfo.partition), + new KafkaOffsetMetadata(logEndOffset, System.currentTimeMillis()))); + // Update the TopicPartition map with the current LogEndOffset if it exists, else add a new entry to the map if (logEndOffsetsMap.contains(topicPartition)) { logEndOffsetsMap.update(topicPartition, logEndOffset) @@ -454,6 +543,8 @@ object KafkaOffsetGetter extends Logging { logEndOffsetGetter.close() logEndOffsetGetter = null } + + Thread.sleep(sleepOnError) } } } @@ -462,4 +553,33 @@ object KafkaOffsetGetter extends Logging { case class TopicAndGroup(topic: String, group: String) -case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicPartition]) \ No newline at end of file +case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicPartition]) + +case class TopicPartitionLogEndOffset(topic: String, partition: Int, logEndOffset: Long) extends Ordered [TopicPartitionLogEndOffset] { + + def compare (that: TopicPartitionLogEndOffset) = { + if (this.topic == that.topic) { + if (this.partition > that.partition) + 1 + else if (this.partition < that.partition) + -1 + else + 0 + } + else if (this.topic > that.topic) + 1 + else + -1 + } +} + +case class TopicLogEndOffsetInfo(topic: 
String, sumLogEndOffset: Long, partitions: Seq[TopicPartitionLogEndOffset]) extends Ordered[TopicLogEndOffsetInfo] { + + def compare (that:TopicLogEndOffsetInfo) = { + if (this.topic == that.topic) 0 + else if (this.topic > that.topic) 1 + else -1 + } +} + +//case class OffsetMetadataAndLag diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala index 1a59430..a987086 100644 --- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala @@ -1,13 +1,15 @@ package com.quantifind.kafka.core +import com.morningstar.kafka.KafkaConsumerGroup import com.quantifind.kafka.OffsetGetter import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition -import kafka.utils.{Json} +import kafka.utils.Json import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat import scala.collection._ @@ -72,6 +74,8 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend } } + override def getActiveGroups: Seq[String] = getGroups + /** * Finds all topics for this group, for Kafka Spout there is only one */ @@ -115,4 +119,8 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend // not really have a way to determine which consumer is active now, so return all getTopicMap } -} \ No newline at end of file + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = ??? + + override def getConsumerGroupStatus: String = ??? +} diff --git a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala index edc5b11..742dafa 100644 --- a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala @@ -1,5 +1,6 @@ package com.quantifind.kafka.core +import com.morningstar.kafka.KafkaConsumerGroup import com.quantifind.kafka.OffsetGetter import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.utils.ZkUtilsWrapper @@ -10,6 +11,7 @@ import kafka.common.TopicAndPartition import kafka.utils.ZkUtils import org.apache.zookeeper.data.Stat import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.common.TopicPartition import scala.collection._ import scala.util.control.NonFatal @@ -68,6 +70,8 @@ class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter { } } + override def getActiveGroups: Seq[String] = getGroups + override def getTopicList(group: String): List[String] = { try { zkUtils.getChildren(s"${ZkUtils.ConsumersPath}/$group/offsets").toList @@ -118,4 +122,8 @@ class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter { Map() } } + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = ??? + + override def getConsumerGroupStatus: String = ??? 
} diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala index 3758618..f37c959 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala @@ -9,120 +9,129 @@ import com.quantifind.kafka.offsetapp.OffsetDB.{DbOffsetInfo, OffsetHistory, Off import com.twitter.util.Time /** - * Tools to store offsets in a DB - * User: andrews - * Date: 1/27/14 - */ + * Store and retrieve offsets in a SQLite file + * User: andrews, rcasey212 + * Date: 1/27/14 + */ class OffsetDB(dbfile: String) { - val database = Database.forURL(s"jdbc:sqlite:$dbfile.db", - driver = "org.sqlite.JDBC") - - implicit val twitterTimeMap = MappedColumnType.base[Time, Long]( - { - time => time.inMillis - }, { - millis => Time.fromMilliseconds(millis) - } - ) - - class Offset(tag: Tag) extends Table[DbOffsetInfo](tag, "OFFSETS") { - def id = column[Int]("id", O.PrimaryKey, O.AutoInc) - - val group = column[String]("group") - val topic = column[String]("topic") - val partition = column[Int]("partition") - val offset = column[Long]("offset") - val logSize = column[Long]("log_size") - val owner = column[Option[String]]("owner") - val timestamp = column[Long]("timestamp") - val creation = column[Time]("creation") - val modified = column[Time]("modified") - - - def * = (id.?, group, topic, partition, offset, logSize, owner, timestamp, creation, modified).shaped <>(DbOffsetInfo.parse, DbOffsetInfo.unparse) - - def forHistory = (timestamp, partition, owner, offset, logSize) <>(OffsetPoints.tupled, OffsetPoints.unapply) - - def idx = index("idx_search", (group, topic)) - - def tidx = index("idx_time", (timestamp)) - - def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true) - - } - - val offsets = TableQuery[Offset] - - def insert(timestamp: Long, info: OffsetInfo) { - database.withSession { - implicit s => - offsets += DbOffsetInfo(timestamp = timestamp, offset = info) - } - } - - def insertAll(info: IndexedSeq[OffsetInfo]) { - val now = System.currentTimeMillis - database.withTransaction { - implicit s => - offsets ++= info.map(i => DbOffsetInfo(timestamp = now, offset = i)) - } - } - - def emptyOld(since: Long) { - database.withSession { - implicit s => - offsets.filter(_.timestamp < since).delete - } - } - - def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession { - implicit s => - val o = offsets - .filter(off => off.group === group && off.topic === topic) - .sortBy(_.timestamp) - .map(_.forHistory) - .list(implicitly[JdbcBackend#Session]) - OffsetHistory(group, topic, o) - } - - def maybeCreate() { - database.withSession { - implicit s => - if (MTable.getTables("OFFSETS").list(implicitly[JdbcBackend#Session]).isEmpty) { - offsets.ddl.create - } - } - } - + var sqliteDbLock: AnyRef = new Object() + + val database = Database.forURL(s"jdbc:sqlite:$dbfile.db", + driver = "org.sqlite.JDBC") + + implicit val twitterTimeMap = MappedColumnType.base[Time, Long]( + { + time => time.inMillis + }, { + millis => Time.fromMilliseconds(millis) + } + ) + + class Offset(tag: Tag) extends Table[DbOffsetInfo](tag, "OFFSETS") { + def id = column[Int]("id", O.PrimaryKey, O.AutoInc) + + val group = column[String]("group") + val topic = column[String]("topic") + val partition = column[Int]("partition") + val offset = column[Long]("offset") + val logSize = column[Long]("log_size") + val owner = column[Option[String]]("owner") + val 
timestamp = column[Long]("timestamp") + val creation = column[Time]("creation") + val modified = column[Time]("modified") + + + def * = (id.?, group, topic, partition, offset, logSize, owner, timestamp, creation, modified).shaped <> (DbOffsetInfo.parse, DbOffsetInfo.unparse) + + def forHistory = (timestamp, partition, owner, offset, logSize) <> (OffsetPoints.tupled, OffsetPoints.unapply) + + def idx = index("idx_search", (group, topic)) + + def tidx = index("idx_time", (timestamp)) + + def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true) + } + + val offsets = TableQuery[Offset] + + def insert(timestamp: Long, info: OffsetInfo) { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + offsets += DbOffsetInfo(timestamp = timestamp, offset = info) + } + } + } + + def insertAll(info: IndexedSeq[OffsetInfo]) { + val now = System.currentTimeMillis + sqliteDbLock.synchronized { + database.withTransaction { + implicit s => + offsets ++= info.map(i => DbOffsetInfo(timestamp = now, offset = i)) + } + } + } + + def emptyOld(since: Long) { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + offsets.filter(_.timestamp < since).delete + } + } + } + + def offsetHistory(group: String, topic: String): OffsetHistory = sqliteDbLock.synchronized { + database.withSession { + implicit s => + val o = offsets + .filter(off => off.group === group && off.topic === topic) + .sortBy(_.timestamp) + .map(_.forHistory) + .list(implicitly[JdbcBackend#Session]) + OffsetHistory(group, topic, o) + } + } + + def maybeCreate() { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + if (MTable.getTables("OFFSETS").list(implicitly[JdbcBackend#Session]).isEmpty) { + offsets.ddl.create + } + } + } + } } object OffsetDB { - case class OffsetPoints(timestamp: Long, partition: Int, owner: Option[String], offset: Long, logSize: Long) - - case class OffsetHistory(group: String, topic: String, offsets: Seq[OffsetPoints]) - - case class DbOffsetInfo(id: Option[Int] = None, timestamp: Long, offset: OffsetInfo) - - object DbOffsetInfo { - def parse(in: (Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)): DbOffsetInfo = { - val (id, group, topic, partition, offset, logSize, owner, timestamp, creation, modified) = in - DbOffsetInfo(id, timestamp, OffsetInfo(group, topic, partition, offset, logSize, owner, creation, modified)) - } - - def unparse(in: DbOffsetInfo): Option[(Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)] = Some( - in.id, - in.offset.group, - in.offset.topic, - in.offset.partition, - in.offset.offset, - in.offset.logSize, - in.offset.owner, - in.timestamp, - in.offset.creation, - in.offset.modified - ) - } - + case class OffsetPoints(timestamp: Long, partition: Int, owner: Option[String], offset: Long, logSize: Long) + + case class OffsetHistory(group: String, topic: String, offsets: Seq[OffsetPoints]) + + case class DbOffsetInfo(id: Option[Int] = None, timestamp: Long, offset: OffsetInfo) + + object DbOffsetInfo { + def parse(in: (Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)): DbOffsetInfo = { + val (id, group, topic, partition, offset, logSize, owner, timestamp, creation, modified) = in + DbOffsetInfo(id, timestamp, OffsetInfo(group, topic, partition, offset, logSize, owner, creation, modified)) + } + + def unparse(in: DbOffsetInfo): Option[(Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)] = Some( + in.id, + 
in.offset.group, + in.offset.topic, + in.offset.partition, + in.offset.offset, + in.offset.logSize, + in.offset.owner, + in.timestamp, + in.offset.creation, + in.offset.modified + ) + } } diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala index efd6d7d..a7fc541 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala @@ -13,6 +13,7 @@ import com.quantifind.sumac.{ArgMain, FieldArgs} import com.quantifind.sumac.validation.Required import com.quantifind.sumac.{ArgMain, FieldArgs} +import kafka.utils.ZkUtils import org.I0Itec.zkclient.ZkClient import scala.concurrent.duration._ import scala.language.postfixOps @@ -111,7 +112,7 @@ object OffsetGetterApp extends ArgMain[OffsetGetterArgsWGT] { } } finally { - + if (og != null) og.close() } } } diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala index 759429b..82332bd 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala @@ -49,10 +49,14 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { def htmlRoot: String = "/offsetapp" - val scheduler : ScheduledExecutorService = Executors.newScheduledThreadPool(2) + val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(2) var reporters: mutable.Set[OffsetInfoReporter] = null + val millisBeforeStartingReportingThread = 0 + val millisBeforeStartingCleanupThread = 0 + + def retryTask[T](fn: => T) { try { retry(3) { @@ -65,20 +69,37 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { } def reportOffsets(args: OWArgs) { + + try { val groups = getGroups(args) groups.foreach { g => val inf = getInfo(g, args).offsets.toIndexedSeq debug(s"reporting ${inf.size}") - reporters.foreach( reporter => retryTask { reporter.report(inf) } ) + reporters.foreach(reporter => retryTask { + reporter.report(inf) + }) } } + catch { - def schedule(args: OWArgs) { + case e: Throwable => + error("Error while in reportOffsets().", e) + } + } - scheduler.scheduleAtFixedRate( () => { reportOffsets(args) }, 0, args.refresh.toMillis, TimeUnit.MILLISECONDS ) - scheduler.scheduleAtFixedRate( () => { reporters.foreach(reporter => retryTask({reporter.cleanupOldData()})) }, args.retain.toMillis, args.retain.toMillis, TimeUnit.MILLISECONDS ) + def cleanupOldData() = { + reporters.foreach(reporter => retryTask({ reporter.cleanupOldData() })); + } + + /* Schedule time-based threads */ + def schedule(args: OWArgs) { + + scheduler.scheduleAtFixedRate(() => { reportOffsets(args) }, + millisBeforeStartingReportingThread, args.refresh.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleAtFixedRate(() => { cleanupOldData() }, + millisBeforeStartingCleanupThread, args.retain.toMillis, TimeUnit.MILLISECONDS) } def withOG[T](args: OWArgs)(f: OffsetGetter => T): T = { @@ -87,7 +108,7 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { og = OffsetGetter.getInstance(args) f(og) } finally { - + if (og != null) og.close() } } @@ -97,15 +118,24 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { def getGroups(args: OWArgs) = withOG(args) { _.getGroups + } + + def getActiveGroups(args: OWArgs) = withOG(args) { + _.getActiveGroups } def getActiveTopics(args: OWArgs) 
= withOG(args) { _.getActiveTopics } + def getTopics(args: OWArgs) = withOG(args) { _.getTopics } + def getTopicsAndLogEndOffsets(args: OWArgs) = withOG(args) { + _.getTopicsAndLogEndOffsets + } + def getTopicDetail(topic: String, args: OWArgs) = withOG(args) { _.getTopicDetail(topic) } @@ -118,20 +148,23 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { _.getClusterViz } + def getConsumerGroupStatus(args: OWArgs) = withOG(args) { + _.getConsumerGroupStatus + } + override def afterStop() { scheduler.shutdown() } - class TimeSerializer extends CustomSerializer[Time](format => ( - { - case JInt(s)=> - Time.fromMilliseconds(s.toLong) - }, - { - case x: Time => - JInt(x.inMilliseconds) - } + class TimeSerializer extends CustomSerializer[Time](format => ( { + case JInt(s) => + Time.fromMilliseconds(s.toLong) + }, + { + case x: Time => + JInt(x.inMilliseconds) + } )) override def setup(args: OWArgs): Plan = new Plan { @@ -144,15 +177,17 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { def intent: Plan.Intent = { case GET(Path(Seg("group" :: Nil))) => - JsonContent ~> ResponseString(write(getGroups(args))) + JsonContent ~> ResponseString(write(getActiveGroups(args))) case GET(Path(Seg("group" :: group :: Nil))) => val info = getInfo(group, args) JsonContent ~> ResponseString(write(info)) ~> Ok case GET(Path(Seg("group" :: group :: topic :: Nil))) => val offsets = args.db.offsetHistory(group, topic) JsonContent ~> ResponseString(write(offsets)) ~> Ok + case GET(Path(Seg("consumergroup" :: Nil))) => + JsonContent ~> ResponseString(getConsumerGroupStatus(args)) ~> Ok case GET(Path(Seg("topiclist" :: Nil))) => - JsonContent ~> ResponseString(write(getTopics(args))) + JsonContent ~> ResponseString(write(getTopicsAndLogEndOffsets(args))) case GET(Path(Seg("clusterlist" :: Nil))) => JsonContent ~> ResponseString(write(getClusterViz(args))) case GET(Path(Seg("topicdetails" :: topic :: Nil))) => @@ -172,10 +207,11 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging { val reportersSet: mutable.Set[Class[_ <: OffsetInfoReporter]] = scala.collection.JavaConversions.asScalaSet(reportersTypes) - // SQLiteOffsetInfoReporter as a main storage is instantiated explicitly outside this loop so it is filtered out + // SQLiteOffsetInfoReporter as a main storage is instantiated separately as it has a different constructor from + // the other OffsetInfoReporter objects reportersSet .filter(!_.equals(classOf[SQLiteOffsetInfoReporter])) - .map((reporterType: Class[_ <: OffsetInfoReporter]) => createReporterInstance(reporterType, args.pluginsArgs)) + .map((reporterType: Class[_ <: OffsetInfoReporter]) => createReporterInstance(reporterType, args.pluginsArgs)) .+(new SQLiteOffsetInfoReporter(argHolder.db, args)) } diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala index ff5e5da..2f84a6c 100644 --- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala +++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala @@ -20,14 +20,16 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { trait Fixture { - val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper]) - val mockedConsumer = Mockito.mock(classOf[SimpleConsumer]) + val mockedZkUtil: ZkUtilsWrapper = Mockito.mock(classOf[ZkUtilsWrapper]) + val mockedConsumer: SimpleConsumer = Mockito.mock(classOf[SimpleConsumer]) val testPartitionLeader = 1 val args = new 
OffsetGetterArgs - val offsetGetter = new KafkaOffsetGetter(mockedZkUtil, args) + val offsetGetter: KafkaOffsetGetter = new KafkaOffsetGetter(mockedZkUtil, args) offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer)) + val offsetGetterSpy: KafkaOffsetGetter = spy(offsetGetter) + } "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture { @@ -52,8 +54,9 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset))) val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets) when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse) + when(offsetGetterSpy.isGroupActive(any[String])).thenReturn(true) - offsetGetter.processPartition(testGroup, testTopic, testPartition) match { + offsetGetterSpy.processPartition(testGroup, testTopic, testPartition) match { case Some(offsetInfo) => offsetInfo.topic shouldBe testTopic offsetInfo.group shouldBe testGroup @@ -161,4 +164,4 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers { gtp.topicPartition shouldBe TopicAndPartition(topic, partition) offMeta shouldBe offsetAndMetadata } -} \ No newline at end of file +}
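
For anyone reviewing the new SQLite serialization, here is a minimal, illustrative sketch (not part of the patch) of OffsetDB in use. It assumes OffsetInfo takes the same positional arguments that DbOffsetInfo.parse passes above (group, topic, partition, offset, logSize, owner, creation, modified); the file name and sample values are made up.

```
import com.quantifind.kafka.OffsetGetter.OffsetInfo
import com.quantifind.kafka.offsetapp.OffsetDB
import com.twitter.util.Time

object OffsetDBSketch extends App {

  // Hypothetical scratch file; OffsetDB appends ".db" and opens it through the SQLite JDBC driver.
  val db = new OffsetDB("offsetapp-sketch")
  db.maybeCreate() // creates the OFFSETS table only if it does not already exist

  val now = Time.now
  // Positional arguments assumed to mirror DbOffsetInfo.parse:
  // group, topic, partition, offset, logSize, owner, creation, modified
  val sample = OffsetInfo("example-group", "example-topic", 0, 42L, 100L, Some("consumer-1"), now, now)

  // The OffsetDB operations synchronize on sqliteDbLock, so the reporting and cleanup
  // threads scheduled by OffsetGetterWeb are serialized against the single SQLite file.
  db.insertAll(IndexedSeq(sample))
  println(db.offsetHistory("example-group", "example-topic"))
}
```

These are the same methods that SQLiteOffsetInfoReporter (constructed with argHolder.db in setup above) presumably drives from the scheduled reportOffsets and cleanupOldData tasks.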