diff --git a/pkg/jobrunaggregator/jobrunaggregatoranalyzer/analyzer.go b/pkg/jobrunaggregator/jobrunaggregatoranalyzer/analyzer.go index be217b4b6b6..f9704bdedb6 100644 --- a/pkg/jobrunaggregator/jobrunaggregatoranalyzer/analyzer.go +++ b/pkg/jobrunaggregator/jobrunaggregatoranalyzer/analyzer.go @@ -10,8 +10,9 @@ import ( "strings" "time" + "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/clock" - "sigs.k8s.io/yaml" "github.com/openshift/ci-tools/pkg/jobrunaggregator/jobrunaggregatorapi" "github.com/openshift/ci-tools/pkg/jobrunaggregator/jobrunaggregatorlib" @@ -66,8 +67,9 @@ func (o *JobRunAggregatorAnalyzerOptions) getRelatedJobs(ctx context.Context) ([ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { // if it hasn't been more than hour since the jobRuns started, the list isn't complete. readyAt := o.jobRunStartEstimate.Add(1 * time.Hour) + timeToStopWaiting := o.jobRunStartEstimate.Add(3*time.Hour + 10*time.Minute) - fmt.Printf("Aggregating job runs of type %q for %q. ReadyAt=%v, now=%v.\n", o.jobName, o.payloadTag, readyAt, o.clock.Now()) + fmt.Printf("Aggregating job runs of type %q for %q. now=%v, ReadyAt=%v, timeToStopWaiting=%v.\n", o.jobName, o.payloadTag, o.clock.Now(), readyAt, timeToStopWaiting) ctx, cancel := context.WithTimeout(ctx, o.timeout) defer cancel() @@ -78,11 +80,13 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { var finishedJobsToAggregate []jobrunaggregatorapi.JobRunInfo var finishedJobRunNames []string + var unfinishedJobNames []string for { // TODO extract to a method. fmt.Println() // for prettier logs // reset vars finishedJobsToAggregate = []jobrunaggregatorapi.JobRunInfo{} finishedJobRunNames = []string{} + unfinishedJobNames = []string{} relatedJobs, err := o.getRelatedJobs(ctx) if err != nil { @@ -100,7 +104,6 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { return fmt.Errorf("%q for %q: found no related jobRuns", o.jobName, o.payloadTag) } - unfinishedJobNames := []string{} for i := range relatedJobs { relatedJob := relatedJobs[i] if !relatedJob.IsFinished(ctx) { @@ -124,6 +127,12 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { finishedJobRunNames = append(finishedJobRunNames, relatedJob.GetJobRunID()) } + // ready or not, it's time to check + if o.clock.Now().After(timeToStopWaiting) { + fmt.Printf("%q for %q: waited long enough. Ready or not, here I come. (readyOrNot=%v now=%v)\n", o.jobName, o.payloadTag, timeToStopWaiting, o.clock.Now()) + break + } + if len(unfinishedJobNames) > 0 { fmt.Printf("%q for %q: found %d unfinished related jobRuns: %v\n", o.jobName, o.payloadTag, len(unfinishedJobNames), strings.Join(unfinishedJobNames, ", ")) time.Sleep(2 * time.Minute) @@ -133,9 +142,29 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { break } + if len(unfinishedJobNames) > 0 { + fmt.Printf("%q for %q: found %d unfinished related jobRuns: %v\n", o.jobName, o.payloadTag, len(unfinishedJobNames), strings.Join(unfinishedJobNames, ", ")) + } + // if more than three jobruns timed out, just fail the entire aggregation + if len(unfinishedJobNames) > 3 { + return fmt.Errorf("%q for %q: found %d unfinished related jobRuns: %v\n", o.jobName, o.payloadTag, len(unfinishedJobNames), strings.Join(unfinishedJobNames, ", ")) + } fmt.Printf("%q for %q: aggregating %d related jobRuns: %v\n", o.jobName, o.payloadTag, len(finishedJobsToAggregate), strings.Join(finishedJobRunNames, ", ")) aggregationConfiguration := &AggregationConfiguration{} + for _, jobRunName := range unfinishedJobNames { + aggregationConfiguration.FinishedJobs = append( + aggregationConfiguration.FinishedJobs, + JobRunInfo{ + JobName: o.jobName, + JobRunID: jobRunName, + HumanURL: jobrunaggregatorapi.GetHumanURL(o.jobName, jobRunName), + GCSBucketURL: jobrunaggregatorapi.GetGCSArtifactURL(o.jobName, jobRunName), + Status: "unknown", + }, + ) + } + currentAggregationJunit := &aggregatedJobRunJunit{} for i := range finishedJobsToAggregate { jobRun := finishedJobsToAggregate[i] @@ -147,8 +176,8 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { if err != nil { return err } - aggregationConfiguration.IndividualJobs = append( - aggregationConfiguration.IndividualJobs, + aggregationConfiguration.FinishedJobs = append( + aggregationConfiguration.FinishedJobs, JobRunInfo{ JobName: jobRun.GetJobName(), JobRunID: jobRun.GetJobRunID(), @@ -161,27 +190,28 @@ func (o *JobRunAggregatorAnalyzerOptions) Run(ctx context.Context) error { currentAggregationJunit.addJobRun(jobrunaggregatorlib.GetPayloadTagFromProwJob(prowJob), currJunit) } - fmt.Printf("%q for %q: aggregating junit tests.\n", o.jobName, o.payloadTag) - currentAggregationJunitSuites, err := currentAggregationJunit.aggregateAllJobRuns() + // write out the jobruns aggregated by this jobrun. + aggregationConfigYAML, err := yaml.Marshal(aggregationConfiguration) if err != nil { return err } - if err := assignPassFail(ctx, currentAggregationJunitSuites, o.passFailCalculator); err != nil { + if err := ioutil.WriteFile(filepath.Join(currentAggregationDir, "aggregation-config.yaml"), aggregationConfigYAML, 0644); err != nil { return err } - currentAggrationJunitXML, err := xml.Marshal(currentAggregationJunitSuites) + + fmt.Printf("%q for %q: aggregating junit tests.\n", o.jobName, o.payloadTag) + currentAggregationJunitSuites, err := currentAggregationJunit.aggregateAllJobRuns() if err != nil { return err } - if err := ioutil.WriteFile(filepath.Join(currentAggregationDir, "junit-aggregated.xml"), currentAggrationJunitXML, 0644); err != nil { + if err := assignPassFail(ctx, currentAggregationJunitSuites, o.passFailCalculator); err != nil { return err } - - aggregationConfigYAML, err := yaml.Marshal(aggregationConfiguration) + currentAggrationJunitXML, err := xml.Marshal(currentAggregationJunitSuites) if err != nil { return err } - if err := ioutil.WriteFile(filepath.Join(currentAggregationDir, "aggregation-config.yaml"), aggregationConfigYAML, 0644); err != nil { + if err := ioutil.WriteFile(filepath.Join(currentAggregationDir, "junit-aggregated.xml"), currentAggrationJunitXML, 0644); err != nil { return err } diff --git a/pkg/jobrunaggregator/jobrunaggregatoranalyzer/types.go b/pkg/jobrunaggregator/jobrunaggregatoranalyzer/types.go index e0012630b08..df46c2a01cc 100644 --- a/pkg/jobrunaggregator/jobrunaggregatoranalyzer/types.go +++ b/pkg/jobrunaggregator/jobrunaggregatoranalyzer/types.go @@ -1,7 +1,8 @@ package jobrunaggregatoranalyzer type AggregationConfiguration struct { - IndividualJobs []JobRunInfo + UnfinishedJobs []JobRunInfo + FinishedJobs []JobRunInfo } type JobRunInfo struct {