From 5a2d5a16ea5d06202106d1e2c3af638ab7e85c54 Mon Sep 17 00:00:00 2001 From: Yang Yang Date: Tue, 23 Feb 2021 23:47:25 -0800 Subject: [PATCH] Allow to extend Kafka connector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cherry-pick of https://github.com/trinodb/trino/commit/92e8b7b6a1b16b66ff00a87e7fe494f08db97f46 Co-Authored-By: Grzegorz KokosiƄski <7569403+kokosing@users.noreply.github.com> --- .../presto/kafka/KafkaConnectorFactory.java | 6 +++++- .../com/facebook/presto/kafka/KafkaPlugin.java | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java index 167a45d704bb0..da9a7f8f41ab8 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorFactory.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.connector.ConnectorContext; import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.inject.Injector; +import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; @@ -39,10 +40,12 @@ public class KafkaConnectorFactory implements ConnectorFactory { + private final Module extension; private final Optional>> tableDescriptionSupplier; - KafkaConnectorFactory(Optional>> tableDescriptionSupplier) + KafkaConnectorFactory(Module extension, Optional>> tableDescriptionSupplier) { + this.extension = requireNonNull(extension, "extension is null"); this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null"); } @@ -68,6 +71,7 @@ public Connector create(String catalogName, Map config, Connecto Bootstrap app = new Bootstrap( new JsonModule(), new KafkaConnectorModule(), + extension, binder -> { binder.bind(KafkaConnectorId.class).toInstance(new KafkaConnectorId(catalogName)); binder.bind(TypeManager.class).toInstance(context.getTypeManager()); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java index 284424a9a7fa4..5d983f61855b5 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPlugin.java @@ -18,6 +18,8 @@ import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.inject.Module; +import com.google.inject.util.Modules; import java.util.Map; import java.util.Optional; @@ -31,8 +33,19 @@ public class KafkaPlugin implements Plugin { + private final Module extension; private Optional>> tableDescriptionSupplier = Optional.empty(); + public KafkaPlugin() + { + this(Modules.EMPTY_MODULE); + } + + public KafkaPlugin(Module extension) + { + this.extension = requireNonNull(extension, "extension is null"); + } + @VisibleForTesting public synchronized void setTableDescriptionSupplier(Supplier> tableDescriptionSupplier) { @@ -42,6 +55,6 @@ public synchronized void setTableDescriptionSupplier(Supplier getConnectorFactories() { - return ImmutableList.of(new KafkaConnectorFactory(tableDescriptionSupplier)); + return ImmutableList.of(new KafkaConnectorFactory(extension, tableDescriptionSupplier)); } }