Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 195 additions & 15 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type gitClient interface {
type apiClient interface {
GetLatestEvent(ctx context.Context, req *pipedservice.GetLatestEventRequest, opts ...grpc.CallOption) (*pipedservice.GetLatestEventResponse, error)
ReportEventStatuses(ctx context.Context, req *pipedservice.ReportEventStatusesRequest, opts ...grpc.CallOption) (*pipedservice.ReportEventStatusesResponse, error)
ListApplications(ctx context.Context, in *pipedservice.ListApplicationsRequest, opts ...grpc.CallOption) (*pipedservice.ListApplicationsResponse, error)
}

type watcher struct {
Expand Down Expand Up @@ -107,20 +108,15 @@ func (w *watcher) Run(ctx context.Context) error {
defer os.RemoveAll(workingDir)
w.workingDir = workingDir

repoCfgs := w.config.GetRepositoryMap()
for _, r := range w.config.EventWatcher.GitRepos {
cfg, ok := repoCfgs[r.RepoID]
if !ok {
return fmt.Errorf("repo id %q doesn't exist in the Piped repositories list", r.RepoID)
}
repo, err := w.cloneRepo(ctx, cfg)
for _, r := range w.config.Repositories {
repo, err := w.cloneRepo(ctx, r)
if err != nil {
return err
}
defer repo.Clean()

w.wg.Add(1)
go w.run(ctx, repo, cfg)
go w.run(ctx, repo, r)
}

w.wg.Wait()
Expand Down Expand Up @@ -186,10 +182,43 @@ func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg config.PipedRe
}
cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs)
if errors.Is(err, config.ErrNotFound) {
w.logger.Info("configuration file for Event Watcher not found",
w.logger.Info("configuration file for Event Watcher in .pipe/ not found",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
resp, err := w.apiClient.ListApplications(ctx, &pipedservice.ListApplicationsRequest{})
if err != nil {
w.logger.Error("failed to list registered application", zap.Error(err))
continue
}
cfgs := make([]*config.GenericApplicationSpec, 0, len(resp.Applications))
for _, app := range resp.Applications {
if app.GitPath.Repo.Id != repoCfg.RepoID {
continue
}
cfg, err := config.LoadApplication(repo.GetPath(), app.GitPath.GetApplicationConfigFilePath(), app.Kind)
if err != nil {
w.logger.Error("failed to load application configuration", zap.Error(err))
continue
}
cfgs = append(cfgs, cfg)
}
for _, cfg := range cfgs {
if cfg.EventWatcher == nil {
w.logger.Info("configuration for Event Watcher in application configuration not found",
zap.String("repo-id", repoCfg.RepoID),
zap.String("app-name", cfg.Name),
)
continue
}
if err := w.execute(ctx, repo, repoCfg.RepoID, cfg.EventWatcher); err != nil {
w.logger.Error("failed to execute the event from application configuration",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
}
continue
}
if err != nil {
Expand Down Expand Up @@ -222,6 +251,157 @@ func (w *watcher) cloneRepo(ctx context.Context, repoCfg config.PipedRepository)
return repo, nil
}

// execute inspects all Event-definition and handles the events per EventWatcherHandlerType if there are.
func (w *watcher) execute(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherConfig) error {
// Copy the repo to another directory to modify local file to avoid reverting previous changes.
tmpDir, err := os.MkdirTemp(w.workingDir, "repo")
if err != nil {
return fmt.Errorf("failed to create a new temporary directory: %w", err)
}
tmpRepo, err := repo.Copy(filepath.Join(tmpDir, "tmp-repo"))
if err != nil {
return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err)
}
// nolint: errcheck
defer tmpRepo.Clean()

var milestone int64
firstRead := true
if v, ok := w.milestoneMap.Load(repoID); ok {
milestone = v.(int64)
firstRead = false
}
var (
handledEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0, len(eventCfgs))
outDatedEvents = make([]*pipedservice.ReportEventStatusesRequest_Event, 0)
maxTimestamp int64
outDatedDuration = time.Hour
gitUpdateEvent = false
)
for _, e := range eventCfgs {
var (
matcher = e.Matcher
handler = e.Handler
)
notHandledEvents := w.eventLister.ListNotHandled(matcher.Name, matcher.Labels, milestone+1, numToMakeOutdated)
if len(notHandledEvents) == 0 {
continue
}
if len(notHandledEvents) > 1 {
// Events other than the latest will be OUTDATED.
for _, e := range notHandledEvents[1:] {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: e.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", notHandledEvents[0].Id),
})
}
}

latestEvent := notHandledEvents[0]
if firstRead {
resp, err := w.apiClient.GetLatestEvent(ctx, &pipedservice.GetLatestEventRequest{
Name: matcher.Name,
Labels: matcher.Labels,
})
if err != nil {
return fmt.Errorf("failed to get the latest event: %w", err)
}
// The case where the latest event has already been handled.
if resp.Event.CreatedAt > latestEvent.CreatedAt {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: notHandledEvents[0].Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("The new event %q has been created", resp.Event.Id),
})
continue
}
}
if time.Since(time.Unix(latestEvent.CreatedAt, 0)) > outDatedDuration {
outDatedEvents = append(outDatedEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_OUTDATED,
StatusDescription: fmt.Sprintf("Too much time has passed since the event %q was created", latestEvent.Id),
})
continue
}

switch handler.Type {
case config.EventWatcherHandlerTypeGitUpdate:
if err := w.commitFiles(ctx, latestEvent.Data, matcher.Name, handler.Config.CommitMessage, handler.Config.Replacements, tmpRepo); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_FAILURE,
StatusDescription: fmt.Sprintf("Failed to change files: %v", err),
})
continue
}
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Status: model.EventStatus_EVENT_SUCCESS,
StatusDescription: fmt.Sprintf("Successfully updated %d files in the %q repository", len(handler.Config.Replacements), repoID),
})
if latestEvent.CreatedAt > maxTimestamp {
maxTimestamp = latestEvent.CreatedAt
}
gitUpdateEvent = true
default:
w.logger.Error(fmt.Sprintf("event watcher handler type %s is not supported yet", handler.Type),
zap.String("event-name", latestEvent.Name),
zap.String("event-id", latestEvent.Id),
)
continue
}
}
if len(outDatedEvents) > 0 {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: outDatedEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
}
w.logger.Info(fmt.Sprintf("successfully made %d events OUTDATED", len(outDatedEvents)))
}
if len(handledEvents) == 0 {
return nil
}

if !gitUpdateEvent {
return nil
}

retry := backoff.NewRetry(retryPushNum, backoff.NewConstant(retryPushInterval))
_, err = retry.Do(ctx, func() (interface{}, error) {
err := tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
return nil, err
})
if err == nil {
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
}
w.milestoneMap.Store(repoID, maxTimestamp)
return nil
}

// If push fails because the local branch was not fresh, exit to retry again in the next interval.
if err == git.ErrBranchNotFresh {
w.logger.Warn("failed to push commits", zap.Error(err))
return nil
}

// If push fails because of the other reason, re-set all statuses to FAILURE.
for i := range handledEvents {
if handledEvents[i].Status == model.EventStatus_EVENT_FAILURE {
continue
}
handledEvents[i].Status = model.EventStatus_EVENT_FAILURE
handledEvents[i].StatusDescription = fmt.Sprintf("Failed to push changed files: %v", err)
}
if _, err := w.apiClient.ReportEventStatuses(ctx, &pipedservice.ReportEventStatusesRequest{Events: handledEvents}); err != nil {
return fmt.Errorf("failed to report event statuses: %w", err)
}
w.milestoneMap.Store(repoID, maxTimestamp)
return fmt.Errorf("failed to push commits: %w", err)
}

// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string, eventCfgs []config.EventWatcherEvent, commitMsg string) error {
// Copy the repo to another directory to modify local file to avoid reverting previous changes.
Expand Down Expand Up @@ -290,7 +470,7 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
})
continue
}
if err := w.commitFiles(ctx, latestEvent.Data, e, tmpRepo, commitMsg); err != nil {
if err := w.commitFiles(ctx, latestEvent.Data, e.Name, commitMsg, e.Replacements, tmpRepo); err != nil {
w.logger.Error("failed to commit outdated files", zap.Error(err))
handledEvents = append(handledEvents, &pipedservice.ReportEventStatusesRequest_Event{
Id: latestEvent.Id,
Expand Down Expand Up @@ -353,10 +533,10 @@ func (w *watcher) updateValues(ctx context.Context, repo git.Repo, repoID string
}

// commitFiles commits changes if the data in Git is different from the latest event.
func (w *watcher) commitFiles(ctx context.Context, latestData string, eventCfg config.EventWatcherEvent, repo git.Repo, commitMsg string) error {
func (w *watcher) commitFiles(ctx context.Context, latestData, eventName, commitMsg string, replacements []config.EventWatcherReplacement, repo git.Repo) error {
// Determine files to be changed by comparing with the latest event.
changes := make(map[string][]byte, len(eventCfg.Replacements))
for _, r := range eventCfg.Replacements {
changes := make(map[string][]byte, len(replacements))
for _, r := range replacements {
var (
path = filepath.Join(repo.GetPath(), r.File)
newContent []byte
Expand Down Expand Up @@ -390,12 +570,12 @@ func (w *watcher) commitFiles(ctx context.Context, latestData string, eventCfg c
}

if commitMsg == "" {
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestData, eventCfg.Name)
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestData, eventName)
}
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventCfg.Name))
w.logger.Info(fmt.Sprintf("event watcher will update values of Event %q", eventName))
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type GenericApplicationSpec struct {
// The list of sealed secrets that should be decrypted.
// Deprecated.
SealedSecrets []SealedSecretMapping `json:"sealedSecrets"`
// List of the configuration for event watcher.
EventWatcher []EventWatcherConfig `json:"eventWatcher"`
}

type DeploymentPlanner struct {
Expand Down
38 changes: 38 additions & 0 deletions pkg/config/event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,36 @@ type EventWatcherEvent struct {
Replacements []EventWatcherReplacement `json:"replacements"`
}

type EventWatcherConfig struct {
// Matcher represents which event will be handled.
Matcher EventWatcherMatcher `json:"matcher"`
// Handler represents how the matched event will be handled.
Handler EventWatcherHandler `json:"handler"`
}

type EventWatcherMatcher struct {
// The handled event name.
Name string `json:"name"`
// Additional attributes of event. This can make an event definition
// unique even if the one with the same name exists.
Labels map[string]string `json:"labels"`
}

type EventWatcherHandler struct {
// The handler type of event watcher.
Type EventWatcherHandlerType `json:"type,omitempty"`
// The config for the event watcher handler.
Config EventWatcherHandlerConfig `json:"config"`
}

type EventWatcherHandlerConfig struct {
// The commit message used to push after replacing values.
// Default message is used if not given.
CommitMessage string `json:"commitMessage,omitempty"`
// List of places where will be replaced when the new event matches.
Replacements []EventWatcherReplacement `json:"replacements"`
}

type EventWatcherReplacement struct {
// The path to the file to be updated.
File string `json:"file"`
Expand All @@ -56,6 +86,14 @@ type EventWatcherReplacement struct {
Regex string `json:"regex"`
}

// EventWatcherHandlerType represents the type of an event watcher handler.
type EventWatcherHandlerType string

const (
// EventWatcherHandlerTypeGitUpdate represents the handler type for git updating.
EventWatcherHandlerTypeGitUpdate = "GIT_UPDATE"
)

// LoadEventWatcher gives back parsed EventWatcher config after merging config files placed under
// the .pipe directory. With "includes" and "excludes", you can filter the files included the result.
// "excludes" are prioritized if both "excludes" and "includes" are given. ErrNotFound is returned if not found.
Expand Down