Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

renames workload start/stop to deploy/undeploy #254

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
func (a *Agent) dispatchEvents() {
for !a.shuttingDown() {
entry := <-a.eventLogs
bytes, err := json.Marshal(entry)
bytes, err := entry.MarshalJSON()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to marshal event log to json: %s", err.Error())
continue
Expand Down
4 changes: 2 additions & 2 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (a *Agent) PublishWorkloadDeployed(vmID, workloadName string, totalBytes in
Text: fmt.Sprintf("Workload %s deployed", workloadName),
}

evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadStartedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName})
evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadDeployedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName})
a.eventLogs <- &evt
}

Expand All @@ -90,6 +90,6 @@ func (a *Agent) PublishWorkloadExited(vmID, workloadName, message string, err bo
Text: txt,
}

evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadStoppedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName, Code: code, Message: message})
evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadUndeployedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName, Code: code, Message: message})
a.eventLogs <- &evt
}
22 changes: 10 additions & 12 deletions control-api/events.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package controlapi

const (
AgentStartedEventType = "agent_started"
AgentStoppedEventType = "agent_stopped"
NodeStartedEventType = "node_started"
NodeStoppedEventType = "node_stopped"
LameDuckEnteredEventType = "node_entered_lameduck"
HeartbeatEventType = "heartbeat"
WorkloadStartedEventType = "workload_started" // FIXME-- should this be WorkloadDeployed?
WorkloadStoppedEventType = "workload_stopped" // FIXME-- should this be in addition to WorkloadUndeployed (likely yes, in case of something bad happening...)
// FIXME-- where is WorkloadDeployedEventType? (likely just need to rename WorkloadStartedEventType -> WorkloadDeployedEventType)
// FIXME-- where is WorkloadStoppedEventType?
AgentStartedEventType = "agent_started"
AgentStoppedEventType = "agent_stopped"
NodeStartedEventType = "node_started"
NodeStoppedEventType = "node_stopped"
LameDuckEnteredEventType = "node_entered_lameduck"
HeartbeatEventType = "heartbeat"
WorkloadDeployedEventType = "workload_deployed"
WorkloadUndeployedEventType = "workload_undeployed"
)

type AgentStartedEvent struct {
AgentVersion string `json:"agent_version"`
}

type WorkloadStartedEvent struct {
type WorkloadDeployedEvent struct {
Name string `json:"workload_name"`
TotalBytes int `json:"total_bytes"`
}

type WorkloadStoppedEvent struct {
type WorkloadUndeployedEvent struct {
Name string `json:"workload_name"`
Code int `json:"code"`
Message string `json:"message"`
Expand Down
2 changes: 1 addition & 1 deletion internal/agent-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (a *AgentClient) handleAgentEvent(msg *nats.Msg) {
agentID := tokens[1]

var evt cloudevents.Event
err := json.Unmarshal(msg.Data, &evt)
err := evt.UnmarshalJSON(msg.Data)
if err != nil {
a.log.Error("Failed to deserialize cloudevent from agent", slog.Any("err", err))
return
Expand Down
6 changes: 2 additions & 4 deletions internal/agent-api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ const (
AgentStoppedEventType = "agent_stopped"
FunctionExecutionFailedType = "function_exec_failed"
FunctionExecutionSucceededType = "function_exec_succeeded"
WorkloadStartedEventType = "workload_started" // FIXME-- should this be WorkloadDeployed?
WorkloadStoppedEventType = "workload_stopped" // FIXME-- should this be in addition to WorkloadUndeployed (likely yes, in case of something bad happening...)
// FIXME-- where is WorkloadDeployedEventType? (likely just need to rename WorkloadStartedEventType -> WorkloadDeployedEventType)
// FIXME-- where is WorkloadStoppedEventType?
WorkloadDeployedEventType = "workload_deployed"
WorkloadUndeployedEventType = "workload_undeployed"
)

type AgentStartedEvent struct {
Expand Down
11 changes: 8 additions & 3 deletions internal/node/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ type emittedLog struct {

// publish the given $NEX event to an arbitrary namespace using the given NATS connection
func PublishCloudEvent(nc *nats.Conn, namespace string, event cloudevents.Event, log *slog.Logger) error {
raw, _ := event.MarshalJSON()

raw, err := event.MarshalJSON()
if err != nil {
log.Error("Failed to marshal cloudevent as JSON", slog.Any("error", err))
return err
}

// $NEX.events.{namespace}.{event_type}
subject := fmt.Sprintf("%s.%s.%s", EventSubjectPrefix, namespace, event.Type())
err := nc.Publish(subject, raw)
err = nc.Publish(subject, raw)
if err != nil {
log.Error("Failed to publish cloud event", slog.Any("err", err))
log.Error("Failed to publish cloud event", slog.Any("error", err))
return err
}

Expand Down
5 changes: 3 additions & 2 deletions internal/node/workload_mgr_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) {
// with it
return
}
evt.SetSource(fmt.Sprintf("%s-%s", *deployRequest.TargetNode, agentId))

err := PublishCloudEvent(w.nc, *deployRequest.Namespace, evt, w.log)
if err != nil {
w.log.Error("Failed to publish cloudevent", slog.Any("err", err))
return
}

if evt.Type() == agentapi.WorkloadStoppedEventType {
if evt.Type() == agentapi.WorkloadUndeployedEventType {
_ = w.StopWorkload(agentId, false)

evtData, err := evt.DataBytes()
Expand Down Expand Up @@ -233,7 +234,7 @@ func (w *WorkloadManager) publishWorkloadStopped(workloadId string) error {
cloudevent.SetSource(w.publicKey)
cloudevent.SetID(uuid.NewString())
cloudevent.SetTime(time.Now().UTC())
cloudevent.SetType(agentapi.WorkloadStoppedEventType)
cloudevent.SetType(agentapi.WorkloadUndeployedEventType)
cloudevent.SetDataContentType(cloudevents.ApplicationJSON)
_ = cloudevent.SetData(workloadStopped)

Expand Down
8 changes: 4 additions & 4 deletions nex/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func handleEventEntry(log *slog.Logger, emittedEvent controlapi.EmittedEvent) {
} else {
attrs = append(attrs, slog.String("message", evt.Message), slog.Int("code", evt.Code))
}
case controlapi.WorkloadStartedEventType:
evt := &controlapi.WorkloadStartedEvent{}
case controlapi.WorkloadDeployedEventType:
evt := &controlapi.WorkloadDeployedEvent{}
if err := event.DataAs(evt); err != nil {
attrs = append(attrs, slog.Any("err", err))
} else {
attrs = append(attrs, slog.String("workload_name", evt.Name))
}
case controlapi.WorkloadStoppedEventType:
evt := &controlapi.WorkloadStoppedEvent{}
case controlapi.WorkloadUndeployedEventType:
evt := &controlapi.WorkloadUndeployedEvent{}
if err := event.DataAs(evt); err != nil {
attrs = append(attrs, slog.Any("err", err))
} else {
Expand Down
6 changes: 3 additions & 3 deletions spec/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = Describe("event monitor", func() {
Expect(err).To(BeNil())

evt := cloudevents.NewEvent()
evt.SetType("workload_started")
evt.SetType("workload_deployed")
evt.SetID("1")
evt.SetSource("testing")
_ = evt.SetData(testStruct{
Expand All @@ -51,7 +51,7 @@ var _ = Describe("event monitor", func() {
})

bytes, _ := json.Marshal(evt)
_ = nc.Publish("$NEX.events.default.workload_started", bytes)
_ = nc.Publish("$NEX.events.default.workload_deployed", bytes)

subject = <-ch
})
Expand All @@ -61,7 +61,7 @@ var _ = Describe("event monitor", func() {
})

It("maintains event type", func(ctx SpecContext) {
Expect(subject.EventType).To(Equal("workload_started"))
Expect(subject.EventType).To(Equal("workload_deployed"))
})

Describe("DataAs", func() {
Expand Down
Loading