diff --git a/pkg/app/launcher/cmd/launcher/launcher.go b/pkg/app/launcher/cmd/launcher/launcher.go index f6abfcdd48..793832d833 100644 --- a/pkg/app/launcher/cmd/launcher/launcher.go +++ b/pkg/app/launcher/cmd/launcher/launcher.go @@ -37,10 +37,12 @@ import ( "sigs.k8s.io/yaml" "github.com/pipe-cd/pipecd/pkg/admin" + "github.com/pipe-cd/pipecd/pkg/app/piped/apistore/commandstore" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/cli" "github.com/pipe-cd/pipecd/pkg/config" "github.com/pipe-cd/pipecd/pkg/git" + "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/rpc/rpcauth" "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" "github.com/pipe-cd/pipecd/pkg/version" @@ -80,12 +82,22 @@ type launcher struct { configRepo git.Repo clientKey string client pipedservice.Client + + commandCh chan model.ReportableCommand + prevCommands map[string]struct{} + commandLister commandLister + restartCommanded bool +} + +type commandLister interface { + ListPipedCommands() []model.ReportableCommand } func NewCommand() *cobra.Command { l := &launcher{ checkInterval: time.Minute, gracePeriod: 30 * time.Second, + commandCh: make(chan model.ReportableCommand), } cmd := &cobra.Command{ Use: "launcher", @@ -220,6 +232,32 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { l.configRepo = repo } + spec, err := l.getSpec(ctx) + if err != nil { + input.Logger.Error(err.Error(), zap.Error(err)) + } + + pipedKey, err := spec.LoadPipedKey() + if err != nil { + input.Logger.Error("failed to load piped key", zap.Error(err)) + return err + } + + // Make gRPC client and connect to the API. + apiClient, err := l.createAPIClient(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey) + if err != nil { + input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err)) + return err + } + + { + store := commandstore.NewStore(apiClient, time.Minute, input.Logger) + group.Go(func() error { + return store.Run(ctx) + }) + l.commandLister = store.Lister() + } + var ( runningPiped *command workingDir = filepath.Join(l.homeDir, "piped") @@ -267,6 +305,31 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { return nil } + commandHandler := func(ctx context.Context, cmdCh <-chan model.ReportableCommand) error { + input.Logger.Info("started a worker for handling restart piped command") + for { + select { + case cmd := <-cmdCh: + if err := l.handleCommand(ctx, input, cmd); err != nil { + return err + } + + case <-ctx.Done(): + input.Logger.Info("a worker has been stopped") + return nil + } + } + } + group.Go(func() error { + if err := commandHandler(ctx, l.commandCh); err != nil { + input.Logger.Info("LAUNCHER: failed to handle restart piped command", + zap.Error(err), + ) + return err + } + return nil + }) + group.Go(func() error { // Execute the first time immediately. if err := execute(); err != nil { @@ -279,6 +342,7 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { select { case <-ticker.C: // Don't return an error to continue piped execution. + l.enqueueNewCommands(ctx, input) execute() case <-ctx.Done(): @@ -302,35 +366,105 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { return nil } +func (l *launcher) enqueueNewCommands(ctx context.Context, input cli.Input) { + input.Logger.Info("LAUNCHER: fetching unhandled commands to enqueue") + + commands := l.commandLister.ListPipedCommands() + if len(commands) == 0 { + input.Logger.Debug("LAUNCHER: there is no command to enqueue") + return + } + + news := make([]model.ReportableCommand, 0, len(commands)) + cmds := make(map[string]struct{}, len(commands)) + for _, cmd := range commands { + cmds[cmd.Id] = struct{}{} + if _, ok := l.prevCommands[cmd.Id]; !ok { + news = append(news, cmd) + } + } + + if len(news) == 0 { + input.Logger.Debug("LAUNCHER: there is no new command to enqueue") + return + } + + l.prevCommands = cmds + input.Logger.Info(fmt.Sprintf("LAUNCHER: will enqueue %d new commands", len(news))) + + for _, cmd := range news { + select { + case l.commandCh <- cmd: + input.Logger.Info("LAUNCHER: queued a new command", zap.String("command", cmd.Id)) + + case <-ctx.Done(): + return + } + } +} + +func (l *launcher) handleCommand(ctx context.Context, input cli.Input, cmd model.ReportableCommand) error { + logger := input.Logger.With( + zap.String("command", cmd.Id), + ) + + if cmd.IsRestartPipedCmd() { + l.restartCommanded = true + } + + if err := cmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, []byte(cmd.Id)); err != nil { + logger.Error("LAUNCHER: failed to report command status", zap.Error(err)) + return err + } + + input.Logger.Info("LAUNCHER: successfully handled command", zap.String("command", cmd.Id)) + return nil +} + +// getSpec returns launcher's spec. +func (l *launcher) getSpec(ctx context.Context) (*config.LauncherSpec, error) { + config, err := l.loadConfigData(ctx) + if err != nil { + return nil, fmt.Errorf("LAUNCHER: error on loading Piped configuration data") + } + + spec, err := parseConfig(config) + if err != nil { + return nil, fmt.Errorf("LAUNCHER: error on parsing Piped configuration data") + } + + return spec, nil +} + // shouldRelaunch fetches the latest state of desired version and config // to determine whether a new Piped should be launched or not. +// This also takes into account whether or not a command +// has been received to make the restart decision. // This also returns the desired version and config. func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, should bool, err error) { - config, err = l.loadConfigData(ctx) + spec, err := l.getSpec(ctx) if err != nil { - logger.Error("LAUNCHER: error on loading Piped configuration data", zap.Error(err)) - return + logger.Error(err.Error(), zap.Error(err)) } - cfg, err := parseConfig(config) - if err != nil { - logger.Error("LAUNCHER: error on parsing Piped configuration data", zap.Error(err)) - return - } - - pipedKey, err := cfg.LoadPipedKey() + pipedKey, err := spec.LoadPipedKey() if err != nil { logger.Error("LAUNCHER: error on loading Piped key", zap.Error(err)) return } - version, err = l.getDesiredVersion(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger) + version, err = l.getDesiredVersion(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey, logger) if err != nil { logger.Error("LAUNCHER: error on checking desired version", zap.Error(err)) return } - should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) + should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) || l.restartCommanded + + if l.restartCommanded { + l.restartCommanded = false + } + return } diff --git a/pkg/app/piped/apistore/commandstore/store.go b/pkg/app/piped/apistore/commandstore/store.go index a735919d7d..26ac4b642c 100644 --- a/pkg/app/piped/apistore/commandstore/store.go +++ b/pkg/app/piped/apistore/commandstore/store.go @@ -36,13 +36,14 @@ type Store interface { Lister() Lister } -// Lister helps list comands. +// Lister helps list commands. // All objects returned here must be treated as read-only. type Lister interface { ListApplicationCommands() []model.ReportableCommand ListDeploymentCommands() []model.ReportableCommand ListStageCommands(deploymentID, stageID string) []model.ReportableCommand ListBuildPlanPreviewCommands() []model.ReportableCommand + ListPipedCommands() []model.ReportableCommand } type store struct { @@ -54,6 +55,7 @@ type store struct { deploymentCommands []model.ReportableCommand stageCommands []model.ReportableCommand planPreviewCommands []model.ReportableCommand + pipedCommands []model.ReportableCommand handledCommands map[string]time.Time mu sync.RWMutex gracePeriod time.Duration @@ -119,6 +121,7 @@ func (s *store) sync(ctx context.Context) error { deploymentCommands = make([]model.ReportableCommand, 0) stageCommands = make([]model.ReportableCommand, 0) planPreviewCommands = make([]model.ReportableCommand, 0) + pipedCommands = make([]model.ReportableCommand, 0) ) for _, cmd := range resp.Commands { switch cmd.Type { @@ -130,6 +133,8 @@ func (s *store) sync(ctx context.Context) error { stageCommands = append(stageCommands, s.makeReportableCommand(cmd)) case model.Command_BUILD_PLAN_PREVIEW: planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd)) + case model.Command_RESTART_PIPED: + pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd)) } } @@ -138,6 +143,7 @@ func (s *store) sync(ctx context.Context) error { s.deploymentCommands = deploymentCommands s.stageCommands = stageCommands s.planPreviewCommands = planPreviewCommands + s.pipedCommands = pipedCommands s.mu.Unlock() return nil @@ -219,6 +225,20 @@ func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand { return commands } +func (s *store) ListPipedCommands() []model.ReportableCommand { + s.mu.RLock() + defer s.mu.RUnlock() + + commands := make([]model.ReportableCommand, 0, len(s.pipedCommands)) + for _, cmd := range s.pipedCommands { + if _, ok := s.handledCommands[cmd.Id]; ok { + continue + } + commands = append(commands, cmd) + } + return commands +} + func (s *store) makeReportableCommand(c *model.Command) model.ReportableCommand { return model.ReportableCommand{ Command: c, diff --git a/pkg/model/command.go b/pkg/model/command.go index 3a9270e739..a014cc7244 100644 --- a/pkg/model/command.go +++ b/pkg/model/command.go @@ -37,6 +37,10 @@ func (c *Command) IsChainSyncApplicationCmd() bool { return c.GetChainSyncApplication() != nil } +func (c *Command) IsRestartPipedCmd() bool { + return c.GetRestartPiped() != nil +} + func (c *Command) SetUpdatedAt(t int64) { c.UpdatedAt = t }