Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 88 additions & 66 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,15 @@ type watcher struct {

type eventWatcherCache struct {
HeadCommit string
GitPath string
Configs []config.EventWatcherConfig
}

type eventWatcherConfig struct {
GitPath string
Configs []config.EventWatcherConfig
}

func NewWatcher(cfg *config.PipedSpec, eventLister eventLister, gitClient gitClient, apiClient apiClient, logger *zap.Logger) Watcher {
return &watcher{
config: cfg,
Expand Down Expand Up @@ -224,7 +230,7 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
w.logger.Error("failed to list registered application", zap.Error(err))
continue
}
cfgs := make([]config.EventWatcherConfig, 0, len(resp.Applications))
cfgs := make([]eventWatcherConfig, 0, len(resp.Applications))
for _, app := range resp.Applications {
if app.GitPath.Repo.Id != repoCfg.RepoID {
continue
Expand All @@ -233,7 +239,11 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
if v, ok := w.lastScannedConfig.Load(app.Id); ok {
c := v.(eventWatcherCache)
if c.HeadCommit == headCommit.Hash {
cfgs = append(cfgs, c.Configs...)
ew := eventWatcherConfig{
GitPath: c.GitPath,
Configs: c.Configs,
}
cfgs = append(cfgs, ew)
continue
}
}
Expand All @@ -247,6 +257,7 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
// Save as a cache regardless of whether the event watcher configuration exists or not in an application configuration.
cache := &eventWatcherCache{
HeadCommit: headCommit.Hash,
GitPath: app.GitPath.Path,
Configs: appCfg.EventWatcher,
}
w.lastScannedConfig.Store(app.Id, cache)
Expand All @@ -255,7 +266,11 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
continue
}

cfgs = append(cfgs, appCfg.EventWatcher...)
ew := eventWatcherConfig{
GitPath: app.GitPath.Path,
Configs: appCfg.EventWatcher,
}
cfgs = append(cfgs, ew)
}
if len(cfgs) == 0 {
w.logger.Info("configuration for Event Watcher in application configuration not found",
Expand Down Expand Up @@ -287,7 +302,7 @@ func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository)
}

// execute inspects all Event-definition and handles the events per EventWatcherHandlerType if there are.
func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherConfig) error {
func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []eventWatcherConfig) error {
// Copy the repo to another directory to modify local file to avoid reverting previous changes.
tmpDir, err := os.MkdirTemp(w.workingDir, "repo")
if err != nil {
Expand All @@ -314,79 +329,81 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve
gitUpdateEvent = false
)
for _, e := range eventCfgs {
var (
matcher = e.Matcher
handler = e.Handler
)
notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated)
if len(notHandledEvents) == 0 {
continue
}
if len(notHandledEvents) > 1 {
// Events other than the latest will be OUTDATED.
for _, e := range notHandledEvents[1:] {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: e.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id),
})
for _, cfg := range e.Configs {
var (
matcher = cfg.Matcher
handler = cfg.Handler
)
notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I start feeling that this RPC is being called quite frequently.
In the future, maybe we need to add a new RPC to check multiple events simultaneously.

Copy link
Member Author

@knanao knanao Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, but it's not so frequently in the current spec because the count of eventCfgs is equal to the count of apps and the count of e.Configs is almost to be one except for edge cases.
So Let's add a new RPC when we add a new event type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the project has a fairly large number of applications, the number of the RPC calls is likely to be high.🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. It would be nice if the number of calls did not depend on the number of applications.
But it could be an improvement point.

if len(notHandledEvents) == 0 {
continue
}
if len(notHandledEvents) > 1 {
// Events other than the latest will be OUTDATED.
for _, e := range notHandledEvents[1:] {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: e.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id),
})
}
}
}

latestEvent := notHandledEvents[0]
if firstRead {
resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{
Name: matcher.Name,
Labels: matcher.Labels,
})
if err != nil {
return fmt.Errorf("failed to get the latest event: %w", err)
latestEvent := notHandledEvents[0]
if firstRead {
resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{
Name: matcher.Name,
Labels: matcher.Labels,
})
if err != nil {
return fmt.Errorf("failed to get the latest event: %w", err)
}
// The case where the latest event has already been handled.
if resp.Event.CreatedAt > latestEvent.CreatedAt {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: notHandledEvents[0].Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id),
})
continue
}
}
// The case where the latest event has already been handled.
if resp.Event.CreatedAt > latestEvent.CreatedAt {
if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: notHandledEvents[0].Id,
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id),
StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id),
})
continue
}
}
if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id),
})
continue
}

switch handler.Type {
case config.EventWatcherHandlerTypeGitUpdate:
if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, handler.Config.Replacements, tmpRepo); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
switch handler.Type {
case config.EventWatcherHandlerTypeGitUpdate:
if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_FAILURE,
StatusDescription: fmt.Sprintf("Failed to change files: %v", err),
})
continue
}
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_FAILURE,
StatusDescription: fmt.Sprintf("Failed to change files: %v", err),
Status: model.EventStatus_EVENT_SUCCESS,
StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID),
})
if latestEvent.CreatedAt > maxTimestamp {
maxTimestamp = latestEvent.CreatedAt
}
gitUpdateEvent = true
default:
w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type),
zap.String("event-name", latestEvent.Name),
zap.String("event-id", latestEvent.Id),
)
continue
}
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_SUCCESS,
StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID),
})
if latestEvent.CreatedAt > maxTimestamp {
maxTimestamp = latestEvent.CreatedAt
}
gitUpdateEvent = true
default:
w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type),
zap.String("event-name", latestEvent.Name),
zap.String("event-id", latestEvent.Id),
)
continue
}
}
if len(outDatedEvents) > 0 {
Expand Down Expand Up @@ -506,7 +523,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
})
continue
}
if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, e.Replacements, tmpRepo); err != nil {
if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Expand Down Expand Up @@ -569,16 +586,21 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
}

// commitFiles commits changes if the data in Git is different from the latest event.
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg string, replacements []config.EventWatcherReplacement, repo git.Repo) error {
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo) error {
// Determine files to be changed by comparing with the latest event.
changes := make(map[string][]byte, len(replacements))
for _, r := range replacements {
var (
path = filepath.Join(repo.GetPath(), r.File)
newContent []byte
upToDate bool
err error
)

filePath := r.File
if gitPath != "" {
filePath = fmt.Sprintf("%s/%s", gitPath, r.File)
}
path := filepath.Join(repo.GetPath(), filePath)
switch {
case r.YAMLField != "":
newContent, upToDate, err = modifyYAML(path, r.YAMLField, latestData)
Expand Down