Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge feature/channelBasedStatsEngine to dev #3717

Merged
merged 2 commits into from
May 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/stats"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
tcshandler "github.com/aws/amazon-ecs-agent/agent/tcs/handler"
"github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/agent/utils/loader"
"github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper"
@@ -98,6 +99,13 @@ const (
inServiceState = "InService"
asgLifecyclePollWait = time.Minute
asgLifecyclePollMax = 120 // given each poll cycle waits for about a minute, this gives 2-3 hours before timing out

// By default, TCS (or TACS) will reject metrics that are older than 5 minutes. Since our metrics collection interval
// is currently set to 20 seconds, setting a buffer size of 15 allows us to store exactly 5 minutes of metrics in
// these buffers in the case where we temporarily lose connection to TCS. This value does not change with task number,
// as the number of messages in the channel is equal to the number of times we call `getInstanceMetrics`, which collects
// metrics from all tasks and containers and puts them into one TelemetryMessage object.
telemetryChannelDefaultBufferSize = 15
)

var (
@@ -814,7 +822,7 @@ func (agent *ecsAgent) reregisterContainerInstance(client api.ECSClient, capabil
return transientError{err}
}

// startAsyncRoutines starts all of the background methods
// startAsyncRoutines starts all background methods
func (agent *ecsAgent) startAsyncRoutines(
containerChangeEventStream *eventstream.EventStream,
credentialsManager credentials.Manager,
@@ -841,7 +849,10 @@ func (agent *ecsAgent) startAsyncRoutines(
// Agent introspection api
go handlers.ServeIntrospectionHTTPEndpoint(agent.ctx, &agent.containerInstanceARN, taskEngine, agent.cfg)

statsEngine := stats.NewDockerStatsEngine(agent.cfg, agent.dockerClient, containerChangeEventStream)
telemetryMessages := make(chan ecstcs.TelemetryMessage, telemetryChannelDefaultBufferSize)
healthMessages := make(chan ecstcs.HealthMessage, telemetryChannelDefaultBufferSize)

statsEngine := stats.NewDockerStatsEngine(agent.cfg, agent.dockerClient, containerChangeEventStream, telemetryMessages, healthMessages)

// Start serving the endpoint to fetch IAM Role credentials and other task metadata
if agent.cfg.TaskMetadataAZDisabled {
@@ -863,9 +874,18 @@ func (agent *ecsAgent) startAsyncRoutines(
ECSClient: client,
TaskEngine: taskEngine,
StatsEngine: statsEngine,
MetricsChannel: telemetryMessages,
HealthChannel: healthMessages,
Doctor: doctor,
}

err := statsEngine.MustInit(agent.ctx, taskEngine, agent.cfg.Cluster, agent.containerInstanceARN)
if err != nil {
seelog.Warnf("Error initializing metrics engine: %v", err)
return
}
go statsEngine.StartMetricsPublish()

// Start metrics session in a go routine
go tcshandler.StartMetricsSession(&telemetrySessionParams)
}
90 changes: 89 additions & 1 deletion agent/stats/engine.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,15 @@ const (
queueResetThreshold = 2 * dockerclient.StatsInactivityTimeout
hostNetworkMode = "host"
noneNetworkMode = "none"
// defaultPublishServiceConnectTicker means service connect metrics are sent to the backend on every 3rd publish tick.
// Task metrics are published at a 20s interval, thus a task's service connect metrics will be published every 60s.
defaultPublishServiceConnectTicker = 3
// publishMetricsTimeout is the duration that we wait for metrics/health info to be
// pushed to the TCS channels. In theory, this timeout should never be hit since
// the TCS handler should be continually reading from the channels and pushing to
// TCS, but when we lose connection to TCS, these channels back up. In case this
// happens, we need to have a timeout to prevent statsEngine channels from blocking.
publishMetricsTimeout = 1 * time.Second
)

var (
@@ -99,6 +108,9 @@ type DockerStatsEngine struct {
taskToServiceConnectStats map[string]*ServiceConnectStats
publishServiceConnectTickerInterval int32
publishMetricsTicker *time.Ticker
// channels to send metrics to TACS Client
metricsChannel chan<- ecstcs.TelemetryMessage
healthChannel chan<- ecstcs.HealthMessage
}

// ResolveTask resolves the api task object, given container id.
@@ -141,7 +153,8 @@ func (resolver *DockerContainerMetadataResolver) ResolveContainer(dockerID strin

// NewDockerStatsEngine creates a new instance of the DockerStatsEngine object.
// MustInit() must be called to initialize the fields of the new event listener.
func NewDockerStatsEngine(cfg *config.Config, client dockerapi.DockerClient, containerChangeEventStream *eventstream.EventStream) *DockerStatsEngine {
func NewDockerStatsEngine(cfg *config.Config, client dockerapi.DockerClient, containerChangeEventStream *eventstream.EventStream,
metricsChannel chan<- ecstcs.TelemetryMessage, healthChannel chan<- ecstcs.HealthMessage) *DockerStatsEngine {
return &DockerStatsEngine{
client: client,
resolver: nil,
@@ -153,6 +166,8 @@ func NewDockerStatsEngine(cfg *config.Config, client dockerapi.DockerClient, con
taskToServiceConnectStats: make(map[string]*ServiceConnectStats),
containerChangeEventStream: containerChangeEventStream,
publishServiceConnectTickerInterval: 0,
metricsChannel: metricsChannel,
healthChannel: healthChannel,
}
}

@@ -421,6 +436,79 @@ func (engine *DockerStatsEngine) addToStatsContainerMapUnsafe(
return true
}

// StartMetricsPublish collects task telemetry and container health metrics on
// every tick of engine.publishMetricsTicker and pushes them to the TACS client
// through the engine's metrics and health channels. It blocks until engine.ctx
// is cancelled, so callers are expected to run it in its own goroutine.
func (engine *DockerStatsEngine) StartMetricsPublish() {
	if engine.publishMetricsTicker == nil {
		seelog.Debug("Skipping reporting metrics through channel. Publish ticker is uninitialized")
		return
	}

	// Publish metrics immediately after we start the loop and wait for ticks. This makes sure TACS side has correct
	// TaskCount metrics in CX account (especially for short living tasks)
	engine.publishMetrics(false)
	engine.publishHealth()

	for {
		// Service connect stats are only included on every
		// defaultPublishServiceConnectTicker-th publish. The counter advances once
		// per loop iteration (before the select), so the includeServiceConnectStats
		// value computed here applies to the tick received below.
		var includeServiceConnectStats bool
		metricCounter := engine.GetPublishServiceConnectTickerInterval()
		metricCounter++
		if metricCounter == defaultPublishServiceConnectTicker {
			includeServiceConnectStats = true
			metricCounter = 0
		}
		engine.SetPublishServiceConnectTickerInterval(metricCounter)
		select {
		case <-engine.publishMetricsTicker.C:
			// Message has no format verbs, so use Debug rather than Debugf
			// (go vet's printf check flags Debugf with no arguments).
			seelog.Debug("publishMetricsTicker triggered. Sending telemetry messages to tcsClient through channel")
			// Publish asynchronously so a slow or backed-up channel send (bounded
			// by publishMetricsTimeout) cannot delay subsequent ticks.
			go engine.publishMetrics(includeServiceConnectStats)
			go engine.publishHealth()
		case <-engine.ctx.Done():
			return
		}
	}
}

// publishMetrics collects metrics for all tasks and containers and sends them
// as one TelemetryMessage on engine.metricsChannel. The channel send is bounded
// by publishMetricsTimeout so a backed-up channel (e.g. after losing the TCS
// connection) cannot block the stats engine; on timeout this cycle's metrics
// are discarded.
func (engine *DockerStatsEngine) publishMetrics(includeServiceConnectStats bool) {
	// NOTE(review): the timeout window opens before metric collection, so
	// collection time also counts against publishMetricsTimeout — confirm this
	// is intended rather than bounding only the channel send.
	publishMetricsCtx, cancel := context.WithTimeout(engine.ctx, publishMetricsTimeout)
	defer cancel()

	metricsMetadata, taskMetrics, metricsErr := engine.GetInstanceMetrics(includeServiceConnectStats)
	if metricsErr != nil {
		seelog.Warnf("Error collecting task metrics: %v", metricsErr)
		return
	}

	metricsMessage := ecstcs.TelemetryMessage{
		Metadata:                   metricsMetadata,
		TaskMetrics:                taskMetrics,
		IncludeServiceConnectStats: includeServiceConnectStats,
	}
	select {
	case engine.metricsChannel <- metricsMessage:
		seelog.Debugf("sent telemetry message: %v", metricsMessage)
	case <-publishMetricsCtx.Done():
		// Message has no format verbs, so use Error rather than Errorf.
		seelog.Error("timeout sending telemetry message, discarding metrics")
	}
}

// publishHealth collects container health metrics for all tasks and sends them
// as one HealthMessage on engine.healthChannel. Like publishMetrics, the
// channel send is bounded by publishMetricsTimeout so a backed-up channel
// cannot block the stats engine; on timeout this cycle's health data is
// discarded.
func (engine *DockerStatsEngine) publishHealth() {
	publishHealthCtx, cancel := context.WithTimeout(engine.ctx, publishMetricsTimeout)
	defer cancel()

	healthMetadata, taskHealthMetrics, healthErr := engine.GetTaskHealthMetrics()
	if healthErr != nil {
		seelog.Warnf("Error collecting health metrics: %v", healthErr)
		return
	}

	healthMessage := ecstcs.HealthMessage{
		Metadata:      healthMetadata,
		HealthMetrics: taskHealthMetrics,
	}
	select {
	case engine.healthChannel <- healthMessage:
		seelog.Debugf("sent health message: %v", healthMessage)
	case <-publishHealthCtx.Done():
		// Message has no format verbs, so use Error rather than Errorf.
		seelog.Error("timeout sending health message, discarding metrics")
	}
}

// GetInstanceMetrics gets all task metrics and instance metadata from stats engine.
func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats bool) (*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error) {
idle := engine.isIdle()
21 changes: 10 additions & 11 deletions agent/stats/engine_integ_test.go
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ func createRunningTask(networkMode string) *apitask.Task {

func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainersWithoutHealth"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainersWithoutHealth"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

@@ -127,7 +127,7 @@ func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) {

func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()

// Assign ContainerStop timeout to addressable variable
@@ -199,7 +199,7 @@ func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) {

func TestStatsEngineWithExistingContainers(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainers"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainers"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

@@ -270,7 +270,7 @@ func TestStatsEngineWithExistingContainers(t *testing.T) {

func TestStatsEngineWithNewContainers(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
@@ -352,7 +352,7 @@ func TestStatsEngineWithNewContainersWithPolling(t *testing.T) {
// Create a new docker client with new config
dockerClientForNewContainersWithPolling, _ := dockerapi.NewDockerGoClient(sdkClientFactory, &cfg, ctx)
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClientForNewContainersWithPolling, eventStream("TestStatsEngineWithNewContainers"))
engine := NewDockerStatsEngine(&cfg, dockerClientForNewContainersWithPolling, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
@@ -459,7 +459,7 @@ func TestStatsEngineWithDockerTaskEngine(t *testing.T) {
testTask)

// Create a new docker stats engine
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream)
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil)
err = statsEngine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
defer statsEngine.removeAll()
@@ -542,7 +542,7 @@ func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) {
testTask)

// Create a new docker stats engine
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream)
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil)
err = statsEngine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
defer statsEngine.removeAll()
@@ -571,9 +571,8 @@ func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) {

time.Sleep(checkPointSleep)

// Simulate tcs client invoking GetInstanceMetrics.
_, _, err = statsEngine.GetInstanceMetrics(false)
assert.Error(t, err, "expect error 'no task metrics tp report' when getting instance metrics")
assert.Error(t, err, "expect error 'no task metrics to report' when getting instance metrics")

// Should not contain any metrics after cleanup.
validateIdleContainerMetrics(t, statsEngine)
@@ -586,7 +585,7 @@ func TestStatsEngineWithNetworkStatsDefaultMode(t *testing.T) {

func testNetworkModeStatsInteg(t *testing.T, networkMode string, statsEmpty bool) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNetworkStats"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNetworkStats"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

@@ -667,7 +666,7 @@ func testNetworkModeStatsInteg(t *testing.T, networkMode string, statsEmpty bool

func TestStorageStats(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithStorageStats"))
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithStorageStats"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

Loading