Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
import io.trino.spi.connector.ConnectorSplitAddressProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.FunctionProvider;

Expand All @@ -59,6 +60,13 @@ public static CatalogServiceProvider<ConnectorSplitManager> createSplitManagerPr
return new ConnectorCatalogServiceProvider<>("split manager", connectorServicesProvider, connector -> connector.getSplitManager().orElse(null));
}

@Provides
@Singleton
public static CatalogServiceProvider<ConnectorSplitAddressProvider> createSplitAddressProviderProvider(ConnectorServicesProvider connectorServicesProvider)
{
return new ConnectorCatalogServiceProvider<>("split address provider", connectorServicesProvider, ConnectorServices::getSplitAddressProvider);
}

@Provides
@Singleton
public static CatalogServiceProvider<ConnectorPageSourceProviderFactory> createPageSourceProviderFactory(ConnectorServicesProvider connectorServicesProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSecurityContext;
import io.trino.spi.connector.ConnectorSplitAddressProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.SchemaRoutineName;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class ConnectorServices
private final Optional<FunctionProvider> functionProvider;
private final CatalogTableFunctions tableFunctions;
private final Optional<ConnectorSplitManager> splitManager;
private final ConnectorSplitAddressProvider splitAddressProvider;
private final Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory;
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
private final Optional<ConnectorIndexProvider> indexProvider;
Expand Down Expand Up @@ -125,6 +127,7 @@ public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector c
catch (UnsupportedOperationException _) {
}
this.splitManager = Optional.ofNullable(splitManager);
this.splitAddressProvider = requireNonNull(connector.getSplitAddressProvider(), format("Connector '%s' returned a null split address provider", catalogHandle));

ConnectorPageSourceProviderFactory connectorPageSourceProviderFactory = null;
try {
Expand Down Expand Up @@ -264,6 +267,11 @@ public Optional<ConnectorSplitManager> getSplitManager()
return splitManager;
}

public ConnectorSplitAddressProvider getSplitAddressProvider()
{
return splitAddressProvider;
}

public Optional<ConnectorPageSourceProviderFactory> getPageSourceProviderFactory()
{
return pageSourceProviderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private List<Split> createRemoteSplits(List<ExchangeSourceHandle> handles)

private static Split createRemoteSplit(List<ExchangeSourceHandle> handles)
{
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())));
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(handles, Optional.empty())), ImmutableList.of(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private static Split createExchangeSplit(RemoteTask sourceTask, RemoteTask desti
// Fetch the results from the buffer assigned to the task based on id
URI exchangeLocation = sourceTask.getTaskStatus().self();
URI splitLocation = uriBuilderFrom(exchangeLocation).appendPath("results").appendPath(String.valueOf(destinationTask.getTaskId().partitionId())).build();
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(sourceTask.getTaskId(), splitLocation.toString())));
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new DirectExchangeInput(sourceTask.getTaskId(), splitLocation.toString())), ImmutableList.of(), true);
}

private static class PipelinedStageStateMachine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -264,7 +265,9 @@ else if (pendingSplits.isEmpty()) {
// Scheduling an empty split kicks off necessary driver instantiation to make this work.
pendingSplits.add(new Split(
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
new EmptySplit(splitSource.getCatalogHandle()),
ImmutableList.of(),
true));
}
log.debug("stage id: %s, node: %s; transitioning to SPLITS_SCHEDULED", stageExecution.getStageId(), partitionedNode);
state = State.SPLITS_SCHEDULED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3104,7 +3104,7 @@ public String getDebugInfo()

private static Split createOutputSelectorSplit(ExchangeSourceOutputSelector selector)
{
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(), Optional.of(selector))));
return new Split(REMOTE_CATALOG_HANDLE, new RemoteSplit(new SpoolingExchangeInput(ImmutableList.of(), Optional.of(selector))), ImmutableList.of(), true);
}

private static class OpenTaskDescriptor
Expand Down
25 changes: 22 additions & 3 deletions core/trino-main/src/main/java/io/trino/metadata/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package io.trino.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.connector.CatalogHandle;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
Expand All @@ -23,6 +25,8 @@
import java.util.List;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static java.util.Objects.requireNonNull;

Expand All @@ -32,14 +36,24 @@ public final class Split

private final CatalogHandle catalogHandle;
private final ConnectorSplit connectorSplit;
private final List<HostAddress> addresses;
private final boolean remotelyAccessible;

@JsonCreator
public Split(
@JsonProperty("catalogHandle") CatalogHandle catalogHandle,
@JsonProperty("connectorSplit") ConnectorSplit connectorSplit)
{
this(catalogHandle, connectorSplit, ImmutableList.of(), true);
}

public Split(CatalogHandle catalogHandle, ConnectorSplit connectorSplit, List<HostAddress> addresses, boolean remotelyAccessible)
{
checkArgument(remotelyAccessible || !addresses.isEmpty(), "addresses must be provided when remotelyAccessible=false");
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.connectorSplit = requireNonNull(connectorSplit, "connectorSplit is null");
this.addresses = ImmutableList.copyOf(addresses);
this.remotelyAccessible = remotelyAccessible;
}

@JsonProperty
Expand All @@ -54,14 +68,18 @@ public ConnectorSplit getConnectorSplit()
return connectorSplit;
}

// do not serialize addresses as they are not needed on workers
@JsonIgnore
public List<HostAddress> getAddresses()
{
return connectorSplit.getAddresses();
return addresses;
}

// do not serialize remotelyAccessible as it is not needed on workers
@JsonIgnore
public boolean isRemotelyAccessible()
{
return connectorSplit.isRemotelyAccessible();
return remotelyAccessible;
}

public SplitWeight getSplitWeight()
Expand All @@ -82,6 +100,7 @@ public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ catalogHandle.getRetainedSizeInBytes()
+ connectorSplit.getRetainedSizeInBytes();
+ connectorSplit.getRetainedSizeInBytes()
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private IndexedData streamIndexDataForSingleKey(UpdateRequest updateRequest)

PageRecordSet pageRecordSet = new PageRecordSet(keyTypes, indexKeyTuple);
PlanNodeId planNodeId = driverFactory.getSourceId().get();
ScheduledSplit split = new ScheduledSplit(0, planNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(pageRecordSet)));
ScheduledSplit split = new ScheduledSplit(0, planNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(pageRecordSet), ImmutableList.of(), true));
driver.updateSplitAssignment(new SplitAssignment(planNodeId, ImmutableSet.of(split), true));

return new StreamingIndexedData(outputTypes, keyEqualOperators, indexKeyTuple, pageBuffer, driver);
Expand Down Expand Up @@ -323,7 +323,7 @@ public boolean load(List<UpdateRequest> requests)
// Drive index lookup to produce the output (landing in indexSnapshotBuilder)
try (Driver driver = driverFactory.createDriver(pipelineContext.addDriverContext())) {
PlanNodeId sourcePlanNodeId = driverFactory.getSourceId().get();
ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(recordSetForLookupSource)));
ScheduledSplit split = new ScheduledSplit(0, sourcePlanNodeId, new Split(INDEX_CATALOG_HANDLE, new IndexSplit(recordSetForLookupSource), ImmutableList.of(), true));
driver.updateSplitAssignment(new SplitAssignment(sourcePlanNodeId, ImmutableSet.of(split), true));
while (!driver.isFinished()) {
ListenableFuture<Void> process = driver.processUntilBlocked();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.connector.CatalogHandle;
import io.trino.metadata.Split;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitAddressProvider;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorSplitSource.ConnectorSplitBatch;
import io.trino.spi.metrics.Metrics;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class ConnectorAwareSplitSource
implements SplitSource
{
private final CatalogHandle catalogHandle;
private final ConnectorSplitAddressProvider addressProvider;
private final String sourceToString;

@Nullable
Expand All @@ -55,10 +57,11 @@ public class ConnectorAwareSplitSource
private Optional<Optional<List<Object>>> tableExecuteSplitsInfo = Optional.empty();
private Metrics metrics = Metrics.EMPTY;

public ConnectorAwareSplitSource(CatalogHandle catalogHandle, ConnectorSplitSource source)
public ConnectorAwareSplitSource(CatalogHandle catalogHandle, ConnectorSplitSource source, ConnectorSplitAddressProvider addressProvider)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.source = requireNonNull(source, "source is null");
this.addressProvider = requireNonNull(addressProvider, "addressProvider is null");
this.sourceToString = source.toString();
}

Expand All @@ -77,7 +80,11 @@ public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
List<ConnectorSplit> connectorSplits = splitBatch.getSplits();
ImmutableList.Builder<Split> result = ImmutableList.builderWithExpectedSize(connectorSplits.size());
for (ConnectorSplit connectorSplit : connectorSplits) {
result.add(new Split(catalogHandle, connectorSplit));
result.add(new Split(
catalogHandle,
connectorSplit,
addressProvider.getAddresses(connectorSplit),
addressProvider.isRemotelyAccessible(connectorSplit)));
}
boolean noMoreSplits = splitBatch.isNoMoreSplits();
if (noMoreSplits) {
Expand Down
15 changes: 12 additions & 3 deletions core/trino-main/src/main/java/io/trino/split/SplitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.metadata.TableFunctionHandle;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitAddressProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.Constraint;
Expand All @@ -45,15 +46,21 @@
public class SplitManager
{
private final CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider;
private final CatalogServiceProvider<ConnectorSplitAddressProvider> addressProviderProvider;
private final Tracer tracer;
private final int minScheduleSplitBatchSize;
private final ExecutorService executorService;
private final Executor executor;

@Inject
public SplitManager(CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider, Tracer tracer, QueryManagerConfig config)
public SplitManager(
CatalogServiceProvider<ConnectorSplitManager> splitManagerProvider,
CatalogServiceProvider<ConnectorSplitAddressProvider> addressProviderProvider,
Tracer tracer,
QueryManagerConfig config)
{
this.splitManagerProvider = requireNonNull(splitManagerProvider, "splitManagerProvider is null");
this.addressProviderProvider = requireNonNull(addressProviderProvider, "addressProviderProvider is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.minScheduleSplitBatchSize = config.getMinScheduleSplitBatchSize();
this.executorService = newCachedThreadPool(daemonThreadsNamed("splits-manager-callback-%s"));
Expand All @@ -75,6 +82,7 @@ public SplitSource getSplits(
{
CatalogHandle catalogHandle = table.catalogHandle();
ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
ConnectorSplitAddressProvider addressProvider = addressProviderProvider.getService(catalogHandle);
if (!isAllowPushdownIntoConnectors(session)) {
dynamicFilter = DynamicFilter.EMPTY;
}
Expand All @@ -94,7 +102,7 @@ public SplitSource getSplits(
constraint);
}

SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source, addressProvider);

Span span = splitSourceSpan(parentSpan, catalogHandle);

Expand All @@ -114,6 +122,7 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand
{
CatalogHandle catalogHandle = function.catalogHandle();
ConnectorSplitManager splitManager = splitManagerProvider.getService(catalogHandle);
ConnectorSplitAddressProvider addressProvider = addressProviderProvider.getService(catalogHandle);

ConnectorSplitSource source;
try (var ignore = scopedSpan(tracer.spanBuilder("SplitManager.getSplits")
Expand All @@ -126,7 +135,7 @@ public SplitSource getSplits(Session session, Span parentSpan, TableFunctionHand
function.functionHandle());
}

SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source);
SplitSource splitSource = new ConnectorAwareSplitSource(catalogHandle, source, addressProvider);

Span span = splitSourceSpan(parentSpan, catalogHandle);
return new TracingSplitSource(splitSource, tracer, Optional.of(span), "split-buffer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@
import static io.trino.connector.CatalogServiceProviderModule.createPageSinkProvider;
import static io.trino.connector.CatalogServiceProviderModule.createPageSourceProviderFactory;
import static io.trino.connector.CatalogServiceProviderModule.createSchemaPropertyManager;
import static io.trino.connector.CatalogServiceProviderModule.createSplitAddressProviderProvider;
import static io.trino.connector.CatalogServiceProviderModule.createSplitManagerProvider;
import static io.trino.connector.CatalogServiceProviderModule.createTableFunctionProvider;
import static io.trino.connector.CatalogServiceProviderModule.createTableProceduresPropertyManager;
Expand Down Expand Up @@ -429,7 +430,7 @@ TypeSignature.class, new TypeSignatureDeserializer(),
nodeSchedulerConfig,
optimizerConfig,
secretsResolver));
this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), tracer, new QueryManagerConfig());
this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), createSplitAddressProviderProvider(catalogManager), tracer, new QueryManagerConfig());
this.pageSourceManager = new PageSourceManager(createPageSourceProviderFactory(catalogManager));
this.pageSinkManager = new PageSinkManager(createPageSinkProvider(catalogManager));
this.indexManager = new IndexManager(createIndexProvider(catalogManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public void setup()
InternalNode node = nodes.get(i);
ImmutableList.Builder<Split> initialSplits = ImmutableList.builder();
for (int j = 0; j < MAX_SPLITS_PER_NODE + MAX_PENDING_SPLITS_PER_TASK_PER_NODE; j++) {
initialSplits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(i)));
TestSplitRemote s = new TestSplitRemote(i);
initialSplits.add(new Split(TEST_CATALOG_HANDLE, s, s.getAddresses(), true));
}
TaskId taskId = new TaskId(new StageId("test", 1), i, 0);
MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, initialSplits.build(), nodeTaskMap.createPartitionedSplitCountTracker(node, taskId));
Expand All @@ -165,7 +166,8 @@ public void setup()
}

for (int i = 0; i < SPLITS; i++) {
splits.add(new Split(TEST_CATALOG_HANDLE, new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES))));
TestSplitRemote bs = new TestSplitRemote(ThreadLocalRandom.current().nextInt(DATA_NODES));
splits.add(new Split(TEST_CATALOG_HANDLE, bs, bs.getAddresses(), true));
}

NodeScheduler nodeScheduler = new NodeScheduler(getNodeSelectorFactory(nodeTaskMap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.trino.operator.index.IndexManager;
import io.trino.server.protocol.spooling.QueryDataEncoders;
import io.trino.server.protocol.spooling.SpoolingEnabledConfig;
import io.trino.spi.HostAddress;
import io.trino.spi.NodeVersion;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spiller.GenericSpillerFactory;
Expand Down Expand Up @@ -82,7 +83,7 @@ private TaskTestUtils() {}

private static final CatalogHandle CATALOG_HANDLE = TEST_TABLE_HANDLE.catalogHandle();

public static final ScheduledSplit SPLIT = new ScheduledSplit(0, TABLE_SCAN_NODE_ID, new Split(CATALOG_HANDLE, TestingSplit.createLocalSplit()));
public static final ScheduledSplit SPLIT = new ScheduledSplit(0, TABLE_SCAN_NODE_ID, new Split(CATALOG_HANDLE, TestingSplit.createLocalSplit(), ImmutableList.of(HostAddress.fromString("127.0.0.1")), false));

public static final List<SplitAssignment> EMPTY_SPLIT_ASSIGNMENTS = ImmutableList.of();

Expand Down
Loading