diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java index e377b460cf234..6a1c2e07f7409 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java @@ -15,27 +15,14 @@ import com.facebook.airlift.configuration.Config; import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier; -import com.facebook.presto.spi.HostAddress; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableSet; +import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier; import io.airlift.units.Duration; import io.airlift.units.MinDuration; import javax.validation.constraints.NotNull; -import java.util.List; - -import static com.google.common.collect.Iterables.transform; - public class KafkaConnectorConfig { - private static final int KAFKA_DEFAULT_PORT = 9092; - - /** - * Seed nodes for Kafka cluster. At least one must exist. - */ - private List nodes; - /** * Timeout to connect to Kafka. */ @@ -66,6 +53,11 @@ public class KafkaConnectorConfig */ private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME; + /** + * The kafka cluster metadata supplier to use, default is FILE + */ + private String clusterMetadataSupplier = FileKafkaClusterMetadataSupplier.NAME; + @NotNull public String getDefaultSchema() { @@ -79,18 +71,6 @@ public KafkaConnectorConfig setDefaultSchema(String defaultSchema) return this; } - public List getNodes() - { - return nodes; - } - - @Config("kafka.nodes") - public KafkaConnectorConfig setNodes(String nodes) - { - this.nodes = (nodes == null) ? 
null : parseNodes(nodes).asList(); - return this; - } - @MinDuration("1s") public Duration getKafkaConnectTimeout() { @@ -141,26 +121,28 @@ public KafkaConnectorConfig setTableDescriptionSupplier(String tableDescriptionS return this; } - public boolean isHideInternalColumns() + @NotNull + public String getClusterMetadataSupplier() { - return hideInternalColumns; + return clusterMetadataSupplier; } - @Config("kafka.hide-internal-columns") - public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns) + @Config("kafka.cluster-metadata-supplier") + public KafkaConnectorConfig setClusterMetadataSupplier(String clusterMetadataSupplier) { - this.hideInternalColumns = hideInternalColumns; + this.clusterMetadataSupplier = clusterMetadataSupplier; return this; } - public static ImmutableSet parseNodes(String nodes) + public boolean isHideInternalColumns() { - Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults(); - return ImmutableSet.copyOf(transform(splitter.split(nodes), KafkaConnectorConfig::toHostAddress)); + return hideInternalColumns; } - private static HostAddress toHostAddress(String value) + @Config("kafka.hide-internal-columns") + public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns) { - return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT); + this.hideInternalColumns = hideInternalColumns; + return this; } } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java index a2fc7a21d5e95..210965f788e88 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java @@ -20,6 +20,8 @@ import com.facebook.presto.kafka.encoder.EncoderModule; import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier; import 
com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplierModule; +import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier; +import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplierModule; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.inject.Binder; @@ -28,6 +30,8 @@ import javax.inject.Inject; +import java.util.function.Function; + import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; import static com.facebook.airlift.json.JsonBinder.jsonBinder; @@ -46,7 +50,6 @@ public void setup(Binder binder) { binder.bind(KafkaConnector.class).in(Scopes.SINGLETON); - binder.bind(KafkaStaticServerset.class).in(Scopes.SINGLETON); binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON); binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON); binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON); @@ -54,7 +57,8 @@ public void setup(Binder binder) binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(KafkaConnectorConfig.class); - bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule()); + bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule(), KafkaConnectorConfig::getTableDescriptionSupplier); + bindTopicSchemaProviderModule(FileKafkaClusterMetadataSupplier.NAME, new FileKafkaClusterMetadataSupplierModule(), KafkaConnectorConfig::getClusterMetadataSupplier); jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class); @@ -85,11 +89,11 @@ protected Type _deserialize(String value, DeserializationContext context) } } - public void bindTopicSchemaProviderModule(String name, Module module) 
+ public void bindTopicSchemaProviderModule(String name, Module module, Function configSupplier) { install(installModuleIf( KafkaConnectorConfig.class, - kafkaConfig -> name.equalsIgnoreCase(kafkaConfig.getTableDescriptionSupplier()), + kafkaConfig -> name.equalsIgnoreCase(configSupplier.apply(kafkaConfig)), module)); } } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSink.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSink.java index 04e4176fe7c9b..fc61694a510cd 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSink.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSink.java @@ -15,7 +15,9 @@ import com.facebook.presto.common.Page; import com.facebook.presto.kafka.encoder.RowEncoder; +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; import com.facebook.presto.spi.ConnectorPageSink; +import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; @@ -47,18 +49,21 @@ public class KafkaPageSink private final ErrorCountingCallback errorCounter; public KafkaPageSink( + String schemaName, String topicName, List columns, RowEncoder keyEncoder, RowEncoder messageEncoder, - PlainTextKafkaProducerFactory producerFactory) + PlainTextKafkaProducerFactory producerFactory, + KafkaClusterMetadataSupplier supplier) { this.topicName = requireNonNull(topicName, "topicName is null"); this.columns = requireNonNull(ImmutableList.copyOf(columns), "columns is null"); this.keyEncoder = requireNonNull(keyEncoder, "keyEncoder is null"); this.messageEncoder = requireNonNull(messageEncoder, "messageEncoder is null"); requireNonNull(producerFactory, "producerFactory is null"); - this.producer = producerFactory.create(); + List nodes = supplier.getNodes(schemaName); + this.producer = producerFactory.create(nodes); this.errorCounter = new ErrorCountingCallback(); } diff 
--git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSinkProvider.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSinkProvider.java index 4663bbcf60697..924cf7cbd2ff1 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSinkProvider.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaPageSinkProvider.java @@ -16,6 +16,7 @@ import com.facebook.presto.kafka.encoder.DispatchingRowEncoderFactory; import com.facebook.presto.kafka.encoder.EncoderColumnHandle; import com.facebook.presto.kafka.encoder.RowEncoder; +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -42,12 +43,14 @@ public class KafkaPageSinkProvider { private final DispatchingRowEncoderFactory encoderFactory; private final PlainTextKafkaProducerFactory producerFactory; + private final KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier; @Inject - public KafkaPageSinkProvider(DispatchingRowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory) + public KafkaPageSinkProvider(DispatchingRowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory, KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier) { this.encoderFactory = requireNonNull(encoderFactory, "encoderFactory is null"); this.producerFactory = requireNonNull(producerFactory, "producerFactory is null"); + this.kafkaClusterMetadataSupplier = requireNonNull(kafkaClusterMetadataSupplier, "kafkaClusterMetadataSupplier is null"); } @Override @@ -89,11 +92,13 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa messageColumns.build()); return new KafkaPageSink( + handle.getSchemaName(), handle.getTopicName(), handle.getColumns(), keyEncoder, messageEncoder, - producerFactory); + producerFactory, + 
kafkaClusterMetadataSupplier); } private Optional getDataSchema(Optional dataSchemaLocation) diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java index 9bf801205a9d5..aaa5493d8179c 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.kafka; +import com.facebook.presto.kafka.server.KafkaClusterMetadataHelper; +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitSource; @@ -61,20 +63,20 @@ public class KafkaSplitManager { private final String connectorId; private final KafkaConsumerManager consumerManager; - private final KafkaStaticServerset servers; + private final KafkaClusterMetadataSupplier clusterMetadataSupplier; @Inject public KafkaSplitManager( KafkaConnectorId connectorId, KafkaConnectorConfig kafkaConnectorConfig, - KafkaStaticServerset servers, + KafkaClusterMetadataSupplier clusterMetadataSupplier, KafkaConsumerManager consumerManager) { this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); this.consumerManager = requireNonNull(consumerManager, "consumerManager is null"); requireNonNull(kafkaConnectorConfig, "kafkaConfig is null"); - this.servers = servers; + this.clusterMetadataSupplier = requireNonNull(clusterMetadataSupplier, "clusterMetadataSupplier is null"); } @Override @@ -86,9 +88,9 @@ public ConnectorSplitSource getSplits( { KafkaTableHandle kafkaTableHandle = convertLayout(layout).getTable(); try { - HostAddress node = servers.selectRandomServer(); String topic = kafkaTableHandle.getTopicName(); KafkaTableLayoutHandle layoutHandle = (KafkaTableLayoutHandle) layout; + HostAddress node = 
KafkaClusterMetadataHelper.selectRandom(clusterMetadataSupplier.getNodes(layoutHandle.getTable().getSchemaName())); KafkaConsumer consumer = consumerManager.createConsumer(Thread.currentThread().getName(), node); List partitions = consumer.partitionsFor(topic); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/PlainTextKafkaProducerFactory.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/PlainTextKafkaProducerFactory.java index 79f13f4b8fd3e..c24bee7a88a3f 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/PlainTextKafkaProducerFactory.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/PlainTextKafkaProducerFactory.java @@ -15,16 +15,14 @@ import com.facebook.presto.spi.HostAddress; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.ByteArraySerializer; import javax.inject.Inject; +import java.util.List; import java.util.Map; -import java.util.Set; -import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.joining; import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -35,24 +33,25 @@ public class PlainTextKafkaProducerFactory private final Map properties; @Inject - public PlainTextKafkaProducerFactory(KafkaConnectorConfig kafkaConfig) + public PlainTextKafkaProducerFactory() { - requireNonNull(kafkaConfig, "kafkaConfig is null"); - Set nodes = ImmutableSet.copyOf(kafkaConfig.getNodes()); - properties = ImmutableMap.builder() - .put(BOOTSTRAP_SERVERS_CONFIG, nodes.stream() - .map(HostAddress::toString) - .collect(joining(","))) + this.properties = ImmutableMap.builder() .put(ACKS_CONFIG, "all") .put(LINGER_MS_CONFIG, 5) .build(); } /** - * Creates a KafkaProducer with the properties set in the constructor. 
+ * Creates a KafkaProducer with the properties set in the constructor and kafka bootstrap servers */ - public KafkaProducer create() + public KafkaProducer create(List bootstrapServers) { - return new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer()); + Map propertiesWithBootstrapServers = ImmutableMap.builder() + .putAll(properties) + .put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.stream() + .map(HostAddress::toString) + .collect(joining(","))) + .build(); + return new KafkaProducer<>(propertiesWithBootstrapServers, new ByteArraySerializer(), new ByteArraySerializer()); } } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataHelper.java similarity index 53% rename from presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java rename to presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataHelper.java index d419345385fe3..4814822197067 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataHelper.java @@ -11,37 +11,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package com.facebook.presto.kafka; +package com.facebook.presto.kafka.server; -import com.facebook.presto.spi.HostAddress; import com.google.common.collect.ImmutableList; -import javax.inject.Inject; - import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -public class KafkaStaticServerset +public class KafkaClusterMetadataHelper { - private final List nodes; - - @Inject - public KafkaStaticServerset(KafkaConnectorConfig config) - { - requireNonNull(config.getNodes(), "nodes is null"); - checkArgument(!config.getNodes().isEmpty(), "nodes must specify at least one URI"); - this.nodes = config.getNodes(); - } - - public HostAddress selectRandomServer() + private KafkaClusterMetadataHelper() { - return selectRandom(this.nodes); } - private static T selectRandom(Iterable iterable) + public static T selectRandom(Iterable iterable) { List list = ImmutableList.copyOf(iterable); return list.get(ThreadLocalRandom.current().nextInt(list.size())); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataSupplier.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataSupplier.java new file mode 100644 index 0000000000000..22c3218aa3db9 --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/KafkaClusterMetadataSupplier.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.kafka.server; + +import com.facebook.presto.spi.HostAddress; + +import java.util.List; + +/** + * This is mainly used to get Kafka cluster metadata such as broker list so that Kafka Connector can communicate with Kafka cluster + */ +public interface KafkaClusterMetadataSupplier +{ + /** + * Gets kafka broker list for specified kafka cluster name + * @param clusterName the kafka cluster name (NOTE(review): callers currently pass the table's schema name — confirm the schema-name-to-cluster-name mapping) + * @return kafka broker list + */ + List getNodes(String clusterName); +} diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplier.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplier.java new file mode 100644 index 0000000000000..a6e979d14428e --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplier.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.kafka.server.file; + +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; +import com.facebook.presto.spi.HostAddress; + +import javax.inject.Inject; + +import java.util.List; + +/** + * Gets cluster metadata from a static configuration file + */ +public class FileKafkaClusterMetadataSupplier + implements KafkaClusterMetadataSupplier +{ + public static final String NAME = "file"; + private final FileKafkaClusterMetadataSupplierConfig config; + + @Inject + public FileKafkaClusterMetadataSupplier(FileKafkaClusterMetadataSupplierConfig config) + { + this.config = config; + } + + @Override + public List getNodes(String clusterName) + { + return config.getNodes(); + } +} diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierConfig.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierConfig.java new file mode 100644 index 0000000000000..8eabfd4433697 --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierConfig.java @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.kafka.server.file; + +import com.facebook.airlift.configuration.Config; +import com.facebook.presto.spi.HostAddress; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableSet; + +import java.util.List; + +import static com.google.common.collect.Iterables.transform; + +public class FileKafkaClusterMetadataSupplierConfig +{ + private static final int KAFKA_DEFAULT_PORT = 9092; + + /** + * Seed nodes for Kafka cluster. At least one must exist. + */ + private List nodes; + + public List getNodes() + { + return nodes; + } + + @Config("kafka.nodes") + public FileKafkaClusterMetadataSupplierConfig setNodes(String nodes) + { + this.nodes = (nodes == null) ? null : parseNodes(nodes).asList(); + return this; + } + + public static ImmutableSet parseNodes(String nodes) + { + Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults(); + return ImmutableSet.copyOf(transform(splitter.split(nodes), FileKafkaClusterMetadataSupplierConfig::toHostAddress)); + } + + private static HostAddress toHostAddress(String value) + { + return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT); + } +} diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierModule.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierModule.java new file mode 100644 index 0000000000000..1f6145167ba34 --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/server/file/FileKafkaClusterMetadataSupplierModule.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.kafka.server.file; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class FileKafkaClusterMetadataSupplierModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(FileKafkaClusterMetadataSupplierConfig.class); + binder.bind(KafkaClusterMetadataSupplier.class).to(FileKafkaClusterMetadataSupplier.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java index 79ce1b30507f4..248a8898d1a57 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java @@ -26,9 +26,9 @@ public class TestKafkaConnectorConfig public void testDefaults() { ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(KafkaConnectorConfig.class) - .setNodes(null) .setKafkaConnectTimeout("10s") .setDefaultSchema("default") + .setClusterMetadataSupplier(com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier.NAME) .setTableDescriptionSupplier(FileTableDescriptionSupplier.NAME) .setHideInternalColumns(true) .setMaxPartitionFetchBytes(1048576) @@ 
-40,8 +40,8 @@ public void testExplicitPropertyMappings() { Map properties = new ImmutableMap.Builder() .put("kafka.table-description-supplier", "test") + .put("kafka.cluster-metadata-supplier", "test") .put("kafka.default-schema", "kafka") - .put("kafka.nodes", "localhost:12345,localhost:23456") .put("kafka.connect-timeout", "1h") .put("kafka.hide-internal-columns", "false") .put("kafka.max-partition-fetch-bytes", "1024") @@ -50,8 +50,8 @@ public void testExplicitPropertyMappings() KafkaConnectorConfig expected = new KafkaConnectorConfig() .setTableDescriptionSupplier("test") + .setClusterMetadataSupplier("test") .setDefaultSchema("kafka") - .setNodes("localhost:12345, localhost:23456") .setKafkaConnectTimeout("1h") .setHideInternalColumns(false) .setMaxPartitionFetchBytes(1024) diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/server/file/TestFileKafkaClusterMetadataSupplierConfig.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/server/file/TestFileKafkaClusterMetadataSupplierConfig.java new file mode 100644 index 0000000000000..1c6d05f9ba56a --- /dev/null +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/server/file/TestFileKafkaClusterMetadataSupplierConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.kafka.server.file; + +import com.facebook.airlift.configuration.testing.ConfigAssertions; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +public class TestFileKafkaClusterMetadataSupplierConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(FileKafkaClusterMetadataSupplierConfig.class) + .setNodes(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("kafka.nodes", "localhost:12345,localhost:23456") + .build(); + + FileKafkaClusterMetadataSupplierConfig expected = new FileKafkaClusterMetadataSupplierConfig() + .setNodes("localhost:12345,localhost:23456"); + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java index 945a8a2cebd50..96c49541f7e1b 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java @@ -20,6 +20,9 @@ import com.facebook.presto.kafka.KafkaTopicDescription; import com.facebook.presto.kafka.schema.MapBasedTableDescriptionSupplier; import com.facebook.presto.kafka.schema.TableDescriptionSupplier; +import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier; +import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier; +import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplierConfig; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.TestingPrestoClient; @@ -63,17 +66,24 @@ public static Properties toProperties(Map map) public static void installKafkaPlugin(EmbeddedKafka embeddedKafka, QueryRunner queryRunner, Map 
topicDescriptions) { + FileKafkaClusterMetadataSupplierConfig clusterMetadataSupplierConfig = new FileKafkaClusterMetadataSupplierConfig(); + clusterMetadataSupplierConfig.setNodes(embeddedKafka.getConnectString()); KafkaPlugin kafkaPlugin = new KafkaPlugin(combine( installModuleIf( KafkaConnectorConfig.class, kafkaConfig -> kafkaConfig.getTableDescriptionSupplier().equalsIgnoreCase(TEST), binder -> binder.bind(TableDescriptionSupplier.class) - .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions))))); + .toInstance(new MapBasedTableDescriptionSupplier(topicDescriptions))), + installModuleIf( + KafkaConnectorConfig.class, + kafkaConfig -> kafkaConfig.getClusterMetadataSupplier().equalsIgnoreCase(TEST), + binder -> binder.bind(KafkaClusterMetadataSupplier.class) + .toInstance(new FileKafkaClusterMetadataSupplier(clusterMetadataSupplierConfig))))); queryRunner.installPlugin(kafkaPlugin); Map kafkaConfig = ImmutableMap.of( - "kafka.nodes", embeddedKafka.getConnectString(), + "kafka.cluster-metadata-supplier", TEST, "kafka.table-description-supplier", TEST, "kafka.connect-timeout", "120s", "kafka.default-schema", "default");