diff --git a/pom.xml b/pom.xml
index 3550073153e0f..6c79af9e96779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
6.2.1
1.9.17
2.1.1
+ 2.3.1
com.facebook.airlift
@@ -196,6 +203,13 @@
<artifactId>annotations</artifactId>
<scope>test</scope>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
index 1333e708c4d17..2d01c2738134a 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
@@ -17,15 +17,13 @@
import com.facebook.presto.spi.HostAddress;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
-import io.airlift.units.DataSize;
-import io.airlift.units.DataSize.Unit;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import javax.validation.constraints.NotNull;
-import javax.validation.constraints.Size;
import java.io.File;
+import java.util.List;
import java.util.Set;
import static com.google.common.collect.Iterables.transform;
@@ -37,18 +35,13 @@ public class KafkaConnectorConfig
/**
* Seed nodes for Kafka cluster. At least one must exist.
*/
- private Set<HostAddress> nodes = ImmutableSet.of();
+ private List<HostAddress> nodes;
/**
* Timeout to connect to Kafka.
*/
private Duration kafkaConnectTimeout = Duration.valueOf("10s");
- /**
- * Buffer size for connecting to Kafka.
- */
- private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);
-
/**
* The schema name to use in the connector.
*/
@@ -69,6 +62,16 @@ public class KafkaConnectorConfig
*/
private boolean hideInternalColumns = true;
+ /**
+ * Maximum number of records per poll()
+ */
+ private int maxPollRecords = 500;
+
+ /**
+ * Maximum number of bytes from one partition per poll()
+ */
+ private int maxPartitionFetchBytes = 1024 * 1024;
+
@NotNull
public File getTableDescriptionDir()
{
@@ -108,8 +111,7 @@ public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
return this;
}
- @Size(min = 1)
- public Set<HostAddress> getNodes()
+ public List<HostAddress> getNodes()
{
return nodes;
}
@@ -117,7 +119,7 @@ public Set<HostAddress> getNodes()
@Config("kafka.nodes")
public KafkaConnectorConfig setNodes(String nodes)
{
- this.nodes = (nodes == null) ? null : parseNodes(nodes);
+ this.nodes = (nodes == null) ? null : parseNodes(nodes).asList();
return this;
}
@@ -134,15 +136,27 @@ public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout)
return this;
}
- public DataSize getKafkaBufferSize()
+ public int getMaxPollRecords()
+ {
+ return maxPollRecords;
+ }
+
+ @Config("kafka.max-poll-records")
+ public KafkaConnectorConfig setMaxPollRecords(int maxPollRecords)
+ {
+ this.maxPollRecords = maxPollRecords;
+ return this;
+ }
+
+ public int getMaxPartitionFetchBytes()
{
- return kafkaBufferSize;
+ return maxPartitionFetchBytes;
}
- @Config("kafka.buffer-size")
- public KafkaConnectorConfig setKafkaBufferSize(String kafkaBufferSize)
+ @Config("kafka.max-partition-fetch-bytes")
+ public KafkaConnectorConfig setMaxPartitionFetchBytes(int maxPartitionFetchBytes)
{
- this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
+ this.maxPartitionFetchBytes = maxPartitionFetchBytes;
return this;
}
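
The two new knobs replace kafka.buffer-size and map directly onto the Kafka consumer settings max.poll.records and max.partition.fetch.bytes. A minimal sketch of how the setters behave (broker addresses and values below are hypothetical; in a deployment, Airlift's config binder populates the @Config setters from the catalog properties kafka.max-poll-records and kafka.max-partition-fetch-bytes):

    import com.facebook.presto.kafka.KafkaConnectorConfig;

    public class KafkaConfigSketch
    {
        public static void main(String[] args)
        {
            // Illustrative values only; normally these come from the catalog file.
            KafkaConnectorConfig config = new KafkaConnectorConfig()
                    .setNodes("kafka-1:9092,kafka-2:9092")       // hypothetical brokers
                    .setMaxPollRecords(1000)                     // cap records returned per poll()
                    .setMaxPartitionFetchBytes(2 * 1024 * 1024); // 2 MB per partition per poll()
            System.out.println(config.getNodes());               // [kafka-1:9092, kafka-2:9092]
        }
    }
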
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
index b49eb74c963e6..b809bb9fd2c3a 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java
@@ -13,13 +13,13 @@
*/
package com.facebook.presto.kafka;
+import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.presto.decoder.DecoderModule;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
-import com.google.inject.Module;
import com.google.inject.Scopes;
import javax.inject.Inject;
@@ -34,19 +34,19 @@
* Guice module for the Apache Kafka connector.
*/
public class KafkaConnectorModule
- implements Module
+ extends AbstractConfigurationAwareModule
{
@Override
- public void configure(Binder binder)
+ public void setup(Binder binder)
{
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
+ binder.bind(KafkaStaticServerset.class).in(Scopes.SINGLETON);
binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON);
binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON);
- binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON);
-
+ binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(KafkaConnectorConfig.class);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
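
Switching from a plain Guice Module to AbstractConfigurationAwareModule lets the module consult configuration while wiring bindings. A minimal sketch of the pattern, assuming the com.facebook.airlift API (ExampleModule and the conditional binding are illustrative, not part of this change):

    import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
    import com.facebook.presto.kafka.KafkaConnectorConfig;
    import com.google.inject.Binder;

    public class ExampleModule
            extends AbstractConfigurationAwareModule
    {
        @Override
        public void setup(Binder binder)
        {
            // buildConfigObject() materializes a config class during setup,
            // so bindings can depend on configuration values.
            KafkaConnectorConfig config = buildConfigObject(KafkaConnectorConfig.class);
            if (config.getMaxPollRecords() > 0) {
                // conditional wiring would go here
            }
        }
    }
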
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java
new file mode 100644
index 0000000000000..c4c5bae888914
--- /dev/null
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.kafka;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+
+import javax.inject.Inject;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
+
+/**
+ * Manages connections to the Kafka nodes. A worker may connect to multiple Kafka nodes, depending on the
+ * partitions it needs to process.
+ */
+public class KafkaConsumerManager
+{
+ private static final Logger log = Logger.get(KafkaConsumerManager.class);
+ private final int maxPartitionFetchBytes;
+ private final int maxPollRecords;
+
+ @Inject
+ public KafkaConsumerManager(KafkaConnectorConfig kafkaConnectorConfig)
+ {
+ requireNonNull(kafkaConnectorConfig, "kafkaConnectorConfig is null");
+ this.maxPartitionFetchBytes = kafkaConnectorConfig.getMaxPartitionFetchBytes();
+ this.maxPollRecords = kafkaConnectorConfig.getMaxPollRecords();
+ }
+
+ KafkaConsumer<ByteBuffer, ByteBuffer> createConsumer(String threadName, HostAddress hostAddress)
+ {
+ final Properties properties = new Properties();
+ properties.put(BOOTSTRAP_SERVERS_CONFIG, hostAddress.toString());
+ properties.put(GROUP_ID_CONFIG, threadName);
+ properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
+ properties.put(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
+ properties.put(CLIENT_ID_CONFIG, String.format("%s-%s", threadName, hostAddress.toString()));
+ properties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(KafkaPlugin.class.getClassLoader())) {
+ log.debug("Creating KafkaConsumer for thread %s, broker %s", threadName, hostAddress.toString());
+ return new KafkaConsumer<>(properties, new ByteBufferDeserializer(), new ByteBufferDeserializer());
+ }
+ }
+}
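
For context, a sketch of how a caller in the same package might use createConsumer; the actual call site is not part of this diff, and the broker, topic, and partition below are hypothetical:

    import com.facebook.presto.spi.HostAddress;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.nio.ByteBuffer;
    import java.time.Duration;
    import java.util.Collections;

    final class ConsumerUsageSketch
    {
        static void readFromStart(KafkaConsumerManager consumerManager)
        {
            HostAddress broker = HostAddress.fromParts("kafka-1", 9092); // hypothetical broker
            try (KafkaConsumer<ByteBuffer, ByteBuffer> consumer =
                    consumerManager.createConsumer(Thread.currentThread().getName(), broker)) {
                TopicPartition topicPartition = new TopicPartition("example_topic", 0);
                consumer.assign(Collections.singletonList(topicPartition));
                consumer.seek(topicPartition, 0L); // explicit seek; auto-commit is disabled above
                consumer.poll(Duration.ofMillis(100))
                        .forEach(record -> System.out.println(record.offset()));
            }
        }
    }
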
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
index 9338b98492116..1a27ecdd058ef 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
@@ -25,7 +25,9 @@
public enum KafkaErrorCode
implements ErrorCodeSupplier
{
- KAFKA_SPLIT_ERROR(0, EXTERNAL);
+ KAFKA_SPLIT_ERROR(0, EXTERNAL),
+
+ KAFKA_CONSUMER_ERROR(1, EXTERNAL);
private final ErrorCode errorCode;
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
index bb5eec1e5856f..fe3530b66fbb8 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
@@ -46,21 +46,6 @@ public enum KafkaInternalFieldDescription
*/
PARTITION_OFFSET_FIELD("_partition_offset", BigintType.BIGINT, "Offset for the message within the partition"),
- /**
- * _segment_start - Kafka start offset for the segment which contains the current message. This is per-partition.
- */
- SEGMENT_START_FIELD("_segment_start", BigintType.BIGINT, "Segment start offset"),
-
- /**
- * _segment_end - Kafka end offset for the segment which contains the current message. This is per-partition. The end offset is the first offset that is *not* in the segment.
- */
- SEGMENT_END_FIELD("_segment_end", BigintType.BIGINT, "Segment end offset"),
-
- /**
- * _segment_count - Running count of messages in a segment.
- */
- SEGMENT_COUNT_FIELD("_segment_count", BigintType.BIGINT, "Running message count per segment"),
-
/**
* _message_corrupt - True if the row converter could not read a message. May be null if the row converter does not set a value (e.g., the dummy row converter does not).
*/
@@ -89,7 +74,12 @@ public enum KafkaInternalFieldDescription
/**
* _key_length - length in bytes of the key.
*/
- KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes");
+ KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes"),
+
+ /**
+ * _timestamp - offset timestamp, used to narrow scan range
+ */
+ OFFSET_TIMESTAMP_FIELD("_timestamp", BigintType.BIGINT, "Offset Timestamp");
private static final Map<String, KafkaInternalFieldDescription> BY_COLUMN_NAME =
stream(KafkaInternalFieldDescription.values())
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
index 05f4379d60f8d..12144836297b4 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
@@ -27,6 +27,9 @@
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Marker;
+import com.facebook.presto.spi.predicate.Range;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -204,7 +207,27 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
KafkaTableHandle handle = convertTableHandle(table);
- ConnectorTableLayout layout = new ConnectorTableLayout(new KafkaTableLayoutHandle(handle));
+ long startTimestamp = 0;
+ long endTimestamp = 0;
+ Optional