diff --git a/workers/scheduler/scheduler.go b/workers/scheduler/scheduler.go index 87844ad1..c3dc1b81 100644 --- a/workers/scheduler/scheduler.go +++ b/workers/scheduler/scheduler.go @@ -457,13 +457,48 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, args []*gaia.Argument) (* return &run, s.storeService.PipelinePutRun(&run) } +func getDependency(s string, r *gaia.PipelineRun) *gaia.Job { + for _, p := range r.Jobs { + if p.Title == s { + return p + } + } + + return nil +} + // executeJob executes a job and informs via triggerSave that the job can be saved to the store. // This method is blocking. -func executeJob(j gaia.Job, pS plugin.Plugin, triggerSave chan gaia.Job) { +func executeJob(j gaia.Job, pS plugin.Plugin, triggerSave chan gaia.Job, run *gaia.PipelineRun) { // Set Job to running and trigger save j.Status = gaia.JobRunning triggerSave <- j + // Load in the jobs previous dependencies and look for possible output. + // For some reason the job's dependencies are not up to date here. + // Need to get the run information from the PipelineRun. + //log.Println("depends On: ", j.DependsOn) + for _, dependingJob := range j.DependsOn { + dep := getDependency(dependingJob.Title, run) + if dep == nil { + continue + } + + // look for output + if dep.Outs == nil { + continue + } + + // Set up any arguments which might match which are a dependency to this job. + for _, out := range dep.Outs { + for _, arg := range j.Args { + if arg.Key == out.Key { + arg.Value = out.Value + } + } + } + } + // Execute job if err := pS.Execute(&j); err != nil { gaia.Cfg.Logger.Debug("error during job execution", "error", err.Error(), "job", j) @@ -760,7 +795,7 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) { mw.Replace(*wl) // Start execution - go executeJob(*j, pS, triggerSave) + go executeJob(*j, pS, triggerSave, r) } } }