From 3453e1d025b647dfe913c60c6d4d2c918cea4b3c Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Tue, 6 Aug 2024 13:27:43 +0100 Subject: [PATCH] Update Lookout Ingester To Use String Ids (#3862) * update instructions Signed-off-by: Chris Martin * update instructions Signed-off-by: Chris Martin * fix simulator Signed-off-by: Chris Martin --------- Signed-off-by: Chris Martin Co-authored-by: Chris Martin --- internal/common/ingest/testfixtures/event.go | 153 +++++++---------- .../instructions/instructions.go | 155 ++++-------------- internal/lookoutv2/repository/util.go | 68 +++++--- .../scheduleringester/instructions_test.go | 11 -- 4 files changed, 132 insertions(+), 255 deletions(-) diff --git a/internal/common/ingest/testfixtures/event.go b/internal/common/ingest/testfixtures/event.go index 6d205277a01..54c4cfe47a2 100644 --- a/internal/common/ingest/testfixtures/event.go +++ b/internal/common/ingest/testfixtures/event.go @@ -85,6 +85,7 @@ var Submit = &armadaevents.EventSequence_Event{ Event: &armadaevents.EventSequence_Event_SubmitJob{ SubmitJob: &armadaevents.SubmitJob{ JobId: JobIdProto, + JobIdStr: JobIdString, Priority: Priority, AtMostOnce: true, Preemptible: true, @@ -132,59 +133,14 @@ var Submit = &armadaevents.EventSequence_Event{ }, } -var SubmitDuplicate = &armadaevents.EventSequence_Event{ - Created: testfixtures.BasetimeProto, - Event: &armadaevents.EventSequence_Event_SubmitJob{ - SubmitJob: &armadaevents.SubmitJob{ - IsDuplicate: true, - JobId: JobIdProto, - Priority: Priority, - AtMostOnce: true, - Preemptible: true, - ConcurrencySafe: true, - ObjectMeta: &armadaevents.ObjectMeta{ - Namespace: Namespace, - Name: "test-job", - }, - MainObject: &armadaevents.KubernetesMainObject{ - Object: &armadaevents.KubernetesMainObject_PodSpec{ - PodSpec: &armadaevents.PodSpecWithAvoidList{ - PodSpec: &v1.PodSpec{ - NodeSelector: NodeSelector, - Tolerations: Tolerations, - PriorityClassName: PriorityClassName, - Containers: []v1.Container{ - { - Name: "container1", - Image: "alpine:latest", - Command: []string{"myprogram.sh"}, - Args: []string{"foo", "bar"}, - Resources: v1.ResourceRequirements{ - Limits: map[v1.ResourceName]resource.Quantity{ - "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("150m"), - }, - Requests: map[v1.ResourceName]resource.Quantity{ - "memory": resource.MustParse("64Mi"), - "cpu": resource.MustParse("150m"), - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, -} - var Assigned = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: RunIdProto, - JobId: JobIdProto, + RunId: RunIdProto, + RunIdStr: RunIdString, + JobId: JobIdProto, + JobIdStr: JobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -209,7 +165,9 @@ var Leased = &armadaevents.EventSequence_Event{ Event: &armadaevents.EventSequence_Event_JobRunLeased{ JobRunLeased: &armadaevents.JobRunLeased{ RunId: RunIdProto, + RunIdStr: RunIdString, JobId: JobIdProto, + JobIdStr: JobIdString, ExecutorId: ExecutorId, NodeId: NodeName, Pool: Pool, @@ -233,8 +191,10 @@ var Running = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunRunning{ JobRunRunning: &armadaevents.JobRunRunning{ - RunId: RunIdProto, - JobId: JobIdProto, + RunId: RunIdProto, + RunIdStr: RunIdString, + JobId: JobIdProto, + JobIdStr: JobIdString, ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { Info: &armadaevents.KubernetesResourceInfo_PodInfo{ @@ -253,8 +213,10 @@ var JobRunSucceeded = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ JobRunSucceeded: &armadaevents.JobRunSucceeded{ - RunId: RunIdProto, - JobId: JobIdProto, + RunId: RunIdProto, + RunIdStr: RunIdString, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } @@ -263,8 +225,10 @@ var JobRunCancelled = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunCancelled{ JobRunCancelled: &armadaevents.JobRunCancelled{ - RunId: RunIdProto, - JobId: JobIdProto, + RunId: RunIdProto, + RunIdStr: RunIdString, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } @@ -273,8 +237,10 @@ var LeaseReturned = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: JobIdProto, - RunId: RunIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, + RunId: RunIdProto, + RunIdStr: RunIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -294,7 +260,8 @@ var JobCancelRequested = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_CancelJob{ CancelJob: &armadaevents.CancelJob{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } @@ -310,7 +277,8 @@ var JobCancelled = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } @@ -319,8 +287,9 @@ var JobValidated = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobValidated{ JobValidated: &armadaevents.JobValidated{ - JobId: JobIdProto, - Pools: []string{"cpu"}, + JobId: JobIdProto, + JobIdStr: JobIdString, + Pools: []string{"cpu"}, }, }, } @@ -329,7 +298,8 @@ var JobRequeued = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRequeued{ JobRequeued: &armadaevents.JobRequeued{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, SchedulingInfo: &schedulerobjects.JobSchedulingInfo{ Lifetime: 0, AtMostOnce: true, @@ -379,6 +349,7 @@ var JobReprioritiseRequested = &armadaevents.EventSequence_Event{ Event: &armadaevents.EventSequence_Event_ReprioritiseJob{ ReprioritiseJob: &armadaevents.ReprioritiseJob{ JobId: JobIdProto, + JobIdStr: JobIdString, Priority: NewPriority, }, }, @@ -398,6 +369,7 @@ var JobReprioritised = &armadaevents.EventSequence_Event{ Event: &armadaevents.EventSequence_Event_ReprioritisedJob{ ReprioritisedJob: &armadaevents.ReprioritisedJob{ JobId: JobIdProto, + JobIdStr: JobIdString, Priority: NewPriority, }, }, @@ -407,7 +379,8 @@ var JobPreemptionRequested = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobPreemptionRequested{ JobPreemptionRequested: &armadaevents.JobPreemptionRequested{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } @@ -416,8 +389,10 @@ var JobRunPreempted = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunPreempted{ JobRunPreempted: &armadaevents.JobRunPreempted{ - PreemptedJobId: JobIdProto, - PreemptedRunId: RunIdProto, + PreemptedJobId: JobIdProto, + PreemptedJobIdStr: JobIdString, + PreemptedRunId: RunIdProto, + PreemptedRunIdStr: RunIdString, }, }, } @@ -426,8 +401,10 @@ var JobRunFailed = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: JobIdProto, - RunId: RunIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, + RunId: RunIdProto, + RunIdStr: RunIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -451,8 +428,10 @@ var JobRunUnschedulable = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: JobIdProto, - RunId: RunIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, + RunId: RunIdProto, + RunIdStr: RunIdString, Errors: []*armadaevents.Error{ { Terminal: false, @@ -475,7 +454,8 @@ var JobPreempted = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -492,7 +472,8 @@ var JobRejected = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -511,7 +492,8 @@ var JobFailed = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, Errors: []*armadaevents.Error{ { Terminal: true, @@ -530,35 +512,12 @@ var JobFailed = &armadaevents.EventSequence_Event{ }, } -var JobLeaseReturned = &armadaevents.EventSequence_Event{ - Created: testfixtures.BasetimeProto, - Event: &armadaevents.EventSequence_Event_JobRunErrors{ - JobRunErrors: &armadaevents.JobRunErrors{ - JobId: JobIdProto, - RunId: RunIdProto, - Errors: []*armadaevents.Error{ - { - Terminal: true, - Reason: &armadaevents.Error_PodLeaseReturned{ - PodLeaseReturned: &armadaevents.PodLeaseReturned{ - ObjectMeta: &armadaevents.ObjectMeta{ - ExecutorId: ExecutorId, - }, - Message: LeaseReturnedMsg, - DebugMessage: DebugMsg, - }, - }, - }, - }, - }, - }, -} - var JobSucceeded = &armadaevents.EventSequence_Event{ Created: testfixtures.BasetimeProto, Event: &armadaevents.EventSequence_Event_JobSucceeded{ JobSucceeded: &armadaevents.JobSucceeded{ - JobId: JobIdProto, + JobId: JobIdProto, + JobIdStr: JobIdString, }, }, } diff --git a/internal/lookoutingesterv2/instructions/instructions.go b/internal/lookoutingesterv2/instructions/instructions.go index fe0e5bfeab2..aa9f9c5e28e 100644 --- a/internal/lookoutingesterv2/instructions/instructions.go +++ b/internal/lookoutingesterv2/instructions/instructions.go @@ -5,7 +5,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/utils/pointer" @@ -139,12 +138,6 @@ func (c *InstructionConverter) handleSubmitJob( event *armadaevents.SubmitJob, update *model.InstructionSet, ) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - // Try and marshall the job proto. This shouldn't go wrong but if it does, it's not a fatal error // Rather it means that the job spec won't be available in the ui var jobProto []byte @@ -153,16 +146,16 @@ func (c *InstructionConverter) handleSubmitJob( jobProtoUncompressed, err := proto.Marshal(apiJob) if err != nil { - log.WithError(err).Warnf("Couldn't marshall job %s in jobset %s as proto.", jobId, jobSet) + log.WithError(err).Warnf("Couldn't marshall job %s in jobset %s as proto.", event.JobIdStr, jobSet) } jobProto, err = c.compressor.Compress(jobProtoUncompressed) if err != nil { - log.WithError(err).Warnf("Couldn't compress proto for job %s in jobset %s.", jobId, jobSet) + log.WithError(err).Warnf("Couldn't compress proto for job %s in jobset %s.", event.JobIdStr, jobSet) } } else { c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - log.WithError(err).Warnf("Couldn't convert job event for job %s in jobset %s to api job.", jobId, jobSet) + log.WithError(err).Warnf("Couldn't convert job event for job %s in jobset %s to api job.", event.JobIdStr, jobSet) } resources := getJobResources(apiJob) @@ -175,7 +168,7 @@ func (c *InstructionConverter) handleSubmitJob( annotations := extractUserAnnotations(c.userAnnotationPrefix, event.GetObjectMeta().GetAnnotations()) job := model.CreateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, Queue: queue, Owner: owner, Namespace: apiJob.Namespace, @@ -215,14 +208,8 @@ func extractUserAnnotations(userAnnotationPrefix string, jobAnnotations map[stri } func (c *InstructionConverter) handleReprioritiseJob(ts time.Time, event *armadaevents.ReprioritisedJob, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - jobUpdate := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, Priority: pointer.Int64(int64(event.Priority)), } update.JobsToUpdate = append(update.JobsToUpdate, &jobUpdate) @@ -253,14 +240,8 @@ func (c *InstructionConverter) handleCancelledJob(ts time.Time, event *armadaeve } func (c *InstructionConverter) handleJobSucceeded(ts time.Time, event *armadaevents.JobSucceeded, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - jobUpdate := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(lookout.JobSucceededOrdinal)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), @@ -270,12 +251,6 @@ func (c *InstructionConverter) handleJobSucceeded(ts time.Time, event *armadaeve } func (c *InstructionConverter) handleJobErrors(ts time.Time, event *armadaevents.JobErrors, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - for _, e := range event.GetErrors() { if !e.Terminal { continue @@ -289,13 +264,13 @@ func (c *InstructionConverter) handleJobErrors(ts time.Time, event *armadaevents case *armadaevents.Error_JobRejected: state = lookout.JobRejectedOrdinal update.JobErrorsToCreate = append(update.JobErrorsToCreate, &model.CreateJobErrorInstruction{ - JobId: jobId, - Error: tryCompressError(jobId, reason.JobRejected.Message, c.compressor), + JobId: event.JobIdStr, + Error: tryCompressError(event.JobIdStr, reason.JobRejected.Message, c.compressor), }) } jobUpdate := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(state)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), @@ -308,12 +283,6 @@ func (c *InstructionConverter) handleJobErrors(ts time.Time, event *armadaevents } func (c *InstructionConverter) handleJobRunRunning(ts time.Time, event *armadaevents.JobRunRunning, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - runId, err := armadaevents.UuidStringFromProtoUuid(event.GetRunId()) if err != nil { c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) @@ -322,7 +291,7 @@ func (c *InstructionConverter) handleJobRunRunning(ts time.Time, event *armadaev // Update Job job := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(lookout.JobRunningOrdinal)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), @@ -344,14 +313,8 @@ func (c *InstructionConverter) handleJobRunRunning(ts time.Time, event *armadaev } func (c *InstructionConverter) handleJobRequeued(ts time.Time, event *armadaevents.JobRequeued, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - jobUpdate := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(lookout.JobQueuedOrdinal)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), @@ -361,32 +324,20 @@ func (c *InstructionConverter) handleJobRequeued(ts time.Time, event *armadaeven } func (c *InstructionConverter) handleJobRunLeased(ts time.Time, event *armadaevents.JobRunLeased, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - - runId, err := armadaevents.UuidStringFromProtoUuid(event.RunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - // Update Job job := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(lookout.JobLeasedOrdinal)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), - LatestRunId: &runId, + LatestRunId: &event.RunIdStr, } update.JobsToUpdate = append(update.JobsToUpdate, &job) // Now create a job run jobRun := model.CreateJobRunInstruction{ - RunId: runId, - JobId: jobId, + RunId: event.RunIdStr, + JobId: event.JobIdStr, Cluster: util.Truncate(event.ExecutorId, maxClusterLen), Node: pointer.String(util.Truncate(event.NodeId, maxNodeLen)), Leased: &ts, @@ -397,30 +348,18 @@ func (c *InstructionConverter) handleJobRunLeased(ts time.Time, event *armadaeve } func (c *InstructionConverter) handleJobRunAssigned(ts time.Time, event *armadaevents.JobRunAssigned, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - - runId, err := armadaevents.UuidStringFromProtoUuid(event.RunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - // Update Job job := model.UpdateJobInstruction{ - JobId: jobId, + JobId: event.JobIdStr, State: pointer.Int32(int32(lookout.JobPendingOrdinal)), LastTransitionTime: &ts, LastTransitionTimeSeconds: pointer.Int64(ts.Unix()), - LatestRunId: &runId, + LatestRunId: &event.RunIdStr, } update.JobsToUpdate = append(update.JobsToUpdate, &job) jobRun := model.UpdateJobRunInstruction{ - RunId: runId, + RunId: event.RunIdStr, Pending: &ts, JobRunState: pointer.Int32(lookout.JobRunPendingOrdinal), } @@ -429,14 +368,8 @@ func (c *InstructionConverter) handleJobRunAssigned(ts time.Time, event *armadae } func (c *InstructionConverter) handleJobRunCancelled(ts time.Time, event *armadaevents.JobRunCancelled, update *model.InstructionSet) error { - runId, err := armadaevents.UuidStringFromProtoUuid(event.RunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return errors.WithStack(err) - } - jobRun := model.UpdateJobRunInstruction{ - RunId: runId, + RunId: event.RunIdStr, Finished: &ts, JobRunState: pointer.Int32(lookout.JobRunCancelledOrdinal), } @@ -445,14 +378,8 @@ func (c *InstructionConverter) handleJobRunCancelled(ts time.Time, event *armada } func (c *InstructionConverter) handleJobRunSucceeded(ts time.Time, event *armadaevents.JobRunSucceeded, update *model.InstructionSet) error { - runId, err := armadaevents.UuidStringFromProtoUuid(event.RunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return errors.WithStack(err) - } - jobRun := model.UpdateJobRunInstruction{ - RunId: runId, + RunId: event.RunIdStr, Finished: &ts, JobRunState: pointer.Int32(lookout.JobRunSucceededOrdinal), ExitCode: pointer.Int32(0), @@ -462,21 +389,9 @@ func (c *InstructionConverter) handleJobRunSucceeded(ts time.Time, event *armada } func (c *InstructionConverter) handleJobRunErrors(ts time.Time, event *armadaevents.JobRunErrors, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.GetJobId()) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return errors.WithStack(err) - } - - runId, err := armadaevents.UuidStringFromProtoUuid(event.RunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return errors.WithStack(err) - } - for _, e := range event.GetErrors() { jobRunUpdate := &model.UpdateJobRunInstruction{ - RunId: runId, + RunId: event.RunIdStr, } if e.Terminal { jobRunUpdate.Finished = &ts @@ -486,8 +401,8 @@ func (c *InstructionConverter) handleJobRunErrors(ts time.Time, event *armadaeve case *armadaevents.Error_PodError: jobRunUpdate.Node = extractNodeName(reason.PodError) jobRunUpdate.JobRunState = pointer.Int32(lookout.JobRunFailedOrdinal) - jobRunUpdate.Error = tryCompressError(jobId, reason.PodError.GetMessage(), c.compressor) - jobRunUpdate.Debug = tryCompressError(jobId, reason.PodError.DebugMessage, c.compressor) + jobRunUpdate.Error = tryCompressError(event.JobIdStr, reason.PodError.GetMessage(), c.compressor) + jobRunUpdate.Debug = tryCompressError(event.JobIdStr, reason.PodError.DebugMessage, c.compressor) var exitCode int32 = 0 for _, containerError := range reason.PodError.ContainerErrors { if containerError.ExitCode != 0 { @@ -504,14 +419,14 @@ func (c *InstructionConverter) handleJobRunErrors(ts time.Time, event *armadaeve jobRunUpdate.Node = extractNodeName(reason.PodUnschedulable) case *armadaevents.Error_PodLeaseReturned: jobRunUpdate.JobRunState = pointer.Int32(lookout.JobRunLeaseReturnedOrdinal) - jobRunUpdate.Error = tryCompressError(jobId, reason.PodLeaseReturned.GetMessage(), c.compressor) - jobRunUpdate.Debug = tryCompressError(jobId, reason.PodLeaseReturned.GetDebugMessage(), c.compressor) + jobRunUpdate.Error = tryCompressError(event.JobIdStr, reason.PodLeaseReturned.GetMessage(), c.compressor) + jobRunUpdate.Debug = tryCompressError(event.JobIdStr, reason.PodLeaseReturned.GetDebugMessage(), c.compressor) case *armadaevents.Error_LeaseExpired: jobRunUpdate.JobRunState = pointer.Int32(lookout.JobRunLeaseExpiredOrdinal) - jobRunUpdate.Error = tryCompressError(jobId, "Lease expired", c.compressor) + jobRunUpdate.Error = tryCompressError(event.JobIdStr, "Lease expired", c.compressor) default: jobRunUpdate.JobRunState = pointer.Int32(lookout.JobRunFailedOrdinal) - jobRunUpdate.Error = tryCompressError(jobId, "Unknown error", c.compressor) + jobRunUpdate.Error = tryCompressError(event.JobIdStr, "Unknown error", c.compressor) log.Debugf("Ignoring event %T", reason) } update.JobRunsToUpdate = append(update.JobRunsToUpdate, jobRunUpdate) @@ -521,23 +436,11 @@ func (c *InstructionConverter) handleJobRunErrors(ts time.Time, event *armadaeve } func (c *InstructionConverter) handleJobRunPreempted(ts time.Time, event *armadaevents.JobRunPreempted, update *model.InstructionSet) error { - jobId, err := armadaevents.UlidStringFromProtoUuid(event.PreemptedJobId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - - runId, err := armadaevents.UuidStringFromProtoUuid(event.PreemptedRunId) - if err != nil { - c.metrics.RecordPulsarMessageError(metrics.PulsarMessageErrorProcessing) - return err - } - jobRun := model.UpdateJobRunInstruction{ - RunId: runId, + RunId: event.PreemptedRunIdStr, JobRunState: pointer.Int32(lookout.JobRunPreemptedOrdinal), Finished: &ts, - Error: tryCompressError(jobId, "preempted", c.compressor), + Error: tryCompressError(event.PreemptedJobIdStr, "preempted", c.compressor), } update.JobRunsToUpdate = append(update.JobRunsToUpdate, &jobRun) return nil diff --git a/internal/lookoutv2/repository/util.go b/internal/lookoutv2/repository/util.go index 40c55b80b1f..ac13d738801 100644 --- a/internal/lookoutv2/repository/util.go +++ b/internal/lookoutv2/repository/util.go @@ -123,6 +123,7 @@ func (js *JobSimulator) Submit(queue, jobSet, owner, namespace string, timestamp Event: &armadaevents.EventSequence_Event_SubmitJob{ SubmitJob: &armadaevents.SubmitJob{ JobId: jobIdProto, + JobIdStr: jobId, Priority: uint32(opts.Priority), AtMostOnce: true, Preemptible: true, @@ -192,7 +193,9 @@ func (js *JobSimulator) Lease(runId string, cluster string, node string, timesta Event: &armadaevents.EventSequence_Event_JobRunLeased{ JobRunLeased: &armadaevents.JobRunLeased{ RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), ExecutorId: cluster, NodeId: node, }, @@ -222,8 +225,10 @@ func (js *JobSimulator) Pending(runId string, cluster string, timestamp time.Tim Created: ts, Event: &armadaevents.EventSequence_Event_JobRunAssigned{ JobRunAssigned: &armadaevents.JobRunAssigned{ - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), - JobId: js.jobId, + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { ObjectMeta: &armadaevents.ObjectMeta{ @@ -265,8 +270,10 @@ func (js *JobSimulator) Running(runId string, node string, timestamp time.Time) Created: ts, Event: &armadaevents.EventSequence_Event_JobRunRunning{ JobRunRunning: &armadaevents.JobRunRunning{ - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), - JobId: js.jobId, + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), ResourceInfos: []*armadaevents.KubernetesResourceInfo{ { Info: &armadaevents.KubernetesResourceInfo_PodInfo{ @@ -302,8 +309,10 @@ func (js *JobSimulator) RunSucceeded(runId string, timestamp time.Time) *JobSimu Created: ts, Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ JobRunSucceeded: &armadaevents.JobRunSucceeded{ - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), - JobId: js.jobId, + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), }, }, } @@ -326,7 +335,8 @@ func (js *JobSimulator) Succeeded(timestamp time.Time) *JobSimulator { Created: ts, Event: &armadaevents.EventSequence_Event_JobSucceeded{ JobSucceeded: &armadaevents.JobSucceeded{ - JobId: js.jobId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), }, }, } @@ -344,8 +354,10 @@ func (js *JobSimulator) LeaseReturned(runId string, message string, timestamp ti Created: ts, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: js.jobId, - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, Errors: []*armadaevents.Error{ { Terminal: true, @@ -376,7 +388,8 @@ func (js *JobSimulator) Cancelled(timestamp time.Time) *JobSimulator { Created: ts, Event: &armadaevents.EventSequence_Event_CancelledJob{ CancelledJob: &armadaevents.CancelledJob{ - JobId: js.jobId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), }, }, } @@ -395,6 +408,7 @@ func (js *JobSimulator) Reprioritized(newPriority uint32, timestamp time.Time) * Event: &armadaevents.EventSequence_Event_ReprioritisedJob{ ReprioritisedJob: &armadaevents.ReprioritisedJob{ JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), Priority: newPriority, }, }, @@ -414,8 +428,10 @@ func (js *JobSimulator) RunFailed(runId string, node string, exitCode int32, mes Created: ts, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: js.jobId, - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, Errors: []*armadaevents.Error{ { Terminal: true, @@ -454,7 +470,8 @@ func (js *JobSimulator) Rejected(message string, timestamp time.Time) *JobSimula Created: ts, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: js.jobId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), Errors: []*armadaevents.Error{ { Terminal: true, @@ -482,7 +499,8 @@ func (js *JobSimulator) Failed(node string, exitCode int32, message string, time Created: ts, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: js.jobId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), Errors: []*armadaevents.Error{ { Terminal: true, @@ -515,7 +533,8 @@ func (js *JobSimulator) Preempted(timestamp time.Time) *JobSimulator { Created: ts, Event: &armadaevents.EventSequence_Event_JobErrors{ JobErrors: &armadaevents.JobErrors{ - JobId: js.jobId, + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), Errors: []*armadaevents.Error{ { Terminal: true, @@ -528,12 +547,15 @@ func (js *JobSimulator) Preempted(timestamp time.Time) *JobSimulator { }, } + preemptedRunId := uuid.NewString() preemptedRun := &armadaevents.EventSequence_Event{ Created: ts, Event: &armadaevents.EventSequence_Event_JobRunPreempted{ JobRunPreempted: &armadaevents.JobRunPreempted{ - PreemptedJobId: js.jobId, - PreemptedRunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(uuid.NewString())), + PreemptedJobId: js.jobId, + PreemptedJobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), + PreemptedRunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(preemptedRunId)), + PreemptedRunIdStr: preemptedRunId, }, }, } @@ -551,8 +573,10 @@ func (js *JobSimulator) RunUnschedulable(runId string, cluster string, node stri Created: ts, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: js.jobId, - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, Errors: []*armadaevents.Error{ { Terminal: false, @@ -589,8 +613,10 @@ func (js *JobSimulator) LeaseExpired(runId string, timestamp time.Time, _ clock. Created: ts, Event: &armadaevents.EventSequence_Event_JobRunErrors{ JobRunErrors: &armadaevents.JobRunErrors{ - JobId: js.jobId, - RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + JobId: js.jobId, + JobIdStr: armadaevents.MustUlidStringFromProtoUuid(js.jobId), + RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)), + RunIdStr: runId, Errors: []*armadaevents.Error{ { Terminal: true, diff --git a/internal/scheduleringester/instructions_test.go b/internal/scheduleringester/instructions_test.go index b901bf55e70..89e3065e9bf 100644 --- a/internal/scheduleringester/instructions_test.go +++ b/internal/scheduleringester/instructions_test.go @@ -47,10 +47,6 @@ func TestConvertSequence(t *testing.T) { SchedulingInfo: protoutil.MustMarshall(getExpectedSubmitMessageSchedulingInfo(t)), }}}, }, - "ignores duplicate submit": { - events: []*armadaevents.EventSequence_Event{f.SubmitDuplicate}, - expected: []DbOperation{}, - }, "job run leased": { events: []*armadaevents.EventSequence_Event{f.Leased}, expected: []DbOperation{ @@ -230,13 +226,6 @@ func TestConvertSequence(t *testing.T) { MarkRunsSucceeded{f.RunIdUuid: f.BaseTime.Add(time.Hour)}, }, }, - "ignored events": { - events: []*armadaevents.EventSequence_Event{f.Running, f.SubmitDuplicate, f.JobSucceeded}, - expected: []DbOperation{ - MarkRunsRunning{f.RunIdUuid: f.BaseTime}, - MarkJobsSucceeded{f.JobIdString: true}, - }, - }, } for name, tc := range tests {