Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.parquet;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't remove io.trino.parquet.RichColumnDescriptor class. Is it still used?

Could you improve commit description? Why we don't need io.trino.parquet.RichColumnDescriptor anymore

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh I thought I had removed it, it has been removed now. Commit message has been updated as well.


import io.trino.spi.type.DecimalType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.ColumnIOFactory;
Expand Down Expand Up @@ -94,27 +95,27 @@ public static ColumnIO getArrayElementColumn(ColumnIO columnIO)
return columnIO;
}

public static Map<List<String>, RichColumnDescriptor> getDescriptors(MessageType fileSchema, MessageType requestedSchema)
public static Map<List<String>, ColumnDescriptor> getDescriptors(MessageType fileSchema, MessageType requestedSchema)
{
Map<List<String>, RichColumnDescriptor> descriptorsByPath = new HashMap<>();
Map<List<String>, ColumnDescriptor> descriptorsByPath = new HashMap<>();
List<PrimitiveColumnIO> columns = getColumns(fileSchema, requestedSchema);
for (String[] paths : fileSchema.getPaths()) {
List<String> columnPath = Arrays.asList(paths);
getDescriptor(columns, columnPath)
.ifPresent(richColumnDescriptor -> descriptorsByPath.put(columnPath, richColumnDescriptor));
.ifPresent(columnDescriptor -> descriptorsByPath.put(columnPath, columnDescriptor));
}
return descriptorsByPath;
}

public static Optional<RichColumnDescriptor> getDescriptor(List<PrimitiveColumnIO> columns, List<String> path)
public static Optional<ColumnDescriptor> getDescriptor(List<PrimitiveColumnIO> columns, List<String> path)
{
checkArgument(path.size() >= 1, "Parquet nested path should have at least one component");
int index = getPathIndex(columns, path);
if (index == -1) {
return Optional.empty();
}
PrimitiveColumnIO columnIO = columns.get(index);
return Optional.of(new RichColumnDescriptor(columnIO.getColumnDescriptor(), columnIO.getType().asPrimitiveType()));
return Optional.of(columnIO.getColumnDescriptor());
}

private static int getPathIndex(List<PrimitiveColumnIO> columns, List<String> path)
Expand Down Expand Up @@ -218,12 +219,11 @@ public static ColumnIO lookupColumnById(GroupColumnIO groupColumnIO, int columnI
return null;
}

public static Optional<DecimalType> createDecimalType(RichColumnDescriptor descriptor)
public static Optional<DecimalType> createDecimalType(PrimitiveField field)
{
if (!(descriptor.getPrimitiveType().getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation)) {
if (!(field.getDescriptor().getPrimitiveType().getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation decimalLogicalType)) {
return Optional.empty();
}
DecimalLogicalTypeAnnotation decimalLogicalType = (DecimalLogicalTypeAnnotation) descriptor.getPrimitiveType().getLogicalTypeAnnotation();
return Optional.of(DecimalType.createDecimalType(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
package io.trino.parquet;

import io.trino.spi.type.Type;
import org.apache.parquet.column.ColumnDescriptor;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class PrimitiveField
extends Field
{
private final RichColumnDescriptor descriptor;
private final ColumnDescriptor descriptor;
private final int id;

public PrimitiveField(Type type, int repetitionLevel, int definitionLevel, boolean required, RichColumnDescriptor descriptor, int id)
public PrimitiveField(Type type, boolean required, ColumnDescriptor descriptor, int id)
{
super(type, repetitionLevel, definitionLevel, required);
super(type, descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel(), required);
this.descriptor = requireNonNull(descriptor, "descriptor is required");
this.id = id;
}

public RichColumnDescriptor getDescriptor()
public ColumnDescriptor getDescriptor()
{
return descriptor;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetEncoding;
import io.trino.parquet.RichColumnDescriptor;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -108,26 +107,26 @@ private static BigDecimal maximalValue(DecimalType decimalType)
public static Predicate buildPredicate(
MessageType requestedSchema,
TupleDomain<ColumnDescriptor> parquetTupleDomain,
Map<List<String>, RichColumnDescriptor> descriptorsByPath,
Map<List<String>, ColumnDescriptor> descriptorsByPath,
DateTimeZone timeZone)
{
ImmutableList.Builder<RichColumnDescriptor> columnReferences = ImmutableList.builder();
ImmutableList.Builder<ColumnDescriptor> columnReferences = ImmutableList.builder();
for (String[] paths : requestedSchema.getPaths()) {
RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(paths));
ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(paths));
if (descriptor != null) {
columnReferences.add(descriptor);
}
}
return new TupleDomainParquetPredicate(parquetTupleDomain, columnReferences.build(), timeZone);
}

public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
throws IOException
{
return predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain, Optional.empty());
}

public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, Optional<ColumnIndexStore> columnIndexStore)
public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData block, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain, Optional<ColumnIndexStore> columnIndexStore)
throws IOException
{
Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
Expand All @@ -143,13 +142,13 @@ public static boolean predicateMatches(Predicate parquetPredicate, BlockMetaData
return dictionaryPredicatesMatch(parquetPredicate, block, dataSource, descriptorsByPath, parquetTupleDomain);
}

private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData blockMetadata, Map<List<String>, RichColumnDescriptor> descriptorsByPath)
private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
{
ImmutableMap.Builder<ColumnDescriptor, Statistics<?>> statistics = ImmutableMap.builder();
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
Statistics<?> columnStatistics = columnMetaData.getStatistics();
if (columnStatistics != null) {
RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
if (descriptor != null) {
statistics.put(descriptor, columnStatistics);
}
Expand All @@ -158,11 +157,11 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetaData
return statistics.buildOrThrow();
}

private static boolean dictionaryPredicatesMatch(Predicate parquetPredicate, BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
private static boolean dictionaryPredicatesMatch(Predicate parquetPredicate, BlockMetaData blockMetadata, ParquetDataSource dataSource, Map<List<String>, ColumnDescriptor> descriptorsByPath, TupleDomain<ColumnDescriptor> parquetTupleDomain)
throws IOException
{
for (ColumnChunkMetaData columnMetaData : blockMetadata.getColumns()) {
RichColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
if (descriptor != null) {
if (isOnlyDictionaryEncodingPages(columnMetaData) && isColumnPredicate(descriptor, parquetTupleDomain)) {
Slice buffer = dataSource.readFully(columnMetaData.getStartingPos(), toIntExact(columnMetaData.getTotalSize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.dictionary.Dictionary;
import io.trino.plugin.base.type.TrinoTimestampEncoder;
import io.trino.spi.predicate.Domain;
Expand Down Expand Up @@ -81,10 +80,10 @@ public class TupleDomainParquetPredicate
implements Predicate
{
private final TupleDomain<ColumnDescriptor> effectivePredicate;
private final List<RichColumnDescriptor> columns;
private final List<ColumnDescriptor> columns;
private final DateTimeZone timeZone;

public TupleDomainParquetPredicate(TupleDomain<ColumnDescriptor> effectivePredicate, List<RichColumnDescriptor> columns, DateTimeZone timeZone)
public TupleDomainParquetPredicate(TupleDomain<ColumnDescriptor> effectivePredicate, List<ColumnDescriptor> columns, DateTimeZone timeZone)
{
this.effectivePredicate = requireNonNull(effectivePredicate, "effectivePredicate is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
Expand All @@ -104,7 +103,7 @@ public boolean matches(long numberOfRows, Map<ColumnDescriptor, Statistics<?>> s
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));

for (RichColumnDescriptor column : columns) {
for (ColumnDescriptor column : columns) {
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
Expand Down Expand Up @@ -162,7 +161,7 @@ public boolean matches(long numberOfRows, ColumnIndexStore columnIndexStore, Par
Map<ColumnDescriptor, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
.orElseThrow(() -> new IllegalStateException("Effective predicate other than none should have domains"));

for (RichColumnDescriptor column : columns) {
for (ColumnDescriptor column : columns) {
Domain effectivePredicateDomain = effectivePredicateDomains.get(column);
if (effectivePredicateDomain == null) {
continue;
Expand Down Expand Up @@ -405,7 +404,7 @@ public static Domain getDomain(
long rowCount,
ColumnIndex columnIndex,
ParquetDataSourceId id,
RichColumnDescriptor descriptor,
ColumnDescriptor descriptor,
DateTimeZone timeZone)
throws ParquetCorruptionException
{
Expand Down Expand Up @@ -539,7 +538,7 @@ private FilterPredicate convertToParquetFilter(DateTimeZone timeZone)
{
FilterPredicate filter = null;

for (RichColumnDescriptor column : columns) {
for (ColumnDescriptor column : columns) {
Domain domain = effectivePredicate.getDomains().get().get(column);
if (domain == null || domain.isNone()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.trino.parquet.reader;

import io.airlift.slice.Slice;
import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.CharType;
import io.trino.spi.type.Type;
Expand All @@ -29,9 +29,9 @@
public class BinaryColumnReader
extends PrimitiveColumnReader
{
public BinaryColumnReader(RichColumnDescriptor descriptor)
public BinaryColumnReader(PrimitiveField field)
{
super(descriptor);
super(field);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package io.trino.parquet.reader;

import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;

public class BooleanColumnReader
extends PrimitiveColumnReader
{
public BooleanColumnReader(RichColumnDescriptor descriptor)
public BooleanColumnReader(PrimitiveField primitiveField)
{
super(descriptor);
super(primitiveField);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
*/
package io.trino.parquet.reader;

import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.type.DecimalType;

public final class DecimalColumnReaderFactory
{
private DecimalColumnReaderFactory() {}

public static PrimitiveColumnReader createReader(RichColumnDescriptor descriptor, DecimalType parquetDecimalType)
public static PrimitiveColumnReader createReader(PrimitiveField field, DecimalType parquetDecimalType)
{
if (parquetDecimalType.isShort()) {
return new ShortDecimalColumnReader(descriptor, parquetDecimalType);
return new ShortDecimalColumnReader(field, parquetDecimalType);
}
else {
return new LongDecimalColumnReader(descriptor, parquetDecimalType);
return new LongDecimalColumnReader(field, parquetDecimalType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package io.trino.parquet.reader;

import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;

public class DoubleColumnReader
extends PrimitiveColumnReader
{
public DoubleColumnReader(RichColumnDescriptor descriptor)
public DoubleColumnReader(PrimitiveField field)
{
super(descriptor);
super(field);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.parquet.reader;

import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;

Expand All @@ -22,9 +22,9 @@
public class FloatColumnReader
extends PrimitiveColumnReader
{
public FloatColumnReader(RichColumnDescriptor descriptor)
public FloatColumnReader(PrimitiveField field)
{
super(descriptor);
super(field);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/
package io.trino.parquet.reader;

import io.trino.parquet.RichColumnDescriptor;
import io.trino.parquet.PrimitiveField;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.LongTimestamp;
Expand All @@ -31,9 +31,9 @@
public class Int64TimestampMillisColumnReader
extends PrimitiveColumnReader
{
public Int64TimestampMillisColumnReader(RichColumnDescriptor descriptor)
public Int64TimestampMillisColumnReader(PrimitiveField field)
{
super(descriptor);
super(field);
}

@Override
Expand All @@ -56,7 +56,7 @@ else if (type == BIGINT) {
type.writeLong(blockBuilder, epochMillis);
}
else {
throw new TrinoException(NOT_SUPPORTED, format("Unsupported Trino column type (%s) for Parquet column (%s)", type, columnDescriptor));
throw new TrinoException(NOT_SUPPORTED, format("Unsupported Trino column type (%s) for Parquet column (%s)", type, field.getDescriptor()));
}
}
}
Loading