diff --git a/docs/src/main/sphinx/connector/pinot.rst b/docs/src/main/sphinx/connector/pinot.rst
index 9d9c275e2f17..eecac02b6b5c 100644
--- a/docs/src/main/sphinx/connector/pinot.rst
+++ b/docs/src/main/sphinx/connector/pinot.rst
@@ -166,6 +166,7 @@ Pinot                        Trino
 ``FLOAT``                    ``REAL``
 ``DOUBLE``                   ``DOUBLE``
 ``STRING``                   ``VARCHAR``
+``BYTES``                    ``VARBINARY``
 ``INT_ARRAY``                ``VARCHAR``
 ``LONG_ARRAY``               ``VARCHAR``
 ``FLOAT_ARRAY``              ``VARCHAR``
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
index e0dc220caf8e..fb74a017c221 100755
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java
@@ -26,8 +26,6 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 
@@ -39,6 +37,7 @@
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
 import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
+import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes;
 import static java.lang.Float.floatToIntBits;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -332,16 +331,6 @@ else if (trinoType instanceof VarbinaryType) {
         return Slices.EMPTY_SLICE;
     }
 
-    private static byte[] toBytes(String stringValue)
-    {
-        try {
-            return Hex.decodeHex(stringValue.toCharArray());
-        }
-        catch (DecoderException e) {
-            throw new IllegalArgumentException("Value: " + stringValue + " is not Hex encoded", e);
-        }
-    }
-
     private Slice getUtf8Slice(String value)
     {
         if (isNullOrEmpty(value)) {
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
index 96b235d9bc86..dbc957e69f6e 100644
--- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/DecoderFactory.java
@@ -22,6 +22,7 @@
 import io.trino.spi.type.IntegerType;
 import io.trino.spi.type.RealType;
 import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
 
 import java.util.Optional;
 
@@ -60,6 +61,9 @@ else if (type instanceof BooleanType) {
         else if (type instanceof ArrayType) {
             return new ArrayDecoder(type);
         }
+        else if (type instanceof VarbinaryType) {
+            return new VarbinaryDecoder();
+        }
         else {
             return new VarcharDecoder();
         }
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/VarbinaryDecoder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/VarbinaryDecoder.java
new file mode 100644
index 000000000000..472a479958f0
--- /dev/null
+++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/VarbinaryDecoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.pinot.decoders;
+
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.BlockBuilder;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+
+import java.util.function.Supplier;
+
+import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH;
+import static java.lang.String.format;
+
+public class VarbinaryDecoder
+        implements Decoder
+{
+    @Override
+    public void decode(Supplier getter, BlockBuilder output)
+    {
+        Object value = getter.get();
+        if (value == null) {
+            output.appendNull();
+        }
+        else if (value instanceof String) {
+            Slice slice = Slices.wrappedBuffer(toBytes((String) value));
+            output.writeBytes(slice, 0, slice.length()).closeEntry();
+        }
+        else {
+            throw new TrinoException(TYPE_MISMATCH, format("Expected a string value of type VARBINARY: %s [%s]", value, value.getClass().getSimpleName()));
+        }
+    }
+
+    public static byte[] toBytes(String stringValue)
+    {
+        try {
+            return Hex.decodeHex(stringValue.toCharArray());
+        }
+        catch (DecoderException e) {
+            throw new IllegalArgumentException("Value: " + stringValue + " is not Hex encoded", e);
+        }
+    }
+}
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
index 5f2c703d728e..f8bee993fff5 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
@@ -2183,4 +2183,24 @@ public void testAggregationPushdownWithArrays()
                         "(56, VARCHAR 'string_8400', BIGINT '1')," +
                         "(1000, VARCHAR 'string1_8401', BIGINT '1')");
     }
+
+    @Test
+    public void testVarbinary()
+    {
+        String expectedValues = "VALUES (X'')," +
+                " (X'73 74 72 69 6e 67 5f 30')," +
+                " (X'73 74 72 69 6e 67 5f 31 32 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 32 34 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 33 36 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 34 38 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 36 30 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 37 32 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 38 34 30 30')," +
+                " (X'73 74 72 69 6e 67 5f 39 36 30 30')";
+        // The filter on string_col is to have a deterministic result set: the default limit for broker queries is 10 rows.
+        assertThat(query("SELECT bytes_col FROM alltypes WHERE string_col != 'array_null'"))
+                .matches(expectedValues);
+        assertThat(query("SELECT bytes_col FROM \"SELECT bytes_col, string_col FROM alltypes\" WHERE string_col != 'array_null'"))
+                .matches(expectedValues);
+    }
 }