Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,13 @@ private double calculateCompleteness(String datasetName, long beginInMillis, lon

/**
* Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
*
 * We return the lowest value; in other words, we wait until the src tier catches up to all reference
 * tiers (up to 99.9%) before marking that hour as completed.
* @param datasetName A dataset short name like 'PageViewEvent'
* @param beginInMillis Unix timestamp in milliseconds
* @param endInMillis Unix timestamp in milliseconds
*
* @return The highest percentage value
* @return The lowest percentage value
*/
private double calculateClassicCompleteness(String datasetName, long beginInMillis, long endInMillis,
Map<String, Long> countsByTier) throws IOException {
Expand All @@ -171,16 +172,19 @@ private double calculateClassicCompleteness(String datasetName, long beginInMill
for (String refTier: this.refTiers) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to get more clarity here
Current Scenario:
source tier = gobblin, ref tier = agg cluster

What are the refTiers in the new scenario — is it (agg cluster, local cluster1) or (agg cluster, local cluster1, local cluster2, local cluster3) — and what is the expected behavior?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessarily for the local tier, but in general to support multiple reference tiers. As for the local setup — since that's an internal-specific detail, I'll reply to you offline.

Copy link
Copy Markdown
Contributor

@vikrambohra vikrambohra Oct 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think better way to put it is : we wait until src tier catches up to all reference tiers (upto 99.9%)

long refCount = countsByTier.get(refTier);
long srcCount = countsByTier.get(this.srcTier);
double tmpPercent;

/*
If we get an audit map back in which a source tier on another fabric reports 0
and the reference tier count from Kafka is also reported as 0, we treat that hour as complete.
This explicit check is needed because a non-zero double divided by 0 is Infinity, whereas 0 divided by 0 is NaN.
*/
if (srcCount == 0 && refCount == 0) {
return 1.0;
tmpPercent = 1;
} else {
tmpPercent = (double) srcCount / (double) refCount;
}
percent = Double.max(percent, (double) srcCount / (double) refCount);
percent = percent < 0 ? tmpPercent : Double.min(percent, tmpPercent);
Copy link
Copy Markdown
Contributor

@AndyJiang99 AndyJiang99 Oct 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the srcTiers are local cluster 1, local cluster 2, etc, rather than using the lowest percentage completion, would instead it be srcTier / (summation of refTier counts)? Since the total records across all the refTiers (assuming local 1, local 2, etc.) should be equal to the number of records on srcTier eventually?

Depending on what the eventual conclusion of this is, let's also update the javadocs of this to match the behavior

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if (percent < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
public class KafkaAuditCountVerifierTest {

public static final String SOURCE_TIER = "gobblin";
public static final String REFERENCE_TIERS = "producer";
public static final String REFERENCE_TIER = "producer";
public static final String REFERENCE_TIER_1 = "producer_reference";
public static final String REFERENCE_TIERS = REFERENCE_TIER + "," + REFERENCE_TIER_1;

public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
Expand All @@ -50,7 +52,8 @@ public void testFetch() throws IOException {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
// Default threshold
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
Expand All @@ -59,15 +62,17 @@ public void testFetch() throws IOException {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
Expand All @@ -86,7 +91,8 @@ public void testTotalCountCompleteness() throws IOException {
// All complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 1000L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand All @@ -97,7 +103,8 @@ public void testTotalCountCompleteness() throws IOException {
// 99.999 % complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand All @@ -107,7 +114,8 @@ public void testTotalCountCompleteness() throws IOException {
// <= 99% complete
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 1000L,
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L,
TOTAL_COUNT_REF_TIER_0, 600L,
TOTAL_COUNT_REF_TIER_1, 400L
));
Expand Down Expand Up @@ -140,7 +148,8 @@ public void testEmptyAuditCount() throws IOException {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 990L,
REFERENCE_TIERS, 0L,
REFERENCE_TIER, 0L,
REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
Expand All @@ -153,7 +162,8 @@ public void testEmptyAuditCount() throws IOException {
client.setTierCounts(
ImmutableMap.of(
SOURCE_TIER, 0L,
REFERENCE_TIERS, 0L,
REFERENCE_TIER, 0L,
REFERENCE_TIER_1, 0L,
TOTAL_COUNT_REF_TIER_0, 0L,
TOTAL_COUNT_REF_TIER_1, 0L
));
Expand All @@ -175,7 +185,8 @@ public void testOneCountFailed() throws IOException {
// Missing total count tier which will throw exception
client.setTierCounts(ImmutableMap.of(
SOURCE_TIER, 999L,
REFERENCE_TIERS, 1000L
REFERENCE_TIER, 1000L,
REFERENCE_TIER_1, 1000L
));

// Classic completeness is still returned, but total is missing
Expand All @@ -184,4 +195,42 @@ public void testOneCountFailed() throws IOException {
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
}

/**
 * Verifies that classic completeness takes the minimum src/ref ratio across multiple
 * reference tiers, so an hour is only complete once the source tier has caught up to
 * ALL reference tiers.
 */
@Test
public void testDifferentValueInReferenceTier() throws IOException {
  final String topic = "testTopic";
  State props = new State();
  props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
  props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
  props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
  props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
  props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
  TestAuditClient client = new TestAuditClient(props);
  KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);

  // Reference tiers disagree: classic completeness uses the lowest ratio,
  // min(999/1000, 999/2000) = 0.4995, which is below the 0.99 threshold -> incomplete.
  client.setTierCounts(ImmutableMap.of(
      SOURCE_TIER, 999L,
      REFERENCE_TIER, 1000L,
      REFERENCE_TIER_1, 2000L
  ));
  Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
      .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));

  // One reference tier reports 0 while the other does not: the zero-count tier's ratio
  // (999/0 -> Infinity) is ignored by the min, leaving 999/2000 = 0.4995 -> still incomplete.
  client.setTierCounts(ImmutableMap.of(
      SOURCE_TIER, 999L,
      REFERENCE_TIER, 0L,
      REFERENCE_TIER_1, 2000L
  ));
  Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
      .get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ private String getHealthCheckReport() {
public void execute() {
this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
double avgConsumptionRate = getMaxConsumptionRate();
log.info("Avg. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
if (ingestionLatencies.size() < this.slidingWindowSize) {
log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize);
return;
Expand All @@ -146,8 +148,6 @@ public void execute() {
log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency Threshold: {}", this.ingestionLatencies.toString(), this.ingestionLatencyThresholdMinutes);
return;
}

double avgConsumptionRate = getMaxConsumptionRate();
if (avgConsumptionRate > this.consumptionRateDropOffFraction * this.expectedConsumptionRate) {
log.info("SUCCESS: Avg. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
@Slf4j
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container";
private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;

//A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
//single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
Expand Down Expand Up @@ -198,6 +199,7 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int num
}
//Add CONTAINER_CAPACITY into each workunit. Useful when KafkaIngestionHealthCheck is enabled.
for (WorkUnit workUnit: workUnitsForTopic) {
//todo: check whether it's set already to respect the topic specific capacity from user input properties
workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
}
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
Expand Down Expand Up @@ -293,7 +295,7 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t

private Double getDefaultWorkUnitSize() {
return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
}

/**
Expand Down