diff --git a/docs/content/en/docs-dev/user-guide/configuration-reference.md b/docs/content/en/docs-dev/user-guide/configuration-reference.md index d911109aee..4450461fa1 100644 --- a/docs/content/en/docs-dev/user-guide/configuration-reference.md +++ b/docs/content/en/docs-dev/user-guide/configuration-reference.md @@ -699,6 +699,7 @@ Note: By default, the sum of traffic is rounded to 100. If both `primary` and `c | Field | Type | Description | Required | |-|-|-|-| | commitMessage | string | The commit message used to push after replacing values. Default message is used if not given. | No | +| makePullRequest | bool | Whether to create a new branch or not when commit changes in event watcher. Default is `false`. | No | | replacements | [][EventWatcherReplacement](#eventwatcherreplacement) | List of places where will be replaced when the new event matches. | Yes | ## DriftDetection diff --git a/pkg/app/piped/eventwatcher/eventwatcher.go b/pkg/app/piped/eventwatcher/eventwatcher.go index d53838e8e3..de9162cc04 100644 --- a/pkg/app/piped/eventwatcher/eventwatcher.go +++ b/pkg/app/piped/eventwatcher/eventwatcher.go @@ -30,6 +30,7 @@ import ( "text/template" "time" + "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/grpc" @@ -329,11 +330,11 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve firstRead = false } var ( - handledEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0, len(eventCfgs)) - outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0) - maxTimestamp int64 - outDatedDuration = time.Hour - gitUpdateEvent = false + outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0) + maxTimestamp int64 + outDatedDuration = time.Hour + gitUpdateEvent = false + branchHandledEvents = make(map[string][]*pipedservice.ReportEventStatusesRequest_Event, len(eventCfgs)) ) for _, e := range eventCfgs { for _, cfg := range e.Configs { @@ -383,23 +384,25 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve }) continue } - switch handler.Type { case config.EventWatcherHandlerTypeGitUpdate: - if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo); err != nil { + branchName, err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, e.GitPath, handler.Config.Replacements, tmpRepo, handler.Config.MakePullRequest) + if err != nil { w.logger.Error("failed to commit outdated files", zap.Error(err)) - handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ + handledEvent := &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, Status: model.EventStatus_EVENT_FAILURE, StatusDescription: fmt.Sprintf("Failed to change files: %v", err), - }) + } + branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent) continue } - handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ + handledEvent := &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, Status: model.EventStatus_EVENT_SUCCESS, StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID), - }) + } + branchHandledEvents[branchName] = append(branchHandledEvents[branchName], handledEvent) if latestEvent.CreatedAt > maxTimestamp { maxTimestamp = latestEvent.CreatedAt } @@ -419,46 +422,51 @@ func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eve } w.logger.Info(fmt.Sprintf("successfully made %d events OUTDATED", len(outDatedEvents))) } - if len(handledEvents) == 0 { - return nil - } if !gitUpdateEvent { return nil } + var responseError error retry := backoff.NewRetry(retryPushNum, backoff.NewConstant(retryPushInterval)) - _, err = retry.Do(ctx, func() (interface{}, error) { - err := tmpRepo.Push(ctx, tmpRepo.GetClonedBranch()) - return nil, err - }) - if err == nil { - if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil { - return fmt.Errorf("failed to report event statuses: %w", err) - } - w.executionMilestoneMap.Store(repoID, maxTimestamp) - return nil - } + for branch, events := range branchHandledEvents { + _, err = retry.Do(ctx, func() (interface{}, error) { + err := tmpRepo.Push(ctx, branch) + return nil, err + }) - // If push fails because the local branch was not fresh, exit to retry again in the next interval. - if err == git.ErrBranchNotFresh { - w.logger.Warn("failed to push commits", zap.Error(err)) - return nil - } + if err == nil { + if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil { + w.logger.Error("failed to report event statuses", zap.Error(err)) + } + w.executionMilestoneMap.Store(repoID, maxTimestamp) + continue + } - // If push fails because of the other reason, re-set all statuses to FAILURE. - for i := range handledEvents { - if handledEvents[i].Status == model.EventStatus_EVENT_FAILURE { + // If push fails because the local branch was not fresh, exit to retry again in the next interval. + if err == git.ErrBranchNotFresh { + w.logger.Warn("failed to push commits", zap.Error(err)) continue } - handledEvents[i].Status = model.EventStatus_EVENT_FAILURE - handledEvents[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err) + + // If push fails because of the other reason, re-set all statuses to FAILURE. + for i := range events { + if events[i].Status == model.EventStatus_EVENT_FAILURE { + continue + } + events[i].Status = model.EventStatus_EVENT_FAILURE + events[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err) + } + if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: events}); err != nil { + w.logger.Error("failed to report event statuses", zap.Error(err)) + } + w.executionMilestoneMap.Store(repoID, maxTimestamp) + responseError = errors.Join(responseError, err) } - if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil { - return fmt.Errorf("failed to report event statuses: %w", err) + if responseError != nil { + return responseError } - w.executionMilestoneMap.Store(repoID, maxTimestamp) - return fmt.Errorf("failed to push commits: %w", err) + return nil } // updateValues inspects all Event-definition and pushes the changes to git repo if there is. @@ -530,7 +538,8 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string }) continue } - if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo); err != nil { + _, err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, "", e.Replacements, tmpRepo, false) + if err != nil { w.logger.Error("failed to commit outdated files", zap.Error(err)) handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{ Id: latestEvent.Id, @@ -593,7 +602,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string } // commitFiles commits changes if the data in Git is different from the latest event. -func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo) error { +func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg, gitPath string, replacements []config.EventWatcherReplacement, repo git.Repo, newBranch bool) (string, error) { // Determine files to be changed by comparing with the latest event. changes := make(map[string][]byte, len(replacements)) for _, r := range replacements { @@ -619,19 +628,19 @@ func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commit newContent, upToDate, err = modifyText(path, r.Regex, latestData) } if err != nil { - return err + return "", err } if upToDate { continue } if err := os.WriteFile(path, newContent, os.ModePerm); err != nil { - return fmt.Errorf("failed to write file: %w", err) + return "", fmt.Errorf("failed to write file: %w", err) } changes[filePath] = newContent } if len(changes) == 0 { - return nil + return "", nil } args := argsTemplate{ @@ -639,11 +648,12 @@ func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commit EventName: eventName, } commitMsg = parseCommitMsg(commitMsg, args) - if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil { - return fmt.Errorf("failed to perform git commit: %w", err) + branch := makeBranchName(newBranch, eventName, repo.GetClonedBranch()) + if err := repo.CommitChanges(ctx, branch, commitMsg, newBranch, changes); err != nil { + return "", fmt.Errorf("failed to perform git commit: %w", err) } w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventName)) - return nil + return branch, nil } // modifyYAML returns a new YAML content as a first returned value if the value of given @@ -777,3 +787,12 @@ func parseCommitMsg(msg string, args argsTemplate) string { } return buf.String() } + +// makeBranchName generates a new branch name in the format {eventName}-{uuid} if newBranch is true. +// If newBranch is false, the function returns the existing branch name. +func makeBranchName(newBranch bool, eventName, branch string) string { + if newBranch { + return fmt.Sprintf("%s-%s", eventName, uuid.New().String()) + } + return branch +} diff --git a/pkg/app/piped/eventwatcher/eventwatcher_test.go b/pkg/app/piped/eventwatcher/eventwatcher_test.go index bb6c0b9f53..aa7750ba72 100644 --- a/pkg/app/piped/eventwatcher/eventwatcher_test.go +++ b/pkg/app/piped/eventwatcher/eventwatcher_test.go @@ -242,3 +242,40 @@ spec: }) } } + +func TestGetBranchName(t *testing.T) { + t.Parallel() + testcases := []struct { + name string + newBranch bool + eventName string + branch string + want string + }{ + { + name: "create new branch", + newBranch: true, + eventName: "event", + branch: "main", + }, + { + name: "return existing branch", + newBranch: false, + eventName: "event", + branch: "main", + want: "main", + }, + } + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := makeBranchName(tc.newBranch, tc.eventName, tc.branch) + if tc.newBranch { + assert.NotEqual(t, tc.branch, got) + } else { + assert.Equal(t, tc.want, got) + } + }) + } +} diff --git a/pkg/config/event_watcher.go b/pkg/config/event_watcher.go index aac4e67e88..45b41c7876 100644 --- a/pkg/config/event_watcher.go +++ b/pkg/config/event_watcher.go @@ -64,6 +64,8 @@ type EventWatcherHandlerConfig struct { // The commit message used to push after replacing values. // Default message is used if not given. CommitMessage string `json:"commitMessage,omitempty"` + // Whether to create a new branch or not when event watcher commits changes. + MakePullRequest bool `json:"makePullRequest,omitempty"` // List of places where will be replaced when the new event matches. Replacements []EventWatcherReplacement `json:"replacements"` }