diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java index 3135a124351..4cfb0bea056 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java @@ -23,6 +23,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -832,15 +833,14 @@ public void flush(String dbName, String tableName) throws IOException { tableMetadata.deleteFiles.get().commit(); } // Check and update completion watermark when there are no files to be registered, typically for quiet topics - // The logic is to check the next window from previous completion watermark and update the watermark if there are no audit counts + // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent() && tableMetadata.completenessEnabled) { if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) { log.info(String.format("Checking kafka audit for %s on change_property ", topicName)); SortedSet timestamps = new TreeSet<>(); - ZonedDateTime prevWatermarkDT = - Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone)); - timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1)); + ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS); + timestamps.add(dtAtBeginningOfHour); checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props); } else { log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s", diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java index 294ef08ab95..bc1c8ca5706 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java @@ -19,6 +19,9 @@ import java.io.File; import java.io.IOException; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.TimeUnit; @@ -492,9 +495,9 @@ public void testWriteAddFileGMCECompleteness() throws IOException { @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"}, groups={"icebergMetadataWriterTest"}) public void testChangePropertyGMCECompleteness() throws IOException { - Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); - long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY)); - long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1); + ZonedDateTime expectedCWDt = ZonedDateTime.now(ZoneId.of(DEFAULT_TIME_ZONE)).truncatedTo(ChronoUnit.HOURS); + // For quiet topics, watermark should always be beginning of current hour + long expectedWatermark = expectedCWDt.toInstant().toEpochMilli(); File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro"); gmce.setOldFilePrefixes(null); gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder() @@ -511,11 +514,12 @@ public void testChangePropertyGMCECompleteness() throws IOException { new LongWatermark(65L)))); KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class); - Mockito.when(verifier.isComplete("testTopic", watermark, expectedWatermark)).thenReturn(true); + // For quiet topics always check for previous hour window + Mockito.when(verifier.isComplete("testTopic", expectedCWDt.minusHours(1).toInstant().toEpochMilli(), expectedWatermark)).thenReturn(true); ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier); gobblinMCEWriterWithCompletness.flush(); - table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); + Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000"); Assert.assertEquals(table.spec().fields().get(1).name(), "late"); Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");