diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java index 19bc805eb6a..d614e25240c 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java @@ -861,7 +861,8 @@ public void flush(String dbName, String tableName) throws IOException { // The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent() && tableMetadata.completenessEnabled) { - updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props); + updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props, + tableMetadata.totalCountCompletenessEnabled); } //Set high waterMark @@ -925,14 +926,14 @@ private void updateWatermarkWithFilesRegistered(String topicName, TableMetadata } private void updateWatermarkWithNoFilesRegistered(String topicName, TableMetadata tableMetadata, - Map propsToUpdate) { + Map propsToUpdate, boolean includeTotalCountCompletionWatermark) { if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) { log.info(String.format("Checking kafka audit for %s on change_property ", topicName)); SortedSet timestamps = new TreeSet<>(); ZonedDateTime dtAtBeginningOfHour = ZonedDateTime.now(ZoneId.of(this.timeZone)).truncatedTo(ChronoUnit.HOURS); timestamps.add(dtAtBeginningOfHour); - getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, true); + getWatermarkUpdater(topicName, tableMetadata, propsToUpdate).run(timestamps, includeTotalCountCompletionWatermark); } else { log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s", tableMetadata.completionWatermark, topicName));