Preserve sourcePartition to Splits mapping in task descriptor #19478
Preserve sourcePartition to Splits mapping in task descriptor #19478losipiuk merged 3 commits intotrinodb:masterfrom
Conversation
If we observe on task failure that partition the task was handling was already processed by the task ignore the failure.
Explicitly use taskPartition in place of partition in SplitAssignerTester. This is a preparatory commit to avoid confusion as we will be dealing with source data partitions in same context with upcoming changes.
There was a problem hiding this comment.
REPLICATED_SOURCE_PARTITION is also 0
document why 0 is ok here or give the constant a name
There was a problem hiding this comment.
USed:
// marker source partition id for data which is not hash distributed
int SINGLE_SOURCE_PARTITION_ID = 0;
.../java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitsMapping.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitsMapping.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
this is sometimes used for iteration. can this return a Stream<Entry?
There was a problem hiding this comment.
In the production code the iteration usecase is only for generating debug string when we run out of memory in task descriptor storage. I think it is not worth optimizing.
Let's discuss tomorrow - maybe I do not understand what benefits we can get here.
There was a problem hiding this comment.
The code can be made much more simpler and hopefully not much less performant:
public SplitsMapping build()
{
ImmutableMap.Builder<PlanNodeId, Map<Integer, List<Split>>> result = ImmutableMap.builder();
for (PlanNodeId planNodeId : Sets.union(originalMapping.splits.keySet(), updates.keySet())) {
Map<Integer, List<Split>> planNodeOriginalMapping = originalMapping.splits.getOrDefault(planNodeId, ImmutableMap.of());
Map<Integer, ImmutableList.Builder<Split>> planNodeUpdates = updates.getOrDefault(planNodeId, ImmutableMap.of());
if (planNodeUpdates.isEmpty()) {
// just use original splits for planNodeId
result.put(planNodeId, planNodeOriginalMapping);
continue;
}
// create new mapping for planNodeId reusing as much of source as possible
ImmutableMap.Builder<Integer, List<Split>> targetSplitsMapBuilder = ImmutableMap.builder();
for (Integer sourcePartitionId : Sets.union(planNodeOriginalMapping.keySet(), planNodeUpdates.keySet())) {
@Nullable List<Split> originalSplits = planNodeOriginalMapping.get(sourcePartitionId);
@Nullable ImmutableList.Builder<Split> splitUpdates = planNodeUpdates.get(sourcePartitionId);
targetSplitsMapBuilder.put(sourcePartitionId, mergeIfPresent(originalSplits, splitUpdates));
}
result.put(planNodeId, targetSplitsMapBuilder.buildOrThrow());
}
return new SplitsMapping(result.buildOrThrow());
}
private static <T> List<T> mergeIfPresent(@Nullable List<T> list, @Nullable ImmutableList.Builder<T> additionalElements)
{
if (additionalElements == null) {
// reuse source immutable split list
return requireNonNull(list, "list is null");
}
if (list == null) {
return additionalElements.build();
}
return ImmutableList.<T>builder()
.addAll(list)
.addAll(additionalElements.build())
.build();
}There was a problem hiding this comment.
Simpler indeed.
Thanks for reminding my that I cannot code :P
There was a problem hiding this comment.
I think you earned the Co-authored-By: ...
core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/SplitsMapping.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Like
Table<PlanNodeId, Integer, ImmutableList.Builder<Split>> ....;
It would work - but Table interface is less nice to work with. E.g you do not have computeIfAbsent
...trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/SplitAssignerTester.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/execution/scheduler/faulttolerant/TestSplitsMapping.java
Outdated
Show resolved
Hide resolved
0c78d49 to
5b43fa1
Compare
We need to preserve information which splits map to which source partition id in task descriptor to be able to split task descriptor into multiple ones. Splitting is important in case we made a scheduling mistake and packed to many source partitions into single task descriptor and execution of such task is not possible due to lack of resources.
5b43fa1 to
49116d1
Compare
|
@findepi I added one more tiny commit - PTAL when you have a chance. |
49116d1 to
565a173
Compare
Actually it does not work correctly due to write skew mitigation logic - I will send separate - slightly more complex PR to address that. |
Description
We need to preserve information which splits map to which source
partition id in task descriptor to be able to split task descriptor into
multiple ones. Splitting is important in case we made a scheduling
mistake and packed to many source partitions into single task descriptor
and execution of such task is not possible due to lack of resources.
Release notes
(x) This is not user-visible or is docs only, and no release notes are required.