diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java b/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java
index 784e04da1cd1..0d5172583a24 100644
--- a/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java
+++ b/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java
@@ -37,6 +37,7 @@
 import io.trino.spi.connector.ConnectorNodePartitioningProvider;
 import io.trino.spi.connector.ConnectorPageSinkProvider;
 import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.function.FunctionProvider;
@@ -59,6 +60,13 @@ public static CatalogServiceProvider createSplitManagerPr
         return new ConnectorCatalogServiceProvider<>("split manager", connectorServicesProvider, connector -> connector.getSplitManager().orElse(null));
     }
 
+    @Provides
+    @Singleton
+    public static CatalogServiceProvider<ConnectorSplitAddressProvider> createSplitAddressProviderProvider(ConnectorServicesProvider connectorServicesProvider)
+    {
+        return new ConnectorCatalogServiceProvider<>("split address provider", connectorServicesProvider, ConnectorServices::getSplitAddressProvider);
+    }
+
     @Provides
     @Singleton
     public static CatalogServiceProvider<ConnectorPageSourceProviderFactory> createPageSourceProviderFactory(ConnectorServicesProvider connectorServicesProvider)
diff --git a/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java b/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java
index 219ad1a1c30d..6f26e4d530f5 100644
--- a/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java
+++ b/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java
@@ -31,6 +31,7 @@
 import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
 import io.trino.spi.connector.ConnectorRecordSetProvider;
 import io.trino.spi.connector.ConnectorSecurityContext;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.SchemaRoutineName;
 import io.trino.spi.connector.SystemTable;
@@ -75,6 +76,7 @@ public class ConnectorServices
     private final Optional<FunctionProvider> functionProvider;
     private final CatalogTableFunctions tableFunctions;
     private final Optional<ConnectorSplitManager> splitManager;
+    private final ConnectorSplitAddressProvider splitAddressProvider;
     private final Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory;
     private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
     private final Optional<ConnectorIndexProvider> indexProvider;
@@ -125,6 +127,7 @@ public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector c
         catch (UnsupportedOperationException _) {
         }
         this.splitManager = Optional.ofNullable(splitManager);
+        this.splitAddressProvider = requireNonNull(connector.getSplitAddressProvider(), format("Connector '%s' returned a null split address provider", catalogHandle));
 
         ConnectorPageSourceProviderFactory connectorPageSourceProviderFactory = null;
         try {
@@ -264,6 +267,11 @@ public Optional<ConnectorSplitManager> getSplitManager()
         return splitManager;
     }
 
+    public ConnectorSplitAddressProvider getSplitAddressProvider()
+    {
+        return splitAddressProvider;
+    }
+
     public Optional<ConnectorPageSourceProviderFactory> getPageSourceProviderFactory()
     {
         return pageSourceProviderFactory;
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExchangeSplitSource.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExchangeSplitSource.java
index 896b8db14306..76f2e6d87aff 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExchangeSplitSource.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExchangeSplitSource.java
@@ -98,7 +98,7 @@ private List<Split> createRemoteSplits(List<ExchangeSourceHandle> handles)
 
     private static Split createRemoteSplit(List<ExchangeSourceHandle> handles)
     {
-        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())));
+        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())), ImmutableList.of(), true);
     }
 
     @Override
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
index 28070baf6515..8e44ee38f11e 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedStageExecution.java
@@ -593,7 +593,7 @@ private static Split createExchangeSplit(RemoteTask sourceTask, RemoteTask desti
         // Fetch the results from the buffer assigned to the task based on id
         URI exchangeLocation = sourceTask.getTaskStatus().self();
         URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().partitionId())).build();
-        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(sourceTask.getTaskId(), splitLocation.toString())));
+        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(sourceTask.getTaskId(), splitLocation.toString())), ImmutableList.of(), true);
     }
 
     private static class PipelinedStageStateMachine
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
index c03d9173d626..837c519bb669 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java
@@ -13,6 +13,7 @@
  */
 package io.trino.execution.scheduler;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
@@ -264,7 +265,9 @@ else if (pendingSplits.isEmpty()) {
                     // Scheduling an empty split kicks off necessary driver instantiation to make this work.
                     pendingSplits.add(new Split(
                             splitSource.getCatalogHandle(),
-                            new EmptySplit(splitSource.getCatalogHandle())));
+                            new EmptySplit(splitSource.getCatalogHandle()),
+                            ImmutableList.of(),
+                            true));
                 }
                 log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", stageExecution.getStageId(), partitionedNode);
                 state = State.SPLITS_SCHEDULED;
diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
index e5654908d5b6..ecdb6853618c 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
@@ -3104,7 +3104,7 @@ public String getDebugInfo()
 
     private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector selector)
     {
-        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(), Optional.of(selector))));
+        return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(), Optional.of(selector))), ImmutableList.of(), true);
     }
 
     private static class OpenTaskDescriptor
diff --git a/core/trino-main/src/main/java/io/trino/metadata/Split.java b/core/trino-main/src/main/java/io/trino/metadata/Split.java
index 240071a4a76b..2bae75745a1a 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/Split.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/Split.java
@@ -14,7 +14,9 @@
 package io.trino.metadata;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import io.trino.connector.CatalogHandle;
 import io.trino.spi.HostAddress;
 import io.trino.spi.SplitWeight;
@@ -23,6 +25,8 @@
 import java.util.List;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static java.util.Objects.requireNonNull;
 
@@ -32,14 +36,24 @@ public final class Split
     private final CatalogHandle catalogHandle;
     private final ConnectorSplit connectorSplit;
+    private final List<HostAddress> addresses;
+    private final boolean remotelyAccessible;
 
     @JsonCreator
     public Split(
             @JsonProperty("catalogHandle") CatalogHandle catalogHandle,
             @JsonProperty("connectorSplit") ConnectorSplit connectorSplit)
     {
+        this(catalogHandle, connectorSplit, ImmutableList.of(), true);
+    }
+
+    public Split(CatalogHandle catalogHandle, ConnectorSplit connectorSplit, List<HostAddress> addresses, boolean remotelyAccessible)
+    {
+        checkArgument(remotelyAccessible || !addresses.isEmpty(), "addresses must be provided when remotelyAccessible=false");
         this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
         this.connectorSplit = requireNonNull(connectorSplit, "connectorSplit is null");
+        this.addresses = ImmutableList.copyOf(addresses);
+        this.remotelyAccessible = remotelyAccessible;
     }
 
     @JsonProperty
@@ -54,14 +68,18 @@ public ConnectorSplit getConnectorSplit()
         return connectorSplit;
     }
 
+    // do not serialize addresses as they are not needed on workers
+    @JsonIgnore
     public List<HostAddress> getAddresses()
     {
-        return connectorSplit.getAddresses();
+        return addresses;
     }
 
+    // do not serialize remotelyAccessible as it is not needed on workers
+    @JsonIgnore
     public boolean isRemotelyAccessible()
     {
-        return connectorSplit.isRemotelyAccessible();
+        return remotelyAccessible;
     }
 
     public SplitWeight getSplitWeight()
@@ -82,6 +100,7 @@ public long getRetainedSizeInBytes()
     {
         return INSTANCE_SIZE
                 + catalogHandle.getRetainedSizeInBytes()
-                + connectorSplit.getRetainedSizeInBytes();
+                + connectorSplit.getRetainedSizeInBytes()
+                + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes);
     }
 }
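Reviewer note (not part of the patch): a minimal sketch of the new Split construction contract introduced above. The catalogHandle and connectorSplit names are hypothetical stand-ins; the behavior described mirrors the Split.java hunks.

    // Coordinator side: locality now lives on the engine's Split, supplied by the
    // catalog's ConnectorSplitAddressProvider rather than read from the connector split.
    Split pinned = new Split(
            catalogHandle,
            connectorSplit,
            ImmutableList.of(HostAddress.fromString("10.0.0.1:8080")),
            false); // remotelyAccessible=false requires at least one address (checkArgument above)

    // Worker side: the @JsonCreator two-argument constructor is what task-update
    // deserialization uses; addresses and remotelyAccessible are @JsonIgnore'd and
    // default to (empty list, true) because scheduling decisions were already made.
    Split onWorker = new Split(catalogHandle, connectorSplit);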
diff --git a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java
index fe21db5b63d0..79129d61c71d 100644
--- a/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java
+++ b/core/trino-main/src/main/java/io/trino/operator/index/IndexLoader.java
@@ -234,7 +234,7 @@ private IndexedData streamIndexDataForSingleKey(UpdateRequest updateRequest)
         PageRecordSet pageRecordSet = new PageRecordSet(keyTypes, indexKeyTuple);
         PlanNodeId planNodeId = driverFactory.getSourceId().get();
-        ScheduledSplit split = new ScheduledSplit(0, planNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(pageRecordSet)));
+        ScheduledSplit split = new ScheduledSplit(0, planNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(pageRecordSet), ImmutableList.of(), true));
         driver.updateSplitAssignment(new SplitAssignment(planNodeId, ImmutableSet.of(split), true));
 
         return new StreamingIndexedData(outputTypes, keyEqualOperators, indexKeyTuple, pageBuffer, driver);
@@ -323,7 +323,7 @@ public boolean load(List<UpdateRequest> requests)
         // Drive index lookup to produce the output (landing in indexSnapshotBuilder)
         try (Driver driver = driverFactory.createDriver(pipelineContext.addDriverContext())) {
             PlanNodeId sourcePlanNodeId = driverFactory.getSourceId().get();
-            ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(recordSetForLookupSource)));
+            ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(recordSetForLookupSource), ImmutableList.of(), true));
             driver.updateSplitAssignment(new SplitAssignment(sourcePlanNodeId, ImmutableSet.of(split), true));
             while (!driver.isFinished()) {
                 ListenableFuture<Void> process = driver.processUntilBlocked();
diff --git a/core/trino-main/src/main/java/io/trino/split/ConnectorAwareSplitSource.java b/core/trino-main/src/main/java/io/trino/split/ConnectorAwareSplitSource.java
index e69f9b6397ef..3842e0319519 100644
--- a/core/trino-main/src/main/java/io/trino/split/ConnectorAwareSplitSource.java
+++ b/core/trino-main/src/main/java/io/trino/split/ConnectorAwareSplitSource.java
@@ -20,6 +20,7 @@
 import io.trino.connector.CatalogHandle;
 import io.trino.metadata.Split;
 import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorSplitSource.ConnectorSplitBatch;
 import io.trino.spi.metrics.Metrics;
@@ -47,6 +48,7 @@ public class ConnectorAwareSplitSource
         implements SplitSource
 {
     private final CatalogHandle catalogHandle;
+    private final ConnectorSplitAddressProvider addressProvider;
     private final String sourceToString;
 
     @Nullable
@@ -55,10 +57,11 @@ public class ConnectorAwareSplitSource
     private Optional<Optional<List<Object>>> tableExecuteSplitsInfo = Optional.empty();
     private Metrics metrics = Metrics.EMPTY;
 
-    public ConnectorAwareSplitSource(CatalogHandle catalogHandle, ConnectorSplitSource source)
+    public ConnectorAwareSplitSource(CatalogHandle catalogHandle, ConnectorSplitSource source, ConnectorSplitAddressProvider addressProvider)
     {
         this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
         this.source = requireNonNull(source, "source is null");
+        this.addressProvider = requireNonNull(addressProvider, "addressProvider is null");
         this.sourceToString = source.toString();
     }
 
@@ -77,7 +80,11 @@ public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
             List<ConnectorSplit> connectorSplits = splitBatch.getSplits();
             ImmutableList.Builder<Split> result = ImmutableList.builderWithExpectedSize(connectorSplits.size());
             for (ConnectorSplit connectorSplit : connectorSplits) {
-                result.add(new Split(catalogHandle, connectorSplit));
+                result.add(new Split(
+                        catalogHandle,
+                        connectorSplit,
+                        addressProvider.getAddresses(connectorSplit),
+                        addressProvider.isRemotelyAccessible(connectorSplit)));
             }
             boolean noMoreSplits = splitBatch.isNoMoreSplits();
             if (noMoreSplits) {
diff --git a/core/trino-main/src/main/java/io/trino/split/SplitManager.java b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
index 01ed82f230ce..d2a95a299141 100644
--- a/core/trino-main/src/main/java/io/trino/split/SplitManager.java
+++ b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
@@ -25,6 +25,7 @@
 import io.trino.metadata.TableFunctionHandle;
 import io.trino.metadata.TableHandle;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.Constraint;
@@ -45,15 +46,21 @@ public class SplitManager
 {
     private final CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider;
+    private final CatalogServiceProvider<ConnectorSplitAddressProvider> addressProviderProvider;
     private final Tracer tracer;
     private final int minScheduleSplitBatchSize;
     private final ExecutorService executorService;
     private final Executor executor;
 
     @Inject
-    public SplitManager(CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider, Tracer tracer, QueryManagerConfig config)
+    public SplitManager(
+            CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider,
+            CatalogServiceProvider<ConnectorSplitAddressProvider> addressProviderProvider,
+            Tracer tracer,
+            QueryManagerConfig config)
     {
         this.splitManagerProvider = requireNonNull(splitManagerProvider, "splitManagerProvider is null");
+        this.addressProviderProvider = requireNonNull(addressProviderProvider, "addressProviderProvider is null");
         this.tracer = requireNonNull(tracer, "tracer is null");
         this.minScheduleSplitBatchSize = config.getMinScheduleSplitBatchSize();
         this.executorService = newCachedThreadPool(daemonThreadsNamed("splits-manager-callback-%s"));
@@ -75,6 +82,7 @@
     {
         CatalogHandle catalogHandle = table.catalogHandle();
         ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
+        ConnectorSplitAddressProvider addressProvider = addressProviderProvider.getService(catalogHandle);
         if (!isAllowPushdownIntoConnectors(session)) {
             dynamicFilter = DynamicFilter.EMPTY;
         }
@@ -94,7 +102,7 @@
                     constraint);
         }
 
-        SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
+        SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source, addressProvider);
 
         Span span = splitSourceSpan(parentSpan, catalogHandle);
@@ -114,6 +122,7 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand
     {
         CatalogHandle catalogHandle = function.catalogHandle();
         ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
+        ConnectorSplitAddressProvider addressProvider = addressProviderProvider.getService(catalogHandle);
 
         ConnectorSplitSource source;
         try (var ignore = scopedSpan(tracer.spanBuilder("SplitManager.getSplits")
@@ -126,7 +135,7 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand
                     function.functionHandle());
         }
 
-        SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
+        SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source, addressProvider);
 
         Span span = splitSourceSpan(parentSpan, catalogHandle);
         return new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-buffer");
diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
index 8b801287222a..a25d60323348 100644
--- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
+++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
@@ -254,6 +254,7 @@
 import static io.trino.connector.CatalogServiceProviderModule.createPageSinkProvider;
 import static io.trino.connector.CatalogServiceProviderModule.createPageSourceProviderFactory;
 import static io.trino.connector.CatalogServiceProviderModule.createSchemaPropertyManager;
+import static io.trino.connector.CatalogServiceProviderModule.createSplitAddressProviderProvider;
 import static io.trino.connector.CatalogServiceProviderModule.createSplitManagerProvider;
 import static io.trino.connector.CatalogServiceProviderModule.createTableFunctionProvider;
 import static io.trino.connector.CatalogServiceProviderModule.createTableProceduresPropertyManager;
@@ -429,7 +430,7 @@ TypeSignature.class, new TypeSignatureDeserializer(),
                 nodeSchedulerConfig,
                 optimizerConfig,
                 secretsResolver));
-        this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), tracer, new QueryManagerConfig());
+        this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), createSplitAddressProviderProvider(catalogManager), tracer, new QueryManagerConfig());
         this.pageSourceManager = new PageSourceManager(createPageSourceProviderFactory(catalogManager));
         this.pageSinkManager = new PageSinkManager(createPageSinkProvider(catalogManager));
         this.indexManager = new IndexManager(createIndexProvider(catalogManager));
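Reviewer note (not part of the patch): the new SPI interface itself is not included in this diff. From the call sites above (ConnectorAwareSplitSource) and the tests below (which pass ConnectorSplitAddressProvider.DEFAULT), a sketch of the contract the engine relies on; the default-method bodies are an assumption, chosen to preserve the old delegate-to-the-split behavior:

    package io.trino.spi.connector;

    import io.trino.spi.HostAddress;

    import java.util.List;

    public interface ConnectorSplitAddressProvider
    {
        // Used where locality should keep coming from the ConnectorSplit itself.
        ConnectorSplitAddressProvider DEFAULT = new ConnectorSplitAddressProvider() {};

        // Addresses the scheduler should prefer for this split.
        default List<HostAddress> getAddresses(ConnectorSplit split)
        {
            return split.getAddresses();
        }

        // Whether the split may also run on a node outside getAddresses(split).
        default boolean isRemotelyAccessible(ConnectorSplit split)
        {
            return split.isRemotelyAccessible();
        }
    }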
diff --git a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java
index 9534c4e72b55..22e65bb07030 100644
--- a/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/BenchmarkNodeScheduler.java
@@ -156,7 +156,8 @@ public void setup()
             InternalNode node = nodes.get(i);
             ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
             for (int j = 0; j < MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE; j++) {
-                initialSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(i)));
+                TestSplitRemote s = new TestSplitRemote(i);
+                initialSplits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
             }
             TaskId taskId = new TaskId(new StageId("test", 1), i, 0);
             MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
@@ -165,7 +166,8 @@ public void setup()
         }
 
         for (int i = 0; i < SPLITS; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES))));
+            TestSplitRemote bs = new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES));
+            splits.add(new Split(TEST_CATALOG_HANDLE, bs, bs.getAddresses(), true));
         }
 
         NodeScheduler nodeScheduler = new NodeScheduler(getNodeSelectorFactory(nodeTaskMap));
diff --git a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java
index 45d51f6e2bc4..283d59ba8883 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java
@@ -33,6 +33,7 @@
 import io.trino.operator.index.IndexManager;
 import io.trino.server.protocol.spooling.QueryDataEncoders;
 import io.trino.server.protocol.spooling.SpoolingEnabledConfig;
+import io.trino.spi.HostAddress;
 import io.trino.spi.NodeVersion;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spiller.GenericSpillerFactory;
@@ -82,7 +83,7 @@ private TaskTestUtils() {}
 
     private static final CatalogHandle CATALOG_HANDLE = TEST_TABLE_HANDLE.catalogHandle();
 
-    public static final ScheduledSplit SPLIT = new ScheduledSplit(0, TABLE_SCAN_NODE_ID, new Split(CATALOG_HANDLE, TestingSplit.createLocalSplit()));
+    public static final ScheduledSplit SPLIT = new ScheduledSplit(0, TABLE_SCAN_NODE_ID, new Split(CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false));
 
     public static final List<SplitAssignment> EMPTY_SPLIT_ASSIGNMENTS = ImmutableList.of();
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java
index 9606a1447c11..400b75e1c19e 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeScheduler.java
@@ -151,7 +151,8 @@ public void tearDown()
     @Test
     public void testAssignmentWhenNoNodes()
     {
-        assertTrinoExceptionThrownBy(() -> computeSingleAssignment(nodeSelector, new Split(TEST_CATALOG_HANDLE, new TestSplitRemote())))
+        TestSplitRemote noNodesSplit = new TestSplitRemote();
+        assertTrinoExceptionThrownBy(() -> computeSingleAssignment(nodeSelector, new Split(TEST_CATALOG_HANDLE, noNodesSplit, noNodesSplit.getAddresses(), true)))
                 .hasErrorCode(NO_NODES_AVAILABLE)
                 .hasMessageMatching("No nodes available to run query");
     }
@@ -160,7 +161,8 @@
     public void testScheduleLocal()
     {
         setUpNodes();
-        Split split = new Split(TEST_CATALOG_HANDLE, new TestSplitLocallyAccessible());
+        TestSplitLocallyAccessible locallyAccessibleConnSplit = new TestSplitLocallyAccessible();
+        Split split = new Split(TEST_CATALOG_HANDLE, locallyAccessibleConnSplit, locallyAccessibleConnSplit.getAddresses(), locallyAccessibleConnSplit.isRemotelyAccessible());
         Map.Entry<InternalNode, Split> assignment = getOnlyElement(computeSingleAssignment(nodeSelector, split).entries());
         assertThat(assignment.getKey().getHostAndPort()).isEqualTo(split.getAddresses().get(0));
         assertThat(assignment.getValue()).isEqualTo(split);
@@ -197,7 +199,7 @@ public void testTopologyAwareScheduling()
         // Fill up the nodes with non-local data
         ImmutableSet.Builder<Split> nonRackLocalBuilder = ImmutableSet.builder();
         for (int i = 0; i < (25 + 11) * 3; i++) {
-            nonRackLocalBuilder.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("data.other_rack", 1))));
+            nonRackLocalBuilder.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("data.other_rack", 1)), ImmutableList.of(HostAddress.fromParts("data.other_rack", 1)), true));
         }
         Set<Split> nonRackLocalSplits = nonRackLocalBuilder.build();
         Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(nonRackLocalSplits, ImmutableList.copyOf(taskMap.values())).getAssignments();
@@ -229,10 +231,10 @@
         HostAddress dataHost1 = HostAddress.fromParts("data.rack1", 1);
         HostAddress dataHost2 = HostAddress.fromParts("data.rack2", 1);
         for (int i = 0; i < 6 * 2; i++) {
-            rackLocalSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(dataHost1)));
+            rackLocalSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(dataHost1), ImmutableList.of(dataHost1), true));
         }
         for (int i = 0; i < 6; i++) {
-            rackLocalSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(dataHost2)));
+            rackLocalSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(dataHost2), ImmutableList.of(dataHost2), true));
         }
         assignments = nodeSelector.computeAssignments(rackLocalSplits.build(), ImmutableList.copyOf(taskMap.values())).getAssignments();
         for (InternalNode node : assignments.keySet()) {
@@ -273,9 +275,9 @@
         // Assign local splits
         ImmutableSet.Builder<Split> localSplits = ImmutableSet.builder();
-        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host1.rack1", 1))));
-        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host2.rack1", 1))));
-        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host3.rack2", 1))));
+        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host1.rack1", 1)), ImmutableList.of(HostAddress.fromParts("host1.rack1", 1)), true));
+        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host2.rack1", 1)), ImmutableList.of(HostAddress.fromParts("host2.rack1", 1)), true));
+        localSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromParts("host3.rack2", 1)), ImmutableList.of(HostAddress.fromParts("host3.rack2", 1)), true));
         assignments = nodeSelector.computeAssignments(localSplits.build(), ImmutableList.copyOf(taskMap.values())).getAssignments();
         assertThat(assignments.size()).isEqualTo(3);
         assertThat(assignments.keySet().size()).isEqualTo(3);
     }
@@ -285,7 +287,8 @@
     public void testScheduleRemote()
     {
         setUpNodes();
-        Multimap<InternalNode, Split> assignments = computeSingleAssignment(nodeSelector, new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+        TestSplitRemote remoteConnSplit = new TestSplitRemote();
+        Multimap<InternalNode, Split> assignments = computeSingleAssignment(nodeSelector, new Split(TEST_CATALOG_HANDLE, remoteConnSplit, remoteConnSplit.getAddresses(), true));
         assertThat(assignments.size()).isEqualTo(1);
     }
@@ -300,7 +303,8 @@ public void testBasicAssignment()
         // One split for each node
         Set<Split> splits = new HashSet<>();
         for (int i = 0; i < activeCatalogNodes.size(); i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            splits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
         Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
         assertThat(assignments.entries()).hasSize(assignments.size());
@@ -318,7 +322,8 @@
         ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
         for (int i = 0; i < 10; i++) {
-            initialSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            initialSplits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
 
         MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
@@ -333,7 +338,8 @@
         Set<Split> splits = new HashSet<>();
         for (int i = 0; i < 5; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            splits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
 
         Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
@@ -359,7 +365,8 @@
         int splitCount = activeCatalogNodes.size() + 1;
         Set<Split> splits = new HashSet<>();
         for (int i = 0; i < splitCount; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            splits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
         Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
         assertThat(assignments.entries()).hasSize(activeCatalogNodes.size());
@@ -377,7 +384,8 @@
         ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
         for (int i = 0; i < 20; i++) {
-            initialSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            initialSplits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
 
         List<RemoteTask> tasks = new ArrayList<>();
@@ -399,7 +407,8 @@
         Set<Split> splits = new HashSet<>();
         for (int i = 0; i < 5; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            splits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
 
         Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
@@ -422,10 +431,11 @@
         MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
         InternalNode chosenNode = Iterables.get(nodeManager.getNodes(ACTIVE), 0);
         TaskId taskId = new TaskId(new StageId("test", 1), 1, 0);
+        TestSplitRemote taskCompletionSplit = new TestSplitRemote();
         RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(
                 taskId,
                 chosenNode,
-                ImmutableList.of(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote())),
+                ImmutableList.of(new Split(TEST_CATALOG_HANDLE, taskCompletionSplit, taskCompletionSplit.getAddresses(), true)),
                 nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId));
         nodeTaskMap.addTask(chosenNode, remoteTask);
         assertThat(nodeTaskMap.getPartitionedSplitsOnNode(chosenNode).getCount()).isEqualTo(1);
@@ -445,18 +455,21 @@
         InternalNode chosenNode = Iterables.get(nodeManager.getNodes(ACTIVE), 0);
 
         TaskId taskId1 = new TaskId(new StageId("test", 1), 1, 0);
+        TestSplitRemote sc1a = new TestSplitRemote();
+        TestSplitRemote sc1b = new TestSplitRemote();
         RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(taskId1, chosenNode,
                 ImmutableList.of(
-                        new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()),
-                        new Split(TEST_CATALOG_HANDLE, new TestSplitRemote())),
+                        new Split(TEST_CATALOG_HANDLE, sc1a, sc1a.getAddresses(), true),
+                        new Split(TEST_CATALOG_HANDLE, sc1b, sc1b.getAddresses(), true)),
                 nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId1));
 
         TaskId taskId2 = new TaskId(new StageId("test", 1), 2, 0);
+        TestSplitRemote sc2 = new TestSplitRemote();
         RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(
                 taskId2,
                 chosenNode,
-                ImmutableList.of(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote())),
+                ImmutableList.of(new Split(TEST_CATALOG_HANDLE, sc2, sc2.getAddresses(), true)),
                 nodeTaskMap.createPartitionedSplitCountTracker(chosenNode, taskId2));
 
         nodeTaskMap.addTask(chosenNode, remoteTask1);
@@ -480,7 +493,7 @@
         Set<Split> splits = new LinkedHashSet<>();
         // 20 splits with node1 as local node to be assigned in the first iteration of computeAssignments
         for (int i = 0; i < 20; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitLocal()));
+            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitLocal(), ImmutableList.of(HostAddress.fromString("10.0.0.1:11")), false));
         }
         // computeAssignments just returns a mapping of nodes with splits to be assigned, it does not assign splits
         Multimap<InternalNode, Split> assignments1 = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
@@ -491,7 +504,7 @@
 
         // 19 splits with node2 as local node to be assigned in the first iteration of computeAssignments
         for (int i = 0; i < 19; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("10.0.0.1:12"))));
+            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("10.0.0.1:12")), ImmutableList.of(HostAddress.fromString("10.0.0.1:12")), true));
         }
         Multimap<InternalNode, Split> assignments2 = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
         // Check that all 39 splits are being assigned (20 splits assigned to node1 and 19 splits assigned to node2)
@@ -513,9 +526,9 @@
         assertThat(node2Splits).isEqualTo(19);
 
         // 1 split with node1 as local node
-        splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitLocal()));
+        splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitLocal(), ImmutableList.of(HostAddress.fromString("10.0.0.1:11")), false));
         // 1 split with node2 as local node
-        splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("10.0.0.1:12"))));
+        splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("10.0.0.1:12")), ImmutableList.of(HostAddress.fromString("10.0.0.1:12")), true));
         //splits now contains 41 splits : 21 with node1 as local node and 20 with node2 as local node
         Multimap<InternalNode, Split> assignments3 = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
         // Check that only 40 splits are being assigned as there is a single task
@@ -548,7 +561,8 @@
         setUpNodes();
         ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
         for (int i = 0; i < maxUnacknowledgedSplitsPerTask; i++) {
-            initialSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            initialSplits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
 
         List<InternalNode> nodes = nodeManager.getNodes(ACTIVE).stream()
@@ -571,7 +585,8 @@
         // One split per node
         Set<Split> splits = new HashSet<>();
         for (int i = 0; i < nodes.size(); i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote()));
+            TestSplitRemote s = new TestSplitRemote();
+            splits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
         }
         SplitPlacementResult splitPlacements = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(tasks));
         // No splits should have been placed, max unacknowledged was already reached
@@ -617,10 +632,10 @@
         NodeScheduler nodeScheduler = new NodeScheduler(nodeSelectorFactory);
         NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session);
 
-        Split rigidSplit = new Split(TEST_CATALOG_HANDLE, new TestSplitLocal(HostAddress.fromString("host99.rack1:11")));
+        Split rigidSplit = new Split(TEST_CATALOG_HANDLE, new TestSplitLocal(HostAddress.fromString("host99.rack1:11")), ImmutableList.of(HostAddress.fromString("host99.rack1:11")), false);
         assertThatThrownBy(() -> computeSingleAssignment(nodeSelector, rigidSplit)).hasMessageContaining("No nodes available");
 
-        Split flexibleSplit = new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("host99.rack1:11")));
+        Split flexibleSplit = new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(HostAddress.fromString("host99.rack1:11")), ImmutableList.of(HostAddress.fromString("host99.rack1:11")), true);
         Assertions.assertThat(computeSingleAssignment(nodeSelector, flexibleSplit)).containsValues(flexibleSplit);
     }
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java
index 737de712baca..6a3eb2731fcd 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java
@@ -142,7 +142,7 @@ private void testFinalStageInfoInternal()
         Future<?> addTasksTask = executor.submit(() -> {
             try {
                 PlanNodeId planNodeId = stage.getFragment().getPartitionedSources().get(0);
-                ImmutableListMultimap<PlanNodeId, Split> initialSplits = ImmutableListMultimap.of(planNodeId, new Split(TEST_CATALOG_HANDLE, new TestingSplit(true, ImmutableList.of())));
+                ImmutableListMultimap<PlanNodeId, Split> initialSplits = ImmutableListMultimap.of(planNodeId, new Split(TEST_CATALOG_HANDLE, new TestingSplit(true, ImmutableList.of()), ImmutableList.of(), true));
                 for (int i = 0; i < 1_000_000; i++) {
                     if (Thread.interrupted()) {
                         return;
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java
index 8f5ef4202773..cf7f789e54d0 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java
@@ -299,7 +299,7 @@ public void abort()
 
     private ScheduledSplit newScheduledSplit(int sequenceId, PlanNodeId planNodeId, int begin, int count)
     {
-        return new ScheduledSplit(sequenceId, planNodeId, new Split(TEST_CATALOG_HANDLE, new TestingSplit(begin, begin + count)));
+        return new ScheduledSplit(sequenceId, planNodeId, new Split(TEST_CATALOG_HANDLE, new TestingSplit(begin, begin + count), ImmutableList.of(), true));
     }
 
     public static class Pauser
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java
index 9b600ea04f2c..7e0df81fbb50 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestMultiSourcePartitionedScheduler.java
@@ -41,6 +41,7 @@
 import io.trino.spi.NodeVersion;
 import io.trino.spi.QueryId;
 import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;
@@ -501,7 +502,7 @@ private StageScheduler prepareScheduler(
     {
         Map<PlanNodeId, SplitSource> sources = splitSources.entrySet()
                 .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, e.getValue())));
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, e.getValue(), ConnectorSplitAddressProvider.DEFAULT)));
         return new MultiSourcePartitionedScheduler(
                 stage,
                 sources,
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
index 00c938339122..c5c08dced67c 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
@@ -42,6 +42,7 @@
 import io.trino.spi.NodeVersion;
 import io.trino.spi.QueryId;
 import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorSplitAddressProvider;
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;
@@ -373,7 +374,7 @@ public void testNoNodes()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(20, TestingSplit::createRemoteSplit)),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(20, TestingSplit::createRemoteSplit), ConnectorSplitAddressProvider.DEFAULT),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks),
                 2,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
@@ -511,7 +512,7 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(4 * 300, TestingSplit::createRemoteSplit)),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(4 * 300, TestingSplit::createRemoteSplit), ConnectorSplitAddressProvider.DEFAULT),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks),
                 4 * 300,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
@@ -554,7 +555,7 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(3 * 300, TestingSplit::createRemoteSplit)),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(3 * 300, TestingSplit::createRemoteSplit), ConnectorSplitAddressProvider.DEFAULT),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks),
                 3 * 300,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
@@ -596,7 +597,7 @@ public void testDynamicFiltersUnblockedOnBlockedBuildSource()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createBlockedSplitSource()),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createBlockedSplitSource(), ConnectorSplitAddressProvider.DEFAULT),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session), stage::getAllTasks),
                 2,
                 dynamicFilterService,
@@ -666,7 +667,7 @@ private StageScheduler getSourcePartitionedScheduler(
         return newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, splitSource),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, splitSource, ConnectorSplitAddressProvider.DEFAULT),
                 placementPolicy,
                 splitBatchSize,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java
index 42f7485ae7e8..32c4dce2cae8 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestUniformNodeSelector.java
@@ -142,7 +142,7 @@ public void testQueueSizeAdjustmentScaleDown()
                 queueSizeAdjuster);
 
         for (int i = 0; i < 20; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit()));
+            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit(), ImmutableList.of(), true));
         }
 
         // assign splits, mark all splits running to trigger adjustment
@@ -190,7 +190,7 @@ public void testQueueSizeAdjustmentScaleDown()
     public void testQueueSizeAdjustmentAllNodes()
     {
         for (int i = 0; i < 20 * 9; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit()));
+            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit(), ImmutableList.of(), true));
         }
 
         // assign splits, marked all running to trigger adjustment
@@ -242,7 +242,7 @@ public void testQueueSizeAdjustmentAllNodes()
     public void testQueueSizeAdjustmentOneOfAll()
     {
         for (int i = 0; i < 20 * 9; i++) {
-            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit()));
+            splits.add(new Split(TEST_CATALOG_HANDLE, TestingSplit.createRemoteSplit(), ImmutableList.of(), true));
         }
 
         // assign splits, mark all splits for node1 running to trigger adjustment
@@ -313,9 +313,9 @@ public void testFailover()
                 true,
                 new UniformNodeSelector.QueueSizeAdjuster(1000, 10000, new TestingTicker()));
 
-        Split rigidSplit = new Split(TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(node1.getHostAndPort())));
+        Split rigidSplit = new Split(TEST_CATALOG_HANDLE, new TestingSplit(false, ImmutableList.of(node1.getHostAndPort())), ImmutableList.of(node1.getHostAndPort()), false);
         splits.add(rigidSplit);
-        Split flexibleSplit = new Split(TEST_CATALOG_HANDLE, new TestingSplit(true, ImmutableList.of(node1.getHostAndPort())));
+        Split flexibleSplit = new Split(TEST_CATALOG_HANDLE, new TestingSplit(true, ImmutableList.of(node1.getHostAndPort())), ImmutableList.of(node1.getHostAndPort()), true);
         splits.add(flexibleSplit);
 
         // Both nodes alive, but both splits prefer node 1.
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java
index 63b99a764cb9..9d6dc90651c8 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestArbitraryDistributionSplitAssigner.java
@@ -760,24 +760,20 @@ private static int addUpSplits(HostAddress address, Map
 
     private static Split createSplit(int id, List<HostAddress> addresses)
     {
-        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.of(addresses)));
+        TestingConnectorSplit connectorSplit = new TestingConnectorSplit(id, OptionalInt.empty(), Optional.of(addresses));
+        return new Split(TEST_CATALOG_HANDLE, connectorSplit, connectorSplit.getAddresses(), connectorSplit.isRemotelyAccessible());
     }
 
     private static Split createRemoteAccessibleSplit(int id, List<HostAddress> addresses)
     {
-        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.of(addresses))
-        {
-            @Override
-            public boolean isRemotelyAccessible()
-            {
-                return true;
-            }
-        });
+        TestingConnectorSplit connectorSplit = new TestingConnectorSplit(id, OptionalInt.empty(), Optional.of(addresses));
+        return new Split(TEST_CATALOG_HANDLE, connectorSplit, connectorSplit.getAddresses(), true);
     }
 
     private static ListMultimap<Integer, Split> createSplitsMultimap(List<Split> splits)
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestEventDrivenTaskSource.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestEventDrivenTaskSource.java
index 7e2017420b1e..56d93e5377e1 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestEventDrivenTaskSource.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestEventDrivenTaskSource.java
@@ -469,7 +469,7 @@ private void setNextBatch()
             future = currentFuture;
             ConnectorSplit split = remainingSplits.poll();
             boolean lastBatch = remainingSplits.isEmpty();
-            batch = new SplitBatch(split == null ? ImmutableList.of() : ImmutableList.of(new Split(TEST_CATALOG_HANDLE, split)), lastBatch);
+            batch = new SplitBatch(split == null ? ImmutableList.of() : ImmutableList.of(new Split(TEST_CATALOG_HANDLE, split, ((TestingConnectorSplit) split).getAddresses(), ((TestingConnectorSplit) split).isRemotelyAccessible())), lastBatch);
             if (lastBatch) {
                 finished = true;
             }
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestHashDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestHashDistributionSplitAssigner.java
index a5e44e03d380..f743b15df0ec 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestHashDistributionSplitAssigner.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestHashDistributionSplitAssigner.java
@@ -591,7 +591,7 @@ private static FaultTolerantPartitioningScheme createPartitioningScheme(int part
 
     private static Split createSplit(int id, int partition)
     {
-        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.of(partition), Optional.empty()));
+        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.of(partition), Optional.empty()), ImmutableList.of(), true);
     }
 
     private static class SplitBatch
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSingleDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSingleDistributionSplitAssigner.java
index 95c9acc53a86..66657100ac90 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSingleDistributionSplitAssigner.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSingleDistributionSplitAssigner.java
@@ -13,6 +13,7 @@
  */
 package io.trino.execution.scheduler.faulttolerant;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ImmutableSet;
 import io.trino.metadata.Split;
@@ -135,6 +136,6 @@ public void testMultipleSources()
 
     private Split createSplit(int id)
     {
-        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.empty()));
+        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.empty()), ImmutableList.of(), true);
     }
 }
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSplitsMapping.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSplitsMapping.java
index 0c761703140d..b735a58dcf7d 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSplitsMapping.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSplitsMapping.java
@@ -124,6 +124,6 @@ private ListMultimap splitIds(SplitsMapping splitsMapping, Str
 
     private static Split createSplit(int id)
    {
-        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.empty()));
+        return new Split(TEST_CATALOG_HANDLE, new TestingConnectorSplit(id, OptionalInt.empty(), Optional.empty()), ImmutableList.of(), true);
     }
 }
b/core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestTaskDescriptorStorage.java @@ -386,7 +386,7 @@ private static TaskDescriptor createTaskDescriptor(int partitionId, DataSize ret return new TaskDescriptor( partitionId, SplitsMapping.builder() - .addSplit(new PlanNodeId("1"), 1, new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(new TestingExchangeSourceHandle(retainedSize.toBytes())), Optional.empty())))) + .addSplit(new PlanNodeId("1"), 1, new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(new TestingExchangeSourceHandle(retainedSize.toBytes())), Optional.empty())), ImmutableList.of(), true)) .build(), new NodeRequirements(catalog, Optional.empty(), true)); } diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java index 197b37e9a8a2..c88ec71a124d 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryBlocking.java @@ -113,7 +113,7 @@ public void testTableScanMemoryBlocking() Driver driver = Driver.createDriver(driverContext, source, sink); assertThat(driver.getDriverContext()).isSameAs(driverContext); assertThat(driver.isFinished()).isFalse(); - Split testSplit = new Split(TEST_CATALOG_HANDLE, new TestSplit()); + Split testSplit = new Split(TEST_CATALOG_HANDLE, new TestSplit(), ImmutableList.of(), true); driver.updateSplitAssignment(new SplitAssignment(sourceId, ImmutableSet.of(new ScheduledSplit(0, sourceId, testSplit)), true)); ListenableFuture blocked = driver.processForDuration(new Duration(1, NANOSECONDS)); diff --git a/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java b/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java index 21fd35982fef..ed1b9a9fb24d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java @@ -24,6 +24,7 @@ import io.trino.metadata.TestingFunctionResolution; import io.trino.operator.ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory; import io.trino.operator.project.PageProcessor; +import io.trino.spi.HostAddress; import io.trino.spi.Page; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.DynamicFilter; @@ -252,7 +253,7 @@ public List benchmarkColumnOriented(Context context) SourceOperator operator = (SourceOperator) context.getOperatorFactory().createOperator(driverContext); ImmutableList.Builder outputPages = ImmutableList.builder(); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); for (int loops = 0; !operator.isFinished() && loops < 1_000_000; loops++) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java index 2902d7c62051..b9d76531278a 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriver.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriver.java @@ -379,7 +379,7 @@ public void testBrokenOperatorAddSource() private static Split newMockSplit() { - return new Split(TEST_CATALOG_HANDLE, new MockSplit()); + 
return new Split(TEST_CATALOG_HANDLE, new MockSplit(), ImmutableList.of(), true); } private PageConsumerOperator createSinkOperator(List types) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java index a7671b12a006..b586429c692a 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java @@ -154,7 +154,7 @@ public void testSimple() private static Split newRemoteSplit(TaskId taskId) { - return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId))); + return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId)), ImmutableList.of(), true); } @Test diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java index e75892613605..ab94afd67c80 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java @@ -374,7 +374,7 @@ private MergeOperator createMergeOperator(List sourceTypes, List private static Split createRemoteSplit(TaskId taskId) { - return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId))); + return new Split(ExchangeOperator.REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(taskId, "http://localhost/" + taskId)), ImmutableList.of(), true); } private static List pullAvailablePages(Operator operator) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java index 6a9a9326a93c..7ed679b0d3c8 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java @@ -28,6 +28,7 @@ import io.trino.operator.project.PageProcessor; import io.trino.operator.project.TestPageProcessor.LazyPagePageProjection; import io.trino.operator.project.TestPageProcessor.SelectAllFilter; +import io.trino.spi.HostAddress; import io.trino.spi.Page; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorPageSource; @@ -150,7 +151,7 @@ public void testPageSource() 0); SourceOperator operator = factory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(VARCHAR), ImmutableList.of(input)); @@ -193,7 +194,7 @@ public void testPageSourceMergeOutput() 2); SourceOperator operator = factory.createOperator(newDriverContext()); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); List actual = toPages(operator); @@ -234,7 +235,7 @@ public void testPageSourceLazyLoad() 0); SourceOperator operator = 
factory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), ImmutableList.of(new Page(inputBlock))); @@ -270,7 +271,7 @@ public void testRecordCursorSource() 0); SourceOperator operator = factory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(VARCHAR), ImmutableList.of(input)); @@ -326,7 +327,7 @@ public void testPageYield() 0); SourceOperator operator = factory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); // In the below loop we yield for every cell: 20 X 1000 times diff --git a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java index 9f0e5cb601f2..2883916299da 100644 --- a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java +++ b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java @@ -63,6 +63,7 @@ import io.trino.server.TaskUpdateRequest; import io.trino.simd.BlockEncodingSimdSupport; import io.trino.spi.ErrorCode; +import io.trino.spi.HostAddress; import io.trino.spi.NodeVersion; import io.trino.spi.QueryId; import io.trino.spi.block.Block; @@ -206,7 +207,7 @@ public void testRegular() testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo()); remoteTask.start(); - remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()))); + remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false))); poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null); poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() == 1); @@ -518,7 +519,7 @@ public void testAdaptiveRemoteTaskRequestSize() Multimap<PlanNodeId, Split> splits = HashMultimap.create(); for (int i = 0; i < 100; i++) { - splits.put(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + splits.put(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); } remoteTask.addSplits(splits); @@ -559,7 +560,7 @@ public void testAdjustSplitBatchSize() Set<ScheduledSplit> splits = new HashSet<>(); for (int i = 0; i < 1000; i++) { - splits.add(new ScheduledSplit(i, TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()))); + splits.add(new ScheduledSplit(i, TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")),
false))); } // decrease splitBatchSize @@ -611,7 +612,7 @@ private void runTest(FailureScenario failureScenario) private void addSplit(RemoteTask remoteTask, TestingTaskResource testingTaskResource, int expectedSplitsCount) throws InterruptedException { - remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit()))); + remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false))); // wait for splits to be received by remote task poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null); poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() == expectedSplitsCount); diff --git a/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java b/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java index d20ab8a6221d..855460bfad56 100644 --- a/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java +++ b/core/trino-main/src/test/java/io/trino/split/MockSplitSource.java @@ -38,7 +38,7 @@ public class MockSplitSource implements SplitSource { - private static final Split SPLIT = new Split(TEST_CATALOG_HANDLE, new MockConnectorSplit()); + private static final Split SPLIT = new Split(TEST_CATALOG_HANDLE, new MockConnectorSplit(), ImmutableList.of(), true); private static final SettableFuture<List<Split>> COMPLETED_FUTURE = SettableFuture.create(); static { diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java b/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java index 3c24633849ac..b78d62e2623f 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java @@ -13,6 +13,7 @@ */ package io.trino.spi.connector; +import io.trino.spi.HostAddress; import io.trino.spi.function.FunctionProvider; import io.trino.spi.function.table.ConnectorTableFunction; import io.trino.spi.procedure.Procedure; @@ -65,6 +66,32 @@ default ConnectorSplitManager getSplitManager() throw new UnsupportedOperationException(); } + /** + * Returns the provider that supplies address (locality) information for scheduling splits. + * <p>
+ * The default delegates to the split's own {@link ConnectorSplit#getAddresses()} and + * {@link ConnectorSplit#isRemotelyAccessible()} methods, preserving backward compatibility. + * Override to express cache affinity, file-system locality, or hard node constraints + * independently of the split object. + */ + default ConnectorSplitAddressProvider getSplitAddressProvider() + { + return new ConnectorSplitAddressProvider() + { + @Override + public List<HostAddress> getAddresses(ConnectorSplit split) + { + return split.getAddresses(); + } + + @Override + public boolean isRemotelyAccessible(ConnectorSplit split) + { + return split.isRemotelyAccessible(); + } + }; + } + /** * @throws UnsupportedOperationException if this connector does not support reading tables page at a time */ diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitAddressProvider.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitAddressProvider.java new file mode 100644 index 000000000000..0171173b8362 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitAddressProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.connector; + +import io.trino.spi.HostAddress; + +import java.util.List; + +/** + * Provides address (locality) information for scheduling {@link ConnectorSplit}s. + */ +public interface ConnectorSplitAddressProvider +{ + /** + * Returns the list of preferred or required host addresses for the given split. + * <p>
+ * When {@link #isRemotelyAccessible(ConnectorSplit)} returns {@code true}, these addresses + * are hints: the scheduler will try them first but may fall back to any available node. + * When it returns {@code false}, the split must run on one of the returned addresses. + */ + List<HostAddress> getAddresses(ConnectorSplit split); + + /** + * Returns {@code true} when the split can be scheduled on any node (addresses are hints), + * or {@code false} when the split must be scheduled on one of the addresses returned by + * {@link #getAddresses(ConnectorSplit)}. + */ + boolean isRemotelyAccessible(ConnectorSplit split); + + ConnectorSplitAddressProvider DEFAULT = new ConnectorSplitAddressProvider() + { + @Override + public List<HostAddress> getAddresses(ConnectorSplit split) + { + return List.of(); + } + + @Override + public boolean isRemotelyAccessible(ConnectorSplit split) + { + return true; + } + }; +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java index c56ccdd81c22..1da7a7c0abf6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -38,6 +38,7 @@ import io.trino.plugin.hive.orc.OrcWriterConfig; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; +import io.trino.spi.HostAddress; import io.trino.spi.Page; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.ColumnHandle; @@ -601,7 +602,7 @@ public SourceOperator newTableScanOperator(DriverContext driverContext) columns.stream().map(ColumnHandle.class::cast).collect(toImmutableList()), types); SourceOperator operator = sourceOperatorFactory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); return operator; } @@ -628,7 +629,7 @@ public SourceOperator newScanFilterAndProjectOperator(DriverContext driverContex DataSize.ofBytes(0), 0); SourceOperator operator = sourceOperatorFactory.createOperator(driverContext); - operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit())); + operator.addSplit(new Split(TEST_CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false)); operator.noMoreSplits(); return operator; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java index 6b1235d6a127..69833c6f035e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java @@ -87,7 +87,7 @@ public int getBucket(Page page, int position) @Override public int applyAsInt(ConnectorSplit split) { - List<Object> partitionValues = getPartitionValues(((IcebergSplit) split).getPartitionValues()); + List<Object> partitionValues = getPartitionValues(((IcebergSplit) split).partitionValues()); if (singleBucketFunction) { long bucket = (long) requireNonNullElse(partitionValues.getFirst(), 0L);
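The DEFAULT constant above gives the engine a no-op fallback, while each connector supplies its own policy. As a minimal sketch of what a custom implementation can look like (the class name, node list, and hashing scheme below are illustrative and not part of this patch), a provider can steer each split toward a stable node so repeated scans of the same data hit a warm cache:

```java
// Illustrative sketch only; this class is not part of the patch.
import io.trino.spi.HostAddress;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitAddressProvider;

import java.util.List;

public class ExampleAffinityAddressProvider
        implements ConnectorSplitAddressProvider
{
    // Hypothetical static worker list; a real provider would track live nodes
    private static final List<HostAddress> NODES = List.of(
            HostAddress.fromString("10.0.0.1:8080"),
            HostAddress.fromString("10.0.0.2:8080"));

    @Override
    public List<HostAddress> getAddresses(ConnectorSplit split)
    {
        // Hash to a stable bucket so the same split keeps landing on the same
        // node; hashCode() stands in for a real stable split key here
        int bucket = Math.floorMod(split.hashCode(), NODES.size());
        return List.of(NODES.get(bucket));
    }

    @Override
    public boolean isRemotelyAccessible(ConnectorSplit split)
    {
        // true keeps the address a hint, so scheduling never stalls on a busy node
        return true;
    }
}
```

Returning true from isRemotelyAccessible keeps the addresses advisory; a provider for storage pinned to specific nodes would return false, which makes the addresses mandatory under the interface contract above.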
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java index 95baa864ac18..b5391051fb12 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java @@ -31,6 +31,7 @@ import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitAddressProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.SystemTable; @@ -64,6 +65,7 @@ public class IcebergConnector private final LifeCycleManager lifeCycleManager; private final IcebergTransactionManager transactionManager; private final ConnectorSplitManager splitManager; + private final ConnectorSplitAddressProvider splitAddressProvider; private final ConnectorPageSourceProviderFactory pageSourceProviderFactory; private final ConnectorPageSinkProvider pageSinkProvider; private final ConnectorNodePartitioningProvider nodePartitioningProvider; @@ -85,6 +87,7 @@ public IcebergConnector( LifeCycleManager lifeCycleManager, IcebergTransactionManager transactionManager, ConnectorSplitManager splitManager, + ConnectorSplitAddressProvider splitAddressProvider, ConnectorPageSourceProviderFactory pageSourceProviderFactory, ConnectorPageSinkProvider pageSinkProvider, ConnectorNodePartitioningProvider nodePartitioningProvider, @@ -104,6 +107,7 @@ public IcebergConnector( this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.splitAddressProvider = requireNonNull(splitAddressProvider, "splitAddressProvider is null"); this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null"); this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); @@ -150,6 +154,12 @@ public ConnectorSplitManager getSplitManager() return splitManager; } + @Override + public ConnectorSplitAddressProvider getSplitAddressProvider() + { + return splitAddressProvider; + } + @Override public ConnectorPageSourceProviderFactory getPageSourceProviderFactory() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index b39e3c62cd9c..9aa929c6eb85 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -55,6 +55,7 @@ import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProviderFactory; +import io.trino.spi.connector.ConnectorSplitAddressProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.SystemTable; import io.trino.spi.connector.TableProcedureMetadata; @@ -86,6 +87,7 @@ public void configure(Binder binder) binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON); + binder.bind(ConnectorSplitAddressProvider.class).to(IcebergSplitAddressProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSourceProviderFactory.class).annotatedWith(ForClassLoaderSafe.class).to(IcebergPageSourceProviderFactory.class).in(Scopes.SINGLETON); binder.bind(IcebergPageSourceProviderFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSourceProviderFactory.class).to(ClassLoaderSafeConnectorPageSourceProviderFactory.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index a82f5b0dfe12..531f0b1f4658 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -273,7 +273,7 @@ public ConnectorPageSource createPageSource( .collect(toImmutableList()); IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTable; Schema schema = SchemaParser.fromJson(tableHandle.getTableSchemaJson()); - String partitionSpecJson = tableHandle.getPartitionSpecJsons().get(split.getSpecId()); + String partitionSpecJson = tableHandle.getPartitionSpecJsons().get(split.specId()); PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, partitionSpecJson); org.apache.iceberg.types.Type[] partitionColumnTypes = partitionSpec.fields().stream() .map(field -> field.transform().getResultType(schema.findType(field.sourceId()))) @@ -284,20 +284,20 @@ public ConnectorPageSource createPageSource( icebergColumns, schema, partitionSpec, - PartitionData.fromBlocks(split.getPartitionValues(), partitionColumnTypes, typeManager), - split.getDeletes(), + PartitionData.fromBlocks(split.partitionValues(), partitionColumnTypes, typeManager), + split.deletes(), dynamicFilter, tableHandle.getUnenforcedPredicate(), - split.getFileStatisticsDomain(), - split.getPath(), - split.getStart(), - split.getLength(), - split.getFileSize(), - split.getFileRecordCount(), - split.getFileFormat(), + split.fileStatisticsDomain(), + split.path(), + split.start(), + split.length(), + split.fileSize(), + split.fileRecordCount(), + split.fileFormat(), getFileIoProperties(connectorTableCredentials), - split.getDataSequenceNumber(), - split.getFileFirstRowId(), + split.dataSequenceNumber(), + split.fileFirstRowId(), tableHandle.getNameMappingJson().map(NameMappingParser::fromJson)); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index 6bed586ed031..6ac851687591 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -13,13 +13,9 @@ */ package io.trino.plugin.iceberg; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import io.trino.plugin.iceberg.delete.DeleteFile; -import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorSplit; @@ -35,177 +31,40 @@ import static 
io.airlift.slice.SizeOf.instanceSize; import static java.util.Objects.requireNonNull; -public class IcebergSplit - implements ConnectorSplit +public record IcebergSplit( + String path, + long start, + long length, + long fileSize, + long fileRecordCount, + IcebergFileFormat fileFormat, + int specId, + List<Block> partitionValues, + List<DeleteFile> deletes, + SplitWeight splitWeight, + TupleDomain<IcebergColumnHandle> fileStatisticsDomain, + long dataSequenceNumber, + OptionalLong fileFirstRowId) implements ConnectorSplit { private static final int INSTANCE_SIZE = instanceSize(IcebergSplit.class); - private final String path; - private final long start; - private final long length; - private final long fileSize; - private final long fileRecordCount; - private final IcebergFileFormat fileFormat; - private final int specId; - private final List<Block> partitionValues; - private final List<DeleteFile> deletes; - private final SplitWeight splitWeight; - private final TupleDomain<IcebergColumnHandle> fileStatisticsDomain; - private final long dataSequenceNumber; - private final OptionalLong fileFirstRowId; - private final List<HostAddress> addresses; - - @JsonCreator - public IcebergSplit( - @JsonProperty("path") String path, - @JsonProperty("start") long start, - @JsonProperty("length") long length, - @JsonProperty("fileSize") long fileSize, - @JsonProperty("fileRecordCount") long fileRecordCount, - @JsonProperty("fileFormat") IcebergFileFormat fileFormat, - @JsonProperty("specId") int specId, - @JsonProperty("partitionValues") List<Block> partitionValues, - @JsonProperty("deletes") List<DeleteFile> deletes, - @JsonProperty("splitWeight") SplitWeight splitWeight, - @JsonProperty("fileStatisticsDomain") TupleDomain<IcebergColumnHandle> fileStatisticsDomain, - @JsonProperty("dataSequenceNumber") long dataSequenceNumber, - @JsonProperty("fileFirstRowId") OptionalLong fileFirstRowId) - { - this( - path, - start, - length, - fileSize, - fileRecordCount, - fileFormat, - specId, - partitionValues, - deletes, - splitWeight, - fileStatisticsDomain, - ImmutableList.of(), - dataSequenceNumber, - fileFirstRowId); - } - - public IcebergSplit( - String path, - long start, - long length, - long fileSize, - long fileRecordCount, - IcebergFileFormat fileFormat, - int specId, - List<Block> partitionValues, - List<DeleteFile> deletes, - SplitWeight splitWeight, - TupleDomain<IcebergColumnHandle> fileStatisticsDomain, - List<HostAddress> addresses, - long dataSequenceNumber, - OptionalLong fileFirstRowId) - { - this.path = requireNonNull(path, "path is null"); - this.start = start; - this.length = length; - this.fileSize = fileSize; - this.fileRecordCount = fileRecordCount; - this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.specId = specId; - this.partitionValues = ImmutableList.copyOf(partitionValues); - this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); - this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); - this.fileStatisticsDomain = requireNonNull(fileStatisticsDomain, "fileStatisticsDomain is null"); - this.addresses = requireNonNull(addresses, "addresses is null"); - this.dataSequenceNumber = dataSequenceNumber; - this.fileFirstRowId = requireNonNull(fileFirstRowId, "fileFirstRowId is null"); - } - - @JsonIgnore - @Override - public List<HostAddress> getAddresses() - { - return addresses; - } - - @JsonProperty - public String getPath() - { - return path; - } - - @JsonProperty - public long getStart() - { - return start; - } - - @JsonProperty - public long getLength() - { - return length; - } - - @JsonProperty - public long getFileSize() - { - return fileSize; - } - - @JsonProperty - public long getFileRecordCount() - { - return
fileRecordCount; - } - - @JsonProperty - public IcebergFileFormat getFileFormat() - { - return fileFormat; - } - - @JsonProperty - public int getSpecId() - { - return specId; - } - - @JsonProperty - public List getPartitionValues() - { - return partitionValues; - } - - @JsonProperty - public List getDeletes() + public IcebergSplit { - return deletes; + requireNonNull(path, "path is null"); + requireNonNull(fileFormat, "fileFormat is null"); + partitionValues = ImmutableList.copyOf(partitionValues); + deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); + requireNonNull(splitWeight, "splitWeight is null"); + requireNonNull(fileStatisticsDomain, "fileStatisticsDomain is null"); + requireNonNull(fileFirstRowId, "fileFirstRowId is null"); } - @JsonProperty @Override public SplitWeight getSplitWeight() { return splitWeight; } - @JsonProperty - public TupleDomain getFileStatisticsDomain() - { - return fileStatisticsDomain; - } - - @JsonProperty - public long getDataSequenceNumber() - { - return dataSequenceNumber; - } - - @JsonProperty - public OptionalLong getFileFirstRowId() - { - return fileFirstRowId; - } - @Override public long getRetainedSizeInBytes() { @@ -218,7 +77,6 @@ public long getRetainedSizeInBytes() + splitWeight.getRetainedSizeInBytes() + fileStatisticsDomain.getRetainedSizeInBytes(IcebergColumnHandle::getRetainedSizeInBytes) + SIZE_OF_LONG // dataSequenceNumber - + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes) + (fileFirstRowId.isPresent() ? SIZE_OF_LONG : 0); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitAddressProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitAddressProvider.java new file mode 100644 index 000000000000..90a99c3f76a9 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitAddressProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.trino.filesystem.cache.CachingHostAddressProvider; +import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplit; +import io.trino.spi.HostAddress; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorSplitAddressProvider; + +import java.util.List; + +import static io.trino.filesystem.cache.CachingHostAddressProvider.getSplitKey; +import static java.util.Objects.requireNonNull; + +public class IcebergSplitAddressProvider + implements ConnectorSplitAddressProvider +{ + private final CachingHostAddressProvider cachingHostAddressProvider; + + @Inject + public IcebergSplitAddressProvider(CachingHostAddressProvider cachingHostAddressProvider) + { + this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); + } + + @Override + public List<HostAddress> getAddresses(ConnectorSplit split) + { + switch (split) { + case IcebergSplit icebergSplit -> { + return cachingHostAddressProvider.getHosts( + getSplitKey(icebergSplit.path(), icebergSplit.start(), icebergSplit.length()), + ImmutableList.of()); + } + case TableChangesSplit _ -> { + return ImmutableList.of(); + } + + default -> throw new IllegalStateException("Unexpected value: " + split); + } + } + + @Override + public boolean isRemotelyAccessible(ConnectorSplit split) + { + return true; + } +}
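With a per-catalog provider bound as above, the engine-side change visible at every call site in this patch is mechanical: Split now carries the addresses and the remotely-accessible flag explicitly. Schematically, the two SPI methods feed the four-argument constructor; buildSplit below is an illustrative helper under that assumption, not the actual Trino scheduler code:

```java
// Illustrative helper, not part of the patch: maps a ConnectorSplit to the
// scheduler-facing Split(catalog, split, addresses, remotelyAccessible) shape.
import io.trino.metadata.Split;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitAddressProvider;

final class SplitAssembly
{
    private SplitAssembly() {}

    static Split buildSplit(CatalogHandle catalogHandle, ConnectorSplit connectorSplit, ConnectorSplitAddressProvider addressProvider)
    {
        // Addresses and the remotely-accessible flag now come from the
        // provider instead of being baked into the connector split
        return new Split(
                catalogHandle,
                connectorSplit,
                addressProvider.getAddresses(connectorSplit),
                addressProvider.isRemotelyAccessible(connectorSplit));
    }
}
```

This is also why IcebergSplit can drop its addresses field below: locality is resolved at scheduling time by the provider rather than carried by every serialized split.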
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index b57a63670203..567918f4f624 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -17,7 +17,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import io.airlift.units.Duration; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesSplitSource; @@ -64,7 +63,6 @@ public class IcebergSplitManager private final IcebergFileSystemFactory fileSystemFactory; private final ListeningExecutorService splitSourceExecutor; private final ExecutorService icebergPlanningExecutor; - private final CachingHostAddressProvider cachingHostAddressProvider; @Inject public IcebergSplitManager( @@ -72,15 +70,13 @@ public IcebergSplitManager( TypeManager typeManager, IcebergFileSystemFactory fileSystemFactory, @ForIcebergSplitSource ListeningExecutorService splitSourceExecutor, - @ForIcebergSplitManager ExecutorService icebergPlanningExecutor, - CachingHostAddressProvider cachingHostAddressProvider) + @ForIcebergSplitManager ExecutorService icebergPlanningExecutor) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.splitSourceExecutor = requireNonNull(splitSourceExecutor, "splitSourceExecutor is null"); this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null"); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); } @Override @@ -120,7 +116,6 @@ public ConnectorSplitSource getSplits( typeManager, table.isRecordScannedFiles(), getMinimumAssignedSplitWeight(session), - cachingHostAddressProvider, metricsReporter, splitSourceExecutor); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 9d81375a524e..8fe650cfa6c7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -29,7 +29,6 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.cache.NonEvictableCache; -import io.trino.filesystem.cache.CachingHostAddressProvider; import io.trino.plugin.base.metrics.DurationTiming; import io.trino.plugin.base.metrics.IntList; import io.trino.plugin.base.metrics.LongCount; @@ -100,7 +99,6 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.cache.SafeCaches.buildNonEvictableCache; -import static io.trino.filesystem.cache.CachingHostAddressProvider.getSplitKey; import static io.trino.plugin.iceberg.ExpressionConverter.isConvertibleToIcebergExpression; import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.trino.plugin.iceberg.IcebergExceptions.translateMetadataException; @@ -185,7 +183,6 @@ public class IcebergSplitSource private Map> scannedFilesByPartition = new HashMap<>(); @GuardedBy("this") private long outputRowsLowerBound; - private final CachingHostAddressProvider cachingHostAddressProvider; private final InMemoryMetricsReporter metricsReporter; private volatile boolean finished; @@ -202,7 +199,6 @@ public IcebergSplitSource( TypeManager typeManager, boolean recordScannedFiles, double minimumAssignedSplitWeight, - CachingHostAddressProvider cachingHostAddressProvider, InMemoryMetricsReporter metricsReporter, ListeningExecutorService executor) { @@ -241,7 +237,6 @@ public IcebergSplitSource( .map(IcebergColumnHandle::getId) .collect(toImmutableSet()); this.fileModifiedTimeDomain = getFileModifiedTimeDomain(tableHandle.getEnforcedPredicate()); - this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null"); this.metricsReporter = requireNonNull(metricsReporter, "metricsReporter is null"); this.executor = requireNonNull(executor, "executor is null"); } @@ -744,7 +739,6 @@ private IcebergSplit toIcebergSplit(FileScanTaskWithDomain taskWithDomain) .collect(toImmutableList()), SplitWeight.fromProportion(clamp(getSplitWeight(task), minimumAssignedSplitWeight, 1.0)), taskWithDomain.fileStatisticsDomain(), - cachingHostAddressProvider.getHosts(getSplitKey(task.file().location(), task.start(), task.length()), ImmutableList.of()), task.file().dataSequenceNumber(), task.file().firstRowId() == null ?
OptionalLong.empty() : OptionalLong.of(task.file().firstRowId())); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index 2271d22eb42a..1a7f62858714 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableSet; import io.airlift.units.Duration; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.cache.DefaultCachingHostAddressProvider; import io.trino.metastore.HiveMetastore; import io.trino.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.TrinoViewHiveMetastore; @@ -192,7 +191,6 @@ public TupleDomain<ColumnHandle> getCurrentPredicate() TESTING_TYPE_MANAGER, false, new IcebergConfig().getMinimumAssignedSplitWeight(), - new DefaultCachingHostAddressProvider(), new InMemoryMetricsReporter(), newDirectExecutorService())) { ImmutableList.Builder<IcebergSplit> splits = ImmutableList.builder(); @@ -223,14 +221,14 @@ public void testFileStatisticsDomain() IcebergTableHandle tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.all()); IcebergSplit split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY); - assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.all()); + assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.all()); IcebergColumnHandle nationKey = IcebergColumnHandle.optional(new ColumnIdentity(1, "nationkey", ColumnIdentity.TypeCategory.PRIMITIVE, ImmutableList.of())) .columnType(BIGINT) .build(); tableHandle = createTableHandle(schemaTableName, nationTable, TupleDomain.fromFixedValues(ImmutableMap.of(nationKey, NullableValue.of(BIGINT, 1L)))); split = generateSplit(nationTable, tableHandle, DynamicFilter.EMPTY); - assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains( + assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains( ImmutableMap.of(nationKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 24L, true)), false)))); IcebergColumnHandle regionKey = IcebergColumnHandle.optional(new ColumnIdentity(3, "regionkey", ColumnIdentity.TypeCategory.PRIMITIVE, ImmutableList.of())) @@ -268,7 +266,7 @@ public TupleDomain<ColumnHandle> getCurrentPredicate() return TupleDomain.all(); } }); - assertThat(split.getFileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains( + assertThat(split.fileStatisticsDomain()).isEqualTo(TupleDomain.withColumnDomains( ImmutableMap.of( nationKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 24L, true)), false), regionKey, Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 4L, true)), false)))); @@ -418,7 +416,6 @@ private IcebergSplit generateSplit(Table nationTable, IcebergTableHandle tableHa TESTING_TYPE_MANAGER, false, 0, - new DefaultCachingHostAddressProvider(), new InMemoryMetricsReporter(), newDirectExecutorService())) { ImmutableList.Builder<IcebergSplit> builder = ImmutableList.builder();
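Because IcebergSplit is now a record, callers switch from JavaBean getters to record accessors, as the assertion changes above show. A small illustrative helper (not part of the patch) makes the renamed accessors concrete:

```java
import io.trino.plugin.iceberg.IcebergSplit;

// Illustrative only: record accessors replace the removed getters,
// e.g. path() was getPath() and specId() was getSpecId().
final class IcebergSplitDescriptions
{
    private IcebergSplitDescriptions() {}

    static String describe(IcebergSplit split)
    {
        return "%s [%d, %d) spec=%d".formatted(
                split.path(),
                split.start(),
                split.start() + split.length(),
                split.specId());
    }
}
```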