diff --git a/docs/src/main/sphinx/connector/kafka.rst b/docs/src/main/sphinx/connector/kafka.rst index f474a879233d..7028a4ac0427 100644 --- a/docs/src/main/sphinx/connector/kafka.rst +++ b/docs/src/main/sphinx/connector/kafka.rst @@ -86,6 +86,7 @@ Property name Description ``kafka.nodes`` List of nodes in the Kafka cluster. ``kafka.buffer-size`` Kafka read buffer size. ``kafka.hide-internal-columns`` Controls whether internal columns are part of the table schema or not. +``kafka.internal-column-prefix`` Prefix for internal columns, defaults to ``_``. ``kafka.messages-per-split`` Number of messages that are processed by each Trino split; defaults to ``100000``. ``kafka.timestamp-upper-bound-force-push-down-enabled`` Controls if upper bound timestamp pushdown is enabled for topics using ``CreateTime`` mode. ``kafka.security-protocol`` Security protocol for connection to Kafka cluster; defaults to ``PLAINTEXT``. @@ -228,6 +229,12 @@ This property is optional; default is ``https``. Internal columns ---------------- +The internal column prefix is configurable by the ``kafka.internal-column-prefix`` +configuration property and defaults to ``_``. A different prefix affects the +internal column names in the following sections. For example, a value of +``internal_`` changes the partition ID column name from ``_partition_id`` +to ``internal_partition_id``. 
+ For each defined table, the connector maintains the following columns: ======================= =============================== ============================= diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java index 3c624e8bd435..a1292f4c8055 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaConfig.java @@ -26,6 +26,7 @@ import io.trino.spi.HostAddress; import javax.validation.constraints.Min; +import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; import javax.validation.constraints.Size; @@ -50,6 +51,7 @@ public class KafkaConfig private boolean timestampUpperBoundPushDownEnabled; private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME; private List resourceConfigFiles = ImmutableList.of(); + private String internalFieldPrefix = "_"; @Size(min = 1) public Set getNodes() @@ -174,4 +176,18 @@ public KafkaConfig setResourceConfigFiles(List files) .collect(toImmutableList()); return this; } + + @NotEmpty + public String getInternalFieldPrefix() + { + return internalFieldPrefix; + } + + @Config("kafka.internal-column-prefix") + @ConfigDescription("Prefix for internal columns") + public KafkaConfig setInternalFieldPrefix(String internalFieldPrefix) + { + this.internalFieldPrefix = internalFieldPrefix; + return this; + } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java index aed71a9f4338..dd1bc19c6bad 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaFilterManager.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import 
com.google.common.collect.ImmutableMap; +import io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; @@ -45,12 +46,13 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.plugin.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.OFFSET_TIMESTAMP_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.OFFSET_TIMESTAMP_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_ID_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_OFFSET_FIELD; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; import static java.lang.Math.floorDiv; @@ -65,12 +67,14 @@ public class KafkaFilterManager private final KafkaConsumerFactory consumerFactory; private final KafkaAdminFactory adminFactory; + private final KafkaInternalFieldManager kafkaInternalFieldManager; @Inject - public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory) + public KafkaFilterManager(KafkaConsumerFactory consumerFactory, KafkaAdminFactory adminFactory, KafkaInternalFieldManager kafkaInternalFieldManager) { this.consumerFactory = requireNonNull(consumerFactory, "consumerFactory is null"); this.adminFactory = requireNonNull(adminFactory, 
"adminFactory is null"); + this.kafkaInternalFieldManager = requireNonNull(kafkaInternalFieldManager, "kafkaInternalFieldManager is null"); } public KafkaFilteringResult getKafkaFilterResult( @@ -91,30 +95,20 @@ public KafkaFilteringResult getKafkaFilterResult( if (!constraint.isAll()) { Set partitionIds = partitionInfos.stream().map(partitionInfo -> (long) partitionInfo.partition()).collect(toImmutableSet()); - Optional offsetRanged = Optional.empty(); - Optional offsetTimestampRanged = Optional.empty(); - Set partitionIdsFiltered = partitionIds; - Optional> domains = constraint.getDomains(); - for (Map.Entry entry : domains.get().entrySet()) { - KafkaColumnHandle columnHandle = (KafkaColumnHandle) entry.getKey(); - if (!columnHandle.isInternal()) { - continue; - } - switch (columnHandle.getName()) { - case PARTITION_OFFSET_FIELD: - offsetRanged = filterRangeByDomain(entry.getValue()); - break; - case PARTITION_ID_FIELD: - partitionIdsFiltered = filterValuesByDomain(entry.getValue(), partitionIds); - break; - case OFFSET_TIMESTAMP_FIELD: - offsetTimestampRanged = filterRangeByDomain(entry.getValue()); - break; - default: - break; - } - } + Map domains = constraint.getDomains().orElseThrow() + .entrySet().stream() + .collect(toImmutableMap( + entry -> ((KafkaColumnHandle) entry.getKey()).getName(), + Map.Entry::getValue)); + + Optional offsetRanged = getDomain(PARTITION_OFFSET_FIELD, domains) + .flatMap(KafkaFilterManager::filterRangeByDomain); + Set partitionIdsFiltered = getDomain(PARTITION_ID_FIELD, domains) + .map(domain -> filterValuesByDomain(domain, partitionIds)) + .orElse(partitionIds); + Optional offsetTimestampRanged = getDomain(OFFSET_TIMESTAMP_FIELD, domains) + .flatMap(KafkaFilterManager::filterRangeByDomain); // push down offset if (offsetRanged.isPresent()) { @@ -128,31 +122,35 @@ public KafkaFilteringResult getKafkaFilterResult( // push down timestamp if possible if (offsetTimestampRanged.isPresent()) { try (KafkaConsumer kafkaConsumer = 
consumerFactory.create(session)) { - Optional finalOffsetTimestampRanged = offsetTimestampRanged; // filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes if (offsetTimestampRanged.get().getBegin() > INVALID_KAFKA_RANGE_INDEX) { partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets, - partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getBegin())); + partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getBegin())); } if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.getTopicName())) { if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) { partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets, - partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, finalOffsetTimestampRanged.get().getEnd())); + partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getEnd())); } } } } // push down partitions - final Set finalPartitionIdsFiltered = partitionIdsFiltered; List partitionFilteredInfos = partitionInfos.stream() - .filter(partitionInfo -> finalPartitionIdsFiltered.contains((long) partitionInfo.partition())) + .filter(partitionInfo -> partitionIdsFiltered.contains((long) partitionInfo.partition())) .collect(toImmutableList()); return new KafkaFilteringResult(partitionFilteredInfos, partitionBeginOffsets, partitionEndOffsets); } return new KafkaFilteringResult(partitionInfos, partitionBeginOffsets, partitionEndOffsets); } + private Optional getDomain(InternalFieldId internalFieldId, Map columnNameToDomain) + { + String columnName = kafkaInternalFieldManager.getFieldById(internalFieldId).getColumnName(); + return Optional.ofNullable(columnNameToDomain.get(columnName)); + } + private boolean isTimestampUpperBoundPushdownEnabled(ConnectorSession session, String topic) { try 
(Admin adminClient = adminFactory.create(session)) { diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaInternalFieldManager.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaInternalFieldManager.java index 6fa7ef0c72cf..130ee5f24b0a 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaInternalFieldManager.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaInternalFieldManager.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.kafka; -import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.type.BigintType; @@ -21,9 +20,23 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; - +import java.util.stream.Stream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.HEADERS_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.KEY_CORRUPT_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.KEY_LENGTH_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.MESSAGE_CORRUPT_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.MESSAGE_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.MESSAGE_LENGTH_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.OFFSET_TIMESTAMP_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_ID_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_OFFSET_FIELD; import static 
io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS; import static io.trino.spi.type.TypeSignature.arrayType; import static io.trino.spi.type.TypeSignature.mapType; @@ -31,72 +44,83 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; public class KafkaInternalFieldManager { - /** - * _partition_id - Kafka partition id. - */ - public static final String PARTITION_ID_FIELD = "_partition_id"; - - /** - * _partition_offset - The current offset of the message in the partition. - */ - public static final String PARTITION_OFFSET_FIELD = "_partition_offset"; - - /** - * _message_corrupt - True if the row converter could not read the a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not). - */ - public static final String MESSAGE_CORRUPT_FIELD = "_message_corrupt"; - - /** - * _message - Represents the full topic as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable. - */ - public static final String MESSAGE_FIELD = "_message"; - - /** - * _message_length - length in bytes of the message. - */ - public static final String MESSAGE_LENGTH_FIELD = "_message_length"; - - /** - * _headers - The header fields of the Kafka message. Key is a UTF-8 String and values an array of byte[]. - */ - public static final String HEADERS_FIELD = "_headers"; - - /** - * _key_corrupt - True if the row converter could not read the a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not). - */ - public static final String KEY_CORRUPT_FIELD = "_key_corrupt"; - - /** - * _key - Represents the key as a text column. Format is UTF-8 which may be wrong for topics. TODO: make charset configurable. 
- */ - public static final String KEY_FIELD = "_key"; - - /** - * _key_length - length in bytes of the key. - */ - public static final String KEY_LENGTH_FIELD = "_key_length"; - - /** - * _timestamp - message timestamp - */ - public static final String OFFSET_TIMESTAMP_FIELD = "_timestamp"; + public enum InternalFieldId + { + /** + * Kafka partition id. + */ + PARTITION_ID_FIELD, + + /** + * The current offset of the message in the partition. + */ + PARTITION_OFFSET_FIELD, + + /** + * Field is true if the row converter could not read a message. May be null if the row converter does not set a value (e.g. the dummy row converter does not). + */ + MESSAGE_CORRUPT_FIELD, + + /** + * Represents the full topic as a text column. Format is UTF-8 which may be wrong for some topics. TODO: make charset configurable. + */ + MESSAGE_FIELD, + + /** + * length in bytes of the message. + */ + MESSAGE_LENGTH_FIELD, + + /** + * The header fields of the Kafka message. Key is a UTF-8 String and values an array of byte[]. + */ + HEADERS_FIELD, + + /** + * Field is true if the row converter could not read a key. May be null if the row converter does not set a value (e.g. the dummy row converter does not). + */ + KEY_CORRUPT_FIELD, + + /** + * Represents the key as a text column. Format is UTF-8 which may be wrong for topics. TODO: make charset configurable. + */ + KEY_FIELD, + + /** + * length in bytes of the key. 
+ */ + KEY_LENGTH_FIELD, + + /** + * message timestamp + */ + OFFSET_TIMESTAMP_FIELD, + } public static class InternalField { + private final InternalFieldId internalFieldId; private final String columnName; private final String comment; private final Type type; - InternalField(String columnName, String comment, Type type) + InternalField(InternalFieldId internalFieldId, String columnName, String comment, Type type) { + this.internalFieldId = requireNonNull(internalFieldId, "internalFieldId is null"); this.columnName = requireNonNull(columnName, "columnName is null"); this.comment = requireNonNull(comment, "comment is null"); this.type = requireNonNull(type, "type is null"); } + public InternalFieldId getInternalFieldId() + { + return internalFieldId; + } + public String getColumnName() { return columnName; @@ -107,7 +131,7 @@ private Type getType() return type; } - KafkaColumnHandle getColumnHandle(int index, boolean hidden) + KafkaColumnHandle getColumnHandle(boolean hidden) { return new KafkaColumnHandle( getColumnName(), @@ -131,62 +155,87 @@ ColumnMetadata getColumnMetadata(boolean hidden) } } - private final Map internalFields; + private final Map fieldsByNames; + private final Map fieldsByIds; @Inject - public KafkaInternalFieldManager(TypeManager typeManager) + public KafkaInternalFieldManager(TypeManager typeManager, KafkaConfig kafkaConfig) { Type varcharMapType = typeManager.getType(mapType(VARCHAR.getTypeSignature(), arrayType(VARBINARY.getTypeSignature()))); - - internalFields = ImmutableMap.builder() - .put(PARTITION_ID_FIELD, new InternalField( - PARTITION_ID_FIELD, - "Partition Id", - BigintType.BIGINT)) - .put(PARTITION_OFFSET_FIELD, new InternalField( - PARTITION_OFFSET_FIELD, - "Offset for the message within the partition", - BigintType.BIGINT)) - .put(MESSAGE_CORRUPT_FIELD, new InternalField( - MESSAGE_CORRUPT_FIELD, - "Message data is corrupt", - BooleanType.BOOLEAN)) - .put(MESSAGE_FIELD, new InternalField( - MESSAGE_FIELD, - "Message text", - 
createUnboundedVarcharType())) - .put(HEADERS_FIELD, new InternalField( - HEADERS_FIELD, - "Headers of the message as map", - varcharMapType)) - .put(MESSAGE_LENGTH_FIELD, new InternalField( - MESSAGE_LENGTH_FIELD, - "Total number of message bytes", - BigintType.BIGINT)) - .put(KEY_CORRUPT_FIELD, new InternalField( - KEY_CORRUPT_FIELD, - "Key data is corrupt", - BooleanType.BOOLEAN)) - .put(KEY_FIELD, new InternalField( - KEY_FIELD, - "Key text", - createUnboundedVarcharType())) - .put(KEY_LENGTH_FIELD, new InternalField( - KEY_LENGTH_FIELD, - "Total number of key bytes", - BigintType.BIGINT)) - .put(OFFSET_TIMESTAMP_FIELD, new InternalField( - OFFSET_TIMESTAMP_FIELD, - "Message timestamp", - TIMESTAMP_MILLIS)) - .buildOrThrow(); + String prefix = kafkaConfig.getInternalFieldPrefix(); + List fields = Stream.of( + new InternalField( + PARTITION_ID_FIELD, + prefix + "partition_id", + "Partition Id", + BigintType.BIGINT), + new InternalField( + PARTITION_OFFSET_FIELD, + prefix + "partition_offset", + "Offset for the message within the partition", + BigintType.BIGINT), + new InternalField( + MESSAGE_CORRUPT_FIELD, + prefix + "message_corrupt", + "Message data is corrupt", + BooleanType.BOOLEAN), + new InternalField( + MESSAGE_FIELD, + prefix + "message", + "Message text", + createUnboundedVarcharType()), + new InternalField( + HEADERS_FIELD, + prefix + "headers", + "Headers of the message as map", + varcharMapType), + new InternalField( + MESSAGE_LENGTH_FIELD, + prefix + "message_length", + "Total number of message bytes", + BigintType.BIGINT), + new InternalField( + KEY_CORRUPT_FIELD, + prefix + "key_corrupt", + "Key data is corrupt", + BooleanType.BOOLEAN), + new InternalField( + InternalFieldId.KEY_FIELD, + prefix + "key", + "Key text", + createUnboundedVarcharType()), + new InternalField( + KEY_LENGTH_FIELD, + prefix + "key_length", + "Total number of key bytes", + BigintType.BIGINT), + new InternalField( + OFFSET_TIMESTAMP_FIELD, + prefix + "timestamp", + "Message 
timestamp", + TIMESTAMP_MILLIS)) + .collect(toImmutableList()); + fieldsByNames = fields.stream() + .collect(toImmutableMap(InternalField::getColumnName, identity())); + fieldsByIds = fields.stream() + .collect(toImmutableMap(InternalField::getInternalFieldId, identity())); } /** * @return Map of {@link InternalField} for each internal field. */ - public Map getInternalFields() + public Collection getInternalFields() + { + return fieldsByNames.values(); + } + + public InternalField getFieldByName(String name) + { + return fieldsByNames.get(name); + } + + public InternalField getFieldById(InternalFieldId id) { - return internalFields; + return fieldsByIds.get(id); } } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java index e901ad04b0b3..f0a6a91e3344 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java @@ -43,13 +43,19 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Set; +import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.spi.StandardErrorCode.DUPLICATE_COLUMN_NAME; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Stream.concat; /** * Manages the Kafka connector specific metadata information. 
The Connector provides an additional set of columns @@ -132,33 +138,34 @@ private Map getColumnHandles(ConnectorSession session, Sch { KafkaTopicDescription kafkaTopicDescription = getRequiredTopicDescription(session, schemaTableName); - ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + Stream keyColumnHandles = kafkaTopicDescription.getKey().stream() + .map(KafkaTopicFieldGroup::getFields) + .flatMap(Collection::stream) + .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(true)); - AtomicInteger index = new AtomicInteger(0); + Stream messageColumnHandles = kafkaTopicDescription.getMessage().stream() + .map(KafkaTopicFieldGroup::getFields) + .flatMap(Collection::stream) + .map(kafkaTopicFieldDescription -> kafkaTopicFieldDescription.getColumnHandle(false)); - kafkaTopicDescription.getKey().ifPresent(key -> { - List fields = key.getFields(); - if (fields != null) { - for (KafkaTopicFieldDescription kafkaTopicFieldDescription : fields) { - columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(true, index.getAndIncrement())); - } - } - }); + List topicColumnHandles = concat(keyColumnHandles, messageColumnHandles) + .collect(toImmutableList()); - kafkaTopicDescription.getMessage().ifPresent(message -> { - List fields = message.getFields(); - if (fields != null) { - for (KafkaTopicFieldDescription kafkaTopicFieldDescription : fields) { - columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(false, index.getAndIncrement())); - } - } - }); + List internalColumnHandles = kafkaInternalFieldManager.getInternalFields().stream() + .map(kafkaInternalField -> kafkaInternalField.getColumnHandle(hideInternalColumns)) + .collect(toImmutableList()); - for (KafkaInternalFieldManager.InternalField kafkaInternalField : kafkaInternalFieldManager.getInternalFields().values()) { - columnHandles.put(kafkaInternalField.getColumnName(), 
kafkaInternalField.getColumnHandle(index.getAndIncrement(), hideInternalColumns)); + Set conflictingColumns = topicColumnHandles.stream().map(KafkaColumnHandle::getName).collect(toSet()); + conflictingColumns.retainAll(internalColumnHandles.stream().map(KafkaColumnHandle::getName).collect(toSet())); + if (!conflictingColumns.isEmpty()) { + throw new TrinoException(DUPLICATE_COLUMN_NAME, "Internal Kafka column names conflict with column names from the table. " + + "Consider changing kafka.internal-column-prefix configuration property. " + + "topic=" + schemaTableName + + ", Conflicting names=" + conflictingColumns); } - return columnHandles.buildOrThrow(); + return concat(topicColumnHandles.stream(), internalColumnHandles.stream()) + .collect(toImmutableMap(KafkaColumnHandle::getName, identity())); } @Override @@ -217,7 +224,7 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema } }); - for (KafkaInternalFieldManager.InternalField fieldDescription : kafkaInternalFieldManager.getInternalFields().values()) { + for (KafkaInternalFieldManager.InternalField fieldDescription : kafkaInternalFieldManager.getInternalFields()) { builder.add(fieldDescription.getColumnMetadata(hideInternalColumns)); } diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java index 753221f69ea8..f74948922ce7 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java @@ -35,7 +35,6 @@ import org.apache.kafka.common.header.Headers; import java.time.Duration; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,21 +45,13 @@ import static io.trino.decoder.FieldValueProviders.booleanValueProvider; import static io.trino.decoder.FieldValueProviders.bytesValueProvider; import static 
io.trino.decoder.FieldValueProviders.longValueProvider; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.HEADERS_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.KEY_CORRUPT_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.KEY_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.KEY_LENGTH_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.MESSAGE_CORRUPT_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.MESSAGE_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.MESSAGE_LENGTH_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.OFFSET_TIMESTAMP_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_OFFSET_FIELD; import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND; import static io.trino.spi.type.TypeUtils.writeNativeValue; import static java.lang.Math.max; import static java.util.Collections.emptyIterator; import static java.util.Objects.requireNonNull; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; public class KafkaRecordSet implements RecordSet @@ -74,6 +65,7 @@ public class KafkaRecordSet private final ConnectorSession connectorSession; private final RowDecoder keyDecoder; private final RowDecoder messageDecoder; + private final KafkaInternalFieldManager kafkaInternalFieldManager; private final List columnHandles; private final List columnTypes; @@ -84,7 +76,8 @@ public class KafkaRecordSet ConnectorSession connectorSession, List columnHandles, RowDecoder keyDecoder, - RowDecoder messageDecoder) + RowDecoder messageDecoder, + KafkaInternalFieldManager kafkaInternalFieldManager) { this.split = requireNonNull(split, "split is null"); this.consumerFactory = requireNonNull(consumerFactory, "consumerFactory is null"); @@ -94,6 +87,7 @@ 
public class KafkaRecordSet this.messageDecoder = requireNonNull(messageDecoder, "messageDecoder is null"); this.columnHandles = requireNonNull(columnHandles, "columnHandles is null"); + this.kafkaInternalFieldManager = requireNonNull(kafkaInternalFieldManager, "kafkaInternalFieldManager is null"); this.columnTypes = columnHandles.stream() .map(KafkaColumnHandle::getType) @@ -152,15 +146,14 @@ public Type getType(int field) @Override public boolean advanceNextPosition() { - if (!records.hasNext()) { - if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().getEnd()) { - return false; - } - records = kafkaConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT)).iterator(); - return advanceNextPosition(); + if (records.hasNext()) { + return nextRow(records.next()); } - - return nextRow(records.next()); + if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().getEnd()) { + return false; + } + records = kafkaConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT)).iterator(); + return advanceNextPosition(); } private boolean nextRow(ConsumerRecord message) @@ -173,61 +166,27 @@ private boolean nextRow(ConsumerRecord message) completedBytes += max(message.serializedKeySize(), 0) + max(message.serializedValueSize(), 0); - byte[] keyData = EMPTY_BYTE_ARRAY; - if (message.key() != null) { - keyData = message.key(); - } - - byte[] messageData = EMPTY_BYTE_ARRAY; - if (message.value() != null) { - messageData = message.value(); - } - long timeStamp = message.timestamp(); - - Map currentRowValuesMap = new HashMap<>(); + byte[] keyData = message.key() == null ? EMPTY_BYTE_ARRAY : message.key(); + byte[] messageData = message.value() == null ? 
EMPTY_BYTE_ARRAY : message.value(); + long timeStamp = message.timestamp() * MICROSECONDS_PER_MILLISECOND; Optional> decodedKey = keyDecoder.decodeRow(keyData); Optional> decodedValue = messageDecoder.decodeRow(messageData); - for (DecoderColumnHandle columnHandle : columnHandles) { - if (columnHandle.isInternal()) { - switch (columnHandle.getName()) { - case PARTITION_OFFSET_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(message.offset())); - break; - case MESSAGE_FIELD: - currentRowValuesMap.put(columnHandle, bytesValueProvider(messageData)); - break; - case MESSAGE_LENGTH_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(messageData.length)); - break; - case KEY_FIELD: - currentRowValuesMap.put(columnHandle, bytesValueProvider(keyData)); - break; - case KEY_LENGTH_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(keyData.length)); - break; - case OFFSET_TIMESTAMP_FIELD: - timeStamp *= MICROSECONDS_PER_MILLISECOND; - currentRowValuesMap.put(columnHandle, longValueProvider(timeStamp)); - break; - case KEY_CORRUPT_FIELD: - currentRowValuesMap.put(columnHandle, booleanValueProvider(decodedKey.isEmpty())); - break; - case HEADERS_FIELD: - currentRowValuesMap.put(columnHandle, headerMapValueProvider((MapType) columnHandle.getType(), message.headers())); - break; - case MESSAGE_CORRUPT_FIELD: - currentRowValuesMap.put(columnHandle, booleanValueProvider(decodedValue.isEmpty())); - break; - case PARTITION_ID_FIELD: - currentRowValuesMap.put(columnHandle, longValueProvider(message.partition())); - break; - default: - throw new IllegalArgumentException("unknown internal field " + columnHandle.getName()); - } - } - } + Map currentRowValuesMap = columnHandles.stream() + .filter(KafkaColumnHandle::isInternal) + .collect(toMap(identity(), columnHandle -> switch (getInternalFieldId(columnHandle)) { + case PARTITION_OFFSET_FIELD -> longValueProvider(message.offset()); + case MESSAGE_FIELD -> bytesValueProvider(messageData); + case 
MESSAGE_LENGTH_FIELD -> longValueProvider(messageData.length); + case KEY_FIELD -> bytesValueProvider(keyData); + case KEY_LENGTH_FIELD -> longValueProvider(keyData.length); + case OFFSET_TIMESTAMP_FIELD -> longValueProvider(timeStamp); + case KEY_CORRUPT_FIELD -> booleanValueProvider(decodedKey.isEmpty()); + case HEADERS_FIELD -> headerMapValueProvider((MapType) columnHandle.getType(), message.headers()); + case MESSAGE_CORRUPT_FIELD -> booleanValueProvider(decodedValue.isEmpty()); + case PARTITION_ID_FIELD -> longValueProvider(message.partition()); + })); decodedKey.ifPresent(currentRowValuesMap::putAll); decodedValue.ifPresent(currentRowValuesMap::putAll); @@ -240,6 +199,11 @@ private boolean nextRow(ConsumerRecord message) return true; // Advanced successfully. } + private KafkaInternalFieldManager.InternalFieldId getInternalFieldId(KafkaColumnHandle columnHandle) + { + return kafkaInternalFieldManager.getFieldByName(columnHandle.getName()).getInternalFieldId(); + } + @Override public boolean getBoolean(int field) { diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSetProvider.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSetProvider.java index c333326d8990..a12970fd1142 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSetProvider.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSetProvider.java @@ -39,12 +39,14 @@ public class KafkaRecordSetProvider { private final DispatchingRowDecoderFactory decoderFactory; private final KafkaConsumerFactory consumerFactory; + private final KafkaInternalFieldManager kafkaInternalFieldManager; @Inject - public KafkaRecordSetProvider(DispatchingRowDecoderFactory decoderFactory, KafkaConsumerFactory consumerFactory) + public KafkaRecordSetProvider(DispatchingRowDecoderFactory decoderFactory, KafkaConsumerFactory consumerFactory, KafkaInternalFieldManager kafkaInternalFieldManager) { this.decoderFactory = 
requireNonNull(decoderFactory, "decoderFactory is null"); this.consumerFactory = requireNonNull(consumerFactory, "consumerFactory is null"); + this.kafkaInternalFieldManager = requireNonNull(kafkaInternalFieldManager, "kafkaInternalFieldManager is null"); } @Override @@ -72,7 +74,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS .filter(col -> !col.isKeyCodec()) .collect(toImmutableSet())); - return new KafkaRecordSet(kafkaSplit, consumerFactory, session, kafkaColumns, keyDecoder, messageDecoder); + return new KafkaRecordSet(kafkaSplit, consumerFactory, session, kafkaColumns, keyDecoder, messageDecoder, kafkaInternalFieldManager); } private static Map getDecoderParameters(Optional dataSchema) diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java index b660246c0182..48933801a541 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaTopicFieldDescription.java @@ -101,7 +101,7 @@ public boolean isHidden() return hidden; } - KafkaColumnHandle getColumnHandle(boolean keyCodec, int index) + KafkaColumnHandle getColumnHandle(boolean keyCodec) { return new KafkaColumnHandle( getName(), diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java new file mode 100644 index 000000000000..5f5550c68ac3 --- /dev/null +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestInternalFieldConflict.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.kafka.TestingKafka; +import org.testng.annotations.Test; + +import static io.trino.plugin.kafka.util.TestUtils.createDescription; +import static io.trino.plugin.kafka.util.TestUtils.createOneFieldDescription; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.VarcharType.createVarcharType; +import static io.trino.testing.sql.TestTable.randomTableSuffix; + +@Test(singleThreaded = true) +public class TestInternalFieldConflict + extends AbstractTestQueryFramework +{ + private SchemaTableName topicWithDefaultPrefixes; + private SchemaTableName topicWithCustomPrefixes; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + TestingKafka testingKafka = closeAfterClass(TestingKafka.create()); + topicWithDefaultPrefixes = new SchemaTableName("default", "test_" + randomTableSuffix()); + topicWithCustomPrefixes = new SchemaTableName("default", "test_" + randomTableSuffix()); + return KafkaQueryRunner.builder(testingKafka) + .setExtraTopicDescription(ImmutableMap.of( + topicWithDefaultPrefixes, createDescription( + topicWithDefaultPrefixes, + createOneFieldDescription("_key", createVarcharType(15)), + ImmutableList.of(createOneFieldDescription("custkey", 
BIGINT), createOneFieldDescription("acctbal", DOUBLE))), + topicWithCustomPrefixes, createDescription( + topicWithCustomPrefixes, + createOneFieldDescription("unpredictable_prefix_key", createVarcharType(15)), + ImmutableList.of(createOneFieldDescription("custkey", BIGINT), createOneFieldDescription("acctbal", DOUBLE))))) + .setExtraKafkaProperties(ImmutableMap.builder() + .put("kafka.internal-column-prefix", "unpredictable_prefix_") + .buildOrThrow()) + .build(); + } + + @Test + public void testInternalFieldPrefix() + { + assertQuery("SELECT count(*) FROM " + topicWithDefaultPrefixes, "VALUES 0"); + assertQueryFails("SELECT count(*) FROM " + topicWithCustomPrefixes, "" + + "Internal Kafka column names conflict with column names from the table. " + + "Consider changing kafka.internal-column-prefix configuration property. " + + "topic=" + topicWithCustomPrefixes + + ", Conflicting names=\\[unpredictable_prefix_key]"); + } +} diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java index 5b04622c5db0..004890a711ec 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConfig.java @@ -41,7 +41,8 @@ public void testDefaults() .setHideInternalColumns(true) .setMessagesPerSplit(100_000) .setTimestampUpperBoundPushDownEnabled(false) - .setResourceConfigFiles(List.of())); + .setResourceConfigFiles(List.of()) + .setInternalFieldPrefix("_")); } @Test @@ -60,6 +61,7 @@ public void testExplicitPropertyMappings() .put("kafka.messages-per-split", "1") .put("kafka.timestamp-upper-bound-force-push-down-enabled", "true") .put("kafka.config.resources", resource1.toString() + "," + resource2.toString()) + .put("kafka.internal-column-prefix", "the_most_unexpected_prefix_") .buildOrThrow(); KafkaConfig expected = new KafkaConfig() @@ -70,7 +72,8 @@ public void 
testExplicitPropertyMappings() .setHideInternalColumns(false) .setMessagesPerSplit(1) .setTimestampUpperBoundPushDownEnabled(true) - .setResourceConfigFiles(ImmutableList.of(resource1.toString(), resource2.toString())); + .setResourceConfigFiles(ImmutableList.of(resource1.toString(), resource2.toString())) + .setInternalFieldPrefix("the_most_unexpected_prefix_"); assertFullMapping(properties, expected); } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java index 1d7442122652..61d0d89d2ce2 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaConnectorTest.java @@ -45,6 +45,9 @@ import static io.trino.plugin.kafka.encoder.json.format.DateTimeFormat.MILLISECONDS_SINCE_EPOCH; import static io.trino.plugin.kafka.encoder.json.format.DateTimeFormat.RFC2822; import static io.trino.plugin.kafka.encoder.json.format.DateTimeFormat.SECONDS_SINCE_EPOCH; +import static io.trino.plugin.kafka.util.TestUtils.createDescription; +import static io.trino.plugin.kafka.util.TestUtils.createFieldGroup; +import static io.trino.plugin.kafka.util.TestUtils.createOneFieldDescription; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -90,6 +93,7 @@ public class TestKafkaConnectorTest private static final SchemaTableName TABLE_INSERT_UNICODE_2 = new SchemaTableName("write_test", "test_unicode_2_" + randomTableSuffix()); private static final SchemaTableName TABLE_INSERT_UNICODE_3 = new SchemaTableName("write_test", "test_unicode_3_" + randomTableSuffix()); private static final SchemaTableName TABLE_INSERT_HIGHEST_UNICODE = new SchemaTableName("write_test", "test_highest_unicode_" + randomTableSuffix()); + private static final SchemaTableName 
TABLE_INTERNAL_FIELD_PREFIX = new SchemaTableName("write_test", "test_internal_fields_prefix_" + randomTableSuffix()); @Override protected QueryRunner createQueryRunner() @@ -145,6 +149,10 @@ protected QueryRunner createQueryRunner() TABLE_INSERT_HIGHEST_UNICODE, createOneFieldDescription("key", BIGINT), ImmutableList.of(createOneFieldDescription("test", createVarcharType(50))))) + .put(TABLE_INTERNAL_FIELD_PREFIX, createDescription( + TABLE_INTERNAL_FIELD_PREFIX, + createOneFieldDescription("_key", createVarcharType(15)), + ImmutableList.of(createOneFieldDescription("custkey", BIGINT), createOneFieldDescription("acctbal", DOUBLE)))) .buildOrThrow(); QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka) @@ -183,6 +191,16 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) } } + @Test + public void testInternalFieldPrefix() + { + assertQueryFails("SELECT count(*) FROM " + TABLE_INTERNAL_FIELD_PREFIX, "" + + "Internal Kafka column names conflict with column names from the table. " + + "Consider changing kafka.internal-column-prefix configuration property. 
" + + "topic=" + TABLE_INTERNAL_FIELD_PREFIX + + ", Conflicting names=\\[_key]"); + } + @Override protected TestTable createTableWithDefaultColumns() { @@ -465,47 +483,6 @@ public void testInsertRowConcurrently() throw new SkipException("TODO Prepare a topic in Kafka and enable this test"); } - private static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List fields) - { - return new KafkaTopicDescription( - schemaTableName.getTableName(), - Optional.of(schemaTableName.getSchemaName()), - schemaTableName.getTableName(), - Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), ImmutableList.of(key))), - Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), fields))); - } - - private static KafkaTopicDescription createDescription(String name, String schema, String topic, Optional message) - { - return new KafkaTopicDescription(name, Optional.of(schema), topic, Optional.empty(), message); - } - - private static Optional createFieldGroup(String dataFormat, List fields) - { - return Optional.of(new KafkaTopicFieldGroup(dataFormat, Optional.empty(), Optional.empty(), fields)); - } - - private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type) - { - return new KafkaTopicFieldDescription(name, type, name, null, null, null, false); - } - - private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat) - { - return new KafkaTopicFieldDescription(name, type, name, null, dataFormat, null, false); - } - - private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat, Optional formatHint) - { - return formatHint.map(s -> new KafkaTopicFieldDescription(name, type, name, null, dataFormat, s, false)) - .orElseGet(() -> new KafkaTopicFieldDescription(name, type, name, null, dataFormat, null, false)); - } - - private static 
KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String mapping, String dataFormat) - { - return new KafkaTopicFieldDescription(name, type, mapping, null, dataFormat, null, false); - } - @Test public void testKafkaHeaders() { diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaInternalFieldManager.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaInternalFieldManager.java index a6a04c33b8c7..465b1a6b68f1 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaInternalFieldManager.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaInternalFieldManager.java @@ -19,7 +19,7 @@ import java.util.Optional; -import static io.trino.plugin.kafka.KafkaInternalFieldManager.PARTITION_ID_FIELD; +import static io.trino.plugin.kafka.KafkaInternalFieldManager.InternalFieldId.PARTITION_ID_FIELD; import static org.assertj.core.api.Assertions.assertThat; public class TestKafkaInternalFieldManager @@ -30,12 +30,13 @@ public void testInternalField() KafkaInternalFieldManager.InternalField internalField = new KafkaInternalFieldManager.InternalField( PARTITION_ID_FIELD, + "internal_field_name", "Partition Id", BigintType.BIGINT); KafkaColumnHandle kafkaColumnHandle = new KafkaColumnHandle( - PARTITION_ID_FIELD, + "internal_field_name", BigintType.BIGINT, null, null, @@ -46,14 +47,15 @@ public void testInternalField() ColumnMetadata columnMetadata = ColumnMetadata.builder() - .setName(PARTITION_ID_FIELD) + .setName("internal_field_name") .setType(BigintType.BIGINT) .setComment(Optional.of("Partition Id")) .setHidden(false) .build(); - assertThat(internalField.getColumnName()).isEqualTo(PARTITION_ID_FIELD); - assertThat(internalField.getColumnHandle(0, false)).isEqualTo(kafkaColumnHandle); + assertThat(internalField.getInternalFieldId()).isEqualTo(PARTITION_ID_FIELD); + assertThat(internalField.getColumnName()).isEqualTo("internal_field_name"); + 
assertThat(internalField.getColumnHandle(false)).isEqualTo(kafkaColumnHandle); assertThat(internalField.getColumnMetadata(false)).isEqualTo(columnMetadata); } } diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java index 9bac9a3f6550..c9dfca3678d8 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/util/TestUtils.java @@ -13,13 +13,18 @@ */ package io.trino.plugin.kafka.util; +import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import io.airlift.json.JsonCodec; import io.trino.plugin.kafka.KafkaTopicDescription; +import io.trino.plugin.kafka.KafkaTopicFieldDescription; +import io.trino.plugin.kafka.KafkaTopicFieldGroup; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.Type; import java.io.IOException; import java.util.AbstractMap; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,4 +50,45 @@ public static Map.Entry createEmptyTopic schemaTableName, new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, Optional.empty(), Optional.empty())); } + + public static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List fields) + { + return new KafkaTopicDescription( + schemaTableName.getTableName(), + Optional.of(schemaTableName.getSchemaName()), + schemaTableName.getTableName(), + Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), ImmutableList.of(key))), + Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), fields))); + } + + public static KafkaTopicDescription createDescription(String name, String schema, String topic, Optional message) + { + return new KafkaTopicDescription(name, Optional.of(schema), 
topic, Optional.empty(), message); + } + + public static Optional createFieldGroup(String dataFormat, List fields) + { + return Optional.of(new KafkaTopicFieldGroup(dataFormat, Optional.empty(), Optional.empty(), fields)); + } + + public static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type) + { + return new KafkaTopicFieldDescription(name, type, name, null, null, null, false); + } + + public static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat) + { + return new KafkaTopicFieldDescription(name, type, name, null, dataFormat, null, false); + } + + public static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat, Optional formatHint) + { + return formatHint.map(s -> new KafkaTopicFieldDescription(name, type, name, null, dataFormat, s, false)) + .orElseGet(() -> new KafkaTopicFieldDescription(name, type, name, null, dataFormat, null, false)); + } + + public static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String mapping, String dataFormat) + { + return new KafkaTopicFieldDescription(name, type, mapping, null, dataFormat, null, false); + } }