From 527d9e3110693fa3fc5cba7a67a11bca6c4fd971 Mon Sep 17 00:00:00 2001 From: Heming Han Date: Tue, 27 Jun 2023 15:06:11 -0700 Subject: [PATCH 1/4] Integrate with TCSHandler in /ecs-agent module --- agent/app/agent.go | 29 ++-- agent/app/agent_unix_test.go | 2 +- agent/stats/reporter/reporter.go | 36 +++-- agent/stats/reporter/reporter_test.go | 125 +++++++++++++++ agent/tcs/handler/handler.go | 223 -------------------------- agent/tcs/handler/types.go | 64 -------- agent/tcs/handler/types_test.go | 77 --------- 7 files changed, 158 insertions(+), 398 deletions(-) create mode 100644 agent/stats/reporter/reporter_test.go delete mode 100644 agent/tcs/handler/handler.go delete mode 100644 agent/tcs/handler/types.go delete mode 100644 agent/tcs/handler/types_test.go diff --git a/agent/app/agent.go b/agent/app/agent.go index fa6d73c1c63..1b6d17bc0d1 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -47,8 +47,8 @@ import ( "github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes" "github.com/aws/amazon-ecs-agent/agent/statemanager" "github.com/aws/amazon-ecs-agent/agent/stats" + "github.com/aws/amazon-ecs-agent/agent/stats/reporter" "github.com/aws/amazon-ecs-agent/agent/taskresource" - tcshandler "github.com/aws/amazon-ecs-agent/agent/tcs/handler" "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/agent/utils/loader" "github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper" @@ -871,21 +871,18 @@ func (agent *ecsAgent) startAsyncRoutines( } go statsEngine.StartMetricsPublish() - telemetrySessionParams := tcshandler.TelemetrySessionParams{ - Ctx: agent.ctx, - CredentialProvider: agent.credentialProvider, - Cfg: agent.cfg, - ContainerInstanceArn: agent.containerInstanceARN, - DeregisterInstanceEventStream: deregisterInstanceEventStream, - ECSClient: client, - TaskEngine: taskEngine, - StatsEngine: statsEngine, - MetricsChannel: telemetryMessages, - HealthChannel: healthMessages, - Doctor: doctor, - } - // Start metrics session in 
a go routine - go tcshandler.StartMetricsSession(&telemetrySessionParams) + session, err := reporter.NewDockerTelemetrySession(agent.containerInstanceARN, agent.credentialProvider, agent.cfg, deregisterInstanceEventStream, + client, taskEngine, telemetryMessages, healthMessages, doctor) + if err != nil { + seelog.Warnf("Error creating telemetry session: %v", err) + return + } + if session == nil { + seelog.Infof("Metrics disabled on the instance.") + return + } + + go session.Start(agent.ctx) } func (agent *ecsAgent) startSpotInstanceDrainingPoller(ctx context.Context, client api.ECSClient) { diff --git a/agent/app/agent_unix_test.go b/agent/app/agent_unix_test.go index 543f3e4a652..03567e11369 100644 --- a/agent/app/agent_unix_test.go +++ b/agent/app/agent_unix_test.go @@ -482,7 +482,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) { }).Return("poll-endpoint", nil), client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return("acs-endpoint", nil).AnyTimes(), client.EXPECT().DiscoverTelemetryEndpoint(gomock.Any()).Do(func(x interface{}) { - // Ensures that the test waits until telemetry session has bee started + // Ensures that the test waits until telemetry session has been started discoverEndpointsInvoked.Done() }).Return("telemetry-endpoint", nil), client.EXPECT().DiscoverTelemetryEndpoint(gomock.Any()).Return( diff --git a/agent/stats/reporter/reporter.go b/agent/stats/reporter/reporter.go index 11118ec6143..c850dd8912f 100644 --- a/agent/stats/reporter/reporter.go +++ b/agent/stats/reporter/reporter.go @@ -19,7 +19,6 @@ import ( "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/cihub/seelog" ) const ( @@ -48,22 +47,20 @@ func NewDockerTelemetrySession( taskEngine engine.TaskEngine, metricsChannel <-chan ecstcs.TelemetryMessage, healthChannel <-chan ecstcs.HealthMessage, - doctor *doctor.Doctor) *DockerTelemetrySession { + doctor 
*doctor.Doctor) (*DockerTelemetrySession, error) { ok, cfgParseErr := isContainerHealthMetricsDisabled(cfg) if cfgParseErr != nil { - seelog.Warnf("Error starting metrics session: %v", cfgParseErr) - return nil + logger.Warn("Error starting metrics session", logger.Fields{ + field.Error: cfgParseErr, + }) + return nil, cfgParseErr } if ok { - seelog.Warnf("Metrics were disabled, not starting the telemetry session") - return nil + logger.Warn("Metrics were disabled, not starting the telemetry session") + return nil, nil } agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(taskEngine) - if cfg == nil { - logger.Error("Config is empty in the tcs session parameter") - return nil - } session := tcshandler.NewTelemetrySession( containerInstanceArn, @@ -90,7 +87,7 @@ func NewDockerTelemetrySession( healthChannel, doctor, ) - return &DockerTelemetrySession{session, ecsClient, containerInstanceArn} + return &DockerTelemetrySession{session, ecsClient, containerInstanceArn}, nil } // Start "overloads" tcshandler.TelemetrySession's Start with extra handling of discoverTelemetryEndpoint result. 
@@ -99,18 +96,23 @@ func NewDockerTelemetrySession( func (session *DockerTelemetrySession) Start(ctx context.Context) error { backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2) for { + select { + case <-ctx.Done(): + logger.Info("TCS session exited cleanly.") + return nil + default: + } endpoint, tcsError := discoverPollEndpoint(session.containerInstanceArn, session.ecsClient) if tcsError == nil { tcsError = session.s.StartTelemetrySession(ctx, endpoint) } - switch tcsError { - case context.Canceled, context.DeadlineExceeded: - return tcsError - case io.EOF, nil: + if tcsError == nil || tcsError == io.EOF { logger.Info("TCS Websocket connection closed for a valid reason") backoff.Reset() - default: - seelog.Errorf("Error: lost websocket connection with ECS Telemetry service (TCS): %v", tcsError) + } else { + logger.Error("Error: lost websocket connection with ECS Telemetry service (TCS)", logger.Fields{ + field.Error: tcsError, + }) time.Sleep(backoff.Duration()) } } diff --git a/agent/stats/reporter/reporter_test.go b/agent/stats/reporter/reporter_test.go new file mode 100644 index 00000000000..64383359123 --- /dev/null +++ b/agent/stats/reporter/reporter_test.go @@ -0,0 +1,125 @@ +package reporter + +import ( + "context" + "errors" + "testing" + + "github.com/aws/amazon-ecs-agent/agent/config" + mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" + "github.com/aws/amazon-ecs-agent/agent/version" + "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" + "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + testContainerInstanceArn = "testContainerInstanceArn" + testCluster = "testCluster" + testRegion = "us-west-2" + testDockerEndpoint = "testDockerEndpoint" + testDockerVersion = "testDockerVersion" +) + +func TestNewDockerTelemetrySession(t *testing.T) { + emptyDoctor, _ := 
doctor.NewDoctor([]doctor.Healthcheck{}, testCluster, testContainerInstanceArn) + testCredentials := credentials.NewStaticCredentials("test-id", "test-secret", "test-token") + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockEngine := mock_engine.NewMockTaskEngine(ctrl) + mockEngine.EXPECT().Version().Return(testDockerVersion, nil) + testCases := []struct { + name string + cfg *config.Config + expectedSession bool + expectedError bool + }{ + { + name: "happy case", + cfg: &config.Config{ + DisableMetrics: config.BooleanDefaultFalse{}, + DisableDockerHealthCheck: config.BooleanDefaultFalse{}, + Cluster: testCluster, + AWSRegion: testRegion, + AcceptInsecureCert: false, + DockerEndpoint: testDockerEndpoint, + }, + expectedSession: true, + expectedError: false, + }, + { + name: "cfg parsing error", + cfg: nil, + expectedSession: false, + expectedError: true, + }, + { + name: "metrics disabled", + cfg: &config.Config{ + DisableMetrics: config.BooleanDefaultFalse{ + Value: config.ExplicitlyEnabled, + }, + DisableDockerHealthCheck: config.BooleanDefaultFalse{ + Value: config.ExplicitlyEnabled, + }, + Cluster: testCluster, + AWSRegion: testRegion, + AcceptInsecureCert: false, + DockerEndpoint: testDockerEndpoint, + }, + expectedSession: false, + expectedError: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dockerTelemetrySession, err := NewDockerTelemetrySession( + testContainerInstanceArn, + testCredentials, + tc.cfg, + eventstream.NewEventStream("Deregister_Instance", context.Background()), + nil, + mockEngine, + nil, + nil, + emptyDoctor, + ) + if tc.expectedSession { + assert.NotNil(t, dockerTelemetrySession) + } else { + assert.Nil(t, dockerTelemetrySession) + } + + if tc.expectedError { + assert.NotNil(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGenerateVersionInfo_GetVersionError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockEngine := 
mock_engine.NewMockTaskEngine(ctrl) + mockEngine.EXPECT().Version().Times(1).Return("", errors.New("error")) + agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(mockEngine) + assert.Equal(t, version.Version, agentVersion) + assert.Equal(t, version.GitShortHash, agentHash) + assert.Equal(t, "", containerRuntimeVersion) +} + +func TestGenerateVersionInfo_NoError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockEngine := mock_engine.NewMockTaskEngine(ctrl) + mockEngine.EXPECT().Version().Times(1).Return(testDockerVersion, nil) + agentVersion, agentHash, containerRuntimeVersion := generateVersionInfo(mockEngine) + assert.Equal(t, version.Version, agentVersion) + assert.Equal(t, version.GitShortHash, agentHash) + assert.Equal(t, testDockerVersion, containerRuntimeVersion) +} diff --git a/agent/tcs/handler/handler.go b/agent/tcs/handler/handler.go deleted file mode 100644 index 97e9db5de50..00000000000 --- a/agent/tcs/handler/handler.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. 
- -package tcshandler - -import ( - "context" - "io" - "net/url" - "strings" - "time" - - "github.com/aws/amazon-ecs-agent/agent/config" - "github.com/aws/amazon-ecs-agent/agent/engine" - "github.com/aws/amazon-ecs-agent/agent/version" - "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" - "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" - tcsclient "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/client" - "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" - "github.com/aws/amazon-ecs-agent/ecs-agent/utils/retry" - "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/cihub/seelog" -) - -const ( - // The maximum time to wait between heartbeats without disconnecting - defaultHeartbeatTimeout = 1 * time.Minute - defaultHeartbeatJitter = 1 * time.Minute - // wsRWTimeout is the duration of read and write deadline for the - // websocket connection - wsRWTimeout = 2*defaultHeartbeatTimeout + defaultHeartbeatJitter - deregisterContainerInstanceHandler = "TCSDeregisterContainerInstanceHandler" -) - -// StartMetricsSession starts a metric session. It initializes the stats engine -// and invokes StartSession. -func StartMetricsSession(params *TelemetrySessionParams) { - ok, err := params.isContainerHealthMetricsDisabled() - if err != nil { - seelog.Warnf("Error starting metrics session: %v", err) - return - } - if ok { - seelog.Warnf("Metrics were disabled, not starting the telemetry session") - return - } - - err = StartSession(params) - if err != nil { - seelog.Warnf("Error starting metrics session with backend: %v", err) - } -} - -// StartSession creates a session with the backend and handles requests -// using the passed in arguments. -// The engine is expected to be initialized and gathering container metrics by -// the time the websocket client starts using it. 
-func StartSession(params *TelemetrySessionParams) error { - backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2) - for { - tcsError := startTelemetrySession(params) - if tcsError == nil || tcsError == io.EOF { - seelog.Info("TCS Websocket connection closed for a valid reason") - backoff.Reset() - } else { - seelog.Errorf("Error: lost websocket connection with ECS Telemetry service (TCS): %v", tcsError) - params.time().Sleep(backoff.Duration()) - } - select { - case <-params.Ctx.Done(): - seelog.Info("TCS session exited cleanly.") - return nil - default: - } - } -} - -func startTelemetrySession(params *TelemetrySessionParams) error { - tcsEndpoint, err := params.ECSClient.DiscoverTelemetryEndpoint(params.ContainerInstanceArn) - if err != nil { - seelog.Errorf("tcs: unable to discover poll endpoint: %v", err) - return err - } - url := formatURL(tcsEndpoint, params.Cfg.Cluster, params.ContainerInstanceArn, params.TaskEngine) - return startSession(params.Ctx, url, params.Cfg, params.CredentialProvider, params.MetricsChannel, - params.HealthChannel, defaultHeartbeatTimeout, defaultHeartbeatJitter, - config.DefaultContainerMetricsPublishInterval, params.DeregisterInstanceEventStream, params.Doctor) -} - -func startSession( - ctx context.Context, - url string, - cfg *config.Config, - credentialProvider *credentials.Credentials, - metricsChannel <-chan ecstcs.TelemetryMessage, - healthChannel <-chan ecstcs.HealthMessage, - heartbeatTimeout, heartbeatJitter, - publishMetricsInterval time.Duration, - deregisterInstanceEventStream *eventstream.EventStream, - doctor *doctor.Doctor, -) error { - client := tcsclient.New(url, &wsclient.WSClientMinAgentConfig{ - AWSRegion: cfg.AWSRegion, - AcceptInsecureCert: cfg.AcceptInsecureCert, - DockerEndpoint: cfg.DockerEndpoint, - IsDocker: true, - }, doctor, cfg.DisableMetrics.Enabled(), publishMetricsInterval, - credentialProvider, wsRWTimeout, metricsChannel, healthChannel) - defer client.Close() - - err := 
deregisterInstanceEventStream.Subscribe(deregisterContainerInstanceHandler, client.Disconnect) - if err != nil { - return err - } - defer deregisterInstanceEventStream.Unsubscribe(deregisterContainerInstanceHandler) - - err = client.Connect() - if err != nil { - seelog.Errorf("Error connecting to TCS: %v", err.Error()) - return err - } - seelog.Info("Connected to TCS endpoint") - // start a timer and listens for tcs heartbeats/acks. The timer is reset when - // we receive a heartbeat from the server or when a published metrics message - // is acked. - timer := time.NewTimer(retry.AddJitter(heartbeatTimeout, heartbeatJitter)) - defer timer.Stop() - client.AddRequestHandler(heartbeatHandler(timer)) - client.AddRequestHandler(ackPublishMetricHandler(timer)) - client.AddRequestHandler(ackPublishHealthMetricHandler(timer)) - client.AddRequestHandler(ackPublishInstanceStatusHandler(timer)) - client.SetAnyRequestHandler(anyMessageHandler(client)) - serveC := make(chan error, 1) - go func() { - serveC <- client.Serve(ctx) - }() - select { - case <-ctx.Done(): - // outer context done, agent is exiting - client.Disconnect() - case <-timer.C: - seelog.Info("TCS Connection hasn't had any activity for too long; disconnecting") - client.Disconnect() - case err := <-serveC: - return err - } - return nil -} - -// heartbeatHandler resets the heartbeat timer when HeartbeatMessage message is received from tcs. -func heartbeatHandler(timer *time.Timer) func(*ecstcs.HeartbeatMessage) { - return func(*ecstcs.HeartbeatMessage) { - seelog.Debug("Received HeartbeatMessage from tcs") - timer.Reset(retry.AddJitter(defaultHeartbeatTimeout, defaultHeartbeatJitter)) - } -} - -// ackPublishMetricHandler consumes the ack message from the backend. THe backend sends -// the ack each time it processes a metric message. 
-func ackPublishMetricHandler(timer *time.Timer) func(*ecstcs.AckPublishMetric) { - return func(*ecstcs.AckPublishMetric) { - seelog.Debug("Received AckPublishMetric from tcs") - timer.Reset(retry.AddJitter(defaultHeartbeatTimeout, defaultHeartbeatJitter)) - } -} - -// ackPublishHealthMetricHandler consumes the ack message from backend. The backend sends -// the ack each time it processes a health message -func ackPublishHealthMetricHandler(timer *time.Timer) func(*ecstcs.AckPublishHealth) { - return func(*ecstcs.AckPublishHealth) { - seelog.Debug("Received ACKPublishHealth from tcs") - timer.Reset(retry.AddJitter(defaultHeartbeatTimeout, defaultHeartbeatJitter)) - } -} - -// ackPublishInstanceStatusHandler consumes the ack message from backend. The backend sends -// the ack each time it processes a health message -func ackPublishInstanceStatusHandler(timer *time.Timer) func(*ecstcs.AckPublishInstanceStatus) { - return func(*ecstcs.AckPublishInstanceStatus) { - seelog.Debug("Received AckPublishInstanceStatus from tcs") - timer.Reset(retry.AddJitter(defaultHeartbeatTimeout, defaultHeartbeatJitter)) - } -} - -// anyMessageHandler handles any server message. Any server message means the -// connection is active -func anyMessageHandler(client wsclient.ClientServer) func(interface{}) { - return func(interface{}) { - seelog.Trace("TCS activity occurred") - // Reset read deadline as there's activity on the channel - if err := client.SetReadDeadline(time.Now().Add(wsRWTimeout)); err != nil { - seelog.Warnf("Unable to extend read deadline for TCS connection: %v", err) - } - } -} - -// formatURL returns formatted url for tcs endpoint. 
-func formatURL(endpoint string, cluster string, containerInstance string, taskEngine engine.TaskEngine) string { - tcsURL := endpoint - if !strings.HasSuffix(tcsURL, "/") { - tcsURL += "/" - } - query := url.Values{} - query.Set("cluster", cluster) - query.Set("containerInstance", containerInstance) - query.Set("agentVersion", version.Version) - query.Set("agentHash", version.GitHashString()) - if dockerVersion, err := taskEngine.Version(); err == nil { - query.Set("dockerVersion", dockerVersion) - } - return tcsURL + "ws?" + query.Encode() -} diff --git a/agent/tcs/handler/types.go b/agent/tcs/handler/types.go deleted file mode 100644 index 940267fba25..00000000000 --- a/agent/tcs/handler/types.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. 
- -package tcshandler - -import ( - "context" - "sync" - - "github.com/aws/amazon-ecs-agent/agent/api" - "github.com/aws/amazon-ecs-agent/agent/config" - "github.com/aws/amazon-ecs-agent/agent/engine" - "github.com/aws/amazon-ecs-agent/agent/stats" - "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" - "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" - "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" - "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/pkg/errors" -) - -// TelemetrySessionParams contains all the parameters required to start a tcs -// session -type TelemetrySessionParams struct { - Ctx context.Context - ContainerInstanceArn string - CredentialProvider *credentials.Credentials - Cfg *config.Config - DeregisterInstanceEventStream *eventstream.EventStream - ECSClient api.ECSClient - TaskEngine engine.TaskEngine - StatsEngine *stats.DockerStatsEngine - MetricsChannel <-chan ecstcs.TelemetryMessage - HealthChannel <-chan ecstcs.HealthMessage - Doctor *doctor.Doctor - _time ttime.Time - _timeOnce sync.Once -} - -func (params *TelemetrySessionParams) time() ttime.Time { - params._timeOnce.Do(func() { - if params._time == nil { - params._time = &ttime.DefaultTime{} - } - }) - return params._time -} - -func (params *TelemetrySessionParams) isContainerHealthMetricsDisabled() (bool, error) { - if params.Cfg != nil { - return params.Cfg.DisableMetrics.Enabled() && params.Cfg.DisableDockerHealthCheck.Enabled(), nil - } - return false, errors.New("Config is empty in the tcs session parameter") -} diff --git a/agent/tcs/handler/types_test.go b/agent/tcs/handler/types_test.go deleted file mode 100644 index 6879f5c1bff..00000000000 --- a/agent/tcs/handler/types_test.go +++ /dev/null @@ -1,77 +0,0 @@ -//go:build unit -// +build unit - -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). 
You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package tcshandler - -import ( - "testing" - - "github.com/aws/amazon-ecs-agent/agent/config" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" -) - -func TestIsMetricsDisabled(t *testing.T) { - tcs := []struct { - param *TelemetrySessionParams - result bool - err error - description string - }{ - { - param: &TelemetrySessionParams{}, - result: false, - err: errors.New("Config is empty in the tcs session parameter"), - description: "Config not set should cause error", - }, - { - param: &TelemetrySessionParams{Cfg: &config.Config{DisableMetrics: config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled}, DisableDockerHealthCheck: config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled}}}, - result: false, - err: nil, - description: "No metrics was disable should return false", - }, - { - param: &TelemetrySessionParams{Cfg: &config.Config{DisableMetrics: config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}, DisableDockerHealthCheck: config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled}}}, - result: false, - err: nil, - description: "Only health metrics was disable should return false", - }, - { - param: &TelemetrySessionParams{Cfg: &config.Config{DisableMetrics: config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled}, DisableDockerHealthCheck: config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}}}, - result: false, - err: nil, - description: "Only telemetry metrics was disable should return false", - }, - { - param: &TelemetrySessionParams{Cfg: &config.Config{DisableMetrics: 
config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}, DisableDockerHealthCheck: config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}}}, - result: true, - err: nil, - description: "both telemetry and health metrics were disable should return true", - }, - } - - for _, tc := range tcs { - t.Run(tc.description, func(t *testing.T) { - ok, err := tc.param.isContainerHealthMetricsDisabled() - if tc.err != nil { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - assert.Equal(t, tc.result, ok) - }) - } -} From f966ccb788de03ecfd11aea9aa5611a435ad9831 Mon Sep 17 00:00:00 2001 From: Heming Han Date: Thu, 29 Jun 2023 12:29:29 -0700 Subject: [PATCH 2/4] add license --- agent/stats/reporter/reporter.go | 13 +++++++++++++ agent/stats/reporter/reporter_test.go | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/agent/stats/reporter/reporter.go b/agent/stats/reporter/reporter.go index c850dd8912f..46f2019c67b 100644 --- a/agent/stats/reporter/reporter.go +++ b/agent/stats/reporter/reporter.go @@ -1,3 +1,16 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + package reporter import ( diff --git a/agent/stats/reporter/reporter_test.go b/agent/stats/reporter/reporter_test.go index 64383359123..0b358534c13 100644 --- a/agent/stats/reporter/reporter_test.go +++ b/agent/stats/reporter/reporter_test.go @@ -1,3 +1,16 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + package reporter import ( From b695ba9faad96791c56fe1a04c3c604a5c14cc4a Mon Sep 17 00:00:00 2001 From: Heming Han Date: Fri, 30 Jun 2023 12:13:01 -0700 Subject: [PATCH 3/4] address comments --- agent/stats/reporter/reporter.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/agent/stats/reporter/reporter.go b/agent/stats/reporter/reporter.go index 46f2019c67b..ca41374c177 100644 --- a/agent/stats/reporter/reporter.go +++ b/agent/stats/reporter/reporter.go @@ -41,6 +41,10 @@ const ( // Default websocket client disconnection timeout initiated by agent defaultDisconnectionTimeout = 15 * time.Minute defaultDisconnectionJitter = 30 * time.Minute + backoffMin = 1 * time.Second + backoffMax = 1 * time.Minute + jitterMultiple = 0.2 + multiple = 2 ) type DockerTelemetrySession struct { @@ -107,22 +111,25 @@ func NewDockerTelemetrySession( // discoverTelemetryEndpoint and tcshandler.TelemetrySession's StartTelemetrySession errors are handled // (retryWithBackoff or return) in a combined manner func (session *DockerTelemetrySession) Start(ctx context.Context) error { - backoff := retry.NewExponentialBackoff(time.Second, 1*time.Minute, 0.2, 2) + backoff := retry.NewExponentialBackoff(backoffMin, backoffMax, jitterMultiple, multiple) for { select { case <-ctx.Done(): - logger.Info("TCS session exited cleanly.") + logger.Info("ECS Telemetry service (TCS) session exited cleanly.") return nil default: } endpoint, tcsError := 
discoverPollEndpoint(session.containerInstanceArn, session.ecsClient) if tcsError == nil { + // returning from StartTelemetrySession indicates a disconnection, need to reconnect. tcsError = session.s.StartTelemetrySession(ctx, endpoint) } if tcsError == nil || tcsError == io.EOF { + // reset backoff when TCS closed for a valid reason, such as connection expiring due to inactivity logger.Info("TCS Websocket connection closed for a valid reason") backoff.Reset() } else { + // backoff when there is an unexpected error, such as an invalid frame sent through the connection. logger.Error("Error: lost websocket connection with ECS Telemetry service (TCS)", logger.Fields{ field.Error: tcsError, }) From b29472a49c887e36b93b37e3c3a163751704e57e Mon Sep 17 00:00:00 2001 From: Heming Han Date: Fri, 7 Jul 2023 15:36:28 -0700 Subject: [PATCH 4/4] rebase --- agent/tcs/handler/handler_test.go | 396 ------------------------------ 1 file changed, 396 deletions(-) delete mode 100644 agent/tcs/handler/handler_test.go diff --git a/agent/tcs/handler/handler_test.go b/agent/tcs/handler/handler_test.go deleted file mode 100644 index 69b6c08056d..00000000000 --- a/agent/tcs/handler/handler_test.go +++ /dev/null @@ -1,396 +0,0 @@ -//go:build unit -// +build unit - -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. 
- -package tcshandler - -import ( - "errors" - "fmt" - "io" - "math/rand" - "net/url" - "strings" - "sync" - "testing" - "time" - - "context" - - mock_api "github.com/aws/amazon-ecs-agent/agent/api/mocks" - "github.com/aws/amazon-ecs-agent/agent/config" - mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" - "github.com/aws/amazon-ecs-agent/agent/version" - "github.com/aws/amazon-ecs-agent/ecs-agent/doctor" - "github.com/aws/amazon-ecs-agent/ecs-agent/eventstream" - "github.com/aws/amazon-ecs-agent/ecs-agent/stats" - tcsclient "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/client" - "github.com/aws/amazon-ecs-agent/ecs-agent/tcs/model/ecstcs" - "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient" - wsmock "github.com/aws/amazon-ecs-agent/ecs-agent/wsclient/mock/utils" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/docker/docker/api/types" - "github.com/golang/mock/gomock" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" -) - -const ( - testTaskArn = "arn:aws:ecs:us-east-1:123:task/def" - testTaskDefinitionFamily = "task-def" - testClusterArn = "arn:aws:ecs:us-east-1:123:cluster/default" - testInstanceArn = "arn:aws:ecs:us-east-1:123:container-instance/abc" - testMessageId = "testMessageId" - testPublishMetricsInterval = 1 * time.Second - testSendMetricsToChannelWaitTime = 100 * time.Millisecond - testTelemetryChannelDefaultBufferSize = 10 -) - -type mockStatsEngine struct { - metricsChannel chan<- ecstcs.TelemetryMessage - healthChannel chan<- ecstcs.HealthMessage - publishMetricsTicker *time.Ticker -} - -var testCreds = credentials.NewStaticCredentials("test-id", "test-secret", "test-token") - -var testCfg = &config.Config{ - AcceptInsecureCert: true, - AWSRegion: "us-east-1", -} - -var emptyDoctor, _ = doctor.NewDoctor([]doctor.Healthcheck{}, "test-cluster", "this:is:an:instance:arn") - -func (*mockStatsEngine) GetInstanceMetrics(includeServiceConnectStats bool) 
(*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error) { - req := createPublishMetricsRequest() - return req.Metadata, req.TaskMetrics, nil -} - -func (*mockStatsEngine) ContainerDockerStats(taskARN string, id string) (*types.StatsJSON, *stats.NetworkStatsPerSec, error) { - return nil, nil, fmt.Errorf("not implemented") -} - -func (*mockStatsEngine) GetTaskHealthMetrics() (*ecstcs.HealthMetadata, []*ecstcs.TaskHealth, error) { - return nil, nil, nil -} - -func (*mockStatsEngine) GetPublishServiceConnectTickerInterval() int32 { - return 0 -} - -func (*mockStatsEngine) SetPublishServiceConnectTickerInterval(counter int32) { - return -} - -func (engine *mockStatsEngine) GetPublishMetricsTicker() *time.Ticker { - return engine.publishMetricsTicker -} - -// SimulateMetricsPublishToChannel simulates the behavior of `StartMetricsPublish` in DockerStatsEngine, which feeds metrics -// to channel to TCS Client. There has to be at least one valid metrics sent, otherwise no request will be made to mockServer -// in TestStartSession, specifically blocking `request := <-requestChan` -func (engine *mockStatsEngine) SimulateMetricsPublishToChannel(ctx context.Context) { - engine.publishMetricsTicker = time.NewTicker(testPublishMetricsInterval) - for { - select { - case <-engine.publishMetricsTicker.C: - engine.metricsChannel <- ecstcs.TelemetryMessage{ - Metadata: &ecstcs.MetricsMetadata{ - Cluster: aws.String(testClusterArn), - ContainerInstance: aws.String(testInstanceArn), - Fin: aws.Bool(false), - Idle: aws.Bool(false), - MessageId: aws.String(testMessageId), - }, - TaskMetrics: []*ecstcs.TaskMetric{ - &ecstcs.TaskMetric{}, - }, - } - - engine.healthChannel <- ecstcs.HealthMessage{ - Metadata: &ecstcs.HealthMetadata{}, - HealthMetrics: []*ecstcs.TaskHealth{}, - } - - case <-ctx.Done(): - defer close(engine.metricsChannel) - defer close(engine.healthChannel) - return - } - } -} - -// TestDisableMetrics tests the StartMetricsSession will return immediately if -// the metrics was 
disabled -func TestDisableMetrics(t *testing.T) { - params := TelemetrySessionParams{ - Cfg: &config.Config{ - DisableMetrics: config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}, - DisableDockerHealthCheck: config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}, - }, - } - - StartMetricsSession(¶ms) -} - -func TestFormatURL(t *testing.T) { - endpoint := "http://127.0.0.0.1/" - ctrl := gomock.NewController(t) - defer ctrl.Finish() - taskEngine := mock_engine.NewMockTaskEngine(ctrl) - - taskEngine.EXPECT().Version().Return("Docker version result", nil) - wsurl := formatURL(endpoint, testClusterArn, testInstanceArn, taskEngine) - parsed, err := url.Parse(wsurl) - assert.NoError(t, err, "should be able to parse URL") - assert.Equal(t, "/ws", parsed.Path, "wrong path") - assert.Equal(t, testClusterArn, parsed.Query().Get("cluster"), "wrong cluster") - assert.Equal(t, testInstanceArn, parsed.Query().Get("containerInstance"), "wrong container instance") - assert.Equal(t, version.Version, parsed.Query().Get("agentVersion"), "wrong agent version") - assert.Equal(t, version.GitHashString(), parsed.Query().Get("agentHash"), "wrong agent hash") - assert.Equal(t, "Docker version result", parsed.Query().Get("dockerVersion"), "wrong docker version") -} - -func TestStartSession(t *testing.T) { - // Start test server. 
- closeWS := make(chan []byte) - server, serverChan, requestChan, serverErr, err := wsmock.GetMockServer(closeWS) - server.StartTLS() - defer server.Close() - if err != nil { - t.Fatal(err) - } - - telemetryMessages := make(chan ecstcs.TelemetryMessage, testTelemetryChannelDefaultBufferSize) - healthMessages := make(chan ecstcs.HealthMessage, testTelemetryChannelDefaultBufferSize) - - wait := &sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wait.Add(1) - go func() { - select { - case sErr := <-serverErr: - t.Error(sErr) - case <-ctx.Done(): - } - wait.Done() - }() - defer func() { - closeSocket(closeWS) - close(serverChan) - }() - - deregisterInstanceEventStream := eventstream.NewEventStream("Deregister_Instance", context.Background()) - - mockEngine := &mockStatsEngine{ - metricsChannel: telemetryMessages, - healthChannel: healthMessages, - } - - // Start a session with the test server. - go startSession(ctx, server.URL, testCfg, testCreds, telemetryMessages, healthMessages, - defaultHeartbeatTimeout, defaultHeartbeatJitter, - testPublishMetricsInterval, deregisterInstanceEventStream, emptyDoctor) - // Wait for 100 ms to make sure the session is ready to receive message from channel - time.Sleep(testSendMetricsToChannelWaitTime) - go mockEngine.SimulateMetricsPublishToChannel(ctx) - - // startSession internally starts publishing metrics from the mockStatsEngine object (poll msg out of channel). - time.Sleep(testPublishMetricsInterval * 2) - - // Read request channel to get the metric data published to the server. - request := <-requestChan - cancel() - wait.Wait() - go func() { - for { - select { - case <-requestChan: - } - } - }() - - // Decode and verify the metric data. - payload, err := getPayloadFromRequest(request) - if err != nil { - t.Fatal("Error decoding payload: ", err) - } - - // Decode and verify the metric data. 
- _, responseType, err := wsclient.DecodeData([]byte(payload), tcsclient.NewTCSDecoder()) - if err != nil { - t.Fatal("error decoding data: ", err) - } - if responseType != "PublishMetricsRequest" { - t.Fatal("Unexpected responseType: ", responseType) - } -} - -func TestSessionConnectionClosedByRemote(t *testing.T) { - // Start test server. - closeWS := make(chan []byte) - server, serverChan, _, serverErr, err := wsmock.GetMockServer(closeWS) - server.StartTLS() - defer server.Close() - if err != nil { - t.Fatal(err) - } - go func() { - serr := <-serverErr - if !websocket.IsCloseError(serr, websocket.CloseNormalClosure) { - t.Error(serr) - } - }() - sleepBeforeClose := 10 * time.Millisecond - go func() { - time.Sleep(sleepBeforeClose) - closeSocket(closeWS) - close(serverChan) - }() - - ctx, cancel := context.WithCancel(context.Background()) - deregisterInstanceEventStream := eventstream.NewEventStream("Deregister_Instance", ctx) - deregisterInstanceEventStream.StartListening() - defer cancel() - - telemetryMessages := make(chan ecstcs.TelemetryMessage, testTelemetryChannelDefaultBufferSize) - healthMessages := make(chan ecstcs.HealthMessage, testTelemetryChannelDefaultBufferSize) - - // Start a session with the test server. - err = startSession(ctx, server.URL, testCfg, testCreds, telemetryMessages, healthMessages, defaultHeartbeatTimeout, - defaultHeartbeatJitter, testPublishMetricsInterval, deregisterInstanceEventStream, emptyDoctor) - - if err == nil { - t.Error("Expected io.EOF on closed connection") - } - if err != io.EOF { - t.Error("Expected io.EOF on closed connection, got: ", err) - } -} - -// TestConnectionInactiveTimeout tests the tcs client reconnect when it loses network -// connection or it's inactive for too long -func TestConnectionInactiveTimeout(t *testing.T) { - // Start test server. 
- closeWS := make(chan []byte) - server, _, requestChan, serverErr, err := wsmock.GetMockServer(closeWS) - server.StartTLS() - defer server.Close() - if err != nil { - t.Fatal(err) - } - - go func() { - for { - select { - case <-requestChan: - } - } - }() - - ctx, cancel := context.WithCancel(context.Background()) - deregisterInstanceEventStream := eventstream.NewEventStream("Deregister_Instance", ctx) - deregisterInstanceEventStream.StartListening() - defer cancel() - - telemetryMessages := make(chan ecstcs.TelemetryMessage, testTelemetryChannelDefaultBufferSize) - healthMessages := make(chan ecstcs.HealthMessage, testTelemetryChannelDefaultBufferSize) - - // Start a session with the test server. - err = startSession(ctx, server.URL, testCfg, testCreds, telemetryMessages, healthMessages, 50*time.Millisecond, - 100*time.Millisecond, testPublishMetricsInterval, deregisterInstanceEventStream, emptyDoctor) - // if we are not blocked here, then the test pass as it will reconnect in StartSession - assert.NoError(t, err, "Close the connection should cause the tcs client return error") - - assert.True(t, websocket.IsCloseError(<-serverErr, websocket.CloseAbnormalClosure), - "Read from closed connection should produce an io.EOF error") - - closeSocket(closeWS) -} - -func TestDiscoverEndpointAndStartSession(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockEcs := mock_api.NewMockECSClient(ctrl) - mockEcs.EXPECT().DiscoverTelemetryEndpoint(gomock.Any()).Return("", errors.New("error")) - - err := startTelemetrySession(&TelemetrySessionParams{ECSClient: mockEcs}) - if err == nil { - t.Error("Expected error from startTelemetrySession when DiscoverTelemetryEndpoint returns error") - } -} - -func getPayloadFromRequest(request string) (string, error) { - lines := strings.Split(request, "\r\n") - if len(lines) > 0 { - return lines[len(lines)-1], nil - } - - return "", errors.New("Could not get payload") -} - -// closeSocket tells the server to send a 
close frame. This lets us test -// what happens if the connection is closed by the remote server. -func closeSocket(ws chan<- []byte) { - ws <- websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - close(ws) -} - -func createPublishMetricsRequest() *ecstcs.PublishMetricsRequest { - cluster := testClusterArn - ci := testInstanceArn - taskArn := testTaskArn - taskDefinitionFamily := testTaskDefinitionFamily - var fval float64 - fval = rand.Float64() - var ival int64 - ival = rand.Int63n(10) - ts := time.Now() - idle := false - messageId := testMessageId - return &ecstcs.PublishMetricsRequest{ - Metadata: &ecstcs.MetricsMetadata{ - Cluster: &cluster, - ContainerInstance: &ci, - Idle: &idle, - MessageId: &messageId, - }, - TaskMetrics: []*ecstcs.TaskMetric{ - { - ContainerMetrics: []*ecstcs.ContainerMetric{ - { - CpuStatsSet: &ecstcs.CWStatsSet{ - Max: &fval, - Min: &fval, - SampleCount: &ival, - Sum: &fval, - }, - MemoryStatsSet: &ecstcs.CWStatsSet{ - Max: &fval, - Min: &fval, - SampleCount: &ival, - Sum: &fval, - }, - }, - }, - TaskArn: &taskArn, - TaskDefinitionFamily: &taskDefinitionFamily, - }, - }, - Timestamp: &ts, - } -}