Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions pkg/app/piped/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *controller) Run(ctx context.Context) error {
c.logger.Info("start running controller")

// Make sure the existence of the workspace directory.
// Each scheduler/deployment will have an working directory inside this workspace.
// Each planner/scheduler will have an working directory inside this workspace.
dir, err := ioutil.TempDir("", "workspace")
if err != nil {
c.logger.Error("failed to create workspace directory", zap.Error(err))
Expand Down Expand Up @@ -224,7 +224,7 @@ L:
break L

case <-ticker.C:
// This must be called before syncPlanner because
// syncSchedulers must be called before syncPlanners because
// after piped is restarted all running deployments need to be loaded firstly.
c.syncSchedulers(ctx)
c.syncPlanners(ctx)
Expand Down Expand Up @@ -284,10 +284,9 @@ func (c *controller) checkCommands() {
func (c *controller) syncPlanners(ctx context.Context) error {
// Remove stale planners from the recently completed list.
for id, t := range c.donePlanners {
if time.Since(t) < plannerStaleDuration {
continue
if time.Since(t) >= plannerStaleDuration {
delete(c.donePlanners, id)
}
delete(c.donePlanners, id)
}

// Find all completed ones and add them to donePlaners list.
Expand Down Expand Up @@ -440,10 +439,9 @@ func (c *controller) syncSchedulers(ctx context.Context) error {

// Remove done schedulers.
for id, t := range c.doneSchedulers {
if time.Since(t) < schedulerStaleDuration {
continue
if time.Since(t) >= schedulerStaleDuration {
delete(c.doneSchedulers, id)
}
delete(c.doneSchedulers, id)
}

for id, s := range c.schedulers {
Expand All @@ -462,17 +460,17 @@ func (c *controller) syncSchedulers(ctx context.Context) error {
// Add missing schedulers.
planneds := c.deploymentLister.ListPlanneds()
runnings := c.deploymentLister.ListRunnings()
runnings = append(runnings, planneds...)
targets := append(runnings, planneds...)

if len(runnings) == 0 {
if len(targets) == 0 {
return nil
}

c.logger.Info(fmt.Sprintf("there are %d planned/running deployments for scheduling", len(runnings)),
c.logger.Info(fmt.Sprintf("there are %d planned/running deployments for scheduling", len(targets)),
zap.Int("count", len(c.schedulers)),
)

for _, d := range runnings {
for _, d := range targets {
// Ignore already processed one.
if _, ok := c.doneSchedulers[d.Id]; ok {
continue
Expand Down
27 changes: 15 additions & 12 deletions pkg/app/piped/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

var (
defaultDeploymentTimeout = time.Hour
defaultDeploymentTimeout = 6 * time.Hour
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, do we have docs that related to this default value 👀

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be included while fixing #1373

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thank you 👍

)

// scheduler is a dedicated object for a specific deployment of a single application.
Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *scheduler) Run(ctx context.Context) error {
repoCfg, ok := s.pipedConfig.GetRepository(repoID)
if !ok {
s.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
statusReason = fmt.Sprintf("Unable to find %q from the repository list in piped config", repoID)
statusReason = fmt.Sprintf("Repository %q is not found in the piped config", repoID)
Copy link
Member

@khanhtc1202 khanhtc1202 Jan 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, statusDesc or statusDescription or deploymentResultDescription(Desc) can be considered as a better name 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That name was already fixed/used in the proto so let me keep it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙆‍♀️

s.reportDeploymentCompleted(ctx, s.doneDeploymentStatus, statusReason, "")
return fmt.Errorf("unable to find %q from the repository list in piped config", repoID)
}
Expand All @@ -247,8 +247,8 @@ func (s *scheduler) Run(ctx context.Context) error {
)
}

// We use another deploy source provider to load the deployment configuration
// at the target commit. This provider is configured with a nil sealedSecretDecrypter
// We use another deploy source provider to load the deployment configuration at the target commit.
// This provider is configured with a nil sealedSecretDecrypter
// because decrypting the sealed secrets is not required.
// We need only the deployment configuration spec.
configDSP := deploysource.NewProvider(
Expand Down Expand Up @@ -327,27 +327,30 @@ func (s *scheduler) Run(ctx context.Context) error {
}

// If all operations of the stage were completed successfully
// go the next stage to handle.
// handle the next stage.
if result == model.StageStatus_STAGE_SUCCESS {
continue
}

sigType := sig.Signal()

// The deployment was cancelled by a web user.
if sigType == executor.StopSignalCancel {
if result == model.StageStatus_STAGE_CANCELLED {
deploymentStatus = model.DeploymentStatus_DEPLOYMENT_CANCELLED
statusReason = fmt.Sprintf("Deployment was cancelled by %s while executing stage %s", cancelCommander, ps.Id)
statusReason = fmt.Sprintf("Cancelled by %s while executing stage %s", cancelCommander, ps.Id)
break
}

// The stage was failed but not caused by the stop signal.
if result == model.StageStatus_STAGE_FAILURE && sigType == executor.StopSignalNone {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code was missing the handling of the timeout error.

if result == model.StageStatus_STAGE_FAILURE {
deploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
statusReason = fmt.Sprintf("Failed while executing stage %s", ps.Id)
// The stage was failed because of timing out.
if sig.Signal() == executor.StopSignalTimeout {
statusReason = fmt.Sprintf("Timed out while executing stage %s", ps.Id)
} else {
statusReason = fmt.Sprintf("Failed while executing stage %s", ps.Id)
}
break
}

s.logger.Info("stop scheduler because of temination signal", zap.String("stage-id", ps.Id))
return nil
}

Expand Down