Skip to content

Commit

Permalink
nodes now publish heartbeats (#181)
Browse files Browse the repository at this point in the history
* nodes now publish heartbeats

* removing superfluous field
  • Loading branch information
autodidaddict committed Apr 17, 2024
1 parent a0b7c92 commit 934ef75
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
9 changes: 9 additions & 0 deletions control-api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const (
AgentStoppedEventType = "agent_stopped"
NodeStartedEventType = "node_started"
NodeStoppedEventType = "node_stopped"
HeartbeatEventType = "heartbeat"
WorkloadStartedEventType = "workload_started" // FIXME-- should this be WorkloadDeployed?
WorkloadStoppedEventType = "workload_stopped" // FIXME-- should this be in addition to WorkloadUndeployed (likely yes, in case of something bad happening...)
// FIXME-- where is WorkloadDeployedEventType? (likely just need to rename WorkloadStartedEventType -> WorkloadDeployedEventType)
Expand Down Expand Up @@ -40,3 +41,11 @@ type NodeStoppedEvent struct {
Id string `json:"id"`
Graceful bool `json:"graceful"`
}

type HeartbeatEvent struct {
Version string `json:"version"`
NodeId string `json:"node_id"`
Uptime string `json:"uptime"`
Tags map[string]string `json:"tags,omitempty"`
RunningMachines int `json:"running_machines"`
}
54 changes: 49 additions & 5 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ import (
"github.com/synadia-io/nex/internal/node/observability"
)

const defaultNatsStoreDir = "pnats"
const defaultPidFilepath = "/var/run/nex.pid"

const runloopSleepInterval = 100 * time.Millisecond
const runloopTickInterval = 2500 * time.Millisecond
const (
systemNamespace = "system"
defaultNatsStoreDir = "pnats"
defaultPidFilepath = "/var/run/nex.pid"
runloopSleepInterval = 100 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
heartbeatInterval = 30 * time.Second
)

// Nex node process
type Node struct {
Expand Down Expand Up @@ -109,6 +112,8 @@ func (n *Node) Start() {

_ = n.publishNodeStarted()

go n.emitHeartbeats()

timer := time.NewTicker(runloopTickInterval)
defer timer.Stop()

Expand Down Expand Up @@ -173,6 +178,17 @@ func (n *Node) createPid() error {
return nil
}

func (n *Node) emitHeartbeats() {
ticker := time.NewTicker(heartbeatInterval)
for range ticker.C {
n.publishHeartbeat()

Check failure on line 184 in internal/node/node.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `n.publishHeartbeat` is not checked (errcheck)

if n.closing > 0 {
ticker.Stop()
}
}
}

func (n *Node) generateKeypair() error {
var err error

Expand Down Expand Up @@ -318,6 +334,34 @@ func (n *Node) loadNodeConfig() error {
return nil
}

func (n *Node) publishHeartbeat() error {
machines, err := n.manager.RunningWorkloads()
if err != nil {
n.log.Error("Failed to query running machines during heartbeat", slog.Any("error", err))
return nil
}

now := time.Now().UTC()

evt := controlapi.HeartbeatEvent{
NodeId: n.publicKey,
Version: Version(),
Uptime: myUptime(now.Sub(n.startedAt)),
RunningMachines: len(machines),
Tags: n.config.Tags,
}

cloudevent := cloudevents.NewEvent()
cloudevent.SetSource(n.publicKey)
cloudevent.SetID(uuid.NewString())
cloudevent.SetTime(now)
cloudevent.SetType(controlapi.HeartbeatEventType)
cloudevent.SetDataContentType(cloudevents.ApplicationJSON)
_ = cloudevent.SetData(evt)

return PublishCloudEvent(n.nc, systemNamespace, cloudevent, n.log)
}

func (n *Node) publishNodeStarted() error {
nodeStart := controlapi.NodeStartedEvent{
Version: VERSION,
Expand Down

0 comments on commit 934ef75

Please sign in to comment.