Skip to content

Commit

Permalink
Throw TrinoException when Iceberg commit fails
Browse files Browse the repository at this point in the history
  • Loading branch information
pajaks committed Aug 13, 2024
1 parent f5be889 commit ca5c363
Showing 1 changed file with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.UpdateStatistics;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -1032,7 +1031,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
// Commit the transaction if the table is being created without data
AppendFiles appendFiles = transaction.newFastAppend();
commit(appendFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "create table");
transaction = null;
return Optional.empty();
}
Expand Down Expand Up @@ -1211,7 +1210,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
}

commit(appendFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "insert");
// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
transaction = null;
Expand All @@ -1235,7 +1234,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
.setStatistics(newSnapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
commitTransaction(transaction, "update statistics on insert");
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
Expand Down Expand Up @@ -1581,7 +1580,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.snapshotId().get()), "snapshot is null");
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
commit(rewriteFiles, session);
transaction.commitTransaction();
commitTransaction(transaction, "optimize");

// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
long newSnapshotId = transaction.table().currentSnapshot().snapshotId();
Expand All @@ -1599,7 +1598,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
transaction.updateStatistics()
.setStatistics(newSnapshotId, newStatsFile)
.commit();
transaction.commitTransaction();
commitTransaction(transaction, "update statistics after optimize");
}
catch (Exception e) {
// Write was committed, so at this point we cannot fail the query
Expand All @@ -1609,6 +1608,16 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
transaction = null;
}

/**
 * Commits the given Iceberg transaction, translating any commit failure into a
 * {@link TrinoException} with {@code ICEBERG_COMMIT_ERROR} so the engine reports
 * it as a connector commit error rather than an internal failure.
 *
 * @param transaction the Iceberg transaction to commit
 * @param operation short human-readable description of the operation being committed,
 *                  included in the error message (e.g. "insert", "optimize")
 * @throws TrinoException wrapping the original cause when the commit fails
 */
private static void commitTransaction(Transaction transaction, String operation)
{
    try {
        transaction.commitTransaction();
    }
    catch (RuntimeException e) {
        // e.getMessage() may be null; fall back to the exception itself for the detail text
        Object detail = firstNonNull(e.getMessage(), e);
        String message = format("Failed to commit during %s: %s", operation, detail);
        throw new TrinoException(ICEBERG_COMMIT_ERROR, message, e);
    }
}

@Override
public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle)
{
Expand Down Expand Up @@ -1639,7 +1648,7 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec
updateStatistics.removeStatistics(statisticsFile.snapshotId());
}
updateStatistics.commit();
transaction.commitTransaction();
commitTransaction(transaction, "drop extended stats");
transaction = null;
}

Expand Down Expand Up @@ -1919,12 +1928,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
}
}

try {
transaction.commitTransaction();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e);
}
commitTransaction(transaction, "set table properties");
}

private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
Expand Down Expand Up @@ -2356,7 +2360,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
"Unexpected computed statistics that cannot be attached to a snapshot because none exists: %s",
computedStatistics);

transaction.commitTransaction();
commitTransaction(transaction, "statistics collection");
transaction = null;
return;
}
Expand All @@ -2373,7 +2377,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
.setStatistics(snapshotId, statisticsFile)
.commit();

transaction.commitTransaction();
commitTransaction(transaction, "statistics collection");
transaction = null;
}

Expand Down Expand Up @@ -2532,13 +2536,8 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
}

rowDelta.validateDataFilesExist(referencedDataFiles.build());
try {
commit(rowDelta, session);
transaction.commitTransaction();
}
catch (ValidationException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e);
}
commit(rowDelta, session);
commitTransaction(transaction, "write");
}

@Override
Expand Down Expand Up @@ -3091,7 +3090,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
appendFiles.set(TRINO_QUERY_START_TIME, session.getStart().toString());
commit(appendFiles, session);

transaction.commitTransaction();
commitTransaction(transaction, "refresh materialized view");
transaction = null;
fromSnapshotForRefresh = Optional.empty();
return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
Expand Down

0 comments on commit ca5c363

Please sign in to comment.