diff --git a/pkg/app/piped/eventwatcher/eventwatcher.go b/pkg/app/piped/eventwatcher/eventwatcher.go index 7627f92d40..40f79e85ed 100644 --- a/pkg/app/piped/eventwatcher/eventwatcher.go +++ b/pkg/app/piped/eventwatcher/eventwatcher.go @@ -68,6 +68,7 @@ type gitClient interface { type apiClient interface { GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest, opts ...grpc.CallOption) (*pipedservice.GetLatestEventResponse, error) ReportEventStatuses(ctx context.Context, req *pipedservice.ReportEventStatusesRequest, opts ...grpc.CallOption) (*pipedservice.ReportEventStatusesResponse, error) + ListApplications(ctx context.Context, in *pipedservice.ListApplicationsRequest, opts ...grpc.CallOption) (*pipedservice.ListApplicationsResponse, error) } type watcher struct { @@ -83,6 +84,13 @@ type watcher struct { // Maximum timestamp of the last Event read. // A map from repo-id to the UNIX timestamp that has been read. milestoneMap sync.Map + // Cache for the last scanned commit and event watcher configs for each application. + lastScannedConfig sync.Map +} + +type eventWatcherCache struct { + HeadCommit string + Configs []config.EventWatcherConfig } func NewWatcher(cfg *config.PipedSpec, eventLister eventLister, gitClient gitClient, apiClient apiClient, logger *zap.Logger) Watcher { @@ -107,20 +115,15 @@ func (w *watcher) Run(ctx context.Context) error { defer os.RemoveAll(workingDir) w.workingDir = workingDir - repoCfgs := w.config.GetRepositoryMap() - for _, r := range w.config.EventWatcher.GitRepos { - cfg, ok := repoCfgs[r.RepoID] - if !ok { - return fmt.Errorf("repo id %q doesn't exist in the Piped repositories list", r.RepoID) - } - repo, err := w.cloneRepo(ctx, cfg) + for _, r := range w.config.Repositories { + repo, err := w.cloneRepo(ctx, r) if err != nil { return err } defer repo.Clean() w.wg.Add(1) - go w.run(ctx, repo, cfg) + go w.run(ctx, repo, r) } w.wg.Wait() @@ -184,23 +187,84 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe } continue } - cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs) - if errors.Is(err, config.ErrNotFound) { - w.logger.Info("configuration file for Event Watcher not found", + headCommit, err := repo.GetLatestCommit(ctx) + if err != nil { + w.logger.Error("failed to get latest commit", zap.String("repo-id", repoCfg.RepoID), zap.Error(err), ) continue } - if err != nil { + // Check whether the config file exists in .pipe/ or not and updates values if it exists. + // NOTE: This was deprecated and will be deleted in the future. + cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs) + if !errors.Is(err, config.ErrNotFound) && err != nil { w.logger.Error("failed to load configuration file for Event Watcher", zap.String("repo-id", repoCfg.RepoID), zap.Error(err), ) continue } - if err := w.updateValues(ctx, repo, repoCfg.RepoID, cfg.Events, commitMsg); err != nil { - w.logger.Error("failed to update the values", + if errors.Is(err, config.ErrNotFound) { + w.logger.Info("there was no config file for Event Watcher in .pipe directory", + zap.String("repo-id", repoCfg.RepoID), + zap.Error(err), + ) + } else { + if err := w.updateValues(ctx, repo, repoCfg.RepoID, cfg.Events, commitMsg); err != nil { + w.logger.Error("failed to update the values", + zap.String("repo-id", repoCfg.RepoID), + zap.Error(err), + ) + } + } + // If event watcher config exist in the application config file, they are handled. + resp, err := w.apiClient.ListApplications(ctx, &pipedservice.ListApplicationsRequest{}) + if err != nil { + w.logger.Error("failed to list registered application", zap.Error(err)) + continue + } + cfgs := make([]config.EventWatcherConfig, 0, len(resp.Applications)) + for _, app := range resp.Applications { + if app.GitPath.Repo.Id != repoCfg.RepoID { + continue + } + // Return a last scanned application if there is no new commit pushed from last scanned time for this application. + if v, ok := w.lastScannedConfig.Load(app.Id); ok { + c := v.(eventWatcherCache) + if c.HeadCommit == headCommit.Hash { + cfgs = append(cfgs, c.Configs...) + continue + } + } + + appCfg, err := config.LoadApplication(repo.GetPath(), app.GitPath.GetApplicationConfigFilePath(), app.Kind) + if err != nil { + w.logger.Error("failed to load application configuration", zap.Error(err)) + continue + } + + // Save as a cache regardless of whether the event watcher configuration exists or not in an application configuration. + cache := &eventWatcherCache{ + HeadCommit: headCommit.Hash, + Configs: appCfg.EventWatcher, + } + w.lastScannedConfig.Store(app.Id, cache) + + if appCfg.EventWatcher == nil { + continue + } + + cfgs = append(cfgs, appCfg.EventWatcher...) + } + if len(cfgs) == 0 { + w.logger.Info("configuration for Event Watcher in application configuration not found", + zap.String("repo-id", repoCfg.RepoID), + ) + continue + } + if err := w.execute(ctx, repo, repoCfg.RepoID, cfgs); err != nil { + w.logger.Error("failed to execute the event from application configuration", zap.String("repo-id", repoCfg.RepoID), zap.Error(err), ) @@ -222,7 +286,159 @@ func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository) return repo, nil } +// execute inspects all Event-definition and handles the events per EventWatcherHandlerType if there are. +func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherConfig) error { + // Copy the repo to another directory to modify local file to avoid reverting previous changes. + tmpDir, err := os.MkdirTemp(w.workingDir, "repo") + if err != nil { + return fmt.Errorf("failed to create a new temporary directory: %w", err) + } + tmpRepo, err := repo.Copy(filepath.Join(tmpDir, "tmp-repo")) + if err != nil { + return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err) + } + // nolint: errcheck + defer tmpRepo.Clean() + + var milestone int64 + firstRead := true + if v, ok := w.milestoneMap.Load(repoID); ok { + milestone = v.(int64) + firstRead = false + } + var ( + handledEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0, len(eventCfgs)) + outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0) + maxTimestamp int64 + outDatedDuration = time.Hour + gitUpdateEvent = false + ) + for _, e := range eventCfgs { + var ( + matcher = e.Matcher + handler = e.Handler + ) + notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated) + if len(notHandledEvents) == 0 { + continue + } + if len(notHandledEvents) > 1 { + // Events other than the latest will be OUTDATED. + for _, e := range notHandledEvents[1:] { + outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: e.Id, + Status: model.EventStatus_EVENT_OUTDATED, + StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id), + }) + } + } + + latestEvent := notHandledEvents[0] + if firstRead { + resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{ + Name: matcher.Name, + Labels: matcher.Labels, + }) + if err != nil { + return fmt.Errorf("failed to get the latest event: %w", err) + } + // The case where the latest event has already been handled. + if resp.Event.CreatedAt > latestEvent.CreatedAt { + outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: notHandledEvents[0].Id, + Status: model.EventStatus_EVENT_OUTDATED, + StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id), + }) + continue + } + } + if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration { + outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: latestEvent.Id, + Status: model.EventStatus_EVENT_OUTDATED, + StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id), + }) + continue + } + + switch handler.Type { + case config.EventWatcherHandlerTypeGitUpdate: + if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, handler.Config.Replacements, tmpRepo); err != nil { + w.logger.Error("failed to commit outdated files", zap.Error(err)) + handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: latestEvent.Id, + Status: model.EventStatus_EVENT_FAILURE, + StatusDescription: fmt.Sprintf("Failed to change files: %v", err), + }) + continue + } + handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: latestEvent.Id, + Status: model.EventStatus_EVENT_SUCCESS, + StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID), + }) + if latestEvent.CreatedAt > maxTimestamp { + maxTimestamp = latestEvent.CreatedAt + } + gitUpdateEvent = true + default: + w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type), + zap.String("event-name", latestEvent.Name), + zap.String("event-id", latestEvent.Id), + ) + continue + } + } + if len(outDatedEvents) > 0 { + if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: outDatedEvents}); err != nil { + return fmt.Errorf("failed to report event statuses: %w", err) + } + w.logger.Info(fmt.Sprintf("successfully made %d events OUTDATED", len(outDatedEvents))) + } + if len(handledEvents) == 0 { + return nil + } + + if !gitUpdateEvent { + return nil + } + + retry := backoff.NewRetry(retryPushNum, backoff.NewConstant(retryPushInterval)) + _, err = retry.Do(ctx, func() (interface{}, error) { + err := tmpRepo.Push(ctx, tmpRepo.GetClonedBranch()) + return nil, err + }) + if err == nil { + if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil { + return fmt.Errorf("failed to report event statuses: %w", err) + } + w.milestoneMap.Store(repoID, maxTimestamp) + return nil + } + + // If push fails because the local branch was not fresh, exit to retry again in the next interval. + if err == git.ErrBranchNotFresh { + w.logger.Warn("failed to push commits", zap.Error(err)) + return nil + } + + // If push fails because of the other reason, re-set all statuses to FAILURE. + for i := range handledEvents { + if handledEvents[i].Status == model.EventStatus_EVENT_FAILURE { + continue + } + handledEvents[i].Status = model.EventStatus_EVENT_FAILURE + handledEvents[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err) + } + if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil { + return fmt.Errorf("failed to report event statuses: %w", err) + } + w.milestoneMap.Store(repoID, maxTimestamp) + return fmt.Errorf("failed to push commits: %w", err) +} + // updateValues inspects all Event-definition and pushes the changes to git repo if there is. +// NOTE: This will be removed. func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherEvent, commitMsg string) error { // Copy the repo to another directory to modify local file to avoid reverting previous changes. tmpDir, err := os.MkdirTemp(w.workingDir, "repo") @@ -290,7 +506,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string }) continue } - if err := w.commitFiles(ctx, latestEvent.Data, e, tmpRepo, commitMsg); err != nil { + if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, e.Replacements, tmpRepo); err != nil { w.logger.Error("failed to commit outdated files", zap.Error(err)) handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, @@ -353,10 +569,10 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string } // commitFiles commits changes if the data in Git is different from the latest event. -func (w *watcher) commitFiles(ctx context.Context, latestData string, eventCfg config.EventWatcherEvent, repo git.Repo, commitMsg string) error { +func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg string, replacements []config.EventWatcherReplacement, repo git.Repo) error { // Determine files to be changed by comparing with the latest event. - changes := make(map[string][]byte, len(eventCfg.Replacements)) - for _, r := range eventCfg.Replacements { + changes := make(map[string][]byte, len(replacements)) + for _, r := range replacements { var ( path = filepath.Join(repo.GetPath(), r.File) newContent []byte @@ -390,12 +606,12 @@ func (w *watcher) commitFiles(ctx context.Context, latestData string, eventCfg c } if commitMsg == "" { - commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestData, eventCfg.Name) + commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestData, eventName) } if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil { return fmt.Errorf("failed to perform git commit: %w", err) } - w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventCfg.Name)) + w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventName)) return nil } diff --git a/pkg/config/application.go b/pkg/config/application.go index 9ff91eb075..3e967e94e4 100644 --- a/pkg/config/application.go +++ b/pkg/config/application.go @@ -66,6 +66,8 @@ type GenericApplicationSpec struct { // The list of sealed secrets that should be decrypted. // Deprecated. SealedSecrets []SealedSecretMapping `json:"sealedSecrets"` + // List of the configuration for event watcher. + EventWatcher []EventWatcherConfig `json:"eventWatcher"` } type DeploymentPlanner struct { diff --git a/pkg/config/event_watcher.go b/pkg/config/event_watcher.go index d8250557f8..427ec49ac5 100644 --- a/pkg/config/event_watcher.go +++ b/pkg/config/event_watcher.go @@ -38,6 +38,36 @@ type EventWatcherEvent struct { Replacements []EventWatcherReplacement `json:"replacements"` } +type EventWatcherConfig struct { + // Matcher represents which event will be handled. + Matcher EventWatcherMatcher `json:"matcher"` + // Handler represents how the matched event will be handled. + Handler EventWatcherHandler `json:"handler"` +} + +type EventWatcherMatcher struct { + // The handled event name. + Name string `json:"name"` + // Additional attributes of event. This can make an event definition + // unique even if the one with the same name exists. + Labels map[string]string `json:"labels"` +} + +type EventWatcherHandler struct { + // The handler type of event watcher. + Type EventWatcherHandlerType `json:"type,omitempty"` + // The config for the event watcher handler. + Config EventWatcherHandlerConfig `json:"config"` +} + +type EventWatcherHandlerConfig struct { + // The commit message used to push after replacing values. + // Default message is used if not given. + CommitMessage string `json:"commitMessage,omitempty"` + // List of places where will be replaced when the new event matches. + Replacements []EventWatcherReplacement `json:"replacements"` +} + type EventWatcherReplacement struct { // The path to the file to be updated. File string `json:"file"` @@ -56,6 +86,14 @@ type EventWatcherReplacement struct { Regex string `json:"regex"` } +// EventWatcherHandlerType represents the type of an event watcher handler. +type EventWatcherHandlerType string + +const ( + // EventWatcherHandlerTypeGitUpdate represents the handler type for git updating. + EventWatcherHandlerTypeGitUpdate = "GIT_UPDATE" +) + // LoadEventWatcher gives back parsed EventWatcher config after merging config files placed under // the .pipe directory. With "includes" and "excludes", you can filter the files included the result. // "excludes" are prioritized if both "excludes" and "includes" are given. ErrNotFound is returned if not found.