diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientConfig.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientConfig.java index fa1e82640a52..eeca4862120b 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientConfig.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientConfig.java @@ -45,6 +45,7 @@ public class MongoClientConfig private String requiredReplicaSetName; private String implicitRowFieldPrefix = "_pos"; private boolean projectionPushDownEnabled = true; + private boolean allowLocalScheduling; @NotNull public String getSchemaCollection() @@ -251,4 +252,17 @@ public MongoClientConfig setProjectionPushdownEnabled(boolean projectionPushDown this.projectionPushDownEnabled = projectionPushDownEnabled; return this; } + + public boolean isAllowLocalScheduling() + { + return allowLocalScheduling; + } + + @Config("mongodb.allow-local-scheduling") + @ConfigDescription("Assign mongo splits to host if worker and mongo share the same cluster") + public MongoClientConfig setAllowLocalScheduling(boolean allowLocalScheduling) + { + this.allowLocalScheduling = allowLocalScheduling; + return this; + } } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientModule.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientModule.java index 7251965bcb82..f08d72802755 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientModule.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoClientModule.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.mongodb; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Provides; import com.google.inject.Scopes; @@ -60,6 +61,12 @@ public void setup(Binder binder) MongoClientConfig::getTlsEnabled, new MongoSslModule())); + install(conditionalModule( + MongoClientConfig.class, + MongoClientConfig::isAllowLocalScheduling, + internalBinder -> internalBinder.bind(MongoServerDetailsProvider.class).toInstance(ImmutableList::of), + internalBinder -> internalBinder.bind(MongoServerDetailsProvider.class).to(SessionBasedMongoServerDetailsProvider.class).in(Scopes.SINGLETON))); + newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON); } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoServerDetailsProvider.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoServerDetailsProvider.java new file mode 100644 index 000000000000..d072a47b699b --- /dev/null +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoServerDetailsProvider.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.mongodb; + +import io.trino.spi.HostAddress; + +import java.util.List; + +public interface MongoServerDetailsProvider +{ + List getServerAddress(); +} diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplitManager.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplitManager.java index 2695498cde0d..979422b75bc2 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplitManager.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSplitManager.java @@ -14,7 +14,6 @@ package io.trino.plugin.mongodb; import com.google.inject.Inject; -import io.trino.spi.HostAddress; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; @@ -24,17 +23,17 @@ import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.FixedSplitSource; -import java.util.List; +import static java.util.Objects.requireNonNull; public class MongoSplitManager implements ConnectorSplitManager { - private final List addresses; + private final MongoServerDetailsProvider serverDetailsProvider; @Inject - public MongoSplitManager(MongoSession session) + public MongoSplitManager(MongoServerDetailsProvider serverDetailsProvider) { - this.addresses = session.getAddresses(); + this.serverDetailsProvider = requireNonNull(serverDetailsProvider, "serverDetailsProvider is null"); } @Override @@ -45,7 +44,7 @@ public ConnectorSplitSource getSplits( DynamicFilter dynamicFilter, Constraint constraint) { - MongoSplit split = new MongoSplit(addresses); + MongoSplit split = new MongoSplit(serverDetailsProvider.getServerAddress()); return new FixedSplitSource(split); } diff --git a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/SessionBasedMongoServerDetailsProvider.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/SessionBasedMongoServerDetailsProvider.java new file mode 100644 index 000000000000..0a0020274b01 --- /dev/null +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/SessionBasedMongoServerDetailsProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.mongodb; + +import com.google.inject.Inject; +import io.trino.spi.HostAddress; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class SessionBasedMongoServerDetailsProvider + implements MongoServerDetailsProvider +{ + private final MongoSession mongoSession; + + @Inject + public SessionBasedMongoServerDetailsProvider(MongoSession mongoSession) + { + this.mongoSession = requireNonNull(mongoSession, "mongoSession is null"); + } + + @Override + public List getServerAddress() + { + return mongoSession.getAddresses(); + } +} diff --git a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoClientConfig.java b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoClientConfig.java index 13bd693dfaf6..abbbc4424a1c 100644 --- a/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoClientConfig.java +++ b/plugin/trino-mongodb/src/test/java/io/trino/plugin/mongodb/TestMongoClientConfig.java @@ -43,7 +43,8 @@ public void testDefaults() .setWriteConcern(WriteConcernType.ACKNOWLEDGED) .setRequiredReplicaSetName(null) .setImplicitRowFieldPrefix("_pos") - .setProjectionPushdownEnabled(true)); + .setProjectionPushdownEnabled(true) + .setAllowLocalScheduling(false)); } @Test @@ -67,6 +68,7 @@ public void testExplicitPropertyMappings() .put("mongodb.required-replica-set", "replica_set") .put("mongodb.implicit-row-field-prefix", "_prefix") .put("mongodb.projection-pushdown-enabled", "false") + .put("mongodb.allow-local-scheduling", "true") .buildOrThrow(); MongoClientConfig expected = new MongoClientConfig() @@ -85,7 +87,8 @@ public void testExplicitPropertyMappings() .setWriteConcern(WriteConcernType.UNACKNOWLEDGED) .setRequiredReplicaSetName("replica_set") .setImplicitRowFieldPrefix("_prefix") - .setProjectionPushdownEnabled(false); + .setProjectionPushdownEnabled(false) + .setAllowLocalScheduling(true); assertFullMapping(properties, expected); }