diff --git a/pkg/app/piped/eventwatcher/eventwatcher.go b/pkg/app/piped/eventwatcher/eventwatcher.go index 40f79e85ed..279d17e689 100644 --- a/pkg/app/piped/eventwatcher/eventwatcher.go +++ b/pkg/app/piped/eventwatcher/eventwatcher.go @@ -90,9 +90,15 @@ type watcher struct { type eventWatcherCache struct { HeadCommit string + GitPath string Configs []config.EventWatcherConfig } +type eventWatcherConfig struct { + GitPath string + Configs []config.EventWatcherConfig +} + func NewWatcher(cfg *config.PipedSpec, eventLister eventLister, gitClient gitClient, apiClient apiClient, logger *zap.Logger) Watcher { return &watcher{ config: cfg, @@ -224,7 +230,7 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe w.logger.Error("failed to list registered application", zap.Error(err)) continue } - cfgs := make([]config.EventWatcherConfig, 0, len(resp.Applications)) + cfgs := make([]eventWatcherConfig, 0, len(resp.Applications)) for _, app := range resp.Applications { if app.GitPath.Repo.Id != repoCfg.RepoID { continue @@ -233,7 +239,11 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe if v, ok := w.lastScannedConfig.Load(app.Id); ok { c := v.(eventWatcherCache) if c.HeadCommit == headCommit.Hash { - cfgs = append(cfgs, c.Configs...) + ew := eventWatcherConfig{ + GitPath: c.GitPath, + Configs: c.Configs, + } + cfgs = append(cfgs, ew) continue } } @@ -247,6 +257,7 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe // Save as a cache regardless of whether the event watcher configuration exists or not in an application configuration. cache := &eventWatcherCache{ HeadCommit: headCommit.Hash, + GitPath: app.GitPath.Path, Configs: appCfg.EventWatcher, } w.lastScannedConfig.Store(app.Id, cache) @@ -255,7 +266,11 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe continue } - cfgs = append(cfgs, appCfg.EventWatcher...) + ew := eventWatcherConfig{ + GitPath: app.GitPath.Path, + Configs: appCfg.EventWatcher, + } + cfgs = append(cfgs, ew) } if len(cfgs) == 0 { w.logger.Info("configuration for Event Watcher in application configuration not found", @@ -287,7 +302,7 @@ func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository) } // execute inspects all Event-definition and handles the events per EventWatcherHandlerType if there are. -func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherConfig) error { +func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []eventWatcherConfig) error { // Copy the repo to another directory to modify local file to avoid reverting previous changes. tmpDir, err := os.MkdirTemp(w.workingDir, "repo") if err != nil { @@ -314,79 +329,81 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve gitUpdateEvent = false ) for _, e := range eventCfgs { - var ( - matcher = e.Matcher - handler = e.Handler - ) - notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated) - if len(notHandledEvents) == 0 { - continue - } - if len(notHandledEvents) > 1 { - // Events other than the latest will be OUTDATED. - for _, e := range notHandledEvents[1:] { - outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ - Id: e.Id, - Status: model.EventStatus_EVENT_OUTDATED, - StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id), - }) + for _, cfg := range e.Configs { + var ( + matcher = cfg.Matcher + handler = cfg.Handler + ) + notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated) + if len(notHandledEvents) == 0 { + continue + } + if len(notHandledEvents) > 1 { + // Events other than the latest will be OUTDATED. + for _, e := range notHandledEvents[1:] { + outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: e.Id, + Status: model.EventStatus_EVENT_OUTDATED, + StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id), + }) + } } - } - latestEvent := notHandledEvents[0] - if firstRead { - resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{ - Name: matcher.Name, - Labels: matcher.Labels, - }) - if err != nil { - return fmt.Errorf("failed to get the latest event: %w", err) + latestEvent := notHandledEvents[0] + if firstRead { + resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{ + Name: matcher.Name, + Labels: matcher.Labels, + }) + if err != nil { + return fmt.Errorf("failed to get the latest event: %w", err) + } + // The case where the latest event has already been handled. + if resp.Event.CreatedAt > latestEvent.CreatedAt { + outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: notHandledEvents[0].Id, + Status: model.EventStatus_EVENT_OUTDATED, + StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id), + }) + continue + } } - // The case where the latest event has already been handled. - if resp.Event.CreatedAt > latestEvent.CreatedAt { + if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration { outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ - Id: notHandledEvents[0].Id, + Id: latestEvent.Id, Status: model.EventStatus_EVENT_OUTDATED, - StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id), + StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id), }) continue } - } - if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration { - outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{ - Id: latestEvent.Id, - Status: model.EventStatus_EVENT_OUTDATED, - StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id), - }) - continue - } - switch handler.Type { - case config.EventWatcherHandlerTypeGitUpdate: - if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, handler.Config.Replacements, tmpRepo); err != nil { - w.logger.Error("failed to commit outdated files", zap.Error(err)) + switch handler.Type { + case config.EventWatcherHandlerTypeGitUpdate: + if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo); err != nil { + w.logger.Error("failed to commit outdated files", zap.Error(err)) + handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ + Id: latestEvent.Id, + Status: model.EventStatus_EVENT_FAILURE, + StatusDescription: fmt.Sprintf("Failed to change files: %v", err), + }) + continue + } handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, - Status: model.EventStatus_EVENT_FAILURE, - StatusDescription: fmt.Sprintf("Failed to change files: %v", err), + Status: model.EventStatus_EVENT_SUCCESS, + StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID), }) + if latestEvent.CreatedAt > maxTimestamp { + maxTimestamp = latestEvent.CreatedAt + } + gitUpdateEvent = true + default: + w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type), + zap.String("event-name", latestEvent.Name), + zap.String("event-id", latestEvent.Id), + ) continue } - handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ - Id: latestEvent.Id, - Status: model.EventStatus_EVENT_SUCCESS, - StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID), - }) - if latestEvent.CreatedAt > maxTimestamp { - maxTimestamp = latestEvent.CreatedAt - } - gitUpdateEvent = true - default: - w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type), - zap.String("event-name", latestEvent.Name), - zap.String("event-id", latestEvent.Id), - ) - continue } } if len(outDatedEvents) > 0 { @@ -506,7 +523,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string }) continue } - if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, e.Replacements, tmpRepo); err != nil { + if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo); err != nil { w.logger.Error("failed to commit outdated files", zap.Error(err)) handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, @@ -569,16 +586,21 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string } // commitFiles commits changes if the data in Git is different from the latest event. -func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg string, replacements []config.EventWatcherReplacement, repo git.Repo) error { +func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo) error { // Determine files to be changed by comparing with the latest event. changes := make(map[string][]byte, len(replacements)) for _, r := range replacements { var ( - path = filepath.Join(repo.GetPath(), r.File) newContent []byte upToDate bool err error ) + + filePath := r.File + if gitPath != "" { + filePath = fmt.Sprintf("%s/%s", gitPath, r.File) + } + path := filepath.Join(repo.GetPath(), filePath) switch { case r.YAMLField != "": newContent, upToDate, err = modifyYAML(path, r.YAMLField, latestData)