From 1dd8354968580bf75a61cbf08a96eefc7208e78d Mon Sep 17 00:00:00 2001 From: Da Cheng Date: Fri, 15 Nov 2019 14:41:19 -0800 Subject: [PATCH] Upgrade Kafka Connector to 2.3.1 --- pom.xml | 63 +++++++- .../main/sphinx/connector/kafka-tutorial.rst | 9 -- .../src/main/sphinx/connector/kafka.rst | 49 +++--- presto-kafka/pom.xml | 24 ++- .../presto/kafka/KafkaConnectorConfig.java | 48 ++++-- .../presto/kafka/KafkaConnectorModule.java | 10 +- .../presto/kafka/KafkaConsumerManager.java | 68 ++++++++ .../facebook/presto/kafka/KafkaErrorCode.java | 4 +- .../kafka/KafkaInternalFieldDescription.java | 22 +-- .../facebook/presto/kafka/KafkaMetadata.java | 25 ++- .../facebook/presto/kafka/KafkaRecordSet.java | 74 ++++----- .../presto/kafka/KafkaRecordSetProvider.java | 6 +- .../kafka/KafkaSimpleConsumerManager.java | 92 ----------- .../com/facebook/presto/kafka/KafkaSplit.java | 7 +- .../presto/kafka/KafkaSplitManager.java | 149 ++++++++---------- .../presto/kafka/KafkaStaticServerset.java | 49 ++++++ .../presto/kafka/KafkaTableLayoutHandle.java | 27 +++- .../kafka/TestKafkaConnectorConfig.java | 15 +- .../presto/kafka/TestManySegments.java | 117 -------------- .../kafka/TestMinimalFunctionality.java | 15 +- .../presto/kafka/util/EmbeddedKafka.java | 37 ++--- .../presto/kafka/util/JsonEncoder.java | 9 +- .../presto/kafka/util/KafkaLoader.java | 10 +- .../presto/kafka/util/NumberEncoder.java | 37 ----- .../presto/kafka/util/NumberPartitioner.java | 38 ----- .../facebook/presto/kafka/util/TestUtils.java | 4 +- presto-product-tests/pom.xml | 12 ++ 27 files changed, 479 insertions(+), 541 deletions(-) create mode 100644 presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java delete mode 100644 presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java create mode 100644 presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java delete mode 100644 presto-kafka/src/test/java/com/facebook/presto/kafka/TestManySegments.java delete mode 100644 presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberEncoder.java delete mode 100644 presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberPartitioner.java diff --git a/pom.xml b/pom.xml index 3550073153e0f..6c79af9e96779 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ 6.2.1 1.9.17 2.1.1 + 2.3.1 com.facebook.airlift @@ -196,6 +203,13 @@ annotations test + + + org.scala-lang + scala-library + ${scala.version} + test + diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java index 1333e708c4d17..2d01c2738134a 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java @@ -17,15 +17,13 @@ import com.facebook.presto.spi.HostAddress; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableSet; -import io.airlift.units.DataSize; -import io.airlift.units.DataSize.Unit; import io.airlift.units.Duration; import io.airlift.units.MinDuration; import javax.validation.constraints.NotNull; -import javax.validation.constraints.Size; import java.io.File; +import java.util.List; import java.util.Set; import static com.google.common.collect.Iterables.transform; @@ -37,18 +35,13 @@ public class KafkaConnectorConfig /** * Seed nodes for Kafka cluster. At least one must exist. 
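* For example (hypothetical broker addresses), the property is a comma-separated host:port list:
* {@code kafka.nodes=kafka-1.example.com:9092,kafka-2.example.com:9092}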
*/ - private Set nodes = ImmutableSet.of(); + private List nodes; /** * Timeout to connect to Kafka. */ private Duration kafkaConnectTimeout = Duration.valueOf("10s"); - /** - * Buffer size for connecting to Kafka. - */ - private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE); - /** * The schema name to use in the connector. */ @@ -69,6 +62,16 @@ public class KafkaConnectorConfig */ private boolean hideInternalColumns = true; + /** + * Maximum number of records per poll() + */ + private int maxPollRecords = 500; + + /** + * Maximum number of bytes from one partition per poll() + */ + private int maxPartitionFetchBytes = 1024 * 1024; + @NotNull public File getTableDescriptionDir() { @@ -108,8 +111,7 @@ public KafkaConnectorConfig setDefaultSchema(String defaultSchema) return this; } - @Size(min = 1) - public Set getNodes() + public List getNodes() { return nodes; } @@ -117,7 +119,7 @@ public Set getNodes() @Config("kafka.nodes") public KafkaConnectorConfig setNodes(String nodes) { - this.nodes = (nodes == null) ? null : parseNodes(nodes); + this.nodes = (nodes == null) ? null : parseNodes(nodes).asList(); return this; } @@ -134,15 +136,27 @@ public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout) return this; } - public DataSize getKafkaBufferSize() + public int getMaxPollRecords() + { + return maxPollRecords; + } + + @Config("kafka.max-poll-records") + public KafkaConnectorConfig setMaxPollRecords(int maxPollRecords) + { + this.maxPollRecords = maxPollRecords; + return this; + } + + public int getMaxPartitionFetchBytes() { - return kafkaBufferSize; + return maxPartitionFetchBytes; } - @Config("kafka.buffer-size") - public KafkaConnectorConfig setKafkaBufferSize(String kafkaBufferSize) + @Config("kafka.max-partition-fetch-bytes") + public KafkaConnectorConfig setMaxPartitionFetchBytes(int maxPartitionFetchBytes) { - this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize); + this.maxPartitionFetchBytes = maxPartitionFetchBytes; return this; } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java index b49eb74c963e6..b809bb9fd2c3a 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorModule.java @@ -13,13 +13,13 @@ */ package com.facebook.presto.kafka; +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; import com.facebook.presto.decoder.DecoderModule; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.Scopes; import javax.inject.Inject; @@ -34,19 +34,19 @@ * Guice module for the Apache Kafka connector. 
*/ public class KafkaConnectorModule - implements Module + extends AbstractConfigurationAwareModule { @Override - public void configure(Binder binder) + public void setup(Binder binder) { binder.bind(KafkaConnector.class).in(Scopes.SINGLETON); + binder.bind(KafkaStaticServerset.class).in(Scopes.SINGLETON); binder.bind(KafkaMetadata.class).in(Scopes.SINGLETON); binder.bind(KafkaSplitManager.class).in(Scopes.SINGLETON); binder.bind(KafkaRecordSetProvider.class).in(Scopes.SINGLETON); - binder.bind(KafkaSimpleConsumerManager.class).in(Scopes.SINGLETON); - + binder.bind(KafkaConsumerManager.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(KafkaConnectorConfig.class); jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java new file mode 100644 index 0000000000000..c4c5bae888914 --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.kafka; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteBufferDeserializer; + +import javax.inject.Inject; + +import java.nio.ByteBuffer; +import java.util.Properties; + +import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; + +/** + * Manages connections to the Kafka nodes. A worker may connect to multiple Kafka nodes depending on partitions + * it needs to process. 
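+ * <p>A minimal usage sketch (cursor-side, names as in this patch): each record cursor asks for its own
+ * consumer and closes it in {@code close()}:
+ * <pre>{@code
+ * KafkaConsumer<ByteBuffer, ByteBuffer> consumer =
+ *         consumerManager.createConsumer(Thread.currentThread().getName(), split.getLeader());
+ * }</pre>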
+ */
+public class KafkaConsumerManager
+{
+    private static final Logger log = Logger.get(KafkaConsumerManager.class);
+    private final int maxPartitionFetchBytes;
+    private final int maxPollRecords;
+
+    @Inject
+    public KafkaConsumerManager(KafkaConnectorConfig kafkaConnectorConfig)
+    {
+        requireNonNull(kafkaConnectorConfig, "kafkaConnectorConfig is null");
+        this.maxPartitionFetchBytes = kafkaConnectorConfig.getMaxPartitionFetchBytes();
+        this.maxPollRecords = kafkaConnectorConfig.getMaxPollRecords();
+    }
+
+    KafkaConsumer<ByteBuffer, ByteBuffer> createConsumer(String threadName, HostAddress hostAddress)
+    {
+        final Properties properties = new Properties();
+        properties.put(BOOTSTRAP_SERVERS_CONFIG, hostAddress.toString());
+        properties.put(GROUP_ID_CONFIG, threadName);
+        properties.put(MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
+        properties.put(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
+        properties.put(CLIENT_ID_CONFIG, String.format("%s-%s", threadName, hostAddress.toString()));
+        properties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(KafkaPlugin.class.getClassLoader())) {
+            log.debug("Creating KafkaConsumer for thread %s broker %s", threadName, hostAddress.toString());
+            return new KafkaConsumer<>(properties, new ByteBufferDeserializer(), new ByteBufferDeserializer());
+        }
+    }
+}
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
index 9338b98492116..1a27ecdd058ef 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaErrorCode.java
@@ -25,7 +25,9 @@ public enum KafkaErrorCode
         implements ErrorCodeSupplier
 {
-    KAFKA_SPLIT_ERROR(0, EXTERNAL);
+    KAFKA_SPLIT_ERROR(0, EXTERNAL),
+
+    KAFKA_CONSUMER_ERROR(1, EXTERNAL);
 
     private final ErrorCode errorCode;
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
index bb5eec1e5856f..fe3530b66fbb8 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaInternalFieldDescription.java
@@ -46,21 +46,6 @@ public enum KafkaInternalFieldDescription
      */
     PARTITION_OFFSET_FIELD("_partition_offset", BigintType.BIGINT, "Offset for the message within the partition"),
 
-    /**
-     * _segment_start - Kafka start offset for the segment which contains the current message. This is per-partition.
-     */
-    SEGMENT_START_FIELD("_segment_start", BigintType.BIGINT, "Segment start offset"),
-
-    /**
-     * _segment_end - Kafka end offset for the segment which contains the current message. This is per-partition. The end offset is the first offset that is *not* in the segment.
-     */
-    SEGMENT_END_FIELD("_segment_end", BigintType.BIGINT, "Segment end offset"),
-
-    /**
-     * _segment_count - Running count of messages in a segment.
-     */
-    SEGMENT_COUNT_FIELD("_segment_count", BigintType.BIGINT, "Running message count per segment"),
-
     /**
      * _message_corrupt - True if the row converter could not read a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not).
      */
@@ -89,7 +74,12 @@ public enum KafkaInternalFieldDescription
 
     /**
      * _key_length - length in bytes of the key.
      */
-    KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes");
+    KEY_LENGTH_FIELD("_key_length", BigintType.BIGINT, "Total number of key bytes"),
+
+    /**
+     * _timestamp - message timestamp, used to narrow the scan range
+     */
+    OFFSET_TIMESTAMP_FIELD("_timestamp", BigintType.BIGINT, "Offset Timestamp");
 
     private static final Map<String, KafkaInternalFieldDescription> BY_COLUMN_NAME =
             stream(KafkaInternalFieldDescription.values())
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
index 05f4379d60f8d..12144836297b4 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java
@@ -27,6 +27,9 @@
 import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Marker;
+import com.facebook.presto.spi.predicate.Range;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -204,7 +207,27 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
     public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
     {
         KafkaTableHandle handle = convertTableHandle(table);
-        ConnectorTableLayout layout = new ConnectorTableLayout(new KafkaTableLayoutHandle(handle));
+        long startTimestamp = 0;
+        long endTimestamp = 0;
+        Optional<Map<ColumnHandle, Domain>> domains = constraint.getSummary().getDomains();
+        if (domains.isPresent()) {
+            Map<ColumnHandle, Domain> columnHandleDomainMap = domains.get();
+            for (Map.Entry<ColumnHandle, Domain> entry : columnHandleDomainMap.entrySet()) {
+                if (entry.getKey() instanceof KafkaColumnHandle && ((KafkaColumnHandle) entry.getKey()).getName().equals(KafkaInternalFieldDescription.OFFSET_TIMESTAMP_FIELD.getColumnName())) {
+                    Range span = entry.getValue().getValues().getRanges().getSpan();
+                    Marker low = span.getLow();
+                    Marker high = span.getHigh();
+                    if (!low.isLowerUnbounded()) {
+                        startTimestamp = (long) low.getValue();
+                    }
+                    if (!high.isUpperUnbounded()) {
+                        endTimestamp = (long) high.getValue();
+                    }
+                }
+            }
+        }
+
+        ConnectorTableLayout layout = new ConnectorTableLayout(new KafkaTableLayoutHandle(handle, startTimestamp, endTimestamp));
         return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
     }
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSet.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSet.java
index b4ba6ac3ac639..c36da7d967255 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSet.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSet.java
@@ -25,11 +25,10 @@
 import com.facebook.presto.spi.type.Type;
 import com.google.common.collect.ImmutableList;
 import io.airlift.slice.Slice;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -48,18 +47,18 @@ import static java.util.Objects.requireNonNull;
 
 /**
- * Kafka specific record set. Returns a cursor for a topic which iterates over a Kafka partition segment.
+ * Kafka specific record set. Returns a cursor for a topic which iterates over a Kafka partition, bounded by the split's start offset (inclusive) and end offset (exclusive).
  */
 public class KafkaRecordSet
         implements RecordSet
 {
     private static final Logger log = Logger.get(KafkaRecordSet.class);
-    private static final int KAFKA_READ_BUFFER_SIZE = 100_000;
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+    private static final int POLL_TIMEOUT = 500;
 
     private final KafkaSplit split;
-    private final KafkaSimpleConsumerManager consumerManager;
+    private final KafkaConsumerManager consumerManager;
 
     private final RowDecoder keyDecoder;
     private final RowDecoder messageDecoder;
@@ -68,7 +67,7 @@ public class KafkaRecordSet
     private final List<Type> columnTypes;
 
     KafkaRecordSet(KafkaSplit split,
-            KafkaSimpleConsumerManager consumerManager,
+            KafkaConsumerManager consumerManager,
             List<KafkaColumnHandle> columnHandles,
             RowDecoder keyDecoder,
             RowDecoder messageDecoder)
@@ -109,9 +108,9 @@ public class KafkaRecordCursor
         private long totalBytes;
         private long totalMessages;
         private long cursorOffset = split.getStart();
-        private Iterator<MessageAndOffset> messageAndOffsetIterator;
+        private Iterator<ConsumerRecord<ByteBuffer, ByteBuffer>> messageAndOffsetIterator;
         private final AtomicBoolean reported = new AtomicBoolean();
-
+        private KafkaConsumer<ByteBuffer, ByteBuffer> consumer;
         private final FieldValueProvider[] currentRowValues = new FieldValueProvider[columnHandles.size()];
 
         KafkaRecordCursor()
@@ -142,17 +141,17 @@ public boolean advanceNextPosition()
         {
             while (true) {
                 if (cursorOffset >= split.getEnd()) {
-                    return endOfData(); // Split end is exclusive.
+                    return endOfData();
                 }
 
                 // Create a fetch request
                 openFetchRequest();
 
                 while (messageAndOffsetIterator.hasNext()) {
-                    MessageAndOffset currentMessageAndOffset = messageAndOffsetIterator.next();
+                    ConsumerRecord<ByteBuffer, ByteBuffer> currentMessageAndOffset = messageAndOffsetIterator.next();
                     long messageOffset = currentMessageAndOffset.offset();
 
                     if (messageOffset >= split.getEnd()) {
-                        return endOfData(); // Past our split end. Bail.
+                        return endOfData();
                     }
 
                     if (messageOffset >= cursorOffset) {
@@ -173,26 +172,27 @@ private boolean endOfData()
             return false;
         }
 
-        private boolean nextRow(MessageAndOffset messageAndOffset)
+        private boolean nextRow(ConsumerRecord<ByteBuffer, ByteBuffer> messageAndOffset)
         {
             cursorOffset = messageAndOffset.offset() + 1; // Cursor now points to the next message.
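            // The split's end offset is exclusive; advanceNextPosition() returns endOfData() as soon as a polled record reaches it.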
- totalBytes += messageAndOffset.message().payloadSize(); + totalBytes += messageAndOffset.serializedValueSize(); totalMessages++; byte[] keyData = EMPTY_BYTE_ARRAY; byte[] messageData = EMPTY_BYTE_ARRAY; - ByteBuffer key = messageAndOffset.message().key(); + ByteBuffer key = messageAndOffset.key(); if (key != null) { keyData = new byte[key.remaining()]; key.get(keyData); } - ByteBuffer message = messageAndOffset.message().payload(); + ByteBuffer message = messageAndOffset.value(); if (message != null) { messageData = new byte[message.remaining()]; message.get(messageData); } + long timeStamp = messageAndOffset.timestamp(); Map currentRowValuesMap = new HashMap<>(); Optional> decodedKey = keyDecoder.decodeRow(keyData, null); @@ -202,9 +202,6 @@ private boolean nextRow(MessageAndOffset messageAndOffset) if (columnHandle.isInternal()) { KafkaInternalFieldDescription fieldDescription = KafkaInternalFieldDescription.forColumnName(columnHandle.getName()); switch (fieldDescription) { - case SEGMENT_COUNT_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(totalMessages)); - break; case PARTITION_OFFSET_FIELD: currentRowValuesMap.put(columnHandle, longValueProvider(messageAndOffset.offset())); break; @@ -229,11 +226,8 @@ private boolean nextRow(MessageAndOffset messageAndOffset) case PARTITION_ID_FIELD: currentRowValuesMap.put(columnHandle, longValueProvider(split.getPartitionId())); break; - case SEGMENT_START_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(split.getStart())); - break; - case SEGMENT_END_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(split.getEnd())); + case OFFSET_TIMESTAMP_FIELD: + currentRowValuesMap.put(columnHandle, longValueProvider(timeStamp)); break; default: throw new IllegalArgumentException("unknown internal field " + fieldDescription); @@ -305,30 +299,26 @@ private void checkFieldType(int field, Class expected) @Override public void close() { + if (consumer != null) { + consumer.close(); + } } private void openFetchRequest() { try { if (messageAndOffsetIterator == null) { - log.debug("Fetching %d bytes from offset %d (%d - %d). %d messages read so far", KAFKA_READ_BUFFER_SIZE, cursorOffset, split.getStart(), split.getEnd(), totalMessages); - FetchRequest req = new FetchRequestBuilder() - .clientId("presto-worker-" + Thread.currentThread().getName()) - .addFetch(split.getTopicName(), split.getPartitionId(), cursorOffset, KAFKA_READ_BUFFER_SIZE) - .build(); - - // TODO - this should look at the actual node this is running on and prefer - // that copy if running locally. 
- look into NodeInfo - SimpleConsumer consumer = consumerManager.getConsumer(split.getLeader()); - - FetchResponse fetchResponse = consumer.fetch(req); - if (fetchResponse.hasError()) { - short errorCode = fetchResponse.errorCode(split.getTopicName(), split.getPartitionId()); - log.warn("Fetch response has error: %d", errorCode); - throw new RuntimeException("could not fetch data from Kafka, error code is '" + errorCode + "'"); + String threadName = Thread.currentThread().getName(); + + if (consumer == null) { + consumer = consumerManager.createConsumer(threadName, split.getLeader()); } - messageAndOffsetIterator = fetchResponse.messageSet(split.getTopicName(), split.getPartitionId()).iterator(); + TopicPartition topicPartition = new TopicPartition(split.getTopicName(), split.getPartitionId()); + consumer.assign(ImmutableList.of(topicPartition)); + consumer.seek(topicPartition, cursorOffset); + ConsumerRecords records = consumer.poll(POLL_TIMEOUT); + messageAndOffsetIterator = records.records(topicPartition).iterator(); } } catch (Exception e) { // Catch all exceptions because Kafka library is written in scala and checked exceptions are not declared in method signature. diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSetProvider.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSetProvider.java index 2875269a21bf6..b7fcbb8cb04c9 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSetProvider.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaRecordSetProvider.java @@ -41,13 +41,15 @@ public class KafkaRecordSetProvider implements ConnectorRecordSetProvider { private DispatchingRowDecoderFactory decoderFactory; - private final KafkaSimpleConsumerManager consumerManager; + private final KafkaConsumerManager consumerManager; + private final KafkaConnectorConfig config; @Inject - public KafkaRecordSetProvider(DispatchingRowDecoderFactory decoderFactory, KafkaSimpleConsumerManager consumerManager) + public KafkaRecordSetProvider(DispatchingRowDecoderFactory decoderFactory, KafkaConsumerManager consumerManager, KafkaConnectorConfig config) { this.decoderFactory = requireNonNull(decoderFactory, "decoderFactory is null"); this.consumerManager = requireNonNull(consumerManager, "consumerManager is null"); + this.config = requireNonNull(config, "config is null"); } @Override diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java deleted file mode 100644 index 273e323434d01..0000000000000 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.kafka; - -import com.facebook.airlift.log.Logger; -import com.facebook.presto.spi.HostAddress; -import com.facebook.presto.spi.NodeManager; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import kafka.javaapi.consumer.SimpleConsumer; - -import javax.annotation.PreDestroy; -import javax.inject.Inject; - -import java.util.Map; - -import static java.lang.Math.toIntExact; -import static java.lang.String.format; -import static java.util.Objects.requireNonNull; - -/** - * Manages connections to the Kafka nodes. A worker may connect to multiple Kafka nodes depending on the segments and partitions - * it needs to process. According to the Kafka source code, a Kafka {@link kafka.javaapi.consumer.SimpleConsumer} is thread-safe. - */ -public class KafkaSimpleConsumerManager -{ - private static final Logger log = Logger.get(KafkaSimpleConsumerManager.class); - - private final LoadingCache consumerCache; - - private final String connectorId; - private final NodeManager nodeManager; - private final int connectTimeoutMillis; - private final int bufferSizeBytes; - - @Inject - public KafkaSimpleConsumerManager( - KafkaConnectorId connectorId, - KafkaConnectorConfig kafkaConnectorConfig, - NodeManager nodeManager) - { - this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); - this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); - - requireNonNull(kafkaConnectorConfig, "kafkaConfig is null"); - this.connectTimeoutMillis = toIntExact(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis()); - this.bufferSizeBytes = toIntExact(kafkaConnectorConfig.getKafkaBufferSize().toBytes()); - - this.consumerCache = CacheBuilder.newBuilder().build(CacheLoader.from(this::createConsumer)); - } - - @PreDestroy - public void tearDown() - { - for (Map.Entry entry : consumerCache.asMap().entrySet()) { - try { - entry.getValue().close(); - } - catch (Exception e) { - log.warn(e, "While closing consumer %s:", entry.getKey()); - } - } - } - - public SimpleConsumer getConsumer(HostAddress host) - { - requireNonNull(host, "host is null"); - return consumerCache.getUnchecked(host); - } - - private SimpleConsumer createConsumer(HostAddress host) - { - log.info("Creating new Consumer for %s", host); - return new SimpleConsumer(host.getHostText(), - host.getPort(), - connectTimeoutMillis, - bufferSizeBytes, - format("presto-kafka-%s-%s", connectorId, nodeManager.getCurrentNode().getNodeIdentifier())); - } -} diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplit.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplit.java index a0b1a6a2684fa..c0813b78b14c0 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplit.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplit.java @@ -26,11 +26,8 @@ import static java.util.Objects.requireNonNull; /** - * Represents a kafka specific {@link ConnectorSplit}. Each split is mapped to a segment file on disk (based off the segment offset start() and end() values) so that - * a partition can be processed by reading segment files from partition leader. Otherwise, a Kafka topic could only be processed along partition boundaries. - *

- * When planning to process a Kafka topic with Presto, using smaller than the recommended segment size (default is 1G) allows Presto to optimize early and process a topic
- * with more workers in parallel.
+ * Represents a Kafka specific {@link ConnectorSplit}. Each split maps to a consecutive range of messages in a partition (based on the split's start and end message offsets), so that
+ * a partition can be processed in parallel by reading those messages from the partition leader. Otherwise, a Kafka topic could only be processed along partition boundaries.
  */
 public class KafkaSplit
         implements ConnectorSplit
diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
index d599ea9c7462c..9bf801205a9d5 100644
--- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
+++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
@@ -13,7 +13,6 @@
  */
 package com.facebook.presto.kafka;
 
-import com.facebook.airlift.log.Logger;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.ConnectorSplitSource;
@@ -25,18 +24,12 @@
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.io.CharStreams;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.BrokerEndPoint;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 
 import javax.inject.Inject;
 
@@ -47,10 +40,11 @@
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Map;
 
+import static com.facebook.presto.kafka.KafkaErrorCode.KAFKA_CONSUMER_ERROR;
 import static com.facebook.presto.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
 import static com.facebook.presto.kafka.KafkaHandleResolver.convertLayout;
 import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
@@ -65,23 +59,22 @@
 public class KafkaSplitManager
         implements ConnectorSplitManager
 {
-    private static final Logger log = Logger.get(KafkaSplitManager.class);
-
     private final String connectorId;
-    private final KafkaSimpleConsumerManager consumerManager;
-    private final Set<HostAddress> nodes;
+    private final KafkaConsumerManager consumerManager;
+    private final KafkaStaticServerset servers;
 
     @Inject
     public KafkaSplitManager(
             KafkaConnectorId connectorId,
             KafkaConnectorConfig kafkaConnectorConfig,
-            KafkaSimpleConsumerManager consumerManager)
+            KafkaStaticServerset servers,
+            KafkaConsumerManager consumerManager)
     {
         this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
         this.consumerManager = requireNonNull(consumerManager, "consumerManager is null");
 
         requireNonNull(kafkaConnectorConfig, "kafkaConnectorConfig is null");
"kafkaConfig is null"); - this.nodes = ImmutableSet.copyOf(kafkaConnectorConfig.getNodes()); + this.servers = servers; } @Override @@ -93,44 +86,50 @@ public ConnectorSplitSource getSplits( { KafkaTableHandle kafkaTableHandle = convertLayout(layout).getTable(); try { - SimpleConsumer simpleConsumer = consumerManager.getConsumer(selectRandom(nodes)); - - TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(ImmutableList.of(kafkaTableHandle.getTopicName())); - TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); + HostAddress node = servers.selectRandomServer(); + String topic = kafkaTableHandle.getTopicName(); + KafkaTableLayoutHandle layoutHandle = (KafkaTableLayoutHandle) layout; + KafkaConsumer consumer = consumerManager.createConsumer(Thread.currentThread().getName(), node); + List partitions = consumer.partitionsFor(topic); ImmutableList.Builder splits = ImmutableList.builder(); - for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { - for (PartitionMetadata part : metadata.partitionsMetadata()) { - log.debug("Adding Partition %s/%s", metadata.topic(), part.partitionId()); - - BrokerEndPoint leader = part.leader(); - if (leader == null) { - throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Leader election in progress for Kafka topic '%s' partition %s", metadata.topic(), part.partitionId())); - } - - HostAddress partitionLeader = HostAddress.fromParts(leader.host(), leader.port()); - - SimpleConsumer leaderConsumer = consumerManager.getConsumer(partitionLeader); - // Kafka contains a reverse list of "end - start" pairs for the splits - - long[] offsets = findAllOffsets(leaderConsumer, metadata.topic(), part.partitionId()); - - for (int i = offsets.length - 1; i > 0; i--) { - KafkaSplit split = new KafkaSplit( - connectorId, - metadata.topic(), - kafkaTableHandle.getKeyDataFormat(), - kafkaTableHandle.getMessageDataFormat(), - kafkaTableHandle.getKeyDataSchemaLocation().map(KafkaSplitManager::readSchema), - kafkaTableHandle.getMessageDataSchemaLocation().map(KafkaSplitManager::readSchema), - part.partitionId(), - offsets[i], - offsets[i - 1], - partitionLeader); - splits.add(split); - } + for (PartitionInfo partition : partitions) { + Node leader = partition.leader(); + if (leader == null) { + throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Leader election in progress for Kafka topic '%s' partition %s", topic, partition.partition())); + } + + HostAddress partitionLeader = HostAddress.fromParts(leader.host(), leader.port()); + long startTimestamp = layoutHandle.getStartOffsetTimestamp(); + long endTimestamp = layoutHandle.getEndOffsetTimestamp(); + + if (startTimestamp > endTimestamp) { + throw new IllegalArgumentException(String.format("Invalid Kafka Offset start/end pair: %s - %s", startTimestamp, endTimestamp)); } + + TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition()); + consumer.assign(ImmutableList.of(topicPartition)); + + long beginningOffset = (startTimestamp == 0) ? + consumer.beginningOffsets(ImmutableList.of(topicPartition)).values().iterator().next() : + findOffsetsByTimestamp(consumer, topicPartition, startTimestamp); + long endOffset = (endTimestamp == 0) ? 
+ consumer.endOffsets(ImmutableList.of(topicPartition)).values().iterator().next() : + findOffsetsByTimestamp(consumer, topicPartition, endTimestamp); + + KafkaSplit split = new KafkaSplit( + connectorId, + topic, + kafkaTableHandle.getKeyDataFormat(), + kafkaTableHandle.getMessageDataFormat(), + kafkaTableHandle.getKeyDataSchemaLocation().map(KafkaSplitManager::readSchema), + kafkaTableHandle.getMessageDataSchemaLocation().map(KafkaSplitManager::readSchema), + partition.partition(), + beginningOffset, + endOffset, + partitionLeader); + splits.add(split); } return new FixedSplitSource(splits.build()); @@ -143,6 +142,21 @@ public ConnectorSplitSource getSplits( } } + private static long findOffsetsByTimestamp(KafkaConsumer consumer, TopicPartition topicPartition, long timestamp) + { + try { + Map topicPartitionOffsets = consumer.offsetsForTimes(ImmutableMap.of(topicPartition, timestamp)); + if (topicPartitionOffsets == null || topicPartitionOffsets.values().size() == 0) { + return 0; + } + OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsets.values().iterator().next(); + return offsetAndTimestamp.offset(); + } + catch (IllegalArgumentException e) { + throw new PrestoException(KAFKA_CONSUMER_ERROR, String.format("Failed to find offset by timestamp: %d for partition %d", timestamp, topicPartition.partition()), e); + } + } + private static String readSchema(String dataSchemaLocation) { InputStream inputStream = null; @@ -190,31 +204,4 @@ private static boolean isURI(String location) } return true; } - - private static long[] findAllOffsets(SimpleConsumer consumer, String topicName, int partitionId) - { - TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId); - - // The API implies that this will always return all of the offsets. So it seems a partition can not have - // more than Integer.MAX_VALUE-1 segments. - // - // This also assumes that the lowest value returned will be the first segment available. So if segments have been dropped off, this value - // should not be 0. - PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), Integer.MAX_VALUE); - OffsetRequest offsetRequest = new OffsetRequest(ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId()); - OffsetResponse offsetResponse = consumer.getOffsetsBefore(offsetRequest); - - if (offsetResponse.hasError()) { - short errorCode = offsetResponse.errorCode(topicName, partitionId); - throw new RuntimeException("could not fetch data from Kafka, error code is '" + errorCode + "'"); - } - - return offsetResponse.offsets(topicName, partitionId); - } - - private static T selectRandom(Iterable iterable) - { - List list = ImmutableList.copyOf(iterable); - return list.get(ThreadLocalRandom.current().nextInt(list.size())); - } } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java new file mode 100644 index 0000000000000..d419345385fe3 --- /dev/null +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaStaticServerset.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.kafka; + +import com.facebook.presto.spi.HostAddress; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class KafkaStaticServerset +{ + private final List nodes; + + @Inject + public KafkaStaticServerset(KafkaConnectorConfig config) + { + requireNonNull(config.getNodes(), "nodes is null"); + checkArgument(!config.getNodes().isEmpty(), "nodes must specify at least one URI"); + this.nodes = config.getNodes(); + } + + public HostAddress selectRandomServer() + { + return selectRandom(this.nodes); + } + + private static T selectRandom(Iterable iterable) + { + List list = ImmutableList.copyOf(iterable); + return list.get(ThreadLocalRandom.current().nextInt(list.size())); + } +} diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java index de0c37816b234..0581a812bf6d4 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java @@ -17,17 +17,25 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; public class KafkaTableLayoutHandle implements ConnectorTableLayoutHandle { private final KafkaTableHandle table; + private final long startOffsetTimestamp; + private final long endOffsetTimestamp; @JsonCreator - public KafkaTableLayoutHandle(@JsonProperty("table") KafkaTableHandle table) + public KafkaTableLayoutHandle( + @JsonProperty("table") KafkaTableHandle table, + @JsonProperty("startOffsetTimestamp") long startOffsetTimestamp, + @JsonProperty("endOffsetTimestamp") long endOffsetTimestamp) { this.table = requireNonNull(table, "table is null"); + this.startOffsetTimestamp = startOffsetTimestamp; + this.endOffsetTimestamp = endOffsetTimestamp; } @JsonProperty @@ -36,9 +44,24 @@ public KafkaTableHandle getTable() return table; } + @JsonProperty + public long getStartOffsetTimestamp() + { + return startOffsetTimestamp; + } + + @JsonProperty + public long getEndOffsetTimestamp() + { + return endOffsetTimestamp; + } + @Override public String toString() { - return table.toString(); + return toStringHelper(this) + .add("table", table.toString()) + .add("startOffsetTimestamp", startOffsetTimestamp) + .add("endOffsetTimestamp", endOffsetTimestamp).toString(); } } diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java index e161e8fb52552..c7d3e29a2a775 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java +++ 
b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java @@ -26,13 +26,14 @@ public class TestKafkaConnectorConfig public void testDefaults() { ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(KafkaConnectorConfig.class) - .setNodes("") + .setNodes(null) .setKafkaConnectTimeout("10s") - .setKafkaBufferSize("64kB") .setDefaultSchema("default") .setTableNames("") .setTableDescriptionDir(new File("etc/kafka/")) - .setHideInternalColumns(true)); + .setHideInternalColumns(true) + .setMaxPartitionFetchBytes(1048576) + .setMaxPollRecords(500)); } @Test @@ -44,8 +45,9 @@ public void testExplicitPropertyMappings() .put("kafka.default-schema", "kafka") .put("kafka.nodes", "localhost:12345,localhost:23456") .put("kafka.connect-timeout", "1h") - .put("kafka.buffer-size", "1MB") .put("kafka.hide-internal-columns", "false") + .put("kafka.max-partition-fetch-bytes", "1024") + .put("kafka.max-poll-records", "1000") .build(); KafkaConnectorConfig expected = new KafkaConnectorConfig() @@ -54,8 +56,9 @@ public void testExplicitPropertyMappings() .setDefaultSchema("kafka") .setNodes("localhost:12345, localhost:23456") .setKafkaConnectTimeout("1h") - .setKafkaBufferSize("1MB") - .setHideInternalColumns(false); + .setHideInternalColumns(false) + .setMaxPartitionFetchBytes(1024) + .setMaxPollRecords(1000); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestManySegments.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestManySegments.java deleted file mode 100644 index e76f0b9ac3ca6..0000000000000 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestManySegments.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.kafka; - -import com.facebook.presto.Session; -import com.facebook.presto.kafka.util.EmbeddedKafka; -import com.facebook.presto.kafka.util.TestUtils; -import com.facebook.presto.spi.SchemaTableName; -import com.facebook.presto.spi.type.BigintType; -import com.facebook.presto.testing.MaterializedResult; -import com.facebook.presto.tests.StandaloneQueryRunner; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import kafka.producer.KeyedMessage; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.Properties; -import java.util.UUID; - -import static com.facebook.presto.kafka.util.EmbeddedKafka.CloseableProducer; -import static com.facebook.presto.kafka.util.TestUtils.createEmptyTopicDescription; -import static com.facebook.presto.testing.TestingSession.testSessionBuilder; -import static com.facebook.presto.testing.assertions.Assert.assertEquals; - -@Test(singleThreaded = true) -public class TestManySegments -{ - private static final Session SESSION = testSessionBuilder() - .setCatalog("kafka") - .setSchema("default") - .build(); - - private EmbeddedKafka embeddedKafka; - private String topicName; - private StandaloneQueryRunner queryRunner; - - @BeforeClass - public void startKafka() - throws Exception - { - embeddedKafka = EmbeddedKafka.createEmbeddedKafka(); - embeddedKafka.start(); - - topicName = "test_" + UUID.randomUUID().toString().replaceAll("-", "_"); - - Properties topicProperties = new Properties(); - topicProperties.setProperty("segment.bytes", "1048576"); - - embeddedKafka.createTopics(1, 1, topicProperties, topicName); - - try (CloseableProducer producer = embeddedKafka.createProducer()) { - int jMax = 10_000; - int iMax = 100_000 / jMax; - for (long i = 0; i < iMax; i++) { - ImmutableList.Builder> builder = ImmutableList.builder(); - for (long j = 0; j < jMax; j++) { - builder.add(new KeyedMessage(topicName, i, ImmutableMap.of("id", Long.toString(i * iMax + j), "value", UUID.randomUUID().toString()))); - } - producer.send(builder.build()); - } - } - } - - @AfterClass(alwaysRun = true) - public void stopKafka() - throws Exception - { - embeddedKafka.close(); - embeddedKafka = null; - } - - @BeforeMethod - public void spinUp() - throws Exception - { - this.queryRunner = new StandaloneQueryRunner(SESSION); - - TestUtils.installKafkaPlugin(embeddedKafka, queryRunner, - ImmutableMap.builder() - .put(createEmptyTopicDescription(topicName, new SchemaTableName("default", topicName))) - .build()); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() - { - queryRunner.close(); - queryRunner = null; - } - - @Test - public void testManySegments() - { - MaterializedResult result = queryRunner.execute("SELECT count(_message) from " + topicName); - - MaterializedResult expected = MaterializedResult.resultBuilder(SESSION, BigintType.BIGINT) - .row(100000L) - .build(); - - assertEquals(result, expected); - } -} diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestMinimalFunctionality.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestMinimalFunctionality.java index 12453885d7c1e..bce51198cc024 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/TestMinimalFunctionality.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/TestMinimalFunctionality.java @@ -24,7 +24,8 @@ 
import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.tests.StandaloneQueryRunner; import com.google.common.collect.ImmutableMap; -import kafka.producer.KeyedMessage; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -35,7 +36,6 @@ import java.util.Properties; import java.util.UUID; -import static com.facebook.presto.kafka.util.EmbeddedKafka.CloseableProducer; import static com.facebook.presto.kafka.util.TestUtils.createEmptyTopicDescription; import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static com.facebook.presto.testing.assertions.Assert.assertEquals; @@ -96,10 +96,13 @@ public void tearDown() private void createMessages(String topicName, int count) { - try (CloseableProducer producer = embeddedKafka.createProducer()) { - for (long i = 0; i < count; i++) { - Object message = ImmutableMap.of("id", Long.toString(i), "value", UUID.randomUUID().toString()); - producer.send(new KeyedMessage<>(topicName, i, message)); + try (KafkaProducer producer = embeddedKafka.createProducer()) { + int jMax = 10; + int iMax = count / jMax; + for (long i = 0; i < iMax; i++) { + for (long j = 0; j < jMax; j++) { + producer.send(new ProducerRecord<>(topicName, i, ImmutableMap.of("id", Long.toString(i * iMax + j), "value", UUID.randomUUID().toString()))); + } } } } diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java index 8c167f045f55d..8b0adfe9662f4 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/EmbeddedKafka.java @@ -18,11 +18,11 @@ import com.google.common.io.Files; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; -import kafka.javaapi.producer.Producer; -import kafka.producer.ProducerConfig; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import kafka.utils.ZkUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.LongSerializer; import java.io.Closeable; import java.io.File; @@ -37,6 +37,10 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; public class EmbeddedKafka implements Closeable @@ -77,12 +81,12 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties) .put("log.flush.interval.messages", "10000") .put("log.flush.interval.ms", "1000") .put("log.retention.minutes", "60") - .put("log.segment.bytes", "1048576") .put("auto.create.topics.enable", "false") .put("zookeeper.connection.timeout.ms", "1000000") .put("port", Integer.toString(port)) .put("log.dirs", kafkaDataDir.getAbsolutePath()) .put("zookeeper.connect", zookeeper.getConnectString()) + .put("offsets.topic.replication.factor", "1") 
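                // Kafka's internal __consumer_offsets topic defaults to replication factor 3; a single embedded broker needs 1.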
.putAll(Maps.fromProperties(overrideProperties)) .build(); @@ -131,28 +135,15 @@ public void createTopics(int partitions, int replication, Properties topicProper } } - public CloseableProducer createProducer() + public KafkaProducer createProducer() { - Map properties = ImmutableMap.builder() - .put("metadata.broker.list", getConnectString()) - .put("serializer.class", JsonEncoder.class.getName()) - .put("key.serializer.class", NumberEncoder.class.getName()) - .put("partitioner.class", NumberPartitioner.class.getName()) - .put("request.required.acks", "1") - .build(); - - ProducerConfig producerConfig = new ProducerConfig(toProperties(properties)); - return new CloseableProducer<>(producerConfig); - } + Properties properties = new Properties(); + properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, getConnectString()); + properties.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, JsonEncoder.class.getName()); + properties.setProperty(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + properties.setProperty(ACKS_CONFIG, "1"); - public static class CloseableProducer - extends Producer - implements AutoCloseable - { - public CloseableProducer(ProducerConfig config) - { - super(config); - } + return new KafkaProducer<>(properties); } public int getZookeeperPort() diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/JsonEncoder.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/JsonEncoder.java index af15f28fc4959..cc806e32e54df 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/JsonEncoder.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/JsonEncoder.java @@ -15,24 +15,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; +import org.apache.kafka.common.serialization.Serializer; import java.io.UncheckedIOException; public class JsonEncoder - implements Encoder + implements Serializer { private final ObjectMapper objectMapper = new ObjectMapper(); @SuppressWarnings("UnusedParameters") - public JsonEncoder(VerifiableProperties properties) + public JsonEncoder() { // constructor required by Kafka } @Override - public byte[] toBytes(Object o) + public byte[] serialize(String topic, Object o) { try { return objectMapper.writeValueAsBytes(o); diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/KafkaLoader.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/KafkaLoader.java index 16f2db7de74e4..96701651801d4 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/KafkaLoader.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/KafkaLoader.java @@ -25,8 +25,8 @@ import com.facebook.presto.tests.AbstractTestingPrestoClient; import com.facebook.presto.tests.ResultsSession; import com.google.common.collect.ImmutableMap; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -58,10 +58,10 @@ public class KafkaLoader private static final DateTimeFormatter ISO8601_FORMATTER = ISODateTimeFormat.dateTime(); private final String topicName; - private final Producer producer; + private final KafkaProducer producer; private final AtomicLong count = new AtomicLong(); - public KafkaLoader(Producer producer, + 
public KafkaLoader(KafkaProducer producer, String topicName, TestingPrestoServer prestoServer, Session defaultSession) @@ -114,7 +114,7 @@ public void addResults(QueryStatusInfo statusInfo, QueryData data) } } - producer.send(new KeyedMessage<>(topicName, count.getAndIncrement(), builder.build())); + producer.send(new ProducerRecord<>(topicName, count.getAndIncrement(), builder.build())); } } } diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberEncoder.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberEncoder.java deleted file mode 100644 index e0708a5c15d1a..0000000000000 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberEncoder.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.kafka.util; - -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -import java.nio.ByteBuffer; - -public class NumberEncoder - implements Encoder -{ - @SuppressWarnings("UnusedParameters") - public NumberEncoder(VerifiableProperties properties) - { - // constructor required by Kafka - } - - @Override - public byte[] toBytes(Number value) - { - ByteBuffer buf = ByteBuffer.allocate(8); - buf.putLong(value == null ? 0L : value.longValue()); - return buf.array(); - } -} diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberPartitioner.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberPartitioner.java deleted file mode 100644 index d8b5aa3f6eba9..0000000000000 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/NumberPartitioner.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.facebook.presto.kafka.util; - -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -import static java.lang.Math.toIntExact; - -public class NumberPartitioner - implements Partitioner -{ - @SuppressWarnings("UnusedParameters") - public NumberPartitioner(VerifiableProperties properties) - { - // constructor required by Kafka - } - - @Override - public int partition(Object key, int numPartitions) - { - if (key instanceof Number) { - return toIntExact(((Number) key).longValue() % numPartitions); - } - return 0; - } -} diff --git a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java index 755a54f35bab4..0147a718ff8c0 100644 --- a/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java +++ b/presto-kafka/src/test/java/com/facebook/presto/kafka/util/TestUtils.java @@ -23,6 +23,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; +import org.apache.kafka.clients.producer.KafkaProducer; import java.io.IOException; import java.net.ServerSocket; @@ -31,7 +32,6 @@ import java.util.Optional; import java.util.Properties; -import static com.facebook.presto.kafka.util.EmbeddedKafka.CloseableProducer; import static java.lang.String.format; public final class TestUtils @@ -71,7 +71,7 @@ public static void installKafkaPlugin(EmbeddedKafka embeddedKafka, QueryRunner q public static void loadTpchTopic(EmbeddedKafka embeddedKafka, TestingPrestoClient prestoClient, String topicName, QualifiedObjectName tpchTableName) { - try (CloseableProducer producer = embeddedKafka.createProducer(); + try (KafkaProducer producer = embeddedKafka.createProducer(); KafkaLoader tpchLoader = new KafkaLoader(producer, topicName, prestoClient.getServer(), prestoClient.getDefaultSession())) { tpchLoader.execute(format("SELECT * from %s", tpchTableName)); } diff --git a/presto-product-tests/pom.xml b/presto-product-tests/pom.xml index 2197f5b0ac701..fc0c6aaefae0d 100644 --- a/presto-product-tests/pom.xml +++ b/presto-product-tests/pom.xml @@ -15,6 +15,7 @@ ${project.parent.basedir} com.facebook.presto.tests.TemptoProductTestRunner true + 2.12.2 @@ -136,6 +137,17 @@ com.facebook.presto.cassandra cassandra-driver + + org.scala-lang + scala-library + ${scala.version} + runtime + + + org.apache.kafka + kafka-clients + runtime +
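For reviewers who have not used the post-SimpleConsumer client API, the sketch below shows the bounded assign/seek/poll read pattern that the new KafkaRecordSet and KafkaSplitManager build on. It is a standalone illustration, not part of the patch: the broker address, topic name, and offset range are hypothetical, and poll(long) is the deprecated-but-still-available overload this patch uses.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class BoundedReadSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("enable.auto.commit", "false"); // split boundaries, not committed offsets, drive progress
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition partition = new TopicPartition("test_topic", 0); // hypothetical topic
        long startOffset = 0;   // KafkaSplit.getStart() in the connector
        long endOffset = 1_000; // KafkaSplit.getEnd(); exclusive

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(partition)); // no consumer-group coordination
            consumer.seek(partition, startOffset);

            long position = startOffset;
            while (position < endOffset) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
                if (records.isEmpty()) {
                    break; // nothing more available; a real reader would retry or give up
                }
                for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
                    if (record.offset() >= endOffset) {
                        return; // past the split boundary, exactly like KafkaRecordCursor
                    }
                    position = record.offset() + 1;
                    System.out.printf("offset=%d timestamp=%d%n", record.offset(), record.timestamp());
                }
            }
        }
    }
}
```

Compared with the removed SimpleConsumer path, there is no fetch-request assembly, no broker error-code handling, and no findAllOffsets() walk over segment boundaries; offsetsForTimes() (wrapped by findOffsetsByTimestamp above) additionally lets the planner turn a _timestamp predicate into a narrower offset range.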