feat: use same image for cas api/workers and send worker discord notifs #60

Merged
40 changes: 24 additions & 16 deletions cd/manager/common/aws/ecs/ecs.go
@@ -73,7 +73,7 @@ func (e Ecs) LaunchTask(cluster, family, container, vpcConfigParam string, overr
return e.runEcsTask(cluster, family, container, &types.NetworkConfiguration{AwsvpcConfiguration: &vpcConfig}, overrides)
}

func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, error) {
func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, *int32, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

@@ -85,33 +85,41 @@ func (e Ecs) CheckTask(cluster, taskDefId string, running, stable bool, taskIds
output, err := e.ecsClient.DescribeTasks(ctx, input)
if err != nil {
log.Printf("checkTask: describe service error: %s, %s, %v", cluster, taskIds, err)
return false, err
}
var checkStatus types.DesiredStatus
if running {
checkStatus = types.DesiredStatusRunning
} else {
checkStatus = types.DesiredStatusStopped
return false, nil, err
}
// If checking for running tasks, at least one task must be present, but when checking for stopped tasks, it's ok to
// have found no matching tasks (i.e. the tasks have already stopped and been removed from the list).
tasksFound := !running
tasksInState := true
var exitCode *int32 = nil
if len(output.Tasks) > 0 {
// We found one or more tasks, only return true if all specified tasks were in the right state for at least a
// few minutes.
for _, task := range output.Tasks {
// If a task definition ARN was specified, make sure that we found at least one task with that definition.
if (len(taskDefId) == 0) || (*task.TaskDefinitionArn == taskDefId) {
tasksFound = true
if (*task.LastStatus != string(checkStatus)) ||
(running && stable && time.Now().Add(-manager.DefaultWaitTime).Before(*task.StartedAt)) {
tasksInState = false
// If checking for stable tasks, make sure that the task has been running for a few minutes.
if running {
if (*task.LastStatus != string(types.DesiredStatusRunning)) ||
(stable && time.Now().Before((*task.StartedAt).Add(manager.DefaultWaitTime))) {
tasksInState = false
}
} else {
if *task.LastStatus != string(types.DesiredStatusStopped) {
tasksInState = false
} else
// We always configure the primary application in a task as the first container, so we only care
// about its exit code. Among the first containers across all matching tasks, return the highest
// exit code.
if task.Containers[0].ExitCode != nil {
if (exitCode == nil) || (*task.Containers[0].ExitCode > *exitCode) {
exitCode = task.Containers[0].ExitCode
}
}
}
}
}
}
return tasksFound && tasksInState, nil
return tasksFound && tasksInState, exitCode, nil
}

func (e Ecs) GetLayout(clusters []string) (*manager.Layout, error) {
@@ -422,7 +430,7 @@ func (e Ecs) checkEcsService(cluster, taskDefArn string) (bool, error) {
return false, err
} else if len(taskArns) > 0 {
// For each running task, check if it's been up for a few minutes.
if deployed, err := e.CheckTask(cluster, taskDefArn, true, true, taskArns...); err != nil {
if deployed, _, err := e.CheckTask(cluster, taskDefArn, true, true, taskArns...); err != nil {
log.Printf("checkEcsService: check task error: %s, %s, %s, %v", cluster, family, taskDefArn, err)
return false, err
} else if !deployed {
@@ -535,7 +543,7 @@ func (e Ecs) checkEnvTaskSet(taskSet *manager.TaskSet, deployType string, cluste
case deployType_Task:
// Only check tasks that are meant to stay up permanently
if !task.Temp {
if deployed, err := e.CheckTask(cluster, "", true, true, task.Id); err != nil {
if deployed, _, err := e.CheckTask(cluster, "", true, true, task.Id); err != nil {
return false, err
} else if !deployed {
return false, nil
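The exit-code handling added above keeps only the exit code of the first (primary) container in each stopped task and, across all matching tasks, reports the highest one. A minimal, self-contained sketch of that selection rule (the helper name and slice input are illustrative, not part of this PR):

package main

import "fmt"

// highestExitCode mirrors the aggregation in CheckTask: given the primary
// container's exit code from each stopped task, return the highest one, or
// nil if no container has reported an exit code.
func highestExitCode(codes []*int32) *int32 {
	var exitCode *int32
	for _, code := range codes {
		if code == nil {
			continue // container did not report an exit code
		}
		if exitCode == nil || *code > *exitCode {
			exitCode = code
		}
	}
	return exitCode
}

func main() {
	zero, one := int32(0), int32(1)
	fmt.Println(*highestExitCode([]*int32{&zero, nil, &one})) // prints 1
}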
6 changes: 5 additions & 1 deletion cd/manager/jobs/anchor.go
@@ -107,9 +107,13 @@ func (a anchorJob) launchWorker() (string, error) {
}

func (a anchorJob) checkWorker(expectedToBeRunning bool) (bool, error) {
if status, err := a.d.CheckTask("ceramic-"+a.env+"-cas", "", expectedToBeRunning, false, a.state.Params[job.JobParam_Id].(string)); err != nil {
if status, exitCode, err := a.d.CheckTask("ceramic-"+a.env+"-cas", "", expectedToBeRunning, false, a.state.Params[job.JobParam_Id].(string)); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, the worker failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("anchorJob: worker exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(a.state, manager.DefaultWaitTime) { // Worker did not start in time
return false, manager.Error_StartupTimeout
2 changes: 0 additions & 2 deletions cd/manager/jobs/deploy.go
@@ -55,7 +55,6 @@ const (
)

const defaultFailureTime = 30 * time.Minute
const anchorWorkerRepo = "ceramic-prod-cas-runner"

func DeployJob(jobState job.JobState, db manager.Database, notifs manager.Notifs, d manager.Deployment, repo manager.Repository) (manager.JobSm, error) {
if component, found := jobState.Params[job.DeployJobParam_Component].(string); !found {
@@ -235,7 +234,6 @@ func (d deployJob) generateEnvLayout(component manager.DeployComponent) (*manage
if component == manager.DeployComponent_Cas {
newLayout.Clusters[casCluster].Tasks = &manager.TaskSet{Tasks: map[string]*manager.Task{
casCluster + "-" + serviceSuffix_CasWorker: {
Repo: &manager.Repo{Name: anchorWorkerRepo},
Temp: true, // Anchor workers do not stay up permanently
Name: containerName_CasWorker,
},
6 changes: 5 additions & 1 deletion cd/manager/jobs/e2e.go
@@ -127,9 +127,13 @@ func (e e2eTestJob) checkAllTests(expectedToBeRunning bool) (bool, error) {
}

func (e e2eTestJob) checkTests(taskId string, expectedToBeRunning bool) (bool, error) {
if status, err := e.d.CheckTask("ceramic-qa-tests", "", expectedToBeRunning, false, taskId); err != nil {
if status, exitCode, err := e.d.CheckTask("ceramic-qa-tests", "", expectedToBeRunning, false, taskId); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, at least one of the test tasks failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("e2eTestJob: test exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(e.state, manager.DefaultWaitTime) { // Tests did not start in time
return false, manager.Error_StartupTimeout
6 changes: 5 additions & 1 deletion cd/manager/jobs/smoke.go
@@ -81,9 +81,13 @@ func (s smokeTestJob) Advance() (job.JobState, error) {
}

func (s smokeTestJob) checkTests(expectedToBeRunning bool) (bool, error) {
if status, err := s.d.CheckTask(ClusterName, "", expectedToBeRunning, false, s.state.Params[job.JobParam_Id].(string)); err != nil {
if status, exitCode, err := s.d.CheckTask(ClusterName, "", expectedToBeRunning, false, s.state.Params[job.JobParam_Id].(string)); err != nil {
return false, err
} else if status {
// If a non-zero exit code was present, the test failed to complete successfully.
if (exitCode != nil) && (*exitCode != 0) {
return false, fmt.Errorf("anchorJob: worker exited with code %d", *exitCode)
}
return true, nil
} else if expectedToBeRunning && job.IsTimedOut(s.state, manager.DefaultWaitTime) { // Tests did not start in time
return false, manager.Error_StartupTimeout
2 changes: 1 addition & 1 deletion cd/manager/models.go
@@ -139,7 +139,7 @@ type Cache interface {
type Deployment interface {
LaunchServiceTask(cluster, service, family, container string, overrides map[string]string) (string, error)
LaunchTask(cluster, family, container, vpcConfigParam string, overrides map[string]string) (string, error)
CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, error)
CheckTask(cluster, taskDefId string, running, stable bool, taskIds ...string) (bool, *int32, error)
GetLayout(clusters []string) (*Layout, error)
UpdateLayout(*Layout, string) error
CheckLayout(*Layout) (bool, error)
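With the interface change above, a nil exit code means no matching stopped task reported one, while a non-nil value carries the primary container's exit code. A hedged sketch of how a caller in this package might interpret the three outcomes when polling for a stopped task (the helper itself is illustrative, not from this PR):

package manager

import "fmt"

// CheckStoppedTask is an illustrative wrapper around the updated CheckTask
// signature for callers that wait for a one-off task to stop.
func CheckStoppedTask(d Deployment, cluster, taskId string) (bool, error) {
	stopped, exitCode, err := d.CheckTask(cluster, "", false, false, taskId)
	if err != nil {
		return false, err
	}
	if !stopped {
		// The task is still running (or not yet visible); keep polling.
		return false, nil
	}
	if exitCode != nil && *exitCode != 0 {
		// The primary container exited with a failure code.
		return false, fmt.Errorf("task %s exited with code %d", taskId, *exitCode)
	}
	// Clean exit, or no exit code was reported for the primary container.
	return true, nil
}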
39 changes: 13 additions & 26 deletions cd/manager/notifs/anchor.go
@@ -18,6 +18,7 @@ type anchorNotif struct {
state job.JobState
alertWebhook webhook.Client
warningWebhook webhook.Client
infoWebhook webhook.Client
region string
env string
}
@@ -27,56 +28,42 @@ func newAnchorNotif(jobState job.JobState) (jobNotif, error) {
return nil, err
} else if w, err := parseDiscordWebhookUrl("DISCORD_WARNING_WEBHOOK"); err != nil {
return nil, err
} else if i, err := parseDiscordWebhookUrl("DISCORD_INFO_WEBHOOK"); err != nil {
return nil, err
} else {
return &anchorNotif{
jobState,
a,
w,
i,
os.Getenv("AWS_REGION"),
os.Getenv(manager.EnvVar_Env),
}, nil
}
}

func (a anchorNotif) getChannels() []webhook.Client {
// We only care about "waiting" notifications from the CD manager for the time being. Other notifications are sent
// directly from the anchor worker.
if a.state.Stage == job.JobStage_Waiting {
webhooks := make([]webhook.Client, 0, 1)
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
webhooks = append(webhooks, a.alertWebhook)
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
webhooks = append(webhooks, a.warningWebhook)
}
webhooks := make([]webhook.Client, 0, 1)
switch a.state.Stage {
case job.JobStage_Started:
webhooks = append(webhooks, a.infoWebhook)
case job.JobStage_Completed:
webhooks = append(webhooks, a.infoWebhook)
case job.JobStage_Failed:
webhooks = append(webhooks, a.alertWebhook)
}
return nil
}

func (a anchorNotif) getTitle() string {
jobStageRepr := string(a.state.Stage)
// If "waiting", update the job stage representation to qualify the severity of the delay, if applicable.
if a.state.Stage == job.JobStage_Waiting {
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
jobStageRepr = job.AnchorJobParam_Stalled
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
jobStageRepr = job.AnchorJobParam_Delayed
}
}
return fmt.Sprintf("Anchor Worker %s", strings.ToUpper(jobStageRepr))
return fmt.Sprintf("Anchor Worker %s", strings.ToUpper(string(a.state.Stage)))
}

func (a anchorNotif) getFields() []discord.EmbedField {
return nil
}

func (a anchorNotif) getColor() discordColor {
if a.state.Stage == job.JobStage_Waiting {
if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled {
return discordColor_Alert
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed {
return discordColor_Warning
}
}
return colorForStage(a.state.Stage)
}

3 changes: 0 additions & 3 deletions cd/manager/notifs/discord.go
@@ -271,9 +271,6 @@ func (n JobNotifs) getActiveJobs(jobState job.JobState) []discord.EmbedField {
if field, found := n.getActiveJobsByType(jobState, job.JobType_Deploy); found {
fields = append(fields, field)
}
if field, found := n.getActiveJobsByType(jobState, job.JobType_Anchor); found {
fields = append(fields, field)
}
if field, found := n.getActiveJobsByType(jobState, job.JobType_TestE2E); found {
fields = append(fields, field)
}
25 changes: 0 additions & 25 deletions ci/plans/cas.cue
@@ -62,11 +62,6 @@ dagger.#Plan & {
source: _source
}

_runnerImage: docker.#Dockerfile & {
target: "runner"
source: _source
}

verify: {
_endpoint: "api/v0/healthcheck"
_port: 8081
@@ -177,16 +172,6 @@
TAGS: _tags + _extraTags + ["qa"]
}
}
_runner: utils.#ECR & {
img: _runnerImage.output
env: {
AWS_ACCOUNT_ID: client.env.AWS_ACCOUNT_ID
AWS_ECR_SECRET: client.commands.aws.stdout
AWS_REGION: Region
REPO: "ceramic-prod-cas-runner"
TAGS: _tags + _extraTags + ["qa"]
}
}
}
}
_base: utils.#ECR & {
@@ -199,16 +184,6 @@
TAGS: _tags + _extraTags
}
}
_runner: utils.#ECR & {
img: _runnerImage.output
env: {
AWS_ACCOUNT_ID: client.env.AWS_ACCOUNT_ID
AWS_ECR_SECRET: client.commands.aws.stdout
AWS_REGION: Region
REPO: "ceramic-prod-cas-runner"
TAGS: _tags + _extraTags
}
}
}
dockerhub: utils.#Dockerhub & {
img: _baseImage.output