Skip to content
Closed
152 changes: 140 additions & 12 deletions pkg/app/launcher/cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
"sigs.k8s.io/yaml"

"github.com/pipe-cd/pipecd/pkg/admin"
"github.com/pipe-cd/pipecd/pkg/app/piped/apistore/commandstore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cli"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcauth"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcclient"
"github.com/pipe-cd/pipecd/pkg/version"
Expand Down Expand Up @@ -80,12 +82,22 @@ type launcher struct {
configRepo git.Repo
clientKey string
client pipedservice.Client

commandCh chan model.ReportableCommand
prevCommands map[string]struct{}
commandLister commandLister
detectRestartCommand bool
}

type commandLister interface {
ListPipedCommands() []model.ReportableCommand
}

func NewCommand() *cobra.Command {
l := &launcher{
checkInterval: time.Minute,
gracePeriod: 30 * time.Second,
commandCh: make(chan model.ReportableCommand),
}
cmd := &cobra.Command{
Use: "launcher",
Expand Down Expand Up @@ -220,6 +232,32 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
l.configRepo = repo
}

spec, err := l.getSpec(ctx)
if err != nil {
input.Logger.Error(err.Error(), zap.Error(err))
}

pipedKey, err := spec.LoadPipedKey()
if err != nil {
input.Logger.Error("failed to load piped key", zap.Error(err))
return err
}

// Make gRPC client and connect to the API.
apiClient, err := l.createAPIClient(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey)
if err != nil {
input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err))
return err
}

{
store := commandstore.NewStore(apiClient, time.Minute, input.Logger)
group.Go(func() error {
return store.Run(ctx)
})
l.commandLister = store.Lister()
}

var (
runningPiped *command
workingDir = filepath.Join(l.homeDir, "piped")
Expand Down Expand Up @@ -267,6 +305,21 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return nil
}

commandHandler := func(ctx context.Context, cmdCh <-chan model.ReportableCommand) {
input.Logger.Info("started a worker for handling restart piped command")
for {
select {
case cmd := <-cmdCh:
l.handleCommand(ctx, input, cmd)

case <-ctx.Done():
input.Logger.Info("a worker has been stopped")
return
}
}
}
go commandHandler(ctx, l.commandCh)

group.Go(func() error {
// Execute the first time immediately.
if err := execute(); err != nil {
Expand All @@ -279,6 +332,7 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
select {
case <-ticker.C:
// Don't return an error to continue piped execution.
l.enqueueNewCommands(ctx, input)
execute()

case <-ctx.Done():
Expand All @@ -302,35 +356,109 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return nil
}

func (l *launcher) enqueueNewCommands(ctx context.Context, input cli.Input) {
input.Logger.Info("fetching unhandled commands to enqueue")

commands := l.commandLister.ListPipedCommands()
if len(commands) == 0 {
input.Logger.Info("there is no command to enqueue")
return
}

news := make([]model.ReportableCommand, 0, len(commands))
cmds := make(map[string]struct{}, len(commands))
for _, cmd := range commands {
cmds[cmd.Id] = struct{}{}
if _, ok := l.prevCommands[cmd.Id]; !ok {
news = append(news, cmd)
}
}

input.Logger.Info("fetched unhandled commands to enqueue",
zap.Any("pre-commands", l.prevCommands),
zap.Any("commands", cmds),
zap.Int("news", len(news)),
)

if len(news) == 0 {
input.Logger.Info("there is no new command to enqueue")
return
}

l.prevCommands = cmds
input.Logger.Info(fmt.Sprintf("will enqueue %d new commands", len(news)))

for _, cmd := range news {
select {
case l.commandCh <- cmd:
input.Logger.Info("queued a new new command", zap.String("command", cmd.Id))

case <-ctx.Done():
return
}
}
}

func (l *launcher) handleCommand(ctx context.Context, input cli.Input, cmd model.ReportableCommand) {
logger := input.Logger.With(
zap.String("command", cmd.Id),
)
logger.Info("received a restart piped command to handle")

l.detectRestartCommand = true

if err := cmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, []byte(cmd.Id)); err != nil {
logger.Error("failed to report command status", zap.Error(err))
return
}

input.Logger.Info("successfully handled a restart piped command")
}

// getSpec returns launcher's spec.
func (l *launcher) getSpec(ctx context.Context) (*config.LauncherSpec, error) {
config, err := l.loadConfigData(ctx)
if err != nil {
return nil, fmt.Errorf("LAUNCHER: error on loading Piped configuration data")
}

spec, err := parseConfig(config)
if err != nil {
return nil, fmt.Errorf("LAUNCHER: error on parsing Piped configuration data")
}

return spec, nil
}

// shouldRelaunch fetches the latest state of desired version and config
// to determine whether a new Piped should be launched or not.
// This also takes into account whether or not a command
// has been received to make the restart decision.
// This also returns the desired version and config.
func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, should bool, err error) {
config, err = l.loadConfigData(ctx)
spec, err := l.getSpec(ctx)
if err != nil {
logger.Error("LAUNCHER: error on loading Piped configuration data", zap.Error(err))
return
}

cfg, err := parseConfig(config)
if err != nil {
logger.Error("LAUNCHER: error on parsing Piped configuration data", zap.Error(err))
return
logger.Error(err.Error(), zap.Error(err))
}

pipedKey, err := cfg.LoadPipedKey()
pipedKey, err := spec.LoadPipedKey()
if err != nil {
logger.Error("LAUNCHER: error on loading Piped key", zap.Error(err))
return
}

version, err = l.getDesiredVersion(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger)
version, err = l.getDesiredVersion(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey, logger)
if err != nil {
logger.Error("LAUNCHER: error on checking desired version", zap.Error(err))
return
}

should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData)
should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) || l.detectRestartCommand

if l.detectRestartCommand {
l.detectRestartCommand = false
}

return
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/app/piped/apistore/commandstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ type Store interface {
Lister() Lister
}

// Lister helps list comands.
// Lister helps list commands.
// All objects returned here must be treated as read-only.
type Lister interface {
ListApplicationCommands() []model.ReportableCommand
ListDeploymentCommands() []model.ReportableCommand
ListStageCommands(deploymentID, stageID string) []model.ReportableCommand
ListBuildPlanPreviewCommands() []model.ReportableCommand
ListPipedCommands() []model.ReportableCommand
}

type store struct {
Expand All @@ -54,6 +55,7 @@ type store struct {
deploymentCommands []model.ReportableCommand
stageCommands []model.ReportableCommand
planPreviewCommands []model.ReportableCommand
pipedCommands []model.ReportableCommand
handledCommands map[string]time.Time
mu sync.RWMutex
gracePeriod time.Duration
Expand Down Expand Up @@ -119,6 +121,7 @@ func (s *store) sync(ctx context.Context) error {
deploymentCommands = make([]model.ReportableCommand, 0)
stageCommands = make([]model.ReportableCommand, 0)
planPreviewCommands = make([]model.ReportableCommand, 0)
pipedCommands = make([]model.ReportableCommand, 0)
)
for _, cmd := range resp.Commands {
switch cmd.Type {
Expand All @@ -130,6 +133,8 @@ func (s *store) sync(ctx context.Context) error {
stageCommands = append(stageCommands, s.makeReportableCommand(cmd))
case model.Command_BUILD_PLAN_PREVIEW:
planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd))
case model.Command_RESTART_PIPED:
pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd))
}
}

Expand All @@ -138,6 +143,7 @@ func (s *store) sync(ctx context.Context) error {
s.deploymentCommands = deploymentCommands
s.stageCommands = stageCommands
s.planPreviewCommands = planPreviewCommands
s.pipedCommands = pipedCommands
s.mu.Unlock()

return nil
Expand Down Expand Up @@ -219,6 +225,20 @@ func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand {
return commands
}

func (s *store) ListPipedCommands() []model.ReportableCommand {
s.mu.RLock()
defer s.mu.RUnlock()

commands := make([]model.ReportableCommand, 0, len(s.pipedCommands))
for _, cmd := range s.pipedCommands {
if _, ok := s.handledCommands[cmd.Id]; ok {
continue
}
commands = append(commands, cmd)
}
return commands
}

func (s *store) makeReportableCommand(c *model.Command) model.ReportableCommand {
return model.ReportableCommand{
Command: c,
Expand Down
4 changes: 4 additions & 0 deletions pkg/model/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (c *Command) IsChainSyncApplicationCmd() bool {
return c.GetChainSyncApplication() != nil
}

func (c *Command) IsRestartPipedCmd() bool {
return c.GetRestartPiped() != nil
}

func (c *Command) SetUpdatedAt(t int64) {
c.UpdatedAt = t
}
10 changes: 10 additions & 0 deletions web/src/api/piped.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
UpdatePipedDesiredVersionResponse,
ListReleasedVersionsResponse,
ListReleasedVersionsRequest,
RestartPipedRequest,
RestartPipedResponse,
} from "pipecd/web/api_client/service_pb";

export const getPipeds = ({
Expand Down Expand Up @@ -47,6 +49,14 @@ export const registerPiped = ({
return apiRequest(req, apiClient.registerPiped);
};

export const restartPiped = ({
pipedId,
}: RestartPipedRequest.AsObject): Promise<RestartPipedResponse.AsObject> => {
const req = new RestartPipedRequest();
req.setPipedId(pipedId);
return apiRequest(req, apiClient.restartPiped);
};

export const disablePiped = ({
pipedId,
}: DisablePipedRequest.AsObject): Promise<DisablePipedResponse.AsObject> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
UI_TEXT_DISABLE,
UI_TEXT_EDIT,
UI_TEXT_ENABLE,
UI_TEXT_RESTART,
} from "~/constants/ui-text";
import { useAppDispatch, useAppSelector } from "~/hooks/redux";
import {
Expand Down Expand Up @@ -79,6 +80,7 @@ const useStyles = makeStyles((theme) => ({
interface Props {
pipedId: string;
onEdit: (id: string) => void;
onRestart: (id: string) => void;
onDisable: (id: string) => void;
onEnable: (id: string) => void;
}
Expand All @@ -93,6 +95,7 @@ const menuStyle = {

export const PipedTableRow: FC<Props> = memo(function PipedTableRow({
pipedId,
onRestart,
onEnable,
onDisable,
onEdit,
Expand Down Expand Up @@ -126,6 +129,11 @@ export const PipedTableRow: FC<Props> = memo(function PipedTableRow({
onEdit(pipedId);
}, [pipedId, onEdit]);

const handleRestart = useCallback(() => {
setAnchorEl(null);
onRestart(pipedId);
}, [pipedId, onRestart]);

const handleAddNewKey = useCallback(() => {
setAnchorEl(null);
if (hasOldKey) {
Expand Down Expand Up @@ -252,6 +260,9 @@ export const PipedTableRow: FC<Props> = memo(function PipedTableRow({
>
{UI_TEXT_DELETE_OLD_KEY}
</MenuItem>,
<MenuItem key="piped-menu-restart" onClick={handleRestart}>
{UI_TEXT_RESTART}
</MenuItem>,
<MenuItem
key="piped-menu-open-piped-config"
onClick={handleOpenPipedConfig}
Expand Down
Loading