diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index f9b425b946..3da696e40e 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -20,6 +20,7 @@ import ( "errors" "time" + "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -42,6 +43,7 @@ import ( type PipedAPI struct { applicationStore datastore.ApplicationStore deploymentStore datastore.DeploymentStore + deploymentChainStore datastore.DeploymentChainStore environmentStore datastore.EnvironmentStore pipedStore datastore.PipedStore projectStore datastore.ProjectStore @@ -66,6 +68,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. a := &PipedAPI{ applicationStore: datastore.NewApplicationStore(ds), deploymentStore: datastore.NewDeploymentStore(ds), + deploymentChainStore: datastore.NewDeploymentChainStore(ds), environmentStore: datastore.NewEnvironmentStore(ds), pipedStore: datastore.NewPipedStore(ds), projectStore: datastore.NewProjectStore(ds), @@ -963,6 +966,135 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte return nil, status.Errorf(codes.Unimplemented, "ReportUnregisteredApplicationConfigurations is not implemented yet") } +// CreateDeploymentChain creates a new deployment chain object and all required commands to +// trigger deployment for applications in the chain. +func (a *PipedAPI) CreateDeploymentChain(ctx context.Context, req *pipedservice.CreateDeploymentChainRequest) (*pipedservice.CreateDeploymentChainResponse, error) { + firstDeployment := req.FirstDeployment + projectID, pipedID, _, err := rpcauth.ExtractPipedToken(ctx) + if err != nil { + return nil, err + } + if err := a.validateAppBelongsToPiped(ctx, firstDeployment.ApplicationId, pipedID); err != nil { + return nil, err + } + + buildChainNodes := func(matcher *pipedservice.CreateDeploymentChainRequest_ApplicationMatcher) ([]*model.ChainNode, []*model.Application, error) { + filters := []datastore.ListFilter{ + { + Field: "ProjectId", + Operator: datastore.OperatorEqual, + Value: projectID, + }, + } + + if matcher.Name != "" { + filters = append(filters, datastore.ListFilter{ + Field: "Name", + Operator: datastore.OperatorEqual, + Value: matcher.Name, + }) + } + + // TODO: Support find node apps by appKind and appLabels. + + apps, _, err := a.applicationStore.ListApplications(ctx, datastore.ListOptions{ + Filters: filters, + }) + if err != nil { + return nil, nil, err + } + + nodes := make([]*model.ChainNode, 0, len(apps)) + for _, app := range apps { + nodes = append(nodes, &model.ChainNode{ + ApplicationRef: &model.ChainApplicationRef{ + ApplicationId: app.Id, + ApplicationName: app.Name, + }, + }) + } + + return nodes, apps, nil + } + + chainBlocks := make([]*model.ChainBlock, 0, len(req.Matchers)+1) + // Add the first deployment which created by piped as the first block of the chain. + chainBlocks = append(chainBlocks, &model.ChainBlock{ + Index: 0, + Nodes: []*model.ChainNode{ + { + ApplicationRef: &model.ChainApplicationRef{ + ApplicationId: firstDeployment.ApplicationId, + ApplicationName: firstDeployment.ApplicationName, + }, + DeploymentRef: &model.ChainDeploymentRef{ + DeploymentId: firstDeployment.Id, + }, + }, + }, + }) + + blockAppsMap := make(map[int][]*model.Application, len(req.Matchers)) + for i, filter := range req.Matchers { + nodes, blockApps, err := buildChainNodes(filter) + if err != nil { + return nil, err + } + + blockAppsMap[i+1] = blockApps + chainBlocks = append(chainBlocks, &model.ChainBlock{ + Index: int32(i + 1), + Nodes: nodes, + }) + } + + dc := model.DeploymentChain{ + Id: uuid.New().String(), + ProjectId: projectID, + Blocks: chainBlocks, + } + + // Create a new deployment chain instance to control newly triggered deployment chain. + if err := a.deploymentChainStore.AddDeploymentChain(ctx, &dc); err != nil { + a.logger.Error("failed to create deployment chain", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to trigger new deployment chain") + } + + firstDeployment.DeploymentChainId = dc.Id + // Trigger new deployment for the first application by store first deployment to datastore. + if err := a.deploymentStore.AddDeployment(ctx, firstDeployment); err != nil { + a.logger.Error("failed to create deployment", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to trigger new deployment for the first application in chain") + } + + // Make sync application command for applications of the chain. + for blockIndex, apps := range blockAppsMap { + for _, app := range apps { + cmd := model.Command{ + Id: uuid.New().String(), + PipedId: app.PipedId, + ApplicationId: app.Id, + ProjectId: app.ProjectId, + Commander: dc.Id, + Type: model.Command_CHAIN_SYNC_APPLICATION, + ChainSyncApplication: &model.Command_ChainSyncApplication{ + DeploymentChainId: dc.Id, + BlockIndex: int32(blockIndex), + ApplicationId: app.Id, + SyncStrategy: model.SyncStrategy_AUTO, + }, + } + + if err := addCommand(ctx, a.commandStore, &cmd, a.logger); err != nil { + a.logger.Error("failed to create command to trigger application in chain", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to command to trigger for applications in chain") + } + } + } + + return &pipedservice.CreateDeploymentChainResponse{}, nil +} + // validateAppBelongsToPiped checks if the given application belongs to the given piped. // It gives back an error unless the application belongs to the piped. func (a *PipedAPI) validateAppBelongsToPiped(ctx context.Context, appID, pipedID string) error { diff --git a/pkg/app/api/service/pipedservice/service.proto b/pkg/app/api/service/pipedservice/service.proto index d7d840f204..1e150956db 100644 --- a/pkg/app/api/service/pipedservice/service.proto +++ b/pkg/app/api/service/pipedservice/service.proto @@ -169,6 +169,10 @@ service PipedService { rpc UpdateApplicationConfigurations(UpdateApplicationConfigurationsRequest) returns (UpdateApplicationConfigurationsResponse) {} // ReportLatestUnusedApplicationConfigurations puts the latest configurations of applications that isn't registered yet. rpc ReportUnregisteredApplicationConfigurations(ReportUnregisteredApplicationConfigurationsRequest) returns (ReportUnregisteredApplicationConfigurationsResponse) {} + + // CreateDeploymentChain creates a new deployment chain object and all required commands to + // trigger deployment for applications in the chain. + rpc CreateDeploymentChain(CreateDeploymentChainRequest) returns (CreateDeploymentChainResponse) {} } enum ListOrder { @@ -477,3 +481,17 @@ message ReportUnregisteredApplicationConfigurationsRequest { message ReportUnregisteredApplicationConfigurationsResponse { } + +message CreateDeploymentChainRequest { + message ApplicationMatcher { + string name = 1; + string kind = 2; + map labels = 3; + } + + pipe.model.Deployment first_deployment = 1 [(validate.rules).message.required = true]; + repeated ApplicationMatcher matchers = 2; +} + +message CreateDeploymentChainResponse { +} diff --git a/pkg/app/piped/trigger/BUILD.bazel b/pkg/app/piped/trigger/BUILD.bazel index 28126f1dca..45374fd689 100644 --- a/pkg/app/piped/trigger/BUILD.bazel +++ b/pkg/app/piped/trigger/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "cache.go", "deployment.go", + "deployment_chain.go", "determiner.go", "trigger.go", ], diff --git a/pkg/app/piped/trigger/deployment.go b/pkg/app/piped/trigger/deployment.go index 4c6869d1b5..f1ba5f0896 100644 --- a/pkg/app/piped/trigger/deployment.go +++ b/pkg/app/piped/trigger/deployment.go @@ -21,7 +21,6 @@ import ( "time" "github.com/google/uuid" - "go.uber.org/zap" "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" "github.com/pipe-cd/pipe/pkg/config" @@ -31,47 +30,14 @@ import ( func (t *Trigger) triggerDeployment( ctx context.Context, - app *model.Application, - appCfg *config.GenericDeploymentSpec, - branch string, - commit git.Commit, - commander string, - syncStrategy model.SyncStrategy, - strategySummary string, -) (*model.Deployment, error) { - - // Build deployment model to trigger. - deployment, err := buildDeployment( - app, - branch, - commit, - commander, - syncStrategy, - strategySummary, - time.Now(), - appCfg.DeploymentNotification, - ) - if err != nil { - return nil, fmt.Errorf("could not initialize deployment: %w", err) - } - - // Send deployment model to control-plane to trigger. - t.logger.Info(fmt.Sprintf("application %s will be triggered to sync", app.Id), zap.String("commit", commit.Hash)) - _, err = t.apiClient.CreateDeployment(ctx, &pipedservice.CreateDeploymentRequest{ + deployment *model.Deployment, +) error { + if _, err := t.apiClient.CreateDeployment(ctx, &pipedservice.CreateDeploymentRequest{ Deployment: deployment, - }) - if err != nil { - return nil, fmt.Errorf("cound not register a new deployment to control-plane: %w", err) + }); err != nil { + return fmt.Errorf("cound not register a new deployment to control-plane: %w", err) } - - // TODO: Find a better way to ensure that the application should be updated correctly - // when the deployment was successfully triggered. - // This error is ignored because the deployment was already registered successfully. - if e := reportMostRecentlyTriggeredDeployment(ctx, t.apiClient, deployment); e != nil { - t.logger.Error("failed to report most recently triggered deployment", zap.Error(e)) - } - - return deployment, nil + return nil } func buildDeployment( @@ -83,6 +49,7 @@ func buildDeployment( strategySummary string, now time.Time, noti *config.DeploymentNotification, + deploymentChainID string, ) (*model.Deployment, error) { var commitURL string @@ -125,14 +92,15 @@ func buildDeployment( SyncStrategy: syncStrategy, StrategySummary: strategySummary, }, - GitPath: app.GitPath, - CloudProvider: app.CloudProvider, - Labels: app.Labels, - Status: model.DeploymentStatus_DEPLOYMENT_PENDING, - StatusReason: "The deployment is waiting to be planned", - Metadata: metadata, - CreatedAt: now.Unix(), - UpdatedAt: now.Unix(), + GitPath: app.GitPath, + CloudProvider: app.CloudProvider, + Labels: app.Labels, + Status: model.DeploymentStatus_DEPLOYMENT_PENDING, + StatusReason: "The deployment is waiting to be planned", + Metadata: metadata, + CreatedAt: now.Unix(), + UpdatedAt: now.Unix(), + DeploymentChainId: deploymentChainID, } return deployment, nil diff --git a/pkg/app/piped/trigger/deployment_chain.go b/pkg/app/piped/trigger/deployment_chain.go new file mode 100644 index 0000000000..2ad7c6c4ed --- /dev/null +++ b/pkg/app/piped/trigger/deployment_chain.go @@ -0,0 +1,47 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trigger + +import ( + "context" + "fmt" + + "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" + "github.com/pipe-cd/pipe/pkg/config" + "github.com/pipe-cd/pipe/pkg/model" +) + +func (t *Trigger) triggerDeploymentChain( + ctx context.Context, + dc *config.DeploymentChain, + firstDeployment *model.Deployment, +) error { + matchers := make([]*pipedservice.CreateDeploymentChainRequest_ApplicationMatcher, 0, len(dc.ApplicationMatchers)) + for _, m := range dc.ApplicationMatchers { + matchers = append(matchers, &pipedservice.CreateDeploymentChainRequest_ApplicationMatcher{ + Name: m.Name, + Kind: m.Kind, + Labels: m.Labels, + }) + } + + if _, err := t.apiClient.CreateDeploymentChain(ctx, &pipedservice.CreateDeploymentChainRequest{ + Matchers: matchers, + FirstDeployment: firstDeployment, + }); err != nil { + return fmt.Errorf("could not create new deployment chain: %w", err) + } + return nil +} diff --git a/pkg/app/piped/trigger/trigger.go b/pkg/app/piped/trigger/trigger.go index 01fee3f6b2..d6fd7180cf 100644 --- a/pkg/app/piped/trigger/trigger.go +++ b/pkg/app/piped/trigger/trigger.go @@ -44,6 +44,7 @@ type apiClient interface { CreateDeployment(ctx context.Context, in *pipedservice.CreateDeploymentRequest, opts ...grpc.CallOption) (*pipedservice.CreateDeploymentResponse, error) GetDeployment(ctx context.Context, in *pipedservice.GetDeploymentRequest, opts ...grpc.CallOption) (*pipedservice.GetDeploymentResponse, error) ReportApplicationMostRecentDeployment(ctx context.Context, req *pipedservice.ReportApplicationMostRecentDeploymentRequest, opts ...grpc.CallOption) (*pipedservice.ReportApplicationMostRecentDeploymentResponse, error) + CreateDeploymentChain(ctx context.Context, in *pipedservice.CreateDeploymentChainRequest, opts ...grpc.CallOption) (*pipedservice.CreateDeploymentChainResponse, error) } type gitClient interface { @@ -269,15 +270,54 @@ func (t *Trigger) checkRepoCandidates(ctx context.Context, repoID string, cs []c strategy = model.SyncStrategy_AUTO } - // Build deployment model and send a request to API to create a new deployment. - deployment, err := t.triggerDeployment(ctx, app, appCfg, branch, headCommit, commander, strategy, strategySummary) + // TODO: Add ability to get deployment chain id from CHAIN_SYNC_APPLICATION command. + var deploymentChainID string + // Build the deployment to trigger. + deployment, err := buildDeployment( + app, + branch, + headCommit, + commander, + strategy, + strategySummary, + time.Now(), + appCfg.DeploymentNotification, + deploymentChainID, + ) if err != nil { - msg := fmt.Sprintf("failed to trigger application %s: %v", app.Id, err) + msg := fmt.Sprintf("failed to build deployment for application %s: %v", app.Id, err) t.notifyDeploymentTriggerFailed(app, appCfg, msg, headCommit) t.logger.Error(msg, zap.Error(err)) continue } + // In case the triggered deployment is of application that can trigger a deployment chain + // create a new deployment chain with its configuration besides with the first deployment + // in that chain. + if appCfg.PostSync != nil && appCfg.PostSync.DeploymentChain != nil { + if err := t.triggerDeploymentChain(ctx, appCfg.PostSync.DeploymentChain, deployment); err != nil { + msg := fmt.Sprintf("failed to trigger application %s and its deployment chain: %v", app.Id, err) + t.notifyDeploymentTriggerFailed(app, appCfg, msg, headCommit) + t.logger.Error(msg, zap.Error(err)) + continue + } + } else { + // Send a request to API to create a new deployment. + if err := t.triggerDeployment(ctx, deployment); err != nil { + msg := fmt.Sprintf("failed to trigger application %s: %v", app.Id, err) + t.notifyDeploymentTriggerFailed(app, appCfg, msg, headCommit) + t.logger.Error(msg, zap.Error(err)) + continue + } + } + + // TODO: Find a better way to ensure that the application should be updated correctly + // when the deployment was successfully triggered. + // This error is ignored because the deployment was already registered successfully. + if e := reportMostRecentlyTriggeredDeployment(ctx, t.apiClient, deployment); e != nil { + t.logger.Error("failed to report most recently triggered deployment", zap.Error(e)) + } + triggered[app.Id] = struct{}{} t.commitStore.Put(app.Id, headCommit.Hash) t.notifyDeploymentTriggered(ctx, appCfg, deployment) diff --git a/pkg/app/web/src/__fixtures__/dummy-deployment.ts b/pkg/app/web/src/__fixtures__/dummy-deployment.ts index 881f94b2ab..8ad653fdd3 100644 --- a/pkg/app/web/src/__fixtures__/dummy-deployment.ts +++ b/pkg/app/web/src/__fixtures__/dummy-deployment.ts @@ -43,6 +43,7 @@ export const dummyDeployment: Deployment.AsObject = { }, kind: ApplicationKind.KUBERNETES, metadataMap: [], + deploymentChainId: "", }; export function createDeploymentFromObject(o: Deployment.AsObject): Deployment { diff --git a/pkg/app/web/src/components/deployments-detail-page/pipeline/index.stories.tsx b/pkg/app/web/src/components/deployments-detail-page/pipeline/index.stories.tsx index e1d66a8d49..0f27144609 100644 --- a/pkg/app/web/src/components/deployments-detail-page/pipeline/index.stories.tsx +++ b/pkg/app/web/src/components/deployments-detail-page/pipeline/index.stories.tsx @@ -118,6 +118,7 @@ const fakeDeployment: Deployment.AsObject = { completedAt: 0, createdAt: 1592203166, updatedAt: 1592203166, + deploymentChainId: "", }; export default { diff --git a/pkg/datastore/BUILD.bazel b/pkg/datastore/BUILD.bazel index 2892905d77..70d5a8653d 100644 --- a/pkg/datastore/BUILD.bazel +++ b/pkg/datastore/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "applicationstore.go", "commandstore.go", "datastore.go", + "deploymentchainstore.go", "deploymentstore.go", "environmentstore.go", "eventstore.go", diff --git a/pkg/datastore/deploymentchainstore.go b/pkg/datastore/deploymentchainstore.go new file mode 100644 index 0000000000..90bbb61665 --- /dev/null +++ b/pkg/datastore/deploymentchainstore.go @@ -0,0 +1,56 @@ +// Copyright 2021 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datastore + +import ( + "context" + "time" + + "github.com/pipe-cd/pipe/pkg/model" +) + +const DeploymentChainModelKind = "DeploymentChain" + +type DeploymentChainStore interface { + AddDeploymentChain(ctx context.Context, d *model.DeploymentChain) error +} + +type deploymentChainStore struct { + backend + nowFunc func() time.Time +} + +func NewDeploymentChainStore(ds DataStore) DeploymentChainStore { + return &deploymentChainStore{ + backend: backend{ + ds: ds, + }, + nowFunc: time.Now, + } +} + +func (s *deploymentChainStore) AddDeploymentChain(ctx context.Context, dc *model.DeploymentChain) error { + now := s.nowFunc().Unix() + if dc.CreatedAt == 0 { + dc.CreatedAt = now + } + if dc.UpdatedAt == 0 { + dc.UpdatedAt = now + } + if err := dc.Validate(); err != nil { + return err + } + return s.ds.Create(ctx, DeploymentChainModelKind, dc.Id, dc) +} diff --git a/pkg/model/deployment.proto b/pkg/model/deployment.proto index 442706e384..a3f6066ecb 100644 --- a/pkg/model/deployment.proto +++ b/pkg/model/deployment.proto @@ -86,6 +86,10 @@ message Deployment { repeated PipelineStage stages = 32; map metadata = 33; + // Reference to the chain which the deployment belongs to. + // Empty means the deployment is a standalone deployment. + string deployment_chain_id = 40; + int64 completed_at = 100 [(validate.rules).int64.gte = 0]; int64 created_at = 101 [(validate.rules).int64.gte = 0]; int64 updated_at = 102 [(validate.rules).int64.gte = 0]; diff --git a/pkg/model/deployment_chain.proto b/pkg/model/deployment_chain.proto index 6e35baf8c7..d454d75362 100644 --- a/pkg/model/deployment_chain.proto +++ b/pkg/model/deployment_chain.proto @@ -26,9 +26,9 @@ message DeploymentChain { string id = 1 [(validate.rules).string.min_len = 1]; // The ID of the project this environment belongs to. string project_id = 2 [(validate.rules).string.min_len = 1]; - // List of all deployment node in the chain which contains all + // List of all deployment block in the chain which contains all // configuration required to perform deployment(s). - repeated DeploymentChainNode nodes = 3; + repeated ChainBlock blocks = 3; // Unix time when all the applications in this chain are deployed. int64 completed_at = 100 [(validate.rules).int64.gte = 0]; // Unix time when the deployment chain is created. @@ -37,11 +37,26 @@ message DeploymentChain { int64 updated_at = 102 [(validate.rules).int64.gt = 0]; } -message DeploymentChainNode { +message ChainApplicationRef { + string application_id = 1 [(validate.rules).string.min_len = 1]; + string application_name = 2; +} + +message ChainDeploymentRef { + string deployment_id = 1 [(validate.rules).string.min_len = 1]; +} + +message ChainNode { + ChainApplicationRef application_ref = 1 [(validate.rules).message.required = true]; + ChainDeploymentRef deployment_ref = 2; +} + +message ChainBlock { + // Index represent the offset of the node in chain. + int32 index = 1; // List of applications which should be deployed at the same time in chain. - repeated Deployment deployments = 1; - // Control whether to start to deploy applications in this node or not. - bool runnable = 2; + repeated ChainNode nodes = 2; + // Unix time when the deployment chain node is started. int64 started_at = 100 [(validate.rules).int64.gte = 0]; // Unix time when all the applications in this chain node are deployed.