From 0be8253aec839e6f7b23e2c19f8d5b9a086e7927 Mon Sep 17 00:00:00 2001 From: ohsoo Date: Thu, 20 Jul 2023 20:43:08 +0000 Subject: [PATCH 1/2] Add task stop verification ack to ecs-agent module --- .../acs/session/task_stop_verification_ack | 84 ++++++++++++++++++ .../ecs-agent/metrics/metrics.go | 4 +- .../acs/session/task_stop_verification_ack | 87 +++++++++++++++++++ 3 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack create mode 100644 ecs-agent/acs/session/task_stop_verification_ack diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack new file mode 100644 index 00000000000..34103869a60 --- /dev/null +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack @@ -0,0 +1,84 @@ +package session + +import ( + "fmt" + + "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" + "github.com/aws/amazon-ecs-agent/ecs-agent/metrics" + "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" + + "github.com/aws/aws-sdk-go/aws" + "github.com/pkg/errors" +) + +type TaskStopper interface { + StopTask(taskARN string) +} + +const ( + TaskStopVerificationACKMessageName = "TaskStopVerificationACKMessage" +) + +// taskStopVerificationACKResponder processes task stop verification ACK messages from ACS. +// It processes the message and sets the desired status of all tasks from the message to STOPPED. +type taskStopVerificationACKResponder struct { + taskStopper TaskStopper + messageIDAccessor ManifestMessageIDAccessor + metricsFactory metrics.EntryFactory + respond wsclient.RespondFunc +} + +// NewTaskStopVerificationACKResponder returns an instance of the taskStopVerificationACKResponder struct. +func NewTaskStopVerificationACKResponder( + taskStopper TaskStopper, + messageIDAccessor ManifestMessageIDAccessor, + metricsFactory metrics.EntryFactory, + responseSender wsclient.RespondFunc) *taskStopVerificationACKResponder { + r := &taskStopVerificationACKResponder{ + taskStopper: taskStopper, + messageIDAccessor: messageIDAccessor, + metricsFactory: metricsFactory, + } + r.respond = ResponseToACSSender(r.Name(), responseSender) + return r +} + +func (*taskStopVerificationACKResponder) Name() string { return "task stop verification ACK responder" } + +func (r *taskStopVerificationACKResponder) HandlerFunc() wsclient.RequestHandler { + return r.handleTaskStopVerificationACK +} + +// handleTaskStopVerificationACK goes through the list of verified tasks to be stopped +// and stops each one by setting the desired status of each task to STOPPED. +func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message *ecsacs.TaskStopVerificationAck) error { + logger.Debug(fmt.Sprintf("Handling %s", TaskStopVerificationACKMessageName)) + + // Ensure that message is valid and that a corresponding task manifest message has been processed before. + ackMessageID := aws.StringValue(message.MessageId) + manifestMessageID := r.messageIDAccessor.GetMessageID() + if ackMessageID == "" || ackMessageID != manifestMessageID { + return errors.New("Invalid messageID received: " + ackMessageID + " Manifest Message ID: " + manifestMessageID) + } + + // Loop through all tasks in the verified stop list and set the desired status of each one to STOPPED. + tasksToStop := message.StopTasks + for _, task := range tasksToStop { + taskARN := aws.StringValue(task.TaskArn) + metricFields := map[string]interface{}{ + "MessageID": aws.StringValue(message.MessageId), + "TaskARN": taskARN, + } + r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil) + + // Send request to the task stopper to stop the task this will be executed asynchronously + // in the context of the task stopper's message channel. + logger.Info("Sending message to task stopper to stop task", logger.Fields{ + field.TaskARN: taskARN, + }) + r.taskStopper.StopTask(taskARN) + } + return nil +} diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go index f2a9c2e246b..769e9fb92c7 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/metrics.go @@ -42,8 +42,8 @@ type Entry interface { // for reporting numerical values related to any operation, for instance the data transfer // rate for an image pull operation. WithGauge(value interface{}) Entry - // Done makes a metric operation as complete. It records the end time of the operation - // flushes the metrics to a persistent store. + // Done makes a metric operation as complete. It records the end time of the operation and + // flushes the metric to a persistent store. Done(err error) } diff --git a/ecs-agent/acs/session/task_stop_verification_ack b/ecs-agent/acs/session/task_stop_verification_ack new file mode 100644 index 00000000000..bb0ce99b901 --- /dev/null +++ b/ecs-agent/acs/session/task_stop_verification_ack @@ -0,0 +1,87 @@ +package session + +import ( + "fmt" + + "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger" + "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" + "github.com/aws/amazon-ecs-agent/ecs-agent/metrics" + "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" + + "github.com/aws/aws-sdk-go/aws" + "github.com/pkg/errors" +) + +type TaskStopper interface { + StopTask(taskARN string) +} + +const ( + TaskStopVerificationACKMessageName = "TaskStopVerificationACKMessage" +) + +// taskStopVerificationACKResponder processes task stop verification ACK messages from ACS. +// It processes the message and sets the desired status of all tasks from the message to STOPPED. +type taskStopVerificationACKResponder struct { + taskStopper TaskStopper + messageIDAccessor ManifestMessageIDAccessor + metricsFactory metrics.EntryFactory + respond wsclient.RespondFunc +} + +// NewTaskStopVerificationACKResponder returns an instance of the taskStopVerificationACKResponder struct. +func NewTaskStopVerificationACKResponder( + taskStopper TaskStopper, + messageIDAccessor ManifestMessageIDAccessor, + metricsFactory metrics.EntryFactory, + responseSender wsclient.RespondFunc) *taskStopVerificationACKResponder { + r := &taskStopVerificationACKResponder{ + taskStopper: taskStopper, + messageIDAccessor: messageIDAccessor, + metricsFactory: metricsFactory, + } + r.respond = ResponseToACSSender(r.Name(), responseSender) + return r +} + +func (*taskStopVerificationACKResponder) Name() string { return "task stop verification ACK responder" } + +func (r *taskStopVerificationACKResponder) HandlerFunc() wsclient.RequestHandler { + return r.handleTaskStopVerificationACK +} + +// handleTaskStopVerificationACK goes through the list of verified tasks to be stopped +// and stops each one by setting the desired status of each task to STOPPED. +func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message *ecsacs.TaskStopVerificationAck) error { + logger.Debug(fmt.Sprintf("Handling %s", TaskStopVerificationACKMessageName)) + + // Ensure that message is valid and that a corresponding task manifest message has been processed before. + ackMessageID := aws.StringValue(message.MessageId) + manifestMessageID := r.messageIDAccessor.GetMessageID() + if ackMessageID == "" || ackMessageID != manifestMessageID { + return errors.New("Invalid messageID received: " + ackMessageID + " Manifest Message ID: " + manifestMessageID) + } + + // Reset the message id so that the message with same message id is not processed twice. + r.messageIDAccessor.SetMessageID("") + + // Loop through all tasks in the verified stop list and set the desired status of each one to STOPPED. + tasksToStop := message.StopTasks + for _, task := range tasksToStop { + taskARN := aws.StringValue(task.TaskArn) + metricFields := map[string]interface{}{ + "MessageID": aws.StringValue(message.MessageId), + "TaskARN": taskARN, + } + r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil) + + // Send request to the task stopper to stop the task this will be executed asynchronously + // in the context of the task stopper's message channel. + logger.Info("Sending message to task stopper to stop task", logger.Fields{ + field.TaskARN: taskARN, + }) + r.taskStopper.StopTask(taskARN) + } + return nil +} From b2307322d705e71960fc9a9bf6f7d54bae182102 Mon Sep 17 00:00:00 2001 From: ohsoo Date: Thu, 20 Jul 2023 20:43:08 +0000 Subject: [PATCH 2/2] Add task stop verification ack to ecs-agent module --- .../session/task_stop_verification_ack_responder.go | 9 ++++----- .../amazon-ecs-agent/ecs-agent/metrics/constants.go | 8 ++++++++ .../session/task_stop_verification_ack_responder.go | 12 +++++++----- ecs-agent/metrics/constants.go | 8 ++++++++ 4 files changed, 27 insertions(+), 10 deletions(-) rename ecs-agent/acs/session/task_stop_verification_ack => agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go (91%) rename agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack => ecs-agent/acs/session/task_stop_verification_ack_responder.go (90%) diff --git a/ecs-agent/acs/session/task_stop_verification_ack b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go similarity index 91% rename from ecs-agent/acs/session/task_stop_verification_ack rename to agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go index bb0ce99b901..bc4a31608ac 100644 --- a/ecs-agent/acs/session/task_stop_verification_ack +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack_responder.go @@ -70,14 +70,13 @@ func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message tasksToStop := message.StopTasks for _, task := range tasksToStop { taskARN := aws.StringValue(task.TaskArn) - metricFields := map[string]interface{}{ - "MessageID": aws.StringValue(message.MessageId), - "TaskARN": taskARN, + metricFields := logger.Fields{ + field.MessageID: aws.StringValue(message.MessageId), + field.TaskARN: taskARN, } r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil) - // Send request to the task stopper to stop the task this will be executed asynchronously - // in the context of the task stopper's message channel. + // Send request to the task stopper to stop the task. logger.Info("Sending message to task stopper to stop task", logger.Fields{ field.TaskARN: taskARN, }) diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go index d3ada8cee85..4979940c335 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go +++ b/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/metrics/constants.go @@ -26,4 +26,12 @@ const ( // AttachResourceResponder attachResourceResponderNamespace = "ResourceAttachment" ResourceValidationMetricName = attachResourceResponderNamespace + ".Validation" + + // TaskManifestResponder + taskManifestResponderNamespace = "TaskManifestResponder" + TaskManifestHandlingDuration = taskManifestResponderNamespace + ".Duration" + + // TaskStopVerificationACKResponder + taskStopVerificationACKResponderNamespace = "TaskStopVeificationACKResponder" + TaskStoppedMetricName = taskStopVerificationACKResponderNamespace + ".TaskStopped" ) diff --git a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack b/ecs-agent/acs/session/task_stop_verification_ack_responder.go similarity index 90% rename from agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack rename to ecs-agent/acs/session/task_stop_verification_ack_responder.go index 34103869a60..bc4a31608ac 100644 --- a/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/acs/session/task_stop_verification_ack +++ b/ecs-agent/acs/session/task_stop_verification_ack_responder.go @@ -63,18 +63,20 @@ func (r *taskStopVerificationACKResponder) handleTaskStopVerificationACK(message return errors.New("Invalid messageID received: " + ackMessageID + " Manifest Message ID: " + manifestMessageID) } + // Reset the message id so that the message with same message id is not processed twice. + r.messageIDAccessor.SetMessageID("") + // Loop through all tasks in the verified stop list and set the desired status of each one to STOPPED. tasksToStop := message.StopTasks for _, task := range tasksToStop { taskARN := aws.StringValue(task.TaskArn) - metricFields := map[string]interface{}{ - "MessageID": aws.StringValue(message.MessageId), - "TaskARN": taskARN, + metricFields := logger.Fields{ + field.MessageID: aws.StringValue(message.MessageId), + field.TaskARN: taskARN, } r.metricsFactory.New(metrics.TaskStoppedMetricName).WithFields(metricFields).Done(nil) - // Send request to the task stopper to stop the task this will be executed asynchronously - // in the context of the task stopper's message channel. + // Send request to the task stopper to stop the task. logger.Info("Sending message to task stopper to stop task", logger.Fields{ field.TaskARN: taskARN, }) diff --git a/ecs-agent/metrics/constants.go b/ecs-agent/metrics/constants.go index d3ada8cee85..4979940c335 100644 --- a/ecs-agent/metrics/constants.go +++ b/ecs-agent/metrics/constants.go @@ -26,4 +26,12 @@ const ( // AttachResourceResponder attachResourceResponderNamespace = "ResourceAttachment" ResourceValidationMetricName = attachResourceResponderNamespace + ".Validation" + + // TaskManifestResponder + taskManifestResponderNamespace = "TaskManifestResponder" + TaskManifestHandlingDuration = taskManifestResponderNamespace + ".Duration" + + // TaskStopVerificationACKResponder + taskStopVerificationACKResponderNamespace = "TaskStopVeificationACKResponder" + TaskStoppedMetricName = taskStopVerificationACKResponderNamespace + ".TaskStopped" )