Commit a801d03

Regression: fix results with out of order tasks

The pipeline run reconciler builds a pipeline run state on every run, which resolves task references, expands results and processes matrix fan-outs. The current process is incremental, in a single loop, where each new PipelineTask resolution depends on the state of the PipelineTasks resolved before it. This is problematic because tasks are not necessarily defined in the pipeline in order of execution (which is undefined, given that pipelines are DAGs).

Since this PR is a fix to a regression, it aims to be as minimal as possible. The smallest solution available is to implement some sorting in the list of tasks, so that the incremental state can work correctly. This PR splits the process into two passes: one for tasks that have already been started (and possibly completed), and a second one that includes all remaining tasks. The first group of tasks does not need matrix fan-outs (they have already been processed) or result resolution, so its state can be safely built incrementally. The second group is resolved starting from the state of the first group. Any task that is a candidate for execution in this reconcile cycle must have its results resolved through the state of the first group.

Testing with the current code arrangement is a bit challenging, as we ignore result resolution errors in the code, which is OK only in some cases:

- a result resolution error due to the task not being found, or the result not being defined, is permanent and should not be ignored
- a result resolution error due to a result not being available yet is (possibly) ephemeral and should not cause a failure

Currently one function checks for all these conditions and returns a single error, so it is not possible to safely distinguish them. This will require some refactoring, to be done in a follow-up patch. For now, a reconcile unit test (TBD) may be able to test the fix.

Fixes: #7103

Signed-off-by: Andrea Frittoli <[email protected]>
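As a rough illustration of the follow-up refactoring the message describes, here is a minimal sketch of how the two error classes could be distinguished with Go sentinel errors. This is not Tekton code: the error names and the resolveResultRef helper are hypothetical stand-ins.

// Hypothetical sketch only: sentinel errors that would let callers tell a
// permanent result-resolution failure from an ephemeral one.
package results

import (
	"errors"
	"fmt"
)

var (
	// Permanent: the referenced task or result is not defined; retrying cannot help.
	ErrResultRefMissing = errors.New("referenced task or result not defined")
	// Ephemeral: the producing task has not emitted the result yet; retry later.
	ErrResultNotReady = errors.New("result not available yet")
)

// resolveResultRef is a stand-in for the single function the message mentions;
// wrapping with %w preserves the sentinel for errors.Is checks in the caller.
func resolveResultRef(defined, produced bool, ref string) (string, error) {
	if !defined {
		return "", fmt.Errorf("resolving %q: %w", ref, ErrResultRefMissing)
	}
	if !produced {
		return "", fmt.Errorf("resolving %q: %w", ref, ErrResultNotReady)
	}
	return "value", nil
}

A reconciler could then fail the PipelineRun on errors.Is(err, ErrResultRefMissing) but simply requeue on ErrResultNotReady, instead of ignoring both as today.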
1 parent e8b85e2 commit a801d03

File tree

1 file changed (+42, −3 lines)


pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 42 additions & 3 deletions
@@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState(
 	ctx context.Context,
 	tasks []v1.PipelineTask,
 	pipelineMeta *metav1.ObjectMeta,
-	pr *v1.PipelineRun) (resources.PipelineRunState, error) {
+	pr *v1.PipelineRun,
+	pst resources.PipelineRunState) (resources.PipelineRunState, error) {
 	ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState")
 	defer span.End()
-	pst := resources.PipelineRunState{}
 	// Resolve each task individually because they each could have a different reference context (remote or local).
 	for _, task := range tasks {
 		// We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask
@@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
 	if len(pipelineSpec.Finally) > 0 {
 		tasks = append(tasks, pipelineSpec.Finally...)
 	}
-	pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr)
+
+	// We split tasks into two lists:
+	// - those with a completed (Task|Custom)Run reference (i.e. those that finished running)
+	// - those without a (Task|Custom)Run reference
+	// We resolve the status for the former first, to collect all results available at this stage.
+	// We know that tasks in progress or completed have had their fan-out already calculated, so
+	// they can be safely processed in the first iteration. The underlying assumption is that if
+	// a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been
+	// created already.
+	// The second group takes as input the partial state built in the first iteration, and
+	// finally the two results are collated.
+	ranOrRunningTaskNames := sets.Set[string]{}
+	ranOrRunningTasks := []v1.PipelineTask{}
+	notStartedTasks := []v1.PipelineTask{}
+
+	for _, child := range pr.Status.ChildReferences {
+		ranOrRunningTaskNames.Insert(child.PipelineTaskName)
+	}
+	for _, task := range tasks {
+		if ranOrRunningTaskNames.Has(task.Name) {
+			ranOrRunningTasks = append(ranOrRunningTasks, task)
+		} else {
+			notStartedTasks = append(notStartedTasks, task)
+		}
+	}
+	// First iteration
+	pst := resources.PipelineRunState{}
+	pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst)
+	switch {
+	case errors.Is(err, remote.ErrRequestInProgress):
+		message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
+		pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message)
+		return nil
+	case err != nil:
+		return err
+	default:
+	}
+
+	// Second iteration
+	pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
 	switch {
 	case errors.Is(err, remote.ErrRequestInProgress):
 		message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
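To make the failure mode concrete, here is a small self-contained Go sketch (not Tekton code; toyTask and all other names are invented) contrasting the single-pass resolution that regressed with the two-pass approach in this diff:

package main

import "fmt"

// toyTask is a stand-in for a PipelineTask: "needs" names the task whose
// result it consumes, and "started" mimics having a TaskRun child reference.
type toyTask struct {
	name    string
	needs   string
	started bool
}

func main() {
	// "consumer" is declared before "producer", which is legal in a
	// pipeline since execution order comes from the DAG, not the list.
	tasks := []toyTask{
		{name: "consumer", needs: "producer"},
		{name: "producer", started: true},
	}

	// Single incremental pass: "consumer" is resolved before "producer"
	// has entered the state, so its result reference fails to resolve.
	state := map[string]bool{}
	for _, t := range tasks {
		if t.needs != "" && !state[t.needs] {
			fmt.Printf("single pass: %s: result from %q not found\n", t.name, t.needs)
		}
		state[t.name] = true
	}

	// Two passes, as in this commit: started tasks seed the state first,
	// then the not-yet-started tasks resolve against it.
	state = map[string]bool{}
	for _, t := range tasks {
		if t.started {
			state[t.name] = true
		}
	}
	for _, t := range tasks {
		if t.started {
			continue
		}
		if t.needs == "" || state[t.needs] {
			fmt.Printf("two passes: %s: resolved\n", t.name)
		}
	}
}

Running this prints a "not found" line for the single pass and a "resolved" line for the two-pass variant, mirroring why declaration order broke result resolution before this fix.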
