Skip to content

Commit

Permalink
Remove JobTerminatedEvent (#3863)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Chris Martin <[email protected]>

* wip

Signed-off-by: Chris Martin <[email protected]>

* lint

Signed-off-by: Chris Martin <[email protected]>

* fix python

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
Co-authored-by: Chris Martin <[email protected]>
  • Loading branch information
d80tb7 and d80tb7 authored Aug 6, 2024
1 parent 00e8c16 commit 8a6424f
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 785 deletions.
2 changes: 0 additions & 2 deletions client/python/tests/unit/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def __init__(self, name):
("reprioritized", EventType.reprioritized),
("cancelling", EventType.cancelling),
("cancelled", EventType.cancelled),
("terminated", EventType.terminated),
("utilisation", EventType.utilisation),
("ingress_info", EventType.ingress_info),
("reprioritizing", EventType.reprioritizing),
Expand Down Expand Up @@ -76,7 +75,6 @@ def test_event_class(name, event_type):
"reprioritized",
"cancelling",
"cancelled",
"terminated",
"utilisation",
"ingress_info",
"reprioritizing",
Expand Down
24 changes: 0 additions & 24 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,30 +447,6 @@ var JobRunFailed = &armadaevents.EventSequence_Event{
},
}

var JobRunTerminated = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: JobIdProto,
RunId: RunIdProto,
Errors: []*armadaevents.Error{
{
Terminal: false,
Reason: &armadaevents.Error_PodTerminated{
PodTerminated: &armadaevents.PodTerminated{
NodeName: NodeName,
ObjectMeta: &armadaevents.ObjectMeta{
ExecutorId: ExecutorId,
},
Message: TerminatedMsg,
},
},
},
},
},
},
}

var JobRunUnschedulable = &armadaevents.EventSequence_Event{
Created: testfixtures.BasetimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
Expand Down
2 changes: 0 additions & 2 deletions internal/lookoutingesterv2/instructions/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,6 @@ func (c *InstructionConverter) handleJobRunErrors(ts time.Time, event *armadaeve
}
}
jobRunUpdate.ExitCode = pointer.Int32(exitCode)
case *armadaevents.Error_PodTerminated:
continue
case *armadaevents.Error_JobRunPreemptedError:
// This case is already handled by the JobRunPreempted event
// When we formalise that as a terminal event, we'll remove this JobRunError getting produced
Expand Down
9 changes: 0 additions & 9 deletions internal/lookoutingesterv2/instructions/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,6 @@ func TestConvert(t *testing.T) {
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
},
},
"terminated": {
events: &ingest.EventSequencesWithIds{
EventSequences: []*armadaevents.EventSequence{testfixtures.NewEventSequence(testfixtures.JobRunTerminated)},
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
},
expected: &model.InstructionSet{
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
},
},
"unschedulable": {
events: &ingest.EventSequencesWithIds{
EventSequences: []*armadaevents.EventSequence{testfixtures.NewEventSequence(testfixtures.JobRunUnschedulable)},
Expand Down
38 changes: 0 additions & 38 deletions internal/lookoutv2/repository/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,44 +544,6 @@ func (js *JobSimulator) Preempted(timestamp time.Time) *JobSimulator {
return js
}

func (js *JobSimulator) RunTerminated(runId string, cluster string, node string, message string, timestamp time.Time) *JobSimulator {
ts := timestampOrNow(timestamp)
terminatedTime := protoutil.ToStdTime(ts)
terminated := &armadaevents.EventSequence_Event{
Created: ts,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: js.jobId,
RunId: armadaevents.ProtoUuidFromUuid(uuid.MustParse(runId)),
Errors: []*armadaevents.Error{
{
Terminal: false,
Reason: &armadaevents.Error_PodTerminated{
PodTerminated: &armadaevents.PodTerminated{
NodeName: node,
ObjectMeta: &armadaevents.ObjectMeta{
ExecutorId: cluster,
},
Message: message,
},
},
},
},
},
},
}
js.events = append(js.events, terminated)

js.updateRun(js.job, &runPatch{
runId: runId,
cluster: &cluster,
finished: &terminatedTime,
jobRunState: lookout.JobRunTerminated,
node: &node,
})
return js
}

func (js *JobSimulator) RunUnschedulable(runId string, cluster string, node string, message string, timestamp time.Time) *JobSimulator {
ts := timestampOrNow(timestamp)
unschedulableTime := protoutil.ToStdTime(ts)
Expand Down
2 changes: 0 additions & 2 deletions internal/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,6 @@ func errorTypeAndMessageFromError(ctx *armadacontext.Context, err *armadaevents.
return podError, reason.PodError.Message
case *armadaevents.Error_PodLeaseReturned:
return podLeaseReturned, reason.PodLeaseReturned.Message
case *armadaevents.Error_PodTerminated:
return podTerminated, reason.PodTerminated.Message
case *armadaevents.Error_JobRunPreemptedError:
return jobRunPreempted, ""
default:
Expand Down
19 changes: 0 additions & 19 deletions internal/server/event/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,25 +330,6 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim
},
}
events = append(events, event)
case *armadaevents.Error_PodTerminated:
objectMeta := reason.PodTerminated.GetObjectMeta()
event := &api.EventMessage{
Events: &api.EventMessage_Terminated{
Terminated: &api.JobTerminatedEvent{
JobId: jobId,
JobSetId: jobSetName,
PodNamespace: objectMeta.GetNamespace(),
PodName: objectMeta.GetName(),
Queue: queueName,
Created: protoutil.ToTimestamp(time),
ClusterId: objectMeta.GetExecutorId(),
Reason: reason.PodTerminated.GetMessage(),
KubernetesId: objectMeta.GetKubernetesId(),
PodNumber: reason.PodTerminated.GetPodNumber(),
},
},
}
events = append(events, event)
default:
log.Debugf("Ignoring event %T", reason)
}
Expand Down
53 changes: 0 additions & 53 deletions internal/server/event/conversion/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,59 +419,6 @@ func TestConvertPodLeaseReturned(t *testing.T) {
assert.Equal(t, expected, apiEvents)
}

func TestConvertPodTerminated(t *testing.T) {
terminated := &armadaevents.EventSequence_Event{
Created: baseTimeProto,
Event: &armadaevents.EventSequence_Event_JobRunErrors{
JobRunErrors: &armadaevents.JobRunErrors{
JobId: jobIdProto,
RunId: runIdProto,
Errors: []*armadaevents.Error{
{
Terminal: false,
Reason: &armadaevents.Error_PodTerminated{
PodTerminated: &armadaevents.PodTerminated{
ObjectMeta: &armadaevents.ObjectMeta{
ExecutorId: executorId,
Namespace: namespace,
Name: podName,
KubernetesId: runIdString,
},
Message: "The pod was terminated",
NodeName: nodeName,
PodNumber: podNumber,
},
},
},
},
},
},
}

expected := []*api.EventMessage{
{
Events: &api.EventMessage_Terminated{
Terminated: &api.JobTerminatedEvent{
JobId: jobIdString,
ClusterId: executorId,
PodNamespace: namespace,
PodName: podName,
KubernetesId: runIdString,
Reason: "The pod was terminated",
PodNumber: podNumber,
JobSetId: jobSetName,
Queue: queue,
Created: protoutil.ToTimestamp(baseTime),
},
},
},
}

apiEvents, err := FromEventSequence(toEventSeq(terminated))
assert.NoError(t, err)
assert.Equal(t, expected, apiEvents)
}

func TestConvertJobError(t *testing.T) {
errored := &armadaevents.EventSequence_Event{
Created: baseTimeProto,
Expand Down
40 changes: 0 additions & 40 deletions pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,9 +704,6 @@ func SwaggerJsonTemplate() string {
" \"succeeded\": {\n" +
" \"$ref\": \"#/definitions/apiJobSucceededEvent\"\n" +
" },\n" +
" \"terminated\": {\n" +
" \"$ref\": \"#/definitions/apiJobTerminatedEvent\"\n" +
" },\n" +
" \"unableToSchedule\": {\n" +
" \"$ref\": \"#/definitions/apiJobUnableToScheduleEvent\"\n" +
" },\n" +
Expand Down Expand Up @@ -1755,43 +1752,6 @@ func SwaggerJsonTemplate() string {
" }\n" +
" }\n" +
" },\n" +
" \"apiJobTerminatedEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"clusterId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"created\": {\n" +
" \"type\": \"string\",\n" +
" \"format\": \"date-time\"\n" +
" },\n" +
" \"jobId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"jobSetId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"kubernetesId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"podName\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"podNamespace\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"podNumber\": {\n" +
" \"type\": \"integer\",\n" +
" \"format\": \"int32\"\n" +
" },\n" +
" \"queue\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"reason\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"apiJobUnableToScheduleEvent\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
Expand Down
40 changes: 0 additions & 40 deletions pkg/api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,6 @@
"succeeded": {
"$ref": "#/definitions/apiJobSucceededEvent"
},
"terminated": {
"$ref": "#/definitions/apiJobTerminatedEvent"
},
"unableToSchedule": {
"$ref": "#/definitions/apiJobUnableToScheduleEvent"
},
Expand Down Expand Up @@ -1744,43 +1741,6 @@
}
}
},
"apiJobTerminatedEvent": {
"type": "object",
"properties": {
"clusterId": {
"type": "string"
},
"created": {
"type": "string",
"format": "date-time"
},
"jobId": {
"type": "string"
},
"jobSetId": {
"type": "string"
},
"kubernetesId": {
"type": "string"
},
"podName": {
"type": "string"
},
"podNamespace": {
"type": "string"
},
"podNumber": {
"type": "integer",
"format": "int32"
},
"queue": {
"type": "string"
},
"reason": {
"type": "string"
}
}
},
"apiJobUnableToScheduleEvent": {
"type": "object",
"properties": {
Expand Down
Loading

0 comments on commit 8a6424f

Please sign in to comment.