diff --git a/docs/src/main/sphinx/connector/mongodb.rst b/docs/src/main/sphinx/connector/mongodb.rst index bc0603338e82..d2c178b7256b 100644 --- a/docs/src/main/sphinx/connector/mongodb.rst +++ b/docs/src/main/sphinx/connector/mongodb.rst @@ -455,6 +455,7 @@ MongoDB. In addition to the :ref:`globally available statements, the connector supports the following features: * :doc:`/sql/insert` +* :doc:`/sql/delete` * :doc:`/sql/create-table` * :doc:`/sql/create-table-as` * :doc:`/sql/drop-table` diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java index 8b4f7838b732..222d3e5df840 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoMetadata.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -66,6 +67,7 @@ import static io.trino.plugin.mongodb.TypeUtils.isPushdownSupportedType; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; +import static io.trino.spi.type.BigintType.BIGINT; import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; @@ -79,6 +81,7 @@ public class MongoMetadata private static final Logger log = Logger.get(MongoMetadata.class); private static final int MAX_QUALIFIED_IDENTIFIER_BYTE_LENGTH = 120; + private static final String DELETE_ROW_ID = "_trino_artificial_column_handle_for_delete_row_id_"; private final MongoSession mongoSession; @@ -286,6 +289,32 @@ public Optional finishInsert(ConnectorSession session, return Optional.empty(); } + @Override + public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) + { + // The column is used for row-level delete, which is not supported, but it's required during analysis anyway. + return new MongoColumnHandle(DELETE_ROW_ID, BIGINT, true, Optional.empty()); + } + + @Override + public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) + { + throw new TrinoException(NOT_SUPPORTED, "Unsupported delete"); + } + + @Override + public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) + { + return Optional.of(handle); + } + + @Override + public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle handle) + { + MongoTableHandle table = (MongoTableHandle) handle; + return OptionalLong.of(mongoSession.deleteDocuments(table.getSchemaTableName(), table.getConstraint())); + } + @Override public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) { diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java index 8d7f95dc9daf..db937cf21074 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java @@ -404,6 +404,15 @@ public List getIndexes(String schemaName, String tableName) return MongoIndex.parse(collection.listIndexes()); } + public long deleteDocuments(SchemaTableName schemaTableName, TupleDomain constraint) + { + Document filter = buildQuery(constraint); + log.debug("Delete documents: collection: %s, filter: %s", schemaTableName, filter); + + DeleteResult result = getCollection(schemaTableName).deleteMany(filter); + return result.getDeletedCount(); + } + public MongoCursor execute(MongoTableHandle tableHandle, List columns) { Document output = new Document(); diff --git a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorSmokeTest.java b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorSmokeTest.java index f071dd38671d..66112bc65b18 100644 --- a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorSmokeTest.java +++ b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorSmokeTest.java @@ -30,6 +30,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_NOT_NULL_CONSTRAINT: return false; + case SUPPORTS_DELETE: + return true; + default: return super.hasBehavior(connectorBehavior); } diff --git a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorTest.java b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorTest.java index ff541beefb5a..41a0cc83f45d 100644 --- a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorTest.java +++ b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/BaseMongoConnectorTest.java @@ -75,6 +75,9 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_NOT_NULL_CONSTRAINT: return false; + case SUPPORTS_DELETE: + return true; + default: return super.hasBehavior(connectorBehavior); } @@ -239,6 +242,41 @@ public void testInsertWithEveryType() assertFalse(getQueryRunner().tableExists(getSession(), "test_insert_types_table")); } + @Override + public void testDeleteWithComplexPredicate() + { + assertThatThrownBy(super::testDeleteWithComplexPredicate) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithLike() + { + assertThatThrownBy(super::testDeleteWithLike) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithSemiJoin() + { + assertThatThrownBy(super::testDeleteWithSemiJoin) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testDeleteWithSubquery() + { + assertThatThrownBy(super::testDeleteWithSubquery) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + + @Override + public void testExplainAnalyzeWithDeleteWithSubquery() + { + assertThatThrownBy(super::testExplainAnalyzeWithDeleteWithSubquery) + .hasStackTraceContaining("TrinoException: Unsupported delete"); + } + @Test(dataProvider = "predicatePushdownProvider") public void testPredicatePushdown(String value) {