diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java index 167a45d704bb0..da9a7f8f41ab8 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.connector.ConnectorContext; import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.inject.Injector; +import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; @@ -39,10 +40,12 @@ public class KafkaConnectorFactory implements ConnectorFactory { + private final Module extension; private final Optional>> tableDescriptionSupplier; - KafkaConnectorFactory(Optional>> tableDescriptionSupplier) + KafkaConnectorFactory(Module extension, Optional>> tableDescriptionSupplier) { + this.extension = requireNonNull(extension, "extension is null"); this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null"); } @@ -68,6 +71,7 @@ public Connector create(String catalogName, Map config, Connecto Bootstrap app = new Bootstrap( new JsonModule(), new KafkaConnectorModule(), + extension, binder -> { binder.bind(KafkaConnectorId.class).toInstance(new KafkaConnectorId(catalogName)); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java index 284424a9a7fa4..5d983f61855b5 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java @@ -18,6 +18,8 @@ import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.inject.Module; +import com.google.inject.util.Modules; import java.util.Map; import java.util.Optional; @@ -31,8 +33,19 @@ public class KafkaPlugin implements Plugin { + private final Module extension; private Optional>> tableDescriptionSupplier = Optional.empty(); + public KafkaPlugin() + { + this(Modules.EMPTY_MODULE); + } + + public KafkaPlugin(Module extension) + { + this.extension = requireNonNull(extension, "extension is null"); + } + @VisibleForTesting public synchronized void setTableDescriptionSupplier(Supplier> tableDescriptionSupplier) { @@ -42,6 +55,6 @@ public synchronized void setTableDescriptionSupplier(Supplier getConnectorFactories() { - return ImmutableList.of(new KafkaConnectorFactory(tableDescriptionSupplier)); + return ImmutableList.of(new KafkaConnectorFactory(extension, tableDescriptionSupplier)); } }