diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index e0d07e7ce4a58..ec6733e2e1a92 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -185,6 +185,7 @@ import static com.facebook.presto.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME; import static com.facebook.presto.hive.HiveColumnHandle.PATH_COLUMN_NAME; import static com.facebook.presto.hive.HiveColumnHandle.ROW_ID_COLUMN_NAME; +import static com.facebook.presto.hive.HiveColumnHandle.rowIdColumnHandle; import static com.facebook.presto.hive.HiveColumnHandle.updateRowIdHandle; import static com.facebook.presto.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH; import static com.facebook.presto.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED; @@ -2924,7 +2925,8 @@ && isOrderBasedExecutionEnabled(session)) { streamPartitionColumns, discretePredicates, localPropertyBuilder.build(), - Optional.of(combinedRemainingPredicate)); + Optional.of(combinedRemainingPredicate), + Optional.of(rowIdColumnHandle())); } @Override diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketedTablesWithRowId.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketedTablesWithRowId.java new file mode 100644 index 0000000000000..c32fa22ac1116 --- /dev/null +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveBucketedTablesWithRowId.java @@ -0,0 +1,240 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.hive; + +import com.facebook.presto.Session; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static com.facebook.presto.SystemSessionProperties.UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; +import static io.airlift.tpch.TpchTable.CUSTOMER; +import static io.airlift.tpch.TpchTable.ORDERS; + +@Test(singleThreaded = true) +public class TestHiveBucketedTablesWithRowId + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.createQueryRunner( + ImmutableList.of(ORDERS, CUSTOMER), + ImmutableMap.of(), + Optional.empty()); + } + + 
@BeforeClass + public void setUp() + { + // Create bucketed customer table + assertUpdate("CREATE TABLE customer_bucketed WITH " + + "(bucketed_by = ARRAY['custkey'], bucket_count = 13) " + + "AS SELECT * FROM customer", 1500); + + // Create bucketed orders table + assertUpdate("CREATE TABLE orders_bucketed WITH " + + "(bucketed_by = ARRAY['orderkey'], bucket_count = 11) " + + "AS SELECT * FROM orders", 15000); + + // Verify each bucketed copy has the same row count as its source table + assertQuery("SELECT count(*) FROM customer_bucketed", "SELECT count(*) FROM customer"); + assertQuery("SELECT count(*) FROM orders_bucketed", "SELECT count(*) FROM orders"); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + try { + assertUpdate("DROP TABLE IF EXISTS customer_bucketed"); + assertUpdate("DROP TABLE IF EXISTS orders_bucketed"); + } + catch (Exception e) { + // Best-effort cleanup: failures are ignored (note: if the first DROP throws, the second is skipped) + } + } + + @Test + public void testRowIdWithBucketColumn() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + + // Test basic query with both $row_id and $bucket + String sql = "SELECT \"$row_id\", \"$bucket\", custkey, name " + + "FROM customer_bucketed " + + "WHERE \"$bucket\" = 5"; + + assertPlan(session, sql, anyTree( + project(filter(tableScan("customer_bucketed"))))); + + // Test aggregation grouping by both $row_id and $bucket + sql = "SELECT \"$row_id\", \"$bucket\", COUNT(*) " + + "FROM customer_bucketed " + + "GROUP BY \"$row_id\", \"$bucket\""; + + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(tableScan("customer_bucketed"))))); + + // Test join between bucketed tables using both $row_id and $bucket + sql = "SELECT c.\"$row_id\" AS customer_row_id, " + + "c.\"$bucket\" AS customer_bucket, " + + "o.\"$row_id\" AS order_row_id, " + + "o.\"$bucket\" AS 
order_bucket, " + + "c.name, o.orderkey " + + "FROM customer_bucketed c " + + "JOIN orders_bucketed o " + + "ON 
c.custkey = o.custkey " + + "WHERE c.\"$bucket\" IN (1, 3, 5) " + + "AND o.\"$bucket\" IN (2, 4, 6)"; + + assertPlan(session, sql, anyTree( + join( + project(filter(tableScan("customer_bucketed"))), + exchange(anyTree(tableScan("orders_bucketed")))))); + } + + @Test + public void testRowIdUniquePropertyWithBucketing() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + + // Test unique grouping by $row_id with bucket filtering + String sql = "SELECT " + + "customer_row_id, " + + "ARBITRARY(name) AS customer_name, " + + "ARBITRARY(bucket_num) AS customer_bucket, " + + "ARRAY_AGG(orderkey) AS orders_info " + + "FROM (" + + " SELECT " + + " c.\"$row_id\" AS customer_row_id, " + + " c.\"$bucket\" AS bucket_num, " + + " c.name, " + + " o.orderkey " + + " FROM customer_bucketed c " + + " LEFT JOIN orders_bucketed o " + + " ON c.custkey = o.custkey " + + " AND o.orderstatus IN ('O', 'F') " + + " WHERE c.\"$bucket\" < 5 " + + " AND c.nationkey IN (1, 2, 3) " + + ") " + + "GROUP BY customer_row_id"; + + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + join( + anyTree(tableScan("customer_bucketed")), + anyTree(tableScan("orders_bucketed")))))); + } + + @Test + public void testRowIdAndBucketInComplexQuery() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + + // Complex query with both row_id and bucket columns + String sql = "SELECT " + + " unique_id, " + + " bucket_group, " + + " COUNT(*) AS order_count, " + + " AVG(totalprice) AS avg_price " + + "FROM (" + + " SELECT " + + " c.\"$row_id\" AS unique_id, " + + " CASE " + + " WHEN c.\"$bucket\" < 5 THEN 'low' " + + " WHEN c.\"$bucket\" < 10 THEN 'medium' " + + " ELSE 'high' " + + " END AS bucket_group, " + + " o.totalprice " + + " FROM customer_bucketed c " + + " JOIN orders_bucketed o 
" + + " ON c.custkey = o.custkey " + + " WHERE o.\"$bucket\" % 2 = 0 " + + ") t " + + "GROUP BY unique_id, bucket_group"; + + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(project(join( + project(tableScan("customer_bucketed")), + anyTree(tableScan("orders_bucketed")))))))); + } + + @Test + public void testDistinctRowIdWithBucketFilter() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + + // Test DISTINCT with row_id and bucket filtering + String sql = "SELECT " + + " DISTINCT c.\"$row_id\" AS unique_id, " + + " c.\"$bucket\" AS bucket_num, " + + " c.name " + + "FROM customer_bucketed c " + + "WHERE c.\"$bucket\" BETWEEN 3 AND 8 " + + " AND c.nationkey = 1"; + + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(filter(tableScan("customer_bucketed")))))); + } + + @Test + public void testRowIdJoinOnBucketColumn() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + + // Test joining on bucket column while selecting row_id + String sql = "SELECT " + + " c.\"$row_id\" AS customer_row_id, " + + " o.\"$row_id\" AS order_row_id, " + + " c.\"$bucket\" AS shared_bucket, " + + " c.name, " + + " o.orderkey " + + "FROM customer_bucketed c " + + "JOIN orders_bucketed o " + + " ON c.\"$bucket\" = o.\"$bucket\" " + + "WHERE c.\"$bucket\" < 5"; + + assertPlan(session, sql, anyTree( + join( + exchange(anyTree(tableScan("customer_bucketed"))), + exchange(anyTree(tableScan("orders_bucketed")))))); + } +} diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java index 07d0e3a7c6385..534beafe5a0b6 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java +++ 
b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java @@ -85,6 +85,7 @@ import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_DEREFERENCE_ENABLED; import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED; import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS; +import static com.facebook.presto.SystemSessionProperties.UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING; import static com.facebook.presto.common.function.OperatorType.EQUAL; import static com.facebook.presto.common.predicate.Domain.create; import static com.facebook.presto.common.predicate.Domain.multipleValues; @@ -130,6 +131,7 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.semiJoin; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; @@ -2153,6 +2155,336 @@ public void testPartialAggregatePushdown() } } + @Test + public void testRowId() + { + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, "true") + .build(); + String sql; + sql = "SELECT\n" + + " unique_id AS unique_id,\n" + + " ARBITRARY(name) AS customer_name,\n" + + " ARRAY_AGG(orderkey) AS orders_info\n" + + "FROM (\n" + + " SELECT\n" + + " customer.name,\n" + + " orders.orderkey,\n" + + " customer.\"$row_id\" AS unique_id\n" + + " FROM customer\n" + + " LEFT JOIN orders\n" + + " ON customer.custkey = orders.custkey\n" + + " AND orders.orderstatus IN ('O', 'F')\n" + + " AND 
orders.orderdate BETWEEN DATE '1995-01-01' AND DATE '1995-12-31'\n" + + " WHERE\n" + + " customer.nationkey IN (1, 2, 3, 4, 5)\n" + + ")\n" + + "GROUP BY\n" + + " unique_id"; + + assertPlan(session, + sql, + anyTree( + aggregation( + ImmutableMap.of(), + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))))); + + sql = "select \"$row_id\", count(*) from orders group by 1"; + assertPlan(session, sql, anyTree(aggregation(ImmutableMap.of(), tableScan("orders")))); + sql = "SELECT\n" + + " customer.\"$row_id\" AS unique_id,\n" + + " COUNT(orders.orderkey) AS order_count,\n" + + " ARRAY_AGG(orders.orderkey) AS order_keys\n" + + "FROM customer\n" + + "LEFT JOIN orders\n" + + " ON customer.custkey = orders.custkey\n" + + "WHERE customer.acctbal > 1000\n" + + "GROUP BY customer.\"$row_id\""; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " unique_id,\n" + + " MAX(orderdate) AS latest_order_date\n" + + "FROM (\n" + + " SELECT\n" + + " customer.\"$row_id\" AS unique_id,\n" + + " orders.orderdate\n" + + " FROM customer\n" + + " JOIN orders\n" + + " ON customer.custkey = orders.custkey\n" + + " WHERE orders.orderstatus = 'O'\n" + + ") t\n" + + "GROUP BY unique_id"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " DISTINCT customer.\"$row_id\" AS unique_id,\n" + + " customer.name\n" + + "FROM customer\n" + + "WHERE customer.nationkey = 1"; + + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(filter(tableScan("customer")))))); + + sql = "SELECT\n" + + " c.name,\n" + + " o.orderkey,\n" + + " c.\"$row_id\" AS customer_row_id,\n" + + " o.\"$row_id\" AS order_row_id\n" + + "FROM customer c\n" + + "JOIN orders o\n" + + " ON c.\"$row_id\" = o.\"$row_id\"\n" + + "WHERE o.totalprice > 10000"; + 
assertPlan(session, sql, anyTree( + join( + 
exchange(anyTree(tableScan("customer"))), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " \"$row_id\" AS unique_id,\n" + + " orderstatus,\n" + + " COUNT(*) AS cnt\n" + + "FROM orders\n" + + "GROUP BY \"$row_id\", orderstatus"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(tableScan("orders"))))); + + sql = "SELECT\n" + + " o1.orderkey,\n" + + " o2.totalprice\n" + + "FROM orders o1\n" + + "JOIN orders o2\n" + + " ON o1.\"$row_id\" = o2.\"$row_id\"\n" + + "WHERE o1.orderstatus = 'O'"; + assertPlan(session, sql, anyTree( + join( + exchange(anyTree(tableScan("orders"))), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " orderkey,\n" + + " totalprice\n" + + "FROM orders o1\n" + + "WHERE EXISTS (\n" + + " SELECT 1\n" + + " FROM orders o2\n" + + " WHERE o1.\"$row_id\" = o2.\"$row_id\"\n" + + " AND o2.orderstatus = 'F'\n" + + ")"; + assertPlan(session, sql, anyTree( + join( + exchange(anyTree(tableScan("orders"))), + exchange(anyTree(aggregation(ImmutableMap.of(), project(filter(tableScan("orders"))))))))); + + sql = "SELECT\n" + + " custkey,\n" + + " name\n" + + "FROM customer\n" + + "WHERE \"$row_id\" IN (\n" + + " SELECT \"$row_id\"\n" + + " FROM customer\n" + + " WHERE acctbal > 5000\n" + + ")"; + assertPlan(session, sql, anyTree( + semiJoin( + anyTree(tableScan("customer")), + anyTree(tableScan("customer"))))); + + sql = "SELECT \"$row_id\", orderkey FROM orders WHERE orderstatus = 'O'\n" + + "UNION\n" + + "SELECT \"$row_id\", orderkey FROM orders WHERE orderstatus = 'F'"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project(filter(tableScan("orders")))))); + + sql = "SELECT\n" + + " c.\"$row_id\",\n" + + " COUNT(o.orderkey) AS order_count,\n" + + " SUM(o.totalprice) AS total_spent,\n" + + " MAX(o.orderdate) AS latest_order,\n" + + " MIN(o.orderdate) AS first_order\n" + + "FROM customer c\n" + + "LEFT JOIN orders o ON c.custkey = o.custkey\n" + + "GROUP BY c.\"$row_id\""; + 
assertPlan(session, sql, anyTree( + 
aggregation(ImmutableMap.of(), + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " \"$row_id\",\n" + + " COUNT(*) AS cnt\n" + + "FROM orders\n" + + "GROUP BY \"$row_id\"\n" + + "HAVING COUNT(*) = 1"; + assertPlan(session, sql, anyTree( + project(filter( + aggregation(ImmutableMap.of(), + tableScan("orders")))))); + + sql = "SELECT\n" + + " c.custkey,\n" + + " c.name,\n" + + " (\n" + + " SELECT COUNT(*)\n" + + " FROM orders o\n" + + " WHERE o.custkey = c.custkey\n" + + " AND o.\"$row_id\" IS NOT NULL\n" + + " ) AS order_count\n" + + "FROM customer c"; + assertPlan(session, sql, anyTree( + project( + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " outer_query.customer_id,\n" + + " outer_query.order_count\n" + + "FROM (\n" + + " SELECT\n" + + " c.\"$row_id\" AS customer_id,\n" + + " COUNT(DISTINCT o.\"$row_id\") AS order_count\n" + + " FROM customer c\n" + + " LEFT JOIN orders o ON c.custkey = o.custkey\n" + + " WHERE c.nationkey IN (1, 2, 3)\n" + + " GROUP BY c.\"$row_id\"\n" + + ") outer_query\n" + + "WHERE outer_query.order_count > 2"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + project( + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders"))))))); + + sql = "SELECT\n" + + " c.custkey,\n" + + " c.name\n" + + "FROM customer c\n" + + "WHERE NOT EXISTS (\n" + + " SELECT 1\n" + + " FROM orders o\n" + + " WHERE c.\"$row_id\" = o.\"$row_id\"\n" + + ")"; + assertPlan(session, sql, anyTree( + join( + exchange(anyTree(tableScan("customer"))), + exchange(anyTree(aggregation(ImmutableMap.of(), tableScan("orders"))))))); + + sql = "SELECT\n" + + " c.name,\n" + + " o.orderkey,\n" + + " l.linenumber\n" + + "FROM customer c\n" + + "JOIN orders o ON c.custkey = o.custkey\n" + + "JOIN lineitem l ON o.orderkey = l.orderkey\n" + + "WHERE c.\"$row_id\" IS NOT NULL\n" + + " AND o.\"$row_id\" IS NOT NULL\n" + + " AND l.\"$row_id\" IS NOT NULL"; + 
assertPlan(session, sql, anyTree( + 
join( + anyTree( + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders")))), + anyTree(tableScan("lineitem"))))); + + sql = "SELECT\n" + + " c.\"$row_id\" AS customer_row_id,\n" + + " o.\"$row_id\" AS order_row_id,\n" + + " c.name,\n" + + " o.orderkey\n" + + "FROM customer c\n" + + "CROSS JOIN orders o\n" + + "WHERE c.nationkey = 1 AND o.orderstatus = 'O'"; + assertPlan(session, sql, anyTree( + join( + anyTree(tableScan("customer")), + anyTree(tableScan("orders"))))); + + sql = "SELECT\n" + + " orderstatus,\n" + + " COUNT(\"$row_id\") AS row_count,\n" + + " MIN(\"$row_id\") AS min_row_id\n" + + "FROM orders\n" + + "GROUP BY orderstatus"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " xxhash64(\"$row_id\") AS bucket,\n" + + " COUNT(*) AS cnt\n" + + "FROM orders\n" + + "GROUP BY xxhash64(\"$row_id\")"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT \"$row_id\", 'customer' AS source FROM customer\n" + + "UNION ALL\n" + + "SELECT \"$row_id\", 'orders' AS source FROM orders"; + assertPlan(session, sql, anyTree( + exchange( + anyTree(tableScan("customer")), + anyTree(tableScan("orders"))))); + + sql = "SELECT\n" + + " o.\"$row_id\" AS order_row_id,\n" + + " o.orderkey,\n" + + " l.linenumber\n" + + "FROM orders o\n" + + "JOIN lineitem l ON o.orderkey = l.orderkey\n" + + "WHERE o.orderstatus = 'O'"; + assertPlan(session, sql, anyTree( + join( + exchange(anyTree(tableScan("lineitem"))), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " orderkey,\n" + + " totalprice\n" + + "FROM orders o1\n" + + "WHERE \"$row_id\" > (\n" + + " SELECT MIN(\"$row_id\")\n" + + " FROM orders o2\n" + + " WHERE o2.orderstatus = 'F'\n" + + ")"; + assertPlan(session, sql, anyTree( + join( + tableScan("orders"), + exchange(anyTree(tableScan("orders")))))); + + sql = "SELECT\n" + + " CASE \n" + + " WHEN 
nationkey = 1 THEN \"$row_id\"\n" + + 
" ELSE cast('default' as varbinary)\n" + + " END AS conditional_row_id,\n" + + " COUNT(*) AS cnt\n" + + "FROM customer\n" + + "GROUP BY CASE \n" + + " WHEN nationkey = 1 THEN \"$row_id\"\n" + + " ELSE cast('default' as varbinary)\n" + + " END"; + assertPlan(session, sql, anyTree( + aggregation(ImmutableMap.of(), + exchange(anyTree(tableScan("customer")))))); + } + private static Set toSubfields(String... subfieldPaths) { return Arrays.stream(subfieldPaths) @@ -2170,6 +2502,11 @@ private void assertPushdownSubfields(Session session, String query, String table assertPlan(session, query, anyTree(tableScan(tableName, requiredSubfields))); } + private static PlanMatchPattern tableScan(String expectedTableName) + { + return PlanMatchPattern.tableScan(expectedTableName); + } + private static PlanMatchPattern tableScan(String expectedTableName, Map> expectedRequiredSubfields) { return PlanMatchPattern.tableScan(expectedTableName).with(new HiveTableScanMatcher(expectedRequiredSubfields)); diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java index 1a30a76798d37..f335f1f718c57 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -337,6 +337,7 @@ public final class SystemSessionProperties public static final String QUERY_CLIENT_TIMEOUT = "query_client_timeout"; public static final String REWRITE_MIN_MAX_BY_TO_TOP_N = "rewrite_min_max_by_to_top_n"; public static final String ADD_DISTINCT_BELOW_SEMI_JOIN_BUILD = "add_distinct_below_semi_join_build"; + public static final String UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING = "utilize_unique_property_in_query_planning"; public static final String PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS = "pushdown_subfields_for_map_functions"; public static final String MAX_SERIALIZABLE_OBJECT_SIZE = 
"max_serializable_object_size"; @@ -1950,6 +1951,10 @@ public SystemSessionProperties( false, value -> Duration.valueOf((String) value), Duration::toString), + booleanProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, + "Utilize the unique property of input columns in query planning", + featuresConfig.isUtilizeUniquePropertyInQueryPlanning(), + false), booleanProperty(ADD_DISTINCT_BELOW_SEMI_JOIN_BUILD, "Add distinct aggregation below semi join build", featuresConfig.isAddDistinctBelowSemiJoinBuild(), @@ -3310,6 +3315,11 @@ public static boolean isPushSubfieldsForMapFunctionsEnabled(Session session) return session.getSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, Boolean.class); } + public static boolean isUtilizeUniquePropertyInQueryPlanningEnabled(Session session) + { + return session.getSystemProperty(UTILIZE_UNIQUE_PROPERTY_IN_QUERY_PLANNING, Boolean.class); + } + public static boolean isAddDistinctBelowSemiJoinBuildEnabled(Session session) { return session.getSystemProperty(ADD_DISTINCT_BELOW_SEMI_JOIN_BUILD, Boolean.class); diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayout.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayout.java index 553f5b5f8da1c..19026f586c035 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayout.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/TableLayout.java @@ -78,6 +78,11 @@ public List> getLocalProperties() return layout.getLocalProperties(); } + public Optional getUniqueColumn() + { + return layout.getUniqueColumn(); + } + public ConnectorTableLayoutHandle getLayoutHandle() { return layout.getHandle(); diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 06b1fee1fdce7..cdecf697bda5e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java 
+++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -309,6 +309,7 @@ public class FeaturesConfig private boolean addDistinctBelowSemiJoinBuild; private boolean pushdownSubfieldForMapFunctions = true; private long maxSerializableObjectSize = 1000; + private boolean utilizeUniquePropertyInQueryPlanning = true; /* enabled by default; set via optimizer.utilize-unique-property-in-query-planning */ private boolean builtInSidecarFunctionsEnabled; @@ -3101,6 +3102,19 @@ public boolean isPushdownSubfieldForMapFunctions() return pushdownSubfieldForMapFunctions; } + @Config("optimizer.utilize-unique-property-in-query-planning") + @ConfigDescription("Utilize the unique property of input columns in query planning") + public FeaturesConfig setUtilizeUniquePropertyInQueryPlanning(boolean utilizeUniquePropertyInQueryPlanning) + { + this.utilizeUniquePropertyInQueryPlanning = utilizeUniquePropertyInQueryPlanning; + return this; + } + + public boolean isUtilizeUniquePropertyInQueryPlanning() + { + return utilizeUniquePropertyInQueryPlanning; + } + @Config("max_serializable_object_size") @ConfigDescription("Configure the maximum byte size of a serializable object in expression interpreters") public FeaturesConfig setMaxSerializableObjectSize(long maxSerializableObjectSize) diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java index 32fc8913fa12c..f562d79b50c32 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java @@ -49,6 +49,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.transform; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public 
class ActualProperties @@ -56,11 +57,22 @@ public class ActualProperties private final Global global; private final List> localProperties; private final Map constants; + // Properties derived from the unique row_id column, tracked separately from the main properties; never nested more than one level + private final Optional propertiesFromUniqueColumn; private ActualProperties( Global global, List> localProperties, Map constants) + { + this(global, localProperties, constants, Optional.empty()); + } + + private ActualProperties( + Global global, + List> localProperties, + Map constants, + Optional propertiesFromUniqueColumn) { requireNonNull(global, "globalProperties is null"); requireNonNull(localProperties, "localProperties is null"); @@ -85,6 +97,8 @@ private ActualProperties( this.localProperties = ImmutableList.copyOf(updatedLocalProperties); this.constants = ImmutableMap.copyOf(constants); + requireNonNull(propertiesFromUniqueColumn, "propertiesFromUniqueColumn is null").ifPresent(actualProperties -> checkArgument(!actualProperties.getPropertiesFromUniqueColumn().isPresent(), "propertiesFromUniqueColumn must not itself carry propertiesFromUniqueColumn")); + this.propertiesFromUniqueColumn = propertiesFromUniqueColumn; } public boolean isCoordinatorOnly() @@ -92,6 +106,11 @@ public boolean isCoordinatorOnly() return global.isCoordinatorOnly(); } + public Optional getPropertiesFromUniqueColumn() + { + return propertiesFromUniqueColumn; + } + /** * @return true if the plan will only execute on a single node */ @@ -120,6 +139,16 @@ public boolean isStreamPartitionedOn(Collection col } } + public boolean isStreamPartitionedOnAdditionalProperty(Collection columns, boolean exactly) + { + if (exactly) { + return propertiesFromUniqueColumn.isPresent() && propertiesFromUniqueColumn.get().global.isStreamPartitionedOnExactly(columns, ImmutableSet.of(), false); + } + else { + return propertiesFromUniqueColumn.isPresent() && propertiesFromUniqueColumn.get().global.isStreamPartitionedOn(columns, ImmutableSet.of(), false); + } + } + public boolean isNodePartitionedOn(Collection columns, boolean exactly) { return isNodePartitionedOn(columns, false, exactly); @@ -135,6 +164,16 @@ 
public boolean isNodePartitionedOn(Collection colum } } + public boolean isNodePartitionedOnAdditionalProperty(Collection columns, boolean exactly) + { + if (exactly) { + return propertiesFromUniqueColumn.isPresent() && propertiesFromUniqueColumn.get().global.isNodePartitionedOnExactly(columns, ImmutableSet.of(), false); + } + else { + return propertiesFromUniqueColumn.isPresent() && propertiesFromUniqueColumn.get().global.isNodePartitionedOn(columns, ImmutableSet.of(), false); + } + } + @Deprecated public boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session) { @@ -194,6 +233,13 @@ public ActualProperties translateVariable(Function newAdditionalProperty = Optional.empty(); + if (propertiesFromUniqueColumn.isPresent()) { + ActualProperties translatedAdditionalProperty = propertiesFromUniqueColumn.get().translateVariable(translator); + if (!translatedAdditionalProperty.getLocalProperties().isEmpty()) { + newAdditionalProperty = Optional.of(translatedAdditionalProperty); + } + } return builder() .global(global.translateVariableToRowExpression(variable -> { Optional translated = translator.apply(variable).map(RowExpression.class::cast); @@ -204,6 +250,7 @@ public ActualProperties translateVariable(Function !inputToOutputVariables.containsKey(entry.getKey())) .forEach(inputToOutputMappings::put); + + Optional newAdditionalProperty = Optional.empty(); + if (propertiesFromUniqueColumn.isPresent()) { + ActualProperties translatedAdditionalProperty = propertiesFromUniqueColumn.get().translateRowExpression(assignments); + if (!translatedAdditionalProperty.getLocalProperties().isEmpty()) { + newAdditionalProperty = Optional.of(translatedAdditionalProperty); + } + } + return builder() .global(global.translateRowExpression(inputToOutputMappings.build(), assignments)) .local(LocalProperties.translate(localProperties, variable -> Optional.ofNullable(inputToOutputVariables.get(variable)))) 
.constants(translatedConstants) + .propertiesFromUniqueColumn(newAdditionalProperty) .build(); } @@ -274,6 +331,7 @@ public static class Builder private List> localProperties; private Map constants; private boolean unordered; + private Optional propertiesFromUniqueColumn; public Builder() { @@ -281,10 +339,16 @@ public Builder() } public Builder(Global global, List> localProperties, Map constants) + { + this(global, localProperties, constants, Optional.empty()); + } + + public Builder(Global global, List> localProperties, Map constants, Optional propertiesFromUniqueColumn) { this.global = requireNonNull(global, "global is null"); this.localProperties = ImmutableList.copyOf(localProperties); this.constants = ImmutableMap.copyOf(constants); + this.propertiesFromUniqueColumn = requireNonNull(propertiesFromUniqueColumn, "propertiesFromUniqueColumn is null"); } public Builder global(Global global) @@ -317,6 +381,18 @@ public Builder unordered(boolean unordered) return this; } + public Builder propertiesFromUniqueColumn(Optional propertiesFromUniqueColumn) + { + if (propertiesFromUniqueColumn.isPresent() && !propertiesFromUniqueColumn.get().getLocalProperties().isEmpty()) { + checkArgument(propertiesFromUniqueColumn.get().getLocalProperties().size() == 1, "propertiesFromUniqueColumn must have exactly one local property"); + this.propertiesFromUniqueColumn = propertiesFromUniqueColumn; + } + else { + this.propertiesFromUniqueColumn = Optional.empty(); + } + return this; + } + public ActualProperties build() { List> localProperties = this.localProperties; @@ -332,14 +408,19 @@ public ActualProperties build() } localProperties = newLocalProperties.build(); } - return new ActualProperties(global, localProperties, constants); + if (propertiesFromUniqueColumn.isPresent() && unordered) { + propertiesFromUniqueColumn = Optional.of(ActualProperties.builderFrom(propertiesFromUniqueColumn.get()) + .unordered(unordered) + .build()); + } + return new ActualProperties(global, localProperties, constants, propertiesFromUniqueColumn); } } @Override public int hashCode() { - return Objects.hash(global, 
localProperties, constants.keySet()); + return Objects.hash(global, localProperties, constants.keySet(), propertiesFromUniqueColumn); } @Override @@ -354,7 +435,8 @@ public boolean equals(Object obj) final ActualProperties other = (ActualProperties) obj; return Objects.equals(this.global, other.global) && Objects.equals(this.localProperties, other.localProperties) - && Objects.equals(this.constants.keySet(), other.constants.keySet()); + && Objects.equals(this.constants.keySet(), other.constants.keySet()) + && Objects.equals(this.propertiesFromUniqueColumn, other.propertiesFromUniqueColumn); } @Override @@ -364,6 +446,7 @@ public String toString() .add("globalProperties", global) .add("localProperties", localProperties) .add("constants", constants) + .add("propertiesFromUniqueColumn", propertiesFromUniqueColumn) .toString(); } @@ -391,7 +474,7 @@ private Global(Optional nodePartitioning, Optional s || !streamPartitioning.isPresent() || nodePartitioning.get().getVariableReferences().containsAll(streamPartitioning.get().getVariableReferences()) || streamPartitioning.get().getVariableReferences().containsAll(nodePartitioning.get().getVariableReferences()), - "Global stream partitioning columns should match node partitioning columns"); + format("Global stream partitioning columns should match node partitioning columns, nodePartitioning: %s, streamPartitioning: %s", nodePartitioning, streamPartitioning)); this.nodePartitioning = requireNonNull(nodePartitioning, "nodePartitioning is null"); this.streamPartitioning = requireNonNull(streamPartitioning, "streamPartitioning is null"); this.nullsAndAnyReplicated = nullsAndAnyReplicated; diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 9749dd023c58e..d37d534933dc2 100644 --- 
a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -311,7 +311,8 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper child.getProperties()); } else if (hasMixedGroupingSets - || !isStreamPartitionedOn(child.getProperties(), partitioningRequirement) && !isNodePartitionedOn(child.getProperties(), partitioningRequirement)) { + || !isStreamPartitionedOn(child.getProperties(), partitioningRequirement) && !isNodePartitionedOn(child.getProperties(), partitioningRequirement) + && !isNodePartitionedOnAdditionalProperty(child.getProperties(), partitioningRequirement) && !isStreamPartitionedOnAdditionalProperty(child.getProperties(), partitioningRequirement)) { child = withDerivedProperties( partitionedExchange( idAllocator.getNextId(), @@ -1624,11 +1625,21 @@ private boolean isNodePartitionedOn(ActualProperties properties, Collection columns) + { + return properties.isNodePartitionedOnAdditionalProperty(columns, isExactPartitioningPreferred(session)); + } + private boolean isStreamPartitionedOn(ActualProperties properties, Collection columns) { return properties.isStreamPartitionedOn(columns, isExactPartitioningPreferred(session)); } + private boolean isStreamPartitionedOnAdditionalProperty(ActualProperties properties, Collection columns) + { + return properties.isStreamPartitionedOnAdditionalProperty(columns, isExactPartitioningPreferred(session)); + } + private boolean shouldAggregationMergePartitionPreferences(AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy) { if (isExactPartitioningPreferred(session)) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 1618c69b543e6..65fa710226c48 100644 --- 
a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -399,7 +399,8 @@ public PlanWithProperties visitAggregation(AggregationNode node, StreamPreferred // [A, B] [(A, C)] -> List.of(Optional.of(GroupingProperty(C))) // [A, B] [(D, A, C)] -> List.of(Optional.of(GroupingProperty(D, C))) List>> matchResult = LocalProperties.match(child.getProperties().getLocalProperties(), LocalProperties.grouped(groupingKeys)); - if (!matchResult.get(0).isPresent()) { + List>> matchResultForAdditional = LocalProperties.match(child.getProperties().getAdditionalLocalProperties(), LocalProperties.grouped(groupingKeys)); + if (!matchResult.get(0).isPresent() || !matchResultForAdditional.get(0).isPresent()) { // !isPresent() indicates the property was satisfied completely preGroupedSymbols = groupingKeys; } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/LocalProperties.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/LocalProperties.java index e8c02f02216e4..00fa4935ee21a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/LocalProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/LocalProperties.java @@ -18,6 +18,7 @@ import com.facebook.presto.spi.GroupingProperty; import com.facebook.presto.spi.LocalProperty; import com.facebook.presto.spi.SortingProperty; +import com.facebook.presto.spi.UniqueProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.PeekingIterator; @@ -55,6 +56,11 @@ public static List> sorted(Collection columns, SortOrder return columns.stream().map(column -> new SortingProperty<>(column, order)).collect(toImmutableList()); } + public static List> unique(T column) + { + 
return ImmutableList.of(new UniqueProperty<>(column)); + } + public static List> stripLeadingConstants(List> properties) { PeekingIterator> iterator = peekingIterator(properties.iterator()); diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index f086283a140da..bb4f27b09bb0e 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -23,6 +23,7 @@ import com.facebook.presto.spi.GroupingProperty; import com.facebook.presto.spi.LocalProperty; import com.facebook.presto.spi.SortingProperty; +import com.facebook.presto.spi.UniqueProperty; import com.facebook.presto.spi.plan.AggregationNode; import com.facebook.presto.spi.plan.DeleteNode; import com.facebook.presto.spi.plan.DistinctLimitNode; @@ -90,6 +91,7 @@ import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; +import static com.facebook.presto.SystemSessionProperties.isUtilizeUniquePropertyInQueryPlanningEnabled; import static com.facebook.presto.SystemSessionProperties.planWithTableNodePartitioning; import static com.facebook.presto.common.predicate.TupleDomain.toLinkedMap; import static com.facebook.presto.spi.relation.DomainTranslator.BASIC_COLUMN_EXTRACTOR; @@ -142,6 +144,22 @@ public static ActualProperties streamBackdoorDeriveProperties(PlanNode node, Lis return node.accept(new Visitor(metadata, session), inputProperties); } + public static Optional uniqueToGroupProperties(ActualProperties properties) + { + // We only call uniqueToGroupProperties on derived properties from propertiesFromUniqueColumn, which can have one local property if the column is preserved in a node + // 
output, or no local property if the column is not preserved in a node output + checkArgument(properties.getLocalProperties().size() <= 1); + if (properties.getLocalProperties().isEmpty()) { + return Optional.empty(); + } + LocalProperty localProperty = Iterables.getOnlyElement(properties.getLocalProperties()); + if (localProperty instanceof UniqueProperty) { + return Optional.of(ActualProperties.builderFrom(properties).local(ImmutableList.of(new GroupingProperty<>(ImmutableList.of(((UniqueProperty) localProperty).getColumn())))).build()); + } + checkState(localProperty instanceof GroupingProperty, "returned actual properties should have grouping property"); + return Optional.of(properties); + } + private static class Visitor extends InternalPlanVisitor> { @@ -196,12 +214,14 @@ public ActualProperties visitAssignUniqueId(AssignUniqueId node, List in inputToOutputMappings.putIfAbsent(argument, argument); } - return Iterables.getOnlyElement(inputProperties).translateVariable(column -> Optional.ofNullable(inputToOutputMappings.get(column))); + ActualProperties properties = Iterables.getOnlyElement(inputProperties); + return ActualProperties.builderFrom(properties.translateVariable(column -> Optional.ofNullable(inputToOutputMappings.get(column)))) + .propertiesFromUniqueColumn(properties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x.translateVariable(column -> Optional.ofNullable(inputToOutputMappings.get(column)))))) + .build(); } @Override @@ -291,10 +314,9 @@ public ActualProperties visitAggregation(AggregationNode node, List node.getGroupingKeys().contains(variable) ? 
Optional.of(variable) : Optional.empty()); - return ActualProperties.builderFrom(translated) .local(LocalProperties.grouped(node.getGroupingKeys())) - .build(); + .propertiesFromUniqueColumn(uniqueProperties(translated.getPropertiesFromUniqueColumn())).build(); } @Override @@ -303,6 +325,14 @@ public ActualProperties visitRowNumber(RowNumberNode node, List uniqueProperties(Optional properties) + { + if (properties.isPresent() && properties.get().getLocalProperties().size() == 1 && properties.get().getLocalProperties().get(0) instanceof UniqueProperty) { + return properties; + } + return Optional.empty(); + } + @Override public ActualProperties visitTopNRowNumber(TopNRowNumberNode node, List inputProperties) { @@ -316,6 +346,7 @@ public ActualProperties visitTopNRowNumber(TopNRowNumberNode node, List inputPro return ActualProperties.builderFrom(properties) .local(localProperties) + .propertiesFromUniqueColumn(uniqueProperties(properties.getPropertiesFromUniqueColumn())) .build(); } @@ -344,6 +376,7 @@ public ActualProperties visitSort(SortNode node, List inputPro return ActualProperties.builderFrom(properties) .local(localProperties) + .propertiesFromUniqueColumn(uniqueProperties(properties.getPropertiesFromUniqueColumn())) .build(); } @@ -360,6 +393,7 @@ public ActualProperties visitDistinctLimit(DistinctLimitNode node, List inputPro return ActualProperties.builderFrom(probeProperties) .constants(constants) + .propertiesFromUniqueColumn(probeProperties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x.translateVariable(column -> filterOrRewrite(outputVariableReferences, node.getCriteria(), column))))) .unordered(unordered) .build(); case LEFT: return ActualProperties.builderFrom(probeProperties.translateVariable(column -> filterIfMissing(outputVariableReferences, column))) + .propertiesFromUniqueColumn(probeProperties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x.translateVariable(column -> 
filterIfMissing(outputVariableReferences, column))))) .unordered(unordered) .build(); case RIGHT: @@ -518,10 +554,12 @@ public ActualProperties visitIndexJoin(IndexJoinNode node, List uniqueToGroupProperties(x))) .build(); case SOURCE_OUTER: return ActualProperties.builderFrom(probeProperties) .constants(probeProperties.getConstants()) + .propertiesFromUniqueColumn(probeProperties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x))) .build(); default: throw new UnsupportedOperationException("Unsupported join type: " + node.getType()); @@ -553,16 +591,19 @@ public ActualProperties visitMergeJoin(MergeJoinNode node, List uniqueToGroupProperties(x.translateVariable(column -> filterOrRewrite(outputVariableReferences, node.getCriteria(), column))))) .constants(constants) .build(); case LEFT: return ActualProperties.builderFrom(leftProperties.translateVariable(column -> filterIfMissing(outputVariableReferences, column))) + .propertiesFromUniqueColumn(leftProperties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x.translateVariable(column -> filterIfMissing(outputVariableReferences, column))))) .build(); case RIGHT: rightProperties = rightProperties.translateVariable(column -> filterIfMissing(node.getOutputVariables(), column)); return ActualProperties.builderFrom(rightProperties.translateVariable(column -> filterIfMissing(outputVariableReferences, column))) .local(ImmutableList.of()) + .propertiesFromUniqueColumn(rightProperties.getPropertiesFromUniqueColumn().flatMap(x -> uniqueToGroupProperties(x.translateVariable(column -> filterIfMissing(outputVariableReferences, column))))) .unordered(true) .build(); case FULL: @@ -598,6 +639,7 @@ public ActualProperties visitExchange(ExchangeNode node, List checkArgument(!node.getScope().isRemote() || inputProperties.stream().noneMatch(ActualProperties::isNullsAndAnyReplicated), "Null-and-any replicated inputs should not be remotely exchanged"); Set> entries = null; + ActualProperties 
translated = null; for (int sourceIndex = 0; sourceIndex < node.getSources().size(); sourceIndex++) { List inputVariables = node.getInputs().get(sourceIndex); Map inputToOutput = new HashMap<>(); @@ -605,7 +647,7 @@ public ActualProperties visitExchange(ExchangeNode node, List inputToOutput.put(inputVariables.get(i), node.getOutputVariables().get(i)); } - ActualProperties translated = inputProperties.get(sourceIndex).translateVariable(variable -> Optional.ofNullable(inputToOutput.get(variable))); + translated = inputProperties.get(sourceIndex).translateVariable(variable -> Optional.ofNullable(inputToOutput.get(variable))); entries = (entries == null) ? translated.getConstants().entrySet() : Sets.intersection(entries, translated.getConstants().entrySet()); } @@ -621,6 +663,8 @@ public ActualProperties visitExchange(ExchangeNode node, List .forEach(localProperties::add); } + boolean additionalPropertyIsUnique = inputProperties.size() == 1 && uniqueProperties(translated.getPropertiesFromUniqueColumn()).isPresent() && !node.getType().equals(ExchangeNode.Type.REPLICATE); + // Local exchanges are only created in AddLocalExchanges, at the end of optimization, and // local exchanges do not produce all global properties as represented by ActualProperties. // This is acceptable because AddLocalExchanges does not use global properties and is only @@ -640,6 +684,10 @@ else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) { builder.global(coordinatorSingleStreamPartition()); } + if (additionalPropertyIsUnique) { + builder.propertiesFromUniqueColumn(translated.getPropertiesFromUniqueColumn()); + } + return builder.build(); } @@ -650,6 +698,7 @@ else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) { .global(coordinatorOnly ? coordinatorSingleStreamPartition() : singleStreamPartition()) .local(localProperties.build()) .constants(constants) + .propertiesFromUniqueColumn(additionalPropertyIsUnique ? 
translated.getPropertiesFromUniqueColumn() : Optional.empty()) .build(); case REPARTITION: { Global globalPartitioning; @@ -666,6 +715,7 @@ else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) { return ActualProperties.builder() .global(globalPartitioning) .constants(constants) + .propertiesFromUniqueColumn(additionalPropertyIsUnique ? translated.getPropertiesFromUniqueColumn() : Optional.empty()) .build(); } case REPLICATE: @@ -691,6 +741,7 @@ public ActualProperties visitFilter(FilterNode node, List inpu return ActualProperties.builderFrom(properties) .constants(constants) + .propertiesFromUniqueColumn(properties.getPropertiesFromUniqueColumn()) .build(); } @@ -728,6 +779,7 @@ else if (!(value instanceof RowExpression)) { return ActualProperties.builderFrom(translatedProperties) .constants(constants) + .propertiesFromUniqueColumn(properties.getPropertiesFromUniqueColumn().map(x -> x.translateRowExpression(node.getAssignments().getMap()))) .build(); } @@ -814,6 +866,13 @@ public ActualProperties visitTableScan(TableScanNode node, List Optional.ofNullable(assignments.get(column)))); + if (isUtilizeUniquePropertyInQueryPlanningEnabled(session) && layout.getUniqueColumn().isPresent() && assignments.containsKey(layout.getUniqueColumn().get())) { + VariableReferenceExpression uniqueVariable = assignments.get(layout.getUniqueColumn().get()); + ActualProperties.Builder propertiesFromUniqueColumn = ActualProperties.builder(); + propertiesFromUniqueColumn.global(partitionedOn(ARBITRARY_DISTRIBUTION, ImmutableList.of(uniqueVariable), Optional.empty())); + propertiesFromUniqueColumn.local(LocalProperties.unique(uniqueVariable)); + properties.propertiesFromUniqueColumn(Optional.of(propertiesFromUniqueColumn.build())); + } return properties.build(); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPreferredProperties.java 
b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPreferredProperties.java index 221d224ea6d40..96288e6af729c 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPreferredProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPreferredProperties.java @@ -188,9 +188,11 @@ else if (actualProperties.getDistribution() == SINGLE) { // is there a preference for a specific partitioning scheme? if (partitioningColumns.isPresent()) { if (exactColumnOrder) { - return actualProperties.isExactlyPartitionedOn(partitioningColumns.get()); + return actualProperties.isExactlyPartitionedOn(partitioningColumns.get()) + || actualProperties.getStreamPropertiesFromUniqueColumn().isPresent() && actualProperties.getStreamPropertiesFromUniqueColumn().get().isExactlyPartitionedOn(partitioningColumns.get()); } - return actualProperties.isPartitionedOn(partitioningColumns.get()); + return actualProperties.isPartitionedOn(partitioningColumns.get()) + || actualProperties.getStreamPropertiesFromUniqueColumn().isPresent() && actualProperties.getStreamPropertiesFromUniqueColumn().get().isPartitionedOn(partitioningColumns.get()); } return true; diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index 58f8e32977de6..51e6d3de35ab2 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -18,6 +18,7 @@ import com.facebook.presto.metadata.TableLayout; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.LocalProperty; +import com.facebook.presto.spi.UniqueProperty; import 
com.facebook.presto.spi.plan.AggregationNode; import com.facebook.presto.spi.plan.DeleteNode; import com.facebook.presto.spi.plan.DistinctLimitNode; @@ -78,6 +79,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static com.facebook.presto.SystemSessionProperties.isUtilizeUniquePropertyInQueryPlanningEnabled; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; import static com.facebook.presto.sql.planner.optimizations.PropertyDerivations.extractFixedValuesToConstantExpressions; import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.FIXED; @@ -123,7 +125,10 @@ public static StreamProperties deriveProperties(PlanNode node, List properties.otherActualProperties) + .map(properties -> { + checkState(properties.otherActualProperties.isPresent(), "otherActualProperties should always exist"); + return properties.otherActualProperties.get(); + }) .collect(toImmutableList()), metadata, session); @@ -225,11 +230,8 @@ public StreamProperties visitIndexJoin(IndexJoinNode node, List streamPropertiesFromUniqueColumn = Optional.empty(); + if (isUtilizeUniquePropertyInQueryPlanningEnabled(session) && layout.getUniqueColumn().isPresent() && assignments.containsKey(layout.getUniqueColumn().get())) { + streamPropertiesFromUniqueColumn = Optional.of(new StreamProperties(streamDistribution, Optional.of(ImmutableList.of(assignments.get(layout.getUniqueColumn().get()))), false)); + } + // if we are partitioned on empty set, we must say multiple of unknown partitioning, because // the connector does not guarantee a single split in this case (since it might not understand // that the value is a constant). 
if (streamPartitionSymbols.isPresent() && streamPartitionSymbols.get().isEmpty()) { - return new StreamProperties(streamDistribution, Optional.empty(), false); + return new StreamProperties(streamDistribution, Optional.empty(), false, Optional.empty(), streamPropertiesFromUniqueColumn); } - return new StreamProperties(streamDistribution, streamPartitionSymbols, false); + return new StreamProperties(streamDistribution, streamPartitionSymbols, false, Optional.empty(), streamPropertiesFromUniqueColumn); } private Optional> getNonConstantVariables(Set columnHandles, Map assignments, Set globalConstants) @@ -337,20 +344,32 @@ public StreamProperties visitExchange(ExchangeNode node, List return new StreamProperties(MULTIPLE, Optional.empty(), false); } + Optional additionalUniqueProperty = Optional.empty(); + if (inputProperties.size() == 1 && inputProperties.get(0).hasUniqueProperties() && !node.getType().equals(ExchangeNode.Type.REPLICATE)) { + List inputVariables = node.getInputs().get(0); + Map inputToOutput = new HashMap<>(); + for (int i = 0; i < node.getOutputVariables().size(); i++) { + inputToOutput.put(inputVariables.get(i), node.getOutputVariables().get(i)); + } + checkState(inputProperties.get(0).getStreamPropertiesFromUniqueColumn().isPresent(), + "when unique columns exists, the stream is also partitioned by the unique column and should be represented in the streamPropertiesFromUniqueColumn field"); + additionalUniqueProperty = Optional.of(inputProperties.get(0).getStreamPropertiesFromUniqueColumn().get().translate(column -> Optional.ofNullable(inputToOutput.get(column)))); + } + if (node.getScope().isRemote()) { // TODO: correctly determine if stream is parallelised // based on session properties - return StreamProperties.fixedStreams(); + return StreamProperties.fixedStreams().withStreamPropertiesFromUniqueColumn(additionalUniqueProperty); } switch (node.getType()) { case GATHER: - return StreamProperties.singleStream(); + return 
StreamProperties.singleStream().withStreamPropertiesFromUniqueColumn(additionalUniqueProperty); case REPARTITION: if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) || // no strict partitioning guarantees when multiple writers per partitions are allows (scaled writers) node.getPartitioningScheme().isScaleWriters()) { - return new StreamProperties(FIXED, Optional.empty(), false); + return new StreamProperties(FIXED, Optional.empty(), false).withStreamPropertiesFromUniqueColumn(additionalUniqueProperty); } checkArgument( node.getPartitioningScheme().getPartitioning().getArguments().stream().allMatch(VariableReferenceExpression.class::isInstance), @@ -359,7 +378,7 @@ public StreamProperties visitExchange(ExchangeNode node, List FIXED, Optional.of(node.getPartitioningScheme().getPartitioning().getArguments().stream() .map(VariableReferenceExpression.class::cast) - .collect(toImmutableList())), false); + .collect(toImmutableList())), false).withStreamPropertiesFromUniqueColumn(additionalUniqueProperty); case REPLICATE: return new StreamProperties(MULTIPLE, Optional.empty(), false); } @@ -668,21 +687,33 @@ public enum StreamDistribution // We are only interested in the local properties, but PropertyDerivations requires input // ActualProperties, so we hold on to the whole object - private final ActualProperties otherActualProperties; + private final Optional otherActualProperties; // NOTE: Partitioning on zero columns (or effectively zero columns if the columns are constant) indicates that all // the rows will be partitioned into a single stream. 
+ private final Optional streamPropertiesFromUniqueColumn; + private StreamProperties(StreamDistribution distribution, Optional> partitioningColumns, boolean ordered) { - this(distribution, partitioningColumns, ordered, null); + this(distribution, partitioningColumns, ordered, Optional.empty()); + } + + private StreamProperties( + StreamDistribution distribution, + Optional> partitioningColumns, + boolean ordered, + Optional otherActualProperties) + { + this(distribution, partitioningColumns, ordered, otherActualProperties, Optional.empty()); } private StreamProperties( StreamDistribution distribution, Optional> partitioningColumns, boolean ordered, - ActualProperties otherActualProperties) + Optional otherActualProperties, + Optional streamPropertiesFromUniqueColumn) { this.distribution = requireNonNull(distribution, "distribution is null"); @@ -697,13 +728,38 @@ private StreamProperties( this.ordered = ordered; checkArgument(!ordered || distribution == SINGLE, "Ordered must be a single stream"); - this.otherActualProperties = otherActualProperties; + this.otherActualProperties = requireNonNull(otherActualProperties); + requireNonNull(streamPropertiesFromUniqueColumn).ifPresent(properties -> checkArgument(!properties.streamPropertiesFromUniqueColumn.isPresent())); + // When unique properties exists, the stream is also partitioned on the unique column + if (otherActualProperties.isPresent() && otherActualProperties.get().getPropertiesFromUniqueColumn().isPresent()) { + ActualProperties propertiesFromUniqueColumn = otherActualProperties.get().getPropertiesFromUniqueColumn().get(); + if (Iterables.getOnlyElement(propertiesFromUniqueColumn.getLocalProperties()) instanceof UniqueProperty) { + VariableReferenceExpression uniqueVariable = (VariableReferenceExpression) ((UniqueProperty) Iterables.getOnlyElement(propertiesFromUniqueColumn.getLocalProperties())).getColumn(); + checkState(streamPropertiesFromUniqueColumn.isPresent() && 
streamPropertiesFromUniqueColumn.get().partitioningColumns.isPresent() + && Iterables.getOnlyElement(streamPropertiesFromUniqueColumn.get().partitioningColumns.get()).equals(uniqueVariable)); + } + } + this.streamPropertiesFromUniqueColumn = streamPropertiesFromUniqueColumn; } public List> getLocalProperties() { - checkState(otherActualProperties != null, "otherActualProperties not set"); - return otherActualProperties.getLocalProperties(); + checkState(otherActualProperties.isPresent(), "otherActualProperties not set"); + return otherActualProperties.get().getLocalProperties(); + } + + public List> getAdditionalLocalProperties() + { + checkState(otherActualProperties.isPresent(), "otherActualProperties not set"); + if (!otherActualProperties.get().getPropertiesFromUniqueColumn().isPresent()) { + return ImmutableList.of(); + } + return otherActualProperties.get().getPropertiesFromUniqueColumn().get().getLocalProperties(); + } + + public Optional getStreamPropertiesFromUniqueColumn() + { + return streamPropertiesFromUniqueColumn; } private static StreamProperties singleStream() @@ -725,8 +781,9 @@ private StreamProperties unordered(boolean unordered) { if (unordered) { ActualProperties updatedProperties = null; - if (otherActualProperties != null) { - updatedProperties = ActualProperties.builderFrom(otherActualProperties) + if (otherActualProperties.isPresent()) { + updatedProperties = ActualProperties.builderFrom(otherActualProperties.get()) + .propertiesFromUniqueColumn(otherActualProperties.get().getPropertiesFromUniqueColumn().map(x -> ActualProperties.builderFrom(x).unordered(true).build())) .unordered(true) .build(); } @@ -734,11 +791,33 @@ private StreamProperties unordered(boolean unordered) distribution, partitioningColumns, false, - updatedProperties); + Optional.ofNullable(updatedProperties), + streamPropertiesFromUniqueColumn.map(x -> x.unordered(true))); } return this; } + public StreamProperties uniqueToGroupProperties() + { + if 
(otherActualProperties.isPresent() && otherActualProperties.get().getPropertiesFromUniqueColumn().isPresent()) { + if (Iterables.getOnlyElement(otherActualProperties.get().getPropertiesFromUniqueColumn().get().getLocalProperties()) instanceof UniqueProperty) { + Optional groupedProperties = PropertyDerivations.uniqueToGroupProperties(otherActualProperties.get().getPropertiesFromUniqueColumn().get()); + return new StreamProperties(distribution, partitioningColumns, ordered, + otherActualProperties.map(x -> ActualProperties.builderFrom(x).propertiesFromUniqueColumn(groupedProperties).build()), + streamPropertiesFromUniqueColumn.map(StreamProperties::uniqueToGroupProperties)); + } + } + return this; + } + + public boolean hasUniqueProperties() + { + if (otherActualProperties.isPresent() && otherActualProperties.get().getPropertiesFromUniqueColumn().isPresent()) { + return Iterables.getOnlyElement(otherActualProperties.get().getPropertiesFromUniqueColumn().get().getLocalProperties()) instanceof UniqueProperty; + } + return false; + } + public boolean isSingleStream() { return distribution == SINGLE; @@ -783,7 +862,12 @@ private StreamProperties withUnspecifiedPartitioning() private StreamProperties withOtherActualProperties(ActualProperties actualProperties) { - return new StreamProperties(distribution, partitioningColumns, ordered, actualProperties); + return new StreamProperties(distribution, partitioningColumns, ordered, Optional.ofNullable(actualProperties), streamPropertiesFromUniqueColumn); + } + + private StreamProperties withStreamPropertiesFromUniqueColumn(Optional streamPropertiesFromUniqueColumn) + { + return new StreamProperties(distribution, partitioningColumns, ordered, otherActualProperties, streamPropertiesFromUniqueColumn); } public StreamProperties translate(Function> translator) @@ -801,7 +885,8 @@ public StreamProperties translate(Function x.translateVariable(translator)), + streamPropertiesFromUniqueColumn.map(x -> x.translate(translator))); } 
public Optional> getPartitioningColumns() @@ -812,7 +897,7 @@ public Optional> getPartitioningColumns() @Override public int hashCode() { - return Objects.hash(distribution, partitioningColumns); + return Objects.hash(distribution, partitioningColumns, ordered, otherActualProperties, streamPropertiesFromUniqueColumn); } @Override @@ -826,7 +911,10 @@ public boolean equals(Object obj) } StreamProperties other = (StreamProperties) obj; return Objects.equals(this.distribution, other.distribution) && - Objects.equals(this.partitioningColumns, other.partitioningColumns); + Objects.equals(this.partitioningColumns, other.partitioningColumns) && + this.ordered == other.ordered && + Objects.equals(this.otherActualProperties, other.otherActualProperties) && + Objects.equals(this.streamPropertiesFromUniqueColumn, other.streamPropertiesFromUniqueColumn); } @Override @@ -835,6 +923,9 @@ public String toString() return toStringHelper(this) .add("distribution", distribution) .add("partitioningColumns", partitioningColumns) + .add("ordered", ordered) + .add("otherActualProperties", otherActualProperties) + .add("streamPropertiesFromUniqueColumn", streamPropertiesFromUniqueColumn) .toString(); } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java index cfe040826bdcf..f5ca4ab176918 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java @@ -86,8 +86,10 @@ public Void visitAggregation(AggregationNode node, Void context) List> desiredProperties = ImmutableList.of(new GroupingProperty<>(node.getPreGroupedVariables())); Iterator>> matchIterator = LocalProperties.match(properties.getLocalProperties(), desiredProperties).iterator(); + Iterator>> 
additionalMatchIterator = LocalProperties.match(properties.getAdditionalLocalProperties(), desiredProperties).iterator(); Optional> unsatisfiedRequirement = Iterators.getOnlyElement(matchIterator); - checkArgument(!unsatisfiedRequirement.isPresent(), "Streaming aggregation with input not grouped on the grouping keys"); + Optional> additionalUnsatisfiedRequirement = Iterators.getOnlyElement(additionalMatchIterator); + checkArgument(!unsatisfiedRequirement.isPresent() || !additionalUnsatisfiedRequirement.isPresent(), "Streaming aggregation with input not grouped on the grouping keys"); return null; } } diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index cc29fe7160751..56be62a79306a 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -263,6 +263,7 @@ public void testDefaults() .setAddExchangeBelowPartialAggregationOverGroupId(false) .setAddDistinctBelowSemiJoinBuild(false) .setPushdownSubfieldForMapFunctions(true) + .setUtilizeUniquePropertyInQueryPlanning(true) .setInnerJoinPushdownEnabled(false) .setBroadcastSemiJoinForDelete(true) .setInEqualityJoinPushdownEnabled(false) @@ -482,6 +483,7 @@ public void testExplicitPropertyMappings() .put("exclude-invalid-worker-session-properties", "true") .put("optimizer.add-distinct-below-semi-join-build", "true") .put("optimizer.pushdown-subfield-for-map-functions", "false") + .put("optimizer.utilize-unique-property-in-query-planning", "false") .put("optimizer.add-exchange-below-partial-aggregation-over-group-id", "true") .put("max_serializable_object_size", "50") .build(); @@ -692,6 +694,7 @@ public void testExplicitPropertyMappings() .setAddExchangeBelowPartialAggregationOverGroupId(true) .setAddDistinctBelowSemiJoinBuild(true) 
.setPushdownSubfieldForMapFunctions(false) + .setUtilizeUniquePropertyInQueryPlanning(false) .setInEqualityJoinPushdownEnabled(true) .setBroadcastSemiJoinForDelete(false) .setRewriteMinMaxByToTopNEnabled(true) diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java index 9c6eb9a528e56..c69c4ea815c92 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/assertions/PlanMatchPattern.java @@ -412,6 +412,11 @@ public static PlanMatchPattern strictProject(Map assi .withExactAssignments(assignments.values()); } + public static PlanMatchPattern semiJoin(PlanMatchPattern source, PlanMatchPattern filtering) + { + return node(SemiJoinNode.class, source, filtering); + } + public static PlanMatchPattern semiJoin(String sourceSymbolAlias, String filteringSymbolAlias, String outputAlias, PlanMatchPattern source, PlanMatchPattern filtering) { return semiJoin(sourceSymbolAlias, filteringSymbolAlias, outputAlias, Optional.empty(), source, filtering); diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestLocalProperties.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestLocalProperties.java index db0600d2d6380..edbeeb3e0cdc2 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestLocalProperties.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestLocalProperties.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.GroupingProperty; import com.facebook.presto.spi.LocalProperty; import com.facebook.presto.spi.SortingProperty; +import com.facebook.presto.spi.UniqueProperty; import com.facebook.presto.spi.relation.VariableReferenceExpression; import 
com.facebook.presto.testing.TestingMetadata.TestingColumnHandle; import com.fasterxml.jackson.core.JsonParser; @@ -78,6 +79,10 @@ public void testConstantProcessing() input = ImmutableList.of(constant("a"), constant("b")); assertEquals(stripLeadingConstants(input), ImmutableList.of()); assertEquals(extractLeadingConstants(input), ImmutableSet.of("a", "b")); + + input = ImmutableList.of(unique("a")); + assertEquals(stripLeadingConstants(input), ImmutableList.of(unique("a"))); + assertEquals(extractLeadingConstants(input), ImmutableSet.of()); } @Test @@ -138,6 +143,14 @@ public void testTranslate() map = ImmutableMap.of("a", "a1", "b", "b1", "c", "c1"); input = ImmutableList.of(grouped("a"), constant("b"), grouped("c")); assertEquals(LocalProperties.translate(input, translateWithMap(map)), ImmutableList.of(grouped("a1"), constant("b1"), grouped("c1"))); + + map = ImmutableMap.of(); + input = ImmutableList.of(unique("a")); + assertEquals(LocalProperties.translate(input, translateWithMap(map)), ImmutableList.of()); + + map = ImmutableMap.of("a", "a1"); + input = ImmutableList.of(unique("a")); + assertEquals(LocalProperties.translate(input, translateWithMap(map)), ImmutableList.of(unique("a1"))); } private static Function> translateWithMap(Map translateMap) @@ -177,6 +190,35 @@ public void testNormalizeOverlappingSymbol() assertNormalizeAndFlatten( localProperties, grouped("a")); + + localProperties = builder() + .unique("a") + .sorted("a", SortOrder.ASC_NULLS_FIRST) + .constant("a") + .build(); + assertNormalize( + localProperties, + Optional.of(unique("a")), + Optional.empty(), + Optional.empty()); + assertNormalizeAndFlatten( + localProperties, + unique("a")); + + localProperties = builder() + .grouped("a") + .unique("a") + .constant("a") + .build(); + assertNormalize( + localProperties, + Optional.of(grouped("a")), + Optional.of(unique("a")), + Optional.empty()); + assertNormalizeAndFlatten( + localProperties, + grouped("a"), + unique("a")); } @Test @@ -780,6 
+822,11 @@ private static GroupingProperty grouped(String... columns) return new GroupingProperty<>(Arrays.asList(columns)); } + private static UniqueProperty unique(String column) + { + return new UniqueProperty<>(column); + } + private static SortingProperty sorted(String column, SortOrder order) { return new SortingProperty<>(column, order); @@ -812,6 +859,12 @@ public Builder constant(String column) return this; } + public Builder unique(String column) + { + properties.add(new UniqueProperty<>(column)); + return this; + } + public List> build() { return new ArrayList<>(properties); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorTableLayout.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorTableLayout.java index f355b52f7e993..d794088f14f79 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorTableLayout.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorTableLayout.java @@ -34,6 +34,7 @@ public class ConnectorTableLayout private final Optional discretePredicates; private final List> localProperties; private final Optional remainingPredicate; + private final Optional uniqueColumn; public ConnectorTableLayout(ConnectorTableLayoutHandle handle) { @@ -68,6 +69,20 @@ public ConnectorTableLayout( Optional discretePredicates, List> localProperties, Optional remainingPredicate) + { + this(handle, columns, predicate, tablePartitioning, streamPartitioningColumns, discretePredicates, localProperties, remainingPredicate, Optional.empty()); + } + + public ConnectorTableLayout( + ConnectorTableLayoutHandle handle, + Optional> columns, + TupleDomain predicate, + Optional tablePartitioning, + Optional> streamPartitioningColumns, + Optional discretePredicates, + List> localProperties, + Optional remainingPredicate, + Optional uniqueColumn) { requireNonNull(handle, "handle is null"); requireNonNull(columns, "columns is null"); @@ -86,6 +101,7 @@ public ConnectorTableLayout( this.discretePredicates = 
discretePredicates; this.localProperties = localProperties; this.remainingPredicate = remainingPredicate; + this.uniqueColumn = uniqueColumn; } public ConnectorTableLayoutHandle getHandle() @@ -123,6 +139,11 @@ public Optional getRemainingPredicate() return remainingPredicate; } + public Optional getUniqueColumn() + { + return uniqueColumn; + } + /** * The partitioning of the table across the worker nodes. *

diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/UniqueProperty.java b/presto-spi/src/main/java/com/facebook/presto/spi/UniqueProperty.java
new file mode 100644
index 0000000000000..946cdb6dc1dab
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/UniqueProperty.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link LocalProperty} over exactly one non-null column, rendered as {@code U(column)}.
+ * The name indicates the column is unique; this class only carries the column itself.
+ *
+ * @param <E> type used to identify the column
+ */
+public final class UniqueProperty<E>
+        implements LocalProperty<E>
+{
+    private final E column;
+
+    /**
+     * @param column the column this property refers to
+     * @throws NullPointerException if {@code column} is null
+     */
+    @JsonCreator
+    public UniqueProperty(@JsonProperty("column") E column)
+    {
+        this.column = requireNonNull(column, "column is null");
+    }
+
+    @Override
+    public boolean isOrderSensitive()
+    {
+        return false;
+    }
+
+    @JsonProperty
+    public E getColumn()
+    {
+        return column;
+    }
+
+    /**
+     * Returns an immutable single-element set holding the column.
+     */
+    @Override
+    public Set<E> getColumns()
+    {
+        return Collections.singleton(column);
+    }
+
+    /**
+     * Maps the column through {@code translator}; empty when the translator yields no mapping.
+     */
+    @Override
+    public <T> Optional<LocalProperty<T>> translate(Function<E, Optional<T>> translator)
+    {
+        return translator.apply(column)
+                .map(UniqueProperty::new);
+    }
+
+    /**
+     * Simplified only by a {@code UniqueProperty} that is equal to this one.
+     */
+    @Override
+    public boolean isSimplifiedBy(LocalProperty<E> known)
+    {
+        return known instanceof UniqueProperty && known.equals(this);
+    }
+
+    /**
+     * Returns this property unchanged; the supplied constants are not consulted.
+     */
+    @Override
+    public Optional<LocalProperty<E>> withConstants(Set<E> constants)
+    {
+        return Optional.of(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "U(" + column + ")";
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        UniqueProperty<?> that = (UniqueProperty<?>) o;
+        return Objects.equals(column, that.column);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(column);
+    }
+}