Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,19 @@
*/
package io.trino.parquet.reader.decoders;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.reader.SimpleSliceInputStream;
import io.trino.parquet.reader.flat.BinaryBuffer;
import io.trino.plugin.base.type.DecodedTimestamp;
import io.trino.spi.type.CharType;
import io.trino.spi.type.Chars;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.Int128;
import io.trino.spi.type.VarcharType;
import io.trino.spi.type.Varchars;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.schema.LogicalTypeAnnotation;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.parquet.ParquetReaderUtils.castToByte;
import static io.trino.parquet.ParquetTimestampUtils.decodeInt96Timestamp;
import static io.trino.parquet.ParquetTypeUtils.checkBytesFitInShortDecimal;
import static io.trino.parquet.ParquetTypeUtils.getShortDecimalValue;
import static io.trino.parquet.reader.flat.Int96ColumnAdapter.Int96Buffer;
import static io.trino.spi.type.Varchars.truncateToLength;
import static java.util.Objects.requireNonNull;
import static org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;

/**
* This is a set of proxy value decoders that delegate to a {@link org.apache.parquet.column.values.ValuesReader} from the Apache Parquet library.
Expand Down Expand Up @@ -93,232 +74,6 @@ public void skip(int n)
}
}

/**
 * Decodes fixed-length big-endian decimal values into Trino short decimals (one {@code long} per value).
 * Values wider than 8 bytes are accepted as long as their leading bytes are pure sign extension,
 * which is verified per value via {@code checkBytesFitInShortDecimal}.
 */
public static final class ShortDecimalApacheParquetValueDecoder
        implements ValueDecoder<long[]>
{
    private final ValuesReader delegate;
    private final ColumnDescriptor descriptor;
    private final int typeLength;

    public ShortDecimalApacheParquetValueDecoder(ValuesReader delegate, ColumnDescriptor descriptor)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        LogicalTypeAnnotation annotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
        boolean isShortDecimal = annotation instanceof DecimalLogicalTypeAnnotation decimalAnnotation
                && decimalAnnotation.getPrecision() <= Decimals.MAX_SHORT_PRECISION;
        checkArgument(isShortDecimal, "Column %s is not a short decimal", descriptor);
        this.typeLength = descriptor.getPrimitiveType().getTypeLength();
        checkArgument(typeLength > 0 && typeLength <= 16, "Expected column %s to have type length in range (1-16)", descriptor);
        this.descriptor = descriptor;
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(long[] values, int offset, int length)
    {
        // The numeric payload lives in the trailing (at most) 8 bytes; any bytes before
        // that must be sign extension only, which is validated for every value below.
        int skippedLeadingBytes = Math.max(typeLength - Long.BYTES, 0);
        int valueBytes = Math.min(typeLength, Long.BYTES);
        int end = offset + length;
        for (int i = offset; i < end; i++) {
            byte[] bytes = delegate.readBytes().getBytes();
            checkBytesFitInShortDecimal(bytes, 0, skippedLeadingBytes, descriptor);
            values[i] = getShortDecimalValue(bytes, skippedLeadingBytes, valueBytes);
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

/**
 * Decodes big-endian binary decimal values into Trino long decimals, stored as
 * (high, low) 64-bit pairs — two {@code long} slots per logical value.
 */
public static final class LongDecimalApacheParquetValueDecoder
        implements ValueDecoder<long[]>
{
    private final ValuesReader delegate;

    public LongDecimalApacheParquetValueDecoder(ValuesReader delegate)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(long[] values, int offset, int length)
    {
        for (int i = 0; i < length; i++) {
            // Each Int128 occupies two consecutive long slots in the output array
            int outputIndex = (offset + i) * 2;
            Int128 decimal = Int128.fromBigEndian(delegate.readBytes().getBytes());
            values[outputIndex] = decimal.getHigh();
            values[outputIndex + 1] = decimal.getLow();
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

/**
 * Decodes binary values for a bounded {@code varchar(n)} column, truncating each
 * value to the type's bounded length before appending it to the output buffer.
 */
public static final class BoundedVarcharApacheParquetValueDecoder
        implements ValueDecoder<BinaryBuffer>
{
    private final ValuesReader delegate;
    private final int boundedLength;

    public BoundedVarcharApacheParquetValueDecoder(ValuesReader delegate, VarcharType varcharType)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        checkArgument(
                !varcharType.isUnbounded(),
                "Trino type %s is not a bounded varchar",
                varcharType);
        this.boundedLength = varcharType.getBoundedLength();
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(BinaryBuffer values, int offsetsIndex, int length)
    {
        for (int i = 0; i < length; i++) {
            Slice truncated = truncateToLength(Slices.wrappedBuffer(delegate.readBytes().getBytes()), boundedLength);
            values.add(truncated, offsetsIndex + i);
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

/**
 * Decodes binary values for a {@code char(n)} column: each value is truncated to the
 * type length and then stripped of trailing spaces, per Trino's char semantics.
 */
public static final class CharApacheParquetValueDecoder
        implements ValueDecoder<BinaryBuffer>
{
    private final ValuesReader delegate;
    private final int maxLength;

    public CharApacheParquetValueDecoder(ValuesReader delegate, CharType charType)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.maxLength = charType.getLength();
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(BinaryBuffer values, int offsetsIndex, int length)
    {
        for (int i = 0; i < length; i++) {
            Slice truncated = truncateToLength(Slices.wrappedBuffer(delegate.readBytes().getBytes()), maxLength);
            values.add(Chars.trimTrailingSpaces(truncated), offsetsIndex + i);
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

/**
 * Decodes raw binary values, appending each value's bytes to the output buffer unchanged.
 */
public static final class BinaryApacheParquetValueDecoder
        implements ValueDecoder<BinaryBuffer>
{
    private final ValuesReader delegate;

    public BinaryApacheParquetValueDecoder(ValuesReader delegate)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(BinaryBuffer values, int offsetsIndex, int length)
    {
        for (int i = 0; i < length; i++) {
            values.add(delegate.readBytes().getBytes(), offsetsIndex + i);
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

/**
 * Decodes 16-byte UUID values into pairs of longs, two output slots per value.
 * The two halves are read little-endian via a byte-array view VarHandle,
 * matching the original decoding exactly.
 */
public static final class UuidApacheParquetValueDecoder
        implements ValueDecoder<long[]>
{
    private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);

    private final ValuesReader delegate;

    public UuidApacheParquetValueDecoder(ValuesReader delegate)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
    }

    @Override
    public void init(SimpleSliceInputStream input)
    {
        initialize(input, delegate);
    }

    @Override
    public void read(long[] values, int offset, int length)
    {
        for (int i = 0; i < length; i++) {
            byte[] uuidBytes = delegate.readBytes().getBytes();
            // Each UUID fills two consecutive long slots in the output array
            int outputIndex = (offset + i) * 2;
            values[outputIndex] = (long) LONG_ARRAY_HANDLE.get(uuidBytes, 0);
            values[outputIndex + 1] = (long) LONG_ARRAY_HANDLE.get(uuidBytes, Long.BYTES);
        }
    }

    @Override
    public void skip(int n)
    {
        delegate.skip(n);
    }
}

public static final class Int96ApacheParquetValueDecoder
implements ValueDecoder<Int96Buffer>
{
Expand Down
Loading