Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import com.google.common.base.Preconditions;
Expand All @@ -43,14 +44,22 @@ public class KafkaAuditCountVerifier {
public static final String COMPLETENESS_PREFIX = "completeness.";
public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
public static final String TOTAL_COUNT_REFERENCE_TIERS = COMPLETENESS_PREFIX + "totalCount.reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX + "complete.on.no.counts";

public enum CompletenessType {
ClassicCompleteness,
TotalCountCompleteness
}

private final boolean returnCompleteOnNoCounts;

private final AuditCountClient auditCountClient;
private final String srcTier;
private final Collection<String> refTiers;
private final Collection<String> totalCountRefTiers;
private final double threshold;

/**
Expand All @@ -69,6 +78,9 @@ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
this.srcTier = state.getProp(SOURCE_TIER);
this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
this.totalCountRefTiers = state.contains(TOTAL_COUNT_REFERENCE_TIERS)
? Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(TOTAL_COUNT_REFERENCE_TIERS))
: null;
this.returnCompleteOnNoCounts = state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
}

Expand All @@ -90,23 +102,52 @@ private static AuditCountClient getAuditClient(State state) {
}
}

public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis)
throws IOException {
return calculateCompleteness(datasetName, beginInMillis, endInMillis, this.threshold);
}

/**
* Compare source tier against reference tiers.
* Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
* Compute completion percentage which is true iff the calculated percentages is greater than threshold.
*
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
* @param threshold User defined threshold
*
* @return a map of completeness result by CompletenessType
*/
public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
throws IOException {
return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
public Map<CompletenessType, Boolean> calculateCompleteness(String datasetName, long beginInMillis, long endInMillis,
double threshold) throws IOException {
Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
log.info(String.format("checkTierCounts: audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));

Map<CompletenessType, Boolean> result = new HashMap<>();
result.put(CompletenessType.ClassicCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
CompletenessType.ClassicCompleteness, countsByTier) > threshold);
result.put(CompletenessType.TotalCountCompleteness, calculateCompleteness(datasetName, beginInMillis, endInMillis,
CompletenessType.TotalCountCompleteness, countsByTier) > threshold);
return result;
}

public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
throws IOException {
return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
private double calculateCompleteness(String datasetName, long beginInMillis, long endInMillis, CompletenessType type,
Map<String, Long> countsByTier) throws IOException {
if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
return 1.0;
}

switch (type) {
case ClassicCompleteness:
return calculateClassicCompleteness(datasetName, beginInMillis, endInMillis, countsByTier);
case TotalCountCompleteness:
return calculateTotalCountCompleteness(datasetName, beginInMillis, endInMillis, countsByTier);
default:
log.error("Skip unsupported completeness type {}", type);
return -1;
}
}

/**
Expand All @@ -118,39 +159,70 @@ public boolean isComplete(String datasetName, long beginInMillis, long endInMill
*
* @return The highest percentage value
*/
private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
log.info(String.format("Found empty counts map for %s, returning complete", datasetName));
return 1.0;
}
double percent = -1;
if (!countsByTier.containsKey(this.srcTier)) {
throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
}
private double calculateClassicCompleteness(String datasetName, long beginInMillis, long endInMillis,
Map<String, Long> countsByTier) throws IOException {
validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.refTiers);

double percent = -1;
for (String refTier: this.refTiers) {
if (!countsByTier.containsKey(refTier)) {
throw new IOException(String.format("Reference tier %s audit count cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, beginInMillis, endInMillis));
}
long refCount = countsByTier.get(refTier);
if(refCount <= 0) {
throw new IOException(String.format("Reference tier %s count cannot be less than or equal to zero", refTier));
}
long srcCount = countsByTier.get(this.srcTier);

percent = Double.max(percent, (double) srcCount / (double) refCount);
}

if (percent < 0) {
throw new IOException("Cannot calculate completion percentage");
}
return percent;
}

/**
* Check total count based completeness by comparing source tier against reference tiers,
* and calculate the completion percentage by srcCount/sum_of(refCount).
*
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
*
* @return The percentage value by srcCount/sum_of(refCount)
*/
private double calculateTotalCountCompleteness(String datasetName, long beginInMillis, long endInMillis,
Map<String, Long> countsByTier) throws IOException {
if (this.totalCountRefTiers == null) {
return -1;
}
validateTierCounts(datasetName, beginInMillis, endInMillis, countsByTier, this.srcTier, this.totalCountRefTiers);

long srcCount = countsByTier.get(this.srcTier);
long totalRefCount = this.totalCountRefTiers
.stream()
.mapToLong(countsByTier::get)
.sum();
double percent = Double.max(-1, (double) srcCount / (double) totalRefCount);
if (percent < 0) {
throw new IOException("Cannot calculate total count completion percentage");
}
return percent;
}

private static void validateTierCounts(String datasetName, long beginInMillis, long endInMillis, Map<String, Long> countsByTier,
String sourceTier, Collection<String> referenceTiers)
throws IOException {
if (!countsByTier.containsKey(sourceTier)) {
throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", sourceTier, datasetName, beginInMillis, endInMillis));
}

for (String refTier: referenceTiers) {
if (!countsByTier.containsKey(refTier)) {
throw new IOException(String.format("Reference tier %s audit count cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, beginInMillis, endInMillis));
}
long refCount = countsByTier.get(refTier);
if(refCount <= 0) {
throw new IOException(String.format("Reference tier %s count cannot be less than or equal to zero", refTier));
}
}
}

/**
* Fetch all <tier-count> pairs for a given dataset between a time range
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class KafkaAuditCountVerifierTest {
public static final String SOURCE_TIER = "gobblin";
public static final String REFERENCE_TIERS = "producer";

public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
public static final String TOTAL_COUNT_REFERENCE_TIERS = TOTAL_COUNT_REF_TIER_0 + "," + TOTAL_COUNT_REF_TIER_1;


public void testFetch() throws IOException {
final String topic = "testTopic";
State props = new State();
Expand All @@ -48,22 +53,86 @@ public void testFetch() throws IOException {
REFERENCE_TIERS, 1000L
));
// Default threshold
Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
));
Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L
));
Assert.assertFalse(verifier.isComplete(topic, 0L, 0L));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
}

public void testTotalCountCompleteness() throws IOException {
final String topic = "testTopic";
State props = new State();
props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
TestAuditClient client = new TestAuditClient(props);
KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);

// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
REFERENCE_TIERS, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
// Default threshold
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));

// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));

// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}

public void testEmptyAuditCount() throws IOException {
final String topic = "testTopic";
State props = new State();
props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
TestAuditClient client = new TestAuditClient(props);
KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);

// Client gets empty audit count
client.setTierCounts(ImmutableMap.of());

// Should be complete, since COMPLETE_ON_NO_COUNTS=true
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}
}
Loading