Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -837,74 +837,77 @@ public void flush(String dbName, String tableName) throws IOException {
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata(this.conf));
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
try (Timer.Context context = new Timer().time()) {
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.completenessEnabled) {
updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
if (!tableMetadata.transaction.isPresent()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review this PR with the "Hide whitespace" option enabled; there is quite a bit of whitespace change.

log.info("There's no transaction initiated for the table {}", tid);
return;
}

Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}
}

//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low waterMark
props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}

// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
try (Timer.Context context = new Timer().time()) {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low waterMark
props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
} else {
log.info("There's no transaction initiated for the table {}", tid);
// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
try (Timer.Context context = new Timer().time()) {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

try (Timer.Context context = new Timer().time()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is just to move the audit count emission to the end. I experimented with an early return vs. a giant if/else statement, so there is a large amount of whitespace change.

I think I like this formatting better, but I'm not strongly opinionated about it.

sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
} catch (RuntimeException e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
} catch (Exception e) {
Expand Down