diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index 92d0e73746ee..9318149d4b2b 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.UUID; import java.util.function.BiFunction; @@ -1003,6 +1004,25 @@ public Map getTableProperties(ConnectorSession session, JdbcTabl return emptyMap(); } + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle); + checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle); + checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + QueryBuilder queryBuilder = new QueryBuilder(this); + PreparedQuery preparedQuery = queryBuilder.prepareDelete(session, connection, handle.getRequiredNamedRelation(), handle.getConstraint()); + try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(session, connection, preparedQuery)) { + return OptionalLong.of(preparedStatement.executeUpdate()); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + protected String quoted(@Nullable String catalog, @Nullable String schema, String table) { StringBuilder sb = new StringBuilder(); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index f463ed39e711..b4aaa8f058bc 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -451,6 +452,14 @@ public void onDataChanged(JdbcTableHandle handle) invalidateCache(statisticsCache, key -> key.tableHandle.equals(handle)); } + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + OptionalLong deletedRowsCount = delegate.delete(session, handle); + onDataChanged(handle.getRequiredNamedRelation().getSchemaTableName()); + return deletedRowsCount; + } + private JdbcIdentityCacheKey getIdentityKey(ConnectorSession session) { return identityMapping.getRemoteUserCacheKey(JdbcIdentity.from(session)); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java index f887dca5ef56..d72a04ecc420 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java @@ -30,6 +30,7 @@ import io.trino.spi.connector.ConnectorOutputTableHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableLayoutHandle; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTableProperties; import io.trino.spi.connector.ConnectorTableSchema; @@ -57,6 +58,7 @@ import io.trino.spi.statistics.ComputedStatistics; import io.trino.spi.statistics.TableStatistics; +import java.sql.Types; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -75,7 +77,9 @@ import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled; import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled; import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; +import static io.trino.spi.type.BigintType.BIGINT; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; @@ -660,6 +664,40 @@ public Optional finishInsert(ConnectorSession session, return Optional.empty(); } + @Override + public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + // The column is used for row-level delete, which is not supported, but it's required during analysis anyway. + return new JdbcColumnHandle( + "$update_row_id", + new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()), + BIGINT); + } + + @Override + public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle) + { + throw new TrinoException(NOT_SUPPORTED, "Unsupported delete"); + } + + @Override + public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle) + { + return true; + } + + @Override + public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) + { + return Optional.of(handle); + } + + @Override + public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle) + { + return jdbcClient.delete(session, (JdbcTableHandle) handle); + } + @Override public void setColumnComment(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, Optional comment) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java index de277b28f02e..7d5ffb146ab6 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.function.Supplier; @@ -332,4 +333,10 @@ public Optional getTableScanRedirection(Conn { return delegate().getTableScanRedirection(session, tableHandle); } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + return delegate().delete(session, handle); + } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java index 97a9e04139b7..caf8f11a5cba 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -171,4 +172,6 @@ default Optional getTableScanRedirection(Con { return Optional.empty(); } + + OptionalLong delete(ConnectorSession session, JdbcTableHandle handle); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java index 1d140997e64a..e3a5b4c2afd9 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java @@ -158,6 +158,23 @@ protected static String formatJoinType(JoinType joinType) throw new IllegalStateException("Unsupported join type: " + joinType); } + public PreparedQuery prepareDelete( + ConnectorSession session, + Connection connection, + JdbcNamedRelationHandle baseRelation, + TupleDomain tupleDomain) + { + String sql = "DELETE FROM " + getRelation(baseRelation.getRemoteTableName()); + + ImmutableList.Builder accumulator = ImmutableList.builder(); + + List clauses = toConjuncts(session, connection, tupleDomain, accumulator::add); + if (!clauses.isEmpty()) { + sql += " WHERE " + Joiner.on(" AND ").join(clauses); + } + return new PreparedQuery(sql, accumulator.build()); + } + public PreparedStatement prepareStatement( ConnectorSession session, Connection connection, diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java index 4e61f56e22f5..5d4a1cec59ad 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java @@ -52,6 +52,7 @@ public final class JdbcClientStats private final JdbcApiStats toWriteMapping = new JdbcApiStats(); private final JdbcApiStats implementAggregation = new JdbcApiStats(); private final JdbcApiStats getTableScanRedirection = new JdbcApiStats(); + private final JdbcApiStats delete = new JdbcApiStats(); @Managed @Nested @@ -290,4 +291,11 @@ public JdbcApiStats getGetTableScanRedirection() { return getTableScanRedirection; } + + @Managed + @Nested + public JdbcApiStats getDelete() + { + return delete; + } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java index cbabf66ebb65..e8b53242ef94 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -351,4 +352,10 @@ public Optional getTableScanRedirection(Conn { return stats.getGetTableScanRedirection().wrap(() -> delegate().getTableScanRedirection(session, tableHandle)); } + + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + return stats.getDelete().wrap(() -> delegate().delete(session, handle)); + } } diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java index 7bad3e4bd09f..709073084586 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java @@ -14,6 +14,20 @@ package io.trino.plugin.jdbc; import io.trino.testing.BaseConnectorSmokeTest; +import io.trino.testing.TestingConnectorBehavior; public abstract class BaseJdbcConnectorSmokeTest - extends BaseConnectorSmokeTest {} + extends BaseConnectorSmokeTest +{ + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + switch (connectorBehavior) { + case SUPPORTS_DELETE: + return true; + + default: + return super.hasBehavior(connectorBehavior); + } + } +} diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java index d44d7eb35877..b8bbb45b380d 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java @@ -32,6 +32,7 @@ import io.trino.sql.query.QueryAssertions.QueryAssert; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; +import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; import io.trino.testing.sql.TestView; @@ -65,6 +66,7 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_VARIANCE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CANCELLATION; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_DISTINCT_FROM; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN; @@ -76,11 +78,13 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN_WITH_VARCHAR; import static io.trino.testing.assertions.Assert.assertEventually; +import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class BaseJdbcConnectorTest extends BaseConnectorTest @@ -95,6 +99,18 @@ public void afterClass() executor.shutdownNow(); } + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + switch (connectorBehavior) { + case SUPPORTS_DELETE: + return true; + + default: + return super.hasBehavior(connectorBehavior); + } + } + @Test public void testInsertInPresenceOfNotSupportedColumn() { @@ -1146,4 +1162,120 @@ protected TestView createSleepingView(Duration minimalSleepDuration) { throw new UnsupportedOperationException(); } + + @Test + public void testDeleteWithBigintEqualityPredicate() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM region WHERE regionkey = 1", "This connector does not support deletes"); + return; + } + String tableName = "test_delete_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM region", 5); + assertUpdate("DELETE FROM " + tableName + " WHERE regionkey = 1", 1); + assertQuery( + "SELECT regionkey, name FROM " + tableName, + "VALUES " + + "(0, 'AFRICA')," + + "(2, 'ASIA')," + + "(3, 'EUROPE')," + + "(4, 'MIDDLE EAST')"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testDeleteWithVarcharEqualityPredicate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete", "(col varchar(1))", ImmutableList.of("'a'", "'A'", "null"))) { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM " + table.getName(), "This connector does not support deletes"); + return; + } + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY)) { + assertQueryFails("DELETE FROM " + table.getName() + " WHERE col = 'A'", "Unsupported delete"); + return; + } + + assertUpdate("DELETE FROM " + table.getName() + " WHERE col = 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES 'a', null"); + } + } + + @Test + public void testDeleteWithVarcharInequalityPredicate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete", "(col varchar(1))", ImmutableList.of("'a'", "'A'", "null"))) { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM " + table.getName(), "This connector does not support deletes"); + return; + } + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) { + assertQueryFails("DELETE FROM " + table.getName() + " WHERE col != 'A'", "Unsupported delete"); + return; + } + + assertUpdate("DELETE FROM " + table.getName() + " WHERE col != 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES 'A', null"); + } + } + + @Test + public void testDeleteWithVarcharGreaterAndLowerPredicate() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_delete", "(col varchar(1))", ImmutableList.of("'0'", "'a'", "'A'", "'b'", "null"))) { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM " + table.getName(), "This connector does not support deletes"); + return; + } + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) { + assertQueryFails("DELETE FROM " + table.getName() + " WHERE col < 'A'", "Unsupported delete"); + assertQueryFails("DELETE FROM " + table.getName() + " WHERE col > 'A'", "Unsupported delete"); + return; + } + + assertUpdate("DELETE FROM " + table.getName() + " WHERE col < 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES 'a', 'A', 'b', null"); + assertUpdate("DELETE FROM " + table.getName() + " WHERE col > 'A'", 2); + assertQuery("SELECT * FROM " + table.getName(), "VALUES 'A', null"); + } + } + + @Override + public void testDeleteWithComplexPredicate() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + assertThatThrownBy(() -> super.testDeleteWithComplexPredicate()) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithSubquery() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + assertThatThrownBy(() -> super.testDeleteWithSubquery()) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithSemiJoin() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + assertThatThrownBy(() -> super.testDeleteWithSemiJoin()) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithVarcharPredicate() + { + throw new SkipException("This is implemented by testDeleteWithVarcharEqualityPredicate"); + } } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java index 20184c30f9bc..f41444072bf2 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -847,6 +847,48 @@ public void testDelete() assertEquals(execute("SELECT * FROM " + keyspaceAndTable + whereMultiplePartitionKey).getRowCount(), 0); } + @Override + public void testDeleteWithComplexPredicate() + { + assertThatThrownBy(super::testDeleteWithComplexPredicate) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + + @Override + public void testDeleteWithSemiJoin() + { + assertThatThrownBy(super::testDeleteWithSemiJoin) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + + @Override + public void testDeleteWithSubquery() + { + assertThatThrownBy(super::testDeleteWithSubquery) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + + @Override + public void testDeleteWithVarcharPredicate() + { + assertThatThrownBy(super::testDeleteWithVarcharPredicate) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + + @Override + public void testDeleteAllTable() + { + assertThatThrownBy(super::testDeleteAllTable) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + + @Override + public void testRowLevelDelete() + { + assertThatThrownBy(super::testRowLevelDelete) + .hasStackTraceContaining("Deleting without partition key is not supported"); + } + private void assertSelect(String tableName, boolean createdByTrino) { Type uuidType = createdByTrino ? createUnboundedVarcharType() : createVarcharType(36); diff --git a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java index eeaf697c9990..f61e8c260df6 100644 --- a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java +++ b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.UUID; import java.util.function.BiFunction; @@ -312,6 +313,13 @@ public boolean isLimitGuaranteed(ConnectorSession session) return true; } + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // ClickHouse does not support DELETE syntax, but is using custom: ALTER TABLE [db.]table [ON CLUSTER cluster] DELETE WHERE filter_expr + throw new TrinoException(NOT_SUPPORTED, "This connector does not support deletes"); + } + @Override public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) { diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java index 7dbc6e16d680..6955303b2c28 100644 --- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java +++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java @@ -72,6 +72,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_ARRAY: return false; + case SUPPORTS_DELETE: + return false; + default: return super.hasBehavior(connectorBehavior); } diff --git a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java index 1d98ef131e27..fe2385f1865f 100644 --- a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java +++ b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -222,6 +223,13 @@ public boolean isLimitGuaranteed(ConnectorSession session) return true; } + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + // DELETE statement is not yet support in Druid (Avatica JDBC, see https://issues.apache.org/jira/browse/CALCITE-706) + throw new TrinoException(NOT_SUPPORTED, "This connector does not support deletes"); + } + @Override public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) { diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java index 5d1e351b988f..29982c313e95 100644 --- a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java +++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java @@ -36,6 +36,7 @@ import static io.trino.sql.planner.assertions.PlanMatchPattern.node; import static io.trino.testing.MaterializedResult.resultBuilder; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class BaseDruidConnectorTest extends BaseJdbcConnectorTest @@ -143,6 +144,27 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) } } + @Override + public void testDeleteWithVarcharEqualityPredicate() + { + assertThatThrownBy(() -> super.testDeleteWithVarcharEqualityPredicate()) + .hasMessage("This connector does not support creating tables"); + } + + @Override + public void testDeleteWithVarcharInequalityPredicate() + { + assertThatThrownBy(() -> super.testDeleteWithVarcharInequalityPredicate()) + .hasMessage("This connector does not support creating tables"); + } + + @Override + public void testDeleteWithVarcharGreaterAndLowerPredicate() + { + assertThatThrownBy(() -> super.testDeleteWithVarcharGreaterAndLowerPredicate()) + .hasMessage("This connector does not support creating tables"); + } + @Override protected SqlExecutor onRemoteDatabase() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java index 64c5bc0af557..bffbba8666c1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java @@ -222,6 +222,41 @@ public void testDelete() .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); } + @Override + public void testDeleteWithComplexPredicate() + { + assertThatThrownBy(super::testDeleteWithComplexPredicate) + .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); + } + + @Override + public void testDeleteWithSemiJoin() + { + assertThatThrownBy(super::testDeleteWithSemiJoin) + .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); + } + + @Override + public void testDeleteWithSubquery() + { + assertThatThrownBy(super::testDeleteWithSubquery) + .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); + } + + @Override + public void testDeleteWithVarcharPredicate() + { + assertThatThrownBy(super::testDeleteWithVarcharPredicate) + .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); + } + + @Override + public void testRowLevelDelete() + { + assertThatThrownBy(super::testRowLevelDelete) + .hasStackTraceContaining("Deletes must match whole partitions for non-transactional tables"); + } + @Test public void testRequiredPartitionFilter() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/AbstractTestIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/AbstractTestIcebergConnectorTest.java index 61e2281e3648..614e85399f71 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/AbstractTestIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/AbstractTestIcebergConnectorTest.java @@ -136,11 +136,51 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) @Override public void testDelete() { - // Deletes are covered with testMetadata*Delete test methods + // Deletes are covered with testMetadataDelete test methods assertThatThrownBy(super::testDelete) .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); } + @Override + public void testDeleteWithComplexPredicate() + { + // Deletes are covered with testMetadataDelete test methods + assertThatThrownBy(super::testDeleteWithComplexPredicate) + .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); + } + + @Override + public void testDeleteWithSemiJoin() + { + // Deletes are covered with testMetadataDelete test methods + assertThatThrownBy(super::testDeleteWithSemiJoin) + .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); + } + + @Override + public void testDeleteWithSubquery() + { + // Deletes are covered with testMetadataDelete test methods + assertThatThrownBy(super::testDeleteWithSubquery) + .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); + } + + @Override + public void testDeleteWithVarcharPredicate() + { + // Deletes are covered with testMetadataDelete test methods + assertThatThrownBy(super::testDeleteWithVarcharPredicate) + .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); + } + + @Override + public void testRowLevelDelete() + { + // Deletes are covered with testMetadataDelete test methods + assertThatThrownBy(super::testRowLevelDelete) + .hasStackTraceContaining("This connector only supports delete where one or more partitions are deleted entirely"); + } + @Test @Override public void testRenameTable() diff --git a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java index 6084045035b4..025d366ca34a 100644 --- a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java +++ b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java @@ -36,6 +36,8 @@ import io.trino.plugin.jdbc.ObjectReadFunction; import io.trino.plugin.jdbc.ObjectWriteFunction; import io.trino.plugin.jdbc.PredicatePushdownController; +import io.trino.plugin.jdbc.PreparedQuery; +import io.trino.plugin.jdbc.QueryBuilder; import io.trino.plugin.jdbc.ReadFunction; import io.trino.plugin.jdbc.SliceReadFunction; import io.trino.plugin.jdbc.SliceWriteFunction; @@ -110,6 +112,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.UUID; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -748,6 +751,28 @@ public boolean isLimitGuaranteed(ConnectorSession session) return true; } + @Override + public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) + { + checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle); + checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle); + checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + QueryBuilder queryBuilder = new QueryBuilder(this); + PreparedQuery preparedQuery = queryBuilder.prepareDelete(session, connection, handle.getRequiredNamedRelation(), handle.getConstraint()); + try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(session, connection, preparedQuery)) { + int affectedRowsCount = preparedStatement.executeUpdate(); + // connection.getAutoCommit() is not enough for PostgreSQL to make DELETE effective and explicit commit is required + connection.commit(); + return OptionalLong.of(affectedRowsCount); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + @Override protected boolean isSupportedJoinCondition(JdbcJoinCondition joinCondition) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestDistributedQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestDistributedQueries.java index 31e77f53cecb..7d74f7dc4e6a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestDistributedQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestDistributedQueries.java @@ -616,19 +616,8 @@ public void testDelete() } String tableName = "test_delete_" + randomTableSuffix(); - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); - - // delete half the table, then delete the rest - assertUpdate("DELETE FROM " + tableName + " WHERE orderkey % 2 = 0", "SELECT count(*) FROM orders WHERE orderkey % 2 = 0"); - assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders WHERE orderkey % 2 <> 0"); - - assertUpdate("DELETE FROM " + tableName, "SELECT count(*) FROM orders WHERE orderkey % 2 <> 0"); - assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders LIMIT 0"); - - assertUpdate("DROP TABLE " + tableName); // delete successive parts of the table - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); assertUpdate("DELETE FROM " + tableName + " WHERE custkey <= 100", "SELECT count(*) FROM orders WHERE custkey <= 100"); @@ -642,30 +631,62 @@ public void testDelete() assertUpdate("DROP TABLE " + tableName); - // delete using a constant property - + // delete without matching any rows assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); + assertUpdate("DELETE FROM " + tableName + " WHERE orderkey < 0", 0); + assertUpdate("DROP TABLE " + tableName); - assertUpdate("DELETE FROM " + tableName + " WHERE orderstatus = 'O'", "SELECT count(*) FROM orders WHERE orderstatus = 'O'"); - assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders WHERE orderstatus <> 'O'"); + // delete with a predicate that optimizes to false + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); + assertUpdate("DELETE FROM " + tableName + " WHERE orderkey > 5 AND orderkey < 4", 0); + assertUpdate("DROP TABLE " + tableName); + // test EXPLAIN ANALYZE with CTAS + assertExplainAnalyze("EXPLAIN ANALYZE CREATE TABLE " + tableName + " AS SELECT CAST(orderstatus AS VARCHAR(15)) orderstatus FROM orders"); + assertQuery("SELECT * from " + tableName, "SELECT orderstatus FROM orders"); + // check that INSERT works also + assertExplainAnalyze("EXPLAIN ANALYZE INSERT INTO " + tableName + " SELECT clerk FROM orders"); + assertQuery("SELECT * from " + tableName, "SELECT orderstatus FROM orders UNION ALL SELECT clerk FROM orders"); + // check DELETE works with EXPLAIN ANALYZE + assertExplainAnalyze("EXPLAIN ANALYZE DELETE FROM " + tableName + " WHERE TRUE"); + assertQuery("SELECT COUNT(*) from " + tableName, "SELECT 0"); assertUpdate("DROP TABLE " + tableName); + } - // delete without matching any rows + @Test + public void testDeleteWithComplexPredicate() + { + if (!supportsDelete()) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + String tableName = "test_delete_" + randomTableSuffix(); assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); + + // delete half the table, then delete the rest + assertUpdate("DELETE FROM " + tableName + " WHERE orderkey % 2 = 0", "SELECT count(*) FROM orders WHERE orderkey % 2 = 0"); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders WHERE orderkey % 2 <> 0"); + + assertUpdate("DELETE FROM " + tableName, "SELECT count(*) FROM orders WHERE orderkey % 2 <> 0"); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders LIMIT 0"); + assertUpdate("DELETE FROM " + tableName + " WHERE rand() < 0", 0); - assertUpdate("DELETE FROM " + tableName + " WHERE orderkey < 0", 0); + assertUpdate("DROP TABLE " + tableName); + } - // delete with a predicate that optimizes to false + @Test + public void testDeleteWithSubquery() + { + if (!supportsDelete()) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); - assertUpdate("DELETE FROM " + tableName + " WHERE orderkey > 5 AND orderkey < 4", 0); - assertUpdate("DROP TABLE " + tableName); + String tableName = "test_delete_" + randomTableSuffix(); // delete using a subquery - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25); assertUpdate("DELETE FROM " + tableName + " WHERE regionkey IN (SELECT regionkey FROM region WHERE name LIKE 'A%')", 15); @@ -675,8 +696,26 @@ public void testDelete() assertUpdate("DROP TABLE " + tableName); - // delete with multiple SemiJoin + // delete using a scalar and EXISTS subquery + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); + assertUpdate("DELETE FROM " + tableName + " WHERE orderkey = (SELECT orderkey FROM orders ORDER BY orderkey LIMIT 1)", 1); + assertUpdate("DELETE FROM " + tableName + " WHERE orderkey = (SELECT orderkey FROM orders WHERE false)", 0); + assertUpdate("DELETE FROM " + tableName + " WHERE EXISTS(SELECT 1 WHERE false)", 0); + assertUpdate("DELETE FROM " + tableName + " WHERE EXISTS(SELECT 1)", "SELECT count(*) - 1 FROM orders"); + assertUpdate("DROP TABLE " + tableName); + } + @Test + public void testDeleteWithSemiJoin() + { + if (!supportsDelete()) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + + String tableName = "test_delete_" + randomTableSuffix(); + + // delete with multiple SemiJoin assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25); assertUpdate( @@ -693,7 +732,6 @@ public void testDelete() assertUpdate("DROP TABLE " + tableName); // delete with SemiJoin null handling - assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); assertUpdate( @@ -707,24 +745,22 @@ public void testDelete() "WHERE (orderkey IN (SELECT CASE WHEN orderkey % 3 = 0 THEN NULL ELSE orderkey END FROM lineitem)) IS NOT NULL\n"); assertUpdate("DROP TABLE " + tableName); + } - // delete using a scalar and EXISTS subquery + @Test + public void testDeleteWithVarcharPredicate() + { + if (!supportsDelete()) { + assertQueryFails("DELETE FROM nation", "This connector does not support deletes"); + return; + } + + String tableName = "test_delete_" + randomTableSuffix(); assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM orders", "SELECT count(*) FROM orders"); - assertUpdate("DELETE FROM " + tableName + " WHERE orderkey = (SELECT orderkey FROM orders ORDER BY orderkey LIMIT 1)", 1); - assertUpdate("DELETE FROM " + tableName + " WHERE orderkey = (SELECT orderkey FROM orders WHERE false)", 0); - assertUpdate("DELETE FROM " + tableName + " WHERE EXISTS(SELECT 1 WHERE false)", 0); - assertUpdate("DELETE FROM " + tableName + " WHERE EXISTS(SELECT 1)", "SELECT count(*) - 1 FROM orders"); - assertUpdate("DROP TABLE " + tableName); - // test EXPLAIN ANALYZE with CTAS - assertExplainAnalyze("EXPLAIN ANALYZE CREATE TABLE " + tableName + " AS SELECT CAST(orderstatus AS VARCHAR(15)) orderstatus FROM orders"); - assertQuery("SELECT * from " + tableName, "SELECT orderstatus FROM orders"); - // check that INSERT works also - assertExplainAnalyze("EXPLAIN ANALYZE INSERT INTO " + tableName + " SELECT clerk FROM orders"); - assertQuery("SELECT * from " + tableName, "SELECT orderstatus FROM orders UNION ALL SELECT clerk FROM orders"); - // check DELETE works with EXPLAIN ANALYZE - assertExplainAnalyze("EXPLAIN ANALYZE DELETE FROM " + tableName + " WHERE TRUE"); - assertQuery("SELECT COUNT(*) from " + tableName, "SELECT 0"); + assertUpdate("DELETE FROM " + tableName + " WHERE orderstatus = 'O'", "SELECT count(*) FROM orders WHERE orderstatus = 'O'"); + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM orders WHERE orderstatus <> 'O'"); + assertUpdate("DROP TABLE " + tableName); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java index 09ed9d4f667a..95811f1455fa 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorSmokeTest.java @@ -183,7 +183,9 @@ public void testDelete() assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM region", 5); assertUpdate("DELETE FROM " + tableName + " WHERE regionkey = 2", 1); - assertThat(query("SELECT regionkey FROM " + tableName)) + assertThat(query("SELECT * FROM " + tableName + " WHERE regionkey = 2")) + .returnsEmptyResult(); + assertThat(query("SELECT cast(regionkey AS integer) FROM " + tableName)) .skippingTypesCheck() .matches("VALUES 0, 1, 3, 4"); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index 768d0734ef44..8435d420ddc2 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -37,6 +37,7 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_INSERT; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_RENAME_TABLE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_LEVEL_DELETE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN; import static io.trino.testing.assertions.Assert.assertEquals; import static io.trino.testing.sql.TestTable.randomTableSuffix; @@ -725,4 +726,38 @@ public void testRenameTableAcrossSchema() assertFalse(getQueryRunner().tableExists(getSession(), tableName)); assertFalse(getQueryRunner().tableExists(getSession(), renamedTable)); } + + @Test + public void testDeleteAllTable() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM region", "This connector does not support deletes"); + return; + } + + String tableName = "test_delete_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25); + // not using assertUpdate as some connectors provide update count and some not + getQueryRunner().execute("DELETE FROM " + tableName); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testRowLevelDelete() + { + if (!hasBehavior(SUPPORTS_DELETE)) { + assertQueryFails("DELETE FROM region", "This connector does not support deletes"); + return; + } + if (!hasBehavior(SUPPORTS_ROW_LEVEL_DELETE)) { + assertQueryFails("DELETE FROM nation WHERE nationkey = 2", "This connector does not support deletes"); + return; + } + String tableName = "test_delete_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM nation", 25); + assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 2", 1); + assertQuery("SELECT count(*) FROM " + tableName, "VALUES 24"); + assertUpdate("DROP TABLE " + tableName); + } }