Skip to content

Commit b54b7fc

Browse files
skrzypo987losipiuk
authored andcommitted
Reduce Lifespan object to a taskWide singleton
After the removal of grouped execution it is impossible for this object to be anything else than the "taskWide" singleton. This object is at that point completely useless and will exist just until its last occurrence is removed, which will take place in the following commits.
1 parent 4c89f54 commit b54b7fc

File tree

7 files changed

+15
-62
lines changed

7 files changed

+15
-62
lines changed

core/trino-main/src/main/java/io/trino/execution/Lifespan.java

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,83 +18,49 @@
1818
import com.fasterxml.jackson.annotation.JsonValue;
1919
import org.openjdk.jol.info.ClassLayout;
2020

21-
import java.util.Objects;
22-
23-
import static com.google.common.base.Preconditions.checkArgument;
24-
import static com.google.common.base.Preconditions.checkState;
25-
import static java.lang.Integer.parseInt;
26-
2721
public class Lifespan
2822
{
2923
private static final int INSTANCE_SIZE = ClassLayout.parseClass(Lifespan.class).instanceSize();
3024

31-
private static final Lifespan TASK_WIDE = new Lifespan(false, 0);
32-
33-
private final boolean grouped;
34-
private final int groupId;
25+
private static final Lifespan TASK_WIDE = new Lifespan();
3526

3627
public static Lifespan taskWide()
3728
{
3829
return TASK_WIDE;
3930
}
4031

41-
public static Lifespan driverGroup(int id)
32+
private Lifespan()
4233
{
43-
return new Lifespan(true, id);
44-
}
45-
46-
private Lifespan(boolean grouped, int groupId)
47-
{
48-
this.grouped = grouped;
49-
this.groupId = groupId;
5034
}
5135

5236
public boolean isTaskWide()
5337
{
54-
return !grouped;
55-
}
56-
57-
public int getId()
58-
{
59-
checkState(grouped);
60-
return groupId;
38+
return true;
6139
}
6240

6341
@JsonCreator
6442
public static Lifespan jsonCreator(String value)
6543
{
66-
if (value.equals("TaskWide")) {
67-
return Lifespan.taskWide();
68-
}
69-
checkArgument(value.startsWith("Group"));
70-
return Lifespan.driverGroup(parseInt(value.substring("Group".length())));
44+
return Lifespan.taskWide();
7145
}
7246

7347
@Override
7448
@JsonValue
7549
public String toString()
7650
{
77-
return grouped ? "Group" + groupId : "TaskWide";
51+
return "TaskWide";
7852
}
7953

8054
@Override
8155
public boolean equals(Object o)
8256
{
83-
if (this == o) {
84-
return true;
85-
}
86-
if (o == null || getClass() != o.getClass()) {
87-
return false;
88-
}
89-
Lifespan that = (Lifespan) o;
90-
return grouped == that.grouped &&
91-
groupId == that.groupId;
57+
return true;
9258
}
9359

9460
@Override
9561
public int hashCode()
9662
{
97-
return Objects.hash(grouped, groupId);
63+
return 42;
9864
}
9965

10066
public long getRetainedSizeInBytes()

core/trino-main/src/main/java/io/trino/execution/scheduler/FixedSourcePartitionedScheduler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,7 @@ public FixedSourcePartitionedScheduler(
122122

123123
private ConnectorPartitionHandle partitionHandleFor(Lifespan lifespan)
124124
{
125-
if (lifespan.isTaskWide()) {
126-
return NOT_PARTITIONED;
127-
}
128-
return partitionHandles.get(lifespan.getId());
125+
return NOT_PARTITIONED;
129126
}
130127

131128
@Override

core/trino-main/src/main/java/io/trino/execution/scheduler/SourcePartitionedScheduler.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.trino.execution.RemoteTask;
2525
import io.trino.execution.TableExecuteContext;
2626
import io.trino.execution.TableExecuteContextManager;
27-
import io.trino.execution.scheduler.FixedSourcePartitionedScheduler.BucketedSplitPlacementPolicy;
2827
import io.trino.metadata.InternalNode;
2928
import io.trino.metadata.Split;
3029
import io.trino.server.DynamicFilterService;
@@ -359,10 +358,6 @@ else if (pendingSplits.isEmpty()) {
359358
Multimap<InternalNode, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
360359
if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
361360
scheduleGroup.state = ScheduleGroupState.DONE;
362-
if (!lifespan.isTaskWide()) {
363-
InternalNode node = ((BucketedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
364-
noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
365-
}
366361
}
367362

368363
// assign the splits with successful placements

core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,11 @@ private ToIntFunction<Split> getSplitToBucket(Session session, PartitioningHandl
217217
return split -> {
218218
int bucket;
219219
if (split.getConnectorSplit() instanceof EmptySplit) {
220-
bucket = split.getLifespan().isTaskWide() ? 0 : split.getLifespan().getId();
220+
bucket = 0;
221221
}
222222
else {
223223
bucket = splitBucketFunction.applyAsInt(split.getConnectorSplit());
224224
}
225-
if (!split.getLifespan().isTaskWide()) {
226-
checkArgument(split.getLifespan().getId() == bucket);
227-
}
228225
return bucket;
229226
};
230227
}

core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
public class TestDriverStats
3131
{
3232
public static final DriverStats EXPECTED = new DriverStats(
33-
Lifespan.driverGroup(21),
33+
Lifespan.taskWide(),
3434

3535
new DateTime(1),
3636
new DateTime(2),
@@ -86,7 +86,7 @@ public void testJson()
8686

8787
public static void assertExpectedDriverStats(DriverStats actual)
8888
{
89-
assertEquals(actual.getLifespan(), Lifespan.driverGroup(21));
89+
assertEquals(actual.getLifespan(), Lifespan.taskWide());
9090

9191
assertEquals(actual.getCreateTime(), new DateTime(1, UTC));
9292
assertEquals(actual.getStartTime(), new DateTime(2, UTC));

core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ public void testMismatchedExecutionStrategy()
671671
Optional.empty(),
672672
LOCAL_EXCHANGE_MAX_BUFFERED_BYTES,
673673
TYPE_OPERATOR_FACTORY);
674-
assertThatThrownBy(() -> ungroupedLocalExchangeFactory.getLocalExchange(Lifespan.driverGroup(3)))
674+
assertThatThrownBy(() -> ungroupedLocalExchangeFactory.getLocalExchange(Lifespan.taskWide()))
675675
.isInstanceOf(IllegalArgumentException.class)
676676
.hasMessage("LocalExchangeFactory is declared as UNGROUPED_EXECUTION. Driver-group exchange cannot be created.");
677677

core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,11 @@ public void testRegular()
183183
testingTaskResource.setInitialTaskInfo(remoteTask.getTaskInfo());
184184
remoteTask.start();
185185

186-
Lifespan lifespan = Lifespan.driverGroup(3);
187-
remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(new CatalogName("test"), TestingSplit.createLocalSplit(), lifespan)));
186+
remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(new CatalogName("test"), TestingSplit.createLocalSplit(), Lifespan.taskWide())));
188187
poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null);
189188
poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() == 1);
190189

191-
remoteTask.noMoreSplits(TABLE_SCAN_NODE_ID, lifespan);
190+
remoteTask.noMoreSplits(TABLE_SCAN_NODE_ID, Lifespan.taskWide());
192191
poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getNoMoreSplitsForLifespan().size() == 1);
193192

194193
remoteTask.noMoreSplits(TABLE_SCAN_NODE_ID);
@@ -413,8 +412,7 @@ private void runTest(FailureScenario failureScenario)
413412
private void addSplit(RemoteTask remoteTask, TestingTaskResource testingTaskResource, int expectedSplitsCount)
414413
throws InterruptedException
415414
{
416-
Lifespan lifespan = Lifespan.driverGroup(3);
417-
remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(new CatalogName("test"), TestingSplit.createLocalSplit(), lifespan)));
415+
remoteTask.addSplits(ImmutableMultimap.of(TABLE_SCAN_NODE_ID, new Split(new CatalogName("test"), TestingSplit.createLocalSplit(), Lifespan.taskWide())));
418416
// wait for splits to be received by remote task
419417
poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID) != null);
420418
poll(() -> testingTaskResource.getTaskSplitAssignment(TABLE_SCAN_NODE_ID).getSplits().size() == expectedSplitsCount);

0 commit comments

Comments
 (0)