Skip to content

Commit

Permalink
Throw TrinoException when Iceberg commit fails
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks committed Aug 16, 2024
1 parent f5be889 commit a22bec7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
// Commit the transaction if the table is being created without data
AppendFiles appendFiles = transaction.newFastAppend();
commit(appendFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "create table");
transaction = null;
return Optional.empty();
}
Expand Down Expand Up @@ -1211,7 +1211,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
}

commit(appendFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "insert");
// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
transaction = null;
Expand All @@ -1235,7 +1235,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
.setStatistics(newSnapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
commitTransaction(transaction, "update statistics on insert");
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
Expand Down Expand Up @@ -1581,7 +1581,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.snapshotId().get()), "snapshot is null");
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
commit(rewriteFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "optimize");

// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
Expand All @@ -1599,7 +1599,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
transaction.updateStatistics()
.setStatistics(newSnapshotId, newStatsFile)
.commit();
transaction.commitTransaction();
commitTransaction(transaction, "update statistics after optimize");
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
Expand All @@ -1609,6 +1609,16 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
transaction = null;
}

private static void commitTransaction(Transaction transaction, String operation)
{
try {
transaction.commitTransaction();
}
catch (ValidationException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Failed to commit during %s: %s", operation, firstNonNull(e.getMessage(), e)), e);
}
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
Expand Down Expand Up @@ -1639,7 +1649,7 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec
updateStatistics.removeStatistics(statisticsFile.snapshotId());
}
updateStatistics.commit();
transaction.commitTransaction();
commitTransaction(transaction, "drop extended stats");
transaction = null;
}

Expand Down Expand Up @@ -1919,12 +1929,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
}
}

try {
transaction.commitTransaction();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e);
}
commitTransaction(transaction, "set table properties");
}

private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
Expand Down Expand Up @@ -2356,7 +2361,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
"Unexpected computed statistics that cannot be attached to a snapshot because none exists: %s",
computedStatistics);

transaction.commitTransaction();
commitTransaction(transaction, "statistics collection");
transaction = null;
return;
}
Expand All @@ -2373,7 +2378,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
.setStatistics(snapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
commitTransaction(transaction, "statistics collection");
transaction = null;
}

Expand Down Expand Up @@ -2532,13 +2537,8 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
try {
commit(rowDelta, session);
transaction.commitTransaction();
}
catch (ValidationException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e);
}
commit(rowDelta, session);
commitTransaction(transaction, "write");
}

@Override
Expand Down Expand Up @@ -3091,7 +3091,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
appendFiles.set(TRINO_QUERY_START_TIME, session.getStart().toString());
commit(appendFiles, session);

transaction.commitTransaction();
commitTransaction(transaction, "refresh materialized view");
transaction = null;
fromSnapshotForRefresh = Optional.empty();
return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testDeleteRowsConcurrently()
ExecutorService executor = newFixedThreadPool(threads);
List<String> rows = ImmutableList.of("(1, 0, 0, 0)", "(0, 1, 0, 0)", "(0, 0, 1, 0)", "(0, 0, 0, 1)");

String[] expectedErrors = new String[] {"Failed to commit Iceberg update to table:", "Failed to replace table due to concurrent updates:"};
String[] expectedErrors = new String[] {"Failed to commit during write:", "Failed to replace table due to concurrent updates:"};
try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_concurrent_delete",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ protected void verifyVersionedQueryFailurePermissible(Exception e)
@Override
protected void verifyConcurrentUpdateFailurePermissible(Exception e)
{
assertThat(e).hasMessageContaining("Failed to commit Iceberg update to table");
assertThat(e).hasMessageContaining("Failed to commit during write");
}

@Override
Expand Down

0 comments on commit a22bec7

Please sign in to comment.