Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.trino.parquet.dictionary.BinaryDictionary;
import io.trino.parquet.dictionary.Dictionary;
import io.trino.parquet.dictionary.DictionaryReader;
import io.trino.parquet.dictionary.DoubleDictionary;
import io.trino.parquet.dictionary.FloatDictionary;
import io.trino.parquet.dictionary.IntegerDictionary;
Expand Down Expand Up @@ -106,24 +105,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
},

PLAIN_DICTIONARY {
@Override
public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary)
{
return RLE_DICTIONARY.getDictionaryBasedValuesReader(descriptor, valuesType, dictionary);
}

@Override
public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage)
throws IOException
{
return PLAIN.initDictionary(descriptor, dictionaryPage);
}

@Override
public boolean usesDictionary()
{
return true;
}
},

DELTA_BINARY_PACKED {
Expand Down Expand Up @@ -156,24 +143,12 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
},

RLE_DICTIONARY {
@Override
public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary)
{
return new DictionaryReader(dictionary);
}

@Override
public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage)
throws IOException
{
return PLAIN.initDictionary(descriptor, dictionaryPage);
}

@Override
public boolean usesDictionary()
{
return true;
}
};

static final int INT96_TYPE_LENGTH = 12;
Expand All @@ -192,11 +167,6 @@ static int getMaxLevel(ColumnDescriptor descriptor, ValuesType valuesType)
};
}

public boolean usesDictionary()
{
return false;
}

public Dictionary initDictionary(ColumnDescriptor descriptor, DictionaryPage dictionaryPage)
throws IOException
{
Expand All @@ -207,9 +177,4 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
{
throw new UnsupportedOperationException("Error decoding values in encoding: " + this.name());
}

public ValuesReader getDictionaryBasedValuesReader(ColumnDescriptor descriptor, ValuesType valuesType, Dictionary dictionary)
{
throw new UnsupportedOperationException(" Dictionary encoding is not supported for: " + name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,46 @@

import io.airlift.slice.Slice;
import io.trino.parquet.DictionaryPage;
import org.apache.parquet.io.api.Binary;

import java.io.IOException;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;

public class BinaryDictionary
implements Dictionary
{
private final Binary[] content;
private final Slice[] content;

public BinaryDictionary(DictionaryPage dictionaryPage)
throws IOException
{
this(dictionaryPage, null);
}

public BinaryDictionary(DictionaryPage dictionaryPage, Integer length)
throws IOException
{
content = new Binary[dictionaryPage.getDictionarySize()];
content = new Slice[dictionaryPage.getDictionarySize()];

Slice dictionarySlice = dictionaryPage.getSlice();
byte[] dictionaryBytes = dictionarySlice.byteArray();
int offset = dictionarySlice.byteArrayOffset();

int currentInputOffset = 0;
if (length == null) {
for (int i = 0; i < content.length; i++) {
int len = readIntLittleEndian(dictionaryBytes, offset);
offset += 4;
content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, len);
offset += len;
int positionLength = dictionarySlice.getInt(currentInputOffset);
currentInputOffset += Integer.BYTES;
content[i] = dictionarySlice.slice(currentInputOffset, positionLength);
currentInputOffset += positionLength;
}
}
else {
checkArgument(length > 0, "Invalid byte array length: %s", length);
for (int i = 0; i < content.length; i++) {
content[i] = Binary.fromReusedByteArray(dictionaryBytes, offset, length);
offset += length;
content[i] = dictionarySlice.slice(currentInputOffset, length);
currentInputOffset += length;
}
}
}

@Override
public Binary decodeToBinary(int id)
public Slice decodeToSlice(int id)
{
return content[id];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
*/
package io.trino.parquet.dictionary;

import org.apache.parquet.io.api.Binary;
import io.airlift.slice.Slice;

public interface Dictionary
{
default Binary decodeToBinary(int id)
default Slice decodeToSlice(int id)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,13 @@ public static Domain getDomain(
}

try {
Object min = statistics.genericGetMin();
Object max = statistics.genericGetMax();
return getDomain(
column,
type,
ImmutableList.of(statistics.genericGetMin()),
ImmutableList.of(statistics.genericGetMax()),
ImmutableList.of(min instanceof Binary ? Slices.wrappedBuffer(((Binary) min).getBytes()) : min),
ImmutableList.of(max instanceof Binary ? Slices.wrappedBuffer(((Binary) max).getBytes()) : max),
hasNullValue,
timeZone);
}
Expand Down Expand Up @@ -372,8 +374,8 @@ private static Domain getDomain(
Object min = minimums.get(i);
Object max = maximums.get(i);

long minValue = min instanceof Binary ? getShortDecimalValue(((Binary) min).getBytes()) : asLong(min);
long maxValue = max instanceof Binary ? getShortDecimalValue(((Binary) max).getBytes()) : asLong(max);
long minValue = min instanceof Slice ? getShortDecimalValue(((Slice) min).getBytes()) : asLong(min);
long maxValue = max instanceof Slice ? getShortDecimalValue(((Slice) max).getBytes()) : asLong(max);

if (isStatisticsOverflow(type, minValue, maxValue)) {
return Domain.create(ValueSet.all(type), hasNullValue);
Expand All @@ -384,8 +386,8 @@ private static Domain getDomain(
}
else {
for (int i = 0; i < minimums.size(); i++) {
Int128 min = Int128.fromBigEndian(((Binary) minimums.get(i)).getBytes());
Int128 max = Int128.fromBigEndian(((Binary) maximums.get(i)).getBytes());
Int128 min = Int128.fromBigEndian(((Slice) minimums.get(i)).getBytes());
Int128 max = Int128.fromBigEndian(((Slice) maximums.get(i)).getBytes());

rangesBuilder.addRangeInclusive(min, max);
}
Expand Down Expand Up @@ -427,8 +429,8 @@ private static Domain getDomain(
if (type instanceof VarcharType) {
SortedRangeSet.Builder rangesBuilder = SortedRangeSet.builder(type, minimums.size());
for (int i = 0; i < minimums.size(); i++) {
Slice min = Slices.wrappedHeapBuffer(((Binary) minimums.get(i)).toByteBuffer());
Slice max = Slices.wrappedHeapBuffer(((Binary) maximums.get(i)).toByteBuffer());
Slice min = (Slice) minimums.get(i);
Slice max = (Slice) maximums.get(i);
rangesBuilder.addRangeInclusive(min, max);
}
return Domain.create(rangesBuilder.build(), hasNullValue);
Expand All @@ -446,11 +448,11 @@ private static Domain getDomain(
// PARQUET-1065 deprecated them. The result is that any writer that produced stats was producing unusable incorrect values, except
// the special case where min == max and an incorrect ordering would not be material to the result. PARQUET-1026 made binary stats
// available and valid in that special case
if (!(min instanceof Binary) || !(max instanceof Binary) || !min.equals(max)) {
if (!(min instanceof Slice) || !(max instanceof Slice) || !min.equals(max)) {
return Domain.create(ValueSet.all(type), hasNullValue);
}

rangesBuilder.addValue(timestampEncoder.getTimestamp(decodeInt96Timestamp((Binary) min)));
rangesBuilder.addValue(timestampEncoder.getTimestamp(decodeInt96Timestamp(Binary.fromConstantByteArray(((Slice) min).getBytes()))));
}
return Domain.create(rangesBuilder.build(), hasNullValue);
}
Expand Down Expand Up @@ -732,11 +734,13 @@ public boolean canDrop(org.apache.parquet.filter2.predicate.Statistics<T> statis
return false;
}

T min = statistic.getMin();
T max = statistic.getMax();
Domain domain = getDomain(
columnDescriptor,
columnDomain.getType(),
ImmutableList.of(statistic.getMin()),
ImmutableList.of(statistic.getMax()),
ImmutableList.of(min instanceof Binary ? Slices.wrappedBuffer(((Binary) min).getBytes()) : min),
ImmutableList.of(max instanceof Binary ? Slices.wrappedBuffer(((Binary) max).getBytes()) : max),
true,
timeZone);
return !columnDomain.overlaps(domain);
Expand All @@ -759,23 +763,14 @@ private ColumnIndexValueConverter()

private Function<ByteBuffer, Object> getConverter(PrimitiveType primitiveType)
{
switch (primitiveType.getPrimitiveTypeName()) {
case BOOLEAN:
return buffer -> buffer.get(0) != 0;
case INT32:
return buffer -> buffer.order(LITTLE_ENDIAN).getInt(0);
case INT64:
return buffer -> buffer.order(LITTLE_ENDIAN).getLong(0);
case FLOAT:
return buffer -> buffer.order(LITTLE_ENDIAN).getFloat(0);
case DOUBLE:
return buffer -> buffer.order(LITTLE_ENDIAN).getDouble(0);
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
case INT96:
default:
return buffer -> Binary.fromReusedByteBuffer(buffer);
}
return switch (primitiveType.getPrimitiveTypeName()) {
case BOOLEAN -> buffer -> buffer.get(0) != 0;
case INT32 -> buffer -> buffer.order(LITTLE_ENDIAN).getInt(0);
case INT64 -> buffer -> buffer.order(LITTLE_ENDIAN).getLong(0);
case FLOAT -> buffer -> buffer.order(LITTLE_ENDIAN).getFloat(0);
case DOUBLE -> buffer -> buffer.order(LITTLE_ENDIAN).getDouble(0);
case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> Slices::wrappedHeapBuffer;
};
}
}

Expand All @@ -796,7 +791,7 @@ private Function<Integer, Object> getConverter(PrimitiveType primitiveType)
case INT64 -> (i) -> dictionary.decodeToLong(i);
case FLOAT -> (i) -> dictionary.decodeToFloat(i);
case DOUBLE -> (i) -> dictionary.decodeToDouble(i);
case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> (i) -> dictionary.decodeToBinary(i);
case FIXED_LEN_BYTE_ARRAY, BINARY, INT96 -> (i) -> dictionary.decodeToSlice(i);
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ static <T> Object[][] testArgs(
static ValuesReader getApacheParquetReader(ParquetEncoding encoding, PrimitiveField field, Optional<Dictionary> dictionary)
{
if (encoding == RLE_DICTIONARY || encoding == PLAIN_DICTIONARY) {
return encoding.getDictionaryBasedValuesReader(field.getDescriptor(), VALUES, dictionary.orElseThrow());
return new DictionaryReader(dictionary.orElseThrow());
}
checkArgument(dictionary.isEmpty(), "dictionary should be empty");
return encoding.getValuesReader(field.getDescriptor(), VALUES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.dictionary;
package io.trino.parquet.reader.decoders;

import io.trino.parquet.dictionary.Dictionary;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
Expand Down Expand Up @@ -50,7 +51,7 @@ public int readValueDictionaryId()
@Override
public Binary readBytes()
{
return dictionary.decodeToBinary(readInt());
return Binary.fromConstantByteArray(dictionary.decodeToSlice(readInt()).getBytes());
}

@Override
Expand Down