diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 0afc8cf41af8..0018db82c147 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -565,6 +565,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con @Override public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) { + if (fragments.isEmpty()) { + // Commit the transaction if the table is being created without data + transaction.newFastAppend().commit(); + transaction.commitTransaction(); + transaction = null; + return Optional.empty(); + } + return finishInsert(session, (IcebergWritableTableHandle) tableHandle, fragments, computedStatistics); } @@ -626,13 +634,17 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto @Override public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) { - IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; - Table icebergTable = transaction.table(); - List commitTasks = fragments.stream() .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) .collect(toImmutableList()); + if (commitTasks.isEmpty()) { + transaction = null; + return Optional.empty(); + } + + IcebergWritableTableHandle table = (IcebergWritableTableHandle) insertHandle; + Table icebergTable = transaction.table(); Type[] partitionColumnTypes = icebergTable.spec().fields().stream() .map(field -> field.transform().getResultType( icebergTable.schema().findType(field.sourceId()))) @@ -1345,6 +1357,12 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col .map(slice -> commitTaskCodec.fromJson(slice.getBytes())) .collect(toImmutableList()); + if (commitTasks.isEmpty()) { + // Avoid recording "empty" write operation + transaction = null; + return; + } + Schema schema = SchemaParser.fromJson(table.getTableSchemaJson()); RowDelta rowDelta = transaction.newRowDelta(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index fbe3fc389fe7..7acfe8f23836 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -3482,6 +3482,78 @@ public void testUpdatingInvalidTableProperty() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testEmptyCreateTableAsSelect() + { + String tableName = "test_empty_ctas_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation WHERE false", 0); + List initialTableSnapshots = getSnapshotIds(tableName); + assertThat(initialTableSnapshots.size()) + .withFailMessage("CTAS operations must create Iceberg snapshot independently whether the selection is empty or not") + .isEqualTo(1); + assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testEmptyInsert() + { + String tableName = "test_empty_insert_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", "SELECT count(*) FROM nation"); + List initialTableSnapshots = getSnapshotIds(tableName); + + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE false", 0); + List updatedTableSnapshots = getSnapshotIds(tableName); + + assertThat(initialTableSnapshots) + .withFailMessage("INSERT operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot") + .hasSize(1) + .isEqualTo(updatedTableSnapshots); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testEmptyUpdate() + { + String tableName = "test_empty_update_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", "SELECT count(*) FROM nation"); + List initialTableSnapshots = getSnapshotIds(tableName); + + assertUpdate("UPDATE " + tableName + " SET comment = 'new comment' WHERE nationkey IS NULL", 0); + List updatedTableSnapshots = getSnapshotIds(tableName); + + assertThat(initialTableSnapshots) + .withFailMessage("UPDATE operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot") + .hasSize(1) + .isEqualTo(updatedTableSnapshots); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testEmptyDelete() + { + String tableName = "test_empty_delete_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " WITH (format = '" + format.name() + "') AS SELECT * FROM nation", "SELECT count(*) FROM nation"); + List initialTableSnapshots = getSnapshotIds(tableName); + + assertUpdate("DELETE FROM " + tableName + " WHERE nationkey IS NULL", 0); + List updatedTableSnapshots = getSnapshotIds(tableName); + + assertThat(initialTableSnapshots) + .withFailMessage("DELETE operations that are not changing the state of the table must not cause the creation of a new Iceberg snapshot") + .hasSize(1) + .isEqualTo(updatedTableSnapshots); + + assertUpdate("DROP TABLE " + tableName); + } + private Session prepareCleanUpSession() { return Session.builder(getSession())