Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -845,3 +845,40 @@ The following properties allow tuning the :doc:`/functions/regexp`.
to hit the limit on matches for subsequent rows as well, you want to use the
correct algorithm from the beginning so as not to waste time and resources.
The more rows you are processing, the larger this value should be.

CTE Materialization Properties
--------------------------------------

``cte-materialization-strategy``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Allowed values:** ``ALL``, ``NONE``
* **Default value:** ``NONE``

Specifies the strategy to use for materializing Common Table Expressions (CTEs) in queries.
``NONE`` indicates that no CTEs will be materialized.
``ALL`` indicates that all CTEs in the query will be materialized.
This can also be specified on a per-query basis using the ``cte_materialization_strategy`` session property.

``query.cte-hash-partition-count``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``100``

The number of partitions to be used for materializing Common Table Expressions (CTEs) in queries.
This setting determines how many buckets or writers should be used when materializing the CTEs, potentially affecting the performance of queries involving CTE materialization.
A higher number of partitions might improve parallelism but also increases overhead in terms of memory and network communication.
Recommended value: 4 to 10 times the size of the cluster.
This can also be specified on a per-query basis using the ``cte_hash_partition_count`` session property.

``query.cte-partitioning-provider-catalog``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``string``
* **Default value:** ``system``

The name of the catalog that is used for Common Table Expression (CTE) materialization and that provides the custom partitioning for it.
This setting specifies which catalog should be used for CTE materialization and for determining how to partition the materialization of CTEs in queries.
This can also be specified on a per-query basis using the ``cte_partitioning_provider_catalog`` session property.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static io.airlift.tpch.TpchTable.CUSTOMER;
Expand All @@ -48,7 +47,7 @@ protected QueryRunner createQueryRunner()
return HiveQueryRunner.createQueryRunner(
ImmutableList.of(ORDERS, CUSTOMER, LINE_ITEM, PART_SUPPLIER, NATION, REGION, PART, SUPPLIER),
ImmutableMap.of(
"query.partitioning-provider-catalog", "hive"),
"query.cte-partitioning-provider-catalog", "hive"),
"sql-standard",
ImmutableMap.of("hive.pushdown-filter-enabled", "true",
"hive.enable-parquet-dereference-pushdown", "true"),
Expand Down Expand Up @@ -823,7 +822,6 @@ protected Session getMaterializedSession()
return Session.builder(super.getSession())
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
.setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL")
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public final class SystemSessionProperties
public static final String DISTRIBUTED_JOIN = "distributed_join";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String CTE_HASH_PARTITION_COUNT = "cte_hash_partition_count";

public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog";

public static final String CTE_PARTITIONING_PROVIDER_CATALOG = "cte_partitioning_provider_catalog";
public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy";
public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct";
public static final String GROUPED_EXECUTION = "grouped_execution";
Expand Down Expand Up @@ -409,11 +413,21 @@ public SystemSessionProperties(
"Number of partitions for distributed joins and aggregations",
queryManagerConfig.getHashPartitionCount(),
false),
integerProperty(
CTE_HASH_PARTITION_COUNT,
"Number of partitions for materializing CTEs",
queryManagerConfig.getCteHashPartitionCount(),
false),
stringProperty(
PARTITIONING_PROVIDER_CATALOG,
"Name of the catalog providing custom partitioning",
queryManagerConfig.getPartitioningProviderCatalog(),
false),
stringProperty(
CTE_PARTITIONING_PROVIDER_CATALOG,
"Name of the catalog providing custom partitioning for cte materialization",
queryManagerConfig.getCtePartitioningProviderCatalog(),
false),
new PropertyMetadata<>(
EXCHANGE_MATERIALIZATION_STRATEGY,
format("The exchange materialization strategy to use. Options are %s",
Expand Down Expand Up @@ -1935,11 +1949,21 @@ public static int getHashPartitionCount(Session session)
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
}

/**
 * Reads the {@code cte_hash_partition_count} system session property.
 *
 * @param session the session whose system properties are consulted
 * @return the property value, unboxed to an {@code int}
 */
public static int getCteHashPartitionCount(Session session)
{
    Integer ctePartitionCount = session.getSystemProperty(CTE_HASH_PARTITION_COUNT, Integer.class);
    return ctePartitionCount;
}

/**
 * Reads the {@code partitioning_provider_catalog} system session property.
 *
 * @param session the session whose system properties are consulted
 * @return the property value as a string
 */
public static String getPartitioningProviderCatalog(Session session)
{
    String providerCatalog = session.getSystemProperty(PARTITIONING_PROVIDER_CATALOG, String.class);
    return providerCatalog;
}

/**
 * Reads the {@code cte_partitioning_provider_catalog} system session property.
 *
 * @param session the session whose system properties are consulted
 * @return the property value as a string
 */
public static String getCtePartitioningProviderCatalog(Session session)
{
    String cteProviderCatalog = session.getSystemProperty(CTE_PARTITIONING_PROVIDER_CATALOG, String.class);
    return cteProviderCatalog;
}

public static ExchangeMaterializationStrategy getExchangeMaterializationStrategy(Session session)
{
return session.getSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public class QueryManagerConfig
private int maxQueuedQueries = 5000;

private int hashPartitionCount = 100;

private int cteHashPartitionCount = 100;
private String partitioningProviderCatalog = GlobalSystemConnector.NAME;
private String ctePartitioningProviderCatalog = GlobalSystemConnector.NAME;
private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE;
private boolean useStreamingExchangeForMarkDistinct;
private boolean enableWorkerIsolation;
Expand Down Expand Up @@ -152,6 +155,20 @@ public QueryManagerConfig setMaxQueuedQueries(int maxQueuedQueries)
return this;
}

/**
 * Returns the value of the {@code cteHashPartitionCount} field.
 * The {@code @Min(1)} constraint declares that the value must be at least 1.
 *
 * @return the current CTE hash partition count
 */
@Min(1)
public int getCteHashPartitionCount()
{
return cteHashPartitionCount;
}

/**
 * Sets the CTE hash partition count; annotated with the
 * {@code query.cte-hash-partition-count} configuration key.
 *
 * @param cteHashPartitionCount the new value to store
 * @return this config instance, so calls can be chained
 */
@Config("query.cte-hash-partition-count")
@ConfigDescription("Number of writers or buckets allocated per materialized CTE. (Recommended value: 4 - 10x times the size of the cluster)")
public QueryManagerConfig setCteHashPartitionCount(int cteHashPartitionCount)
{
this.cteHashPartitionCount = cteHashPartitionCount;
return this;
}

@Min(1)
public int getHashPartitionCount()
{
Expand All @@ -172,6 +189,20 @@ public String getPartitioningProviderCatalog()
return partitioningProviderCatalog;
}

/**
 * Returns the value of the {@code ctePartitioningProviderCatalog} field.
 * The {@code @NotNull} constraint declares that the value must not be null.
 *
 * @return the current CTE partitioning provider catalog name
 */
@NotNull
public String getCtePartitioningProviderCatalog()
{
return ctePartitioningProviderCatalog;
}

/**
 * Sets the CTE partitioning provider catalog name; annotated with the
 * {@code query.cte-partitioning-provider-catalog} configuration key.
 *
 * @param ctePartitioningProviderCatalog the new catalog name to store
 * @return this config instance, so calls can be chained
 */
@Config("query.cte-partitioning-provider-catalog")
@ConfigDescription("Name of the catalog providing custom partitioning for cte materialization")
public QueryManagerConfig setCtePartitioningProviderCatalog(String ctePartitioningProviderCatalog)
{
this.ctePartitioningProviderCatalog = ctePartitioningProviderCatalog;
return this;
}

@Config("query.partitioning-provider-catalog")
@ConfigDescription("Name of the catalog providing custom partitioning")
public QueryManagerConfig setPartitioningProviderCatalog(String partitioningProviderCatalog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.getCteHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getCteMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog;
import static com.facebook.presto.SystemSessionProperties.getCtePartitioningProviderCatalog;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
Expand Down Expand Up @@ -133,11 +133,11 @@ public PlanNode visitCteProducer(CteProducerNode node, RewriteContext<PhysicalCt
VariableReferenceExpression partitionVariable = actualSource.getOutputVariables()
.get(getCtePartitionIndex(actualSource.getOutputVariables()));
List<Type> partitioningTypes = Arrays.asList(partitionVariable.getType());
String partitioningProviderCatalog = getPartitioningProviderCatalog(session);
String partitioningProviderCatalog = getCtePartitioningProviderCatalog(session);
// First column is taken as the partitioning column
Partitioning partitioning = Partitioning.create(
metadata.getPartitioningHandleForExchange(session, partitioningProviderCatalog,
getHashPartitionCount(session), partitioningTypes),
getCteHashPartitionCount(session), partitioningTypes),
Arrays.asList(partitionVariable));
BasePlanFragmenter.PartitioningVariableAssignments partitioningVariableAssignments
= assignPartitioningVariables(variableAllocator, partitioning);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testDefaults()
.setMaxQueuedQueries(5000)
.setHashPartitionCount(100)
.setPartitioningProviderCatalog("system")
.setCtePartitioningProviderCatalog("system")
.setExchangeMaterializationStrategy(ExchangeMaterializationStrategy.NONE)
.setQueryManagerExecutorPoolSize(5)
.setRemoteTaskMinErrorDuration(new Duration(5, TimeUnit.MINUTES))
Expand All @@ -77,6 +78,7 @@ public void testDefaults()
.setGlobalQueryRetryFailureLimit(150)
.setGlobalQueryRetryFailureWindow(new Duration(5, MINUTES))
.setRateLimiterBucketMaxSize(100)
.setCteHashPartitionCount(100)
.setRateLimiterCacheLimit(1000)
.setRateLimiterCacheWindowMinutes(5)
.setEnableWorkerIsolation(false));
Expand Down Expand Up @@ -129,6 +131,8 @@ public void testExplicitPropertyMappings()
.put("query-manager.rate-limiter-bucket-max-size", "200")
.put("query-manager.rate-limiter-cache-limit", "10000")
.put("query-manager.rate-limiter-cache-window-minutes", "60")
.put("query.cte-hash-partition-count", "128")
.put("query.cte-partitioning-provider-catalog", "hive")
.put("query-manager.enable-worker-isolation", "true")
.build();

Expand All @@ -150,6 +154,7 @@ public void testExplicitPropertyMappings()
.setMaxQueuedQueries(15)
.setHashPartitionCount(16)
.setPartitioningProviderCatalog("hive")
.setCtePartitioningProviderCatalog("hive")
.setExchangeMaterializationStrategy(ExchangeMaterializationStrategy.ALL)
.setQueryManagerExecutorPoolSize(11)
.setRemoteTaskMinErrorDuration(new Duration(60, SECONDS))
Expand All @@ -176,6 +181,8 @@ public void testExplicitPropertyMappings()
.setRateLimiterBucketMaxSize(200)
.setRateLimiterCacheLimit(10000)
.setRateLimiterCacheWindowMinutes(60)
.setCteHashPartitionCount(128)
.setCtePartitioningProviderCatalog("hive")
.setEnableWorkerIsolation(true);
ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.CTE_HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_MEMORY;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void testApplyDefaultProperties()
.setSystemProperty(QUERY_MAX_MEMORY, "1GB")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
.setSystemProperty(HASH_PARTITION_COUNT, "43")
.setSystemProperty(CTE_HASH_PARTITION_COUNT, "100")
.setSystemProperty("override", "should be overridden")
.setCatalogSessionProperty("testCatalog", "explicit_set", "explicit_set")
.build();
Expand All @@ -73,6 +75,7 @@ public void testApplyDefaultProperties()
.put(QUERY_MAX_MEMORY, "1GB")
.put(JOIN_DISTRIBUTION_TYPE, "partitioned")
.put(HASH_PARTITION_COUNT, "43")
.put(CTE_HASH_PARTITION_COUNT, "100")
.put("override", "should be overridden")
.build());
assertEquals(
Expand All @@ -89,6 +92,7 @@ public void testApplyDefaultProperties()
.put(QUERY_MAX_MEMORY, "1GB")
.put(JOIN_DISTRIBUTION_TYPE, "partitioned")
.put(HASH_PARTITION_COUNT, "43")
.put(CTE_HASH_PARTITION_COUNT, "100")
.put("system_default", "system_default")
.put("override", "overridden")
.build());
Expand Down