diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 99581b93c6454..b9f513968dbe6 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -845,3 +845,40 @@ The following properties allow tuning the :doc:`/functions/regexp`. to hit the limit on matches for subsequent rows as well, you want to use the correct algorithm from the beginning so as not to waste time and resources. The more rows you are processing, the larger this value should be. + +CTE Materialization Properties +-------------------------------------- + +``cte-materialization-strategy`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``string`` + * **Allowed values:** ``ALL``, ``NONE`` + * **Default value:** ``NONE`` + + Specifies the strategy to use for materializing Common Table Expressions (CTEs) in queries. + ``NONE`` indicates that no CTEs will be materialized. + ``ALL`` indicates that all CTEs in the query will be materialized. + This can also be specified on a per-query basis using the ``cte_materialization_strategy`` session property. + +``query.cte-hash-partition-count`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``integer`` + * **Default value:** ``100`` + + The number of partitions to be used for materializing Common Table Expressions (CTEs) in queries. + This setting determines how many buckets or writers should be used when materializing the CTEs, potentially affecting the performance of queries involving CTE materialization. + A higher number of partitions might improve parallelism but also increases overhead in terms of memory and network communication. + Recommended value: 4 to 10 times the size of the cluster. + This can also be specified on a per-query basis using the ``cte_hash_partition_count`` session property.
+ +``query.cte-partitioning-provider-catalog`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``string`` + * **Default value:** ``system`` + + The name of the catalog that provides custom partitioning for Common Table Expression (CTE) materialization. + This setting specifies which catalog should be used for CTE materialization and for determining how to partition the materialization of CTEs in queries. + This can also be specified on a per-query basis using the ``cte_partitioning_provider_catalog`` session property. \ No newline at end of file diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestCteExecution.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestCteExecution.java index 2e801235beb5f..644a171a33a8e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestCteExecution.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestCteExecution.java @@ -25,7 +25,6 @@ import java.util.Optional; import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY; -import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG; import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED; import static com.facebook.presto.testing.assertions.Assert.assertEquals; import static io.airlift.tpch.TpchTable.CUSTOMER; @@ -48,7 +47,7 @@ protected QueryRunner createQueryRunner() return HiveQueryRunner.createQueryRunner( ImmutableList.of(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION, REGION, PART, SUPPLIER), ImmutableMap.of( - "query.partitioning-provider-catalog", "hive"), + "query.cte-partitioning-provider-catalog", "hive"), "sql-standard", ImmutableMap.of("hive.pushdown-filter-enabled", "true", "hive.enable-parquet-dereference-pushdown", "true"), @@ -823,7 +822,6 @@ protected Session getMaterializedSession() return Session.builder(super.getSession())
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true") .setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL") - .setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive") .build(); } } diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 17a8954972fb8..ab47b1d8e049f 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -88,7 +88,11 @@ public final class SystemSessionProperties public static final String DISTRIBUTED_JOIN = "distributed_join"; public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join"; public static final String HASH_PARTITION_COUNT = "hash_partition_count"; + public static final String CTE_HASH_PARTITION_COUNT = "cte_hash_partition_count"; + public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog"; + + public static final String CTE_PARTITIONING_PROVIDER_CATALOG = "cte_partitioning_provider_catalog"; public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy"; public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct"; public static final String GROUPED_EXECUTION = "grouped_execution"; @@ -409,11 +413,21 @@ public SystemSessionProperties( "Number of partitions for distributed joins and aggregations", queryManagerConfig.getHashPartitionCount(), false), + integerProperty( + CTE_HASH_PARTITION_COUNT, + "Number of partitions for materializing CTEs", + queryManagerConfig.getCteHashPartitionCount(), + false), stringProperty( PARTITIONING_PROVIDER_CATALOG, "Name of the catalog providing custom partitioning", queryManagerConfig.getPartitioningProviderCatalog(), false), + stringProperty( + CTE_PARTITIONING_PROVIDER_CATALOG, + "Name of the catalog providing custom partitioning for cte 
materialization", + queryManagerConfig.getCtePartitioningProviderCatalog(), + false), new PropertyMetadata<>( EXCHANGE_MATERIALIZATION_STRATEGY, format("The exchange materialization strategy to use. Options are %s", @@ -1935,11 +1949,21 @@ public static int getHashPartitionCount(Session session) return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class); } + public static int getCteHashPartitionCount(Session session) + { + return session.getSystemProperty(CTE_HASH_PARTITION_COUNT, Integer.class); + } + public static String getPartitioningProviderCatalog(Session session) { return session.getSystemProperty(PARTITIONING_PROVIDER_CATALOG, String.class); } + public static String getCtePartitioningProviderCatalog(Session session) + { + return session.getSystemProperty(CTE_PARTITIONING_PROVIDER_CATALOG, String.class); + } + public static ExchangeMaterializationStrategy getExchangeMaterializationStrategy(Session session) { return session.getSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 7611a232c74c9..24fe22c8209fa 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -48,7 +48,10 @@ public class QueryManagerConfig private int maxQueuedQueries = 5000; private int hashPartitionCount = 100; + + private int cteHashPartitionCount = 100; private String partitioningProviderCatalog = GlobalSystemConnector.NAME; + private String ctePartitioningProviderCatalog = GlobalSystemConnector.NAME; private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE; private boolean useStreamingExchangeForMarkDistinct; private boolean enableWorkerIsolation; @@ -152,6 +155,20 @@ public 
QueryManagerConfig setMaxQueuedQueries(int maxQueuedQueries) return this; } + @Min(1) + public int getCteHashPartitionCount() + { + return cteHashPartitionCount; + } + + @Config("query.cte-hash-partition-count") + @ConfigDescription("Number of writers or buckets allocated per materialized CTE. (Recommended value: 4 - 10x times the size of the cluster)") + public QueryManagerConfig setCteHashPartitionCount(int cteHashPartitionCount) + { + this.cteHashPartitionCount = cteHashPartitionCount; + return this; + } + @Min(1) public int getHashPartitionCount() { @@ -172,6 +189,20 @@ public String getPartitioningProviderCatalog() return partitioningProviderCatalog; } + @NotNull + public String getCtePartitioningProviderCatalog() + { + return ctePartitioningProviderCatalog; + } + + @Config("query.cte-partitioning-provider-catalog") + @ConfigDescription("Name of the catalog providing custom partitioning for cte materialization") + public QueryManagerConfig setCtePartitioningProviderCatalog(String ctePartitioningProviderCatalog) + { + this.ctePartitioningProviderCatalog = ctePartitioningProviderCatalog; + return this; + } + @Config("query.partitioning-provider-catalog") @ConfigDescription("Name of the catalog providing custom partitioning") public QueryManagerConfig setPartitioningProviderCatalog(String partitioningProviderCatalog) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PhysicalCteOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PhysicalCteOptimizer.java index 6813c0bfe5405..d6e763ae47477 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PhysicalCteOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PhysicalCteOptimizer.java @@ -45,9 +45,9 @@ import java.util.Map; import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.getCteHashPartitionCount; import static 
com.facebook.presto.SystemSessionProperties.getCteMaterializationStrategy; -import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; -import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog; +import static com.facebook.presto.SystemSessionProperties.getCtePartitioningProviderCatalog; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -133,11 +133,11 @@ public PlanNode visitCteProducer(CteProducerNode node, RewriteContext partitioningTypes = Arrays.asList(partitionVariable.getType()); - String partitioningProviderCatalog = getPartitioningProviderCatalog(session); + String partitioningProviderCatalog = getCtePartitioningProviderCatalog(session); // First column is taken as the partitioning column Partitioning partitioning = Partitioning.create( metadata.getPartitioningHandleForExchange(session, partitioningProviderCatalog, - getHashPartitionCount(session), partitioningTypes), + getCteHashPartitionCount(session), partitioningTypes), Arrays.asList(partitionVariable)); BasePlanFragmenter.PartitioningVariableAssignments partitioningVariableAssignments = assignPartitioningVariables(variableAllocator, partitioning); diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 478ac19d3f5ce..8e58c850d6645 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -53,6 +53,7 @@ public void testDefaults() .setMaxQueuedQueries(5000) .setHashPartitionCount(100) .setPartitioningProviderCatalog("system") + .setCtePartitioningProviderCatalog("system") 
.setExchangeMaterializationStrategy(ExchangeMaterializationStrategy.NONE) .setQueryManagerExecutorPoolSize(5) .setRemoteTaskMinErrorDuration(new Duration(5, TimeUnit.MINUTES)) @@ -77,6 +78,7 @@ public void testDefaults() .setGlobalQueryRetryFailureLimit(150) .setGlobalQueryRetryFailureWindow(new Duration(5, MINUTES)) .setRateLimiterBucketMaxSize(100) + .setCteHashPartitionCount(100) .setRateLimiterCacheLimit(1000) .setRateLimiterCacheWindowMinutes(5) .setEnableWorkerIsolation(false)); @@ -129,6 +131,8 @@ public void testExplicitPropertyMappings() .put("query-manager.rate-limiter-bucket-max-size", "200") .put("query-manager.rate-limiter-cache-limit", "10000") .put("query-manager.rate-limiter-cache-window-minutes", "60") + .put("query.cte-hash-partition-count", "128") + .put("query.cte-partitioning-provider-catalog", "hive") .put("query-manager.enable-worker-isolation", "true") .build(); @@ -150,6 +154,7 @@ public void testExplicitPropertyMappings() .setMaxQueuedQueries(15) .setHashPartitionCount(16) .setPartitioningProviderCatalog("hive") + .setCtePartitioningProviderCatalog("hive") .setExchangeMaterializationStrategy(ExchangeMaterializationStrategy.ALL) .setQueryManagerExecutorPoolSize(11) .setRemoteTaskMinErrorDuration(new Duration(60, SECONDS)) @@ -176,6 +181,8 @@ public void testExplicitPropertyMappings() .setRateLimiterBucketMaxSize(200) .setRateLimiterCacheLimit(10000) .setRateLimiterCacheWindowMinutes(60) + .setCteHashPartitionCount(128) + .setCtePartitioningProviderCatalog("hive") .setEnableWorkerIsolation(true); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestSessionPropertyDefaults.java b/presto-main/src/test/java/com/facebook/presto/server/TestSessionPropertyDefaults.java index 964183375bdb2..257d22cf0aaec 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestSessionPropertyDefaults.java +++ 
b/presto-main/src/test/java/com/facebook/presto/server/TestSessionPropertyDefaults.java @@ -28,6 +28,7 @@ import java.util.Optional; +import static com.facebook.presto.SystemSessionProperties.CTE_HASH_PARTITION_COUNT; import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT; import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_MEMORY; @@ -65,6 +66,7 @@ public void testApplyDefaultProperties() .setSystemProperty(QUERY_MAX_MEMORY, "1GB") .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned") .setSystemProperty(HASH_PARTITION_COUNT, "43") + .setSystemProperty(CTE_HASH_PARTITION_COUNT, "100") .setSystemProperty("override", "should be overridden") .setCatalogSessionProperty("testCatalog", "explicit_set", "explicit_set") .build(); @@ -73,6 +75,7 @@ public void testApplyDefaultProperties() .put(QUERY_MAX_MEMORY, "1GB") .put(JOIN_DISTRIBUTION_TYPE, "partitioned") .put(HASH_PARTITION_COUNT, "43") + .put(CTE_HASH_PARTITION_COUNT, "100") .put("override", "should be overridden") .build()); assertEquals( @@ -89,6 +92,7 @@ public void testApplyDefaultProperties() .put(QUERY_MAX_MEMORY, "1GB") .put(JOIN_DISTRIBUTION_TYPE, "partitioned") .put(HASH_PARTITION_COUNT, "43") + .put(CTE_HASH_PARTITION_COUNT, "100") .put("system_default", "system_default") .put("override", "overridden") .build());