Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions pkg/app/launcher/cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
)

execute := func() error {
version, config, relaunch, err := l.shouldRelaunch(ctx, input.Logger)
version, config, relaunchOption, err := l.shouldRelaunch(ctx, input.Logger)
if err != nil {
input.Logger.Error("LAUNCHER: failed while checking desired version and config",
zap.String("version", version),
Expand All @@ -236,14 +236,21 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return err
}

if !relaunch {
if !relaunchOption.ShouldRelaunch() {
if runningPiped != nil && runningPiped.IsRunning() {
input.Logger.Info("LAUNCHER: everything up-to-date", zap.String("version", l.runningVersion))
return nil
}
input.Logger.Warn("LAUNCHER: it seems the launched Piped has stopped unexpectedly")
}
input.Logger.Info("LAUNCHER: will relaunch a new Piped because some changes in version/config were detected")

if relaunchOption.withNewCfg {
input.Logger.Info("LAUNCHER: will relaunch a new Piped because some changes in version/config were detected")
} else if relaunchOption.withCurrentCfg {
// The restart request can be ignored when a new config is detected,
// because Piped will be relaunch anyway.
input.Logger.Info("LAUNCHER: will relaunch a new Piped because a restart request was sent")
}

// Stop old piped process and clean its data.
if err := l.cleanOldPiped(runningPiped, workingDir, input.Logger); err != nil {
Expand Down Expand Up @@ -302,10 +309,19 @@ func (l *launcher) run(ctx context.Context, input cli.Input) error {
return nil
}

type RelaunchOption struct {
withCurrentCfg bool
withNewCfg bool
}

func (o *RelaunchOption) ShouldRelaunch() bool {
return o.withCurrentCfg || o.withNewCfg
}

// shouldRelaunch fetches the latest state of desired version and config
// to determine whether a new Piped should be launched or not.
// This also returns the desired version and config.
func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, should bool, err error) {
func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (version string, config []byte, relaunchOption RelaunchOption, err error) {
config, err = l.loadConfigData(ctx)
if err != nil {
logger.Error("LAUNCHER: error on loading Piped configuration data", zap.Error(err))
Expand All @@ -324,13 +340,20 @@ func (l *launcher) shouldRelaunch(ctx context.Context, logger *zap.Logger) (vers
return
}

shouldRelaunchWithCurrentCfg, err := l.getNeedRestart(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger)
if err != nil {
logger.Error("LAUNCHER: error on checking restart flag", zap.Error(err))
return
}
relaunchOption.withCurrentCfg = shouldRelaunchWithCurrentCfg

version, err = l.getDesiredVersion(ctx, cfg.APIAddress, cfg.ProjectID, cfg.PipedID, pipedKey, logger)
if err != nil {
logger.Error("LAUNCHER: error on checking desired version", zap.Error(err))
return
}

should = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData)
relaunchOption.withNewCfg = version != l.runningVersion || !bytes.Equal(config, l.runningConfigData)
return
}

Expand Down Expand Up @@ -438,7 +461,7 @@ func (l *launcher) loadConfigData(ctx context.Context) ([]byte, error) {
}, ", "))
}

func (l *launcher) getDesiredVersion(ctx context.Context, address, projectID, pipedID string, pipedKey []byte, logger *zap.Logger) (string, error) {
func (l *launcher) initClient(ctx context.Context, address, projectID, pipedID string, pipedKey []byte, logger *zap.Logger) error {
clientKey := fmt.Sprintf("%s,%s,%s,%s", address, projectID, pipedID, string(pipedKey))

// In order to reduce the time of initializing gRPC client
Expand All @@ -447,12 +470,35 @@ func (l *launcher) getDesiredVersion(ctx context.Context, address, projectID, pi
client, err := l.createAPIClient(ctx, address, projectID, pipedID, pipedKey)
if err != nil {
logger.Error("LAUNCHER: failed to create api client", zap.Error(err))
return "", err
return err
}
l.clientKey = clientKey
l.client = client
}

return nil
}

func (l *launcher) getNeedRestart(ctx context.Context, address, projectID, pipedID string, pipedKey []byte, logger *zap.Logger) (bool, error) {
err := l.initClient(ctx, address, projectID, pipedID, pipedKey, logger)
if err != nil {
return false, err
}

resp, err := l.client.GetNeedRestart(ctx, &pipedservice.GetNeedRestartRequest{})
if err != nil {
return false, err
}

return resp.NeedRestart, nil
}

func (l *launcher) getDesiredVersion(ctx context.Context, address, projectID, pipedID string, pipedKey []byte, logger *zap.Logger) (string, error) {
err := l.initClient(ctx, address, projectID, pipedID, pipedKey, logger)
if err != nil {
return "", err
}

resp, err := l.client.GetDesiredVersion(ctx, &pipedservice.GetDesiredVersionRequest{})
if err != nil {
return "", err
Expand Down
19 changes: 18 additions & 1 deletion pkg/app/server/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type pipedApiDeploymentChainStore interface {

type pipedApiPipedStore interface {
Get(ctx context.Context, id string) (*model.Piped, error)
UpdateMetadata(ctx context.Context, id, version, config string, cps []*model.Piped_CloudProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
UpdateMetadata(ctx context.Context, id, version, config string, cps []*model.Piped_CloudProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, needRestart bool, startedAt int64) error
}

type pipedApiEventStore interface {
Expand Down Expand Up @@ -179,6 +179,7 @@ func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.Report
req.CloudProviders,
req.Repositories,
req.SecretEncryption,
false,
now,
); err != nil {
return nil, gRPCEntityOperationError(err, fmt.Sprintf("update metadata of piped %s", pipedID))
Expand Down Expand Up @@ -864,6 +865,22 @@ func (a *PipedAPI) PutLatestAnalysisResult(ctx context.Context, req *pipedservic
return &pipedservice.PutLatestAnalysisResultResponse{}, nil
}

func (a *PipedAPI) GetNeedRestart(ctx context.Context, _ *pipedservice.GetNeedRestartRequest) (*pipedservice.GetNeedRestartResponse, error) {
_, pipedID, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
return nil, err
}

piped, err := getPiped(ctx, a.pipedStore, pipedID, a.logger)
if err != nil {
return nil, err
}

return &pipedservice.GetNeedRestartResponse{
NeedRestart: piped.NeedRestart,
}, nil
}

func (a *PipedAPI) GetDesiredVersion(ctx context.Context, _ *pipedservice.GetDesiredVersionRequest) (*pipedservice.GetDesiredVersionResponse, error) {
_, pipedID, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/app/server/grpcapi/web_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type webApiPipedStore interface {
AddKey(ctx context.Context, id, keyHash, creator string, createdAt time.Time) error
DeleteOldKeys(ctx context.Context, id string) error
UpdateInfo(ctx context.Context, id, name, desc string) error
RestartPiped(ctx context.Context, id string) error
EnablePiped(ctx context.Context, id string) error
DisablePiped(ctx context.Context, id string) error
UpdateDesiredVersion(ctx context.Context, id, version string) error
Expand Down Expand Up @@ -258,6 +259,13 @@ func (a *WebAPI) DeleteOldPipedKeys(ctx context.Context, req *webservice.DeleteO
return &webservice.DeleteOldPipedKeysResponse{}, nil
}

func (a *WebAPI) RestartPiped(ctx context.Context, req *webservice.RestartPipedRequest) (*webservice.RestartPipedResponse, error) {
if err := a.updatePiped(ctx, req.PipedId, a.pipedStore.RestartPiped); err != nil {
return nil, err
}
return &webservice.RestartPipedResponse{}, nil
}

func (a *WebAPI) EnablePiped(ctx context.Context, req *webservice.EnablePipedRequest) (*webservice.EnablePipedResponse, error) {
if err := a.updatePiped(ctx, req.PipedId, a.pipedStore.EnablePiped); err != nil {
return nil, err
Expand Down
Loading