Skip to content

Commit 9bb5231

Browse files
skrzypo987losipiuk
authored andcommitted
Deprecate partitionHandle argument on ConnectorSplitSource::getNextBatch
It was always a `NOT_PARTITION` since the removal of grouped execution, so it can be hardcoded at this point. The ConnectorNodePartitioningProvider::listPartitionHandles method can also be deprecated at this point
1 parent ced7c97 commit 9bb5231

File tree

37 files changed

+68
-374
lines changed

37 files changed

+68
-374
lines changed

core/trino-main/src/main/java/io/trino/execution/scheduler/FixedSourcePartitionedScheduler.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.trino.metadata.InternalNode;
2525
import io.trino.metadata.Split;
2626
import io.trino.server.DynamicFilterService;
27-
import io.trino.spi.connector.ConnectorPartitionHandle;
2827
import io.trino.split.SplitSource;
2928
import io.trino.sql.planner.plan.PlanNodeId;
3029

@@ -41,7 +40,6 @@
4140
import static com.google.common.base.Verify.verify;
4241
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
4342
import static io.trino.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsSourceScheduler;
44-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
4543
import static java.util.Objects.requireNonNull;
4644

4745
public class FixedSourcePartitionedScheduler
@@ -64,15 +62,13 @@ public FixedSourcePartitionedScheduler(
6462
BucketNodeMap bucketNodeMap,
6563
int splitBatchSize,
6664
NodeSelector nodeSelector,
67-
List<ConnectorPartitionHandle> partitionHandles,
6865
DynamicFilterService dynamicFilterService,
6966
TableExecuteContextManager tableExecuteContextManager)
7067
{
7168
requireNonNull(stageExecution, "stageExecution is null");
7269
requireNonNull(splitSources, "splitSources is null");
7370
requireNonNull(bucketNodeMap, "bucketNodeMap is null");
7471
checkArgument(!requireNonNull(nodes, "nodes is null").isEmpty(), "nodes is empty");
75-
requireNonNull(partitionHandles, "partitionHandles is null");
7672
requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
7773

7874
this.stageExecution = stageExecution;
@@ -83,7 +79,6 @@ public FixedSourcePartitionedScheduler(
8379
BucketedSplitPlacementPolicy splitPlacementPolicy = new BucketedSplitPlacementPolicy(nodeSelector, nodes, bucketNodeMap, stageExecution::getAllTasks);
8480

8581
ArrayList<SourceScheduler> sourceSchedulers = new ArrayList<>();
86-
checkArgument(partitionHandles.equals(ImmutableList.of(NOT_PARTITIONED)), "PartitionHandles should be [NOT_PARTITIONED]");
8782

8883
boolean firstPlanNode = true;
8984

core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import io.trino.spi.ErrorCode;
6262
import io.trino.spi.QueryId;
6363
import io.trino.spi.TrinoException;
64-
import io.trino.spi.connector.ConnectorPartitionHandle;
6564
import io.trino.spi.exchange.Exchange;
6665
import io.trino.spi.exchange.ExchangeContext;
6766
import io.trino.spi.exchange.ExchangeId;
@@ -151,7 +150,6 @@
151150
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
152151
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
153152
import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED;
154-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
155153
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
156154
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
157155
import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
@@ -1428,7 +1426,6 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
14281426
List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
14291427
Optional<CatalogName> catalogName = partitioningHandle.getConnectorId();
14301428
checkArgument(catalogName.isPresent(), "No connector ID for partitioning handle: %s", partitioningHandle);
1431-
List<ConnectorPartitionHandle> connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
14321429

14331430
BucketNodeMap bucketNodeMap;
14341431
List<InternalNode> stageNodeList;
@@ -1454,7 +1451,6 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
14541451
bucketNodeMap,
14551452
splitBatchSize,
14561453
nodeScheduler.createNodeSelector(session, catalogName),
1457-
connectorPartitionHandles,
14581454
dynamicFilterService,
14591455
tableExecuteContextManager);
14601456
}

core/trino-main/src/main/java/io/trino/split/ConnectorAwareSplitSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2929
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
30-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
3130
import static java.util.Objects.requireNonNull;
3231

3332
public class ConnectorAwareSplitSource
@@ -51,7 +50,7 @@ public CatalogName getCatalogName()
5150
@Override
5251
public ListenableFuture<SplitBatch> getNextBatch(int maxSize)
5352
{
54-
ListenableFuture<ConnectorSplitBatch> nextBatch = toListenableFuture(source.getNextBatch(NOT_PARTITIONED, maxSize));
53+
ListenableFuture<ConnectorSplitBatch> nextBatch = toListenableFuture(source.getNextBatch(maxSize));
5554
return Futures.transform(nextBatch, splitBatch -> {
5655
ImmutableList.Builder<Split> result = ImmutableList.builder();
5756
for (ConnectorSplit connectorSplit : splitBatch.getSplits()) {

core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.trino.spi.connector.BucketFunction;
3030
import io.trino.spi.connector.ConnectorBucketNodeMap;
3131
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
32-
import io.trino.spi.connector.ConnectorPartitionHandle;
3332
import io.trino.spi.connector.ConnectorSplit;
3433
import io.trino.spi.type.Type;
3534
import io.trino.split.EmptySplit;
@@ -103,19 +102,6 @@ public BucketFunction getBucketFunction(Session session, PartitioningHandle part
103102
return bucketFunction;
104103
}
105104

106-
public List<ConnectorPartitionHandle> listPartitionHandles(
107-
Session session,
108-
PartitioningHandle partitioningHandle)
109-
{
110-
CatalogName catalogName = partitioningHandle.getConnectorId()
111-
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
112-
ConnectorNodePartitioningProvider partitioningProvider = getPartitioningProvider(catalogName);
113-
return partitioningProvider.listPartitionHandles(
114-
partitioningHandle.getTransactionHandle().orElseThrow(() -> new IllegalArgumentException("No transactionHandle for partitioning handle: " + partitioningHandle)),
115-
session.toConnectorSession(catalogName),
116-
partitioningHandle.getConnectorHandle());
117-
}
118-
119105
public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)
120106
{
121107
requireNonNull(session, "session is null");

core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import io.trino.operator.RetryPolicy;
4343
import io.trino.server.DynamicFilterService;
4444
import io.trino.spi.QueryId;
45-
import io.trino.spi.connector.ConnectorPartitionHandle;
4645
import io.trino.spi.connector.ConnectorSplit;
4746
import io.trino.spi.connector.ConnectorSplitSource;
4847
import io.trino.spi.connector.DynamicFilter;
@@ -81,7 +80,6 @@
8180
import java.util.concurrent.ScheduledExecutorService;
8281
import java.util.function.Supplier;
8382

84-
import static com.google.common.base.Preconditions.checkArgument;
8583
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
8684
import static io.trino.SessionTestUtils.TEST_SESSION;
8785
import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
@@ -94,7 +92,6 @@
9492
import static io.trino.metadata.FunctionManager.createTestingFunctionManager;
9593
import static io.trino.metadata.MetadataManager.createTestMetadataManager;
9694
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
97-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
9895
import static io.trino.spi.type.BigintType.BIGINT;
9996
import static io.trino.spi.type.VarcharType.VARCHAR;
10097
import static io.trino.sql.DynamicFilters.createDynamicFilterExpression;
@@ -701,7 +698,7 @@ private static ConnectorSplitSource createBlockedSplitSource()
701698
return new ConnectorSplitSource()
702699
{
703700
@Override
704-
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
701+
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
705702
{
706703
return new CompletableFuture<>();
707704
}
@@ -781,9 +778,8 @@ synchronized void addSplits(int count)
781778
}
782779

783780
@Override
784-
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
781+
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
785782
{
786-
checkArgument(partitionHandle.equals(NOT_PARTITIONED), "partitionHandle must be NOT_PARTITIONED");
787783
return notEmptyFuture
788784
.thenApply(x -> getBatch(maxSize))
789785
.thenApply(splits -> new ConnectorSplitBatch(splits, isFinished()));

core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorNodePartitioningProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
import java.util.List;
1919
import java.util.function.ToIntFunction;
2020

21-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
22-
import static java.util.Collections.singletonList;
23-
2421
public interface ConnectorNodePartitioningProvider
2522
{
2623
// TODO: Use ConnectorPartitionHandle (instead of int) to represent individual buckets.
@@ -31,10 +28,13 @@ public interface ConnectorNodePartitioningProvider
3128
* <p>
3229
* This method must be implemented for connectors that support addressable split discovery.
3330
* The partitions return here will be used as address for the purpose of split discovery.
31+
*
32+
* @deprecated The method is not used. Implementations can be simply removed
3433
*/
34+
@Deprecated
3535
default List<ConnectorPartitionHandle> listPartitionHandles(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle)
3636
{
37-
return singletonList(NOT_PARTITIONED);
37+
throw new UnsupportedOperationException();
3838
}
3939

4040
ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorPartitioningHandle partitioningHandle);

core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPartitionHandle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.spi.connector;
1515

1616
@SuppressWarnings("ClassMayBeInterface")
17+
@Deprecated
1718
public abstract class ConnectorPartitionHandle
1819
{
1920
@Override

core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitSource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,22 @@
1818
import java.util.Optional;
1919
import java.util.concurrent.CompletableFuture;
2020

21+
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
2122
import static java.util.Objects.requireNonNull;
2223

2324
public interface ConnectorSplitSource
2425
extends Closeable
2526
{
26-
CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize);
27+
@Deprecated
28+
default CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
29+
{
30+
throw new UnsupportedOperationException();
31+
}
32+
33+
default CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
34+
{
35+
return getNextBatch(NOT_PARTITIONED, maxSize);
36+
}
2737

2838
@Override
2939
void close();

core/trino-spi/src/main/java/io/trino/spi/connector/FixedSplitSource.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.Optional;
1818
import java.util.concurrent.CompletableFuture;
1919

20-
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
2120
import static java.util.Objects.requireNonNull;
2221
import static java.util.concurrent.CompletableFuture.completedFuture;
2322
import static java.util.stream.Collectors.toUnmodifiableList;
@@ -50,12 +49,8 @@ private FixedSplitSource(Iterable<? extends ConnectorSplit> splits, Optional<Lis
5049

5150
@SuppressWarnings("ObjectEquality")
5251
@Override
53-
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize)
52+
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
5453
{
55-
if (!partitionHandle.equals(NOT_PARTITIONED)) {
56-
throw new IllegalArgumentException("partitionHandle must be NOT_PARTITIONED");
57-
}
58-
5954
int remainingSplits = splits.size() - offset;
6055
int size = Math.min(remainingSplits, maxSize);
6156
List<ConnectorSplit> results = splits.subList(offset, offset + size);

core/trino-spi/src/main/java/io/trino/spi/connector/NotPartitionedPartitionHandle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.trino.spi.connector;
1515

16+
@Deprecated
1617
public final class NotPartitionedPartitionHandle
1718
extends ConnectorPartitionHandle
1819
{

0 commit comments

Comments
 (0)