-
Notifications
You must be signed in to change notification settings - Fork 208
Add a piped component that watches app configs #2772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
9d12e05
390aa68
1127e3f
69d791e
cecda51
022a1e7
d1f965c
0fdd95a
82bbf7f
2eece68
b2d8d24
ced51c7
b457cb8
2280ae6
a57d7cd
5ec2cbb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
|
|
||
| go_library( | ||
| name = "go_default_library", | ||
| srcs = ["appconfigreporter.go"], | ||
| importpath = "github.com/pipe-cd/pipe/pkg/app/piped/appconfigreporter", | ||
| visibility = ["//visibility:public"], | ||
| deps = [ | ||
| "//pkg/app/api/service/pipedservice:go_default_library", | ||
| "//pkg/cache:go_default_library", | ||
| "//pkg/cache/memorycache:go_default_library", | ||
| "//pkg/config:go_default_library", | ||
| "//pkg/git:go_default_library", | ||
| "//pkg/model:go_default_library", | ||
| "@org_golang_google_grpc//:go_default_library", | ||
| "@org_uber_go_zap//:go_default_library", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,206 @@ | ||
| // Copyright 2021 The PipeCD Authors. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package appconfigreporter | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "path/filepath" | ||
| "time" | ||
|
|
||
| "go.uber.org/zap" | ||
| "google.golang.org/grpc" | ||
|
|
||
| "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" | ||
| "github.com/pipe-cd/pipe/pkg/config" | ||
| "github.com/pipe-cd/pipe/pkg/git" | ||
| "github.com/pipe-cd/pipe/pkg/model" | ||
| ) | ||
|
|
||
| type apiClient interface { | ||
| UpdateApplicationConfigurations(ctx context.Context, in *pipedservice.UpdateApplicationConfigurationsRequest, opts ...grpc.CallOption) (*pipedservice.UpdateApplicationConfigurationsResponse, error) | ||
| ReportUnregisteredApplicationConfigurations(ctx context.Context, in *pipedservice.ReportUnregisteredApplicationConfigurationsRequest, opts ...grpc.CallOption) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) | ||
| } | ||
|
|
||
| type gitClient interface { | ||
| Clone(ctx context.Context, repoID, remote, branch, destination string) (git.Repo, error) | ||
| } | ||
|
|
||
| type applicationLister interface { | ||
| List() []*model.Application | ||
| } | ||
|
|
||
| type Reporter struct { | ||
| apiClient apiClient | ||
| gitClient gitClient | ||
| applicationLister applicationLister | ||
| config *config.PipedSpec | ||
| gitRepos map[string]git.Repo | ||
| gracePeriod time.Duration | ||
| // Cache for the last scanned commit for each repository. | ||
| // Not goroutine safe. | ||
| lastScannedCommits map[string]string | ||
| logger *zap.Logger | ||
| } | ||
|
|
||
| func NewReporter( | ||
| apiClient apiClient, | ||
| gitClient gitClient, | ||
| appLister applicationLister, | ||
| cfg *config.PipedSpec, | ||
| gracePeriod time.Duration, | ||
| logger *zap.Logger, | ||
| ) *Reporter { | ||
| return &Reporter{ | ||
| apiClient: apiClient, | ||
| gitClient: gitClient, | ||
| applicationLister: appLister, | ||
| config: cfg, | ||
| gracePeriod: gracePeriod, | ||
| lastScannedCommits: make(map[string]string), | ||
| logger: logger.Named("app-config-reporter"), | ||
| } | ||
| } | ||
|
|
||
| func (r *Reporter) Run(ctx context.Context) error { | ||
| r.logger.Info("start running app-config-reporter") | ||
|
|
||
| // Pre-clone to cache the registered git repositories. | ||
| r.gitRepos = make(map[string]git.Repo, len(r.config.Repositories)) | ||
| for _, repoCfg := range r.config.Repositories { | ||
| repo, err := r.gitClient.Clone(ctx, repoCfg.RepoID, repoCfg.Remote, repoCfg.Branch, "") | ||
| if err != nil { | ||
| r.logger.Error("failed to clone repository", | ||
| zap.String("repo-id", repoCfg.RepoID), | ||
| zap.Error(err), | ||
| ) | ||
| return err | ||
| } | ||
| r.gitRepos[repoCfg.RepoID] = repo | ||
| } | ||
|
|
||
| // FIXME: Think about sync interval of app config reporter | ||
| ticker := time.NewTicker(r.config.SyncInterval.Duration()) | ||
| defer ticker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| if err := r.checkApps(ctx); err != nil { | ||
| r.logger.Error("failed to check application configurations defined in Git", zap.Error(err)) | ||
| } | ||
| case <-ctx.Done(): | ||
| r.logger.Info("app-config-reporter has been stopped") | ||
| return nil | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // checkApps checks and reports two types of applications. | ||
| // One is applications registered in Control-plane already, and another is ones that aren't registered yet. | ||
| func (r *Reporter) checkApps(ctx context.Context) (err error) { | ||
| if len(r.gitRepos) == 0 { | ||
| r.logger.Info("no repositories were configured for this piped") | ||
| return | ||
| } | ||
|
|
||
| var ( | ||
| unusedApps = make([]*model.ApplicationInfo, 0) | ||
| appsToBeUpdated = make([]*model.ApplicationInfo, 0) | ||
| ) | ||
| for repoID, repo := range r.gitRepos { | ||
| if err = repo.Pull(ctx, repo.GetClonedBranch()); err != nil { | ||
| r.logger.Error("failed to update repo to latest", | ||
| zap.String("repo-id", repoID), | ||
| zap.Error(err), | ||
| ) | ||
| return | ||
| } | ||
|
|
||
| // TODO: Collect unused application configurations that aren't used yet | ||
| // Currently, it could be thought the best to open files that suffixed by .pipe.yaml | ||
|
|
||
| var headCommit git.Commit | ||
| // Get the head commit of the repository. | ||
| headCommit, err = repo.GetLatestCommit(ctx) | ||
| if err != nil { | ||
| return | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Add a log message that could help for debugging by our users.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice. It is definite that currently it's quite hard to see what's going on. |
||
| lastFetchedCommit, ok := r.lastScannedCommits[repoID] | ||
| if ok && headCommit.Hash == lastFetchedCommit { | ||
| continue | ||
| } | ||
| appsMap := r.listApplications() | ||
| apps, ok := appsMap[repoID] | ||
| if !ok { | ||
| continue | ||
| } | ||
| for _, app := range apps { | ||
| gitPath := app.GetGitPath() | ||
| _ = filepath.Join(repo.GetPath(), gitPath.Path, gitPath.ConfigFilename) | ||
| // TODO: Collect applications that need to be updated | ||
| } | ||
|
||
|
|
||
| defer func() { | ||
| if err == nil { | ||
| r.lastScannedCommits[repoID] = headCommit.Hash | ||
| } | ||
| }() | ||
| } | ||
| if len(unusedApps) > 0 { | ||
| _, err = r.apiClient.ReportUnregisteredApplicationConfigurations( | ||
| ctx, | ||
| &pipedservice.ReportUnregisteredApplicationConfigurationsRequest{ | ||
| Applications: unusedApps, | ||
| }, | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to put the latest unregistered application configurations: %w", err) | ||
| } | ||
| } | ||
|
|
||
| if len(appsToBeUpdated) == 0 { | ||
| return nil | ||
|
||
| } | ||
| _, err = r.apiClient.UpdateApplicationConfigurations( | ||
| ctx, | ||
| &pipedservice.UpdateApplicationConfigurationsRequest{ | ||
| Applications: appsToBeUpdated, | ||
| }, | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to update application configurations: %w", err) | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
| // listApplications retrieves all applications that should be handled by this piped | ||
| // and then groups them by repoID. | ||
| func (r *Reporter) listApplications() map[string][]*model.Application { | ||
| var ( | ||
| apps = r.applicationLister.List() | ||
| repoToApps = make(map[string][]*model.Application) | ||
| ) | ||
| for _, app := range apps { | ||
| repoId := app.GitPath.Repo.Id | ||
nakabonne marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if _, ok := repoToApps[repoId]; !ok { | ||
| repoToApps[repoId] = []*model.Application{app} | ||
| } else { | ||
| repoToApps[repoId] = append(repoToApps[repoId], app) | ||
| } | ||
| } | ||
| return repoToApps | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.