Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata ta
}
dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
} catch (Exception e) {
log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), e);
log.warn("Cannot get DataFile for {} due to {}", file.getFilePath(), e);
}
}
return dataFiles;
Expand Down Expand Up @@ -834,86 +834,122 @@ protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata)
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata(this.conf));
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
try (Timer.Context context = new Timer().time()) {
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.completenessEnabled) {
updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}
if (!tableMetadata.transaction.isPresent()) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review this PR with hide whitespace enabled. There is quite a bit of whitespace change

log.info("There's no transaction initiated for the table {}", tid);
return;
}

//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low waterMark
props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}
}

// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
try (Timer.Context context = new Timer().time()) {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
// Check and update completion watermark when there are no files to be registered, typically for quiet topics
// The logic is to check the window [currentHour-1,currentHour] and update the watermark if there are no audit counts
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
tableMetadata.totalCountCompletenessEnabled);
}

//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
} else {
log.info("There's no transaction initiated for the table {}", tid);
//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low waterMark
props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
transactionCommitted = true;
}

postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
//Reset the table metadata for next accumulation period
Map<String, String> currentProps = tableMetadata.table.get().properties();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not put these lines in postCommit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I think of postCommit is that it is where any custom logic for emitting metrics should live. But resetting the table metadata is something we always want to do. If we ever override postCommit in an inherited class, that override would have to remember to invoke this boilerplate too.

Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
} catch (RuntimeException e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
throw new IOException(String.format("Failed to flush table %s %s. transactionCommitted=%s",
dbName, tableName, transactionCommitted), e);
} catch (Exception e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
throw new IOException(String.format("Failed to flush table %s %s. transactionCommitted=%s",
dbName, tableName, transactionCommitted), e);
} finally {
writeLock.unlock();
}
}

/**
* PostCommit operation that executes after the transaction is committed to the Iceberg table. Operations in this
* method are considered non-critical to the transaction and will not cause the transaction to fail if they fail,
* but should ideally still be executed for observability.
*
* One example of this is observability events / metrics like {@link org.apache.gobblin.metrics.GobblinTrackingEvent}.
* The default behavior is to emit a GTE for the commit event and a kafka audit event
*
* @param tableMetadata
* @param dbName
* @param tableName
* @param topicName
* @param highWatermark
* @throws IOException
*/
protected void postCommit(
TableMetadata tableMetadata,
String dbName,
String tableName,
String topicName,
long highWatermark) throws IOException {
// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();

// Sending the audit count before the snapshot commit event because downstream users are more likely
// to consume this audit count API for determining completion since it is agnostic to the system (e.g. Kafka, Brooklin)
// The snapshot commit event is more for internal monitoring.
try (Timer.Context context = new Timer().time()) {
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's try sending the audit count first to avoid undercounting as much as possible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 ways to think about it:

  1. Kafka Audit is easier to check, and downstream users are more likely to use this number, so we should prioritize this step
  2. Gobblin commit GTE is what we use for Gobblin monitoring and so we should prioritize our own monitoring.

Practically speaking, the most common use case is the first (as you suggest), so I think it's a good idea to emit the audit count first

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, and in addition to that, missing one GTE won't cause an alert in our monitoring system, but failing to send the audit count will leave us with a permanently lower count

log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

try (Timer.Context context = new Timer().time()) {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
}

private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName, TableMetadata tableMetadata,
Map<String, String> propsToUpdate) {
return new CompletenessWatermarkUpdater(topicName, this.auditCheckGranularity, this.timeZone,
Expand Down Expand Up @@ -946,7 +982,7 @@ public void reset(String dbName, String tableName) throws IOException {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}

private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
String tableName, Map<String, String> props, Long highWaterMark) {
GobblinEventBuilder gobblinTrackingEvent =
new GobblinEventBuilder(MetadataWriterKeys.ICEBERG_COMMIT_EVENT_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
Expand All @@ -39,6 +47,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -405,7 +414,7 @@ public void testFaultTolerant() throws Exception {

@Test(dependsOnMethods={"testChangeProperty"}, groups={"icebergMetadataWriterTest"})
public void testWriteAddFileGMCECompleteness() throws IOException {
// Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
// Creating a copy of gmce with static type in GenericRecord to work with writeEnvelope method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
File hourlyFile = new File(tmpDir, "testDB/testTopicCompleteness/hourly/2021/09/16/10/data.avro");
Expand Down Expand Up @@ -546,6 +555,57 @@ public void testChangePropertyGMCECompleteness() throws IOException {
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY), String.valueOf(expectedWatermark));
}

  @Test(dependsOnMethods={"testChangePropertyGMCECompleteness"}, groups={"icebergMetadataWriterTest"})
  public void testKafkaAuditAndGTEEmittedAfterIcebergCommitDuringFlush() throws IOException {
    State state = getState();
    state.setProp(GobblinMCEWriter.GMCE_METADATA_WRITER_CLASSES, SpyIcebergMetadataWriter.class.getName());
    GobblinMCEWriter gobblinMCEWriterWithSpy = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
    // Set fault tolerant dataset number to be 1 so watermark is updated
    gobblinMCEWriterWithSpy.setMaxErrorDataset(1);

    // Sanity check: the writer under test is the spy that fails inside postCommit.
    Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.size(), 1);
    Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.get(0).getClass().getName(), SpyIcebergMetadataWriter.class.getName());
    SpyIcebergMetadataWriter spyIcebergMetadataWriter =
        (SpyIcebergMetadataWriter) gobblinMCEWriterWithSpy.metadataWriters.get(0);


    // Register one new data file via a GMCE so that flush() has a transaction to commit.
    File hourlyFile = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/failAfterCommit.avro");
    Files.createParentDirs(hourlyFile);
    writeRecord(hourlyFile);
    Assert.assertTrue(hourlyFile.exists());
    gmce.setOldFilePrefixes(null);
    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
        .setFilePath(hourlyFile.toString())
        .setFileFormat("avro")
        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
        .build()));
    gmce.setOperationType(OperationType.add_files);
    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "4000-4001").build());
    gmce.setAllowedMetadataWriters(new ArrayList<>());
    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
    gobblinMCEWriterWithSpy.writeEnvelope(new RecordEnvelope<>(genericGmce,
        new KafkaStreamingExtractor.KafkaWatermark(
            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
            new LongWatermark(70L))));

    gobblinMCEWriterWithSpy.flush();

    Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");

    // The Iceberg transaction must have committed even though the spy's postCommit threw:
    // the registered file matches the path we wrote and the offset range was persisted.
    Iterator<org.apache.iceberg.DataFile> result = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile.getAbsolutePath())).collect().iterator();
    Assert.assertTrue(result.hasNext());
    Assert.assertEquals(result.next().path(), hourlyFile.getAbsolutePath());
    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-4001");

    // postCommit ran once and aborted before reaching the audit-count and snapshot-commit-event
    // emissions, so those counters were never populated.
    Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("postCommit").get(), 1);
    Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("sendAuditCounts"), null);
    Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("submitSnapshotCommitEvent"), null);
  }

private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
Expand Down Expand Up @@ -621,5 +681,40 @@ public TestAuditCountVerifier(State state, AuditCountClient client) {
super(state, client);
}
}

/**
* A spy class for IcebergMetadataWriter to track the methods called and intentionally
* invoke failure after the iceberg transaction is committed
*/
public static class SpyIcebergMetadataWriter extends IcebergMetadataWriter {
public Map<String, AtomicInteger> methodsCalledCounter = new HashMap<>();

public SpyIcebergMetadataWriter(State state)
throws IOException {
super(state);
}

protected void postCommit(TableMetadata tableMetadata, String dbName, String tableName, String topicName,
long highWatermark) {
String methodName = new Object() {}.getClass().getEnclosingMethod().getName();
methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
methodsCalledCounter.get(methodName).incrementAndGet();
throw new RuntimeException("Intentionally aborting postcommit for testing");
}

@Override
public void sendAuditCounts(String topicName, Collection<String> serializedAuditCountMaps) {
String methodName = new Object() {}.getClass().getEnclosingMethod().getName();
methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
methodsCalledCounter.get(methodName).incrementAndGet();
}

protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
String tableName, Map<String, String> props, Long highWaterMark) {
String methodName = new Object() {}.getClass().getEnclosingMethod().getName();
methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
methodsCalledCounter.get(methodName).incrementAndGet();
}
}
}