Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
for {
interruptionEvent := <-cancelChan
nodeName := interruptionEvent.NodeName
eventID := interruptionEvent.EventID
interruptionEventStore.CancelInterruptionEvent(interruptionEvent.EventID)
if interruptionEventStore.ShouldUncordonNode(nodeName) {
log.Info().Msg("Uncordoning the node due to a cancellation event")
Expand All @@ -297,7 +298,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
} else {
recorder.Emit(nodeName, observability.Normal, observability.UncordonReason, observability.UncordonMsg)
}
metrics.NodeActionsInc("uncordon", nodeName, err)
metrics.NodeActionsInc("uncordon", nodeName, eventID, err)

err = node.RemoveNTHLabels(nodeName)
if err != nil {
Expand All @@ -317,6 +318,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
defer wg.Done()
nodeName := drainEvent.NodeName
eventID := drainEvent.EventID

if nthConfig.UseProviderId {
newNodeName, err := node.GetNodeNameFromProviderID(drainEvent.ProviderID)
Expand All @@ -334,7 +336,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
}
drainEvent.NodeLabels = nodeLabels
if drainEvent.PreDrainTask != nil {
runPreDrainTask(node, nodeName, drainEvent, metrics, recorder)
runPreDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder)
}

podNameList, err := node.FetchPodNameList(nodeName)
Expand All @@ -348,9 +350,9 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
}

if nthConfig.CordonOnly || (!nthConfig.EnableSQSTerminationDraining && drainEvent.IsRebalanceRecommendation() && !nthConfig.EnableRebalanceDraining) {
err = cordonNode(node, nodeName, drainEvent, metrics, recorder)
err = cordonNode(node, nodeName, eventID, drainEvent, metrics, recorder)
} else {
err = cordonAndDrainNode(node, nodeName, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining)
err = cordonAndDrainNode(node, nodeName, eventID, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining)
}

if nthConfig.WebhookURL != "" {
Expand All @@ -363,25 +365,25 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
} else {
interruptionEventStore.MarkAllAsProcessed(nodeName)
if drainEvent.PostDrainTask != nil {
runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
runPostDrainTask(node, nodeName, eventID, drainEvent, metrics, recorder)
}
<-interruptionEventStore.Workers
}

}

func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
func runPreDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
err := drainEvent.PreDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the pre-drain task")
recorder.Emit(nodeName, observability.Warning, observability.PreDrainErrReason, observability.PreDrainErrMsgFmt, err.Error())
} else {
recorder.Emit(nodeName, observability.Normal, observability.PreDrainReason, observability.PreDrainMsg)
}
metrics.NodeActionsInc("pre-drain", nodeName, err)
metrics.NodeActionsInc("pre-drain", nodeName, eventID, err)
}

func cordonNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error {
func cordonNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error {
err := node.Cordon(nodeName, drainEvent.Description)
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -393,40 +395,40 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio
return err
} else {
log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned")
metrics.NodeActionsInc("cordon", nodeName, err)
metrics.NodeActionsInc("cordon", nodeName, eventID, err)
recorder.Emit(nodeName, observability.Normal, observability.CordonReason, observability.CordonMsg)
}
return nil
}

func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
func cordonAndDrainNode(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
err := node.CordonAndDrain(nodeName, drainEvent.Description)
if err != nil {
if errors.IsNotFound(err) {
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Err(err).Msg("There was a problem while trying to cordon and drain the node")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err)
recorder.Emit(nodeName, observability.Warning, observability.CordonAndDrainErrReason, observability.CordonAndDrainErrMsgFmt, err.Error())
}
return err
} else {
log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, err)
metrics.NodeActionsInc("cordon-and-drain", nodeName, eventID, err)
recorder.Emit(nodeName, observability.Normal, observability.CordonAndDrainReason, observability.CordonAndDrainMsg)
}
return nil
}

func runPostDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
func runPostDrainTask(node node.Node, nodeName string, eventID string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
err := drainEvent.PostDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the post-drain task")
recorder.Emit(nodeName, observability.Warning, observability.PostDrainErrReason, observability.PostDrainErrMsgFmt, err.Error())
} else {
recorder.Emit(nodeName, observability.Normal, observability.PostDrainReason, observability.PostDrainMsg)
}
metrics.NodeActionsInc("post-drain", nodeName, err)
metrics.NodeActionsInc("post-drain", nodeName, eventID, err)
}

func getRegionFromQueueURL(queueURL string) string {
Expand Down
5 changes: 3 additions & 2 deletions pkg/observability/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
labelNodeActionKey = attribute.Key("node/action")
labelNodeStatusKey = attribute.Key("node/status")
labelNodeNameKey = attribute.Key("node/name")
labelEventIDKey = attribute.Key("node/event-id")
)

// Metrics represents the stats for observability
Expand Down Expand Up @@ -88,12 +89,12 @@ func (m Metrics) ErrorEventsInc(where string) {
}

// NodeActionsInc will increment one for the node stats counter, partitioned by action, nodeName and status, and only if metrics are enabled.
func (m Metrics) NodeActionsInc(action, nodeName string, err error) {
func (m Metrics) NodeActionsInc(action, nodeName string, eventID string, err error) {
if !m.enabled {
return
}

labels := []attribute.KeyValue{labelNodeActionKey.String(action), labelNodeNameKey.String(nodeName)}
labels := []attribute.KeyValue{labelNodeActionKey.String(action), labelNodeNameKey.String(nodeName), labelEventIDKey.String(eventID)}
if err != nil {
labels = append(labels, labelNodeStatusKey.String("error"))
} else {
Expand Down