Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/app/piped/eventwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["eventwatcher.go"],
importpath = "github.com/pipe-cd/pipe/pkg/app/piped/eventwatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/config:go_default_library",
"//pkg/git:go_default_library",
"//pkg/model:go_default_library",
"//pkg/yamlprocessor:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
254 changes: 254 additions & 0 deletions pkg/app/piped/eventwatcher/eventwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright 2021 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package eventwatcher provides facilities to update config files when new
// event found. It can be done by periodically comparing the latest value user
// registered and the value in the files placed at Git repositories.
package eventwatcher

import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"

"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/git"
"github.com/pipe-cd/pipe/pkg/model"
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

const (
// The latest value and Event name are supposed.
defaultCommitMessageFormat = "Replace values with %q set by Event %q"
defaultCheckInterval = 5 * time.Minute
)

type Watcher interface {
Run(context.Context) error
}

type eventGetter interface {
GetLatest(ctx context.Context, name string, labels map[string]string) (*model.Event, bool)
}

type gitClient interface {
Clone(ctx context.Context, repoID, remote, branch, destination string) (git.Repo, error)
}

type commit struct {
changes map[string][]byte
message string
}

type watcher struct {
config *config.PipedSpec
eventGetter eventGetter
gitClient gitClient
logger *zap.Logger
wg sync.WaitGroup
}

func NewWatcher(cfg *config.PipedSpec, eventGetter eventGetter, gitClient gitClient, logger *zap.Logger) Watcher {
return &watcher{
config: cfg,
eventGetter: eventGetter,
gitClient: gitClient,
logger: logger.Named("event-watcher"),
}
}

// Run spawns goroutines for each git repository. They periodically fetch the latest Event
// from the control-plane to compare the value with one in the git repository.
func (w *watcher) Run(ctx context.Context) error {
w.logger.Info("start running event watcher")

for _, repoCfg := range w.config.Repositories {
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "")
if err != nil {
w.logger.Error("failed to clone repository",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
return fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err)
}
defer os.RemoveAll(repo.GetPath())

w.wg.Add(1)
go w.run(ctx, repo, &repoCfg)
}

w.wg.Wait()
return nil
}

// run works against a single git repo. It periodically compares the value in the given
// git repository and one in the control-plane. And then pushes those with differences.
func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg *config.PipedRepository) {
defer w.wg.Done()

var (
commitMsg string
includedCfgs, excludedCfgs []string
)
// Use user-defined settings if there is.
for _, r := range w.config.EventWatcher.GitRepos {
if r.RepoID != repoCfg.RepoID {
continue
}
commitMsg = r.CommitMessage
includedCfgs = r.Includes
excludedCfgs = r.Excludes
break
}
checkInterval := time.Duration(w.config.EventWatcher.CheckInterval)
if checkInterval == 0 {
checkInterval = defaultCheckInterval
}

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := repo.Pull(ctx, repo.GetClonedBranch())
if err != nil {
w.logger.Error("failed to perform git pull",
zap.String("repo-id", repoCfg.RepoID),
zap.String("branch", repo.GetClonedBranch()),
zap.Error(err),
)
continue
}
cfg, err := config.LoadEventWatcher(repo.GetPath(), includedCfgs, excludedCfgs)
if errors.Is(err, config.ErrNotFound) {
w.logger.Info("configuration file for Event Watcher not found",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
if err != nil {
w.logger.Error("failed to load configuration file for Event Watcher",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
if err := w.updateValues(ctx, repo, cfg.Events, commitMsg); err != nil {
w.logger.Error("failed to update the values",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
}
}
}
}

// updateValues inspects all Event-definition and pushes the changes to git repo if there is.
func (w *watcher) updateValues(ctx context.Context, repo git.Repo, events []config.EventWatcherEvent, commitMsg string) error {
// Copy the repo to another directory to avoid pull failure in the future.
tmpDir, err := ioutil.TempDir("", "event-watcher")
if err != nil {
return fmt.Errorf("failed to create a new temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)
tmpRepo, err := repo.Copy(filepath.Join(tmpDir, "tmp-repo"))
if err != nil {
return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err)
}

commits := make([]*commit, 0)
for _, e := range events {
c, err := w.modifyFiles(ctx, &e, tmpRepo, commitMsg)
if err != nil {
w.logger.Error("failed to check outdated value", zap.Error(err))
continue
}
if c != nil {
commits = append(commits, c)
}
}
if len(commits) == 0 {
return nil
}

w.logger.Info(fmt.Sprintf("there are %d outdated values", len(commits)))
for _, c := range commits {
if err := tmpRepo.CommitChanges(ctx, tmpRepo.GetClonedBranch(), c.message, false, c.changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
}
return tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
}

// modifyFiles modifies files defined in a given Event if any deviation exists between the value in
// the git repository and one in the control-plane. And gives back a change contents.
func (w *watcher) modifyFiles(ctx context.Context, event *config.EventWatcherEvent, repo git.Repo, commitMsg string) (*commit, error) {
latestEvent, ok := w.eventGetter.GetLatest(ctx, event.Name, event.Labels)
if !ok {
return nil, fmt.Errorf("failed to get the latest Event with the name %q", event.Name)
}

// Determine files to be changed.
changes := make(map[string][]byte, 0)
for _, r := range event.Replacements {
path := filepath.Join(repo.GetPath(), r.File)
yml, err := ioutil.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
v, err := yamlprocessor.GetValue(yml, r.YAMLField)
if err != nil {
return nil, fmt.Errorf("failed to get value at %s in %s: %w", r.YAMLField, r.File, err)
}
value, ok := v.(string)
if !ok {
return nil, fmt.Errorf("unknown type of value is defined at %s in %s", r.YAMLField, r.File)
}
if latestEvent.Data == value {
// Already up-to-date.
continue
}
// Modify the local file and put it into the change list.
newYml, err := yamlprocessor.ReplaceValue(yml, r.YAMLField, latestEvent.Data)
Copy link
Member

@nghialv nghialv Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case the users configured multiple fields for the same file, will this override the old changes?

Copy link
Member Author

@nakabonne nakabonne Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that commit will happen conflict and end with failure.

Copy link
Member Author

@nakabonne nakabonne Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I assumed it doesn't happen if the latest values are the same. It happens when only the latest values are different from each other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I just got what you mean. No. it always override the original content, defined in Git.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of my decisions are based on the rule that never makes a change that crosses lines, but you know what I mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I totally got what you're most worried about. Yes, you're right. That can revert previous changes. Let me re-think it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we can simply solve it by editing a local file inside this loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is what I was talking about.

Copy link
Member Author

@nakabonne nakabonne Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, finally I caught up.

I feel like we can simply solve it by editing a local file inside this loop.

I'm now inspecting if there are some issues with this way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we have no problem with it, so applied it. PTAL 😄

if err != nil {
return nil, fmt.Errorf("failed to replace value at %s with %s: %w", r.YAMLField, latestEvent.Data, err)
}
if err := ioutil.WriteFile(path, newYml, os.ModePerm); err != nil {
Copy link
Member

@nghialv nghialv Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have just realized that we have to save it in memory instead of writing to the local path.
Because writing it back to the local path will cause an error while doing the next pulls ??
(The data inside the repo must be read-only.)

Copy link
Member

@nghialv nghialv Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So at line 215, checking it in the changes map before loading it from the local path looks good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right! using cache looks good. Alternatively, I think we can leverage the temp repo because we use it when pushing. We can use it when making changes as well
https://github.com/pipe-cd/pipe/pull/1411/files#diff-60908ffb7e37bfc3692d8c4cace4be91d53bccf9a549fb91ea30b70387abd9cfR191

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Totally I misunderstood that we're using tmpRepo when making changes.)

Copy link
Member Author

@nakabonne nakabonne Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... using tmp repo makes the logic simple. But on the dark side, copying the directory happens every time no matter commits exists. After thinking about that, I'm beginning to think the way to check changes you suggested is more appropriate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, checking changes looks better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good. Let me do so.

Copy link
Member Author

@nakabonne nakabonne Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hold on, I found it is still likely to be overwritten when processing other events. The changes is reset for each event. So looks like it's safer to use tmpRepo.

https://github.com/pipe-cd/pipe/pull/1411/files#diff-60908ffb7e37bfc3692d8c4cace4be91d53bccf9a549fb91ea30b70387abd9cfR170-R171

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. lol. So let's update the local data. And please change the function name to match its new behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The current one is absolutely not appropriate.

return nil, fmt.Errorf("failed to write file: %w", err)
}
changes[r.File] = newYml
}

if len(changes) == 0 {
return nil, nil
}

if commitMsg == "" {
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, latestEvent.Data, event.Name)
}
return &commit{
changes: changes,
message: commitMsg,
}, nil
}