Skip to content

Commit 01c38ed

Browse files
committed
Implement dereference pushdown for the Iceberg connector
1 parent 4fb5ae2 commit 01c38ed

File tree

18 files changed

+958
-88
lines changed

18 files changed

+958
-88
lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveApplyProjectionUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import static java.util.Objects.requireNonNull;
2929

30-
final class HiveApplyProjectionUtil
30+
public final class HiveApplyProjectionUtil
3131
{
3232
private HiveApplyProjectionUtil() {}
3333

plugin/trino-hive/src/main/java/io/trino/plugin/hive/ReaderColumns.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
* - the projected columns required by a connector level pagesource and
2727
* - the columns supplied by format-specific page source
2828
* <p>
29-
* Currently used in {@link HivePageSource}.
29+
* Currently used in {@link HivePageSource} and {@code io.trino.plugin.iceberg.IcebergPageSource}.
3030
*/
3131
public class ReaderColumns
3232
{
3333
// columns to be read by the reader (ordered)
3434
private final List<ColumnHandle> readerColumns;
35-
// indices for mapping expected hive column handles to the reader's column handles
35+
// indices for mapping expected column handles to the reader's column handles
3636
private final List<Integer> readerBlockIndices;
3737

3838
public ReaderColumns(List<? extends ColumnHandle> readerColumns, List<Integer> readerBlockIndices)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.iceberg;
1515

1616
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonIgnore;
1718
import com.fasterxml.jackson.annotation.JsonProperty;
1819
import com.google.common.collect.ImmutableList;
1920
import com.google.common.collect.ImmutableMap;
@@ -40,6 +41,7 @@ public class ColumnIdentity
4041
private final TypeCategory typeCategory;
4142
// Underlying ImmutableMap is used to maintain the column ordering
4243
private final Map<Integer, ColumnIdentity> children;
44+
private final Map<Integer, Integer> childFieldIdToIndex;
4345

4446
@JsonCreator
4547
public ColumnIdentity(
@@ -56,10 +58,14 @@ public ColumnIdentity(
5658
children.isEmpty() == (typeCategory == PRIMITIVE),
5759
"Children should be empty if and only if column type is primitive");
5860
ImmutableMap.Builder<Integer, ColumnIdentity> childrenBuilder = ImmutableMap.builder();
59-
for (ColumnIdentity child : children) {
61+
ImmutableMap.Builder<Integer, Integer> childFieldIdToIndex = ImmutableMap.builder();
62+
for (int i = 0; i < children.size(); i++) {
63+
ColumnIdentity child = children.get(i);
6064
childrenBuilder.put(child.getId(), child);
65+
childFieldIdToIndex.put(child.getId(), i);
6166
}
6267
this.children = childrenBuilder.build();
68+
this.childFieldIdToIndex = childFieldIdToIndex.build();
6369
}
6470

6571
@JsonProperty
@@ -86,6 +92,20 @@ public List<ColumnIdentity> getChildren()
8692
return ImmutableList.copyOf(children.values());
8793
}
8894

95+
@JsonIgnore
96+
public ColumnIdentity getChildByFieldId(int fieldId)
97+
{
98+
checkArgument(children.containsKey(fieldId), "ColumnIdentity %s does not contain child with field id %s", this, fieldId);
99+
return children.get(fieldId);
100+
}
101+
102+
@JsonIgnore
103+
public int getChildIndexByFieldId(int fieldId)
104+
{
105+
checkArgument(childFieldIdToIndex.containsKey(fieldId), "ColumnIdentity %s does not contain child with field id %s", this, fieldId);
106+
return childFieldIdToIndex.get(fieldId);
107+
}
108+
89109
@Override
90110
public boolean equals(Object o)
91111
{

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ExpressionConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public static Expression toIcebergExpression(TupleDomain<IcebergColumnHandle> tu
8282
for (Map.Entry<IcebergColumnHandle, Domain> entry : domainMap.entrySet()) {
8383
IcebergColumnHandle columnHandle = entry.getKey();
8484
Domain domain = entry.getValue();
85-
expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain));
85+
expression = and(expression, toIcebergExpression(columnHandle.getQualifiedName(), columnHandle.getType(), domain));
8686
}
8787
return expression;
8888
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java

Lines changed: 76 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,52 @@
1616
import com.fasterxml.jackson.annotation.JsonCreator;
1717
import com.fasterxml.jackson.annotation.JsonIgnore;
1818
import com.fasterxml.jackson.annotation.JsonProperty;
19+
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.Iterables;
1921
import io.trino.spi.connector.ColumnHandle;
2022
import io.trino.spi.type.Type;
21-
import io.trino.spi.type.TypeManager;
22-
import org.apache.iceberg.types.Types;
2323

24+
import java.util.List;
2425
import java.util.Objects;
2526
import java.util.Optional;
2627

27-
import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
28-
import static io.trino.plugin.iceberg.ColumnIdentity.primitiveColumnIdentity;
29-
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
3028
import static java.util.Objects.requireNonNull;
3129

3230
public class IcebergColumnHandle
3331
implements ColumnHandle
3432
{
35-
private final ColumnIdentity columnIdentity;
33+
private final ColumnIdentity baseColumnIdentity;
34+
private final Type baseType;
35+
// The list of field ids to indicate the projected part of the top-level column represented by baseColumnIdentity
36+
private final List<Integer> path;
3637
private final Type type;
3738
private final Optional<String> comment;
39+
// Cache of ColumnIdentity#getId to ensure quick access, even with dereferences
40+
private final int id;
3841

3942
@JsonCreator
4043
public IcebergColumnHandle(
41-
@JsonProperty("columnIdentity") ColumnIdentity columnIdentity,
44+
@JsonProperty("baseColumnIdentity") ColumnIdentity baseColumnIdentity,
45+
@JsonProperty("baseType") Type baseType,
46+
@JsonProperty("path") List<Integer> path,
4247
@JsonProperty("type") Type type,
4348
@JsonProperty("comment") Optional<String> comment)
4449
{
45-
this.columnIdentity = requireNonNull(columnIdentity, "columnIdentity is null");
50+
this.baseColumnIdentity = requireNonNull(baseColumnIdentity, "baseColumnIdentity is null");
51+
this.baseType = requireNonNull(baseType, "baseType is null");
52+
this.path = ImmutableList.copyOf(requireNonNull(path, "path is null"));
4653
this.type = requireNonNull(type, "type is null");
4754
this.comment = requireNonNull(comment, "comment is null");
55+
this.id = path.isEmpty() ? baseColumnIdentity.getId() : Iterables.getLast(path);
4856
}
4957

50-
@JsonProperty
58+
@JsonIgnore
5159
public ColumnIdentity getColumnIdentity()
5260
{
61+
ColumnIdentity columnIdentity = baseColumnIdentity;
62+
for (int fieldId : path) {
63+
columnIdentity = columnIdentity.getChildByFieldId(fieldId);
64+
}
5365
return columnIdentity;
5466
}
5567

@@ -59,6 +71,24 @@ public Type getType()
5971
return type;
6072
}
6173

74+
@JsonProperty
75+
public ColumnIdentity getBaseColumnIdentity()
76+
{
77+
return baseColumnIdentity;
78+
}
79+
80+
@JsonProperty
81+
public Type getBaseType()
82+
{
83+
return baseType;
84+
}
85+
86+
@JsonIgnore
87+
public IcebergColumnHandle getBaseColumn()
88+
{
89+
return new IcebergColumnHandle(getBaseColumnIdentity(), getBaseType(), ImmutableList.of(), getBaseType(), Optional.empty());
90+
}
91+
6292
@JsonProperty
6393
public Optional<String> getComment()
6494
{
@@ -68,19 +98,50 @@ public Optional<String> getComment()
6898
@JsonIgnore
6999
public int getId()
70100
{
71-
return columnIdentity.getId();
101+
return id;
72102
}
73103

104+
/**
105+
* For nested columns, this is the unqualified name of the last field in the path
106+
*/
74107
@JsonIgnore
75108
public String getName()
76109
{
77-
return columnIdentity.getName();
110+
return getColumnIdentity().getName();
111+
}
112+
113+
@JsonProperty
114+
public List<Integer> getPath()
115+
{
116+
return path;
117+
}
118+
119+
/**
120+
* The dot separated path components used to address this column, including all dereferences and the column name.
121+
*/
122+
@JsonIgnore
123+
public String getQualifiedName()
124+
{
125+
ImmutableList.Builder<String> pathNames = ImmutableList.builder();
126+
ColumnIdentity columnIdentity = baseColumnIdentity;
127+
pathNames.add(columnIdentity.getName());
128+
for (int fieldId : path) {
129+
columnIdentity = columnIdentity.getChildByFieldId(fieldId);
130+
pathNames.add(columnIdentity.getName());
131+
}
132+
// Iceberg tables are guaranteed not to have ambiguous column names so joining them like this must uniquely identify a single column.
133+
return String.join(".", pathNames.build());
134+
}
135+
136+
public boolean isBaseColumn()
137+
{
138+
return path.isEmpty();
78139
}
79140

80141
@Override
81142
public int hashCode()
82143
{
83-
return Objects.hash(columnIdentity, type, comment);
144+
return Objects.hash(baseColumnIdentity, baseType, path, type, comment);
84145
}
85146

86147
@Override
@@ -93,7 +154,9 @@ public boolean equals(Object obj)
93154
return false;
94155
}
95156
IcebergColumnHandle other = (IcebergColumnHandle) obj;
96-
return Objects.equals(this.columnIdentity, other.columnIdentity) &&
157+
return Objects.equals(this.baseColumnIdentity, other.baseColumnIdentity) &&
158+
Objects.equals(this.baseType, other.baseType) &&
159+
Objects.equals(this.path, other.path) &&
97160
Objects.equals(this.type, other.type) &&
98161
Objects.equals(this.comment, other.comment);
99162
}
@@ -103,17 +166,4 @@ public String toString()
103166
{
104167
return getId() + ":" + getName() + ":" + type.getDisplayName();
105168
}
106-
107-
public static IcebergColumnHandle primitiveIcebergColumnHandle(int id, String name, Type type, Optional<String> comment)
108-
{
109-
return new IcebergColumnHandle(primitiveColumnIdentity(id, name), type, comment);
110-
}
111-
112-
public static IcebergColumnHandle create(Types.NestedField column, TypeManager typeManager)
113-
{
114-
return new IcebergColumnHandle(
115-
createColumnIdentity(column),
116-
toTrinoType(column.type(), typeManager),
117-
Optional.ofNullable(column.doc()));
118-
}
119169
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class IcebergConfig
3737
private CatalogType catalogType = HIVE_METASTORE;
3838
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
3939
private boolean tableStatisticsEnabled = true;
40+
private boolean projectionPushdownEnabled = true;
4041

4142
public CatalogType getCatalogType()
4243
{
@@ -153,4 +154,17 @@ public boolean isTableStatisticsEnabled()
153154
{
154155
return tableStatisticsEnabled;
155156
}
157+
158+
public boolean isProjectionPushdownEnabled()
159+
{
160+
return projectionPushdownEnabled;
161+
}
162+
163+
@Config("iceberg.projection-pushdown-enabled")
164+
@ConfigDescription("Read only required fields from a struct")
165+
public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnabled)
166+
{
167+
this.projectionPushdownEnabled = projectionPushdownEnabled;
168+
return this;
169+
}
156170
}

0 commit comments

Comments
 (0)