Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@
import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.versionHintLocation;
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
Expand Down Expand Up @@ -1345,8 +1348,16 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle
.deleteFromRowFilter(toIcebergExpression(handle.getEnforcedPredicate()))
.commit();

// TODO: it should be possible to return number of deleted records. https://github.com/trinodb/trino/issues/12055
return OptionalLong.empty();
Map<String, String> summary = icebergTable.currentSnapshot().summary();
String deletedRowsStr = summary.get(DELETED_RECORDS_PROP);
if (deletedRowsStr == null) {
// TODO Iceberg should guarantee this is always present (https://github.com/apache/iceberg/issues/4647)
return OptionalLong.empty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make Iceberg fill DELETED_RECORDS_PROP always?
let's file a PR and link with a TODO note here.

}
long deletedRecords = Long.parseLong(deletedRowsStr);
long removedPositionDeletes = Long.parseLong(summary.getOrDefault(REMOVED_POS_DELETES_PROP, "0"));
long removedEqualityDeletes = Long.parseLong(summary.getOrDefault(REMOVED_EQ_DELETES_PROP, "0"));
return OptionalLong.of(deletedRecords - removedPositionDeletes - removedEqualityDeletes);
}

public void rollback()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.assertions.Assert.assertEventually;
Expand Down Expand Up @@ -239,46 +240,6 @@ protected void checkInformationSchemaViewsForMaterializedView(String schemaName,
.hasMessageFindingMatch("(?s)Expecting.*to contain:.*\\Q[(" + viewName + ")]");
}

// Override is required because metadata deletes do not return the number of rows deleted. https://github.com/trinodb/trino/issues/12055
@Test
@Override
public void testDeleteWithComplexPredicate()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_complex_", "AS SELECT * FROM orders")) {
// delete half the table, then delete the rest
assertUpdate("DELETE FROM " + table.getName() + " WHERE orderkey % 2 = 0", "SELECT count(*) FROM orders WHERE orderkey % 2 = 0");
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM orders WHERE orderkey % 2 <> 0");

query("DELETE FROM " + table.getName());
assertQuery("SELECT * FROM " + table.getName(), "SELECT * FROM orders LIMIT 0");

assertUpdate("DELETE FROM " + table.getName() + " WHERE rand() < 0", 0);
}
}

// Override is required because metadata deletes do not return the number of rows deleted. https://github.com/trinodb/trino/issues/12055
@Test
@Override
public void testDeleteWithSubquery()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_subquery", "AS SELECT * FROM nation")) {
// delete using a subquery
assertUpdate("DELETE FROM " + table.getName() + " WHERE regionkey IN (SELECT regionkey FROM region WHERE name LIKE 'A%')", 15);
assertQuery(
"SELECT * FROM " + table.getName(),
"SELECT * FROM nation WHERE regionkey IN (SELECT regionkey FROM region WHERE name NOT LIKE 'A%')");
}

try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete_subquery", "AS SELECT * FROM orders")) {
// delete using a scalar and EXISTS subquery
assertUpdate("DELETE FROM " + table.getName() + " WHERE orderkey = (SELECT orderkey FROM orders ORDER BY orderkey LIMIT 1)", 1);
assertUpdate("DELETE FROM " + table.getName() + " WHERE orderkey = (SELECT orderkey FROM orders WHERE false)", 0);
assertUpdate("DELETE FROM " + table.getName() + " WHERE EXISTS(SELECT 1 WHERE false)", 0);
query("DELETE FROM " + table.getName() + " WHERE EXISTS(SELECT 1)");
assertQuery("SELECT count(*) FROM " + table.getName(), "VALUES 0");
}
}

@Test
public void testDecimal()
{
Expand Down Expand Up @@ -1754,7 +1715,7 @@ public void testMetadataDeleteSimple()
assertUpdate("INSERT INTO test_metadata_delete_simple VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
assertQuery("SELECT sum(col2) FROM test_metadata_delete_simple", "SELECT 1004");
assertQuery("SELECT count(*) FROM \"test_metadata_delete_simple$partitions\"", "SELECT 3");
assertUpdate("DELETE FROM test_metadata_delete_simple WHERE col1 = 1");
assertUpdate("DELETE FROM test_metadata_delete_simple WHERE col1 = 1", 3);
assertQuery("SELECT sum(col2) FROM test_metadata_delete_simple", "SELECT 701");
assertQuery("SELECT count(*) FROM \"test_metadata_delete_simple$partitions\"", "SELECT 2");
dropTable("test_metadata_delete_simple");
Expand All @@ -1781,11 +1742,11 @@ public void testMetadataDelete()

assertQuery("SELECT COUNT(*) FROM \"test_metadata_delete$partitions\"", "SELECT 14");

assertUpdate("DELETE FROM test_metadata_delete WHERE linestatus = 'F' AND linenumber = 3");
assertUpdate("DELETE FROM test_metadata_delete WHERE linestatus = 'F' AND linenumber = 3", 5378);
assertQuery("SELECT * FROM test_metadata_delete", "SELECT orderkey, linenumber, linestatus FROM lineitem WHERE linestatus <> 'F' or linenumber <> 3");
assertQuery("SELECT count(*) FROM \"test_metadata_delete$partitions\"", "SELECT 13");

assertUpdate("DELETE FROM test_metadata_delete WHERE linestatus='O'");
assertUpdate("DELETE FROM test_metadata_delete WHERE linestatus='O'", 30049);
assertQuery("SELECT count(*) FROM \"test_metadata_delete$partitions\"", "SELECT 6");
assertQuery("SELECT * FROM test_metadata_delete", "SELECT orderkey, linenumber, linestatus FROM lineitem WHERE linestatus <> 'O' AND linenumber <> 3");

Expand Down Expand Up @@ -2279,7 +2240,7 @@ public void testOptimizedMetadataQueries()
assertQuery(session, "SELECT DISTINCT b FROM test_metadata_optimization WHERE c > 8", "VALUES (9)");

// Assert behavior after metadata delete
assertUpdate("DELETE FROM test_metadata_optimization WHERE b = 6");
assertUpdate("DELETE FROM test_metadata_optimization WHERE b = 6", 1);
assertQuery(session, "SELECT DISTINCT b FROM test_metadata_optimization", "VALUES (9)");

// TODO: assert behavior after deleting the last row of a partition, once row-level deletes are supported.
Expand Down Expand Up @@ -3294,7 +3255,7 @@ public void testExpireSnapshotsPartitionedTable()
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
assertUpdate("CREATE TABLE " + tableName + " (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])");
assertUpdate("INSERT INTO " + tableName + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
assertUpdate("DELETE FROM " + tableName + " WHERE col1 = 1");
assertUpdate("DELETE FROM " + tableName + " WHERE col1 = 1", 3);
assertUpdate("INSERT INTO " + tableName + " VALUES(4, 400)", 1);
assertQuery("SELECT sum(col2) FROM " + tableName, "SELECT 1101");
List<String> initialDataFiles = getAllDataFilesFromTableDirectory(tableName);
Expand Down Expand Up @@ -3462,6 +3423,22 @@ public void testDeleteOrphanFilesParameterValidation()
"\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.delete_orphan_files.min-retention configuration property or iceberg.delete_orphan_files_min_retention session property");
}

@Test
public void testIfDeletesReturnsNumberOfRemovedRows()
{
String tableName = "test_delete_returns_number_of_rows_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer) WITH (partitioning = ARRAY['key'])");
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 2)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 3)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 1)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'one'", 3);
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'one'"); // TODO change this when iceberg will guarantee to always return this (https://github.com/apache/iceberg/issues/4647)
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'three'");
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down