Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions lib/trino-orc/src/main/java/io/trino/orc/OrcReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public OrcRecordReader createRecordReader(
return createRecordReader(
readColumns,
readTypes,
Collections.nCopies(readColumns.size(), ProjectedLayout.fullyProjectedLayout()),
Collections.nCopies(readColumns.size(), fullyProjectedLayout()),
predicate,
0,
orcDataSource.getEstimatedSize(),
Expand Down Expand Up @@ -432,29 +432,40 @@ static void validateFile(
}
}

public static class ProjectedLayout
public interface ProjectedLayout
{
ProjectedLayout getFieldLayout(OrcColumn orcColumn);
}

/**
* Constructs a ProjectedLayout where all subfields must be read
*/
public static ProjectedLayout fullyProjectedLayout()
{
return orcColumn -> fullyProjectedLayout();
}

public static class NameBasedProjectedLayout
implements ProjectedLayout
{
private final Optional<Map<String, ProjectedLayout>> fieldLayouts;

private ProjectedLayout(Optional<Map<String, ProjectedLayout>> fieldLayouts)
private NameBasedProjectedLayout(Optional<Map<String, ProjectedLayout>> fieldLayouts)
{
this.fieldLayouts = requireNonNull(fieldLayouts, "fieldLayouts is null");
}

public ProjectedLayout getFieldLayout(String name)
@Override
public ProjectedLayout getFieldLayout(OrcColumn orcColumn)
{
String name = orcColumn.getColumnName().toLowerCase(ENGLISH);
if (fieldLayouts.isPresent()) {
return fieldLayouts.get().get(name);
}

return fullyProjectedLayout();
}

public static ProjectedLayout fullyProjectedLayout()
{
return new ProjectedLayout(Optional.empty());
}

public static ProjectedLayout createProjectedLayout(OrcColumn root, List<List<String>> dereferences)
{
if (dereferences.stream().map(List::size).anyMatch(Predicate.isEqual(0))) {
Expand All @@ -474,7 +485,7 @@ public static ProjectedLayout createProjectedLayout(OrcColumn root, List<List<St
}
}

return new ProjectedLayout(Optional.of(fieldLayouts.build()));
return new NameBasedProjectedLayout(Optional.of(fieldLayouts.build()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.orc.metadata.Stream.StreamKind.LENGTH;
import static io.trino.orc.metadata.Stream.StreamKind.PRESENT;
import static io.trino.orc.reader.ColumnReaders.createColumnReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.orc.metadata.Stream.StreamKind.LENGTH;
import static io.trino.orc.metadata.Stream.StreamKind.PRESENT;
import static io.trino.orc.reader.ColumnReaders.createColumnReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

Expand All @@ -53,6 +52,7 @@
import static io.trino.orc.reader.ColumnReaders.createColumnReader;
import static io.trino.orc.reader.ReaderUtils.verifyStreamType;
import static io.trino.orc.stream.MissingInputStreamSource.missingStreamSource;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class StructColumnReader
Expand Down Expand Up @@ -98,13 +98,13 @@ public class StructColumnReader
for (Field field : this.type.getFields()) {
String fieldName = field.getName()
.orElseThrow(() -> new IllegalArgumentException("ROW type does not have field names declared: " + type))
.toLowerCase(Locale.ENGLISH);
.toLowerCase(ENGLISH);
fieldNames.add(fieldName);

OrcColumn fieldStream = fieldMapper.get(fieldName);

if (fieldStream != null) {
OrcReader.ProjectedLayout fieldLayout = readLayout.getFieldLayout(fieldName);
OrcReader.ProjectedLayout fieldLayout = readLayout.getFieldLayout(fieldStream);
if (fieldLayout != null) {
structFields.put(
fieldName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.orc.metadata.Stream.StreamKind.DATA;
import static io.trino.orc.metadata.Stream.StreamKind.PRESENT;
import static io.trino.orc.reader.ColumnReaders.createColumnReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static java.util.Objects.requireNonNull;

final class HiveApplyProjectionUtil
public final class HiveApplyProjectionUtil
{
private HiveApplyProjectionUtil() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
* - the projected columns required by a connector level pagesource and
* - the columns supplied by format-specific page source
* <p>
* Currently used in {@link HivePageSource}.
* Currently used in {@link HivePageSource} and {@code io.trino.plugin.iceberg.IcebergPageSource}.
*/
public class ReaderColumns
{
// columns to be read by the reader (ordered)
private final List<ColumnHandle> readerColumns;
// indices for mapping expected hive column handles to the reader's column handles
// indices for mapping expected column handles to the reader's column handles
private final List<Integer> readerBlockIndices;

public ReaderColumns(List<? extends ColumnHandle> readerColumns, List<Integer> readerBlockIndices)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.type.Type;

import javax.annotation.Nullable;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -39,7 +41,7 @@ public class ReaderProjectionsAdapter
private final List<Type> inputTypes;

public ReaderProjectionsAdapter(
List<ColumnHandle> expectedColumns,
List<? extends ColumnHandle> expectedColumns,
ReaderColumns readColumns,
ColumnTypeGetter typeGetter,
ProjectionGetter projectionGetter)
Expand Down Expand Up @@ -68,7 +70,8 @@ public ReaderProjectionsAdapter(
.collect(toImmutableList());
}

public Page adaptPage(Page input)
@Nullable
public Page adaptPage(@Nullable Page input)
{
if (input == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import static com.google.common.collect.Maps.uniqueIndex;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.MAX_BATCH_SIZE;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.orc.OrcReader.createOrcReader;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static io.trino.plugin.hive.acid.AcidSchema.ACID_COLUMN_BUCKET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
import static com.google.common.collect.Maps.uniqueIndex;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.trino.orc.OrcReader.ProjectedLayout.createProjectedLayout;
import static io.trino.orc.OrcReader.ProjectedLayout.fullyProjectedLayout;
import static io.trino.orc.OrcReader.NameBasedProjectedLayout.createProjectedLayout;
import static io.trino.orc.OrcReader.fullyProjectedLayout;
import static io.trino.orc.metadata.OrcMetadataWriter.PRESTO_WRITER_ID;
import static io.trino.orc.metadata.OrcMetadataWriter.TRINO_WRITER_ID;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.INT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
package io.trino.plugin.iceberg;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

import java.util.List;
import java.util.Map;
import java.util.Objects;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -36,7 +39,9 @@ public class ColumnIdentity
private final int id;
private final String name;
private final TypeCategory typeCategory;
private final List<ColumnIdentity> children;
// Underlying ImmutableMap is used to maintain the column ordering
private final Map<Integer, ColumnIdentity> children;
private final Map<Integer, Integer> childFieldIdToIndex;

@JsonCreator
public ColumnIdentity(
Expand All @@ -48,10 +53,19 @@ public ColumnIdentity(
this.id = id;
this.name = requireNonNull(name, "name is null");
this.typeCategory = requireNonNull(typeCategory, "typeCategory is null");
this.children = ImmutableList.copyOf(requireNonNull(children, "children is null"));
requireNonNull(children, "children is null");
checkArgument(
children.isEmpty() == (typeCategory == PRIMITIVE),
"Children should be empty if and only if column type is primitive");
ImmutableMap.Builder<Integer, ColumnIdentity> childrenBuilder = ImmutableMap.builder();
ImmutableMap.Builder<Integer, Integer> childFieldIdToIndex = ImmutableMap.builder();
for (int i = 0; i < children.size(); i++) {
ColumnIdentity child = children.get(i);
childrenBuilder.put(child.getId(), child);
childFieldIdToIndex.put(child.getId(), i);
}
this.children = childrenBuilder.build();
this.childFieldIdToIndex = childFieldIdToIndex.build();
}

@JsonProperty
Expand All @@ -75,7 +89,21 @@ public TypeCategory getTypeCategory()
@JsonProperty
public List<ColumnIdentity> getChildren()
{
return children;
return ImmutableList.copyOf(children.values());
}

@JsonIgnore
public ColumnIdentity getChildByFieldId(int fieldId)
{
checkArgument(children.containsKey(fieldId), "ColumnIdentity %s does not contain child with field id %s", this, fieldId);
return children.get(fieldId);
}

@JsonIgnore
public int getChildIndexByFieldId(int fieldId)
{
checkArgument(childFieldIdToIndex.containsKey(fieldId), "ColumnIdentity %s does not contain child with field id %s", this, fieldId);
return childFieldIdToIndex.get(fieldId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static Expression toIcebergExpression(TupleDomain<IcebergColumnHandle> tu
for (Map.Entry<IcebergColumnHandle, Domain> entry : domainMap.entrySet()) {
IcebergColumnHandle columnHandle = entry.getKey();
Domain domain = entry.getValue();
expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain));
expression = and(expression, toIcebergExpression(columnHandle.getQualifiedName(), columnHandle.getType(), domain));
}
return expression;
}
Expand Down
Loading