Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 101 additions & 84 deletions pkg/app/piped/imagewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
Expand All @@ -33,7 +34,10 @@ import (
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

const defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s"
const (
defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s"
defaultPullInterval = 5 * time.Minute
)

type Watcher interface {
Run(context.Context) error
Expand All @@ -48,11 +52,9 @@ type watcher struct {
gitClient gitClient
logger *zap.Logger
wg sync.WaitGroup
// For file locking.
mu sync.Mutex

// Indexed by repo id.
gitRepos map[string]git.Repo
// Indexed by the Image Provider name.
providerCfgs map[string]config.PipedImageProvider
}

func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger) Watcher {
Expand All @@ -63,108 +65,113 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger)
}
}

// Run spawns goroutines for each image provider. They periodically pull the image
// Run spawns goroutines for each git repository. They periodically pull the image
// from the container registry to compare the image with one in the git repository.
func (w *watcher) Run(ctx context.Context) error {
// TODO: Spawn goroutines for each repository
// Pre-clone to cache the registered git repositories.
for _, r := range w.config.Repositories {
// TODO: Clone repository another temporary destination
repo, err := w.gitClient.Clone(ctx, r.RepoID, r.Remote, r.Branch, "")
if err != nil {
w.logger.Error("failed to clone repository",
zap.String("repo-id", r.RepoID),
zap.Error(err),
)
return fmt.Errorf("failed to clone repository %s: %w", r.RepoID, err)
}
w.gitRepos[r.RepoID] = repo
}

w.providerCfgs = make(map[string]config.PipedImageProvider, len(w.config.ImageProviders))
for _, cfg := range w.config.ImageProviders {
p, err := imageprovider.NewProvider(&cfg, w.logger)
if err != nil {
return fmt.Errorf("failed to yield image provider %s: %w", cfg.Name, err)
}
w.providerCfgs[cfg.Name] = cfg
}

for _, repo := range w.config.Repositories {
w.wg.Add(1)
go w.run(ctx, p, cfg.PullInterval.Duration())
go w.run(ctx, &repo)
}

w.wg.Wait()
return nil
}

// run periodically compares the image in the given provider and one in git repository.
// run periodically compares the image in the given git repository and one in the image provider.
// And then pushes those with differences.
func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, interval time.Duration) {
func (w *watcher) run(ctx context.Context, repoCfg *config.PipedRepository) {
defer w.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()

var (
pullInterval = defaultPullInterval
commitMsg = ""
includedCfgs = []string{}
excludedCfgs = []string{}
)
// Use user-defined settings if there is.
for _, r := range w.config.ImageWatcher.Repos {
if r.RepoID != repoCfg.RepoID {
continue
}
pullInterval = time.Duration(r.PullInterval)
commitMsg = r.CommitMessage
includedCfgs = r.Includes
excludedCfgs = r.Excludes
break
}

// Periodically update this cloned directory as long as this worker continues.
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "")
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When one of the goroutine stops due to this error, should we cancel all of the others?
Currently, this one is stopped and the other ones still running and just an error was logged.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, we should ensure all workers properly start running. Using sync/errgroup would be better.

w.logger.Error("failed to clone repository",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
return
}

ticker := time.NewTicker(pullInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Inspect all targets in all repos, and compare only images managed by the given provider.
for id, repo := range w.gitRepos {
cfg, err := w.loadImageWatcherConfig(ctx, id, repo)
if err != nil {
w.logger.Error("failed to load image watcher config",
zap.String("repo-id", id),
err := repo.Pull(ctx, repo.GetClonedBranch())
if err != nil {
w.logger.Error("failed to perform git pull",
zap.String("repo-id", repoCfg.RepoID),
zap.String("branch", repo.GetClonedBranch()),
zap.Error(err),
)
continue
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includedCfgs, excludedCfgs)
if err != nil {
w.logger.Error("failed to load configuration file for Image Watcher",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
if !ok {
w.logger.Error("configuration file for Image Watcher not found",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
// Inspect all targets defined in the repo, and update outdated images.
for _, target := range cfg.Targets {
if err := w.updateOutdatedImage(ctx, &target, repoCfg, repo.GetPath(), commitMsg); err != nil {
w.logger.Error("failed to update image",
zap.String("repo-id", repoCfg.RepoID),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of checking and updating for each target, how about checking and adding the commit for each target and then pushing once after all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. That way can reduce the number of pushing to git. But I'm a bit worried about the commit will be unclear. Including all changes into a single commit may make it tough to revert

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is mainly for dev environment, but just warried.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we add multiple commits (each commit for each change) and push after all.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, now I can understand. For that, we should share a temporary directory between all targets, but it looks like no problem to do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! It looks the best way!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

zap.Error(err),
)
continue
}
for _, target := range cfg.Targets {
if target.Provider != provider.Name() {
continue
}
if err := w.updateOutdatedImage(ctx, &target, repo, provider); err != nil {
w.logger.Error("failed to update image",
zap.String("repo-id", id),
zap.String("image-provider", provider.Name()),
zap.Error(err),
)
continue
}
}
}
}
}
}

// loadImageWatcherConfig gives back an Image Watcher Config for the given repo.
func (w *watcher) loadImageWatcherConfig(ctx context.Context, repoID string, repo git.Repo) (*config.ImageWatcherSpec, error) {
w.mu.Lock()
err := repo.Pull(ctx, repo.GetClonedBranch())
w.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to perform git pull: %w", err)
}

var includes, excludes []string
for _, repos := range w.config.ImageWatcher.Repos {
if repos.RepoID == repoID {
includes = repos.Includes
excludes = repos.Excludes
break
}
// updateOutdatedImage first compares the image in the given git repository and one in the
// image provider. Then pushes rewritten one to the git repository if any deviation exists.
func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repoCfg *config.PipedRepository, repoRoot, commitMsg string) error {
// Retrieve the image from the image provider.
providerCfg, ok := w.providerCfgs[target.Provider]
if !ok {
return fmt.Errorf("unknown image provider %s is defined", target.Provider)
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
provider, err := imageprovider.NewProvider(&providerCfg, w.logger)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I was thinking about making clients singletons but stopped doing so.
ECR client looks not to do write operation to itself. But the investigation for others hasn't done yet. So for their safety, I decided to create clients every time for now.

if err != nil {
return nil, fmt.Errorf("failed to load configuration file for Image Watcher: %w", err)
}
if !ok {
return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err)
return fmt.Errorf("failed to yield image provider %s: %w", providerCfg.Name, err)
}
return cfg, nil
}

// updateOutdatedImage first compares the image in git repository and one in image provider.
// Then pushes rewritten one to the git repository if any deviation exists.
func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) error {
// Fetch from the image provider.
i, err := provider.ParseImage(target.Image)
if err != nil {
return fmt.Errorf("failed to parse image string \"%s\": %w", target.Image, err)
Expand All @@ -175,8 +182,8 @@ func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageW
return fmt.Errorf("failed to get latest image from %s: %w", provider.Name(), err)
}

// Fetch from the git repository.
path := filepath.Join(repo.GetPath(), target.FilePath)
// Retrieve the image from the file cloned from the git repository.
path := filepath.Join(repoRoot, target.FilePath)
yml, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
Expand All @@ -196,21 +203,31 @@ func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageW
}

// Update the outdated image.
//
// Clone repo into another directory to avoid pull failure in the future.
tmpDir, err := ioutil.TempDir("", "image-watcher")
if err != nil {
return fmt.Errorf("failed to create a new temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, tmpDir)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repo.Copy may be better. Because cloning is retrieving a new git tree. The commit-tree could be added from the last pull.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, definitely right.

if err != nil {
return fmt.Errorf("failed to clone %s into the temporary directory: %w", repoCfg.RepoID, err)
}
newYml, err := yamlprocessor.ReplaceValue(yml, target.Field, imageInRegistry.String())
if err != nil {
return fmt.Errorf("failed to replace value at %s with %s: %w", target.Field, imageInRegistry, err)
}
changes := map[string][]byte{
target.FilePath: newYml,
}
// TODO: Make it changeable the commit message
msg := fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath)
w.mu.Lock()
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), msg, false, changes); err != nil {
if commitMsg == "" {
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath)
}
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
err = repo.Push(ctx, repo.GetClonedBranch())
w.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to perform git push: %w", err)
}
Expand Down
41 changes: 25 additions & 16 deletions pkg/config/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/pipe-cd/pipe/pkg/model"
)

const (
defaultImageWatcherPullInterval = Duration(5 * time.Minute)
)

var DefaultKubernetesCloudProvider = PipedCloudProvider{
Name: "kubernetes-default",
Type: model.CloudProviderKubernetes,
Expand Down Expand Up @@ -59,7 +63,7 @@ type PipedSpec struct {
Notifications Notifications `json:"notifications"`
// How the sealed secret should be managed.
SealedSecretManagement *SealedSecretManagement `json:"sealedSecretManagement"`
// Configuration for image watcher.
// Optional settings for image watcher.
ImageWatcher PipedImageWatcher `json:"imageWatcher"`
}

Expand Down Expand Up @@ -381,18 +385,15 @@ type AnalysisProviderStackdriverConfig struct {
type PipedImageProvider struct {
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`
// Default is 5m.
PullInterval Duration `json:"pullInterval"`

DockerHubConfig *ImageProviderDockerHubConfig
GCRConfig *ImageProviderGCRConfig
ECRConfig *ImageProviderECRConfig
}

type genericPipedImageProvider struct {
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`
PullInterval Duration `json:"pullInterval"`
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`

Config json.RawMessage `json:"config"`
}
Expand All @@ -405,10 +406,6 @@ func (p *PipedImageProvider) UnmarshalJSON(data []byte) error {
}
p.Name = gp.Name
p.Type = gp.Type
p.PullInterval = gp.PullInterval
if p.PullInterval == 0 {
p.PullInterval = Duration(5 * time.Minute)
}

switch p.Type {
case model.ImageProviderTypeDockerHub:
Expand Down Expand Up @@ -594,19 +591,31 @@ type PipedImageWatcher struct {
}

// Validate checks if the duplicated repository setting exists.
func (i *PipedImageWatcher) Validate() error {
repos := make(map[string]struct{})
for _, repo := range i.Repos {
if _, ok := repos[repo.RepoID]; ok {
return fmt.Errorf("duplicated repo id (%s) found in the imageWatcher directive", repo.RepoID)
// And it populates default value if not set.
func (p *PipedImageWatcher) Validate() error {
repos := make(map[string]struct{}, len(p.Repos))
for i := 0; i < len(p.Repos); i++ {
if _, ok := repos[p.Repos[i].RepoID]; ok {
return fmt.Errorf("duplicated repo id (%s) found in the imageWatcher directive", p.Repos[i].RepoID)
}
repos[p.Repos[i].RepoID] = struct{}{}

if p.Repos[i].PullInterval == 0 {
p.Repos[i].PullInterval = defaultImageWatcherPullInterval
}
repos[repo.RepoID] = struct{}{}
}
return nil
}

type PipedImageWatcherRepoTarget struct {
RepoID string `json:"repoId"`
// Interval to pull from git repository and pull images defined
// in image watcher file in the repo from image provider.
// Default is 5m.
PullInterval Duration `json:"pullInterval"`
// The commit message used to push after updating image.
// Default message is used if not given.
CommitMessage string `json:"commitMessage"`
// The paths to ImageWatcher files to be included.
Includes []string `json:"includes"`
// The paths to ImageWatcher files to be excluded.
Expand Down
Loading