From ca5c363a58959ac0a6e1a4d517924f49e86eb048 Mon Sep 17 00:00:00 2001 From: Slawomir Pajak Date: Mon, 12 Aug 2024 13:26:14 +0200 Subject: [PATCH] Throw TrinoException when Iceberg commit fails --- .../trino/plugin/iceberg/IcebergMetadata.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 796e64b35f6432..3fbc68d180a987 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -153,7 +153,6 @@ import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.UpdateStatistics; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Term; import org.apache.iceberg.io.CloseableIterable; @@ -1032,7 +1031,7 @@ public Optional finishCreateTable(ConnectorSession sess // Commit the transaction if the table is being created without data AppendFiles appendFiles = transaction.newFastAppend(); commit(appendFiles, session); - transaction.commitTransaction(); + commitTransaction(transaction, "create table"); transaction = null; return Optional.empty(); } @@ -1211,7 +1210,7 @@ public Optional finishInsert(ConnectorSession session, } commit(appendFiles, session); - transaction.commitTransaction(); + commitTransaction(transaction, "insert"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); transaction = null; @@ -1235,7 +1234,7 @@ public Optional finishInsert(ConnectorSession session, .setStatistics(newSnapshotId, statisticsFile) .commit(); - transaction.commitTransaction(); + commitTransaction(transaction, "update statistics on insert"); } catch (Exception e) { // Write was committed, so at this point we cannot fail the query @@ -1581,7 +1580,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle Snapshot snapshot = requireNonNull(icebergTable.snapshot(optimizeHandle.snapshotId().get()), "snapshot is null"); rewriteFiles.validateFromSnapshot(snapshot.snapshotId()); commit(rewriteFiles, session); - transaction.commitTransaction(); + commitTransaction(transaction, "optimize"); // TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer long newSnapshotId = transaction.table().currentSnapshot().snapshotId(); @@ -1599,7 +1598,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle transaction.updateStatistics() .setStatistics(newSnapshotId, newStatsFile) .commit(); - transaction.commitTransaction(); + commitTransaction(transaction, "update statistics after optimize"); } catch (Exception e) { // Write was committed, so at this point we cannot fail the query @@ -1609,6 +1608,16 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle transaction = null; } + private static void commitTransaction(Transaction transaction, String operation) + { + try { + transaction.commitTransaction(); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Failed to commit during %s: %s", operation, firstNonNull(e.getMessage(), e)), e); + } + } + @Override public void executeTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { @@ -1639,7 +1648,7 @@ private void executeDropExtendedStats(ConnectorSession session, IcebergTableExec updateStatistics.removeStatistics(statisticsFile.snapshotId()); } updateStatistics.commit(); - transaction.commitTransaction(); + commitTransaction(transaction, "drop extended stats"); transaction = null; } @@ -1919,12 +1928,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta } } - try { - transaction.commitTransaction(); - } - catch (RuntimeException e) { - throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e); - } + commitTransaction(transaction, "set table properties"); } private static void updatePartitioning(Table icebergTable, Transaction transaction, List partitionColumns) @@ -2356,7 +2360,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH "Unexpected computed statistics that cannot be attached to a snapshot because none exists: %s", computedStatistics); - transaction.commitTransaction(); + commitTransaction(transaction, "statistics collection"); transaction = null; return; } @@ -2373,7 +2377,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH .setStatistics(snapshotId, statisticsFile) .commit(); - transaction.commitTransaction(); + commitTransaction(transaction, "statistics collection"); transaction = null; } @@ -2532,13 +2536,8 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col } rowDelta.validateDataFilesExist(referencedDataFiles.build()); - try { - commit(rowDelta, session); - transaction.commitTransaction(); - } - catch (ValidationException e) { - throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit Iceberg update to table: " + table.getSchemaTableName(), e); - } + commit(rowDelta, session); + commitTransaction(transaction, "write"); } @Override @@ -3091,7 +3090,7 @@ public Optional finishRefreshMaterializedView( appendFiles.set(TRINO_QUERY_START_TIME, session.getStart().toString()); commit(appendFiles, session); - transaction.commitTransaction(); + commitTransaction(transaction, "refresh materialized view"); transaction = null; fromSnapshotForRefresh = Optional.empty(); return Optional.of(new HiveWrittenPartitions(commitTasks.stream()