diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index adb10ced5f..d6a2f6e4f9 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -1136,28 +1136,28 @@ func (a *PipedAPI) InChainDeploymentPlannable(ctx context.Context, req *pipedser if err != nil { return nil, err } - if err := a.validateDeploymentBelongsToPiped(ctx, req.Deployment.Id, pipedID); err != nil { + if err := a.validateDeploymentBelongsToPiped(ctx, req.DeploymentId, pipedID); err != nil { return nil, err } - dc, err := a.deploymentChainStore.GetDeploymentChain(ctx, req.Deployment.DeploymentChainId) + dc, err := a.deploymentChainStore.GetDeploymentChain(ctx, req.DeploymentChainId) if err != nil { return nil, status.Error(codes.InvalidArgument, "unable to find the deployment chain which this deployment belongs to") } // Deployment of blocks[0] in the chain means it's the first deployment of the chain; // hence it should be processed without any lock. - if req.Deployment.DeploymentChainBlockIndex == 0 { + if req.DeploymentChainBlockIndex == 0 { return &pipedservice.InChainDeploymentPlannableResponse{ Plannable: true, }, nil } - if req.Deployment.DeploymentChainBlockIndex >= uint32(len(dc.Blocks)) { + if req.DeploymentChainBlockIndex >= uint32(len(dc.Blocks)) { return nil, status.Error(codes.InvalidArgument, "invalid deployment with chain block index provided") } - prevBlock := dc.Blocks[req.Deployment.DeploymentChainBlockIndex-1] + prevBlock := dc.Blocks[req.DeploymentChainBlockIndex-1] plannable := true for _, node := range prevBlock.Nodes { // TODO: Consider add deployment status to the deployment ref in the deployment chain model diff --git a/pkg/app/api/service/pipedservice/service.proto b/pkg/app/api/service/pipedservice/service.proto index 12ff6be3bb..401dd9fc90 100644 --- a/pkg/app/api/service/pipedservice/service.proto +++ b/pkg/app/api/service/pipedservice/service.proto @@ -501,7 +501,9 @@ message CreateDeploymentChainResponse { } message InChainDeploymentPlannableRequest { - pipe.model.Deployment deployment = 1 [(validate.rules).message.required = true]; + string deployment_id = 1 [(validate.rules).string.min_len = 1]; + string deployment_chain_id = 2 [(validate.rules).string.min_len = 1]; + uint32 deployment_chain_block_index = 3; } message InChainDeploymentPlannableResponse { diff --git a/pkg/app/piped/controller/controller.go b/pkg/app/piped/controller/controller.go index 26ea50850f..d46af08644 100644 --- a/pkg/app/piped/controller/controller.go +++ b/pkg/app/piped/controller/controller.go @@ -54,6 +54,8 @@ type apiClient interface { SaveStageMetadata(ctx context.Context, req *pipedservice.SaveStageMetadataRequest, opts ...grpc.CallOption) (*pipedservice.SaveStageMetadataResponse, error) ReportStageLogs(ctx context.Context, req *pipedservice.ReportStageLogsRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsResponse, error) ReportStageLogsFromLastCheckpoint(ctx context.Context, in *pipedservice.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsFromLastCheckpointResponse, error) + + InChainDeploymentPlannable(ctx context.Context, in *pipedservice.InChainDeploymentPlannableRequest, opts ...grpc.CallOption) (*pipedservice.InChainDeploymentPlannableResponse, error) } type gitClient interface { @@ -369,6 +371,32 @@ func (c *controller) syncPlanners(ctx context.Context) error { } for appID, d := range pendingByApp { + plannable, err := c.shouldStartPlanningDeployment(ctx, d) + if err != nil { + c.logger.Error("failed to check deployment plannability", + zap.String("deployment", d.Id), + zap.String("app", d.ApplicationId), + zap.Error(err), + ) + continue + } + + if !plannable { + if d.IsInChainDeployment() { + c.logger.Info("unable to start planning deployment, probably locked by the previous block in its deployment chain", + zap.String("deployment_chain", d.DeploymentChainId), + zap.String("deployment", d.Id), + zap.String("app", d.ApplicationId), + ) + } else { + c.logger.Info("unable to start planning deployment, try again next sync interval", + zap.String("deployment", d.Id), + zap.String("app", d.ApplicationId), + ) + } + continue + } + planner, err := c.startNewPlanner(ctx, d) if err != nil { c.logger.Error("failed to start a new planner", @@ -643,6 +671,21 @@ func (c *controller) getMostRecentlySuccessfulDeployment(ctx context.Context, ap return nil, err } +func (c *controller) shouldStartPlanningDeployment(ctx context.Context, d *model.Deployment) (bool, error) { + if !d.IsInChainDeployment() { + return true, nil + } + resp, err := c.apiClient.InChainDeploymentPlannable(ctx, &pipedservice.InChainDeploymentPlannableRequest{ + DeploymentId: d.Id, + DeploymentChainId: d.DeploymentChainId, + DeploymentChainBlockIndex: d.DeploymentChainBlockIndex, + }) + if err != nil { + return false, err + } + return resp.Plannable, nil +} + type appLiveResourceLister struct { lister liveResourceLister cloudProvider string diff --git a/pkg/model/deployment.go b/pkg/model/deployment.go index bb7742cf1a..f4a7ceac51 100644 --- a/pkg/model/deployment.go +++ b/pkg/model/deployment.go @@ -219,3 +219,9 @@ func (d *Deployment) ContainLabels(labels map[string]string) bool { } return true } + +// IsInChainDeployment returns true if the current deployment belongs +// to a deployment chain. +func (d *Deployment) IsInChainDeployment() bool { + return d.DeploymentChainId != "" +}