Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private synchronized void schedulePartitionedSource(SplitAssignment splitAssignm
ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builderWithExpectedSize(removed.size());
Comment thread
radek-kondziolka marked this conversation as resolved.
Outdated
for (ScheduledSplit scheduledSplit : removed) {
// create a new driver for the split
runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit));
runners.add(partitionedDriverRunnerFactory.createPartitionedDriverRunner(scheduledSplit));
}
enqueueDriverSplitRunner(false, runners.build());

Expand All @@ -344,7 +344,7 @@ private void scheduleDriversForTaskLifeCycle()
List<DriverSplitRunner> runners = new ArrayList<>();
for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
for (int i = 0; i < driverRunnerFactory.getDriverInstances().orElse(1); i++) {
runners.add(driverRunnerFactory.createDriverRunner(null));
runners.add(driverRunnerFactory.createUnpartitionedDriverRunner());
}
}
enqueueDriverSplitRunner(true, runners);
Expand Down Expand Up @@ -574,15 +574,22 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, boolean partitione
this.pipelineContext = taskContext.addPipelineContext(driverFactory.getPipelineId(), driverFactory.isInputDriver(), driverFactory.isOutputDriver(), partitioned);
}

// TODO: split this method into two: createPartitionedDriverRunner and createUnpartitionedDriverRunner.
// The former will take two arguments, and the latter will take one. This will simplify the signature quite a bit.
public DriverSplitRunner createDriverRunner(@Nullable ScheduledSplit partitionedSplit)
public DriverSplitRunner createPartitionedDriverRunner(ScheduledSplit partitionedSplit)
{
return createDriverRunner(partitionedSplit, partitionedSplit.getSplit().getSplitWeight().getRawValue());
}

public DriverSplitRunner createUnpartitionedDriverRunner()
{
return createDriverRunner(null, 0);
}

public DriverSplitRunner createDriverRunner(@Nullable ScheduledSplit partitionedSplit, long splitWeight)
{
checkState(!noMoreDriverRunner.get(), "noMoreDriverRunner is set");
pendingCreations.incrementAndGet();
// create driver context immediately so the driver existence is recorded in the stats
// the number of drivers is used to balance work across nodes
long splitWeight = partitionedSplit == null ? 0 : partitionedSplit.getSplit().getSplitWeight().getRawValue();
DriverContext driverContext = pipelineContext.addDriverContext(splitWeight);
return new DriverSplitRunner(this, driverContext, partitionedSplit);
}
Expand Down