
Route splits by affinity key in the scheduler #29182

Open
raunaqmorarka wants to merge 7 commits into master from user/split-cache-key-scheduling

Conversation


@raunaqmorarka raunaqmorarka commented Apr 21, 2026

Description

Move split-to-worker placement for cache-aware scheduling out of connectors and into the engine, behind a new ConnectorSplit#getAffinityKey() SPI. Connectors return an opaque key; the engine maps it to preferred workers via a single consistent-hash ring.

Why

Today, Hive, Delta, and Iceberg each inject a per-catalog CachingHostAddressProvider and compute split→worker placement themselves. This puts scheduling logic in connectors, duplicates a hash-ring + refresher thread per catalog over the same worker set, and leaves Hudi out because wiring it requires copying the pattern. The name CacheKey also couples the API to filesystem caching, but the underlying mechanism — "splits with the same key run on the same worker across queries" — is a general node-affinity primitive.

Approach

  • SPI: ConnectorSplit#getAffinityKey(): Optional<String>. Contract: same key ⇒ same worker(s); only remotely accessible splits may supply a key. The engine's Split wrapper enforces the latter.
  • Engine: one ConsistentHashingAddressProvider bound in CoordinatorModule, lazily refreshed on access (no background executor). Scheduler call sites consult the key inline; when empty, split.getAddresses() is used unchanged.
  • Filesystem library: new SplitAffinityProvider interface with a no-op default and a cache-enabled impl bound on the coordinator when `fs.cache.enabled=true`.
  • Connectors: Hive, Delta, Iceberg, Hudi inject SplitAffinityProvider and call getKey(path, offset, length) during split creation. Hive keeps its forceLocalScheduling + HDFS block-location path.
  • Cleanup (folded into the Iceberg commit, the last migrant): delete CachingHostAddressProvider + its impls + its test, drop the allgood-consistent-hash dep from lib/trino-filesystem, remove the old Guice bindings.
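The SPI and filesystem-library surfaces above can be sketched as follows. This is a minimal standalone sketch: the method names and shapes come from the PR description, but the default bodies and the path:offset:length key format are assumptions, not code from this PR.

```java
import java.util.Optional;

// Sketch of the new SPI hook: connectors return an opaque key, or empty to
// keep the existing getAddresses()-based placement.
interface ConnectorSplit
{
    default Optional<String> getAffinityKey()
    {
        return Optional.empty();
    }
}

// Sketch of the filesystem-library interface: a no-op default plus a
// cache-enabled implementation bound when fs.cache.enabled=true.
interface SplitAffinityProvider
{
    Optional<String> getKey(String path, long offset, long length);
}

class NoopSplitAffinityProvider
        implements SplitAffinityProvider
{
    @Override
    public Optional<String> getKey(String path, long offset, long length)
    {
        return Optional.empty(); // never routes by affinity
    }
}

class CacheSplitAffinityProvider
        implements SplitAffinityProvider
{
    @Override
    public Optional<String> getKey(String path, long offset, long length)
    {
        // Any stable, query-independent encoding works; the engine only
        // hashes the key, it never parses it.
        return Optional.of(path + ":" + offset + ":" + length);
    }
}
```

Because the key is opaque to the engine, a connector can change how it derives keys without any scheduler change.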

Additional context and related issues

Suggested commit-order review:

  1. Route splits by affinity key in the scheduler — SPI + engine.
  2. Use affinity key in Hive split placement — introduces SplitAffinityProvider and its bindings.
  3. Use affinity key in Delta Lake split placement.
  4. Use affinity key in Iceberg split placement — also deletes the now-unused shared classes.
  5. Add affinity key to Hudi split — net-new for Hudi.
  6. Convert IcebergSplit to a record — incidental cleanup now that the class is smaller.

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot Bot added the cla-signed label Apr 21, 2026
@github-actions github-actions Bot added the hudi, iceberg, delta-lake, and hive connector labels Apr 21, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 29e4195738


Comment thread on plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java (outdated)
@raunaqmorarka raunaqmorarka force-pushed the user/split-cache-key-scheduling branch 2 times, most recently from 3830fbe to d8de793 on April 21, 2026 at 11:18
@raunaqmorarka
Member Author

@coderabbitai review


coderabbitai Bot commented Apr 21, 2026

✅ Actions performed

Review triggered.



coderabbitai Bot commented Apr 21, 2026

Caution

Review failed

The head commit changed during the review from 4a15a9f to fa339b2.

📝 Walkthrough


This pull request implements a distributed scheduler optimization that replaces host-address based split placement with cache-aware split affinity routing. The change moves consistent hashing logic from the filesystem caching layer to the core scheduler, introducing a new ConsistentHashingAddressProvider that uses an external consistent hash ring library to determine worker placement based on split affinity keys. Multiple split implementations (DeltaLake, Hive, Hudi, Iceberg) are refactored to carry optional affinity key metadata instead of explicit host addresses, and schedulers are updated to query these keys via a new SplitAffinityProvider interface. Related Guice modules, factory classes, and tests are wired to support the new provider throughout the execution and scheduling layers.

Possibly related PRs

  • trinodb/trino#28988: Modifies IcebergSplit's internal data representation by converting it to a record type and changing its serialization fields, directly aligned with this PR's IcebergSplit refactoring to use affinity keys.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java (1)

77-105: ⚠️ Potential issue | 🟡 Minor

Assert the affinity key in the JSON round-trip.

Line 77 seeds a non-empty affinity key, but the test never verifies it after deserialization. If the JSON property is dropped or deserialized as empty, this test would still pass.

Proposed test assertion
         assertThat(actual.getBucketConversion()).isEqualTo(expected.getBucketConversion());
         assertThat(actual.isForceLocalScheduling()).isEqualTo(expected.isForceLocalScheduling());
+        assertThat(actual.getAffinityKey()).isEqualTo(expected.getAffinityKey());
         assertThat(actual.getAcidInfo().get()).isEqualTo(expected.getAcidInfo().get());
         assertThat(actual.getSplitWeight()).isEqualTo(expected.getSplitWeight());
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java (1)

48-100: ⚠️ Potential issue | 🟠 Major

Include affinityKey in retained-size accounting.

The key is stored on coordinator split objects and can contain path-derived strings, so queued split memory is now under-counted.

Proposed fix
     public long getRetainedSizeInBytes()
     {
         return INSTANCE_SIZE
                 + estimatedSizeOf(path)
                 + sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE)
                 + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes)
+                + sizeOf(affinityKey, SizeOf::estimatedSizeOf)
                 + splitWeight.getRetainedSizeInBytes()
                 + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::retainedSizeInBytes)
                 + estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf));
     }

Also applies to: 178-188

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java (1)

54-150: ⚠️ Potential issue | 🟠 Major

Account for affinityKey in getRetainedSizeInBytes().

The split now retains an optional path-derived affinity key, but the memory estimate still only includes addresses and other existing fields.

Proposed fix
                 + schema.getRetainedSizeInBytes()
                 + estimatedSizeOf(partitionKeys, HivePartitionKey::estimatedSizeInBytes)
                 + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+                + sizeOf(affinityKey, value -> estimatedSizeOf(value))
                 + estimatedSizeOf(partitionName)
                 + sizeOf(readBucketNumber)
                 + sizeOf(tableBucketNumber)

Also applies to: 280-296

core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java (1)

88-111: ⚠️ Potential issue | 🟡 Minor

Set lastRefreshTime only after a successful ring update.

Today lastRefreshTime is advanced on line 96 before the addAll/remove mutations (lines 102, 105). If a mutation throws, the catch on line 108 logs and returns, leaving the ring only partially updated while the timestamp already reflects "recently refreshed" — so the next refresh is suppressed for another WORKER_NODES_CACHE_TIMEOUT_SECS. Moving the timestamp assignment to after the mutations (or inside a finally only on success) would let a transient failure self-heal on the next call.

🛡️ Proposed fix
     `@VisibleForTesting`
     synchronized void refreshHashRing()
     {
         try {
             Set<TrinoNode> trinoNodes = nodeManager.getNodes(NodeState.ACTIVE).stream()
                     .filter(node -> !node.isCoordinator())
                     .map(TrinoNode::of)
                     .collect(toImmutableSet());
-            lastRefreshTime = System.nanoTime();
             Set<TrinoNode> hashRingNodes = consistentHashRing.getNodes();
             Set<TrinoNode> removedNodes = Sets.difference(hashRingNodes, trinoNodes);
             Set<TrinoNode> newNodes = Sets.difference(trinoNodes, hashRingNodes);
             // Avoid acquiring a write lock in consistentHashRing if possible
             if (!newNodes.isEmpty()) {
                 consistentHashRing.addAll(newNodes);
             }
             if (!removedNodes.isEmpty()) {
                 removedNodes.forEach(consistentHashRing::remove);
             }
+            lastRefreshTime = System.nanoTime();
         }
         catch (Exception e) {
             log.error(e, "Error refreshing hash ring");
         }
     }
🧹 Nitpick comments (3)
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java (1)

44-53: Consider strengthening the Javadoc contract.

The PR objective states the contract is "identical keys map to the same worker(s)". The current doc mentions routing of related content but not that equal keys deterministically resolve to the same preferred worker(s) across queries. Worth calling out explicitly so connector authors know how to construct stable keys (e.g., include file path/offset but not per-query state).

core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java (1)

62-62: Add test cases for split affinity key routing.

The ConsistentHashingAddressProvider is wired into ArbitraryDistributionSplitAssigner but test splits via TestingConnectorSplit return empty affinity keys, so the actual affinity key resolution path in getNodeRequirements() (line 372-374) is never exercised. Consider adding a test case where splits return non-empty affinity keys via getAffinityKey() to verify the provider's host resolution logic is correct.

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java (1)

307-315: Optional: consider extracting the affinity-key computation for readability.

The nested ternary is correct (and splitBytes is deterministic here because deterministicSplits is only true when maxInitialSplits == 0 or maxInitialSplitSize.equals(maxSplitSize)), but lifting it into a small helper would make the two strategies easier to scan.

♻️ Possible extraction
-                Optional<String> affinityKey = internalSplit.isForceLocalScheduling()
-                        ? Optional.empty()
-                        : deterministicSplits
-                                ? splitAffinityProvider.getKey(internalSplit.getPath(), internalSplit.getStart(), splitBytes)
-                                : splitAffinityProvider.getKey(internalSplit.getPath(), 0, internalSplit.getEstimatedFileSize());
+                Optional<String> affinityKey = computeAffinityKey(internalSplit, splitBytes);
private Optional<String> computeAffinityKey(InternalHiveSplit internalSplit, long splitBytes)
{
    if (internalSplit.isForceLocalScheduling()) {
        return Optional.empty();
    }
    if (deterministicSplits) {
        return splitAffinityProvider.getKey(internalSplit.getPath(), internalSplit.getStart(), splitBytes);
    }
    // Use a file-wide key so all splits of the same file land on the same worker and reuse cached content
    return splitAffinityProvider.getKey(internalSplit.getPath(), 0, internalSplit.getEstimatedFileSize());
}
Inline comment (1)
core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java (1)

53-74: ⚠️ Potential issue

Guard concurrent access to the hash ring.

refreshHashRing() mutates the ring while getHosts() reads it through consistentHashRing.locate(...), a concurrent read/write on the non-thread-safe HashRing. Either synchronize both refreshHashRing() and getHosts() on the same lock, or construct and populate a fresh ring in refreshHashRing() and swap it in atomically (for example via an AtomicReference<ConsistentHash<TrinoNode>>), with getHosts() calling ringRef.get().locate(cacheKey, preferredHostsCount).

📥 Commits

Reviewing files that changed from the base of the PR and between 639374c and d8de793.

📒 Files selected for processing (52)
  • core/trino-main/pom.xml
  • core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProvider.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/ConsistentHashingAddressProviderConfig.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelector.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/ArbitraryDistributionSplitAssigner.java
  • core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenTaskSourceFactory.java
  • core/trino-main/src/main/java/io/trino/metadata/Split.java
  • core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
  • core/trino-main/src/main/java/io/trino/testing/PlanTester.java
  • core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java
  • core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java
  • core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java
  • core/trino-main/src/test/java/io/trino/execution/scheduler/TestScaledWriterScheduler.java
  • core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
  • core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java
  • core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java
  • core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplit.java
  • lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCacheModule.java
  • lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java
  • lib/trino-filesystem/pom.xml
  • lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheSplitAffinityProvider.java
  • lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java
  • lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/NoopSplitAffinityProvider.java
  • lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/SplitAffinityProvider.java
  • lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java
  • plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
  • plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
  • plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java
  • plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
  • plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplit.java
  • plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java
  • plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitSource.java
  • plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
  • plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java
  • plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplit.java
  • plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveSplitSource.java
  • plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java
  • plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplit.java
  • plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java
  • plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
  • plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
  • plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiSplitFactory.java
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
  • plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java
  • plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
💤 Files with no reviewable changes (3)
  • lib/trino-filesystem/pom.xml
  • lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CachingHostAddressProvider.java
  • lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestConsistentHashingCacheHostAddressProvider.java

@raunaqmorarka raunaqmorarka force-pushed the user/split-cache-key-scheduling branch from d8de793 to 3f5d92e on April 21, 2026 at 20:11
@chenjian2664
Contributor

You're effectively hard-wiring all connectors to a single consistent-hash strategy whenever getAffinityKey is implemented. That removes any flexibility to adopt alternative strategies in the future. For example, if we introduce a new CachingHostAddressProvider (in today's filesystem cache module) with a different routing policy, there would be no way to opt into it under the current design. Is that an intended constraint?

@raunaqmorarka
Member Author


The old getAddresses way is still available for connectors that want a custom strategy.
There was no need for flexibility here: the connector is only interested in providing a split-placement policy, not in dealing with actual host addresses or running the consistent-hashing code itself. Doing it this way avoids duplicating the same consistent-hash ring per catalog.
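To illustrate the shared-ring idea, here is a toy consistent-hash ring that one engine-owned provider could serve to every catalog. This is illustrative only: the engine wraps an external consistent-hash library, not code like this, and the worker names and vnode count are made up.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy consistent-hash ring: one shared instance maps an opaque affinity key
// to preferred workers, so every catalog reuses the same placement.
class AffinityRing
{
    private static final int VNODES = 100; // virtual nodes per worker

    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addWorker(String worker)
    {
        for (int i = 0; i < VNODES; i++) {
            ring.put(hash(worker + "#" + i), worker);
        }
    }

    void removeWorker(String worker)
    {
        ring.values().removeIf(worker::equals);
    }

    // Same key and worker set => same preferred workers, across queries
    List<String> locate(String affinityKey, int count)
    {
        List<String> result = new ArrayList<>();
        if (ring.isEmpty()) {
            return result;
        }
        // Walk clockwise from the key's position, wrapping around the ring
        collectDistinct(ring.tailMap(hash(affinityKey)).values(), result, count);
        collectDistinct(ring.values(), result, count);
        return result;
    }

    private static void collectDistinct(Iterable<String> workers, List<String> result, int count)
    {
        for (String worker : workers) {
            if (result.size() == count) {
                return;
            }
            if (!result.contains(worker)) {
                result.add(worker);
            }
        }
    }

    private static long hash(String value)
    {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xff);
            }
            return h;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

With per-catalog providers, each catalog would rebuild and refresh its own copy of a structure like this over the identical worker set; binding one instance in CoordinatorModule removes that duplication.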

@chenjian2664
Contributor


Once you roll back to using getAddresses, getAffinityKey becomes redundant and shouldn't be serialized; the situation would be the same as now.
Say today's IcebergSplit wanted to use a filesystem-layer strategy that schedules by partition range; how would the current design support that?

@raunaqmorarka
Member Author

> You're effectively hard-wiring all connectors to a single consistent-hash strategy whenever getAffinityKey is implemented. That removes any flexibility to adopt alternative strategies in the future. For example, if we introduce a new CachingHostAddressProvider (in today's filesystem cache module) with a different routing policy, there would be no way to opt into it under the current design. Is that an intended constraint?
>
> The old getAddresses way is still available for connectors that want a custom strategy. There was no need for flexibility here: the connector is only interested in providing a split placement policy, not in dealing with actual host addresses or running the consistent-hashing code itself. Doing it this way avoids duplicating the same consistent-hash ring per catalog.
>
> Once you roll back to using getAddresses, getAffinityKey becomes redundant and shouldn't be serialized; the situation would be the same as now. Say today's IcebergSplit wanted to use a filesystem-layer strategy that schedules by partition range; how would the current design support that?

If getAddresses is populated, then getAffinityKey is enforced to be empty, so there is no extra serialization cost.
Iceberg is still free to populate getAffinityKey in different ways, e.g. with a partition value instead of the split path and offsets. When splits are remotely accessible (as is the case in data lake connectors), there is no real reason for the connector to concern itself with specific host addresses.
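To make the point about key choice concrete, here is a hypothetical sketch (method names invented for illustration, not connector code) of two ways a split could populate its affinity key: by file range, so reads of the same bytes share a worker-local cache, or by partition value, so all splits of a partition co-locate.

```java
import java.util.Optional;

// Illustrative only: two affinity-key policies a connector could choose.
// Names and formats here are hypothetical, not IcebergSplit's actual code.
public class AffinityKeyChoices
{
    static Optional<String> byFileRange(String path, long start, long length)
    {
        // Default idea: splits over the same file range share a worker,
        // maximizing reuse of the worker-local filesystem cache
        return Optional.of(path + ":" + start + ":" + length);
    }

    static Optional<String> byPartition(String partitionValue)
    {
        // Alternative: all splits of one partition land on the same worker
        return Optional.of(partitionValue);
    }

    public static void main(String[] args)
    {
        System.out.println(byFileRange("s3://b/data/f.parquet", 0, 1024).get());
        System.out.println(byPartition("event_date=2026-04-21").get());
    }
}
```

Either way, the connector only chooses the key; the engine's ring decides which worker that key maps to.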

@chenjian2664 left a comment (Contributor)


Please rebase and fix the CI failures.

Comment thread plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java Outdated
Comment thread plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java Outdated
@raunaqmorarka force-pushed the user/split-cache-key-scheduling branch from 2c3e179 to 4a15a9f on April 23, 2026 09:18
@raunaqmorarka
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai Bot commented Apr 23, 2026

✅ Actions performed

Full review triggered.

Add an optional affinity key to ConnectorSplit. When a split
returns a key, the engine maps it to preferred worker addresses so
splits reading related content land on the same worker across
queries. When empty, scheduling keeps using split.getAddresses().

Only remotely accessible splits may carry an affinity key; the
engine's Split wrapper enforces this, since a non-remote split is
already pinned to its own addresses.

Populate HiveSplit's affinity key when filesystem caching is
enabled so splits reading the same file reuse worker-local cache.
Hive no longer injects CachingHostAddressProvider; it receives a
SplitAffinityProvider configured by FileSystemModule.

HDFS block locations still flow through HiveSplit.getAddresses(),
preserving the forceLocalScheduling path for clusters that pin
splits to specific hosts.

Populate DeltaLakeSplit's affinity key when filesystem caching is
enabled so splits reading the same file reuse worker-local cache.
Delta no longer injects CachingHostAddressProvider.

Drop the unused addresses field; Delta never emitted HDFS block
locations or used the forceLocalScheduling path.

Populate IcebergSplit's affinity key when filesystem caching is
enabled so splits reading the same file reuse worker-local cache.
Iceberg no longer injects CachingHostAddressProvider.

Drop the unused addresses field; Iceberg never emitted HDFS block
locations or used the forceLocalScheduling path.

As the last connector to rely on the per-catalog
CachingHostAddressProvider, this commit also removes the interface
and its implementations from lib/trino-filesystem, drops the
allgood-consistent-hash dependency there (the engine now owns it),
and removes the now-unused bindings in FileSystemModule and
AlluxioFileSystemCacheModule.

Populate HudiSplit's affinity key when filesystem caching is
enabled so splits reading the same file reuse worker-local cache.
This is net-new cache-aware scheduling for Hudi; no prior
placement mechanism existed.

HudiSplitFactory and its loader chain receive SplitAffinityProvider
so split creation can consult it for each file range.
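Putting the commit descriptions together, the provider consulted at split creation roughly has this shape. This is a sketch built from the names used above (getKey(path, offset, length), a no-op default, a cache-enabled variant); the actual lib/trino-filesystem code may differ.

```java
import java.util.Optional;

// Sketch of the SplitAffinityProvider shape described in the commits.
// Exact names and bindings are assumptions, not the real library code.
public class SplitAffinityProviderSketch
{
    interface SplitAffinityProvider
    {
        // No-op default: splits carry no affinity key, scheduling unchanged
        SplitAffinityProvider NOOP = (path, offset, length) -> Optional.empty();

        Optional<String> getKey(String path, long offset, long length);
    }

    // Cache-enabled variant, bound on the coordinator when
    // fs.cache.enabled=true: keys by file range so splits reading the
    // same data hit the same worker-local cache
    static final SplitAffinityProvider CACHING =
            (path, offset, length) -> Optional.of(path + ":" + offset + ":" + length);

    public static void main(String[] args)
    {
        System.out.println(SplitAffinityProvider.NOOP.getKey("f", 0, 10).isPresent());
        System.out.println(CACHING.getKey("s3://b/f.orc", 0, 10).get());
    }
}
```

A connector's split factory would inject the interface and call getKey for each file range, staying oblivious to whether caching (and thus affinity) is enabled.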
@raunaqmorarka force-pushed the user/split-cache-key-scheduling branch from 4a15a9f to fa339b2 on April 23, 2026 12:08
@raunaqmorarka force-pushed the user/split-cache-key-scheduling branch from fa339b2 to a4c4c8c on April 23, 2026 16:44

Labels: cla-signed, delta-lake (Delta Lake connector), hive (Hive connector), hudi (Hudi connector), iceberg (Iceberg connector)
