Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -1003,6 +1004,25 @@ public Map<String, Object> getTableProperties(ConnectorSession session, JdbcTabl
return emptyMap();
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle);
Comment thread
kokosing marked this conversation as resolved.
Outdated
Comment thread
kokosing marked this conversation as resolved.
Outdated
checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle);
checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle);
try (Connection connection = connectionFactory.openConnection(session)) {
verify(connection.getAutoCommit());
QueryBuilder queryBuilder = new QueryBuilder(this);
PreparedQuery preparedQuery = queryBuilder.prepareDelete(session, connection, handle.getRequiredNamedRelation(), handle.getConstraint());
try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(session, connection, preparedQuery)) {
return OptionalLong.of(preparedStatement.executeUpdate());
}
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

protected String quoted(@Nullable String catalog, @Nullable String schema, String table)
{
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -451,6 +452,14 @@ public void onDataChanged(JdbcTableHandle handle)
invalidateCache(statisticsCache, key -> key.tableHandle.equals(handle));
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
OptionalLong deletedRowsCount = delegate.delete(session, handle);
onDataChanged(handle.getRequiredNamedRelation().getSchemaTableName());
return deletedRowsCount;
}

private JdbcIdentityCacheKey getIdentityKey(ConnectorSession session)
{
return identityMapping.getRemoteUserCacheKey(JdbcIdentity.from(session));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTableLayoutHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
Expand Down Expand Up @@ -57,6 +58,7 @@
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatistics;

import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -75,7 +77,9 @@
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isAggregationPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isJoinPushdownEnabled;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.isTopNPushdownEnabled;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -660,6 +664,40 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
return Optional.empty();
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// The column is used for row-level delete, which is not supported, but it's required during analysis anyway.
return new JdbcColumnHandle(
"$update_row_id",
new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()),
BIGINT);
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new TrinoException(NOT_SUPPORTED, "Unsupported delete");
}

@Override
public boolean supportsMetadataDelete(ConnectorSession session, ConnectorTableHandle tableHandle, ConnectorTableLayoutHandle tableLayoutHandle)
{
return true;
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return Optional.of(handle);
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return jdbcClient.delete(session, (JdbcTableHandle) handle);
}

@Override
public void setColumnComment(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, Optional<String> comment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;

Expand Down Expand Up @@ -332,4 +333,10 @@ public Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Conn
{
return delegate().getTableScanRedirection(session, tableHandle);
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
return delegate().delete(session, handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -171,4 +172,6 @@ default Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Con
{
return Optional.empty();
}

OptionalLong delete(ConnectorSession session, JdbcTableHandle handle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,23 @@ protected static String formatJoinType(JoinType joinType)
throw new IllegalStateException("Unsupported join type: " + joinType);
}

public PreparedQuery prepareDelete(
ConnectorSession session,
Connection connection,
JdbcNamedRelationHandle baseRelation,
TupleDomain<ColumnHandle> tupleDomain)
{
String sql = "DELETE FROM " + getRelation(baseRelation.getRemoteTableName());

ImmutableList.Builder<QueryParameter> accumulator = ImmutableList.builder();

List<String> clauses = toConjuncts(session, connection, tupleDomain, accumulator::add);
if (!clauses.isEmpty()) {
sql += " WHERE " + Joiner.on(" AND ").join(clauses);
}
return new PreparedQuery(sql, accumulator.build());
Comment thread
kokosing marked this conversation as resolved.
Outdated
}

public PreparedStatement prepareStatement(
ConnectorSession session,
Connection connection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class JdbcClientStats
private final JdbcApiStats toWriteMapping = new JdbcApiStats();
private final JdbcApiStats implementAggregation = new JdbcApiStats();
private final JdbcApiStats getTableScanRedirection = new JdbcApiStats();
private final JdbcApiStats delete = new JdbcApiStats();

@Managed
@Nested
Expand Down Expand Up @@ -290,4 +291,11 @@ public JdbcApiStats getGetTableScanRedirection()
{
return getTableScanRedirection;
}

@Managed
@Nested
public JdbcApiStats getDelete()
{
return delete;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -351,4 +352,10 @@ public Optional<TableScanRedirectApplicationResult> getTableScanRedirection(Conn
{
return stats.getGetTableScanRedirection().wrap(() -> delegate().getTableScanRedirection(session, tableHandle));
}

@Override
public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle)
{
return stats.getDelete().wrap(() -> delegate().delete(session, handle));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
package io.trino.plugin.jdbc;

import io.trino.testing.BaseConnectorSmokeTest;
import io.trino.testing.TestingConnectorBehavior;

public abstract class BaseJdbcConnectorSmokeTest
extends BaseConnectorSmokeTest {}
extends BaseConnectorSmokeTest
{
@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
switch (connectorBehavior) {
Comment thread
kokosing marked this conversation as resolved.
Outdated
case SUPPORTS_DELETE:
return true;

default:
return super.hasBehavior(connectorBehavior);
}
}
}
Loading