From 125c6d7562dd88283f7860dcdfc99a0c8818c695 Mon Sep 17 00:00:00 2001
From: ohsoo <47485049+ohsoo@users.noreply.github.com>
Date: Fri, 21 Jul 2023 16:47:48 -0400
Subject: [PATCH] Add task stop verification ack to ecs-agent module (#3820)

* Add task stop verification ack to ecs-agent module

* Add task stop verification ack to ecs-agent module
---
 .../task_stop_verification_ack_responder.go   | 100 ++++++++++++++++++
 .../ecs-agent/metrics/constants.go            |   8 ++
 .../ecs-agent/metrics/metrics.go              |   4 +-
 .../task_stop_verification_ack_responder.go   | 100 ++++++++++++++++++
 ecs-agent/metrics/constants.go                |   8 ++
 5 files changed, 218 insertions(+), 2 deletions(-)
 create mode 100644 agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go
 create mode 100644 ecs-agent/acs/session/task_stop_verification_ack_responder.go

diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go
new file mode 100644
index 00000000000..bc4a31608ac
--- /dev/null
+++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go
@@ -0,0 +1,100 @@
+package session
+
+import (
+	"fmt"
+
+	"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/metrics"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/pkg/errors"
+)
+
+// TaskStopper is implemented by the component that the responder asks to stop a task,
+// identified by its ARN.
+type TaskStopper interface {
+	StopTask(taskARN string)
+}
+
+const (
+	// TaskStopVerificationACKMessageName names the message type in this responder's
+	// log and error output.
+	TaskStopVerificationACKMessageName = "TaskStopVerificationACKMessage"
+)
+
+// taskStopVerificationACKResponder processes task stop verification ACK messages from ACS.
+// It processes the message and sets the desired status of all tasks from the message to STOPPED.
+// The respond field is populated by the constructor; the handler in this file does not call it.
+type taskStopVerificationACKResponder struct {
+	taskStopper       TaskStopper
+	messageIDAccessor ManifestMessageIDAccessor
+	metricsFactory    metrics.EntryFactory
+	respond           wsclient.RespondFunc
+}
+
+// NewTaskStopVerificationACKResponder returns an instance of the taskStopVerificationACKResponder struct.
+func NewTaskStopVerificationACKResponder(
+	taskStopper TaskStopper,
+	messageIDAccessor ManifestMessageIDAccessor,
+	metricsFactory metrics.EntryFactory,
+	responseSender wsclient.RespondFunc) *taskStopVerificationACKResponder {
+	r := &taskStopVerificationACKResponder{
+		taskStopper:       taskStopper,
+		messageIDAccessor: messageIDAccessor,
+		metricsFactory:    metricsFactory,
+	}
+	r.respond = ResponseToACSSender(r.Name(), responseSender)
+	return r
+}
+
+// Name returns the name of this responder, which the constructor also passes to ResponseToACSSender.
+func (*taskStopVerificationACKResponder) Name() string { return "task stop verification ACK responder" }
+
+// HandlerFunc returns the handler that processes task stop verification ACK messages.
+func (r *taskStopVerificationACKResponder) HandlerFunc() wsclient.RequestHandler {
+	return r.handleTaskStopVerificationACK
+}
+
+// handleTaskStopVerificationACK goes through the list of verified tasks to be stopped
+// and stops each one by setting the desired status of each task to STOPPED.
+// It returns an error, and stops nothing, when the message is nil, has an empty message ID,
+// or has a message ID that differs from the stored manifest message ID.
+func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message *ecsacs.TaskStopVerificationAck) error {
+	logger.Debug(fmt.Sprintf("Handling %s", TaskStopVerificationACKMessageName))
+
+	// A nil message has no message ID to verify; reject it instead of dereferencing it.
+	if message == nil {
+		return errors.New("Invalid message received: " + TaskStopVerificationACKMessageName + " is nil")
+	}
+
+	// Ensure that message is valid and that a corresponding task manifest message has been processed before.
+	ackMessageID := aws.StringValue(message.MessageId)
+	manifestMessageID := r.messageIDAccessor.GetMessageID()
+	if ackMessageID == "" || ackMessageID != manifestMessageID {
+		return errors.New("Invalid messageID received: " + ackMessageID + " Manifest Message ID: " + manifestMessageID)
+	}
+
+	// Reset the message id so that the message with same message id is not processed twice.
+	r.messageIDAccessor.SetMessageID("")
+
+	// Loop through all tasks in the verified stop list and set the desired status of each one to STOPPED.
+	for _, task := range message.StopTasks {
+		taskARN := aws.StringValue(task.TaskArn)
+		// Reuse the ACK message ID extracted above instead of re-reading it for every task.
+		metricFields := logger.Fields{
+			field.MessageID: ackMessageID,
+			field.TaskARN:   taskARN,
+		}
+		r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil)
+
+		// Send request to the task stopper to stop the task.
+		logger.Info("Sending message to task stopper to stop task", logger.Fields{
+			field.TaskARN: taskARN,
+		})
+		r.taskStopper.StopTask(taskARN)
+	}
+	return nil
+}
diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go
index d3ada8cee85..4979940c335 100644
--- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go
+++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go
@@ -26,4 +26,12 @@ const (
 	// AttachResourceResponder
 	attachResourceResponderNamespace = "ResourceAttachment"
 	ResourceValidationMetricName     = attachResourceResponderNamespace + ".Validation"
+
+	// TaskManifestResponder
+	taskManifestResponderNamespace = "TaskManifestResponder"
+	TaskManifestHandlingDuration   = taskManifestResponderNamespace + ".Duration"
+
+	// TaskStopVerificationACKResponder
+	taskStopVerificationACKResponderNamespace = "TaskStopVerificationACKResponder"
+	TaskStoppedMetricName                     = taskStopVerificationACKResponderNamespace + ".TaskStopped"
 )
diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go
index f2a9c2e246b..769e9fb92c7 100644
--- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go
+++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go
@@ -42,8 +42,8 @@ type Entry interface {
 	// for reporting numerical values related to any operation, for instance the data transfer
 	// rate for an image pull operation.
 	WithGauge(value interface{}) Entry
-	// Done makes a metric operation as complete. It records the end time of the operation
-	// flushes the metrics to a persistent store.
+	// Done makes a metric operation as complete. It records the end time of the operation and
+	// flushes the metric to a persistent store.
 	Done(err error)
 }
 
diff --git a/ecs-agent/acs/session/task_stop_verification_ack_responder.go b/ecs-agent/acs/session/task_stop_verification_ack_responder.go
new file mode 100644
index 00000000000..bc4a31608ac
--- /dev/null
+++ b/ecs-agent/acs/session/task_stop_verification_ack_responder.go
@@ -0,0 +1,100 @@
+package session
+
+import (
+	"fmt"
+
+	"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/metrics"
+	"github.com/aws/amazon-ecs-agent/ecs-agent/wsclient"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/pkg/errors"
+)
+
+// TaskStopper is implemented by the component that the responder asks to stop a task,
+// identified by its ARN.
+type TaskStopper interface {
+	StopTask(taskARN string)
+}
+
+const (
+	// TaskStopVerificationACKMessageName names the message type in this responder's
+	// log and error output.
+	TaskStopVerificationACKMessageName = "TaskStopVerificationACKMessage"
+)
+
+// taskStopVerificationACKResponder processes task stop verification ACK messages from ACS.
+// It processes the message and sets the desired status of all tasks from the message to STOPPED.
+// The respond field is populated by the constructor; the handler in this file does not call it.
+type taskStopVerificationACKResponder struct {
+	taskStopper       TaskStopper
+	messageIDAccessor ManifestMessageIDAccessor
+	metricsFactory    metrics.EntryFactory
+	respond           wsclient.RespondFunc
+}
+
+// NewTaskStopVerificationACKResponder returns an instance of the taskStopVerificationACKResponder struct.
+func NewTaskStopVerificationACKResponder(
+	taskStopper TaskStopper,
+	messageIDAccessor ManifestMessageIDAccessor,
+	metricsFactory metrics.EntryFactory,
+	responseSender wsclient.RespondFunc) *taskStopVerificationACKResponder {
+	r := &taskStopVerificationACKResponder{
+		taskStopper:       taskStopper,
+		messageIDAccessor: messageIDAccessor,
+		metricsFactory:    metricsFactory,
+	}
+	r.respond = ResponseToACSSender(r.Name(), responseSender)
+	return r
+}
+
+// Name returns the name of this responder, which the constructor also passes to ResponseToACSSender.
+func (*taskStopVerificationACKResponder) Name() string { return "task stop verification ACK responder" }
+
+// HandlerFunc returns the handler that processes task stop verification ACK messages.
+func (r *taskStopVerificationACKResponder) HandlerFunc() wsclient.RequestHandler {
+	return r.handleTaskStopVerificationACK
+}
+
+// handleTaskStopVerificationACK goes through the list of verified tasks to be stopped
+// and stops each one by setting the desired status of each task to STOPPED.
+// It returns an error, and stops nothing, when the message is nil, has an empty message ID,
+// or has a message ID that differs from the stored manifest message ID.
+func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message *ecsacs.TaskStopVerificationAck) error {
+	logger.Debug(fmt.Sprintf("Handling %s", TaskStopVerificationACKMessageName))
+
+	// A nil message has no message ID to verify; reject it instead of dereferencing it.
+	if message == nil {
+		return errors.New("Invalid message received: " + TaskStopVerificationACKMessageName + " is nil")
+	}
+
+	// Ensure that message is valid and that a corresponding task manifest message has been processed before.
+	ackMessageID := aws.StringValue(message.MessageId)
+	manifestMessageID := r.messageIDAccessor.GetMessageID()
+	if ackMessageID == "" || ackMessageID != manifestMessageID {
+		return errors.New("Invalid messageID received: " + ackMessageID + " Manifest Message ID: " + manifestMessageID)
+	}
+
+	// Reset the message id so that the message with same message id is not processed twice.
+	r.messageIDAccessor.SetMessageID("")
+
+	// Loop through all tasks in the verified stop list and set the desired status of each one to STOPPED.
+	for _, task := range message.StopTasks {
+		taskARN := aws.StringValue(task.TaskArn)
+		// Reuse the ACK message ID extracted above instead of re-reading it for every task.
+		metricFields := logger.Fields{
+			field.MessageID: ackMessageID,
+			field.TaskARN:   taskARN,
+		}
+		r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil)
+
+		// Send request to the task stopper to stop the task.
+		logger.Info("Sending message to task stopper to stop task", logger.Fields{
+			field.TaskARN: taskARN,
+		})
+		r.taskStopper.StopTask(taskARN)
+	}
+	return nil
+}
diff --git a/ecs-agent/metrics/constants.go b/ecs-agent/metrics/constants.go
index d3ada8cee85..4979940c335 100644
--- a/ecs-agent/metrics/constants.go
+++ b/ecs-agent/metrics/constants.go
@@ -26,4 +26,12 @@ const (
 	// AttachResourceResponder
 	attachResourceResponderNamespace = "ResourceAttachment"
 	ResourceValidationMetricName     = attachResourceResponderNamespace + ".Validation"
+
+	// TaskManifestResponder
+	taskManifestResponderNamespace = "TaskManifestResponder"
+	TaskManifestHandlingDuration   = taskManifestResponderNamespace + ".Duration"
+
+	// TaskStopVerificationACKResponder
+	taskStopVerificationACKResponderNamespace = "TaskStopVerificationACKResponder"
+	TaskStoppedMetricName                     = taskStopVerificationACKResponderNamespace + ".TaskStopped"
 )