feat: use same image for cas api/workers and send worker discord notifs (#60)
smrz2001 authored May 20, 2024
1 parent 7556cce commit a611c49
Showing 9 changed files with 53 additions and 76 deletions.
40 changes: 24 additions & 16 deletions cd/manager/common/aws/ecs/ecs.go
@@ -73,7 +73,7 @@ func (e Ecs) LaunchTask(cluster, family, container, vpcConfigParam string, overr
return e.runEcsTask(cluster, family, container, &types.NetworkConfiguration{AwsvpcConfiguration: &vpcConfig}, overrides)
}

func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, error) {
func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, *int32, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

@@ -85,33 +85,41 @@ func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds
output, err := e.ecsClient.DescribeTasks(ctx, input)
if err != nil {
log.Printf("checkTask: describe service error: %s, %s, %v", cluster, taskIds, err)
return false, err
}
var checkStatus types.DesiredStatus
if running {
checkStatus = types.DesiredStatusRunning
} else {
checkStatus = types.DesiredStatusStopped
return false, nil, err
}
// If checking for running tasks, at least one task must be present, but when checking for stopped tasks, it's ok to
// have found no matching tasks (i.e. the tasks have already stopped and been removed from the list).
tasksFound := !running
tasksInState := true
var exitCode *int32 = nil
if len(output.Tasks) > 0 {
// We found one or more tasks, only return true if all specified tasks were in the right state for at least a
// few minutes.
for _, task := range output.Tasks {
// If a task definition ARN was specified, make sure that we found at least one task with that definition.
if (len(taskDefId) == 0) || (*task.TaskDefinitionArn == taskDefId) {
tasksFound = true
if (*task.LastStatus != string(checkStatus)) ||
(running && stable && time.Now().Add(-manager.DefaultWaitTime).Before(*task.StartedAt)) {
tasksInState = false
// If checking for stable tasks, make sure that the task has been running for a few minutes.
if running {
if (*task.LastStatus != string(types.DesiredStatusRunning)) ||
(stable && time.Now().Before((*task.StartedAt).Add(manager.DefaultWaitTime))) {
tasksInState = false
}
} else {
if *task.LastStatus != string(types.DesiredStatusStopped) {
tasksInState = false
} else
// We always configure the primary application in a task as the first container, so we only care
// about its exit code. Among the first containers across all matching tasks, return the highest
// exit code.
if task.Containers[0].ExitCode != nil {
if (exitCode == nil) || (*task.Containers[0].ExitCode > *exitCode) {
exitCode = task.Containers[0].ExitCode
}
}
}
}
}
}
return tasksFound && tasksInState, nil
return tasksFound && tasksInState, exitCode, nil
}

func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) {
@@ -422,7 +430,7 @@ func (e Ecs) checkEcsService(cluster, taskDefArn string) (bool, error) {
return false, err
} else if len(taskArns) > 0 {
// For each running task, check if it's been up for a few minutes.
if deployed, err := e.CheckTask(cluster, taskDefArn, true, true, taskArns...); err != nil {
if deployed, _, err := e.CheckTask(cluster, taskDefArn, true, true, taskArns...); err != nil {
log.Printf("checkEcsService: check task error: %s, %s, %s, %v", cluster, family, taskDefArn, err)
return false, err
} else if !deployed {
@@ -535,7 +543,7 @@ func (e Ecs) checkEnvTaskSet(taskSet *manager.TaskSet, deployType string, cluste
case deployType_Task:
// Only check tasks that are meant to stay up permanently
if !task.Temp {
if deployed, err := e.CheckTask(cluster, "", true, true, task.Id); err != nil {
if deployed, _, err := e.CheckTask(cluster, "", true, true, task.Id); err != nil {
return false, err
} else if !deployed {
return false, nil
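The new exit-code bookkeeping relies on the convention called out in the comment above: the primary application is always the first container in a task. A minimal standalone sketch of that aggregation, assuming the aws-sdk-go-v2 ECS types (the function name and example are ours, not the repo's; it also guards against empty container lists):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/service/ecs/types"
)

// maxFirstContainerExitCode mirrors the loop above: only the first
// container's exit code is inspected, and the highest code across all
// matching tasks wins, so one failed worker surfaces even if the rest
// exited cleanly.
func maxFirstContainerExitCode(tasks []types.Task) *int32 {
	var exitCode *int32
	for _, task := range tasks {
		if len(task.Containers) == 0 || task.Containers[0].ExitCode == nil {
			continue
		}
		if exitCode == nil || *task.Containers[0].ExitCode > *exitCode {
			exitCode = task.Containers[0].ExitCode
		}
	}
	return exitCode
}

func main() {
	ok, failed := int32(0), int32(1)
	tasks := []types.Task{
		{Containers: []types.Container{{ExitCode: &ok}}},
		{Containers: []types.Container{{ExitCode: &failed}}},
	}
	if code := maxFirstContainerExitCode(tasks); code != nil {
		fmt.Printf("highest exit code: %d\n", *code) // prints 1
	}
}
```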
6 changes: 5 additions & 1 deletion cd/manager/jobs/anchor.go
@@ -107,9 +107,13 @@ func (a anchorJob) launchWorker() (string, error) {
}

func (a anchorJob) checkWorker(expectedToBeRunning bool) (bool, error) {
if status, err := a.d.CheckTask("ceramic-"+a.env+"-cas", "", expectedToBeRunning, false, a.state.Params[job.JobParam_Id].(string)); err != nil {
if status, exitCode, err := a.d.CheckTask("ceramic-"+a.env+"-cas", "", expectedToBeRunning, false, a.state.Params[job.JobParam_Id].(string)); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, the worker failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("anchorJob: worker exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(a.state, manager.DefaultWaitTime) { // Worker did not start in time
return false, manager.Error_StartupTimeout
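The anchor, e2e, and smoke jobs all read the widened return the same way. A hedged sketch of that contract, with hypothetical names not taken from this repo:

```go
package main

import "fmt"

// interpretCheck spells out how CheckTask's results are consumed by the
// jobs in this commit: an error means the ECS lookup itself failed,
// done == false means keep polling, and a non-zero exit code on a stopped
// task means the primary container failed even though the task reached
// the desired state.
func interpretCheck(done bool, exitCode *int32, err error) (finished bool, failure error) {
	if err != nil {
		return false, err
	}
	if !done {
		return false, nil // not in the desired state yet; poll again later
	}
	if exitCode != nil && *exitCode != 0 {
		return true, fmt.Errorf("worker exited with code %d", *exitCode)
	}
	return true, nil
}

func main() {
	code := int32(2)
	if _, err := interpretCheck(true, &code, nil); err != nil {
		fmt.Println(err) // worker exited with code 2
	}
}
```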
2 changes: 0 additions & 2 deletions cd/manager/jobs/deploy.go
@@ -55,7 +55,6 @@ const (
)

const defaultFailureTime = 30 * time.Minute
const anchorWorkerRepo = "ceramic-prod-cas-runner"

func DeployJob(jobState job.JobState, db manager.Database, notifs manager.Notifs, d manager.Deployment, repo manager.Repository) (manager.JobSm, error) {
if component, found := jobState.Params[job.DeployJobParam_Component].(string); !found {
@@ -235,7 +234,6 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage
if component == manager.DeployComponent_Cas {
newLayout.Clusters[casCluster].Tasks = &manager.TaskSet{Tasks: map[string]*manager.Task{
casCluster + "-" + serviceSuffix_CasWorker: {
Repo: &manager.Repo{Name: anchorWorkerRepo},
Temp: true, // Anchor workers do not stay up permanently
Name: containerName_CasWorker,
},
6 changes: 5 additions & 1 deletion cd/manager/jobs/e2e.go
@@ -127,9 +127,13 @@ func (e e2eTestJob) checkAllTests(expectedToBeRunning bool) (bool, error) {
}

func (e e2eTestJob) checkTests(taskId string, expectedToBeRunning bool) (bool, error) {
if status, err := e.d.CheckTask("ceramic-qa-tests", "", expectedToBeRunning, false, taskId); err != nil {
if status, exitCode, err := e.d.CheckTask("ceramic-qa-tests", "", expectedToBeRunning, false, taskId); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, at least one of the test tasks failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("e2eTestJob: test exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(e.state, manager.DefaultWaitTime) { // Tests did not start in time
return false, manager.Error_StartupTimeout
6 changes: 5 additions & 1 deletion cd/manager/jobs/smoke.go
@@ -81,9 +81,13 @@ func (s smokeTestJob) Advance() (job.JobState, error) {
}

func (s smokeTestJob) checkTests(expectedToBeRunning bool) (bool, error) {
if status, err := s.d.CheckTask(ClusterName, "", expectedToBeRunning, false, s.state.Params[job.JobParam_Id].(string)); err != nil {
if status, exitCode, err := s.d.CheckTask(ClusterName, "", expectedToBeRunning, false, s.state.Params[job.JobParam_Id].(string)); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, the test failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("anchorJob: worker exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(s.state, manager.DefaultWaitTime) { // Tests did not start in time
return false, manager.Error_StartupTimeout
2 changes: 1 addition & 1 deletion cd/manager/models.go
@@ -139,7 +139,7 @@ type Cache interface {
type Deployment interface {
LaunchServiceTask(cluster, service, family, container string, overrides map[string]string) (string, error)
LaunchTask(cluster, family, container, vpcConfigParam string, overrides map[string]string) (string, error)
CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, error)
CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, *int32, error)
GetLayout(clusters []string) (*Layout, error)
UpdateLayout(*Layout, string) error
CheckLayout(*Layout) (bool, error)
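Widening CheckTask ripples through every Deployment implementation and any test double. A minimal fake for exercising the job state machines might look like the sketch below (only CheckTask is stubbed; the interface's other methods would still need stubs, and the type name is ours, not the repo's):

```go
// fakeDeployment returns canned CheckTask results so job logic can be
// tested without touching ECS. It would live in a _test.go file in the
// manager package.
type fakeDeployment struct {
	done     bool
	exitCode *int32
	err      error
}

func (f fakeDeployment) CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, *int32, error) {
	return f.done, f.exitCode, f.err
}
```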
39 changes: 13 additions & 26 deletions cd/manager/notifs/anchor.go
@@ -18,6 +18,7 @@ type anchorNotif struct {
state job.JobState
alertWebhook webhook.Client
warningWebhook webhook.Client
infoWebhook webhook.Client
region string
env string
}
@@ -27,56 +28,42 @@ func newAnchorNotif(jobState job.JobState) (jobNotif, error) {
return nil, err
} else if w, err := parseDiscordWebhookUrl("DISCORD_WARNING_WEBHOOK"); err != nil {
return nil, err
} else if i, err := parseDiscordWebhookUrl("DISCORD_INFO_WEBHOOK"); err != nil {
return nil, err
} else {
return &anchorNotif{
jobState,
a,
w,
i,
os.Getenv("AWS_REGION"),
os.Getenv(manager.EnvVar_Env),
}, nil
}
}

func (a anchorNotif) getChannels() []webhook.Client {
// We only care about "waiting" notifications from the CD manager for the time being. Other notifications are sent
// directly from the anchor worker.
if a.state.Stage == job.JobStage_Waiting {
webhooks := make([]webhook.Client, 0, 1)
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
webhooks = append(webhooks, a.alertWebhook)
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
webhooks = append(webhooks, a.warningWebhook)
}
webhooks := make([]webhook.Client, 0, 1)
switch a.state.Stage {
case job.JobStage_Started:
webhooks = append(webhooks, a.infoWebhook)
case job.JobStage_Completed:
webhooks = append(webhooks, a.infoWebhook)
case job.JobStage_Failed:
webhooks = append(webhooks, a.alertWebhook)
}
return webhooks
}

func (a anchorNotif) getTitle() string {
jobStageRepr := string(a.state.Stage)
// If "waiting", update the job stage representation to qualify the severity of the delay, if applicable.
if a.state.Stage == job.JobStage_Waiting {
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
jobStageRepr = job.AnchorJobParam_Stalled
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
jobStageRepr = job.AnchorJobParam_Delayed
}
}
return fmt.Sprintf("Anchor Worker %s", strings.ToUpper(jobStageRepr))
return fmt.Sprintf("Anchor Worker %s", strings.ToUpper(string(a.state.Stage)))
}

func (a anchorNotif) getFields() []discord.EmbedField {
return nil
}

func (a anchorNotif) getColor() discordColor {
if a.state.Stage == job.JobStage_Waiting {
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
return discordColor_Alert
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
return discordColor_Warning
}
}
return colorForStage(a.state.Stage)
}

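The constructor now expects a third environment variable, DISCORD_INFO_WEBHOOK, next to the existing alert and warning hooks. parseDiscordWebhookUrl is defined elsewhere in this repo; a plausible shape for it, assuming disgo's webhook package (a guess at the implementation, not the repo's code):

```go
package notifs

import (
	"fmt"
	"os"

	"github.com/disgoorg/disgo/webhook"
)

// parseDiscordWebhookUrl (sketch): read a Discord webhook URL from the
// named env var and build a disgo client for it. The real implementation
// may validate the URL differently.
func parseDiscordWebhookUrl(envVar string) (webhook.Client, error) {
	url := os.Getenv(envVar)
	if url == "" {
		return nil, fmt.Errorf("missing Discord webhook URL in %s", envVar)
	}
	return webhook.NewWithURL(url)
}
```

With the info hook wired in, getChannels routes Started and Completed notifications to the info channel and Failed to the alert channel, replacing the old stalled/delayed heuristics on the Waiting stage.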
3 changes: 0 additions & 3 deletions cd/manager/notifs/discord.go
@@ -271,9 +271,6 @@ func (n JobNotifs) getActiveJobs(jobState job.JobState) []discord.EmbedField {
if field, found := n.getActiveJobsByType(jobState, job.JobType_Deploy); found {
fields = append(fields, field)
}
if field, found := n.getActiveJobsByType(jobState, job.JobType_Anchor); found {
fields = append(fields, field)
}
if field, found := n.getActiveJobsByType(jobState, job.JobType_TestE2E); found {
fields = append(fields, field)
}
25 changes: 0 additions & 25 deletions ci/plans/cas.cue
@@ -62,11 +62,6 @@ dagger.#Plan & {
source: _source
}

_runnerImage: docker.#Dockerfile & {
target: "runner"
source: _source
}

verify: {
_endpoint: "api/v0/healthcheck"
_port: 8081
@@ -177,16 +172,6 @@
TAGS: _tags + _extraTags + ["qa"]
}
}
_runner: utils.#ECR & {
img: _runnerImage.output
env: {
AWS_ACCOUNT_ID: client.env.AWS_ACCOUNT_ID
AWS_ECR_SECRET: client.commands.aws.stdout
AWS_REGION: Region
REPO: "ceramic-prod-cas-runner"
TAGS: _tags + _extraTags + ["qa"]
}
}
}
}
_base: utils.#ECR & {
@@ -199,16 +184,6 @@
TAGS: _tags + _extraTags
}
}
_runner: utils.#ECR & {
img: _runnerImage.output
env: {
AWS_ACCOUNT_ID: client.env.AWS_ACCOUNT_ID
AWS_ECR_SECRET: client.commands.aws.stdout
AWS_REGION: Region
REPO: "ceramic-prod-cas-runner"
TAGS: _tags + _extraTags
}
}
}
dockerhub: utils.#Dockerhub & {
img: _baseImage.output
