
feat(optimizer): Add exchange on table scan when number of files to be scanned is small #26941

Merged
feilong-liu merged 1 commit into prestodb:master from feilong-liu:scan_shuffle on Jan 15, 2026

Conversation

@feilong-liu
Contributor

@feilong-liu feilong-liu commented Jan 10, 2026

Description

The number of tasks of a leaf fragment is proportional to the number of files in the table. When the number of files in a table is small, we want to add an exchange node on top of the table scan to increase parallelism.

Motivation and Context

See the description above.

Impact

We have observed query latency decrease from 2 minutes to 12 seconds after manually adding a shuffle above the table scan.

Test Plan

Unit tests

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with their default values), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add options to force shuffle table scan input if the number of files to be scanned is small

@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Jan 10, 2026
@sourcery-ai
Contributor

sourcery-ai bot commented Jan 10, 2026

Reviewer's Guide

Adds file-count-aware table statistics and new optimizer/session configuration to optionally insert a shuffle (round-robin exchange) above table scans when tables have few files, plus corresponding tests and plumbing across stats, cost, and planning code.

Sequence diagram for planning table scan shuffle based on file count

sequenceDiagram
    actor Planner
    participant Session
    participant SystemSessionProperties
    participant AddExchanges
    participant Metadata
    participant HiveStatsProvider as MetastoreHiveStatisticsProvider
    participant TableStatistics

    Planner->>AddExchanges: planTableScan(tableScanNode, predicate)
    AddExchanges->>Session: getProperty(TABLE_SCAN_SHUFFLE_STRATEGY)
    Session-->>AddExchanges: shuffleStrategy
    AddExchanges->>SystemSessionProperties: getTableScanShuffleStrategy(session)
    SystemSessionProperties-->>AddExchanges: shuffleStrategy

    alt shuffleStrategy == DISABLED
        AddExchanges-->>Planner: plan without shuffle (existing behavior)
    else shuffleStrategy == ALWAYS_ENABLED
        AddExchanges->>AddExchanges: roundRobinExchange(REMOTE_STREAMING, tableScanPlan)
        AddExchanges-->>Planner: plan with shuffle
    else shuffleStrategy == FILE_COUNT_BASED
        AddExchanges->>SystemSessionProperties: getTableScanShuffleFileCountThreshold(session)
        SystemSessionProperties-->>AddExchanges: fileCountThreshold
        AddExchanges->>Metadata: getTableStatistics(session, tableHandle, assignmentsColumns, constraint)
        Metadata->>HiveStatsProvider: computeTableStatistics(partitions, statistics)
        HiveStatsProvider->>HiveStatsProvider: calculateAverageFileCountPerPartition(statistics)
        HiveStatsProvider->>TableStatistics: TableStatistics.Builder.setFileCount(Estimate.of(totalFileCount))
        HiveStatsProvider-->>Metadata: TableStatistics
        Metadata-->>AddExchanges: TableStatistics
        AddExchanges->>TableStatistics: getFileCount()
        TableStatistics-->>AddExchanges: fileCountEstimate
        alt fileCount known and < fileCountThreshold
            AddExchanges->>AddExchanges: roundRobinExchange(REMOTE_STREAMING, tableScanPlan)
        else fileCount unknown or >= threshold
            AddExchanges->>AddExchanges: keep original table scan plan
        end
        AddExchanges-->>Planner: final plan
    end

Updated class diagram for table statistics, features config, and planner shuffle logic

classDiagram
    class TableStatistics {
        - Estimate rowCount
        - Estimate totalSize
        - Estimate fileCount
        - Map~ColumnHandle, ColumnStatistics~ columnStatistics
        - ConfidenceLevel confidenceLevel
        + static TableStatistics empty()
        + Estimate getRowCount()
        + Estimate getTotalSize()
        + Estimate getFileCount()
        + ConfidenceLevel getConfidence()
        + Map~ColumnHandle, ColumnStatistics~ getColumnStatistics()
        + static TableStatistics.Builder buildFrom(TableStatistics tableStatistics)
    }

    class TableStatistics_Builder {
        - Estimate rowCount
        - Estimate totalSize
        - Estimate fileCount
        - Map~ColumnHandle, ColumnStatistics~ columnStatisticsMap
        - ConfidenceLevel confidenceLevel
        + TableStatistics_Builder setRowCount(Estimate rowCount)
        + TableStatistics_Builder setTotalSize(Estimate totalSize)
        + TableStatistics_Builder setFileCount(Estimate fileCount)
        + TableStatistics_Builder setColumnStatistics(ColumnHandle columnHandle, ColumnStatistics columnStatistics)
        + TableStatistics_Builder setColumnStatistics(Map~ColumnHandle, ColumnStatistics~ columnStatistics)
        + TableStatistics_Builder setConfidenceLevel(ConfidenceLevel confidenceLevel)
        + Map~ColumnHandle, ColumnStatistics~ getColumnStatistics()
        + TableStatistics build()
    }

    TableStatistics *-- TableStatistics_Builder : built_via

    class FeaturesConfig {
        - boolean pushdownSubfieldForMapFunctions
        - long maxSerializableObjectSize
        - boolean utilizeUniquePropertyInQueryPlanning
        - int tableScanShuffleFileCountThreshold
        - ShuffleForTableScanStrategy tableScanShuffleStrategy
        + long getMaxSerializableObjectSize()
        + int getTableScanShuffleFileCountThreshold()
        + FeaturesConfig setTableScanShuffleFileCountThreshold(int tableScanShuffleFileCountThreshold)
        + ShuffleForTableScanStrategy getTableScanShuffleStrategy()
        + FeaturesConfig setTableScanShuffleStrategy(ShuffleForTableScanStrategy tableScanShuffleStrategy)
    }

    class ShuffleForTableScanStrategy {
        <<enumeration>>
        DISABLED
        ALWAYS_ENABLED
        FILE_COUNT_BASED
    }

    FeaturesConfig --> ShuffleForTableScanStrategy : uses

    class SystemSessionProperties {
        <<utility>>
        + static String TABLE_SCAN_SHUFFLE_FILE_COUNT_THRESHOLD
        + static String TABLE_SCAN_SHUFFLE_STRATEGY
        + static int getTableScanShuffleFileCountThreshold(Session session)
        + static ShuffleForTableScanStrategy getTableScanShuffleStrategy(Session session)
    }

    SystemSessionProperties --> ShuffleForTableScanStrategy : returns

    class AddExchanges {
        - PlanNodeIdAllocator idAllocator
        + PlanWithProperties planTableScan(TableScanNode node, RowExpression predicate, boolean nativeExecution)
    }

    AddExchanges --> SystemSessionProperties : reads_session_properties
    AddExchanges --> TableStatistics : reads_fileCount

    class MetastoreHiveStatisticsProvider {
        + static TableStatistics getTableStatistics(Table table, List~Partition~ partitions, Map~Partition, PartitionStatistics~ statistics)
        + static OptionalDouble calculateAverageSizePerPartition(Collection~PartitionStatistics~ statistics)
        + static OptionalDouble calculateAverageFileCountPerPartition(Collection~PartitionStatistics~ statistics)
    }

    MetastoreHiveStatisticsProvider --> TableStatistics_Builder : builds

    class ConnectorFilterStatsCalculatorService {
        + TableStatistics filterTableStatistics(TableStatistics tableStatistics, RowExpression filter)
    }

    ConnectorFilterStatsCalculatorService --> TableStatistics_Builder : setFileCount
    ConnectorFilterStatsCalculatorService --> TableStatistics : consumes

    class Estimate {
        + static Estimate unknown()
        + static Estimate of(double value)
        + boolean isUnknown()
        + double getValue()
    }

    TableStatistics --> Estimate : uses
    TableStatistics_Builder --> Estimate : uses
    MetastoreHiveStatisticsProvider --> Estimate : uses
    ConnectorFilterStatsCalculatorService --> Estimate : uses

Flow diagram for shuffle insertion above table scan

flowchart TD
    A[Start planning table scan] --> B{nativeExecution and containsSystemTableScan?}
    B -->|Yes| C[Add gatheringExchange REMOTE_STREAMING]
    C --> Z[Return plan]
    B -->|No| D{Shuffle strategy from session}
    D -->|DISABLED| Z
    D -->|ALWAYS_ENABLED| E[Add roundRobinExchange REMOTE_STREAMING]
    E --> Z
    D -->|FILE_COUNT_BASED| F[Read tableScanShuffleFileCountThreshold]
    F --> G[Fetch TableStatistics for table scan]
    G --> H{tableStatistics.fileCount is known?}
    H -->|No| Z
    H -->|Yes| I{fileCount < threshold?}
    I -->|No| Z
    I -->|Yes| J[Add roundRobinExchange REMOTE_STREAMING]
    J --> Z[Return plan]
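The shuffle decision in the flow above can be sketched as a small standalone predicate. This is an illustrative sketch, not the actual `AddExchanges` code: the method name `shouldAddShuffle` and the `OptionalDouble` stand-in for Presto's `Estimate` are assumptions for the example.

```java
import java.util.OptionalDouble;

public class TableScanShuffleDecision
{
    public enum ShuffleForTableScanStrategy { DISABLED, ALWAYS_ENABLED, FILE_COUNT_BASED }

    // Returns true when a round-robin exchange should be added above the scan,
    // mirroring the branches in the flowchart: DISABLED never shuffles,
    // ALWAYS_ENABLED always shuffles, FILE_COUNT_BASED shuffles only when the
    // file count is known and below the threshold.
    public static boolean shouldAddShuffle(ShuffleForTableScanStrategy strategy, OptionalDouble fileCount, int threshold)
    {
        switch (strategy) {
            case ALWAYS_ENABLED:
                return true;
            case FILE_COUNT_BASED:
                // Unknown file count falls through to existing behavior (no shuffle)
                return fileCount.isPresent() && fileCount.getAsDouble() < threshold;
            case DISABLED:
            default:
                return false;
        }
    }
}
```

Note that an unknown estimate is treated conservatively: the plan is left unchanged rather than shuffled on a guess.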

File-Level Changes

Change Details Files
Introduce table-level file count statistics and propagate them through Hive statistics computation and SPI TableStatistics API.
  • Extend TableStatistics to carry a fileCount Estimate, including constructor, accessors, builder, equals/hashCode, and toString updates.
  • In MetastoreHiveStatisticsProvider, compute average file count per partition, set total file count in TableStatistics when available, and allow rowCount/size to stay empty if only file count is known.
  • Add helper calculateAverageFileCountPerPartition and tests covering average computation and table statistics behavior with full, partial, and missing file count metadata.
  • Add a fileCount-based PartitionStatistics factory helper in tests.
presto-spi/src/main/java/com/facebook/presto/spi/statistics/TableStatistics.java
presto-hive/src/main/java/com/facebook/presto/hive/statistics/MetastoreHiveStatisticsProvider.java
presto-hive/src/test/java/com/facebook/presto/hive/statistics/TestMetastoreHiveStatisticsProvider.java
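The averaging-and-extrapolation step described above can be sketched in simplified form. This is a hypothetical sketch, not the actual `MetastoreHiveStatisticsProvider` code: the real `calculateAverageFileCountPerPartition` takes partition statistics objects, whereas here per-partition file counts are modeled directly as `OptionalLong` values.

```java
import java.util.List;
import java.util.OptionalDouble;
import java.util.OptionalLong;

public class FileCountStats
{
    // Average over the partitions whose file count is known; empty if none are known.
    public static OptionalDouble calculateAverageFileCountPerPartition(List<OptionalLong> fileCounts)
    {
        long sum = 0;
        int known = 0;
        for (OptionalLong count : fileCounts) {
            if (count.isPresent()) {
                sum += count.getAsLong();
                known++;
            }
        }
        return known == 0 ? OptionalDouble.empty() : OptionalDouble.of((double) sum / known);
    }

    // Total estimate = average per partition * number of partitions,
    // so partitions with missing metadata are extrapolated from the known ones.
    public static OptionalDouble estimateTotalFileCount(List<OptionalLong> fileCounts, int partitionCount)
    {
        OptionalDouble average = calculateAverageFileCountPerPartition(fileCounts);
        return average.isPresent()
                ? OptionalDouble.of(average.getAsDouble() * partitionCount)
                : OptionalDouble.empty();
    }
}
```

For example, partitions with 10 and 20 files average to 15, giving a total estimate of 30 across 2 partitions, matching the arithmetic in the review's suggested test below.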
Add configurable optimizer and session properties to control inserting a shuffle above table scans based on strategy and file-count threshold.
  • Add FeaturesConfig fields and config properties optimizer.table-scan-shuffle-file-count-threshold and optimizer.table-scan-shuffle-strategy with enum ShuffleForTableScanStrategy (DISABLED, ALWAYS_ENABLED, FILE_COUNT_BASED) and defaults.
  • Expose matching SystemSessionProperties (TABLE_SCAN_SHUFFLE_FILE_COUNT_THRESHOLD, TABLE_SCAN_SHUFFLE_STRATEGY) with session getters.
  • Update TestFeaturesConfig defaults and explicit property mapping expectations for the new settings.
presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java
presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java
presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java
Update AddExchanges optimizer to optionally insert a round-robin exchange above table scans based on the configured shuffle strategy and file-count statistics.
  • In planTableScan, when not in native system-table mode, branch on TABLE_SCAN_SHUFFLE_STRATEGY to either always add roundRobinExchange or, for FILE_COUNT_BASED, query table statistics and add it only if fileCount is known and below TABLE_SCAN_SHUFFLE_FILE_COUNT_THRESHOLD.
  • Leave behavior unchanged when strategy is DISABLED or stats/fileCount are unknown.
presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
Ensure filter statistics calculations preserve file-count statistics and add planner tests for the new shuffle behavior.
  • In ConnectorFilterStatsCalculatorService, propagate fileCount from input TableStatistics to filtered statistics when fileCount is known.
  • Add AddExchanges plan tests asserting presence/absence of an extra round-robin exchange above table scans when TABLE_SCAN_SHUFFLE_STRATEGY is ALWAYS_ENABLED vs DISABLED.
  • Minor formatting/indentation adjustments in existing tests.
presto-main-base/src/main/java/com/facebook/presto/cost/ConnectorFilterStatsCalculatorService.java
presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java


@feilong-liu feilong-liu requested a review from kaikalur January 10, 2026 02:39
@feilong-liu feilong-liu changed the title from "feat(Optimizer): Add exchange if table has few files" to "feat(Optimizer): Add exchange on table scan when number of files to be scanned is small" on Jan 10, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues and left some high-level feedback:

  • In ConnectorFilterStatsCalculatorService, fileCount is currently propagated unchanged after applying a filter, which may be misleading when filters drastically reduce row counts; consider scaling fileCount by the same selectivity factor as totalSize or explicitly documenting the intended semantics.
  • The FILE_COUNT_BASED branch in AddExchanges unconditionally calls metadata.getTableStatistics for each table scan when the strategy is enabled; if this proves expensive, consider short‑circuiting when the threshold is non‑positive or when existing statistics are already available higher up in planning rather than always re-fetching them.
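One possible semantics for the first point is sketched below. This is hypothetical, not what the PR implements: it scales the filtered file count by the same selectivity factor a filter applies to rows, with a floor of one file when anything survives.

```java
import java.util.OptionalDouble;

public class FilteredFileCount
{
    // selectivity = filteredRowCount / inputRowCount, expected in [0, 1].
    // Unknown input file count stays unknown after filtering.
    public static OptionalDouble scaleFileCount(OptionalDouble fileCount, double selectivity)
    {
        if (!fileCount.isPresent()) {
            return OptionalDouble.empty();
        }
        // A filter cannot reduce the scan below one file if any rows survive
        return OptionalDouble.of(Math.max(1.0, fileCount.getAsDouble() * selectivity));
    }
}
```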
## Individual Comments

### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java:3303-3306` </location>
<code_context>
+
+    @Config("optimizer.table-scan-shuffle-file-count-threshold")
+    @ConfigDescription("File count threshold for adding a shuffle above table scan. When the table has fewer files than this threshold and TABLE_SCAN_SHUFFLE_STRATEGY is FILE_COUNT_BASED, a round-robin shuffle exchange is added above the table scan to redistribute data.")
+    public FeaturesConfig setTableScanShuffleFileCountThreshold(int tableScanShuffleFileCountThreshold)
+    {
+        this.tableScanShuffleFileCountThreshold = tableScanShuffleFileCountThreshold;
+        return this;
+    }
+
</code_context>

<issue_to_address>
**issue (bug_risk):** No validation on table-scan shuffle file-count threshold allows negative values

The setter for `optimizer.table-scan-shuffle-file-count-threshold` accepts a raw `int`, so negative values are allowed. Since a negative threshold will effectively disable the shuffle (almost all file counts will be `>=` a negative number), this is likely unintended. Please add validation (e.g., `@Min(0)`/`@Min(1)` or an explicit check) to reject negative values.
</issue_to_address>
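One possible shape of the fix for this comment, sketched as a standalone class (Presto config classes typically use a bean-validation annotation such as `@Min` instead of an explicit check; the class name here is hypothetical):

```java
public class ThresholdConfig
{
    private int tableScanShuffleFileCountThreshold;

    // Explicit-check variant of the validation the review asks for:
    // reject negative thresholds at configuration time instead of silently
    // disabling the shuffle.
    public ThresholdConfig setTableScanShuffleFileCountThreshold(int threshold)
    {
        if (threshold < 0) {
            throw new IllegalArgumentException(
                    "optimizer.table-scan-shuffle-file-count-threshold must be non-negative: " + threshold);
        }
        this.tableScanShuffleFileCountThreshold = threshold;
        return this;
    }

    public int getTableScanShuffleFileCountThreshold()
    {
        return tableScanShuffleFileCountThreshold;
    }
}
```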

### Comment 2
<location> `presto-main-base/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java:527-525` </location>
<code_context>
+        assertEquals(calculateAverageFileCountPerPartition(ImmutableList.of(fileCount(0), fileCount(10))), OptionalDouble.of(5));
+    }
+
     @Test
     public void testCalculateDistinctPartitionKeys()
     {
@@ -692,6 +707,102 @@ public void testGetTableStatistics()
                 expected);
     }

+    @Test
+    public void testGetTableStatisticsWithFileCount()
+    {
+        String partitionName1 = "p1=string1/p2=1234";
+        String partitionName2 = "p1=string2/p2=5678";
+        PartitionStatistics statistics1 = PartitionStatistics.builder()
+                .setBasicStatistics(new HiveBasicStatistics(OptionalLong.of(10), OptionalLong.of(1000), OptionalLong.of(5000), OptionalLong.empty()))
+                .build();
</code_context>

<issue_to_address>
**suggestion (testing):** Add planner tests for FILE_COUNT_BASED shuffle strategy to verify threshold and unknown-file-count behavior

The current tests only exercise the DISABLED and ALWAYS_ENABLED strategies; the FILE_COUNT_BASED branch in `AddExchanges.planTableScan` is still untested despite being core to this feature and depending on `TableStatistics.getFileCount()` and `getTableScanShuffleFileCountThreshold(session)`. Please add tests that cover:

- FILE_COUNT_BASED with file count below the threshold → round-robin exchange added above the table scan
- FILE_COUNT_BASED with file count at/above the threshold → no additional round-robin exchange
- FILE_COUNT_BASED with unknown file count (no fileCount in stats) → no additional round-robin exchange

These should assert on the distributed plan shape, similar to the existing planner tests, using controllable table statistics or a mocked metadata/stats provider to set specific fileCount values.
</issue_to_address>

### Comment 3
<location> `presto-hive/src/test/java/com/facebook/presto/hive/statistics/TestMetastoreHiveStatisticsProvider.java:289-287` </location>
<code_context>
+        assertEquals(calculateAverageFileCountPerPartition(ImmutableList.of(fileCount(0), fileCount(10))), OptionalDouble.of(5));
+    }
+
     @Test
     public void testCalculateDistinctPartitionKeys()
     {
@@ -692,6 +707,102 @@ public void testGetTableStatistics()
                 expected);
     }

+    @Test
+    public void testGetTableStatisticsWithFileCount()
+    {
+        String partitionName1 = "p1=string1/p2=1234";
+        String partitionName2 = "p1=string2/p2=5678";
+        PartitionStatistics statistics1 = PartitionStatistics.builder()
+                .setBasicStatistics(new HiveBasicStatistics(OptionalLong.of(10), OptionalLong.of(1000), OptionalLong.of(5000), OptionalLong.empty()))
+                .build();
</code_context>

<issue_to_address>
**suggestion (testing):** Add a test for getTableStatistics when only fileCount is present but row/size stats are missing

The implementation now returns a `TableStatistics` with only `fileCount` set when `calculateAverageRowsPerPartition` is empty (early return with the builder initialized from fileCount). To cover this path, please add a test where:
- Only fileCount is present in `PartitionStatistics` (row count and size unknown for all partitions), and
- `getTableStatistics` is invoked on those partitions.

The test should assert that:
- `tableStatistics.getFileCount()` is a concrete `Estimate` with the expected total (average * partition count), and
- `tableStatistics.getRowCount()` and `tableStatistics.getTotalSize()` are `Estimate.unknown()`.

This ensures fileCount-only statistics are correctly propagated for optimizer use.

Suggested implementation:

```java
        // fileCount-only statistics: row count and size unknown
        PartitionStatistics statistics1 = PartitionStatistics.builder()
                .setBasicStatistics(new HiveBasicStatistics(
                        OptionalLong.of(10),     // fileCount
                        OptionalLong.empty(),    // rowCount unknown
                        OptionalLong.empty(),    // inMemoryDataSize unknown
                        OptionalLong.empty()))   // onDiskDataSize unknown
                .build();
        PartitionStatistics statistics2 = PartitionStatistics.builder()
                .setBasicStatistics(new HiveBasicStatistics(
                        OptionalLong.of(20),     // fileCount
                        OptionalLong.empty(),    // rowCount unknown
                        OptionalLong.empty(),    // inMemoryDataSize unknown
                        OptionalLong.empty()))   // onDiskDataSize unknown
                .build();

```

After constructing `statisticsProvider` and `session` in `testGetTableStatisticsWithFileCount()`, add code to:

1. Invoke `getTableStatistics` on the table/partitions under test to get `TableStatistics tableStatistics`.
2. Assert:
   - `assertEquals(tableStatistics.getFileCount(), Estimate.of(30.0));`  
     (since the average fileCount per partition is (10 + 20) / 2 = 15 and there are 2 partitions, total = 30)
   - `assertEquals(tableStatistics.getRowCount(), Estimate.unknown());`
   - `assertEquals(tableStatistics.getTotalSize(), Estimate.unknown());`

Use the same table handle / partitioning setup and assertion style as the other `testGetTableStatistics*` methods in this file to keep consistency with the existing tests.
</issue_to_address>


@jaystarshot
Member

We can just decrease the hive split size which will force the tablescan task to be run on more nodes?

@kaikalur
Contributor

We can just decrease the hive split size which will force the tablescan task to be run on more nodes?

I think there are some limits on it but also the idea is to not do anything but shuffle right after scan

kaikalur previously approved these changes Jan 10, 2026
Contributor

@kaikalur kaikalur left a comment


Can we trust the stats?

@hantangwangd
Member

hantangwangd commented Jan 10, 2026

@feilong-liu thanks for this change.

There might be an issue with this approach when it comes to UPDATE/DELETE. In the current UPDATE/DELETE implementation, an exchangeOperator shouldn't be inserted directly between an updateOperator/deleteOperator and a tableScanOperator, as this would break their required correspondence. Consider the following example:

use iceberg.default;

set session table_scan_shuffle_strategy = 'ALWAYS_ENABLED';
create table test_delete_update(a int, b varchar);
insert into test_delete_update values(1, '1001');
insert into test_delete_update values(2, '1002');
insert into test_delete_update values(3, '1003');
insert into test_delete_update values(4, '1004');
insert into test_delete_update values(5, '1005');

// would fail
update test_delete_update set a = a + 100 where a > 1;

// would fail
delete from test_delete_update where a > 1;

set session table_scan_shuffle_strategy = 'DISABLED';

// would succeed
update test_delete_update set a = a + 100 where a > 1;
delete from test_delete_update where a > 1;

So, it seems that we need to ensure that an ExchangeNode is not inserted while processing an UPDATE/DELETE statement. What's your thoughts about this?

@feilong-liu
Contributor Author

feilong-liu commented Jan 10, 2026

We can just decrease the hive split size which will force the tablescan task to be run on more nodes?

I've tried this approach; even when I tune the split size to produce far more splits than the number of workers, it only slightly increases the number of tasks and still does not fully utilize all available workers.
Currently we do some hacking at the SQL level to force a random shuffle after the table scan, which is not straightforward for users, so this PR adds an easier way to apply the fix.

And to give some context, one of the most recent queries we fixed had its latency decrease from 2 minutes to 12 seconds after manually adding a shuffle above the table scan (and we have seen more cases before).

@feilong-liu
Contributor Author

@feilong-liu thanks for this change.

There might be an issue with this approach when it comes to UPDATE/DELETE. In the current UPDATE/DELETE implementation, an exchangeOperator shouldn't be inserted directly between an updateOperator/deleteOperator and a tableScanOperator, as this would break their required correspondence. Consider the following example:

use iceberg.default;

set session table_scan_shuffle_strategy = 'ALWAYS_ENABLED';
create table test_delete_update(a int, b varchar);
insert into test_delete_update values(1, '1001');
insert into test_delete_update values(2, '1002');
insert into test_delete_update values(3, '1003');
insert into test_delete_update values(4, '1004');
insert into test_delete_update values(5, '1005');

// would fail
update test_delete_update set a = a + 100 where a > 1;

// would fail
delete from test_delete_update where a > 1;

set session table_scan_shuffle_strategy = 'DISABLED';

// would succeed
update test_delete_update set a = a + 100 where a > 1;
delete from test_delete_update where a > 1;

So, it seems that we need to ensure that an ExchangeNode is not inserted while processing an UPDATE/DELETE statement. What's your thoughts about this?

Thanks for checking; I just updated the code to exclude update and delete queries from this change.

@feilong-liu
Contributor Author

Can we trust the stats?

For the queries you shared yesterday, it gives the right file count. We can revisit if we later find gaps in the stats.

@feilong-liu feilong-liu changed the title from "feat(Optimizer): Add exchange on table scan when number of files to be scanned is small" to "feat(optimizer): Add exchange on table scan when number of files to be scanned is small" on Jan 10, 2026
@feilong-liu feilong-liu requested a review from kaikalur January 10, 2026 05:56
kaikalur previously approved these changes Jan 10, 2026
@tdcmeehan
Contributor

A file is a concept in storage. The Presto runtime doesn't understand files, it understands and schedules splits--under the hood, connectors reason about files and translate them into splits. Instead of introducing the new concept of files to Presto, I'm wondering if the connector instead could internally reason about how those files may translate into splits (since it also owns the code to do that translation), and instead we pass in estimatedSplits to TableStatistics? I think it would accomplish the same goal in this PR without introducing a brand new concept to Presto's optimizer.

I've tried this approach, even when I tune the split size to have way more splits than the number of workers, it just slightly increase the number of tasks, and still not fully utilizing all available workers.

@feilong-liu can you help to explain why this is? I'm just wondering if the proper fix is to investigate and fix that.

@kaikalur
Contributor

kaikalur commented Jan 10, 2026

A file is a concept in storage. The Presto runtime doesn't understand files, it understands and schedules splits--under the hood, connectors reason about files and translate them into splits. Instead of introducing the new concept of files to Presto, I'm wondering if the connector instead could internally reason about how those files may translate into splits (since it also owns the code to do that translation), and instead we pass in estimatedSplits to TableStatistics? I think it would accomplish the same goal in this PR without introducing a brand new concept to Presto's optimizer.

I've tried this approach, even when I tune the split size to have way more splits than the number of workers, it just slightly increase the number of tasks, and still not fully utilizing all available workers.

@feilong-liu can you help to explain why this is? I'm just wondering if the proper fix is to investigate and fix that.

We have used this approach very effectively at Meta for over 3 years; it's time to make it a session property and expose it to users. Also, most of these are opportunistic optimizations, so it's fine to ship them off by default so support teams can enable them when needed. At some point we can make them cost-based.

Splits is not the issue. If scan/filter/project do a ton of other things, it will still be slow as the parallelism is independent of the number of splits. We should definitely look at Split scheduling separately.

@tdcmeehan
Contributor

@kaikalur I agree that this is likely an effective optimization, my primary feedback is using a different metric for reporting from the connector that doesn't rely on introducing a storage-related concept of files (I believe we could rely on estimatedSplits for this, but open to other ideas).

If scan/filter/project do a ton of other things, it will still be slow as the parallelism is independent of the number of splits. We should definitely look at Split scheduling separately.

For my understanding, what could be an example of this where this issue would surface in a manner independent of the parallelism of the split--for example, a very wide table with a lot of predicates? Basically, because SOURCE scheduled stages can have a max parallelism of the entire cluster, vs. the hash partition count which is typically smaller than the cluster, I'm just confused how the exchange improves things independent of the scheduling fix proposed above. I'm just asking for better understanding.

@kaikalur
Contributor

kaikalur commented Jan 10, 2026

> @kaikalur I agree that this is likely an effective optimization, my primary feedback is using a different metric for reporting from the connector that doesn't rely on introducing a storage-related concept of files (I believe we could rely on estimatedSplits for this, but open to other ideas).
>
> > If scan/filter/project do a ton of other things, it will still be slow as the parallelism is independent of the number of splits. We should definitely look at Split scheduling separately.
>
> For my understanding, what could be an example of this where this issue would surface in a manner independent of the parallelism of the split--for example, a very wide table with a lot of predicates? Basically, because SOURCE scheduled stages can have a max parallelism of the entire cluster, vs. the hash partition count which is typically smaller than the cluster, I'm just confused how the exchange improves things independent of the scheduling fix proposed above. I'm just asking for better understanding.

That has been a mystery to us as well :( Maybe @arhimondr knows better. The symptom we see is too few files -> too few tasks. Also, there are situations where the number of rows is small, like LLM queries over several thousand rows, so even if you set the split size to be tiny, it will produce too few splits and too few tasks.

One more situation we see is that heavily compressed tables (like 50-200x) with a ton of string fields produce too few splits, because the table appears deceptively small to Hive.

@kaikalur
Contributor

kaikalur commented Jan 10, 2026

@tdcmeehan Also want to clarify that this is just a latency issue, not a compute efficiency one.

@feilong-liu
Contributor Author

feilong-liu commented Jan 12, 2026

> A file is a concept in storage. The Presto runtime doesn't understand files, it understands and schedules splits--under the hood, connectors reason about files and translate them into splits. Instead of introducing the new concept of files to Presto, I'm wondering if the connector instead could internally reason about how those files may translate into splits (since it also owns the code to do that translation), and instead we pass in estimatedSplits to TableStatistics? I think it would accomplish the same goal in this PR without introducing a brand new concept to Presto's optimizer.

I've tried this approach: even when I tune the split size to produce way more splits than the number of workers, it only slightly increases the number of tasks and still does not fully utilize all available workers.

> @feilong-liu can you help to explain why this is? I'm just wondering if the proper fix is to investigate and fix that.

@tdcmeehan I tried reducing the max_split_size property and setting node_selection_strategy to NO_PREFERENCE; however, what I observed is that the number of tasks increases slightly but still does not fully utilize all workers in the cluster. I didn't dig deeper into this issue because it is more a matter of my own curiosity than a proper way to fix the issue here:

  • Even if we somehow find the reason and successfully increase the number of splits, it will not help, because the other table scans in the same query will be affected too, which is likely to degrade overall performance, as this tuning is highly customized for the specific problematic table scan node with few files.
  • It's also not user friendly. Adding an exchange node (which only affects the specific scan node) is a single session property, and we can also add strategies like file_count_based which are safe to turn on without user intervention. Tuning split-related session properties, on the other hand, involves multiple session properties (not to mention that they affect the whole query), which is hard to use.

> instead we pass in estimatedSplits to TableStatistics?

This may not be trivial. The split generation logic is complex: just checking SystemSessionProperties, there are many session properties related to split generation, e.g. initial_splits_per_node, max_unacknowledged_splits_per_task, schedule_splits_based_on_task_load, native_max_split_preload_per_driver, etc., not to mention that we have more in HiveSessionProperties. Also, the number of splits depends on the total size, which unfortunately is not always available. I just checked the query that saw the latency improvement; its input table does not have totalSize in TableStatistics.

I also want to add that this optimization of adding an exchange does not only help the too-few-tasks-due-to-few-files problem, but also helps with skew. What we observed in some queries is that even when all workers are used, a few scan workers sometimes get much more work than others; a random shuffle immediately after the scan accelerates query execution a lot.

> Instead of introducing the new concept of files to Presto

For this part, it's more about what statistics to use to identify problematic table scans. The metric should be 1) accurate enough, 2) easy to calculate, and 3) always available.
For the production case I am studying, only the file count and on-disk size are available for the input table. For now I can try to add a new API to use these stats, or add fields like compressedDataSize to TableStatistics, so they can be used in the optimization here. But we may need to add more stats like file count if that doesn't cover all the cases we observe (this can be a follow-up PR).

@tdcmeehan
Contributor

> This may not be trivial. The split generation logic is so complex, just to check the SystemSessionProperties, there are so many session properties related to split generations, e.g. initial_splits_per_node, max_unacknowledged_splits_per_task, schedule_splits_based_on_task_load, native_max_split_preload_per_driver etc. not to mention that we have more in the HiveSessionProperties.

Please note that the system session properties you mention here all govern split scheduling, not split creation. Split creation is handled completely internally by the connector, which is why I made the earlier point about files: we don't want to begin leaking that concept into core Presto.

One way to avoid leaking the concept of files is to abstract this into something more general: basically, how much parallelism do we expect from this table? I chose splits because splits are the abstraction that represents parallelism. Note that we don't need the estimate to be perfect or even close, it just needs to be high signal. Perhaps for your internal connector, it's enough to simply use a very basic heuristic where estimated splits is just the number of files. Other connectors that have more accurate statistics could improve on this. And to give your users a way to just force it, you could provide a connector session property which forces this metric to indicate low parallelism, regardless of what stats you have available.
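A minimal sketch of the heuristic suggested above; every class, method, and parameter name here is a hypothetical illustration, not actual Presto SPI:

```java
// Sketch only: a file-based connector translating its file count into a
// generic "estimated splits" signal, so core Presto never sees "files".
public class EstimatedSplitsSketch {
    // Very basic heuristic: one split per file. A (hypothetical) connector
    // session property can force a low-parallelism signal regardless of stats.
    static long estimateSplits(long fileCount, boolean forceLowParallelism) {
        return forceLowParallelism ? 1L : Math.max(fileCount, 1L);
    }

    public static void main(String[] args) {
        System.out.println(estimateSplits(5000, false)); // many files -> high-parallelism signal
        System.out.println(estimateSplits(5000, true));  // forced low-parallelism signal
    }
}
```

The point is only that the signal needs to be high-level and high-signal; each connector can compute it however fits its storage layout.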

@kaikalur
Contributor

@feilong-liu maybe we simply make it cost-based just like the other optimizations, and cost is something we can provide using the number of files in our connectors; others can do whatever is appropriate for their connectors

@kaikalur
Contributor

@feilong-liu maybe we simply make it cost-based just like the other optimizations, and cost is something we can provide using the number of files in our connectors; others can do whatever is appropriate for their connectors. Or making it a connector config is another option.

kaikalur
kaikalur previously approved these changes Jan 13, 2026
@feilong-liu
Contributor Author

> One way to avoid leaking the concept of files is to just abstract this into something more general, basically how much parallelism do we expect from this table? I chose splits because splits are the abstraction to represent the parallelism. Note that we don't need to get the estimate perfect or even close, it just needs to be high signal. Perhaps for your internal connector, it's enough to simply use a very basic heuristic where estimated splits is just # of files. Other connectors that have more accurate statistics could improve this. And to give your users a way to just force it, you could provide a connector session property which forces this metric to indicate low parallelism, regardless of what stats you have available.

@tdcmeehan Thanks for the suggestions.
Since this optimization is mainly for low parallelism in table scans, instead of using file count or split count, which are just approximations of scan parallelism, I just updated the code to introduce parallelismFactor, which directly represents the estimated parallelism; estimation of this metric will be connector-specific.
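A rough sketch of how such a parallelismFactor could drive the rewrite. Only the `ShuffleForTableScanStrategy` name and the 0.1 default threshold appear in the PR; the `COST_BASED` constant, the helper, and the files-per-worker computation are assumptions for illustration:

```java
// Sketch only, not the actual PR code: add an exchange above a table scan
// when the connector-estimated parallelism factor falls below a threshold.
public class ParallelismFactorSketch {
    // DISABLED appears in the reviewed config snippet; COST_BASED is assumed.
    enum ShuffleForTableScanStrategy { DISABLED, COST_BASED }

    static boolean shouldAddExchangeAboveScan(
            ShuffleForTableScanStrategy strategy, double parallelismFactor, double threshold) {
        return strategy != ShuffleForTableScanStrategy.DISABLED && parallelismFactor < threshold;
    }

    public static void main(String[] args) {
        // Assumed heuristic: 3 files on a 100-worker cluster -> factor 0.03 < 0.1
        System.out.println(shouldAddExchangeAboveScan(
                ShuffleForTableScanStrategy.COST_BASED, 3.0 / 100, 0.1)); // true
        // Disabled strategy never rewrites the plan
        System.out.println(shouldAddExchangeAboveScan(
                ShuffleForTableScanStrategy.DISABLED, 0.03, 0.1));        // false
    }
}
```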

@aditi-pandit
Contributor

aditi-pandit commented Jan 14, 2026

@kaikalur @feilong-liu: Thanks for this optimization. I am curious what "table-scan-shuffle-parallelism-threshold" value you are using and how you calibrated it?

Contributor

@tdcmeehan tdcmeehan left a comment


Thanks for the updates @feilong-liu. The parallelism factor abstraction makes a lot of sense. I just have some nits and a question.

It also seems like this will be useful for single-connection connectors like JDBC.

private boolean utilizeUniquePropertyInQueryPlanning = true;
private String expressionOptimizerUsedInRowExpressionRewrite = "";
private double tableScanShuffleParallelismThreshold = 0.1;
private ShuffleForTableScanStrategy tableScanShuffleStrategy = ShuffleForTableScanStrategy.DISABLED;
Contributor


To avoid leaving this disabled by default, I'm wondering if you considered always making it cost-based, with a default estimate of 1 when none is provided, which would effectively leave the optimization disabled until connectors modify their code to provide a better estimate?

Please also add documentation for these session properties and config properties.

Contributor Author


I prefer to keep it more explicit, and it's also consistent with other existing optimization configs.

Added documentation in the corresponding rst files.

tdcmeehan
tdcmeehan previously approved these changes Jan 14, 2026
@feilong-liu
Contributor Author

> @kaikalur @feilong-liu: Thanks for this optimization. I am curious what "table-scan-shuffle-parallelism-threshold" value you are using and how you calibrated it?

This scan parallelism is hard to estimate and is also connector-specific; for our internal connector, our plan is to use file count as an approximation.
Regarding the threshold, it will depend on the heuristic used and how the connector translates the heuristic into an output value.
For our use case, we plan to start with conservative thresholds, meaning we will choose settings so that the optimization kicks in only when there are just a few files.

Contributor

@steveburnett steveburnett left a comment


Thanks for the doc! It looks great. Just two requests.

Contributor

@steveburnett steveburnett left a comment


LGTM! (docs)

Pull updated branch, new local doc build. Looks good, thanks!

@feilong-liu feilong-liu merged commit 5130924 into prestodb:master Jan 15, 2026
111 of 112 checks passed
@feilong-liu feilong-liu deleted the scan_shuffle branch January 15, 2026 19:18
feilong-liu added a commit that referenced this pull request Jan 16, 2026
…hange (#26943)

## Description
This is targeting queries like the following:

Scan -> Remote Project -> exchange -> ... -> output

So a problem we see is that when the number of files is small for the scan,
the number of tasks for the leaf fragment is small (even after tuning the
split size, I still see this problem). What we observe is that a remote
project, which calls a remote service, is affected more by the
too-few-tasks problem. We have observed queries whose latency decreased
from 2 minutes to 12 seconds after we manually added a shuffle above the
table scan.

In order to solve the problem, I first added
#26941 which adds a shuffle above
the table scan. However, this is not enough, as the remote project will
still be pushed below the exchange node by the
`PushProjectionThroughExchange` rule. So to resolve this, I added
options to not push down projections below the exchange in certain
cases.

There are four options:
* ALWAYS_PUSHDOWN: current behavior
* SKIP_IF_TABLESCAN: if the plan fragment below the exchange is a simple
scan fragment, i.e. it has only project and filter nodes in addition to the
scan node, we may not see much benefit from projection pushdown, hence skip
* SKIP_IF_REMOTE_PROJECTION: do not push down a remote project
* SKIP_IF_REMOTE_PROJECTION_ON_TABLESCAN: do not push down only when
both of the above conditions are satisfied

We may only need `SKIP_IF_REMOTE_PROJECTION_ON_TABLESCAN` for
#26941, but I want more
flexibility here, hence the other two options.
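The four options above can be sketched as a small decision helper; the enum constants match the commit text, but the helper and its boolean inputs are assumptions for illustration, not the actual rule implementation:

```java
// Sketch only: when does PushProjectionThroughExchange still push down?
public class PushdownStrategySketch {
    enum PushdownThroughExchangeStrategy {
        ALWAYS_PUSHDOWN,
        SKIP_IF_TABLESCAN,
        SKIP_IF_REMOTE_PROJECTION,
        SKIP_IF_REMOTE_PROJECTION_ON_TABLESCAN
    }

    static boolean shouldPushdown(PushdownThroughExchangeStrategy strategy,
                                  boolean fragmentIsSimpleScan,          // only scan/filter/project below
                                  boolean projectionCallsRemoteFunction) {
        switch (strategy) {
            case SKIP_IF_TABLESCAN:
                return !fragmentIsSimpleScan;
            case SKIP_IF_REMOTE_PROJECTION:
                return !projectionCallsRemoteFunction;
            case SKIP_IF_REMOTE_PROJECTION_ON_TABLESCAN:
                return !(fragmentIsSimpleScan && projectionCallsRemoteFunction);
            default:
                return true; // ALWAYS_PUSHDOWN: current behavior
        }
    }

    public static void main(String[] args) {
        // Remote projection above a simple scan fragment: keep it above the exchange
        System.out.println(shouldPushdown(
                PushdownThroughExchangeStrategy.SKIP_IF_REMOTE_PROJECTION_ON_TABLESCAN,
                true, true)); // false
    }
}
```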

## Motivation and Context
As in description

## Impact
We have observed queries whose latency decreased from 2 minutes to 12
seconds after we manually added a shuffle above the table scan.

## Test Plan
Unit tests. Also have local end to end test with production queries.

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes
Please follow [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines)
and fill in the release notes below.

```
== RELEASE NOTES ==

General Changes
* Add options to skip projection pushdown through exchange rule
```

8 participants