diff --git a/README.md b/README.md index 4996903..6d7b8d9 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ This is a small web app, you can run it locally or on a server, as long as you h ``` java -Djava.security.auth.login.config=conf/server-client-jaas.conf \ - -cp KafkaOffsetMonitor-assembly-0.4.0.jar \ + -cp KafkaOffsetMonitor-assembly-0.4.6.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ --kafkaBrokers kafkabroker01:6667,kafkabroker02:6667 \ @@ -126,7 +126,7 @@ As long as this is true you will need to use local maven repo and just publish K Assuming you have a custom implementation of OffsetInfoReporter in a jar file, running it is as simple as adding the jar to the classpath when running app: ``` -java -cp KafkaOffsetMonitor-assembly-0.3.0.jar:kafka-offset-monitor-another-db-reporter.jar \ +java -cp KafkaOffsetMonitor-assembly-0.4.6.jar:kafka-offset-monitor-another-db-reporter.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk zkserver01,zkserver02 \ --port 8080 \ @@ -141,7 +141,3 @@ Contributing ============ The KafkaOffsetMonitor is released under the Apache License and we **welcome any contributions** within this license. Any pull request is welcome and will be reviewed and merged as quickly as possible. - -Because this open source tool is released by [Quantifind](http://www.quantifind.com) as a company, if you want to submit a pull request, you will have to sign the following simple contributors agreement: -- If you are an individual, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1RS7qEjq3cCmJ1665UhoCMK8541Ms7KyU3kVFoO4CR_I/) and send it back to contributors@quantifind.com -- If you are contributing changes that you did as part of your work, please sign [this contributors agreement](https://docs.google.com/a/quantifind.com/document/d/1kNwLT4qG3G0Ct2mEuNdBGmKDYuApN1CpQtZF8TSVTjE/) and send it back to contributors@quantifind.com diff --git a/build.sbt b/build.sbt index caf5f9e..02589f8 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "KafkaOffsetMonitor" -version := "0.4.1-SNAPSHOT" -scalaVersion := "2.11.8" +version := "0.4.6-SNAPSHOT" +scalaVersion := "2.11.11" organization := "com.quantifind" scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature") @@ -8,18 +8,22 @@ scalacOptions ++= Seq("-deprecation", "-unchecked", "-optimize", "-feature") mainClass in Compile := Some("com.quantifind.kafka.offsetapp.OffsetGetterWeb") libraryDependencies ++= Seq( - "log4j" % "log4j" % "1.2.17", - "net.databinder" %% "unfiltered-filter" % "0.8.4", - "net.databinder" %% "unfiltered-jetty" % "0.8.4", - "net.databinder" %% "unfiltered-json4s" % "0.8.4", - "com.quantifind" %% "sumac" % "0.3.0", - "org.apache.kafka" %% "kafka" % "0.9.0.1", - "org.reflections" % "reflections" % "0.9.10", - "com.twitter" %% "util-core" % "6.40.0", - "com.typesafe.slick" %% "slick" % "2.1.0", - "org.xerial" % "sqlite-jdbc" % "3.7.2", - "org.mockito" % "mockito-all" % "1.10.19" % "test", - "org.scalatest" %% "scalatest" % "2.2.6" % "test") + "log4j" % "log4j" % "1.2.17", + "net.databinder" %% "unfiltered-filter" % "0.8.4", + "net.databinder" %% "unfiltered-jetty" % "0.8.4", + "net.databinder" %% "unfiltered-json4s" % "0.8.4", + "com.quantifind" %% "sumac" % "0.3.0", + "org.apache.kafka" %% "kafka" % "0.9.0.1", + "org.reflections" % "reflections" % "0.9.11", + "com.twitter" %% "util-core" % "7.1.0", + "com.typesafe.slick" %% "slick" % "2.1.0", + "org.xerial" % "sqlite-jdbc" % "3.18.0", + 
"com.google.code.gson" % "gson" % "2.8.2", + "com.google.guava" % "guava" % "20.0", + "javax.ws.rs" % "javax.ws.rs-api" % "2.0-m16", + "org.glassfish.jersey.core" % "jersey-client" % "2.25.1", + "org.mockito" % "mockito-all" % "1.10.19" % "test", + "org.scalatest" %% "scalatest" % "2.2.6" % "test") assemblyMergeStrategy in assembly := { case "about.html" => MergeStrategy.discard diff --git a/project/build.properties b/project/build.properties index 34412c6..6220d2e 100644 --- a/project/build.properties +++ b/project/build.properties @@ -2,4 +2,4 @@ project.organization=com.quantifind project.name=KafkaOffsetMonitor project.version=0.4 -sbt.version=0.13.13 +sbt.version=0.13.16 diff --git a/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java b/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java new file mode 100644 index 0000000..7f1ef7d --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaCommittedOffset.java @@ -0,0 +1,50 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +public class KafkaCommittedOffset { + + private String groupName; + private boolean groupIsActive; + private String topicName; + private int partitionId; + private KafkaCommittedOffsetMetadata committedOffset; + + + public KafkaCommittedOffset(String groupName, boolean groupIsActive, String topicName, int partitionId, long committedOffset, long committedMillis) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(groupName), "groupName must not be NULL or empty."); + Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty."); + Preconditions.checkArgument(partitionId > -1, "partitionId must be greater than or equal to 0."); + Preconditions.checkArgument(committedOffset > -1, "committedOffset must be greater than or equal to 0."); + Preconditions.checkArgument(committedMillis > -1, "committedMillis must be greater than or equal to 0."); + + this.groupName = groupName; + this.groupIsActive = groupIsActive; + this.topicName = topicName; + this.partitionId = partitionId; + this.committedOffset = new KafkaCommittedOffsetMetadata(committedOffset, committedMillis); + } + + + public String getGroupName() { + return groupName; + } + + public boolean getGroupIsActive() { + return groupIsActive; + } + + public String getTopicName() { + return topicName; + } + + public int getPartitionId() { + return partitionId; + } + + public KafkaCommittedOffsetMetadata getCommittedOffset() { + return committedOffset; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java b/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java new file mode 100644 index 0000000..cb299ba --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaCommittedOffsetMetadata.java @@ -0,0 +1,44 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.Expose; + +public class KafkaCommittedOffsetMetadata extends KafkaOffsetMetadata { + + @Expose private long lag = -1; + + + public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata offsetMetadata, long lag) { + super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp()); + verifyParameters(lag); + this.lag = lag; + } + + public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp, long lag) { + super(committedOffset, timestamp); + verifyParameters(lag); + this.lag = lag; + } + + public KafkaCommittedOffsetMetadata(KafkaOffsetMetadata 
offsetMetadata) { + super(offsetMetadata.getOffset(), offsetMetadata.getTimestamp()); + } + + public KafkaCommittedOffsetMetadata(long committedOffset, long timestamp) { + super(committedOffset, timestamp); + } + + private void verifyParameters(long lag) { + + Preconditions.checkArgument(lag > -2, "lag must not be less than -1."); + } + + + public long getLag() { + return lag; + } + + public void setLag(long lag) { + this.lag = lag; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java b/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java new file mode 100644 index 0000000..6925fe6 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaConsumerGroup.java @@ -0,0 +1,168 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Sets; +import com.google.gson.annotations.Expose; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + + +public class KafkaConsumerGroup { + + private final long COMPLETE_THRESHOLD = 10; + + @Expose private String consumerGroupName; + @Expose private boolean isActive; + @Expose private boolean complete; + @Expose private long mostRecentCommittedMillis; + @Expose private Status status; + @Expose private Set topicPartitions; + + + public KafkaConsumerGroup(String consumerGroupName) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(consumerGroupName), "consumerGroupName cannot be NULL or empty."); + + this.consumerGroupName = consumerGroupName; + this.isActive = false; + this.complete = false; + this.mostRecentCommittedMillis = -1; + this.status = Status.OK; + this.topicPartitions = Sets.newConcurrentHashSet(); + } + + + public String getConsumerGroupName() { + return consumerGroupName; + } + + public boolean getComplete() { return complete; } + + public long getMaxCommitedMillis() { + return mostRecentCommittedMillis; + } + + public boolean isActive() { + return isActive; + } + + public Set getTopicPartitions() { + + return topicPartitions; + } + + public Set getTopics() { + + return topicPartitions.stream() + .map(KafkaTopicPartition::getTopicName) + .collect(Collectors.toSet()); + } + + public synchronized void updateStatus() { + + if (!isActive) { + this.status = Status.ERR; + return; + } + + Status newStatus = Status.OK; + + for (KafkaTopicPartition topicPartition : topicPartitions) { + + // Set group status to ERROR if any topicPartition's status is STOP + if (Status.STOP == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to ERROR if any topicPartition's status is REWIND + if (Status.REWIND == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to ERROR if any topicPartition's status is STALL + if (Status.STALL == topicPartition.getStatus()) { + newStatus = Status.ERR; + break; + } + + // Set group status to WARN if any topicPartition's status is WARN + if (Status.WARN == topicPartition.getStatus()) { + newStatus = Status.WARN; + break; + } + } + + this.status = newStatus; + } + + + private Optional getTopicPartition(String topic, int partitionId) { + + //return committedOffsets.keySet().stream().filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId)).findFirst(); + return topicPartitions.stream() + .filter(tp -> (tp.getTopicName().equals(topic) && tp.getPartitionId() == partitionId)) + .findFirst(); + } + + private void upsertTopicPartition(KafkaCommittedOffset 
kafkaCommittedOffset) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(kafkaCommittedOffset.getTopicName()), "topic cannot be NULL or empty."); + Preconditions.checkArgument(kafkaCommittedOffset.getPartitionId() >= 0, "partitionId must be greater-than or equal-to zero."); + + String incomingTopicName = kafkaCommittedOffset.getTopicName(); + int incomingPartitionId = kafkaCommittedOffset.getPartitionId(); + + Optional existingTopicPartition = getTopicPartition(incomingTopicName, incomingPartitionId); + + if (existingTopicPartition.isPresent()) { + // Append committed offset info to existing set item + existingTopicPartition.get().addCommittedOffset(kafkaCommittedOffset.getCommittedOffset()); + } else { + // Add a new entry to the map + KafkaTopicPartition newTopicPartition = new KafkaTopicPartition(incomingTopicName, incomingPartitionId); + newTopicPartition.addCommittedOffset(kafkaCommittedOffset.getCommittedOffset()); + topicPartitions.add(newTopicPartition); + } + } + + private void setMostRecentCommittedMillis(long mostRecentCommittedMillis) { + if (this.mostRecentCommittedMillis < mostRecentCommittedMillis) { + this.mostRecentCommittedMillis = mostRecentCommittedMillis; + } + } + + private void updateCompleteFlag() { + + this.complete = topicPartitions.stream() + .noneMatch(f -> f.getCommittedOffsets().size() < COMPLETE_THRESHOLD); + } + + public void addCommittedOffsetInfo(KafkaCommittedOffset kafkaCommittedOffset) { + + setMostRecentCommittedMillis(kafkaCommittedOffset.getCommittedOffset().getTimestamp()); + this.isActive = kafkaCommittedOffset.getGroupIsActive(); + upsertTopicPartition(kafkaCommittedOffset); + updateCompleteFlag(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaConsumerGroup that = (KafkaConsumerGroup) o; + + return getConsumerGroupName().equals(that.getConsumerGroupName()); + } + + @Override + public int hashCode() { + return getConsumerGroupName().hashCode(); + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java b/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java new file mode 100644 index 0000000..7336485 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaOffsetMetadata.java @@ -0,0 +1,63 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.Expose; + + +public class KafkaOffsetMetadata { + + private final long KAFKA_OFFSET_METADATA_OFFSET_DEFAULT_VALUE = -1; + private final long KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE = -1; + + @Expose private long offset; + @Expose private long timestamp; + + + public KafkaOffsetMetadata(long offset, long timestamp) { + createObject(offset, timestamp); + } + + public KafkaOffsetMetadata(long offset) { + createObject(offset, KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE); + } + + public KafkaOffsetMetadata() { + createObject(KAFKA_OFFSET_METADATA_OFFSET_DEFAULT_VALUE, KAFKA_OFFSET_METADATA_TIMESTAMP_DEFAULT_VALUE); + } + + private void createObject(long offset, long timestamp) { + Preconditions.checkArgument(offset >= -1, "offset must be greater than or equal to -1."); + Preconditions.checkArgument(timestamp >= -1, "timestamp must be greater than or equal to -1."); + + this.offset = offset; + this.timestamp = timestamp; + } + + + public long getOffset() { + return offset; + } + + public long getTimestamp() { + return timestamp; + } + + + @Override + public boolean equals(Object o) { + if (this == 
o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaOffsetMetadata that = (KafkaOffsetMetadata) o; + + if (getOffset() != that.getOffset()) return false; + return getTimestamp() == that.getTimestamp(); + } + + @Override + public int hashCode() { + int result = (int) (getOffset() ^ (getOffset() >>> 32)); + result = 31 * result + (int) (getTimestamp() ^ (getTimestamp() >>> 32)); + return result; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java b/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java new file mode 100644 index 0000000..0cc696b --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaOffsetStorage.java @@ -0,0 +1,133 @@ +package com.morningstar.kafka; + + +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.util.Optional; +import java.util.Set; + + +public class KafkaOffsetStorage { + + private Set consumerGroups; + private Set logEndOffsets; + + + public KafkaOffsetStorage() { + + this.consumerGroups = Sets.newConcurrentHashSet(); + this.logEndOffsets = Sets.newConcurrentHashSet(); + } + + private Optional getConsumerGroup(String consumerGroupName) { + + return consumerGroups.stream().filter(cg -> cg.getConsumerGroupName().equals(consumerGroupName)).findFirst(); + } + + private void upsertConsumerGroup(KafkaCommittedOffset kafkaCommittedOffset) { + + // Store committedOffset + Optional consumerGroup = getConsumerGroup(kafkaCommittedOffset.getGroupName()); + + if (consumerGroup.isPresent()) { + // Update existing group + consumerGroup.get().addCommittedOffsetInfo(kafkaCommittedOffset); + } else { + // Create new group + KafkaConsumerGroup newConsumerGroup = new KafkaConsumerGroup(kafkaCommittedOffset.getGroupName()); + newConsumerGroup.addCommittedOffsetInfo(kafkaCommittedOffset); + consumerGroups.add(newConsumerGroup); + } + } + + private void addTopicPartition(String topicName, int partitionId) { + + KafkaTopicPartitionLogEndOffset incomingTopicPartition = new KafkaTopicPartitionLogEndOffset( + new KafkaTopicPartition(topicName, partitionId)); + + if (!logEndOffsets.contains(incomingTopicPartition)) { + logEndOffsets.add(incomingTopicPartition); + } + } + + /* Calculate lag based on logEndOffset and committedOffset + * Rule 1: If logEndOffset == -1, or committedOffset == -1, then lag is -1. + * This is a timing issue and we will get caught up on the next go-around. + * Rule 2: If committedOffset > logEndOffset, then lag is 0. + * This is a timing issue where a committedOffset was reported before logEndOffset was updated. + * We will get caught up on the next go-around. 
+ * Rule 3: Lag is logEndOffset - committedOffset + */ + private long calculateLag(long logEndOffset, long committedOffset) { + + if (logEndOffset == -1 || committedOffset == -1) { + return -1; + } else if (committedOffset > logEndOffset) { + return 0; + } else { + return logEndOffset - committedOffset; + } + } + + private Optional getTopicPartitionLogEndOffset(String topicName, int partitionId) { + + return logEndOffsets.stream() + .filter(leo -> (leo.getTopicPartition().getTopicName().equals(topicName) && leo.getTopicPartition().getPartitionId() == partitionId)) + .findFirst(); + } + + private KafkaOffsetMetadata getLogEndOffset(String topicName, int partitionId) { + + Optional topicPartitionLogEndOffset = getTopicPartitionLogEndOffset(topicName, partitionId); + + if (!topicPartitionLogEndOffset.isPresent()) { + return new KafkaOffsetMetadata(); + } else { + return topicPartitionLogEndOffset.get().getLogEndOffset(); + } + } + + + public String getConsumerGroups() { + + for (KafkaConsumerGroup consumerGroup : consumerGroups) { + consumerGroup.updateStatus(); + } + + Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create(); + return gson.toJson(consumerGroups); + } + + public Set getLogEndOffsets() { + return logEndOffsets; + } + + public void addCommittedOffset(KafkaCommittedOffset kafkaCommittedOffset) { + + addTopicPartition(kafkaCommittedOffset.getTopicName(), kafkaCommittedOffset.getPartitionId()); + + // Calculate lag + KafkaOffsetMetadata logEndOffsetMetadata = getLogEndOffset(kafkaCommittedOffset.getTopicName(), kafkaCommittedOffset.getPartitionId()); + long lag = calculateLag(logEndOffsetMetadata.getOffset(), kafkaCommittedOffset.getCommittedOffset().getOffset()); + + // Add lag to committedOffset object + kafkaCommittedOffset.getCommittedOffset().setLag(lag); + + // Store committedOffset + upsertConsumerGroup(kafkaCommittedOffset); + } + + public void addLogEndOffset(KafkaTopicPartitionLogEndOffset kafkaTopicLogEndOffset) { + + if (logEndOffsets.contains(kafkaTopicLogEndOffset)) { + logEndOffsets.stream() + .filter(leo -> leo.equals(kafkaTopicLogEndOffset)) + .findFirst().get() + .setLogEndOffset(kafkaTopicLogEndOffset.getLogEndOffset()); + } else { + logEndOffsets.add(kafkaTopicLogEndOffset); + } + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java b/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java new file mode 100644 index 0000000..237316c --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaTopicPartition.java @@ -0,0 +1,245 @@ +package com.morningstar.kafka; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.EvictingQueue; +import com.google.gson.annotations.Expose; + +import java.util.Comparator; +import java.util.Optional; +import java.util.stream.Stream; + + +public class KafkaTopicPartition { + + private final int MAX_HISTORICAL_CONSUMER_OFFSETS = 10; + + private final Comparator committedOffsetLagCompare = Comparator.comparing(KafkaCommittedOffsetMetadata::getLag); + private final Comparator committedOffsetTimestampCompare = Comparator.comparing(KafkaCommittedOffsetMetadata::getTimestamp); + + @Expose private String topicName; + @Expose private long partitionId; + @Expose private long currentLag; + @Expose private Status status; + + @Expose private EvictingQueue committedOffsets; + + + public KafkaTopicPartition(String topicName, long partitionId) { + createObject(topicName, partitionId, -1, System.currentTimeMillis()); + } + + 
private void createObject(String topicName, long partitionId, long logEndOffset, long logEndOffsetMillis) { + + Preconditions.checkArgument(!Strings.isNullOrEmpty(topicName), "topicName must not be NULL or empty."); + Preconditions.checkArgument(partitionId > -1, "partitionId must not be less than 0."); + + this.topicName = topicName; + this.partitionId = partitionId; + this.status = Status.OK; + this.committedOffsets = EvictingQueue.create(MAX_HISTORICAL_CONSUMER_OFFSETS); + } + + + public String getTopicName() { + return topicName; + } + + public long getPartitionId() { + return partitionId; + } + + public Status getStatus() { + return status; + } + + public EvictingQueue getCommittedOffsets() { + return this.committedOffsets; + } + + private Optional getMaxLagOffset() { + return committedOffsets.stream().max(committedOffsetLagCompare); + } + + private Optional getMinLagOffset() { + return committedOffsets.stream().min(committedOffsetLagCompare); + } + + private Optional getMaxTimestampOffset() { + return committedOffsets.stream().max(committedOffsetTimestampCompare); + } + + private Optional getMinTimestampOffset() { + return committedOffsets.stream().min(committedOffsetTimestampCompare); + } + + private Stream getOffsetsSortedByTimestamp() { + return committedOffsets.stream().sorted(committedOffsetTimestampCompare); + } + + private boolean hasOffsets() { + return (committedOffsets.size() > 0 ); + } + + /* Check if all offsets have a lag == -1, meaning we likely could not calculate lag due to lack of logEndOffset */ + private boolean hasOnlyOffsetsWithNegativeLag() { + + long committedOffsetSize = committedOffsets.size(); + long committedOffsetSizeWithNegativeLag = committedOffsets.stream().filter(o -> o.getLag() == -1).count(); + + return (committedOffsetSize == committedOffsetSizeWithNegativeLag); + } + + /* Check if there is at least one offset with no lag */ + private boolean hasOffsetWithNoLag() { + return (committedOffsets.stream().filter(o -> o.getLag() == 0).findFirst().isPresent()); + } + + /* Check if offsets have not been committed in a while */ + /* Will return false if there are no offsets */ + private boolean areOffsetsTimely() { + + if (!hasOffsets()) { return false; } + + long maxOffsetTimestamp = getMaxLagOffset().get().getTimestamp(); + long minOffsetTimestamp = getMinLagOffset().get().getTimestamp(); + long diffNowAndMaxCommitted = System.currentTimeMillis() - maxOffsetTimestamp; + + return (diffNowAndMaxCommitted < (maxOffsetTimestamp - minOffsetTimestamp)); + } + + /* Check if the minimum offset in the queue is smaller than the first (oldest) offset */ + /* Returns false if there are no offsets in the queue */ + private boolean didConsumerOffsetsRewind() { + + if (!hasOffsets()) { return false; } + + long minLag = getMinLagOffset().get().getLag(); + long minTimestampLag = getOffsetsSortedByTimestamp().findFirst().get().getLag(); + + return (minLag < minTimestampLag); + } + + /* Check if consumer offsets do not change, and the lag is non-zero */ + /* Returns false if there are no offsets in the queue */ + private boolean isTopicPartitionStalled() { + + /* Returns false if there are no offsets in the queue */ + if (!hasOffsets()) { return false; } + + long minTimestampOffset = getMinTimestampOffset().get().getOffset(); + long maxTimestampOffset = getMaxTimestampOffset().get().getOffset(); + + return ((minTimestampOffset == maxTimestampOffset) && (minTimestampOffset != 0)); + } + + /* Check if consumer offsets are changing, but lag is increasing over every commit */ + 
private boolean isConsumerLagGrowing() { + + /* Returns false if there are no offsets in the queue */ + if (!hasOffsets()) { return false; } + + boolean lagWentDown = false; + + KafkaCommittedOffsetMetadata[] offsetArray = getOffsetsSortedByTimestamp().toArray(KafkaCommittedOffsetMetadata[]::new); + long previousLag = offsetArray[0].getLag(); + + for (KafkaCommittedOffsetMetadata committedOffset : offsetArray) { + + long iteratingLag = committedOffset.getLag(); + + // This is a Rule 3 shortcut + if (iteratingLag == 0) { + lagWentDown = true; + break; + } + + // Shortcut because lag went down + if (iteratingLag < previousLag) { + lagWentDown = true; + break; + } + + // Lag went up this iteration, so continue to the next one and check again + previousLag = iteratingLag; + } + + return lagWentDown; + } + + /* Evaluate a consumer's topic-partition based on the following rules: + * Rule 0: If there are no committed offsets, then there is nothing to calculate and the period is OK. + * Rule 1: If the difference between now and the last offset timestamp is greater than the difference between the last and first offset timestamps, the + * consumer has stopped committing offsets for that partition (error) + * Rule 2: If the consumer offset decreases from one interval to the next the partition is marked as a rewind (error) + * Rule 3: If over the stored period, the lag is ever zero for the partition, the period is OK + * Rule 4: If the consumer offset does not change, and the lag is non-zero, it's an error (partition is stalled) + * Rule 5: If the consumer offsets are moving, but the lag is consistently increasing, it's a warning (consumer is slow) + */ + private void calcStatus() { + + // Rule 0: If the lag is -1, this is a special value that means there are no broker offsets yet. + // Consider it good (will get caught up in the next refresh of topics) + if (hasOnlyOffsetsWithNegativeLag()) { + status = Status.OK; + return; + } + + // Rule 1: Offsets have not been committed in a while and lag is not zero + if (!areOffsetsTimely() && currentLag > 0) { + status = Status.STOP; + return; + } + + // Rule 2: Did the consumer offsets rewind at any point? + if (didConsumerOffsetsRewind()) { + status = Status.REWIND; + return; + } + + // Rule 3: Is there ever an offset with no lag? + if (hasOffsetWithNoLag()) { + status = Status.OK; + return; + } + + // Rule 4: Is there no change in lag when lag is non-zero? + if (isTopicPartitionStalled()) { + status = Status.STALL; + return; + } + + // Rule 5: Is the consumer not keeping up? 
+ if (isConsumerLagGrowing()) { + status = Status.WARN; + return; + } + + status = Status.OK; + } + + public void addCommittedOffset(KafkaCommittedOffsetMetadata committedOffsetMetadata) { + this.committedOffsets.add(committedOffsetMetadata); + this.currentLag = getMaxTimestampOffset().get().getLag(); + calcStatus(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaTopicPartition that = (KafkaTopicPartition) o; + + if (getPartitionId() != that.getPartitionId()) return false; + return getTopicName().equals(that.getTopicName()); + } + + @Override + public int hashCode() { + int result = getTopicName().hashCode(); + result = 31 * result + (int) (getPartitionId() ^ (getPartitionId() >>> 32)); + return result; + } +} diff --git a/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java b/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java new file mode 100644 index 0000000..7db0df8 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/KafkaTopicPartitionLogEndOffset.java @@ -0,0 +1,49 @@ +package com.morningstar.kafka; + +public class KafkaTopicPartitionLogEndOffset { + + private KafkaTopicPartition topicPartition; + private KafkaOffsetMetadata logEndOffset; + + + public KafkaTopicPartitionLogEndOffset(KafkaTopicPartition topicPartition, KafkaOffsetMetadata logEndOffset) { + createObject(topicPartition, logEndOffset); + } + + public KafkaTopicPartitionLogEndOffset(KafkaTopicPartition topicPartition) { + createObject(topicPartition, new KafkaOffsetMetadata(-1)); + } + + private void createObject(KafkaTopicPartition topicPartition, KafkaOffsetMetadata offsetInfo) { + this.topicPartition = topicPartition; + this.logEndOffset = offsetInfo; + } + + public KafkaTopicPartition getTopicPartition() { + return topicPartition; + } + + public KafkaOffsetMetadata getLogEndOffset() { + return logEndOffset; + } + + public void setLogEndOffset(KafkaOffsetMetadata logEndOffset) { + this.logEndOffset = logEndOffset; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KafkaTopicPartitionLogEndOffset that = (KafkaTopicPartitionLogEndOffset) o; + + return topicPartition.equals(that.topicPartition); + } + + @Override + public int hashCode() { + return topicPartition.hashCode(); + } +} diff --git a/src/main/java/com/morningstar/kafka/Status.java b/src/main/java/com/morningstar/kafka/Status.java new file mode 100644 index 0000000..0df1e35 --- /dev/null +++ b/src/main/java/com/morningstar/kafka/Status.java @@ -0,0 +1,12 @@ +package com.morningstar.kafka; + +public enum Status { + + NOTFOUND, + OK, + WARN, + ERR, + STOP, + STALL, + REWIND +} diff --git a/src/main/resources/offsetapp/scripts/cluster-viz.js b/src/main/resources/offsetapp/scripts/cluster-viz.js index ab82594..9a41e28 100644 --- a/src/main/resources/offsetapp/scripts/cluster-viz.js +++ b/src/main/resources/offsetapp/scripts/cluster-viz.js @@ -2,8 +2,8 @@ function loadViz(load_to_id, data_path) { var m = [0, 120, 0, 120], - w = 1400 - m[1] - m[3], - h = 500 - m[0] - m[2], + w = 1200 - m[1] - m[3], + h = 900 - m[0] - m[2], i = 0, root; @@ -47,8 +47,8 @@ function loadViz(load_to_id, data_path) { function updateClusterViz(source, tree, vis, diagonal) { var m = [0, 120, 0, 120], - w = 1400 - m[1] - m[3], - h = 800 - m[0] - m[2], + w = 1200 - m[1] - m[3], + h = 900 - m[0] - m[2], i = 0; diff --git 
a/src/main/resources/offsetapp/scripts/controllers.js b/src/main/resources/offsetapp/scripts/controllers.js index 39fee71..3b4ba25 100644 --- a/src/main/resources/offsetapp/scripts/controllers.js +++ b/src/main/resources/offsetapp/scripts/controllers.js @@ -36,7 +36,7 @@ angular.module('offsetapp.controllers',["offsetapp.services"]) $scope.topic = $routeParams.topic; }]) - .controller("TopicConsumersCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", + .controller("TopicConsumersCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { offsetinfo.topicConsumers($routeParams.topic, function(d) { $scope.info = d; @@ -53,7 +53,7 @@ angular.module('offsetapp.controllers',["offsetapp.services"]) offsetinfo.loadClusterViz($routeParams.group, function(d) { }); }]) - .controller("ActiveTopicsVizCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", + .controller("ActiveTopicsVizCtrl", ["$scope", "$interval", "$routeParams", "offsetinfo", function($scope, $interval, $routeParams, offsetinfo) { $scope.loading = true; offsetinfo.loadTopicConsumerViz($routeParams.group, function(d) { diff --git a/src/main/resources/offsetapp/scripts/directives.js b/src/main/resources/offsetapp/scripts/directives.js index d3bca2d..8a995ce 100644 --- a/src/main/resources/offsetapp/scripts/directives.js +++ b/src/main/resources/offsetapp/scripts/directives.js @@ -209,7 +209,7 @@ angular.module("offsetapp.directives", []) opposite: true }], series : [{ - name: "log size", + name: "log-end offset", data:d[0], yAxis: 0, color: '#088CFE', @@ -218,7 +218,7 @@ angular.module("offsetapp.directives", []) radius : 3 }}, { - name: "offset", + name: "committed offset", data:d[1], color: '#B9E6D9', yAxis: 0, diff --git a/src/main/resources/offsetapp/views/grouplist.html b/src/main/resources/offsetapp/views/grouplist.html index ed71f73..bde3c5f 100644 --- a/src/main/resources/offsetapp/views/grouplist.html +++ b/src/main/resources/offsetapp/views/grouplist.html @@ -1,5 +1,5 @@
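The new `com.morningstar.kafka` classes introduced earlier in this diff are added file by file, so their intended wiring is easy to lose track of. Below is a minimal standalone sketch (not part of the PR) of how `KafkaOffsetStorage` is fed broker log-end offsets and consumer committed offsets and then serialized to the JSON the UI consumes via `getConsumerGroupStatus`; the class name, topic, group, and offset values are made up for illustration.

```java
import com.morningstar.kafka.KafkaCommittedOffset;
import com.morningstar.kafka.KafkaOffsetMetadata;
import com.morningstar.kafka.KafkaOffsetStorage;
import com.morningstar.kafka.KafkaTopicPartition;
import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset;

public class OffsetStorageSketch {

    public static void main(String[] args) {
        KafkaOffsetStorage storage = new KafkaOffsetStorage();
        long now = System.currentTimeMillis();

        // Record the broker's log-end offset for an illustrative topic/partition.
        storage.addLogEndOffset(new KafkaTopicPartitionLogEndOffset(
                new KafkaTopicPartition("events", 0),
                new KafkaOffsetMetadata(1500L, now)));

        // Record a committed offset for an illustrative consumer group on the same
        // partition; lag is computed internally from the stored log-end offset
        // (1500 - 1200 = 300).
        storage.addCommittedOffset(new KafkaCommittedOffset(
                "group-a", true, "events", 0, 1200L, now));

        // Serialize the per-group state (activity, status, lag history) to the JSON
        // string that KafkaOffsetGetter.getConsumerGroupStatus returns.
        System.out.println(storage.getConsumerGroups());
    }
}
```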
diff --git a/src/main/resources/offsetapp/views/topiclist.html b/src/main/resources/offsetapp/views/topiclist.html index 9d4340a..fa2a40f 100644 --- a/src/main/resources/offsetapp/views/topiclist.html +++ b/src/main/resources/offsetapp/views/topiclist.html @@ -1,11 +1,35 @@
Loading ...
+    [HTML table markup lost in extraction: the added view renders a table with the
+     column headings Topic, Partition, and Log-End Offset; each topic row binds
+     {{t.topic}} and {{t.sumLogEndOffset}}, and each nested partition row binds
+     {{p.partition}} and {{p.logEndOffset}}.]
+ + \ No newline at end of file diff --git a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala index 7ed9244..10c3489 100644 --- a/src/main/scala/com/quantifind/kafka/OffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/OffsetGetter.scala @@ -1,13 +1,13 @@ package com.quantifind.kafka -import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.Executors -import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo} +import com.twitter.util.Time import com.quantifind.kafka.core._ import com.quantifind.kafka.offsetapp.OffsetGetterArgs +import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo} import com.quantifind.utils.ZkUtilsWrapper -import com.twitter.util.Time import kafka.common.BrokerNotAvailableException import kafka.consumer.{ConsumerConnector, SimpleConsumer} import kafka.utils.{Json, Logging, ZkUtils} @@ -41,8 +41,12 @@ trait OffsetGetter extends Logging { def getGroups: Seq[String] + def getActiveGroups: Seq[String] + def getTopicMap: Map[String, Seq[String]] + def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] + def getActiveTopicMap: Map[String, Seq[String]] def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] @@ -113,6 +117,9 @@ trait OffsetGetter extends Logging { // get list of all topics def getTopics: Seq[String] = { try { + val unsortedTopics: Seq[String] = zkUtils.getChildren(ZkUtils.BrokerTopicsPath) + + zkUtils.getChildren(ZkUtils.BrokerTopicsPath).sortWith(_ < _) } catch { case NonFatal(t) => @@ -185,9 +192,23 @@ trait OffsetGetter extends Logging { } }.toSeq) } + + def getConsumerGroupStatus: String + + def close() { + // TODO: What is going on here? This code is broken + /* + for (consumerOpt <- consumerMap.values) { + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } + } + */ + } } -object OffsetGetter { +object OffsetGetter extends Logging { case class KafkaInfo(name: String, brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo]) @@ -210,10 +231,39 @@ object OffsetGetter { var newKafkaConsumer: KafkaConsumer[String, String] = null def createZkUtils(args: OffsetGetterArgs): ZkUtils = { - ZkUtils(args.zk, + + val sleepAfterFailedZkUtilsInstantiation: Int = 30000 + var zkUtils: ZkUtils = null + + while (null == zkUtils) { + + try { + + info("Creating new ZkUtils object."); + zkUtils = ZkUtils(args.zk, args.zkSessionTimeout.toMillis.toInt, args.zkConnectionTimeout.toMillis.toInt, JaasUtils.isZkSecurityEnabled()) + } + + catch { + + case e: Throwable => + + if (null != zkUtils) { + + zkUtils.close() + zkUtils = null + } + + val errorMsg = "Error creating ZkUtils. 
Will attempt to re-create in %d seconds".format(sleepAfterFailedZkUtilsInstantiation) + error(errorMsg, e) + Thread.sleep(sleepAfterFailedZkUtilsInstantiation) + } + } + + info("Created zkUtils object: "+ zkUtils) + zkUtils } def getInstance(args: OffsetGetterArgs): OffsetGetter = { @@ -250,4 +300,4 @@ object OffsetGetter { new ZKOffsetGetter(zkUtils) } } -} \ No newline at end of file +} diff --git a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala index 6f6a62e..7ea1c1b 100644 --- a/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala @@ -2,8 +2,15 @@ package com.quantifind.kafka.core import java.nio.{BufferUnderflowException, ByteBuffer} import java.util +import java.util.concurrent.atomic.AtomicReference import java.util.{Arrays, Properties} +import com.morningstar.kafka.KafkaCommittedOffset +import com.morningstar.kafka.KafkaOffsetMetadata +import com.morningstar.kafka.KafkaOffsetStorage +import com.morningstar.kafka.KafkaTopicPartition +import com.morningstar.kafka.KafkaTopicPartitionLogEndOffset + import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.kafka.offsetapp.OffsetGetterArgs import com.quantifind.kafka.{Node, OffsetGetter} @@ -17,14 +24,14 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.{PartitionInfo, TopicPartition} -import scala.collection._ +import scala.collection.{mutable, _} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future, duration} /** - * Created by rcasey on 11/16/2016. - */ + * Created by rcasey on 11/16/2016. + */ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) extends OffsetGetter { import KafkaOffsetGetter._ @@ -37,27 +44,55 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) val topicPartition = new TopicPartition(topic, partitionId) val topicAndPartition = TopicAndPartition(topic, partitionId) + val optionalOffsetMetaData: Option[OffsetAndMetadata] = committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) - committedOffsetMap.get(GroupTopicPartition(group, topicAndPartition)) map { offsetMetaData => - - // BIT O'HACK to deal with timing: - // Due to thread and processing timing, it is possible that the value we have for the topicPartition's - // logEndOffset has not yet been updated to match the value on the broker. 
When this happens, report the - // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* - // this value - val logEndOffset: Option[Long] = logEndOffsetsMap.get(topicPartition) - val committedOffset: Long = offsetMetaData.offset - val lag: Long = logEndOffset.get - committedOffset - val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get - - // Get client information if we can find an associated client - var clientString: Option[String] = Option("NA") - val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicPartition))) - if (!filteredClients.isEmpty) { - val client: ClientGroup = filteredClients.head - clientString = Option(client.clientId + client.clientHost) - } + if (!optionalOffsetMetaData.isDefined) { + error(s"processPartition: Could not find group-topic-partition in committedOffsetsMap, g:$group,t:$topic,p:$partitionId") + return None + } + + val offsetMetaData: OffsetAndMetadata = optionalOffsetMetaData.get; + val isActiveGroup: Boolean = isGroupActive(group) + + kafkaOffsetStorage.addCommittedOffset( + new KafkaCommittedOffset( + group, + isActiveGroup, + topic, + partitionId, + offsetMetaData.offset, + offsetMetaData.commitTimestamp + ) + ) + + if (!isActiveGroup) { + info(s"processPartition: Not reporting offset because group is not active, g:$group,t:$topic,p:$partitionId") + return None + } + + val logEndOffset: Long = logEndOffsetsMap.get(topicPartition).getOrElse(-1) + + if (logEndOffset == -1) { + info(s"processPartition: Not reporting this offset as we do not yet have logEndOffset to calc lag, g:$group,t:$topic,p:$partitionId") + return None + } + + // BIT O'HACK to deal with timing: + // Due to thread and processing timing, it is possible that the value we have for the topicPartition's + // logEndOffset has not yet been updated to match the value on the broker. 
When this happens, report the + // topicPartition's logEndOffset as "committedOffset - lag", as the topicPartition's logEndOffset is *AT LEAST* + // this value + val committedOffset: Long = offsetMetaData.offset + val logEndOffsetReported: Long = if (committedOffset > logEndOffset) committedOffset else logEndOffset + + // Get client information if we can find an associated client + var clientString: Option[String] = Option("NA") + val filteredClient = clients.get().filter(c => (c.group == group && c.topicPartitions.contains(topicPartition))).headOption + if (filteredClient.isDefined) { + clientString = Option(filteredClient.get.clientId + filteredClient.get.clientHost) + } + Some( OffsetInfo(group = group, topic = topic, partition = partitionId, @@ -66,47 +101,95 @@ class KafkaOffsetGetter(zkUtilsWrapper: ZkUtilsWrapper, args: OffsetGetterArgs) owner = clientString, creation = Time.fromMilliseconds(offsetMetaData.commitTimestamp), modified = Time.fromMilliseconds(offsetMetaData.expireTimestamp)) - } + ) } override def getGroups: Seq[String] = { - topicAndGroups.groupBy(_.group).keySet.toSeq.sorted + committedOffsetMap.keys.map(_.group).toSet.toSeq.sorted + //topicAndGroups.get().groupBy(_.group).keySet.toSeq.sorted + } + + override def getActiveGroups: Seq[String] = { + activeTopicAndGroups.get() + .map(_.group) + .toSeq } override def getTopicList(group: String): List[String] = { - topicAndGroups.filter(_.group == group).groupBy(_.topic).keySet.toList.sorted + committedOffsetMap.keySet + .filter(_.group.equals(group)) + .map(_.topicPartition.topic) + .toSeq + .toList + //topicAndGroups.get().filter(_.group == group).groupBy(_.topic).keySet.toList.sorted } override def getTopicMap: Map[String, scala.Seq[String]] = { - topicAndGroups.groupBy(_.topic).mapValues(_.map(_.group).toSeq) + committedOffsetMap.keySet + .groupBy(_.topicPartition.topic) + .mapValues(_.map(_.group).toSeq) + + //topicAndGroups.get().groupBy(_.topic).mapValues(_.map(_.group).toSeq) } override def getActiveTopicMap: Map[String, Seq[String]] = { - getTopicMap + activeTopicAndGroups.get().groupBy(_.topic).mapValues(_.map(_.group).toSeq) + //getTopicMap } override def getTopics: Seq[String] = { - topicPartitionsMap.keys.toSeq.sorted + activeTopicPartitionsMap.get().keys.toSeq.sorted + } + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = { + var topicsAndLogEndOffsets: Set[TopicLogEndOffsetInfo] = mutable.HashSet() + + for (topic: String <- activeTopicPartitionsMap.get().keys) { + + var topicPartitionOffsetSet: Set[TopicPartitionLogEndOffset] = new mutable.HashSet[TopicPartitionLogEndOffset] + var sumLogEndOffset: Long = 0; + + val topicPartitionOffset: immutable.Map[TopicPartition, Long] = logEndOffsetsMap.filter(p => p._1.topic() == topic).toMap + for (topicPartition: TopicPartition <- topicPartitionOffset.keys.toSeq.sortWith(_.partition() > _.partition())) { + + val partition: Int = topicPartition.partition() + val logEndOffset: Long = logEndOffsetsMap.get(topicPartition).getOrElse(0) + + sumLogEndOffset += logEndOffset + topicPartitionOffsetSet += TopicPartitionLogEndOffset(topic, partition, logEndOffset) + } + + topicsAndLogEndOffsets += TopicLogEndOffsetInfo(topic, sumLogEndOffset, topicPartitionOffsetSet.toSeq.sorted) + } + + topicsAndLogEndOffsets.toSeq.sorted } override def getClusterViz: Node = { - val clusterNodes = topicPartitionsMap.values.map(partition => { + val clusterNodes = activeTopicPartitionsMap.get().values.map(partition => { Node(partition.get(0).leader().host() + ":" + 
partition.get(0).leader().port(), Seq()) }).toSet.toSeq.sortWith(_.name < _.name) Node("KafkaCluster", clusterNodes) } + + def isGroupActive(group: String): Boolean = getActiveGroups.exists(_.equals(group)) + + override def getConsumerGroupStatus: String = { + kafkaOffsetStorage.getConsumerGroups() + } } object KafkaOffsetGetter extends Logging { val committedOffsetMap: concurrent.Map[GroupTopicPartition, OffsetAndMetadata] = concurrent.TrieMap() val logEndOffsetsMap: concurrent.Map[TopicPartition, Long] = concurrent.TrieMap() + val kafkaOffsetStorage: KafkaOffsetStorage = new KafkaOffsetStorage(); // Swap the object on update - var activeTopicPartitions: immutable.Set[TopicAndPartition] = immutable.HashSet() - var clients: immutable.Set[ClientGroup] = immutable.HashSet() - var topicAndGroups: immutable.Set[TopicAndGroup] = immutable.HashSet() - var topicPartitionsMap: immutable.Map[String, util.List[PartitionInfo]] = immutable.HashMap() + var activeTopicPartitions: AtomicReference[immutable.Set[TopicAndPartition]] = new AtomicReference(immutable.HashSet()) + var clients: AtomicReference[immutable.Set[ClientGroup]] = new AtomicReference(immutable.HashSet()) + var activeTopicAndGroups: AtomicReference[immutable.Set[TopicAndGroup]] = new AtomicReference(immutable.HashSet()) + var activeTopicPartitionsMap: AtomicReference[immutable.Map[String, util.List[PartitionInfo]]] = new AtomicReference(immutable.HashMap()) private def createNewKafkaConsumer(args: OffsetGetterArgs, group: String, autoCommitOffset: Boolean): KafkaConsumer[Array[Byte], Array[Byte]] = { @@ -162,13 +245,13 @@ object KafkaOffsetGetter extends Logging { } /** - * Attempts to parse a kafka message as an offset message. - * - * @author Robert Casey (rcasey212@gmail.com) - * @param message message retrieved from the kafka client's poll() method - * @return key-value of GroupTopicPartition and OffsetAndMetadata if the message was a valid offset message, - * otherwise None - */ + * Attempts to parse a kafka message as an offset message. 
+ * + * @author Robert Casey (rcasey212@gmail.com) + * @param message message retrieved from the kafka client's poll() method + * @return key-value of GroupTopicPartition and OffsetAndMetadata if the message was a valid offset message, + * otherwise None + */ def tryParseOffsetMessage(message: ConsumerRecord[Array[Byte], Array[Byte]]): Option[(GroupTopicPartition, OffsetAndMetadata)] = { try { @@ -359,9 +442,9 @@ object KafkaOffsetGetter extends Logging { }) }) - activeTopicPartitions = newActiveTopicPartitions.toSet - clients = newClients.toSet - topicAndGroups = newTopicAndGroups.toSet + activeTopicPartitions.set(newActiveTopicPartitions.toSet) + clients.set(newClients.toSet) + activeTopicAndGroups.set(newTopicAndGroups.toSet) } catch { @@ -404,7 +487,6 @@ object KafkaOffsetGetter extends Logging { def startLogEndOffsetGetter(args: OffsetGetterArgs) = { - val group: String = "kafka-monitor-LogEndOffsetGetter" val sleepOnDataRetrieval: Int = 10000 val sleepOnError: Int = 30000 var logEndOffsetGetter: KafkaConsumer[Array[Byte], Array[Byte]] = null @@ -415,12 +497,13 @@ object KafkaOffsetGetter extends Logging { while (null == logEndOffsetGetter) { + val group: String = "kafka-monitor-LogEndOffsetGetter" logEndOffsetGetter = createNewKafkaConsumer(args, group, false) } // Get topic-partitions - topicPartitionsMap = JavaConversions.mapAsScalaMap(logEndOffsetGetter.listTopics()).toMap - val distinctPartitionInfo: Seq[PartitionInfo] = (topicPartitionsMap.values).flatten(listPartitionInfo => JavaConversions.asScalaBuffer(listPartitionInfo)).toSeq + activeTopicPartitionsMap.set(JavaConversions.mapAsScalaMap(logEndOffsetGetter.listTopics()).toMap) + val distinctPartitionInfo: Seq[PartitionInfo] = (activeTopicPartitionsMap.get().values).flatten(listPartitionInfo => JavaConversions.asScalaBuffer(listPartitionInfo)).toSeq // Iterate over each distinct PartitionInfo distinctPartitionInfo.foreach(partitionInfo => { @@ -431,6 +514,12 @@ object KafkaOffsetGetter extends Logging { logEndOffsetGetter.seekToEnd(topicPartition) val logEndOffset: Long = logEndOffsetGetter.position(topicPartition) + // Update KafkaOffsetStorage + kafkaOffsetStorage.addLogEndOffset( + new KafkaTopicPartitionLogEndOffset( + new KafkaTopicPartition(partitionInfo.topic, partitionInfo.partition), + new KafkaOffsetMetadata(logEndOffset, System.currentTimeMillis()))); + // Update the TopicPartition map with the current LogEndOffset if it exists, else add a new entry to the map if (logEndOffsetsMap.contains(topicPartition)) { logEndOffsetsMap.update(topicPartition, logEndOffset) @@ -454,6 +543,8 @@ object KafkaOffsetGetter extends Logging { logEndOffsetGetter.close() logEndOffsetGetter = null } + + Thread.sleep(sleepOnError) } } } @@ -462,4 +553,33 @@ object KafkaOffsetGetter extends Logging { case class TopicAndGroup(topic: String, group: String) -case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicPartition]) \ No newline at end of file +case class ClientGroup(group: String, clientId: String, clientHost: String, topicPartitions: Set[TopicPartition]) + +case class TopicPartitionLogEndOffset(topic: String, partition: Int, logEndOffset: Long) extends Ordered [TopicPartitionLogEndOffset] { + + def compare (that: TopicPartitionLogEndOffset) = { + if (this.topic == that.topic) { + if (this.partition > that.partition) + 1 + else if (this.partition < that.partition) + -1 + else + 0 + } + else if (this.topic > that.topic) + 1 + else + -1 + } +} + +case class TopicLogEndOffsetInfo(topic: 
String, sumLogEndOffset: Long, partitions: Seq[TopicPartitionLogEndOffset]) extends Ordered[TopicLogEndOffsetInfo] { + + def compare (that:TopicLogEndOffsetInfo) = { + if (this.topic == that.topic) 0 + else if (this.topic > that.topic) 1 + else -1 + } +} + +//case class OffsetMetadataAndLag diff --git a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala index 1a59430..a987086 100644 --- a/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala @@ -1,13 +1,15 @@ package com.quantifind.kafka.core +import com.morningstar.kafka.KafkaConsumerGroup import com.quantifind.kafka.OffsetGetter import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.utils.ZkUtilsWrapper import com.twitter.util.Time import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition -import kafka.utils.{Json} +import kafka.utils.Json import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat import scala.collection._ @@ -72,6 +74,8 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend } } + override def getActiveGroups: Seq[String] = getGroups + /** * Finds all topics for this group, for Kafka Spout there is only one */ @@ -115,4 +119,8 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend // not really have a way to determine which consumer is active now, so return all getTopicMap } -} \ No newline at end of file + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = ??? + + override def getConsumerGroupStatus: String = ??? +} diff --git a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala index edc5b11..742dafa 100644 --- a/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala +++ b/src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala @@ -1,5 +1,6 @@ package com.quantifind.kafka.core +import com.morningstar.kafka.KafkaConsumerGroup import com.quantifind.kafka.OffsetGetter import com.quantifind.kafka.OffsetGetter.OffsetInfo import com.quantifind.utils.ZkUtilsWrapper @@ -10,6 +11,7 @@ import kafka.common.TopicAndPartition import kafka.utils.ZkUtils import org.apache.zookeeper.data.Stat import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.common.TopicPartition import scala.collection._ import scala.util.control.NonFatal @@ -68,6 +70,8 @@ class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter { } } + override def getActiveGroups: Seq[String] = getGroups + override def getTopicList(group: String): List[String] = { try { zkUtils.getChildren(s"${ZkUtils.ConsumersPath}/$group/offsets").toList @@ -118,4 +122,8 @@ class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter { Map() } } + + override def getTopicsAndLogEndOffsets: Seq[TopicLogEndOffsetInfo] = ??? + + override def getConsumerGroupStatus: String = ??? 
} diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala index 3758618..f37c959 100644 --- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala +++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetDB.scala @@ -9,120 +9,129 @@ import com.quantifind.kafka.offsetapp.OffsetDB.{DbOffsetInfo, OffsetHistory, Off import com.twitter.util.Time /** - * Tools to store offsets in a DB - * User: andrews - * Date: 1/27/14 - */ + * Store and retrieve offsets in a SQLite file + * User: andrews, rcasey212 + * Date: 1/27/14 + */ class OffsetDB(dbfile: String) { - val database = Database.forURL(s"jdbc:sqlite:$dbfile.db", - driver = "org.sqlite.JDBC") - - implicit val twitterTimeMap = MappedColumnType.base[Time, Long]( - { - time => time.inMillis - }, { - millis => Time.fromMilliseconds(millis) - } - ) - - class Offset(tag: Tag) extends Table[DbOffsetInfo](tag, "OFFSETS") { - def id = column[Int]("id", O.PrimaryKey, O.AutoInc) - - val group = column[String]("group") - val topic = column[String]("topic") - val partition = column[Int]("partition") - val offset = column[Long]("offset") - val logSize = column[Long]("log_size") - val owner = column[Option[String]]("owner") - val timestamp = column[Long]("timestamp") - val creation = column[Time]("creation") - val modified = column[Time]("modified") - - - def * = (id.?, group, topic, partition, offset, logSize, owner, timestamp, creation, modified).shaped <>(DbOffsetInfo.parse, DbOffsetInfo.unparse) - - def forHistory = (timestamp, partition, owner, offset, logSize) <>(OffsetPoints.tupled, OffsetPoints.unapply) - - def idx = index("idx_search", (group, topic)) - - def tidx = index("idx_time", (timestamp)) - - def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true) - - } - - val offsets = TableQuery[Offset] - - def insert(timestamp: Long, info: OffsetInfo) { - database.withSession { - implicit s => - offsets += DbOffsetInfo(timestamp = timestamp, offset = info) - } - } - - def insertAll(info: IndexedSeq[OffsetInfo]) { - val now = System.currentTimeMillis - database.withTransaction { - implicit s => - offsets ++= info.map(i => DbOffsetInfo(timestamp = now, offset = i)) - } - } - - def emptyOld(since: Long) { - database.withSession { - implicit s => - offsets.filter(_.timestamp < since).delete - } - } - - def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession { - implicit s => - val o = offsets - .filter(off => off.group === group && off.topic === topic) - .sortBy(_.timestamp) - .map(_.forHistory) - .list(implicitly[JdbcBackend#Session]) - OffsetHistory(group, topic, o) - } - - def maybeCreate() { - database.withSession { - implicit s => - if (MTable.getTables("OFFSETS").list(implicitly[JdbcBackend#Session]).isEmpty) { - offsets.ddl.create - } - } - } - + var sqliteDbLock: AnyRef = new Object() + + val database = Database.forURL(s"jdbc:sqlite:$dbfile.db", + driver = "org.sqlite.JDBC") + + implicit val twitterTimeMap = MappedColumnType.base[Time, Long]( + { + time => time.inMillis + }, { + millis => Time.fromMilliseconds(millis) + } + ) + + class Offset(tag: Tag) extends Table[DbOffsetInfo](tag, "OFFSETS") { + def id = column[Int]("id", O.PrimaryKey, O.AutoInc) + + val group = column[String]("group") + val topic = column[String]("topic") + val partition = column[Int]("partition") + val offset = column[Long]("offset") + val logSize = column[Long]("log_size") + val owner = column[Option[String]]("owner") + val 
timestamp = column[Long]("timestamp") + val creation = column[Time]("creation") + val modified = column[Time]("modified") + + + def * = (id.?, group, topic, partition, offset, logSize, owner, timestamp, creation, modified).shaped <> (DbOffsetInfo.parse, DbOffsetInfo.unparse) + + def forHistory = (timestamp, partition, owner, offset, logSize) <> (OffsetPoints.tupled, OffsetPoints.unapply) + + def idx = index("idx_search", (group, topic)) + + def tidx = index("idx_time", (timestamp)) + + def uidx = index("idx_unique", (group, topic, partition, timestamp), unique = true) + } + + val offsets = TableQuery[Offset] + + def insert(timestamp: Long, info: OffsetInfo) { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + offsets += DbOffsetInfo(timestamp = timestamp, offset = info) + } + } + } + + def insertAll(info: IndexedSeq[OffsetInfo]) { + val now = System.currentTimeMillis + sqliteDbLock.synchronized { + database.withTransaction { + implicit s => + offsets ++= info.map(i => DbOffsetInfo(timestamp = now, offset = i)) + } + } + } + + def emptyOld(since: Long) { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + offsets.filter(_.timestamp < since).delete + } + } + } + + def offsetHistory(group: String, topic: String): OffsetHistory = database.withSession { + sqliteDbLock.synchronized { + implicit s => + val o = offsets + .filter(off => off.group === group && off.topic === topic) + .sortBy(_.timestamp) + .map(_.forHistory) + .list(implicitly[JdbcBackend#Session]) + OffsetHistory(group, topic, o) + } + } + + def maybeCreate() { + sqliteDbLock.synchronized { + database.withSession { + implicit s => + if (MTable.getTables("OFFSETS").list(implicitly[JdbcBackend#Session]).isEmpty) { + offsets.ddl.create + } + } + } + } } object OffsetDB { - case class OffsetPoints(timestamp: Long, partition: Int, owner: Option[String], offset: Long, logSize: Long) - - case class OffsetHistory(group: String, topic: String, offsets: Seq[OffsetPoints]) - - case class DbOffsetInfo(id: Option[Int] = None, timestamp: Long, offset: OffsetInfo) - - object DbOffsetInfo { - def parse(in: (Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)): DbOffsetInfo = { - val (id, group, topic, partition, offset, logSize, owner, timestamp, creation, modified) = in - DbOffsetInfo(id, timestamp, OffsetInfo(group, topic, partition, offset, logSize, owner, creation, modified)) - } - - def unparse(in: DbOffsetInfo): Option[(Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)] = Some( - in.id, - in.offset.group, - in.offset.topic, - in.offset.partition, - in.offset.offset, - in.offset.logSize, - in.offset.owner, - in.timestamp, - in.offset.creation, - in.offset.modified - ) - } - + case class OffsetPoints(timestamp: Long, partition: Int, owner: Option[String], offset: Long, logSize: Long) + + case class OffsetHistory(group: String, topic: String, offsets: Seq[OffsetPoints]) + + case class DbOffsetInfo(id: Option[Int] = None, timestamp: Long, offset: OffsetInfo) + + object DbOffsetInfo { + def parse(in: (Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)): DbOffsetInfo = { + val (id, group, topic, partition, offset, logSize, owner, timestamp, creation, modified) = in + DbOffsetInfo(id, timestamp, OffsetInfo(group, topic, partition, offset, logSize, owner, creation, modified)) + } + + def unparse(in: DbOffsetInfo): Option[(Option[Int], String, String, Int, Long, Long, Option[String], Long, Time, Time)] = Some( + in.id, + 
diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
index efd6d7d..a7fc541 100644
--- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
+++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterApp.scala
@@ -13,6 +13,7 @@ import com.quantifind.sumac.{ArgMain, FieldArgs}
 import com.quantifind.sumac.validation.Required
 import com.quantifind.sumac.{ArgMain, FieldArgs}
+import kafka.utils.ZkUtils
 import org.I0Itec.zkclient.ZkClient
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -111,7 +112,7 @@ object OffsetGetterApp extends ArgMain[OffsetGetterArgsWGT] {
       }
     }
     finally {
-
+      if (og != null) og.close()
     }
   }
 }
diff --git a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala
index 759429b..82332bd 100644
--- a/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala
+++ b/src/main/scala/com/quantifind/kafka/offsetapp/OffsetGetterWeb.scala
@@ -49,10 +49,14 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
 
   def htmlRoot: String = "/offsetapp"
 
-  val scheduler : ScheduledExecutorService = Executors.newScheduledThreadPool(2)
+  val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
 
   var reporters: mutable.Set[OffsetInfoReporter] = null
 
+  val millisBeforeStartingReportingThread = 0
+  val millisBeforeStartingCleanupThread = 0
+
+
   def retryTask[T](fn: => T) {
     try {
       retry(3) {
@@ -65,20 +69,37 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
   }
 
   def reportOffsets(args: OWArgs) {
+
+    try {
     val groups = getGroups(args)
     groups.foreach {
       g =>
        val inf = getInfo(g, args).offsets.toIndexedSeq
        debug(s"reporting ${inf.size}")
-       reporters.foreach( reporter => retryTask { reporter.report(inf) } )
+       reporters.foreach(reporter => retryTask {
+         reporter.report(inf)
+       })
     }
   }
+    catch {
 
-  def schedule(args: OWArgs) {
+      case e: Throwable =>
+        error("Error while in reportOffsets().", e)
+    }
+  }
 
-    scheduler.scheduleAtFixedRate( () => { reportOffsets(args) }, 0, args.refresh.toMillis, TimeUnit.MILLISECONDS )
-    scheduler.scheduleAtFixedRate( () => { reporters.foreach(reporter => retryTask({reporter.cleanupOldData()})) }, args.retain.toMillis, args.retain.toMillis, TimeUnit.MILLISECONDS )
+  def cleanupOldData() = {
+    reporters.foreach(reporter => retryTask({ reporter.cleanupOldData() }));
+  }
+
+  /* Schedule time-based threads */
+  def schedule(args: OWArgs) {
+
+    scheduler.scheduleAtFixedRate(() => { reportOffsets(args) },
+      millisBeforeStartingReportingThread, args.refresh.toMillis, TimeUnit.MILLISECONDS)
+    scheduler.scheduleAtFixedRate(() => { cleanupOldData() },
+      millisBeforeStartingCleanupThread, args.retain.toMillis, TimeUnit.MILLISECONDS)
   }
 
   def withOG[T](args: OWArgs)(f: OffsetGetter => T): T = {
@@ -87,7 +108,7 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
       og = OffsetGetter.getInstance(args)
       f(og)
     } finally {
-
+      if (og != null) og.close()
     }
   }
 
@@ -97,15 +118,24 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
 
   def getGroups(args: OWArgs) = withOG(args) {
     _.getGroups
+  }
+
+  def getActiveGroups(args: OWArgs) = withOG(args) {
+    _.getActiveGroups
   }
 
   def getActiveTopics(args: OWArgs) = withOG(args) {
     _.getActiveTopics
   }
+
   def getTopics(args: OWArgs) = withOG(args) {
     _.getTopics
   }
+
+  def getTopicsAndLogEndOffsets(args: OWArgs) = withOG(args) {
+    _.getTopicsAndLogEndOffsets
+  }
+
   def getTopicDetail(topic: String, args: OWArgs) = withOG(args) {
     _.getTopicDetail(topic)
   }
@@ -118,20 +148,23 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
     _.getClusterViz
   }
 
+  def getConsumerGroupStatus(args: OWArgs) = withOG(args) {
+    _.getConsumerGroupStatus
+  }
+
   override def afterStop() {
     scheduler.shutdown()
   }
 
-  class TimeSerializer extends CustomSerializer[Time](format => (
-    {
-      case JInt(s)=>
-        Time.fromMilliseconds(s.toLong)
-    },
-    {
-      case x: Time =>
-        JInt(x.inMilliseconds)
-    }
+  class TimeSerializer extends CustomSerializer[Time](format => ( {
+    case JInt(s) =>
+      Time.fromMilliseconds(s.toLong)
+  },
+    {
+      case x: Time =>
+        JInt(x.inMilliseconds)
+    }
   ))
 
   override def setup(args: OWArgs): Plan = new Plan {
@@ -144,15 +177,17 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
 
     def intent: Plan.Intent = {
       case GET(Path(Seg("group" :: Nil))) =>
-        JsonContent ~> ResponseString(write(getGroups(args)))
+        JsonContent ~> ResponseString(write(getActiveGroups(args)))
      case GET(Path(Seg("group" :: group :: Nil))) =>
        val info = getInfo(group, args)
        JsonContent ~> ResponseString(write(info)) ~> Ok
      case GET(Path(Seg("group" :: group :: topic :: Nil))) =>
        val offsets = args.db.offsetHistory(group, topic)
        JsonContent ~> ResponseString(write(offsets)) ~> Ok
+      case GET(Path(Seg("consumergroup" :: Nil))) =>
+        JsonContent ~> ResponseString(getConsumerGroupStatus(args)) ~> Ok
      case GET(Path(Seg("topiclist" :: Nil))) =>
-        JsonContent ~> ResponseString(write(getTopics(args)))
+        JsonContent ~> ResponseString(write(getTopicsAndLogEndOffsets(args)))
      case GET(Path(Seg("clusterlist" :: Nil))) =>
        JsonContent ~> ResponseString(write(getClusterViz(args)))
      case GET(Path(Seg("topicdetails" :: topic :: Nil))) =>
@@ -172,10 +207,11 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
 
     val reportersSet: mutable.Set[Class[_ <: OffsetInfoReporter]] = scala.collection.JavaConversions.asScalaSet(reportersTypes)
 
-    // SQLiteOffsetInfoReporter as a main storage is instantiated explicitly outside this loop so it is filtered out
+    // SQLiteOffsetInfoReporter as a main storage is instantiated separately as it has a different constructor from
+    // the other OffsetInfoReporter objects
     reportersSet
      .filter(!_.equals(classOf[SQLiteOffsetInfoReporter]))
-      .map((reporterType: Class[_ <: OffsetInfoReporter]) => createReporterInstance(reporterType, args.pluginsArgs))
+      .map((reporterType: Class[_ <: OffsetInfoReporter]) => createReporterInstance(reporterType, args.pluginsArgs))
      .+(new SQLiteOffsetInfoReporter(argHolder.db, args))
   }
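The OffsetGetterWeb change above splits cleanup out of `schedule(args)` into `cleanupOldData()` and gives each fixed-rate job its own initial delay. Below is a minimal, self-contained sketch of that scheduling pattern, assuming nothing from the project: `retryTask` is re-implemented here as a stand-in for the app's `retry(3)` helper, the periods are placeholders, and the thunks are wrapped in an explicit `Runnable` so the sketch compiles on Scala 2.11 without SAM conversion (the diff itself relies on lambda-to-Runnable conversion).

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object SchedulerSketch extends App {

  val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

  // Stand-in for the app's retry(3) helper: attempt the task up to three times.
  def retryTask[T](fn: => T): Unit = {
    var attempt = 0
    var done = false
    while (!done) {
      try { fn; done = true }
      catch {
        case e: Throwable =>
          attempt += 1
          if (attempt >= 3) throw e
      }
    }
  }

  // Explicit Runnable wrapper, usable from Scala 2.11 without -Xexperimental.
  def task(body: => Unit): Runnable = new Runnable { def run(): Unit = body }

  // Same shape as schedule(args): one fixed-rate job for reporting, one for cleanup,
  // each with its own initial delay; the periods here are placeholders.
  scheduler.scheduleAtFixedRate(task { retryTask { println("report offsets") } },
    0, 10, TimeUnit.SECONDS)
  scheduler.scheduleAtFixedRate(task { retryTask { println("clean up old data") } },
    30, 30, TimeUnit.SECONDS)

  Thread.sleep(60000)
  scheduler.shutdown()
}
```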
diff --git a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
index ff5e5da..2f84a6c 100644
--- a/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
+++ b/src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala
@@ -20,14 +20,16 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
 
   trait Fixture {
 
-    val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper])
-    val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])
+    val mockedZkUtil: ZkUtilsWrapper = Mockito.mock(classOf[ZkUtilsWrapper])
+    val mockedConsumer: SimpleConsumer = Mockito.mock(classOf[SimpleConsumer])
 
     val testPartitionLeader = 1
 
     val args = new OffsetGetterArgs
 
-    val offsetGetter = new KafkaOffsetGetter(mockedZkUtil, args)
+    val offsetGetter: KafkaOffsetGetter = new KafkaOffsetGetter(mockedZkUtil, args)
     offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
+    val offsetGetterSpy: KafkaOffsetGetter = spy(offsetGetter)
+
   }
 
   "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture {
@@ -52,8 +54,9 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
     val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(logEndOffset)))
     val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
     when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)
+    when(offsetGetterSpy.isGroupActive(any[String])).thenReturn(true)
 
-    offsetGetter.processPartition(testGroup, testTopic, testPartition) match {
+    offsetGetterSpy.processPartition(testGroup, testTopic, testPartition) match {
       case Some(offsetInfo) =>
         offsetInfo.topic shouldBe testTopic
         offsetInfo.group shouldBe testGroup
@@ -161,4 +164,4 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
     gtp.topicPartition shouldBe TopicAndPartition(topic, partition)
     offMeta shouldBe offsetAndMetadata
   }
-}
\ No newline at end of file
+}
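The test change above swaps the real KafkaOffsetGetter for a Mockito spy so that `isGroupActive` can be stubbed while `processPartition` still executes its real code. Below is a hedged, self-contained illustration of that spy pattern; `GroupChecker` and its methods are invented for the example and are not part of the project, and a literal argument is used where the real spec uses Mockito's `any[String]` matcher.

```scala
import org.mockito.Mockito.{spy, when}
import org.scalatest.{FlatSpec, Matchers}

// Made-up collaborator, standing in for KafkaOffsetGetter in the real spec.
class GroupChecker {
  def isGroupActive(group: String): Boolean = false   // pretend this consults Kafka
  def describe(group: String): String =
    if (isGroupActive(group)) s"$group is active" else s"$group is inactive"
}

class GroupCheckerSpec extends FlatSpec with Matchers {

  "describe" should "use the stubbed isGroupActive answer" in {
    val checkerSpy = spy(new GroupChecker)
    // Stub only isGroupActive; describe still runs its real implementation on the spy.
    when(checkerSpy.isGroupActive("console-consumer-1")).thenReturn(true)

    checkerSpy.describe("console-consumer-1") shouldBe "console-consumer-1 is active"
  }
}
```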