diff --git a/lib/trino-record-decoder/pom.xml b/lib/trino-record-decoder/pom.xml index 80e5716d2d3b..0ddeea13517d 100644 --- a/lib/trino-record-decoder/pom.xml +++ b/lib/trino-record-decoder/pom.xml @@ -33,12 +33,6 @@ jackson-databind - - com.google.code.findbugs - jsr305 - true - - com.google.guava guava diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/RowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/RowDecoder.java index 37f5fa41e8a3..3f5c2f623a31 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/RowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/RowDecoder.java @@ -13,8 +13,6 @@ */ package io.trino.decoder; -import javax.annotation.Nullable; - import java.util.Map; import java.util.Optional; @@ -29,18 +27,5 @@ public interface RowDecoder * @param data The row data to decode. * @return Returns mapping from column handle to decoded value. Unmapped columns will be reported as null. Optional.empty() signals decoding error. */ - default Optional> decodeRow(byte[] data) - { - return decodeRow(data, null); - } - - /** - * Decodes a given sequence of bytes into field values. - * - * @param data The row data to decode. - * @param dataMap The row data as fields map - * @return Returns mapping from column handle to decoded value. Unmapped columns will be reported as null. Optional.empty() signals decoding error. - */ - // TODO This is Redis-specific, move to trino-redis - Optional> decodeRow(byte[] data, @Nullable Map dataMap); + Optional> decodeRow(byte[] data); } diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/GenericRecordRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/GenericRecordRowDecoder.java index d7fface9fc5f..a68f990f0fbd 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/GenericRecordRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/GenericRecordRowDecoder.java @@ -44,7 +44,7 @@ public GenericRecordRowDecoder(AvroDeserializer deserializer, Set } @Override - public Optional> decodeRow(byte[] data, Map dataMap) + public Optional> decodeRow(byte[] data) { GenericRecord avroRecord = deserializer.deserialize(data); return Optional.of(columnDecoders.stream() diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/SingleValueRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/SingleValueRowDecoder.java index eb9493b4b0c9..014619bb2037 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/SingleValueRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/avro/SingleValueRowDecoder.java @@ -36,7 +36,7 @@ public SingleValueRowDecoder(AvroDeserializer deserializer, DecoderColum } @Override - public Optional> decodeRow(byte[] data, Map dataMap) + public Optional> decodeRow(byte[] data) { Object avroValue = deserializer.deserialize(data); return Optional.of(ImmutableMap.of(column, new AvroColumnDecoder.ObjectValueProvider(avroValue, column.getType(), column.getName()))); diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/csv/CsvRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/csv/CsvRowDecoder.java index 14293accd5aa..8ad3f0cd2550 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/csv/CsvRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/csv/CsvRowDecoder.java @@ -51,7 +51,7 @@ private CsvColumnDecoder createColumnDecoder(DecoderColumnHandle columnHandle) } @Override - public Optional> decodeRow(byte[] data, Map dataMap) + public Optional> decodeRow(byte[] data) { String[] tokens; try { diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/dummy/DummyRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/dummy/DummyRowDecoder.java index 94d5095809e4..1d6e42b9ac6c 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/dummy/DummyRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/dummy/DummyRowDecoder.java @@ -39,8 +39,7 @@ public class DummyRowDecoder private static final Optional> ALL_NULLS = Optional.of(ImmutableMap.of()); @Override - public Optional> decodeRow(byte[] data, - Map dataMap) + public Optional> decodeRow(byte[] data) { return ALL_NULLS; } diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/json/JsonRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/json/JsonRowDecoder.java index 15c9035bb0d5..3efc3ae31b5e 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/json/JsonRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/json/JsonRowDecoder.java @@ -47,8 +47,7 @@ public class JsonRowDecoder } @Override - public Optional> decodeRow(byte[] data, - Map dataMap) + public Optional> decodeRow(byte[] data) { JsonNode tree; try { diff --git a/lib/trino-record-decoder/src/main/java/io/trino/decoder/raw/RawRowDecoder.java b/lib/trino-record-decoder/src/main/java/io/trino/decoder/raw/RawRowDecoder.java index ed2159e411a3..0a8b1f2d5548 100644 --- a/lib/trino-record-decoder/src/main/java/io/trino/decoder/raw/RawRowDecoder.java +++ b/lib/trino-record-decoder/src/main/java/io/trino/decoder/raw/RawRowDecoder.java @@ -48,7 +48,7 @@ private RawColumnDecoder createColumnDecoder(DecoderColumnHandle columnHandle) } @Override - public Optional> decodeRow(byte[] data, Map dataMap) + public Optional> decodeRow(byte[] data) { return Optional.of(columnDecoders.entrySet().stream() .collect(toImmutableMap( diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestAvroConfluentRowDecoder.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestAvroConfluentRowDecoder.java index 6829c6387767..45126ba08ab1 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestAvroConfluentRowDecoder.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestAvroConfluentRowDecoder.java @@ -120,14 +120,14 @@ public void testSingleValueRow() private static void testRow(RowDecoder rowDecoder, GenericRecord record, int schemaId) { byte[] serializedRecord = serializeRecord(record, record.getSchema(), schemaId); - Optional> decodedRow = rowDecoder.decodeRow(serializedRecord, null); + Optional> decodedRow = rowDecoder.decodeRow(serializedRecord); assertRowsAreEqual(decodedRow, record); } private static void testSingleValueRow(RowDecoder rowDecoder, Object value, Schema schema, int schemaId) { byte[] serializedRecord = serializeRecord(value, schema, schemaId); - Optional> decodedRow = rowDecoder.decodeRow(serializedRecord, null); + Optional> decodedRow = rowDecoder.decodeRow(serializedRecord); checkState(decodedRow.isPresent(), "decodedRow is not present"); Map.Entry entry = getOnlyElement(decodedRow.get().entrySet()); assertValuesAreEqual(entry.getValue(), value, schema); diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisRecordCursor.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisRecordCursor.java index 1afa7a838760..66f2fcaffa3f 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisRecordCursor.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/RedisRecordCursor.java @@ -18,6 +18,7 @@ import io.trino.decoder.DecoderColumnHandle; import io.trino.decoder.FieldValueProvider; import io.trino.decoder.RowDecoder; +import io.trino.plugin.redis.decoder.RedisRowDecoder; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.RecordCursor; import io.trino.spi.type.Type; @@ -200,10 +201,13 @@ private void generateRowValues(String keyString, String valueString, @Nullable M { byte[] keyData = keyString.getBytes(StandardCharsets.UTF_8); byte[] stringValueData = valueString.getBytes(StandardCharsets.UTF_8); + // Redis connector supports two types of Redis values: STRING and HASH. HASH type requires hash row decoder to + // decode a row from map, whereas for the STRING type decoders are optional. The redis keyData is always byte array, + // so the decoder of key always decodes a row from bytes. Optional> decodedKey = keyDecoder.decodeRow(keyData); - Optional> decodedValue = valueDecoder.decodeRow( - stringValueData, - hashValueMap); + Optional> decodedValue = valueDecoder instanceof RedisRowDecoder + ? ((RedisRowDecoder) valueDecoder).decodeRow(hashValueMap) + : valueDecoder.decodeRow(stringValueData); totalBytes += stringValueData.length; totalValues++; @@ -367,11 +371,6 @@ private void fetchData(List currentKeys) { stringValues = null; hashValues = null; - // Redis connector supports two types of Redis - // values: STRING and HASH - // HASH types requires hash row decoder to - // fill in the columns - // whereas for the STRING type decoders are optional try (Jedis jedis = jedisPool.getResource()) { switch (split.getValueDataType()) { case STRING: diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/RedisRowDecoder.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/RedisRowDecoder.java new file mode 100644 index 000000000000..980397a3e9e2 --- /dev/null +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/RedisRowDecoder.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.redis.decoder; + +import io.trino.decoder.DecoderColumnHandle; +import io.trino.decoder.FieldValueProvider; +import io.trino.decoder.RowDecoder; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Optional; + +/** + * Implementations decode a row from map and add field value providers for all decodable columns. + */ +public interface RedisRowDecoder + extends RowDecoder +{ + /** + * Decodes a given map into field values. + * + * @param dataMap The row data as fields map + * @return Returns mapping from column handle to decoded value. Unmapped columns will be reported as null. Optional.empty() signals decoding error. + */ + Optional> decodeRow(@Nullable Map dataMap); +} diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoder.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoder.java index 55a3734b2d17..4d1c87ab0e63 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoder.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoder.java @@ -16,8 +16,8 @@ import com.google.common.collect.ImmutableMap; import io.trino.decoder.DecoderColumnHandle; import io.trino.decoder.FieldValueProvider; -import io.trino.decoder.RowDecoder; import io.trino.plugin.redis.RedisFieldDecoder; +import io.trino.plugin.redis.decoder.RedisRowDecoder; import java.util.HashMap; import java.util.Map; @@ -30,7 +30,7 @@ * The row decoder for the Redis values that are stored in Hash format. */ public class HashRedisRowDecoder - implements RowDecoder + implements RedisRowDecoder { public static final String NAME = "hash"; @@ -42,7 +42,7 @@ public HashRedisRowDecoder(Map> f } @Override - public Optional> decodeRow(byte[] data, Map dataMap) + public Optional> decodeRow(Map dataMap) { if (dataMap == null) { return Optional.of(emptyMap()); @@ -62,4 +62,10 @@ public Optional> decodeRow(byte[] d } return Optional.of(decodedRow); } + + @Override + public Optional> decodeRow(byte[] data) + { + throw new UnsupportedOperationException(); + } } diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoderFactory.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoderFactory.java index f39882a3ccb8..d37fe1dd7ff3 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoderFactory.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/hash/HashRedisRowDecoderFactory.java @@ -14,9 +14,9 @@ package io.trino.plugin.redis.decoder.hash; import io.trino.decoder.DecoderColumnHandle; -import io.trino.decoder.RowDecoder; import io.trino.decoder.RowDecoderFactory; import io.trino.plugin.redis.RedisFieldDecoder; +import io.trino.plugin.redis.decoder.RedisRowDecoder; import java.util.Map; import java.util.Set; @@ -31,7 +31,7 @@ public class HashRedisRowDecoderFactory implements RowDecoderFactory { @Override - public RowDecoder create(Map decoderParams, Set columns) + public RedisRowDecoder create(Map decoderParams, Set columns) { requireNonNull(columns, "columns is null"); return new HashRedisRowDecoder(chooseFieldDecoders(columns)); diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoder.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoder.java index 63acbe6def4c..4ebd46d150f1 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoder.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoder.java @@ -15,7 +15,7 @@ import io.trino.decoder.DecoderColumnHandle; import io.trino.decoder.FieldValueProvider; -import io.trino.decoder.RowDecoder; +import io.trino.plugin.redis.decoder.RedisRowDecoder; import java.util.Map; import java.util.Optional; @@ -26,14 +26,18 @@ * The row decoder for the 'zset' format. Zset's can contain redis keys for tables */ public class ZsetRedisRowDecoder - implements RowDecoder + implements RedisRowDecoder { public static final String NAME = "zset"; @Override - public Optional> decodeRow( - byte[] data, - Map dataMap) + public Optional> decodeRow(Map dataMap) + { + throw new UnsupportedOperationException(); + } + + @Override + public Optional> decodeRow(byte[] data) { return Optional.of(emptyMap()); } diff --git a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoderFactory.java b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoderFactory.java index 5336f0a0ce02..26b6c26a696d 100644 --- a/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoderFactory.java +++ b/plugin/trino-redis/src/main/java/io/trino/plugin/redis/decoder/zset/ZsetRedisRowDecoderFactory.java @@ -14,8 +14,8 @@ package io.trino.plugin.redis.decoder.zset; import io.trino.decoder.DecoderColumnHandle; -import io.trino.decoder.RowDecoder; import io.trino.decoder.RowDecoderFactory; +import io.trino.plugin.redis.decoder.RedisRowDecoder; import java.util.Map; import java.util.Set; @@ -26,10 +26,10 @@ public class ZsetRedisRowDecoderFactory implements RowDecoderFactory { - private static final RowDecoder DECODER_INSTANCE = new ZsetRedisRowDecoder(); + private static final RedisRowDecoder DECODER_INSTANCE = new ZsetRedisRowDecoder(); @Override - public RowDecoder create(Map decoderParams, Set columns) + public RedisRowDecoder create(Map decoderParams, Set columns) { requireNonNull(columns, "columns is null"); checkArgument(columns.stream().noneMatch(DecoderColumnHandle::isInternal), "unexpected internal column");