docs/src/main/sphinx/connector/kafka.rst (7 additions & 0 deletions)
@@ -86,6 +86,7 @@ Property name Description
``kafka.nodes`` List of nodes in the Kafka cluster.
``kafka.buffer-size`` Kafka read buffer size.
``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not.
``kafka.internal-column-prefix`` Prefix for internal columns; defaults to ``_``.
``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``.
``kafka.timestamp-upper-bound-force-push-down-enabled`` Controls if upper bound timestamp pushdown is enabled for topics using ``CreateTime`` mode.
``kafka.security-protocol`` Security protocol for connection to Kafka cluster; defaults to ``PLAINTEXT``.
@@ -228,6 +229,12 @@ This property is optional; default is ``https``.
Internal columns
----------------

The internal column prefix is configurable with the ``kafka.internal-column-prefix``
configuration property, and defaults to ``_``. A different prefix affects the
internal column names listed in the following sections. For example, a value of
``internal_`` changes the partition ID column name from ``_partition_id``
to ``internal_partition_id``.
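
For instance, a catalog properties file could set the prefix shown above
(a sketch; the node address is illustrative)::

    connector.name=kafka
    kafka.nodes=kafka.example.com:9092
    kafka.internal-column-prefix=internal_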

For each defined table, the connector maintains the following columns:

======================= =============================== =============================
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java
@@ -26,6 +26,7 @@
import io.trino.spi.HostAddress;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;

@@ -50,6 +51,7 @@ public class KafkaConfig
private boolean timestampUpperBoundPushDownEnabled;
private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;
private List<File> resourceConfigFiles = ImmutableList.of();
private String internalFieldPrefix = "_";

@Size(min = 1)
public Set<HostAddress> getNodes()
@@ -174,4 +176,18 @@ public KafkaConfig setResourceConfigFiles(List<String> files)
.collect(toImmutableList());
return this;
}

@NotEmpty
public String getInternalFieldPrefix()
{
return internalFieldPrefix;
}

@Config("kafka.internal-column-prefix")
@ConfigDescription("Prefix for internal columns")
public KafkaConfig setInternalFieldPrefix(String internalFieldPrefix)
{
this.internalFieldPrefix = internalFieldPrefix;
return this;
}
}
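
For context, Airlift-style config classes like this one are typically
registered through the config binder, which maps kafka.internal-column-prefix
to setInternalFieldPrefix via the @Config annotation and enforces @NotEmpty at
bootstrap. A minimal module sketch (illustrative wiring, not necessarily the
connector's actual module):

import com.google.inject.Binder;
import com.google.inject.Module;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class KafkaConfigModuleSketch
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Binds KafkaConfig so that kafka.internal-column-prefix in the
        // catalog properties reaches setInternalFieldPrefix(...)
        configBinder(binder).bindConfig(KafkaConfig.class);
    }
}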
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java
@@ -15,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
@@ -45,12 +46,13 @@

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
- import static io.trino.plugin.kafka.KafkaInternalFieldManager.OFFSET_TIMESTAMP_FIELD;
- import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD;
- import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD;
import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.OFFSET_TIMESTAMP_FIELD;
import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_ID_FIELD;
import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_OFFSET_FIELD;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static java.lang.Math.floorDiv;
@@ -65,12 +67,14 @@ public class KafkaFilterManager

private final KafkaConsumerFactory consumerFactory;
private final KafkaAdminFactory adminFactory;
private final KafkaInternalFieldManager kafkaInternalFieldManager;

@Inject
- public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory)
public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory, KafkaInternalFieldManager kafkaInternalFieldManager)
{
this.consumerFactory = requireNonNull(consumerFactory, "consumerFactory is null");
this.adminFactory = requireNonNull(adminFactory, "adminFactory is null");
this.kafkaInternalFieldManager = requireNonNull(kafkaInternalFieldManager, "kafkaInternalFieldManager is null");
}

public KafkaFilteringResult getKafkaFilterResult(
@@ -91,30 +95,20 @@ public KafkaFilteringResult getKafkaFilterResult(

if (!constraint.isAll()) {
Set<Long> partitionIds = partitionInfos.stream().map(partitionInfo -> (long) partitionInfo.partition()).collect(toImmutableSet());
- Optional<Range> offsetRanged = Optional.empty();
- Optional<Range> offsetTimestampRanged = Optional.empty();
- Set<Long> partitionIdsFiltered = partitionIds;
- Optional<Map<ColumnHandle, Domain>> domains = constraint.getDomains();
-
- for (Map.Entry<ColumnHandle, Domain> entry : domains.get().entrySet()) {
- KafkaColumnHandle columnHandle = (KafkaColumnHandle) entry.getKey();
- if (!columnHandle.isInternal()) {
- continue;
- }
- switch (columnHandle.getName()) {
- case PARTITION_OFFSET_FIELD:
- offsetRanged = filterRangeByDomain(entry.getValue());
- break;
- case PARTITION_ID_FIELD:
- partitionIdsFiltered = filterValuesByDomain(entry.getValue(), partitionIds);
- break;
- case OFFSET_TIMESTAMP_FIELD:
- offsetTimestampRanged = filterRangeByDomain(entry.getValue());
- break;
- default:
- break;
- }
- }
Map<String, Domain> domains = constraint.getDomains().orElseThrow()
.entrySet().stream()
.collect(toImmutableMap(
entry -> ((KafkaColumnHandle) entry.getKey()).getName(),
Map.Entry::getValue));

Optional<Range> offsetRanged = getDomain(PARTITION_OFFSET_FIELD, domains)
.flatMap(KafkaFilterManager::filterRangeByDomain);
Set<Long> partitionIdsFiltered = getDomain(PARTITION_ID_FIELD, domains)
.map(domain -> filterValuesByDomain(domain, partitionIds))
.orElse(partitionIds);
Optional<Range> offsetTimestampRanged = getDomain(OFFSET_TIMESTAMP_FIELD, domains)
.flatMap(KafkaFilterManager::filterRangeByDomain);

// push down offset
if (offsetRanged.isPresent()) {
@@ -128,31 +122,35 @@ public KafkaFilteringResult getKafkaFilterResult(
// push down timestamp if possible
if (offsetTimestampRanged.isPresent()) {
try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
- Optional<Range> finalOffsetTimestampRanged = offsetTimestampRanged;
// filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes
if (offsetTimestampRanged.get().getBegin() > INVALID_KAFKA_RANGE_INDEX) {
partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
- partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getBegin()));
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getBegin()));
}
if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.getTopicName())) {
if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) {
partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
- partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getEnd()));
partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getEnd()));
}
}
}
}

// push down partitions
- final Set<Long> finalPartitionIdsFiltered = partitionIdsFiltered;
List<PartitionInfo> partitionFilteredInfos = partitionInfos.stream()
- .filter(partitionInfo -> finalPartitionIdsFiltered.contains((long) partitionInfo.partition()))
.filter(partitionInfo -> partitionIdsFiltered.contains((long) partitionInfo.partition()))
.collect(toImmutableList());
return new KafkaFilteringResult(partitionFilteredInfos, partitionBeginOffsets, partitionEndOffsets);
}
return new KafkaFilteringResult(partitionInfos, partitionBeginOffsets, partitionEndOffsets);
}
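
The findOffsetsForTimestampGreaterOrEqual helper referenced above is not part
of this diff. A minimal sketch of such a lookup with the standard Kafka client
API (the fallback behavior is assumed, not taken from the connector):

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class OffsetLookupSketch
{
    // Returns the first offset whose record timestamp is >= timestamp
    // (epoch milliseconds, as the Kafka API expects)
    static long findOffsetsForTimestampGreaterOrEqual(
            KafkaConsumer<byte[], byte[]> consumer, TopicPartition partition, long timestamp)
    {
        OffsetAndTimestamp result = consumer.offsetsForTimes(Map.of(partition, timestamp)).get(partition);
        if (result == null) {
            // No record at or after the timestamp: fall back to the end offset
            return consumer.endOffsets(List.of(partition)).get(partition);
        }
        return result.offset();
    }
}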

private Optional<Domain> getDomain(InternalFieldId internalFieldId, Map<String, Domain> columnNameToDomain)
{
String columnName = kafkaInternalFieldManager.getFieldById(internalFieldId).getColumnName();
return Optional.ofNullable(columnNameToDomain.get(columnName));
}
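
As a standalone illustration of the lookup pattern above (the enum and the
id-to-name resolution are stand-ins, not the connector's actual classes),
matching domains by resolved column name keeps the filtering correct for any
configured prefix:

import java.util.Map;
import java.util.Optional;

class DomainLookupSketch
{
    // Stand-in for KafkaInternalFieldManager.InternalFieldId
    enum InternalFieldId { PARTITION_ID_FIELD, PARTITION_OFFSET_FIELD, OFFSET_TIMESTAMP_FIELD }

    // Stand-in for the prefix-aware id -> column-name resolution
    private static final Map<InternalFieldId, String> SUFFIXES = Map.of(
            InternalFieldId.PARTITION_ID_FIELD, "partition_id",
            InternalFieldId.PARTITION_OFFSET_FIELD, "partition_offset",
            InternalFieldId.OFFSET_TIMESTAMP_FIELD, "timestamp");

    static Optional<String> getDomain(InternalFieldId id, Map<String, String> columnNameToDomain, String prefix)
    {
        // Resolve the stable field id to its (possibly prefixed) column name,
        // then look the domain up under that name
        return Optional.ofNullable(columnNameToDomain.get(prefix + SUFFIXES.get(id)));
    }

    public static void main(String[] args)
    {
        Map<String, String> domains = Map.of("internal_partition_id", "partition id domain");
        // Prints Optional[partition id domain]
        System.out.println(getDomain(InternalFieldId.PARTITION_ID_FIELD, domains, "internal_"));
    }
}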

private boolean isTimestampUpperBoundPushdownEnabled(ConnectorSession session, String topic)
{
try (Admin adminClient = adminFactory.create(session)) {