Merged
5 changes: 5 additions & 0 deletions core/trino-main/pom.xml
@@ -344,6 +344,11 @@
<artifactId>pcollections</artifactId>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
@@ -21,6 +21,7 @@
import io.airlift.units.MaxDataSize;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
@@ -31,14 +32,14 @@
"dynamic-filtering-max-per-driver-size",
"experimental.dynamic-filtering-max-per-driver-size",
"dynamic-filtering-range-row-limit-per-driver",
"experimental.dynamic-filtering-refresh-interval"
"experimental.dynamic-filtering-refresh-interval",
"dynamic-filtering.service-thread-count"
})
public class DynamicFilterConfig
{
private boolean enableDynamicFiltering = true;
@sopel39 (Member), Jun 27, 2022:
I think this refactor is probably overkill for capping max DF size. I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).

I would rather just keep track of overall DF size.

Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?
For partitioned join, each domain is separate so union is not needed. For broadcast join, we simply skip collecting subsequent domains once we get a first one.

@arhimondr (Contributor, Author):

> I think it actually introduces quadratic computational complexity too (merging multiple domains at once is more efficient).

The ValueSet is currently merged iteratively anyway: https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/ValueSet.java#L135, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/SortedRangeSet.java#L580, https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/predicate/EquatableValueSet.java#L277. It may still result in additional object allocations along the way, but the overhead shouldn't be significant.

> I would rather just keep track of overall DF size.

It may result in hitting the limit prematurely.

> Do we really need to cap DF size since tasks already cap DF and number of tasks is limited?

In fault tolerant execution we may create thousands and thousands of tasks, as the total number of tasks is no longer limited by the cluster size.

> For partitioned join, each domain is separate so union is not needed.

This is no longer true for fault tolerant execution, as the filters are collected before the exchange.
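The complexity argument above can be made concrete with a small sketch. This is not Trino code: `TreeSet<Long>` stands in for a `Domain`/`ValueSet`, and the class and method names are invented. Repeated pairwise union of immutable sets copies the accumulated result on every step (quadratic in the number of domains), while a single multi-way merge touches each element a constant number of times:

```java
import java.util.List;
import java.util.TreeSet;

public class UnionSketch
{
    // Repeated pairwise union: each step copies the accumulated result
    // (modeling immutable Domain union), so N domains of size ~k cost
    // O(N^2 * k) element copies in total.
    static TreeSet<Long> unionIteratively(List<TreeSet<Long>> domains)
    {
        TreeSet<Long> result = new TreeSet<>();
        for (TreeSet<Long> domain : domains) {
            TreeSet<Long> merged = new TreeSet<>(result); // copy everything accumulated so far
            merged.addAll(domain);
            result = merged;
        }
        return result;
    }

    // Multi-way union: add every domain into a single accumulator,
    // touching each element once instead of O(N) times.
    static TreeSet<Long> unionAllAtOnce(List<TreeSet<Long>> domains)
    {
        TreeSet<Long> result = new TreeSet<>();
        for (TreeSet<Long> domain : domains) {
            result.addAll(domain);
        }
        return result;
    }
}
```

Both produce the same set; only the amount of copying differs, which is the point being debated here.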

@sopel39 (Member), Jun 27, 2022:

According to 4fb9bb7, union is now more efficient than O(N^2), but there is still room for improvement.

> It may result in hitting the limit prematurely.

Is that really that big of a problem?

> In fault tolerant execution we may create thousands and thousands of tasks, as the total number of tasks is no longer limited by the cluster size.

That really stresses the issue with quadratic computation, if every new task adds a tiny bit of information that the coordinator needs to union over and over again.

> This is no longer true for fault tolerant execution, as the filters are collected before the exchange.

I'm worried that we are repurposing the DF mechanism for use cases that won't really be that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning. DF in Tardigrade was supposed to be easy, but it exploded into a rather significant effort. It might be best to just continue, but the complexity will remain even if the new code won't be used much by the community.

cc @martint

@arhimondr (Contributor, Author), Jun 27, 2022:

> Is that really that big of a problem?

For fault tolerant execution it is. There's a high chance the domains collected by different tasks contain almost identical sets of values, as the values are going to be distributed uniformly across the tasks.

> That really stresses the issue with quadratic computation, if every new task adds a tiny bit of information that the coordinator needs to union over and over again.

I can add an optimization and only union when the size limit is reached.

> I'm worried that we are repurposing the DF mechanism for use cases that won't really be that relevant for Tardigrade, and that the proper long-term solution is really adaptive planning.

For now we see this as a long-term solution. For adaptive re-planning we are thinking about starting with something simple and relying only on the "data size" metric for each partition, which is "free". Collecting more advanced statistics (such as NDVs) at the shuffle boundary is expensive, and we have to be careful to make sure that the extra optimizations possible with the advanced statistics actually pay off the stats collection cost.
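The proposed "only union when the size limit is reached" optimization could look roughly like the following hypothetical sketch. Again, `TreeSet<Long>` stands in for a `Domain`, and the class name, method names, and byte-size heuristic are all invented for illustration: incoming per-task domains are buffered, and the actual union happens only when the running size estimate crosses the cap, so the coordinator does not re-union the whole filter on every task completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical sketch (not the PR's code): buffer per-task domains and
// collapse them lazily instead of unioning on every arrival.
public class LazyDomainCollector
{
    private final long maxSizeBytes; // cap, cf. dynamic-filtering.*.max-size-per-filter
    private final List<TreeSet<Long>> pending = new ArrayList<>();
    private long pendingSizeEstimate;
    private boolean overflowed;

    public LazyDomainCollector(long maxSizeBytes)
    {
        this.maxSizeBytes = maxSizeBytes;
    }

    public void add(TreeSet<Long> domain)
    {
        if (overflowed) {
            return; // filter already gave up; drop further domains
        }
        pending.add(domain);
        pendingSizeEstimate += (long) domain.size() * Long.BYTES; // crude size estimate
        if (pendingSizeEstimate > maxSizeBytes) {
            compact(); // union only now, when the cap may be exceeded
        }
    }

    private void compact()
    {
        TreeSet<Long> merged = new TreeSet<>();
        for (TreeSet<Long> domain : pending) {
            merged.addAll(domain);
        }
        pending.clear();
        pending.add(merged);
        pendingSizeEstimate = (long) merged.size() * Long.BYTES;
        if (pendingSizeEstimate > maxSizeBytes) {
            // the de-duplicated union is still too large: degrade to a
            // pass-through filter instead of growing further
            overflowed = true;
            pending.clear();
            pendingSizeEstimate = 0;
        }
    }

    // Optional.empty() stands in for an "all" (no-op) dynamic filter.
    public Optional<TreeSet<Long>> result()
    {
        if (!overflowed) {
            compact();
        }
        if (overflowed) {
            return Optional.empty();
        }
        return Optional.of(pending.get(0));
    }
}
```

Because near-identical domains from different tasks de-duplicate on union, lazy compaction avoids overflowing on the sum of raw sizes while only paying the union cost when the cap is actually threatened.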

Member:

The long-term goal is to unify both execution models. We should be able to take advantage of fault-tolerant execution while still being able to do speculative or quasi-pipelined execution where appropriate. Unless we're planning to get rid of DF, it does make sense to make it work with fault-tolerant execution.

Also, keep in mind that fault-tolerant execution is not just for long-running batch queries. It can be useful for interactive queries when running clusters on unreliable hardware or ephemeral instances (e.g., spot instances in AWS).

@sopel39 (Member), Jul 6, 2022:

> The long-term goal is to unify both execution models.

Unification can also mean a hybrid model, where short queries (a few minutes) execute with query-level restarts (maybe speculatively), while long-running queries execute with task-level retries.

> Unless we're planning to get rid of DF, it does make sense to make it work with fault-tolerant execution.

I don't think it's that simple. IMO the complexity of Tardigrade + "new" DFs is not worth it for interactive queries:

  1. there is a perf penalty
  2. there is a higher cumulative cost too, as data needs to be kept in memory
  3. the overall system/setup is much more complicated

I think we should try to make Tardigrade the default execution mode for long-running or memory-intensive queries first (that is actually the intended Tardigrade use case). However, I'm not sure this can be done without having a better (non-S3) shuffle service in OS.

> Also, keep in mind that fault-tolerant execution is not just for long-running batch queries. It can be useful for interactive queries when running clusters on unreliable hardware or ephemeral instances

Why can't we start by making Tardigrade the default execution mode for large/long queries in OS? I don't think Tardigrade is that beneficial for interactive queries. The time gap for failures is much shorter, so it seems that query restarts are sufficient.

@sopel39 (Member), Jul 6, 2022:

I would also add that using a shuffle service for every query is a big paradigm shift in how Trino runs queries and manages resources. A shuffle service is not native to Trino. There are setups where setting up a shuffle service won't be possible (e.g. native deployments). It's also something that Trino users don't have operational experience with.

@arhimondr (Contributor, Author):

DF is not only beneficial for interactive queries; it helps with cutting computational costs for long-running queries as well. While we don't care too much about low-latency queries yet, we do care about overall system efficiency for fault tolerant execution. (Currently there's less than a 15% regression in CPU efficiency with fault tolerant execution enabled; the current regression comes from applying encryption and compression to data exchanges, which in theory could be disabled.)

private boolean enableCoordinatorDynamicFiltersDistribution = true;
private boolean enableLargeDynamicFilters;
private int serviceThreadCount = 2;

private int smallBroadcastMaxDistinctValuesPerDriver = 200;
private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(20, KILOBYTE);
@@ -48,6 +49,7 @@ public class DynamicFilterConfig
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(10, KILOBYTE);
private int smallPartitionedRangeRowLimitPerDriver = 100;
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(100, KILOBYTE);
private DataSize smallMaxSizePerFilter = DataSize.of(1, MEGABYTE);

private int largeBroadcastMaxDistinctValuesPerDriver = 5_000;
private DataSize largeBroadcastMaxSizePerDriver = DataSize.of(500, KILOBYTE);
@@ -57,6 +59,7 @@
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
private int largePartitionedRangeRowLimitPerDriver = 1_000;
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
private DataSize largeMaxSizePerFilter = DataSize.of(5, MEGABYTE);

public boolean isEnableDynamicFiltering()
{
@@ -96,19 +99,6 @@ public DynamicFilterConfig setEnableLargeDynamicFilters(boolean enableLargeDynam
return this;
}

@Min(1)
public int getServiceThreadCount()
{
return serviceThreadCount;
}

@Config("dynamic-filtering.service-thread-count")
(comment thread marked as resolved by arhimondr; outdated)
public DynamicFilterConfig setServiceThreadCount(int serviceThreadCount)
{
this.serviceThreadCount = serviceThreadCount;
return this;
}

@Min(0)
public int getSmallBroadcastMaxDistinctValuesPerDriver()
{
@@ -213,6 +203,20 @@ public DynamicFilterConfig setSmallPartitionedMaxSizePerOperator(DataSize smallP
return this;
}

@NotNull
@MaxDataSize("10MB")
public DataSize getSmallMaxSizePerFilter()
{
return smallMaxSizePerFilter;
}

@Config("dynamic-filtering.small.max-size-per-filter")
public DynamicFilterConfig setSmallMaxSizePerFilter(DataSize smallMaxSizePerFilter)
{
this.smallMaxSizePerFilter = smallMaxSizePerFilter;
return this;
}

@Min(0)
public int getLargeBroadcastMaxDistinctValuesPerDriver()
{
@@ -316,4 +320,18 @@ public DynamicFilterConfig setLargePartitionedMaxSizePerOperator(DataSize largeP
this.largePartitionedMaxSizePerOperator = largePartitionedMaxSizePerOperator;
return this;
}

@NotNull
@MaxDataSize("10MB")
public DataSize getLargeMaxSizePerFilter()
{
return largeMaxSizePerFilter;
}

@Config("dynamic-filtering.large.max-size-per-filter")
public DynamicFilterConfig setLargeMaxSizePerFilter(DataSize largeMaxSizePerFilter)
{
this.largeMaxSizePerFilter = largeMaxSizePerFilter;
return this;
}
}
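For reference, the two configuration properties introduced by this diff would be set in a coordinator properties file. The values below are illustrative overrides, not from the PR; the diff's defaults are 1MB (small) and 5MB (large), and the `@MaxDataSize("10MB")` annotation rejects values above 10MB:

```properties
# Illustrative values; defaults are 1MB (small) and 5MB (large).
# @MaxDataSize caps both properties at 10MB.
dynamic-filtering.small.max-size-per-filter=2MB
dynamic-filtering.large.max-size-per-filter=10MB
```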