Skip to content

Commit 123e2cc

Browse files
groupcache4321raunaqmorarka
authored andcommitted
Implement predicate pushdown for ROW sub fields in parquet for hive
1 parent 4a1eb9b commit 123e2cc

File tree

5 files changed

+328
-63
lines changed

5 files changed

+328
-63
lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.parquet.io.MessageColumnIO;
5858
import org.apache.parquet.schema.GroupType;
5959
import org.apache.parquet.schema.MessageType;
60+
import org.apache.parquet.schema.Type;
6061
import org.joda.time.DateTimeZone;
6162

6263
import javax.inject.Inject;
@@ -72,6 +73,7 @@
7273
import java.util.Set;
7374

7475
import static com.google.common.base.Preconditions.checkArgument;
76+
import static com.google.common.collect.ImmutableList.toImmutableList;
7577
import static com.google.common.collect.ImmutableSet.toImmutableSet;
7678
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
7779
import static io.trino.parquet.BloomFilterStore.getBloomFilterStore;
@@ -218,9 +220,6 @@ public static ReaderPageSource createPageSource(
218220
Optional<ParquetWriteValidation> parquetWriteValidation,
219221
int domainCompactionThreshold)
220222
{
221-
// Ignore predicates on partial columns for now.
222-
effectivePredicate = effectivePredicate.filter((column, domain) -> column.isBaseColumn());
223-
224223
MessageType fileSchema;
225224
MessageType requestedSchema;
226225
MessageColumnIO messageColumn;
@@ -410,18 +409,32 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(
410409
}
411410

412411
ColumnDescriptor descriptor;
413-
if (useColumnNames) {
414-
descriptor = descriptorsByPath.get(ImmutableList.of(columnHandle.getName()));
412+
413+
Optional<org.apache.parquet.schema.Type> baseColumnType = getBaseColumnParquetType(columnHandle, fileSchema, useColumnNames);
414+
// Parquet file has fewer column than partition
415+
if (baseColumnType.isEmpty()) {
416+
continue;
417+
}
418+
419+
if (baseColumnType.get().isPrimitive()) {
420+
descriptor = descriptorsByPath.get(ImmutableList.of(baseColumnType.get().getName()));
415421
}
416422
else {
417-
Optional<org.apache.parquet.schema.Type> parquetField = getBaseColumnParquetType(columnHandle, fileSchema, false);
418-
if (parquetField.isEmpty() || !parquetField.get().isPrimitive()) {
419-
// Parquet file has fewer column than partition
420-
// Or the field is a complex type
423+
if (columnHandle.getHiveColumnProjectionInfo().isEmpty()) {
421424
continue;
422425
}
423-
descriptor = descriptorsByPath.get(ImmutableList.of(parquetField.get().getName()));
426+
Optional<List<Type>> subfieldTypes = dereferenceSubFieldTypes(baseColumnType.get().asGroupType(), columnHandle.getHiveColumnProjectionInfo().get());
427+
// failed to look up subfields from the file schema
428+
if (subfieldTypes.isEmpty()) {
429+
continue;
430+
}
431+
432+
descriptor = descriptorsByPath.get(ImmutableList.<String>builder()
433+
.add(baseColumnType.get().getName())
434+
.addAll(subfieldTypes.get().stream().map(Type::getName).collect(toImmutableList()))
435+
.build());
424436
}
437+
425438
if (descriptor != null) {
426439
predicate.put(descriptor, entry.getValue());
427440
}

plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestHiveParquetComplexTypePredicatePushDown.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,31 @@
1414
package io.trino.plugin.hive.parquet;
1515

1616
import io.trino.plugin.hive.HiveQueryRunner;
17-
import io.trino.testing.BaseTestParquetComplexTypePredicatePushDown;
17+
import io.trino.testing.BaseTestFileFormatComplexTypesPredicatePushDown;
1818
import io.trino.testing.QueryRunner;
19+
import org.testng.annotations.Test;
20+
21+
import static io.trino.testing.TestingNames.randomNameSuffix;
22+
import static org.assertj.core.api.Assertions.assertThat;
1923

2024
public class TestHiveParquetComplexTypePredicatePushDown
21-
extends BaseTestParquetComplexTypePredicatePushDown
25+
extends BaseTestFileFormatComplexTypesPredicatePushDown
2226
{
2327
@Override
2428
protected QueryRunner createQueryRunner()
2529
throws Exception
2630
{
27-
return HiveQueryRunner.builder().build();
31+
return HiveQueryRunner.builder()
32+
.addHiveProperty("hive.storage-format", "PARQUET")
33+
.build();
34+
}
35+
36+
@Test
37+
public void ensureFormatParquet()
38+
{
39+
String tableName = "test_table_" + randomNameSuffix();
40+
assertUpdate("CREATE TABLE " + tableName + " (colTest BIGINT)");
41+
assertThat(((String) computeScalar("SHOW CREATE TABLE " + tableName))).contains("PARQUET");
42+
assertUpdate("DROP TABLE " + tableName);
2843
}
2944
}

plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/predicate/TestParquetPredicateUtils.java

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.ImmutableMap;
1818
import com.google.common.collect.Iterables;
1919
import io.trino.plugin.hive.HiveColumnHandle;
20+
import io.trino.plugin.hive.HiveColumnProjectionInfo;
2021
import io.trino.plugin.hive.HiveType;
2122
import io.trino.spi.predicate.Domain;
2223
import io.trino.spi.predicate.TupleDomain;
@@ -122,12 +123,129 @@ public void testParquetTupleDomainStruct(boolean useColumnNames)
122123
MessageType fileSchema = new MessageType("hive_schema",
123124
new GroupType(OPTIONAL, "my_struct",
124125
new PrimitiveType(OPTIONAL, INT32, "a"),
125-
new PrimitiveType(OPTIONAL, INT32, "b")));
126+
new PrimitiveType(OPTIONAL, INT32, "b"),
127+
new PrimitiveType(OPTIONAL, INT32, "c")));
126128
Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
127129
TupleDomain<ColumnDescriptor> tupleDomain = getParquetTupleDomain(descriptorsByPath, domain, fileSchema, useColumnNames);
128130
assertTrue(tupleDomain.isAll());
129131
}
130132

133+
@Test(dataProvider = "useColumnNames")
134+
public void testParquetTupleDomainStructWithPrimitiveColumnPredicate(boolean useColumNames)
135+
{
136+
RowType baseType = rowType(
137+
RowType.field("a", INTEGER),
138+
RowType.field("b", INTEGER),
139+
RowType.field("c", INTEGER));
140+
141+
HiveColumnProjectionInfo columnProjectionInfo = new HiveColumnProjectionInfo(
142+
ImmutableList.of(1),
143+
ImmutableList.of("b"),
144+
HiveType.HIVE_INT,
145+
INTEGER);
146+
147+
HiveColumnHandle projectedColumn = new HiveColumnHandle(
148+
"row_field",
149+
0,
150+
HiveType.toHiveType(baseType),
151+
baseType,
152+
Optional.of(columnProjectionInfo),
153+
REGULAR,
154+
Optional.empty());
155+
156+
Domain predicateDomain = Domain.singleValue(INTEGER, 123L);
157+
TupleDomain<HiveColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(projectedColumn, predicateDomain));
158+
159+
MessageType fileSchema = new MessageType("hive_schema",
160+
new GroupType(OPTIONAL, "row_field",
161+
new PrimitiveType(OPTIONAL, INT32, "a"),
162+
new PrimitiveType(OPTIONAL, INT32, "b"),
163+
new PrimitiveType(OPTIONAL, INT32, "c")));
164+
Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
165+
TupleDomain<ColumnDescriptor> calculatedTupleDomain = getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, useColumNames);
166+
assertEquals(calculatedTupleDomain.getDomains().get().size(), 1);
167+
ColumnDescriptor selectedColumnDescriptor = descriptorsByPath.get(ImmutableList.of("row_field", "b"));
168+
assertEquals(calculatedTupleDomain.getDomains().get().get(selectedColumnDescriptor), predicateDomain);
169+
}
170+
171+
@Test(dataProvider = "useColumnNames")
172+
public void testParquetTupleDomainStructWithComplexColumnPredicate(boolean useColumNames)
173+
{
174+
RowType c1Type = rowType(
175+
RowType.field("c1", INTEGER),
176+
RowType.field("c2", INTEGER));
177+
RowType baseType = rowType(
178+
RowType.field("a", INTEGER),
179+
RowType.field("b", INTEGER),
180+
RowType.field("c", c1Type));
181+
182+
HiveColumnProjectionInfo columnProjectionInfo = new HiveColumnProjectionInfo(
183+
ImmutableList.of(2),
184+
ImmutableList.of("C"),
185+
HiveType.toHiveType(c1Type),
186+
c1Type);
187+
188+
HiveColumnHandle projectedColumn = new HiveColumnHandle(
189+
"row_field",
190+
0,
191+
HiveType.toHiveType(baseType),
192+
baseType,
193+
Optional.of(columnProjectionInfo),
194+
REGULAR,
195+
Optional.empty());
196+
197+
Domain predicateDomain = Domain.onlyNull(c1Type);
198+
TupleDomain<HiveColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(projectedColumn, predicateDomain));
199+
200+
MessageType fileSchema = new MessageType("hive_schema",
201+
new GroupType(OPTIONAL, "row_field",
202+
new PrimitiveType(OPTIONAL, INT32, "a"),
203+
new PrimitiveType(OPTIONAL, INT32, "b"),
204+
new GroupType(OPTIONAL,
205+
"c",
206+
new PrimitiveType(OPTIONAL, INT32, "c1"),
207+
new PrimitiveType(OPTIONAL, INT32, "c2"))));
208+
Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
209+
// skip looking up predicates for complex types as Parquet only stores stats for primitives
210+
TupleDomain<ColumnDescriptor> calculatedTupleDomain = getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, useColumNames);
211+
assertTrue(calculatedTupleDomain.isAll());
212+
}
213+
214+
@Test(dataProvider = "useColumnNames")
215+
public void testParquetTupleDomainStructWithMissingPrimitiveColumn(boolean useColumnNames)
216+
{
217+
RowType baseType = rowType(
218+
RowType.field("a", INTEGER),
219+
RowType.field("b", INTEGER),
220+
RowType.field("non_exist", INTEGER));
221+
222+
HiveColumnProjectionInfo columnProjectionInfo = new HiveColumnProjectionInfo(
223+
ImmutableList.of(2),
224+
ImmutableList.of("non_exist"),
225+
HiveType.HIVE_INT,
226+
INTEGER);
227+
228+
HiveColumnHandle projectedColumn = new HiveColumnHandle(
229+
"row_field",
230+
0,
231+
HiveType.toHiveType(baseType),
232+
baseType,
233+
Optional.of(columnProjectionInfo),
234+
REGULAR,
235+
Optional.empty());
236+
237+
Domain predicateDomain = Domain.singleValue(INTEGER, 123L);
238+
TupleDomain<HiveColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(projectedColumn, predicateDomain));
239+
240+
MessageType fileSchema = new MessageType("hive_schema",
241+
new GroupType(OPTIONAL, "row_field",
242+
new PrimitiveType(OPTIONAL, INT32, "a"),
243+
new PrimitiveType(OPTIONAL, INT32, "b")));
244+
Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
245+
TupleDomain<ColumnDescriptor> calculatedTupleDomain = getParquetTupleDomain(descriptorsByPath, tupleDomain, fileSchema, useColumnNames);
246+
assertTrue(calculatedTupleDomain.isAll());
247+
}
248+
131249
@Test(dataProvider = "useColumnNames")
132250
public void testParquetTupleDomainMap(boolean useColumnNames)
133251
{

0 commit comments

Comments
 (0)