Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -832,15 +833,14 @@ public void flush(String dbName, String tableName) throws IOException {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS);
timestamps.add(dtAtBeginningOfHour);
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import java.io.File;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -492,9 +495,9 @@ public void testWriteAddFileGMCECompleteness() throws IOException {
@Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
public void testChangePropertyGMCECompleteness() throws IOException {

Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
ZonedDateTime expectedCWDt = ZonedDateTime.now(ZoneId.of(DEFAULT_TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
// For quiet topics, watermark should always be beginning of current hour
long expectedWatermark = expectedCWDt.toInstant().toEpochMilli();
File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro");
gmce.setOldFilePrefixes(null);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
Expand All @@ -511,11 +514,12 @@ public void testChangePropertyGMCECompleteness() throws IOException {
new LongWatermark(65L))));

KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
Mockito.when(verifier.isComplete("testTopic", watermark, expectedWatermark)).thenReturn(true);
// For quiet topics always check for previous hour window
Mockito.when(verifier.isComplete("testTopic", expectedCWDt.minusHours(1).toInstant().toEpochMilli(), expectedWatermark)).thenReturn(true);
((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();

table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000");
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Expand Down