From d9f98033d47f28bff53e7bd592c915e36040c392 Mon Sep 17 00:00:00 2001 From: Zhenxiao Luo Date: Thu, 22 Aug 2019 14:35:57 -0700 Subject: [PATCH] Subfield pruning in Parquet --- .../hive/parquet/ParquetPageSource.java | 2 +- .../parquet/ParquetPageSourceFactory.java | 43 +++++--- .../facebook/presto/hive/HiveQueryRunner.java | 13 ++- .../hive/TestParquetDistributedQueries.java | 98 +++++++++++++++++++ .../presto/parquet/ParquetTypeUtils.java | 41 +++++++- 5 files changed, 180 insertions(+), 17 deletions(-) create mode 100644 presto-hive/src/test/java/com/facebook/presto/hive/TestParquetDistributedQueries.java diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java index 65b79578e3188..933dd035e63b2 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java @@ -101,7 +101,7 @@ public ParquetPageSource( typesBuilder.add(type); hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex(); - if (getParquetType(type, fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType()).isPresent()) { + if (getParquetType(type, fileSchema, useParquetColumnNames, column).isPresent()) { String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName(); fieldsBuilder.add(constructField(type, lookupColumnByName(messageColumnIO, columnName))); } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java index d37762306eee1..e6ce0e793ffc2 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java @@ -17,7 +17,6 @@ import com.facebook.presto.hive.HdfsEnvironment; import com.facebook.presto.hive.HiveBatchPageSourceFactory; import com.facebook.presto.hive.HiveColumnHandle; -import com.facebook.presto.hive.HiveType; import com.facebook.presto.memory.context.AggregatedMemoryContext; import com.facebook.presto.parquet.ParquetCorruptionException; import com.facebook.presto.parquet.ParquetDataSource; @@ -28,6 +27,7 @@ import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.Subfield; import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.StandardTypes; @@ -74,6 +74,7 @@ import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors; import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName; +import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType; import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate; import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches; import static com.facebook.presto.spi.type.StandardTypes.ARRAY; @@ -94,7 +95,6 @@ import static com.google.common.base.Strings.nullToEmpty; import static java.lang.String.format; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; @@ -186,14 +186,15 @@ public static ParquetPageSource createParquetPageSource( MessageType fileSchema = fileMetaData.getSchema(); dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats); - List fields = columns.stream() + Optional message = columns.stream() .filter(column -> column.getColumnType() == REGULAR) - .map(column -> getParquetType(typeManager.getType(column.getTypeSignature()), fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType())) + .map(column -> getColumnType(typeManager.getType(column.getTypeSignature()), fileSchema, useParquetColumnNames, column)) .filter(Optional::isPresent) .map(Optional::get) - .collect(toList()); + .map(type -> new MessageType(fileSchema.getName(), type)) + .reduce(MessageType::union); - MessageType requestedSchema = new MessageType(fileSchema.getName(), fields); + MessageType requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of())); ImmutableList.Builder footerBlocks = ImmutableList.builder(); for (BlockMetaData block : parquetMetadata.getBlocks()) { @@ -278,14 +279,14 @@ public static TupleDomain getParquetTupleDomain(Map getParquetType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, String columnName, int columnHiveIndex, HiveType hiveType) + public static Optional getParquetType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, HiveColumnHandle column) { org.apache.parquet.schema.Type type = null; if (useParquetColumnNames) { - type = getParquetTypeByName(columnName, messageType); + type = getParquetTypeByName(column.getName(), messageType); } - else if (columnHiveIndex < messageType.getFieldCount()) { - type = messageType.getType(columnHiveIndex); + else if (column.getHiveColumnIndex() < messageType.getFieldCount()) { + type = messageType.getType(column.getHiveColumnIndex()); } if (type == null) { @@ -304,8 +305,8 @@ else if (columnHiveIndex < messageType.getFieldCount()) { parquetTypeName = builder.toString(); } throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH, format("The column %s is declared as type %s, but the Parquet file declares the column as type %s", - columnName, - hiveType, + column.getName(), + column.getHiveType(), parquetTypeName)); } return Optional.of(type); @@ -390,4 +391,22 @@ private static boolean checkSchemaMatch(org.apache.parquet.schema.Type parquetTy throw new IllegalArgumentException("Unexpected parquet type name: " + parquetTypeName); } } + + public static Optional getColumnType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, HiveColumnHandle column) + { + if (useParquetColumnNames && !column.getRequiredSubfields().isEmpty()) { + MessageType result = null; + for (Subfield subfield : column.getRequiredSubfields()) { + MessageType type = getSubfieldType(messageType, subfield); + if (result == null) { + result = type; + } + else { + result = result.union(type); + } + } + return Optional.of(result); + } + return getParquetType(prestoType, messageType, useParquetColumnNames, column); + } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java index e75b8558861dc..0339d826eed62 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/HiveQueryRunner.java @@ -128,12 +128,19 @@ public static DistributedQueryRunner createQueryRunner(Iterable> ta .put("hive.collect-column-statistics-on-write", "true") .put("hive.temporary-table-schema", TEMPORARY_TABLE_SCHEMA) .build(); + + Map storageProperties = extraHiveProperties.containsKey("hive.storage-format") ? + ImmutableMap.copyOf(hiveProperties) : + ImmutableMap.builder() + .putAll(hiveProperties) + .put("hive.storage-format", "TEXTFILE") + .put("hive.compression-codec", "NONE") + .build(); + Map hiveBucketedProperties = ImmutableMap.builder() - .putAll(hiveProperties) + .putAll(storageProperties) .put("hive.max-initial-split-size", "10kB") // so that each bucket has multiple splits .put("hive.max-split-size", "10kB") // so that each bucket has multiple splits - .put("hive.storage-format", "TEXTFILE") // so that there's no minimum split size for the file - .put("hive.compression-codec", "NONE") // so that the file is splittable .build(); queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties); queryRunner.createCatalog(HIVE_BUCKETED_CATALOG, HIVE_CATALOG, hiveBucketedProperties); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestParquetDistributedQueries.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestParquetDistributedQueries.java new file mode 100644 index 0000000000000..8907e6d30c7cf --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestParquetDistributedQueries.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestDistributedQueries; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner; +import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.airlift.tpch.TpchTable.getTables; +import static org.testng.Assert.assertEquals; + +public class TestParquetDistributedQueries + extends AbstractTestDistributedQueries +{ + protected TestParquetDistributedQueries() + { + super(TestParquetDistributedQueries::createQueryRunner); + } + + private static QueryRunner createQueryRunner() + throws Exception + { + Map parquetProperties = ImmutableMap.builder() + .put("hive.storage-format", "PARQUET") + .put("hive.parquet.use-column-names", "true") + .put("hive.compression-codec", "GZIP") + .build(); + return HiveQueryRunner.createQueryRunner(getTables(), + ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"), + "sql-standard", + parquetProperties, + Optional.empty()); + } + + @Test + public void testSubfieldPruning() + { + getQueryRunner().execute("CREATE TABLE test_subfield_pruning AS " + + "SELECT orderkey, linenumber, shipdate, " + + " CAST(ROW(orderkey, linenumber, ROW(day(shipdate), month(shipdate), year(shipdate))) " + + " AS ROW(orderkey BIGINT, linenumber INTEGER, shipdate ROW(ship_day TINYINT, ship_month TINYINT, ship_year INTEGER))) AS info " + + "FROM lineitem"); + + try { + assertQuery("SELECT info.orderkey, info.shipdate.ship_month FROM test_subfield_pruning", "SELECT orderkey, month(shipdate) FROM lineitem"); + + assertQuery("SELECT orderkey FROM test_subfield_pruning WHERE info.shipdate.ship_month % 2 = 0", "SELECT orderkey FROM lineitem WHERE month(shipdate) % 2 = 0"); + } + finally { + getQueryRunner().execute("DROP TABLE test_subfield_pruning"); + } + } + + @Override + protected boolean supportsNotNullColumns() + { + return false; + } + + @Override + public void testDelete() + { + // Hive connector currently does not support row-by-row delete + } + + @Override + public void testRenameColumn() + { + // Parquet field lookup use column name does not support Rename + } + + @Test + public void testExplainOfCreateTableAs() + { + String query = "CREATE TABLE copy_orders AS SELECT * FROM orders"; + MaterializedResult result = computeActual("EXPLAIN " + query); + assertEquals(getOnlyElement(result.getOnlyColumnAsSet()), getExplainPlan(query, LOGICAL)); + } +} diff --git a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java index 660ef9d6f0ffc..a8931b6164220 100644 --- a/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java +++ b/presto-parquet/src/main/java/com/facebook/presto/parquet/ParquetTypeUtils.java @@ -13,8 +13,11 @@ */ package com.facebook.presto.parquet; +import com.facebook.presto.spi.Subfield; +import com.facebook.presto.spi.Subfield.PathElement; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.Type; +import com.google.common.collect.ImmutableList; import org.apache.parquet.column.Encoding; import org.apache.parquet.io.ColumnIO; import org.apache.parquet.io.ColumnIOFactory; @@ -24,6 +27,7 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.PrimitiveColumnIO; import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import java.util.Arrays; @@ -33,6 +37,7 @@ import java.util.Map; import java.util.Optional; +import static com.facebook.presto.spi.Subfield.NestedField; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.parquet.schema.OriginalType.DECIMAL; import static org.apache.parquet.schema.Type.Repetition.REPEATED; @@ -184,7 +189,7 @@ public static ParquetEncoding getParquetEncoding(Encoding encoding) } } - public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, MessageType messageType) + public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, GroupType messageType) { if (messageType.containsField(columnName)) { return messageType.getType(columnName); @@ -261,4 +266,38 @@ public static long getShortDecimalValue(byte[] bytes) return value; } + + public static MessageType getSubfieldType(GroupType baseType, Subfield subfield) + { + checkArgument(subfield.getPath().size() >= 1, "subfield size is less than 1"); + + ImmutableList.Builder typeBuilder = ImmutableList.builder(); + org.apache.parquet.schema.Type parentType = getParquetTypeByName(subfield.getRootName(), baseType); + + for (PathElement field : subfield.getPath()) { + if (field instanceof NestedField) { + NestedField nestedField = (NestedField) field; + org.apache.parquet.schema.Type childType = getParquetTypeByName(nestedField.getName(), parentType.asGroupType()); + if (childType != null) { + typeBuilder.add(childType); + parentType = childType; + } + } + else { + typeBuilder.add(parentType.asGroupType().getFields().get(0)); + break; + } + } + + List subfieldTypes = typeBuilder.build(); + if (subfieldTypes.isEmpty()) { + return new MessageType(subfield.getRootName(), ImmutableList.of()); + } + org.apache.parquet.schema.Type type = subfieldTypes.get(subfieldTypes.size() - 1); + for (int i = subfieldTypes.size() - 2; i >= 0; --i) { + GroupType groupType = subfieldTypes.get(i).asGroupType(); + type = new MessageType(groupType.getName(), ImmutableList.of(type)); + } + return new MessageType(subfield.getRootName(), ImmutableList.of(type)); + } }