63 changes: 62 additions & 1 deletion pom.xml
@@ -63,6 +63,7 @@
<dep.asm.version>6.2.1</dep.asm.version>
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>2.1.1</dep.alluxio.version>
<dep.kafka.version>2.3.1</dep.kafka.version>

<!--
America/Bahia_Banderas has:
@@ -1127,7 +1128,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.1</version>
<version>${dep.kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -1137,6 +1138,66 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</exclusion>
<exclusion>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${dep.kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
</exclusions>
</dependency>

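Alongside the version bump, kafka-clients becomes a direct dependency, which suggests the connector now reads messages through the modern consumer API rather than the deprecated SimpleConsumer. A minimal sketch of such a consumer, assuming raw byte deserialization; the factory class and method names below are illustrative, not code from this PR:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Properties;

public final class KafkaConsumerSketch
{
    private KafkaConsumerSketch() {}

    // Hypothetical factory: the connector decodes message bytes itself,
    // so both key and value use ByteArrayDeserializer.
    public static KafkaConsumer<byte[], byte[]> newConsumer(String bootstrapServers)
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}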
9 changes: 0 additions & 9 deletions presto-docs/src/main/sphinx/connector/kafka-tutorial.rst
@@ -154,9 +154,6 @@ built-in ones:
-------------------+---------+-------+---------------------------------------------
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_segment_start | bigint | | Segment start offset
_segment_end | bigint | | Segment end offset
_segment_count | bigint | | Running message count per segment
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
@@ -231,9 +228,6 @@ The customer table now has an additional column: ``kafka_key``.
kafka_key | bigint | |
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_segment_start | bigint | | Segment start offset
_segment_end | bigint | | Segment end offset
_segment_count | bigint | | Running message count per segment
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
@@ -357,9 +351,6 @@ the sum query from earlier can operate on the ``account_balance`` column directly
comment | varchar | |
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_segment_start | bigint | | Segment start offset
_segment_end | bigint | | Segment end offset
_segment_count | bigint | | Running message count per segment
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
49 changes: 26 additions & 23 deletions presto-docs/src/main/sphinx/connector/kafka.rst
@@ -14,12 +14,12 @@ This connector allows the use of Apache Kafka topics as tables in Presto.
Each message is presented as a row in Presto.

Topics can be live: rows will appear as data arrives and disappear as
segments get dropped. This can result in strange behavior if accessing the
messages get dropped. This can result in strange behavior if accessing the
same table multiple times in a single query (e.g., performing a self join).

.. note::

Apache Kafka 0.8+ is supported although it is highly recommend to use 0.8.1 or later.
Apache Kafka 2.3.1+ is supported.

Configuration
-------------
@@ -48,17 +48,18 @@ Configuration Properties

The following configuration properties are available:

=============================== ==============================================================
Property Name Description
=============================== ==============================================================
``kafka.table-names`` List of all tables provided by the catalog
``kafka.default-schema`` Default schema name for tables
``kafka.nodes`` List of nodes in the Kafka cluster
``kafka.connect-timeout`` Timeout for connecting to the Kafka cluster
``kafka.buffer-size`` Kafka read buffer size
``kafka.table-description-dir`` Directory containing topic description files
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not
=============================== ==============================================================
=================================== ==============================================================
Property Name Description
=================================== ==============================================================
``kafka.table-names`` List of all tables provided by the catalog
``kafka.default-schema`` Default schema name for tables
``kafka.nodes`` List of nodes in the Kafka cluster
``kafka.connect-timeout`` Timeout for connecting to the Kafka cluster
``kafka.max-poll-records`` Maximum number of records per poll
``kafka.max-partition-fetch-bytes`` Maximum number of bytes from one partition per poll
``kafka.table-description-dir`` Directory containing topic description files
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not
=================================== ==============================================================

``kafka.table-names``
^^^^^^^^^^^^^^^^^^^^^
@@ -92,7 +93,7 @@ This property is required; there is no default and at least one node must be defined.
.. note::

Presto must still be able to connect to all nodes of the cluster
even if only a subset is specified here as segment files may be
even if only a subset is specified here as messages may be
located only on a specific node.

``kafka.connect-timeout``
@@ -104,14 +105,19 @@ timeouts, increasing this value is a good strategy.

This property is optional; the default is 10 seconds (``10s``).

``kafka.buffer-size``
^^^^^^^^^^^^^^^^^^^^^
``kafka.max-poll-records``
^^^^^^^^^^^^^^^^^^^^^^^^^^

The maximum number of records per poll() from Kafka.

This property is optional; the default is ``500``.

``kafka.max-partition-fetch-bytes``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Size of the internal data buffer for reading data from Kafka. The data
buffer must be able to hold at least one message and ideally can hold many
messages. There is one data buffer allocated per worker and data node.
The maximum number of bytes to read from one partition per poll().

This property is optional; the default is ``64kb``.
This property is optional; the default is ``1MB``.
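Taken together, the two new properties replace the old ``kafka.buffer-size`` tuning with the consumer's own poll-based knobs. A minimal sketch of how they map onto ``ConsumerConfig`` keys, assuming the connector forwards them roughly like this (the helper class is illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public final class PollTuningSketch
{
    private PollTuningSketch() {}

    // Illustrative mapping of the catalog properties to Kafka consumer settings;
    // the defaults documented above are 500 records and 1MB per partition.
    public static Properties pollTuning(int maxPollRecords, int maxPartitionFetchBytes)
    {
        Properties props = new Properties();
        // kafka.max-poll-records -> max.poll.records
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(maxPollRecords));
        // kafka.max-partition-fetch-bytes -> max.partition.fetch.bytes
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(maxPartitionFetchBytes));
        return props;
    }
}

These defaults match the stock Kafka consumer's own (500 records, 1MB), so an unconfigured catalog behaves like an out-of-the-box client; raising ``max.partition.fetch.bytes`` serves the purpose the old read buffer did, ensuring a single poll can hold at least one full message.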

``kafka.table-description-dir``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -141,9 +147,6 @@ Column name Type Description
======================= ========= =============================
``_partition_id`` BIGINT ID of the Kafka partition which contains this row.
``_partition_offset`` BIGINT Offset within the Kafka partition for this row.
``_segment_start`` BIGINT Lowest offset in the segment (inclusive) which contains this row. This offset is partition specific.
``_segment_end`` BIGINT Highest offset in the segment (exclusive) which contains this row. The offset is partition specific. This is the same value as ``_segment_start`` of the next segment (if it exists).
``_segment_count`` BIGINT Running count for the current row within the segment. For an uncompacted topic, ``_segment_start + _segment_count`` is equal to ``_partition_offset``.
``_message_corrupt`` BOOLEAN True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid.
``_message`` VARCHAR Message bytes as an UTF-8 encoded string. This is only useful for a text topic.
``_message_length`` BIGINT Number of bytes in the message.
24 changes: 19 additions & 5 deletions presto-kafka/pom.xml
@@ -14,6 +14,7 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<scala.version>2.12.2</scala.version>
</properties>

<dependencies>
@@ -62,6 +63,11 @@
<artifactId>kafka_2.12</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
@@ -73,11 +79,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@@ -118,6 +119,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>com.facebook.airlift</groupId>
@@ -196,6 +203,13 @@
<artifactId>annotations</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
@@ -17,15 +17,13 @@
import com.facebook.presto.spi.HostAddress;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

import java.io.File;
import java.util.List;
import java.util.Set;

import static com.google.common.collect.Iterables.transform;
@@ -37,18 +35,13 @@ public class KafkaConnectorConfig
/**
* Seed nodes for Kafka cluster. At least one must exist.
*/
private Set<HostAddress> nodes = ImmutableSet.of();
private List<HostAddress> nodes;

/**
* Timeout to connect to Kafka.
*/
private Duration kafkaConnectTimeout = Duration.valueOf("10s");

/**
* Buffer size for connecting to Kafka.
*/
private DataSize kafkaBufferSize = new DataSize(64, Unit.KILOBYTE);

/**
* The schema name to use in the connector.
*/
@@ -69,6 +62,16 @@ public class KafkaConnectorConfig
*/
private boolean hideInternalColumns = true;

/**
* Maximum number of records per poll()
*/
private int maxPollRecords = 500;

/**
* Maximum number of bytes from one partition per poll()
*/
private int maxPartitionFetchBytes = 1024 * 1024;

@NotNull
public File getTableDescriptionDir()
{
@@ -108,16 +111,15 @@ public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
return this;
}

@Size(min = 1)
public Set<HostAddress> getNodes()
public List<HostAddress> getNodes()
{
return nodes;
}

@Config("kafka.nodes")
public KafkaConnectorConfig setNodes(String nodes)
{
this.nodes = (nodes == null) ? null : parseNodes(nodes);
this.nodes = (nodes == null) ? null : parseNodes(nodes).asList();
return this;
}

@@ -134,15 +136,27 @@ public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout)
return this;
}

public DataSize getKafkaBufferSize()
public int getMaxPollRecords()
{
return maxPollRecords;
}

@Config("kafka.max-poll-records")
public KafkaConnectorConfig setMaxPollRecords(int maxPollRecords)
{
this.maxPollRecords = maxPollRecords;
return this;
}

public int getMaxPartitionFetchBytes()
{
return kafkaBufferSize;
return maxPartitionFetchBytes;
}

@Config("kafka.buffer-size")
public KafkaConnectorConfig setKafkaBufferSize(String kafkaBufferSize)
@Config("kafka.max-partition-fetch-bytes")
public KafkaConnectorConfig setMaxPartitionFetchBytes(int maxPartitionFetchBytes)
{
this.kafkaBufferSize = DataSize.valueOf(kafkaBufferSize);
this.maxPartitionFetchBytes = maxPartitionFetchBytes;
return this;
}
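The new setters follow the fluent Airlift ``@Config`` pattern, returning ``this``. As a quick illustration of how the defaults shown above could be pinned, here is a hypothetical TestNG test; it is not part of this PR and assumes it lives in the same package as KafkaConnectorConfig:

import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class KafkaConnectorConfigSketchTest
{
    // Exercises the defaults and fluent setters visible in the diff above.
    @Test
    public void testPollTuningDefaults()
    {
        KafkaConnectorConfig config = new KafkaConnectorConfig();
        assertEquals(config.getMaxPollRecords(), 500);                  // kafka.max-poll-records default
        assertEquals(config.getMaxPartitionFetchBytes(), 1024 * 1024);  // 1MB default
        // @Config setters return `this`, so overrides chain fluently
        assertEquals(config.setMaxPollRecords(1000).getMaxPollRecords(), 1000);
    }
}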

Expand Down
Loading