Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,14 @@

import com.facebook.airlift.configuration.Config;
import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier;
import com.facebook.presto.spi.HostAddress;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.NotNull;

import java.util.List;

import static com.google.common.collect.Iterables.transform;

public class KafkaConnectorConfig
{
private static final int KAFKA_DEFAULT_PORT = 9092;

/**
* Seed nodes for Kafka cluster. At least one must exist.
*/
private List<HostAddress> nodes;

/**
* Timeout to connect to Kafka.
*/
Expand Down Expand Up @@ -66,6 +53,11 @@ public class KafkaConnectorConfig
*/
private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;

/**
* The kafka cluster metadata supplier to use, default is FILE
*/
private String clusterMetadataSupplier = FileKafkaClusterMetadataSupplier.NAME;

@NotNull
public String getDefaultSchema()
{
Expand All @@ -79,18 +71,6 @@ public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
return this;
}

public List<HostAddress> getNodes()
{
return nodes;
}

@Config("kafka.nodes")
public KafkaConnectorConfig setNodes(String nodes)
{
this.nodes = (nodes == null) ? null : parseNodes(nodes).asList();
return this;
}

Comment on lines -82 to -93
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double-confirm whether this would break the existing config. Is any config migration required for this change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing config works as-is; no config migration is needed.
The reason is that the default cluster metadata supplier (FileKafkaClusterMetadataSupplier) will load FileKafkaClusterMetadataSupplierConfig, which is a copy of the config that has been removed from this file.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, sounds good to me.

@MinDuration("1s")
public Duration getKafkaConnectTimeout()
{
Expand Down Expand Up @@ -141,26 +121,28 @@ public KafkaConnectorConfig setTableDescriptionSupplier(String tableDescriptionS
return this;
}

public boolean isHideInternalColumns()
@NotNull
public String getClusterMetadataSupplier()
{
return hideInternalColumns;
return clusterMetadataSupplier;
}

@Config("kafka.hide-internal-columns")
public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
@Config("kafka.cluster-metadata-supplier")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we document this new configuration? Thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, the only option for this configuration is the default one. Do you recommend adding the documentation now, or later when a new supplier is added?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see — we can document it later, I think. I remember you also made some other pluggable interfaces before. Maybe you can document these new configurations together later.

public KafkaConnectorConfig setClusterMetadataSupplier(String clusterMetadataSupplier)
{
this.hideInternalColumns = hideInternalColumns;
this.clusterMetadataSupplier = clusterMetadataSupplier;
return this;
}

public static ImmutableSet<HostAddress> parseNodes(String nodes)
public boolean isHideInternalColumns()
{
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
return ImmutableSet.copyOf(transform(splitter.split(nodes), KafkaConnectorConfig::toHostAddress));
return hideInternalColumns;
}

private static HostAddress toHostAddress(String value)
@Config("kafka.hide-internal-columns")
public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
{
return HostAddress.fromString(value).withDefaultPort(KAFKA_DEFAULT_PORT);
this.hideInternalColumns = hideInternalColumns;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.facebook.presto.kafka.encoder.EncoderModule;
import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier;
import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplierModule;
import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier;
import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplierModule;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
Expand All @@ -28,6 +30,8 @@

import javax.inject.Inject;

import java.util.function.Function;

import static com.facebook.airlift.configuration.ConditionalModule.installModuleIf;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.json.JsonBinder.jsonBinder;
Expand All @@ -46,15 +50,15 @@ public void setup(Binder binder)
{
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);

binder.bind(KafkaStaticServerset.class).in(Scopes.SINGLETON);
binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
binder.bind(KafkaPageSinkProvider.class).in(Scopes.SINGLETON);

binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(KafkaConnectorConfig.class);
bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule());
bindTopicSchemaProviderModule(FileTableDescriptionSupplier.NAME, new FileTableDescriptionSupplierModule(), KafkaConnectorConfig::getTableDescriptionSupplier);
bindTopicSchemaProviderModule(FileKafkaClusterMetadataSupplier.NAME, new FileKafkaClusterMetadataSupplierModule(), KafkaConnectorConfig::getClusterMetadataSupplier);

jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);
Expand Down Expand Up @@ -85,11 +89,11 @@ protected Type _deserialize(String value, DeserializationContext context)
}
}

public void bindTopicSchemaProviderModule(String name, Module module)
public void bindTopicSchemaProviderModule(String name, Module module, Function<KafkaConnectorConfig, String> configSupplier)
{
install(installModuleIf(
KafkaConnectorConfig.class,
kafkaConfig -> name.equalsIgnoreCase(kafkaConfig.getTableDescriptionSupplier()),
kafkaConfig -> name.equalsIgnoreCase(configSupplier.apply(kafkaConfig)),
module));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import com.facebook.presto.common.Page;
import com.facebook.presto.kafka.encoder.RowEncoder;
import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -47,18 +49,21 @@ public class KafkaPageSink
private final ErrorCountingCallback errorCounter;

public KafkaPageSink(
String schemaName,
String topicName,
List<KafkaColumnHandle> columns,
RowEncoder keyEncoder,
RowEncoder messageEncoder,
PlainTextKafkaProducerFactory producerFactory)
PlainTextKafkaProducerFactory producerFactory,
KafkaClusterMetadataSupplier supplier)
{
this.topicName = requireNonNull(topicName, "topicName is null");
this.columns = requireNonNull(ImmutableList.copyOf(columns), "columns is null");
this.keyEncoder = requireNonNull(keyEncoder, "keyEncoder is null");
this.messageEncoder = requireNonNull(messageEncoder, "messageEncoder is null");
requireNonNull(producerFactory, "producerFactory is null");
this.producer = producerFactory.create();
List<HostAddress> nodes = supplier.getNodes(schemaName);
this.producer = producerFactory.create(nodes);
this.errorCounter = new ErrorCountingCallback();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.kafka.encoder.DispatchingRowEncoderFactory;
import com.facebook.presto.kafka.encoder.EncoderColumnHandle;
import com.facebook.presto.kafka.encoder.RowEncoder;
import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
Expand All @@ -42,12 +43,14 @@ public class KafkaPageSinkProvider
{
private final DispatchingRowEncoderFactory encoderFactory;
private final PlainTextKafkaProducerFactory producerFactory;
private final KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier;

@Inject
public KafkaPageSinkProvider(DispatchingRowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory)
public KafkaPageSinkProvider(DispatchingRowEncoderFactory encoderFactory, PlainTextKafkaProducerFactory producerFactory, KafkaClusterMetadataSupplier kafkaClusterMetadataSupplier)
{
this.encoderFactory = requireNonNull(encoderFactory, "encoderFactory is null");
this.producerFactory = requireNonNull(producerFactory, "producerFactory is null");
this.kafkaClusterMetadataSupplier = requireNonNull(kafkaClusterMetadataSupplier, "kafkaClusterMetadataSupplier is null");
}

@Override
Expand Down Expand Up @@ -89,11 +92,13 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
messageColumns.build());

return new KafkaPageSink(
handle.getSchemaName(),
handle.getTopicName(),
handle.getColumns(),
keyEncoder,
messageEncoder,
producerFactory);
producerFactory,
kafkaClusterMetadataSupplier);
}

private Optional<String> getDataSchema(Optional<String> dataSchemaLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.kafka.server.KafkaClusterMetadataHelper;
import com.facebook.presto.kafka.server.KafkaClusterMetadataSupplier;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
Expand Down Expand Up @@ -61,20 +63,20 @@ public class KafkaSplitManager
{
private final String connectorId;
private final KafkaConsumerManager consumerManager;
private final KafkaStaticServerset servers;
private final KafkaClusterMetadataSupplier clusterMetadataSupplier;

@Inject
public KafkaSplitManager(
KafkaConnectorId connectorId,
KafkaConnectorConfig kafkaConnectorConfig,
KafkaStaticServerset servers,
KafkaClusterMetadataSupplier clusterMetadataSupplier,
KafkaConsumerManager consumerManager)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.consumerManager = requireNonNull(consumerManager, "consumerManager is null");

requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
this.servers = servers;
this.clusterMetadataSupplier = requireNonNull(clusterMetadataSupplier, "clusterMetadataSupplier is null");
}

@Override
Expand All @@ -86,9 +88,9 @@ public ConnectorSplitSource getSplits(
{
KafkaTableHandle kafkaTableHandle = convertLayout(layout).getTable();
try {
HostAddress node = servers.selectRandomServer();
String topic = kafkaTableHandle.getTopicName();
KafkaTableLayoutHandle layoutHandle = (KafkaTableLayoutHandle) layout;
HostAddress node = KafkaClusterMetadataHelper.selectRandom(clusterMetadataSupplier.getNodes(layoutHandle.getTable().getSchemaName()));

KafkaConsumer<ByteBuffer, ByteBuffer> consumer = consumerManager.createConsumer(Thread.currentThread().getName(), node);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@

import com.facebook.presto.spi.HostAddress;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -35,24 +33,25 @@ public class PlainTextKafkaProducerFactory
private final Map<String, Object> properties;

@Inject
public PlainTextKafkaProducerFactory(KafkaConnectorConfig kafkaConfig)
public PlainTextKafkaProducerFactory()
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
Set<HostAddress> nodes = ImmutableSet.copyOf(kafkaConfig.getNodes());
properties = ImmutableMap.<String, Object>builder()
.put(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
.map(HostAddress::toString)
.collect(joining(",")))
this.properties = ImmutableMap.<String, Object>builder()
.put(ACKS_CONFIG, "all")
.put(LINGER_MS_CONFIG, 5)
.build();
}

/**
* Creates a KafkaProducer with the properties set in the constructor.
* Creates a KafkaProducer with the properties set in the constructor and kafka bootstrap servers
*/
public KafkaProducer<byte[], byte[]> create()
public KafkaProducer<byte[], byte[]> create(List<HostAddress> bootstrapServers)
{
return new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer());
Map<String, Object> propertiesWithBootstrapServers = ImmutableMap.<String, Object>builder()
.putAll(properties)
.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.stream()
.map(HostAddress::toString)
.collect(joining(",")))
.build();
return new KafkaProducer<>(propertiesWithBootstrapServers, new ByteArraySerializer(), new ByteArraySerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka;
package com.facebook.presto.kafka.server;

import com.facebook.presto.spi.HostAddress;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class KafkaStaticServerset
public class KafkaClusterMetadataHelper
{
private final List<HostAddress> nodes;

@Inject
public KafkaStaticServerset(KafkaConnectorConfig config)
{
requireNonNull(config.getNodes(), "nodes is null");
checkArgument(!config.getNodes().isEmpty(), "nodes must specify at least one URI");
this.nodes = config.getNodes();
}

public HostAddress selectRandomServer()
private KafkaClusterMetadataHelper()
{
return selectRandom(this.nodes);
}

private static <T> T selectRandom(Iterable<T> iterable)
public static <T> T selectRandom(Iterable<T> iterable)
{
List<T> list = ImmutableList.copyOf(iterable);
return list.get(ThreadLocalRandom.current().nextInt(list.size()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka.server;

import com.facebook.presto.spi.HostAddress;

import java.util.List;

/**
 * Supplies Kafka cluster metadata — currently the broker node list — so that the
 * Kafka connector can communicate with a Kafka cluster. The implementation is
 * selected via the {@code kafka.cluster-metadata-supplier} configuration property;
 * the default is the file-based supplier ({@code FileKafkaClusterMetadataSupplier}).
 */
public interface KafkaClusterMetadataSupplier
{
    /**
     * Gets the list of Kafka broker addresses for the specified Kafka cluster.
     *
     * @param clusterName the Kafka cluster name; callers currently pass the
     *        connector schema name (see KafkaPageSink and KafkaSplitManager)
     * @return the Kafka broker host addresses for that cluster
     *         — NOTE(review): implementations should document whether this list may be empty
     */
    List<HostAddress> getNodes(String clusterName);
}
Loading