Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 72 additions & 60 deletions pkg/app/piped/imagewatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/pipe-cd/pipe/pkg/yamlprocessor"
)

// defaultCommitMessageFormat is the fmt format string used to build the commit
// message for an image update. Its four %s verbs are filled, in order, with the
// image currently in git, the image from the registry, the YAML field, and the
// file path.
const defaultCommitMessageFormat = "Update image %s to %s defined at %s in %s"

// Watcher exposes a single Run method that takes a context and returns an error.
// The unexported watcher type in this file implements it.
type Watcher interface {
Run(context.Context) error
}
Expand All @@ -46,7 +48,8 @@ type watcher struct {
gitClient gitClient
logger *zap.Logger
wg sync.WaitGroup
mu sync.Mutex
// For file locking.
mu sync.Mutex

// Indexed by repo id.
gitRepos map[string]git.Repo
Expand All @@ -63,23 +66,25 @@ func NewWatcher(cfg *config.PipedSpec, gitClient gitClient, logger *zap.Logger)
// Run spawns goroutines for each image provider. They periodically pull the image
// from the container registry to compare the image with one in the git repository.
func (w *watcher) Run(ctx context.Context) error {
// TODO: Spawn goroutines for each repository
// Pre-clone to cache the registered git repositories.
for _, r := range w.config.Repositories {
// TODO: Clone repository another temporary destination
repo, err := w.gitClient.Clone(ctx, r.RepoID, r.Remote, r.Branch, "")
if err != nil {
w.logger.Error("failed to clone repository",
zap.String("repo-id", r.RepoID),
zap.Error(err),
)
return err
return fmt.Errorf("failed to clone repository %s: %w", r.RepoID, err)
}
w.gitRepos[r.RepoID] = repo
}

for _, cfg := range w.config.ImageProviders {
p, err := imageprovider.NewProvider(&cfg, w.logger)
if err != nil {
return err
return fmt.Errorf("failed to yield image provider %s: %w", cfg.Name, err)
}

w.wg.Add(1)
Expand All @@ -89,7 +94,7 @@ func (w *watcher) Run(ctx context.Context) error {
return nil
}

// run periodically compares the image stored in the given provider and one stored in git.
// run periodically compares the image in the given provider and one in git repository.
Copy link
Member

@nghialv nghialv Dec 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR's scope, but I feel that we should start thinking about simplifying the concurrency model implemented in this ImageWatcher package.

Current approach:

  • Spawn goroutines for each image provider
  • Inside each goroutine
    • loop the repositories
      • lock and pull to update the local git.repo data
      • load the configuration at .pipe to find the target providers
      • if the provider was specified, retrieve the latest images
      • lock and update them

Suggestion:

  • Spawn goroutines for each repository
  • Inside each goroutine
    • pull to update the local repo data
    • load the configuration at .pipe to find the target providers
    • loop the target providers
      • retrieve the latest images and update them

Pros:

  • don't need the lock for git.Pull
  • avoid complex concurrent processing for git.Repo: pull and push
  • reduce the number of pulls and configuration loads (current: provider_num x repo_num, new: repo_num)

Cons:

  • The pull interval for all image providers must be the same (I suggest moving that field into the imageWatcher part; the imageProvider part is only for defining how to interact with the image provider)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're quite right. The current complex implementation is caused by the spec that requires specifying a polling interval for each provider. I couldn't completely make up my mind for a long time. Using the same interval across all providers would be pretty simple, and I love it. But isn't it really necessary to configure the interval for each provider? I thought we should be more cautious about communication with the container registry than with the git repository; that's why I designed it like this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But as you said, using the same interval is pretty good if per-provider intervals are not worth supporting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance, a user has two image providers:

  • Docker Hub, which has a strict pull rate limit
  • ECR, which does not have a strict pull rate limit

For that, it's useful to set the pull interval for each provider separately.

However, we don't have to handle that so carefully if we treat that case as an edge case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think in that case they can split into multiple pipeds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, agree with you. The general case had better be prioritized. Okay, let me work on them in another PR. I'll fix your other change requests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// And then pushes those with differences.
func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, interval time.Duration) {
defer w.wg.Done()
Expand All @@ -101,51 +106,48 @@ func (w *watcher) run(ctx context.Context, provider imageprovider.Provider, inte
case <-ctx.Done():
return
case <-ticker.C:
updates := make([]config.ImageWatcherTarget, 0)
// Inspect all targets in all repos, and compare only images managed by the given provider.
for id, repo := range w.gitRepos {
u, err := w.determineUpdates(ctx, id, repo, provider)
cfg, err := w.loadImageWatcherConfig(ctx, id, repo)
if err != nil {
w.logger.Error("failed to determine images to be updated",
w.logger.Error("failed to load image watcher config",
zap.String("repo-id", id),
zap.Error(err),
)
continue
}
updates = append(updates, u...)
}
if len(updates) == 0 {
w.logger.Info("no image to be updated",
zap.String("image-provider", provider.Name()),
)
continue
}
if err := update(updates); err != nil {
w.logger.Error("failed to update image",
zap.String("image-provider", provider.Name()),
zap.Error(err),
)
continue
for _, target := range cfg.Targets {
if target.Provider != provider.Name() {
continue
}
if err := w.updateOutdatedImage(ctx, &target, repo, provider); err != nil {
w.logger.Error("failed to update image",
zap.String("repo-id", id),
zap.String("image-provider", provider.Name()),
zap.Error(err),
)
continue
}
}
}
}
}
}

// determineUpdates gives back target images to be updated for a given repo.
func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.Repo, provider imageprovider.Provider) ([]config.ImageWatcherTarget, error) {
branch := repo.GetClonedBranch()
// loadImageWatcherConfig gives back an Image Watcher Config for the given repo.
func (w *watcher) loadImageWatcherConfig(ctx context.Context, repoID string, repo git.Repo) (*config.ImageWatcherSpec, error) {
w.mu.Lock()
err := repo.Pull(ctx, branch)
err := repo.Pull(ctx, repo.GetClonedBranch())
w.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("failed to fetch from and integrate with a local branch: %w", err)
return nil, fmt.Errorf("failed to perform git pull: %w", err)
}

// Load Image Watcher Config for the given repo.
var includes, excludes []string
for _, target := range w.config.ImageWatcher.Repos {
if target.RepoID == repoID {
includes = target.Includes
excludes = target.Excludes
for _, repos := range w.config.ImageWatcher.Repos {
if repos.RepoID == repoID {
includes = repos.Includes
excludes = repos.Excludes
break
}
}
Expand All @@ -156,51 +158,61 @@ func (w *watcher) determineUpdates(ctx context.Context, repoID string, repo git.
if !ok {
return nil, fmt.Errorf("configuration file for Image Watcher not found: %w", err)
}

updates := make([]config.ImageWatcherTarget, 0)
for _, target := range cfg.Targets {
if provider.Name() != target.Provider {
continue
}
outdated, err := checkOutdated(ctx, target, repo, provider)
if err != nil {
return nil, fmt.Errorf("failed to check the image is outdated: %w", err)
}
if outdated {
updates = append(updates, target)
}
}
return updates, nil
return cfg, nil
Copy link
Member

@khanhtc1202 khanhtc1202 Dec 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I suggest we could simplify the function signature of this config.LoadImageWatcher function to

func LoadImageWatcher(repoRoot string, includes, excludes []string) (*ImageWatcherSpec, error)

and create a new error type config.NotFound, then check it here with errors.Is.
The reasons are:

  1. the current ok value here looks quite weird; should it be named found, so that it clearly shows what we use it for?
  2. I read the implementation of config.LoadImageWatcher once, and it looks like after checking for os.IsNotExist we have to pass a false value for it in all other cases.

What do you think of it? 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! No objection to that! Other parts should use that error type too, so let me address it in a separate patch: #1259

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks 👍

}

// checkOutdated checks if the image defined in the given target is identical to the one in image provider.
func checkOutdated(ctx context.Context, target config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) (bool, error) {
// updateOutdatedImage first compares the image in git repository and one in image provider.
// Then pushes rewritten one to the git repository if any deviation exists.
func (w *watcher) updateOutdatedImage(ctx context.Context, target *config.ImageWatcherTarget, repo git.Repo, provider imageprovider.Provider) error {
// Fetch from the image provider.
i, err := provider.ParseImage(target.Image)
if err != nil {
return false, err
return fmt.Errorf("failed to parse image string \"%s\": %w", target.Image, err)
}
// TODO: Control not to reach the rate limit
imageRef, err := provider.GetLatestImage(ctx, i)
imageInRegistry, err := provider.GetLatestImage(ctx, i)
if err != nil {
return false, err
return fmt.Errorf("failed to get latest image from %s: %w", provider.Name(), err)
}

yml, err := ioutil.ReadFile(filepath.Join(repo.GetPath(), target.FilePath))
// Fetch from the git repository.
path := filepath.Join(repo.GetPath(), target.FilePath)
yml, err := ioutil.ReadFile(path)
if err != nil {
return false, err
return fmt.Errorf("failed to read file: %w", err)
}
value, err := yamlprocessor.GetValue(yml, target.Field)
if err != nil {
return false, err
return fmt.Errorf("failed to get value at %s in %s: %w", target.Field, target.FilePath, err)
}
v, ok := value.(string)
imageInGit, ok := value.(string)
if !ok {
return false, fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field)
return fmt.Errorf("unknown value is defined at %s in %s", target.FilePath, target.Field)
}
return imageRef.String() != v, nil
}

func update(targets []config.ImageWatcherTarget) error {
// TODO: Make it possible to push outdated images to Git
outdated := imageInRegistry.String() != imageInGit
if !outdated {
return nil
}

// Update the outdated image.
newYml, err := yamlprocessor.ReplaceValue(yml, target.Field, imageInRegistry.String())
if err != nil {
return fmt.Errorf("failed to replace value at %s with %s: %w", target.Field, imageInRegistry, err)
}
changes := map[string][]byte{
target.FilePath: newYml,
}
// TODO: Make it changeable the commit message
msg := fmt.Sprintf(defaultCommitMessageFormat, imageInGit, imageInRegistry.String(), target.Field, target.FilePath)
w.mu.Lock()
if err := repo.CommitChanges(ctx, repo.GetClonedBranch(), msg, false, changes); err != nil {
return fmt.Errorf("failed to perform git commit: %w", err)
}
err = repo.Push(ctx, repo.GetClonedBranch())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to commit the changes before pushing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's the basic part...

w.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to perform git push: %w", err)
}
return nil
}