Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 151 additions & 12 deletions pkg/app/launcher/cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
"sigs.k8s.io/yaml"

"github.com/pipe-cd/pipecd/pkg/admin"
"github.com/pipe-cd/pipecd/pkg/app/piped/apistore/commandstore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cli"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcauth"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcclient"
"github.com/pipe-cd/pipecd/pkg/version"
Expand Down Expand Up @@ -80,12 +82,22 @@ type launcher struct {
configRepo git.Repo
clientKey string
client pipedservice.Client

commandCh chan model.ReportableCommand
prevCommands map[string]struct{}
commandLister commandLister
detectRestartCommand bool
}

type commandLister interface {
ListPipedCommands() []model.ReportableCommand
}

func NewCommand() *cobra.Command {
l := &launcher{
checkInterval: time.Minute,
gracePeriod: 30 * time.Second,
commandCh: make(chan model.ReportableCommand),
}
cmd := &cobra.Command{
Use: "launcher",
Expand Down Expand Up @@ -220,6 +232,32 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
l.configRepo = repo
}

spec, err := l.getSpec(ctx)
if err != nil {
input.Logger.Error(err.Error(), zap.Error(err))
}

pipedKey, err := spec.LoadPipedKey()
if err != nil {
input.Logger.Error("failed to load piped key", zap.Error(err))
return err
}

// Make gRPC client and connect to the API.
apiClient, err := l.createAPIClient(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey)
if err != nil {
input.Logger.Error("failed to create gRPC client to control plane", zap.Error(err))
return err
}

{
store := commandstore.NewStore(apiClient, time.Minute, input.Logger)
group.Go(func() error {
return store.Run(ctx)
})
l.commandLister = store.Lister()
}

var (
runningPiped *command
workingDir = filepath.Join(l.homeDir, "piped")
Expand Down Expand Up @@ -267,6 +305,31 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return nil
}

commandHandler := func(ctx context.Context, cmdCh <-chan model.ReportableCommand) error {
input.Logger.Info("started a worker for handling restart piped command")
for {
select {
case cmd := <-cmdCh:
if err := l.handleCommand(ctx, input, cmd); err != nil {
return err
}

case <-ctx.Done():
input.Logger.Info("a worker has been stopped")
return nil
}
}
}
group.Go(func() error {
if err := commandHandler(ctx, l.commandCh); err != nil {
input.Logger.Info("LAUNCHER: failed to handle restart piped command",
zap.Error(err),
)
return err
}
return nil
})

group.Go(func() error {
// Execute the first time immediately.
if err := execute(); err != nil {
Expand All @@ -279,6 +342,7 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
select {
case <-ticker.C:
// Don't return an error to continue piped execution.
l.enqueueNewCommands(ctx, input)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it's a bit overdo here. Since the command restart piped is not that much and it should be handled immediately on given, making a queue of commands for that is unneeded 🤔 Should we just list commands and restart it instead of sending them to the commandhandler? wdyt @knanao @Szarny

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got your point and I feel it's better but it's up to you since there are already some implementations like this. 🙌

execute()

case <-ctx.Done():
Expand All @@ -302,35 +366,110 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return nil
}

func (l *launcher) enqueueNewCommands(ctx context.Context, input cli.Input) {
input.Logger.Info("fetching unhandled commands to enqueue")

commands := l.commandLister.ListPipedCommands()
if len(commands) == 0 {
input.Logger.Info("there is no command to enqueue")
return
}

news := make([]model.ReportableCommand, 0, len(commands))
cmds := make(map[string]struct{}, len(commands))
for _, cmd := range commands {
cmds[cmd.Id] = struct{}{}
if _, ok := l.prevCommands[cmd.Id]; !ok {
news = append(news, cmd)
}
}

input.Logger.Info("fetched unhandled commands to enqueue",
zap.Any("pre-commands", l.prevCommands),
zap.Any("commands", cmds),
zap.Int("news", len(news)),
)

if len(news) == 0 {
input.Logger.Info("there is no new command to enqueue")
return
}

l.prevCommands = cmds
input.Logger.Info(fmt.Sprintf("will enqueue %d new commands", len(news)))

for _, cmd := range news {
select {
case l.commandCh <- cmd:
input.Logger.Info("queued a new new command", zap.String("command", cmd.Id))

case <-ctx.Done():
return
}
}
}

func (l *launcher) handleCommand(ctx context.Context, input cli.Input, cmd model.ReportableCommand) error {
logger := input.Logger.With(
zap.String("command", cmd.Id),
)
logger.Info("received a restart piped command to handle")

l.detectRestartCommand = true
Copy link
Member

@khanhtc1202 khanhtc1202 Jun 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just feel like this one is not an effective way to implement this feature, we need one ticker (to fetch the commands) and one other ticker(util launcher self checkInterval) to restart the piped. Should we rewrite some part where we restart piped by launcher to reuse that?
ref: current rerun logic https://github.com/pipe-cd/pipecd/blob/master/pkg/app/launcher/cmd/launcher/launcher.go#L248-L262

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can keep the use of this l.detectRestartCommand flag but instead of forking a new go routine to just change this flag value, we can make this in the current enqueueNewCommands, and in execute function, we have shouldRelaunch() function which check for the flag and restart the piped for us 🤔


if err := cmd.Report(ctx, model.CommandStatus_COMMAND_SUCCEEDED, nil, []byte(cmd.Id)); err != nil {
logger.Error("failed to report command status", zap.Error(err))
return err
}

input.Logger.Info("successfully handled a restart piped command")
return nil
}

// getSpec returns launcher's spec.
func (l *launcher) getSpec(ctx context.Context) (*config.LauncherSpec, error) {
config, err := l.loadConfigData(ctx)
if err != nil {
return nil, fmt.Errorf("LAUNCHER: error on loading Piped configuration data")
}

spec, err := parseConfig(config)
if err != nil {
return nil, fmt.Errorf("LAUNCHER: error on parsing Piped configuration data")
}

return spec, nil
}

// shouldRelaunch fetches the latest state of desired version and config
// to determine whether a new Piped should be launched or not.
// This also takes into account whether or not a command
// has been received to make the restart decision.
// This also returns the desired version and config.
func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, should bool, err error) {
config, err = l.loadConfigData(ctx)
spec, err := l.getSpec(ctx)
if err != nil {
logger.Error("LAUNCHER: error on loading Piped configuration data", zap.Error(err))
return
logger.Error(err.Error(), zap.Error(err))
}

cfg, err := parseConfig(config)
if err != nil {
logger.Error("LAUNCHER: error on parsing Piped configuration data", zap.Error(err))
return
}

pipedKey, err := cfg.LoadPipedKey()
pipedKey, err := spec.LoadPipedKey()
if err != nil {
logger.Error("LAUNCHER: error on loading Piped key", zap.Error(err))
return
}

version, err = l.getDesiredVersion(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger)
version, err = l.getDesiredVersion(ctx, spec.APIAddress, spec.ProjectID, spec.PipedID, pipedKey, logger)
if err != nil {
logger.Error("LAUNCHER: error on checking desired version", zap.Error(err))
return
}

should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData)
should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData) || l.detectRestartCommand

if l.detectRestartCommand {
l.detectRestartCommand = false
}

return
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/app/piped/apistore/commandstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ type Store interface {
Lister() Lister
}

// Lister helps list comands.
// Lister helps list commands.
// All objects returned here must be treated as read-only.
type Lister interface {
ListApplicationCommands() []model.ReportableCommand
ListDeploymentCommands() []model.ReportableCommand
ListStageCommands(deploymentID, stageID string) []model.ReportableCommand
ListBuildPlanPreviewCommands() []model.ReportableCommand
ListPipedCommands() []model.ReportableCommand
}

type store struct {
Expand All @@ -54,6 +55,7 @@ type store struct {
deploymentCommands []model.ReportableCommand
stageCommands []model.ReportableCommand
planPreviewCommands []model.ReportableCommand
pipedCommands []model.ReportableCommand
handledCommands map[string]time.Time
mu sync.RWMutex
gracePeriod time.Duration
Expand Down Expand Up @@ -119,6 +121,7 @@ func (s *store) sync(ctx context.Context) error {
deploymentCommands = make([]model.ReportableCommand, 0)
stageCommands = make([]model.ReportableCommand, 0)
planPreviewCommands = make([]model.ReportableCommand, 0)
pipedCommands = make([]model.ReportableCommand, 0)
)
for _, cmd := range resp.Commands {
switch cmd.Type {
Expand All @@ -130,6 +133,8 @@ func (s *store) sync(ctx context.Context) error {
stageCommands = append(stageCommands, s.makeReportableCommand(cmd))
case model.Command_BUILD_PLAN_PREVIEW:
planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd))
case model.Command_RESTART_PIPED:
pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd))
}
}

Expand All @@ -138,6 +143,7 @@ func (s *store) sync(ctx context.Context) error {
s.deploymentCommands = deploymentCommands
s.stageCommands = stageCommands
s.planPreviewCommands = planPreviewCommands
s.pipedCommands = pipedCommands
s.mu.Unlock()

return nil
Expand Down Expand Up @@ -219,6 +225,20 @@ func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand {
return commands
}

func (s *store) ListPipedCommands() []model.ReportableCommand {
s.mu.RLock()
defer s.mu.RUnlock()

commands := make([]model.ReportableCommand, 0, len(s.pipedCommands))
for _, cmd := range s.pipedCommands {
if _, ok := s.handledCommands[cmd.Id]; ok {
continue
}
commands = append(commands, cmd)
}
return commands
}

func (s *store) makeReportableCommand(c *model.Command) model.ReportableCommand {
return model.ReportableCommand{
Command: c,
Expand Down
4 changes: 4 additions & 0 deletions pkg/model/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (c *Command) IsChainSyncApplicationCmd() bool {
return c.GetChainSyncApplication() != nil
}

func (c *Command) IsRestartPipedCmd() bool {
return c.GetRestartPiped() != nil
}

func (c *Command) SetUpdatedAt(t int64) {
c.UpdatedAt = t
}