Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/mongodb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ MongoDB. In addition to the :ref:`globally available
statements, the connector supports the following features:

* :doc:`/sql/insert`
* :doc:`/sql/delete`
* :doc:`/sql/create-table`
* :doc:`/sql/create-table-as`
* :doc:`/sql/drop-table`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -66,6 +67,7 @@
import static io.trino.plugin.mongodb.TypeUtils.isPushdownSupportedType;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand All @@ -79,6 +81,7 @@ public class MongoMetadata
private static final Logger log = Logger.get(MongoMetadata.class);

private static final int MAX_QUALIFIED_IDENTIFIER_BYTE_LENGTH = 120;
private static final String DELETE_ROW_ID = "_trino_artificial_column_handle_for_delete_row_id_";

private final MongoSession mongoSession;

Expand Down Expand Up @@ -286,6 +289,32 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
return Optional.empty();
}

@Override
public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// The column is used for row-level delete, which is not supported, but it's required during analysis anyway.
return new MongoColumnHandle(DELETE_ROW_ID, BIGINT, true, Optional.empty());
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "Unsupported delete");
}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
return Optional.of(handle);
}

@Override
public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle)
{
MongoTableHandle table = (MongoTableHandle) handle;
return OptionalLong.of(mongoSession.deleteDocuments(table.getSchemaTableName(), table.getConstraint()));
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ public List<MongoIndex> getIndexes(String schemaName, String tableName)
return MongoIndex.parse(collection.listIndexes());
}

public long deleteDocuments(SchemaTableName schemaTableName, TupleDomain<ColumnHandle> constraint)
{
Document filter = buildQuery(constraint);
log.debug("Delete documents: collection: %s, filter: %s", schemaTableName, filter);

DeleteResult result = getCollection(schemaTableName).deleteMany(filter);
return result.getDeletedCount();
}

public MongoCursor<Document> execute(MongoTableHandle tableHandle, List<MongoColumnHandle> columns)
{
Document output = new Document();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_NOT_NULL_CONSTRAINT:
return false;

case SUPPORTS_DELETE:
return true;

default:
return super.hasBehavior(connectorBehavior);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_NOT_NULL_CONSTRAINT:
return false;

case SUPPORTS_DELETE:
return true;

default:
return super.hasBehavior(connectorBehavior);
}
Expand Down Expand Up @@ -239,6 +242,41 @@ public void testInsertWithEveryType()
assertFalse(getQueryRunner().tableExists(getSession(), "test_insert_types_table"));
}

@Override
public void testDeleteWithComplexPredicate()
{
assertThatThrownBy(super::testDeleteWithComplexPredicate)
.hasStackTraceContaining("TrinoException: Unsupported delete");
}

@Override
public void testDeleteWithLike()
{
assertThatThrownBy(super::testDeleteWithLike)
.hasStackTraceContaining("TrinoException: Unsupported delete");
}

@Override
public void testDeleteWithSemiJoin()
{
assertThatThrownBy(super::testDeleteWithSemiJoin)
.hasStackTraceContaining("TrinoException: Unsupported delete");
}

@Override
public void testDeleteWithSubquery()
{
assertThatThrownBy(super::testDeleteWithSubquery)
.hasStackTraceContaining("TrinoException: Unsupported delete");
}

@Override
public void testExplainAnalyzeWithDeleteWithSubquery()
{
assertThatThrownBy(super::testExplainAnalyzeWithDeleteWithSubquery)
.hasStackTraceContaining("TrinoException: Unsupported delete");
}

@Test(dataProvider = "predicatePushdownProvider")
public void testPredicatePushdown(String value)
{
Expand Down