Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 104 additions & 86 deletions pkg/app/piped/imagewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
Expand All @@ -33,7 +34,10 @@ import (
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

const defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s"
const (
// defaultCommitMessageFormat is the fmt template for the commit message used
// when no custom commit message is given. Its verbs are filled, in order, with
// the image found in git, the image found in the registry, the target field
// and the target file path.
defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s"
// defaultPullInterval is the ticker interval a repository is checked at
// unless a matching imageWatcher repo setting overrides it.
defaultPullInterval = 5 * time.Minute
)

type Watcher interface {
Run(context.Context) error
Expand All @@ -48,11 +52,9 @@ type watcher struct {
gitClient gitClient
logger *zap.Logger
wg sync.WaitGroup
// For file locking.
mu sync.Mutex

// Indexed by repo id.
gitRepos map[string]git.Repo
// Indexed by the Image Provider name.
providerCfgs map[string]config.PipedImageProvider
}

func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger) Watcher {
Expand All @@ -63,108 +65,132 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger)
}
}

// Run spawns goroutines for each image provider. They periodically pull the image
// Run spawns goroutines for each git repository. They periodically pull the image
// from the container registry to compare the image with one in the git repository.
func (w *watcher) Run(ctx context.Context) error {
// TODO: Spawn goroutines for each repository
// Pre-clone to cache the registered git repositories.
for _, r := range w.config.Repositories {
// TODO: Clone repository another temporary destination
repo, err := w.gitClient.Clone(ctx, r.RepoID, r.Remote, r.Branch, "")
w.providerCfgs = make(map[string]config.PipedImageProvider, len(w.config.ImageProviders))
for _, cfg := range w.config.ImageProviders {
w.providerCfgs[cfg.Name] = cfg
}

for _, repoCfg := range w.config.Repositories {
repo, err := w.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "")
if err != nil {
w.logger.Error("failed to clone repository",
zap.String("repo-id", r.RepoID),
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
return fmt.Errorf("failed to clone repository %s: %w", r.RepoID, err)
}
w.gitRepos[r.RepoID] = repo
}

for _, cfg := range w.config.ImageProviders {
p, err := imageprovider.NewProvider(&cfg, w.logger)
if err != nil {
return fmt.Errorf("failed to yield image provider %s: %w", cfg.Name, err)
return fmt.Errorf("failed to clone repository %s: %w", repoCfg.RepoID, err)
}

w.wg.Add(1)
go w.run(ctx, p, cfg.PullInterval.Duration())
go w.run(ctx, repo, &repoCfg)
}

w.wg.Wait()
return nil
}

// run periodically compares the image in the given provider and one in git repository.
// run periodically compares the image in the given git repository and one in the image provider.
// And then pushes those with differences.
func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, interval time.Duration) {
func (w *watcher) run(ctx context.Context, repo git.Repo, repoCfg *config.PipedRepository) {
defer w.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()

var (
pullInterval = defaultPullInterval
commitMsg string
includedCfgs, excludedCfgs []string
)
// Use user-defined settings if there are any.
for _, r := range w.config.ImageWatcher.Repos {
if r.RepoID != repoCfg.RepoID {
continue
}
pullInterval = time.Duration(r.CheckInterval)
commitMsg = r.CommitMessage
includedCfgs = r.Includes
excludedCfgs = r.Excludes
break
}

ticker := time.NewTicker(pullInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Inspect all targets in all repos, and compare only images managed by the given provider.
for id, repo := range w.gitRepos {
cfg, err := w.loadImageWatcherConfig(ctx, id, repo)
if err != nil {
w.logger.Error("failed to load image watcher config",
zap.String("repo-id", id),
zap.Error(err),
)
continue
}
for _, target := range cfg.Targets {
if target.Provider != provider.Name() {
continue
}
if err := w.updateOutdatedImage(ctx, &target, repo, provider); err != nil {
w.logger.Error("failed to update image",
zap.String("repo-id", id),
zap.String("image-provider", provider.Name()),
zap.Error(err),
)
continue
}
}
err := repo.Pull(ctx, repo.GetClonedBranch())
if err != nil {
w.logger.Error("failed to perform git pull",
zap.String("repo-id", repoCfg.RepoID),
zap.String("branch", repo.GetClonedBranch()),
zap.Error(err),
)
continue
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includedCfgs, excludedCfgs)
if err != nil {
w.logger.Error("failed to load configuration file for Image Watcher",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
if !ok {
w.logger.Error("configuration file for Image Watcher not found",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
continue
}
if err := w.pushOutdatedImages(ctx, repo, cfg.Targets, commitMsg); err != nil {
w.logger.Error("failed to push the changes",
zap.String("repo-id", repoCfg.RepoID),
zap.Error(err),
)
}
}
}
}

// loadImageWatcherConfig gives back an Image Watcher Config for the given repo.
func (w *watcher) loadImageWatcherConfig(ctx context.Context, repoID string, repo git.Repo) (*config.ImageWatcherSpec, error) {
w.mu.Lock()
err := repo.Pull(ctx, repo.GetClonedBranch())
w.mu.Unlock()
// pushOutdatedImages inspects all targets and pushes to git repo after committing the changes.
func (w *watcher) pushOutdatedImages(ctx context.Context, repo git.Repo, targets []config.ImageWatcherTarget, commitMsg string) error {
// Copy the repo to another directory to avoid pull failure in the future.
tmpDir, err := ioutil.TempDir("", "image-watcher")
if err != nil {
return nil, fmt.Errorf("failed to perform git pull: %w", err)
return fmt.Errorf("failed to create a new temporary directory: %w", err)
}

var includes, excludes []string
for _, repos := range w.config.ImageWatcher.Repos {
if repos.RepoID == repoID {
includes = repos.Includes
excludes = repos.Excludes
break
}
}
cfg, ok, err := config.LoadImageWatcher(repo.GetPath(), includes, excludes)
defer os.RemoveAll(tmpDir)
tmpRepo, err := repo.Copy(tmpDir)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Copy is done every 5 minutes even if there are no images to update.
I think that is wasteful.
It would be better to check for outdated images against the original read-only repo first, and copy it only if there are any images to update.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, will fix it.

if err != nil {
return nil, fmt.Errorf("failed to load configuration file for Image Watcher: %w", err)
return fmt.Errorf("failed to copy the repository to the temporary directory: %w", err)
}
if !ok {
return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err)

for _, t := range targets {
if err := w.commitOutdatedImage(ctx, &t, tmpRepo, commitMsg); err != nil {
w.logger.Error("failed to update image",
zap.Error(err),
)
continue
}
}
return cfg, nil

return tmpRepo.Push(ctx, tmpRepo.GetClonedBranch())
}

// updateOutdatedImage first compares the image in git repository and one in image provider.
// Then pushes rewritten one to the git repository if any deviation exists.
func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) error {
// Fetch from the image provider.
// commitOutdatedImage first compares the image in the given git repository and one in the
// image provider. Then commits rewritten one to the git repository if any deviation exists.
func (w *watcher) commitOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repo git.Repo, commitMsg string) error {
// Retrieve the image from the image provider.
providerCfg, ok := w.providerCfgs[target.Provider]
if !ok {
return fmt.Errorf("unknown image provider %s is defined", target.Provider)
}
provider, err := imageprovider.NewProvider(&providerCfg, w.logger)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I was thinking about making the clients singletons, but decided against it.
The ECR client does not appear to perform any write operations on itself, but the other clients have not been investigated yet. So, to be safe, I decided to create a new client every time for now.

if err != nil {
return fmt.Errorf("failed to yield image provider %s: %w", providerCfg.Name, err)
}
i, err := provider.ParseImage(target.Image)
if err != nil {
return fmt.Errorf("failed to parse image string \"%s\": %w", target.Image, err)
Expand All @@ -175,7 +201,7 @@ func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageW
return fmt.Errorf("failed to get latest image from %s: %w", provider.Name(), err)
}

// Fetch from the git repository.
// Retrieve the image from the file cloned from the git repository.
path := filepath.Join(repo.GetPath(), target.FilePath)
yml, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -195,24 +221,16 @@ func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageW
return nil
}

// Update the outdated image.
// Update the outdated image and commit it.
newYml, err := yamlprocessor.ReplaceValue(yml, target.Field, imageInRegistry.String())
if err != nil {
return fmt.Errorf("failed to replace value at %s with %s: %w", target.Field, imageInRegistry, err)
}
changes := map[string][]byte{
target.FilePath: newYml,
}
// TODO: Make it changeable the commit message
msg := fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath)
w.mu.Lock()
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), msg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
err = repo.Push(ctx, repo.GetClonedBranch())
w.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to perform git push: %w", err)
if commitMsg == "" {
commitMsg = fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath)
}
return nil
return repo.CommitChanges(ctx, repo.GetClonedBranch(), commitMsg, false, changes)
}
40 changes: 24 additions & 16 deletions pkg/config/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/pipe-cd/pipe/pkg/model"
)

const (
// defaultImageWatcherPullInterval is the check interval assigned to an
// imageWatcher repo setting whose CheckInterval is left at zero.
defaultImageWatcherPullInterval = Duration(5 * time.Minute)
)

var DefaultKubernetesCloudProvider = PipedCloudProvider{
Name: "kubernetes-default",
Type: model.CloudProviderKubernetes,
Expand Down Expand Up @@ -59,7 +63,7 @@ type PipedSpec struct {
Notifications Notifications `json:"notifications"`
// How the sealed secret should be managed.
SealedSecretManagement *SealedSecretManagement `json:"sealedSecretManagement"`
// Configuration for image watcher.
// Optional settings for image watcher.
ImageWatcher PipedImageWatcher `json:"imageWatcher"`
}

Expand Down Expand Up @@ -381,18 +385,15 @@ type AnalysisProviderStackdriverConfig struct {
type PipedImageProvider struct {
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`
// Default is 5m.
PullInterval Duration `json:"pullInterval"`

DockerHubConfig *ImageProviderDockerHubConfig
GCRConfig *ImageProviderGCRConfig
ECRConfig *ImageProviderECRConfig
}

type genericPipedImageProvider struct {
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`
PullInterval Duration `json:"pullInterval"`
Name string `json:"name"`
Type model.ImageProviderType `json:"type"`

Config json.RawMessage `json:"config"`
}
Expand All @@ -405,10 +406,6 @@ func (p *PipedImageProvider) UnmarshalJSON(data []byte) error {
}
p.Name = gp.Name
p.Type = gp.Type
p.PullInterval = gp.PullInterval
if p.PullInterval == 0 {
p.PullInterval = Duration(5 * time.Minute)
}

switch p.Type {
case model.ImageProviderTypeDockerHub:
Expand Down Expand Up @@ -594,19 +591,30 @@ type PipedImageWatcher struct {
}

// Validate checks if the duplicated repository setting exists.
func (i *PipedImageWatcher) Validate() error {
repos := make(map[string]struct{})
for _, repo := range i.Repos {
if _, ok := repos[repo.RepoID]; ok {
return fmt.Errorf("duplicated repo id (%s) found in the imageWatcher directive", repo.RepoID)
// It also populates the default value for any setting that is not set.
func (p *PipedImageWatcher) Validate() error {
repos := make(map[string]struct{}, len(p.Repos))
for i := 0; i < len(p.Repos); i++ {
if _, ok := repos[p.Repos[i].RepoID]; ok {
return fmt.Errorf("duplicated repo id (%s) found in the imageWatcher directive", p.Repos[i].RepoID)
}
repos[p.Repos[i].RepoID] = struct{}{}

if p.Repos[i].CheckInterval == 0 {
p.Repos[i].CheckInterval = defaultImageWatcherPullInterval
}
repos[repo.RepoID] = struct{}{}
}
return nil
}

type PipedImageWatcherRepoTarget struct {
RepoID string `json:"repoId"`
// Interval at which the image in the git repository is compared
// with the one in the image provider. Default is 5m.
CheckInterval Duration `json:"checkInterval"`
// The commit message used to push after updating image.
// Default message is used if not given.
CommitMessage string `json:"commitMessage"`
// The paths to ImageWatcher files to be included.
Includes []string `json:"includes"`
// The paths to ImageWatcher files to be excluded.
Expand Down
Loading