diff --git a/pkg/app/launcher/cmd/launcher/launcher.go b/pkg/app/launcher/cmd/launcher/launcher.go index f6abfcdd48..1ee13bdbd5 100644 --- a/pkg/app/launcher/cmd/launcher/launcher.go +++ b/pkg/app/launcher/cmd/launcher/launcher.go @@ -37,10 +37,12 @@ import ( "sigs.k8s.io/yaml" "github.com/pipe-cd/pipecd/pkg/admin" + "github.com/pipe-cd/pipecd/pkg/app/piped/apistore/commandstore" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/cli" "github.com/pipe-cd/pipecd/pkg/config" "github.com/pipe-cd/pipecd/pkg/git" + "github.com/pipe-cd/pipecd/pkg/model" "github.com/pipe-cd/pipecd/pkg/rpc/rpcauth" "github.com/pipe-cd/pipecd/pkg/rpc/rpcclient" "github.com/pipe-cd/pipecd/pkg/version" @@ -80,12 +82,22 @@ type launcher struct { configRepo git.Repo clientKey string client pipedservice.Client + + commandCh chan model.ReportableCommand + prevCommands map[string]struct{} + commandLister commandLister + detectRestartCommand bool +} + +type commandLister interface { + ListPipedCommands() []model.ReportableCommand } func NewCommand() *cobra.Command { l := &launcher{ checkInterval: time.Minute, gracePeriod: 30 * time.Second, + commandCh: make(chan model.ReportableCommand), } cmd := &cobra.Command{ Use: "launcher", @@ -220,6 +232,32 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { l.configRepo = repo } + spec, err := l.getSpec(ctx) + if err != nil { + input.Logger.Error(err.Error(), zap.Error(err)) + } + + pipedKey, err := spec.LoadPipedKey() + if err != nil { + input.Logger.Error("failed to load piped key", zap.Error(err)) + return err + } + + // Make gRPC client and connect to the API. + apiClient, err := l.createAPIClient(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey) + if err != nil { + input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err)) + return err + } + + { + store := commandstore.NewStore(apiClient, time.Minute, input.Logger) + group.Go(func() error { + return store.Run(ctx) + }) + l.commandLister = store.Lister() + } + var ( runningPiped *command workingDir = filepath.Join(l.homeDir, "piped") @@ -267,6 +305,21 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { return nil } + commandHandler := func(ctx context.Context, cmdCh <-chan model.ReportableCommand) { + input.Logger.Info("started a worker for handling restart piped command") + for { + select { + case cmd := <-cmdCh: + l.handleCommand(ctx, input, cmd) + + case <-ctx.Done(): + input.Logger.Info("a worker has been stopped") + return + } + } + } + go commandHandler(ctx, l.commandCh) + group.Go(func() error { // Execute the first time immediately. if err := execute(); err != nil { @@ -279,6 +332,7 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { select { case <-ticker.C: // Don't return an error to continue piped execution. + l.enqueueNewCommands(ctx, input) execute() case <-ctx.Done(): @@ -302,35 +356,109 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error { return nil } +func (l *launcher) enqueueNewCommands(ctx context.Context, input cli.Input) { + input.Logger.Info("fetching unhandled commands to enqueue") + + commands := l.commandLister.ListPipedCommands() + if len(commands) == 0 { + input.Logger.Info("there is no command to enqueue") + return + } + + news := make([]model.ReportableCommand, 0, len(commands)) + cmds := make(map[string]struct{}, len(commands)) + for _, cmd := range commands { + cmds[cmd.Id] = struct{}{} + if _, ok := l.prevCommands[cmd.Id]; !ok { + news = append(news, cmd) + } + } + + input.Logger.Info("fetched unhandled commands to enqueue", + zap.Any("pre-commands", l.prevCommands), + zap.Any("commands", cmds), + zap.Int("news", len(news)), + ) + + if len(news) == 0 { + input.Logger.Info("there is no new command to enqueue") + return + } + + l.prevCommands = cmds + input.Logger.Info(fmt.Sprintf("will enqueue %d new commands", len(news))) + + for _, cmd := range news { + select { + case l.commandCh <- cmd: + input.Logger.Info("queued a new new command", zap.String("command", cmd.Id)) + + case <-ctx.Done(): + return + } + } +} + +func (l *launcher) handleCommand(ctx context.Context, input cli.Input, cmd model.ReportableCommand) { + logger := input.Logger.With( + zap.String("command", cmd.Id), + ) + logger.Info("received a restart piped command to handle") + + l.detectRestartCommand = true + + if err := cmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, []byte(cmd.Id)); err != nil { + logger.Error("failed to report command status", zap.Error(err)) + return + } + + input.Logger.Info("successfully handled a restart piped command") +} + +// getSpec returns launcher's spec. +func (l *launcher) getSpec(ctx context.Context) (*config.LauncherSpec, error) { + config, err := l.loadConfigData(ctx) + if err != nil { + return nil, fmt.Errorf("LAUNCHER: error on loading Piped configuration data") + } + + spec, err := parseConfig(config) + if err != nil { + return nil, fmt.Errorf("LAUNCHER: error on parsing Piped configuration data") + } + + return spec, nil +} + // shouldRelaunch fetches the latest state of desired version and config // to determine whether a new Piped should be launched or not. +// This also takes into account whether or not a command +// has been received to make the restart decision. // This also returns the desired version and config. func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, should bool, err error) { - config, err = l.loadConfigData(ctx) + spec, err := l.getSpec(ctx) if err != nil { - logger.Error("LAUNCHER: error on loading Piped configuration data", zap.Error(err)) - return - } - - cfg, err := parseConfig(config) - if err != nil { - logger.Error("LAUNCHER: error on parsing Piped configuration data", zap.Error(err)) - return + logger.Error(err.Error(), zap.Error(err)) } - pipedKey, err := cfg.LoadPipedKey() + pipedKey, err := spec.LoadPipedKey() if err != nil { logger.Error("LAUNCHER: error on loading Piped key", zap.Error(err)) return } - version, err = l.getDesiredVersion(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger) + version, err = l.getDesiredVersion(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey, logger) if err != nil { logger.Error("LAUNCHER: error on checking desired version", zap.Error(err)) return } - should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) + should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) || l.detectRestartCommand + + if l.detectRestartCommand { + l.detectRestartCommand = false + } + return } diff --git a/pkg/app/piped/apistore/commandstore/store.go b/pkg/app/piped/apistore/commandstore/store.go index a735919d7d..26ac4b642c 100644 --- a/pkg/app/piped/apistore/commandstore/store.go +++ b/pkg/app/piped/apistore/commandstore/store.go @@ -36,13 +36,14 @@ type Store interface { Lister() Lister } -// Lister helps list comands. +// Lister helps list commands. // All objects returned here must be treated as read-only. type Lister interface { ListApplicationCommands() []model.ReportableCommand ListDeploymentCommands() []model.ReportableCommand ListStageCommands(deploymentID, stageID string) []model.ReportableCommand ListBuildPlanPreviewCommands() []model.ReportableCommand + ListPipedCommands() []model.ReportableCommand } type store struct { @@ -54,6 +55,7 @@ type store struct { deploymentCommands []model.ReportableCommand stageCommands []model.ReportableCommand planPreviewCommands []model.ReportableCommand + pipedCommands []model.ReportableCommand handledCommands map[string]time.Time mu sync.RWMutex gracePeriod time.Duration @@ -119,6 +121,7 @@ func (s *store) sync(ctx context.Context) error { deploymentCommands = make([]model.ReportableCommand, 0) stageCommands = make([]model.ReportableCommand, 0) planPreviewCommands = make([]model.ReportableCommand, 0) + pipedCommands = make([]model.ReportableCommand, 0) ) for _, cmd := range resp.Commands { switch cmd.Type { @@ -130,6 +133,8 @@ func (s *store) sync(ctx context.Context) error { stageCommands = append(stageCommands, s.makeReportableCommand(cmd)) case model.Command_BUILD_PLAN_PREVIEW: planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd)) + case model.Command_RESTART_PIPED: + pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd)) } } @@ -138,6 +143,7 @@ func (s *store) sync(ctx context.Context) error { s.deploymentCommands = deploymentCommands s.stageCommands = stageCommands s.planPreviewCommands = planPreviewCommands + s.pipedCommands = pipedCommands s.mu.Unlock() return nil @@ -219,6 +225,20 @@ func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand { return commands } +func (s *store) ListPipedCommands() []model.ReportableCommand { + s.mu.RLock() + defer s.mu.RUnlock() + + commands := make([]model.ReportableCommand, 0, len(s.pipedCommands)) + for _, cmd := range s.pipedCommands { + if _, ok := s.handledCommands[cmd.Id]; ok { + continue + } + commands = append(commands, cmd) + } + return commands +} + func (s *store) makeReportableCommand(c *model.Command) model.ReportableCommand { return model.ReportableCommand{ Command: c, diff --git a/pkg/model/command.go b/pkg/model/command.go index 3a9270e739..a014cc7244 100644 --- a/pkg/model/command.go +++ b/pkg/model/command.go @@ -37,6 +37,10 @@ func (c *Command) IsChainSyncApplicationCmd() bool { return c.GetChainSyncApplication() != nil } +func (c *Command) IsRestartPipedCmd() bool { + return c.GetRestartPiped() != nil +} + func (c *Command) SetUpdatedAt(t int64) { c.UpdatedAt = t } diff --git a/web/src/api/piped.ts b/web/src/api/piped.ts index e8d7c90e55..3a5181d25d 100644 --- a/web/src/api/piped.ts +++ b/web/src/api/piped.ts @@ -20,6 +20,8 @@ import { UpdatePipedDesiredVersionResponse, ListReleasedVersionsResponse, ListReleasedVersionsRequest, + RestartPipedRequest, + RestartPipedResponse, } from "pipecd/web/api_client/service_pb"; export const getPipeds = ({ @@ -47,6 +49,14 @@ export const registerPiped = ({ return apiRequest(req, apiClient.registerPiped); }; +export const restartPiped = ({ + pipedId, +}: RestartPipedRequest.AsObject): Promise => { + const req = new RestartPipedRequest(); + req.setPipedId(pipedId); + return apiRequest(req, apiClient.restartPiped); +}; + export const disablePiped = ({ pipedId, }: DisablePipedRequest.AsObject): Promise => { diff --git a/web/src/components/settings-page/piped/components/piped-table-row.tsx b/web/src/components/settings-page/piped/components/piped-table-row.tsx index cb82a61940..19cdc83899 100644 --- a/web/src/components/settings-page/piped/components/piped-table-row.tsx +++ b/web/src/components/settings-page/piped/components/piped-table-row.tsx @@ -33,6 +33,7 @@ import { UI_TEXT_DISABLE, UI_TEXT_EDIT, UI_TEXT_ENABLE, + UI_TEXT_RESTART, } from "~/constants/ui-text"; import { useAppDispatch, useAppSelector } from "~/hooks/redux"; import { @@ -79,6 +80,7 @@ const useStyles = makeStyles((theme) => ({ interface Props { pipedId: string; onEdit: (id: string) => void; + onRestart: (id: string) => void; onDisable: (id: string) => void; onEnable: (id: string) => void; } @@ -93,6 +95,7 @@ const menuStyle = { export const PipedTableRow: FC = memo(function PipedTableRow({ pipedId, + onRestart, onEnable, onDisable, onEdit, @@ -126,6 +129,11 @@ export const PipedTableRow: FC = memo(function PipedTableRow({ onEdit(pipedId); }, [pipedId, onEdit]); + const handleRestart = useCallback(() => { + setAnchorEl(null); + onRestart(pipedId); + }, [pipedId, onRestart]); + const handleAddNewKey = useCallback(() => { setAnchorEl(null); if (hasOldKey) { @@ -252,6 +260,9 @@ export const PipedTableRow: FC = memo(function PipedTableRow({ > {UI_TEXT_DELETE_OLD_KEY} , + + {UI_TEXT_RESTART} + , state.pipeds.registeredPiped ); + const handleRestart = useCallback( + (id: string) => { + dispatch(restartPiped({ pipedId: id })).then(() => { + dispatch(fetchPipeds(true)); + dispatch( + addToast({ message: RESTART_PIPED_SUCCESS, severity: "success" }) + ); + }); + }, + [dispatch] + ); + const handleDisable = useCallback( (id: string) => { dispatch(disablePiped({ pipedId: id })).then(() => { @@ -206,6 +221,7 @@ export const SettingsPipedPage: FC = memo(function SettingsPipedPage() { key={piped.id} pipedId={piped.id} onEdit={handleEdit} + onRestart={handleRestart} onDisable={handleDisable} onEnable={handleEnable} /> diff --git a/web/src/constants/toast-text.ts b/web/src/constants/toast-text.ts index 7ecd614a64..b968e21d0e 100644 --- a/web/src/constants/toast-text.ts +++ b/web/src/constants/toast-text.ts @@ -8,6 +8,7 @@ export const GENERATE_API_KEY_SUCCESS = "Successfully generated API Key."; export const DISABLE_API_KEY_SUCCESS = "Successfully disabled API Key."; // Piped +export const RESTART_PIPED_SUCCESS = "Successfully requested to restart Piped."; export const ADD_PIPED_SUCCESS = "Successfully added Piped."; export const UPDATE_PIPED_SUCCESS = "Successfully updated Piped."; export const DELETE_OLD_PIPED_KEY_SUCCESS = diff --git a/web/src/constants/ui-text.ts b/web/src/constants/ui-text.ts index 6526a56510..b4271f2e8e 100644 --- a/web/src/constants/ui-text.ts +++ b/web/src/constants/ui-text.ts @@ -18,4 +18,5 @@ export const UI_TEXT_UPGRADE = "UPGRADE"; // piped export const UI_TEXT_ADD_NEW_KEY = "Add new Key"; export const UI_TEXT_DELETE_OLD_KEY = "Delete old Key"; +export const UI_TEXT_RESTART = "Restart"; export const UI_TEXT_VIEW_THE_CONFIGURATION = "View the configuration"; diff --git a/web/src/modules/pipeds/index.ts b/web/src/modules/pipeds/index.ts index 3a7487351a..beafc02acc 100644 --- a/web/src/modules/pipeds/index.ts +++ b/web/src/modules/pipeds/index.ts @@ -57,6 +57,13 @@ export const addPiped = createAsyncThunk< return { ...res, isNewKey: false }; }); +export const restartPiped = createAsyncThunk( + `${MODULE_NAME}/restart`, + async ({ pipedId }) => { + await pipedsApi.restartPiped({ pipedId }); + } +); + export const disablePiped = createAsyncThunk( `${MODULE_NAME}/disable`, async ({ pipedId }) => {