Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,6 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip
return nil, err
}
}
// TODO: Consider bulk-updating multiple apps
for _, appInfo := range req.Applications {
updater := func(app *model.Application) error {
app.Name = appInfo.Name
Expand Down
247 changes: 112 additions & 135 deletions pkg/app/piped/appconfigreporter/appconfigreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Reporter struct {

// Whether it already swept all unregistered apps from control-plane.
sweptUnregisteredApps bool
// The latest unregistered apps for each repository.
latestUnregisteredApps map[string][]*model.ApplicationInfo
}

func NewReporter(
Expand All @@ -78,14 +80,15 @@ func NewReporter(
logger *zap.Logger,
) *Reporter {
return &Reporter{
apiClient: apiClient,
gitClient: gitClient,
applicationLister: appLister,
config: cfg,
gracePeriod: gracePeriod,
lastScannedCommits: make(map[string]string),
fileSystem: &fileSystem{},
logger: logger.Named("app-config-reporter"),
apiClient: apiClient,
gitClient: gitClient,
applicationLister: appLister,
config: cfg,
gracePeriod: gracePeriod,
lastScannedCommits: make(map[string]string),
fileSystem: &fileSystem{},
logger: logger.Named("app-config-reporter"),
latestUnregisteredApps: make(map[string][]*model.ApplicationInfo),
}
}

Expand Down Expand Up @@ -131,6 +134,7 @@ func (r *Reporter) scanAppConfigs(ctx context.Context) error {
}

// Make all repos up-to-date.
headCommits := make(map[string]string, len(r.gitRepos))
for repoID, repo := range r.gitRepos {
if err := repo.Pull(ctx, repo.GetClonedBranch()); err != nil {
r.logger.Error("failed to update repo to latest",
Expand All @@ -139,156 +143,85 @@ func (r *Reporter) scanAppConfigs(ctx context.Context) error {
)
return err
}
headCommit, err := repo.GetLatestCommit(ctx)
if err != nil {
return fmt.Errorf("failed to get the latest commit of %s: %w", repoID, err)
}
headCommits[repoID] = headCommit.Hash
}

apps := r.applicationLister.List()
// Create a map from Git path key to app id, to determine if the application is registered.
registeredAppPaths := make(map[string]string, len(apps))
for _, app := range apps {
key := makeGitPathKey(app.GitPath.Repo.Id, filepath.Join(app.GitPath.Path, app.GitPath.ConfigFilename))
registeredAppPaths[key] = app.Id
}

if err := r.updateRegisteredApps(ctx, registeredAppPaths); err != nil {
return fmt.Errorf("failed to update registered applications: %w", err)
if err := r.updateRegisteredApps(ctx, headCommits); err != nil {
return err
}
if err := r.updateUnregisteredApps(ctx, registeredAppPaths); err != nil {
return fmt.Errorf("failed to update unregistered applications: %w", err)
if err := r.updateUnregisteredApps(ctx, headCommits); err != nil {
return err
}

for repoID, hash := range headCommits {
r.lastScannedCommits[repoID] = hash
}
return nil
}

// updateRegisteredApps sends application configurations that have changed since the last time to the control-plane.
func (r *Reporter) updateRegisteredApps(ctx context.Context, registeredAppPaths map[string]string) (err error) {
apps := make([]*model.ApplicationInfo, 0)
headCommits := make(map[string]string, len(r.gitRepos))
func (r *Reporter) updateRegisteredApps(ctx context.Context, headCommits map[string]string) error {
registeredApps := make([]*model.ApplicationInfo, 0)
for repoID, repo := range r.gitRepos {
var headCommit git.Commit
headCommit, err = repo.GetLatestCommit(ctx)
if err != nil {
return fmt.Errorf("failed to get the latest commit of %s: %w", repoID, err)
}
headCommit := headCommits[repoID]
// Skip if the head commit is already scanned.
lastScannedCommit, ok := r.lastScannedCommits[repoID]
if ok && headCommit.Hash == lastScannedCommit {
if lastScannedCommit, ok := r.lastScannedCommits[repoID]; ok && headCommit == lastScannedCommit {
continue
}
var as []*model.ApplicationInfo
as, err = r.findRegisteredApps(ctx, repoID, repo, lastScannedCommit, headCommit.Hash, registeredAppPaths)

rs, err := r.findRegisteredApps(repo.GetPath(), repoID)
if err != nil {
return err
}
apps = append(apps, as...)
headCommits[repoID] = headCommit.Hash
r.logger.Info(fmt.Sprintf("found out %d registered applications that config has been changed in repository %q", len(rs), repoID))
registeredApps = append(registeredApps, rs...)
}
defer func() {
if err == nil {
for repoID, hash := range headCommits {
r.lastScannedCommits[repoID] = hash
}
}
}()
if len(apps) == 0 {
return
if len(registeredApps) == 0 {
return nil
}

_, err = r.apiClient.UpdateApplicationConfigurations(
_, err := r.apiClient.UpdateApplicationConfigurations(
ctx,
&pipedservice.UpdateApplicationConfigurationsRequest{
Applications: apps,
Applications: registeredApps,
},
)
if err != nil {
return fmt.Errorf("failed to update application configurations: %w", err)
}
return
return nil
}

// findRegisteredApps finds out registered application info in the given git repository.
func (r *Reporter) findRegisteredApps(ctx context.Context, repoID string, repo gitRepo, lastScannedCommit, headCommitHash string, registeredAppPaths map[string]string) ([]*model.ApplicationInfo, error) {
if lastScannedCommit == "" {
// Scan all files because it seems to be the first commit since Piped starts.
apps := make([]*model.ApplicationInfo, 0)
err := fs.WalkDir(r.fileSystem, repo.GetPath(), func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
cfgRelPath, err := filepath.Rel(repo.GetPath(), path)
if err != nil {
return err
}
gitPathKey := makeGitPathKey(repoID, cfgRelPath)
appID, registered := registeredAppPaths[gitPathKey]
if !registered {
return nil
}

appInfo, err := r.readApplicationInfo(repo.GetPath(), repoID, filepath.Dir(cfgRelPath), filepath.Base(cfgRelPath))
if err != nil {
r.logger.Error("failed to read application info",
zap.String("repo-id", repoID),
zap.String("config-file-path", cfgRelPath),
zap.Error(err),
)
return nil
// updateUnregisteredApps sends all unregistered application configurations to the control-plane.
func (r *Reporter) updateUnregisteredApps(ctx context.Context, headCommits map[string]string) error {
unregisteredApps := make([]*model.ApplicationInfo, 0)
for repoID, repo := range r.gitRepos {
headCommit := headCommits[repoID]
if lastScannedCommit, ok := r.lastScannedCommits[repoID]; ok && headCommit == lastScannedCommit {
// Use the cached apps if the head commit is already scanned.
// The unregistered apps sent previously aren't persisted, that's why it has to send them again even if it's scanned one.
if apps, ok := r.latestUnregisteredApps[repoID]; ok {
unregisteredApps = append(unregisteredApps, apps...)
}
appInfo.Id = appID
apps = append(apps, appInfo)
// Continue reading so that it can return apps as much as possible.
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to inspect files under %s: %w", repo.GetPath(), err)
}
return apps, nil
}

filePaths, err := repo.ChangedFiles(ctx, lastScannedCommit, headCommitHash)
if err != nil {
return nil, fmt.Errorf("failed to get files those were touched between two commits: %w", err)
}
if len(filePaths) == 0 {
// The case where all changes have been fully reverted.
return []*model.ApplicationInfo{}, nil
}
apps := make([]*model.ApplicationInfo, 0)
for _, path := range filePaths {
gitPathKey := makeGitPathKey(repoID, path)
appID, registered := registeredAppPaths[gitPathKey]
if !registered {
continue
}
appInfo, err := r.readApplicationInfo(repo.GetPath(), repoID, filepath.Dir(path), filepath.Base(path))
if err != nil {
r.logger.Error("failed to read application info",
zap.String("repo-id", repoID),
zap.String("config-file-path", path),
zap.Error(err),
)
continue
}
appInfo.Id = appID
apps = append(apps, appInfo)
}
return apps, nil
}

// updateUnregisteredApps sends all unregistered application configurations to the control-plane.
func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPaths map[string]string) error {
apps := make([]*model.ApplicationInfo, 0)
for repoID, repo := range r.gitRepos {
as, err := r.findUnregisteredApps(repo.GetPath(), repoID, registeredAppPaths)
us, err := r.findUnregisteredApps(repo.GetPath(), repoID)
if err != nil {
return err
}
r.logger.Info(fmt.Sprintf("found out %d unregistered applications in repository %s", len(as), repoID))
apps = append(apps, as...)
r.logger.Info(fmt.Sprintf("found out %d unregistered applications in repository %q", len(us), repoID))
unregisteredApps = append(unregisteredApps, us...)
r.latestUnregisteredApps[repoID] = us
}
if len(apps) == 0 {

// Even if the result is zero, we need to report at least once.
// However, it should return after the second time which is unnecessary.
if len(unregisteredApps) == 0 {
if r.sweptUnregisteredApps {
return nil
}
Expand All @@ -300,7 +233,7 @@ func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPath
_, err := r.apiClient.ReportUnregisteredApplicationConfigurations(
ctx,
&pipedservice.ReportUnregisteredApplicationConfigurationsRequest{
Applications: apps,
Applications: unregisteredApps,
},
)
if err != nil {
Expand All @@ -309,10 +242,61 @@ func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPath
return nil
}

// findRegisteredApps finds out registered application info that should be updated in the given git repository.
func (r *Reporter) findRegisteredApps(repoPath, repoID string) ([]*model.ApplicationInfo, error) {
// Compare the apps registered on Control-plane with the latest config file
// and return only the ones that have been changed.
apps := make([]*model.ApplicationInfo, 0)
for _, app := range r.applicationLister.List() {
if app.GitPath.Repo.Id != repoID {
continue
}
appCfg, err := r.readApplicationInfo(repoPath, repoID, app.GitPath.Path, app.GitPath.ConfigFilename)
if err != nil {
return nil, fmt.Errorf("failed to read application config file: %w", err)
}
if isSynced(appCfg, app) {
continue
}
appCfg.Id = app.Id
apps = append(apps, appCfg)
}
return apps, nil
}

func isSynced(appInfo *model.ApplicationInfo, app *model.Application) bool {
// TODO: Make it possible to follow the ApplicationInfo field changes
if appInfo.Name != app.Name {
return false
}
if appInfo.Kind != app.Kind {
return false
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the Kind could cause unexpected behavior since the application is paired with a Cloud Provider selected while adding the app.
So I think that field should not be allowed to be changed. How do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I feel the same way. Currently changing it via the web client is allowed, but few users use it possibly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied at f05df12

if len(appInfo.Labels) != len(app.Labels) {
return false
}
for key, value := range appInfo.Labels {
if value != app.Labels[key] {
return false
}
}
return true
}

// findUnregisteredApps finds out unregistered application info in the given git repository.
// The file name must be default name in order to be recognized as an Application config.
func (r *Reporter) findUnregisteredApps(repoPath, repoID string, registeredAppPaths map[string]string) ([]*model.ApplicationInfo, error) {
apps := make([]*model.ApplicationInfo, 0)
func (r *Reporter) findUnregisteredApps(repoPath, repoID string) ([]*model.ApplicationInfo, error) {
apps := r.applicationLister.List()
// Create a map to determine the app is registered by GitPath.
registeredAppPaths := make(map[string]struct{}, len(apps))
for _, app := range apps {
if app.GitPath.Repo.Id != repoID {
continue
}
registeredAppPaths[filepath.Join(app.GitPath.Path, app.GitPath.ConfigFilename)] = struct{}{}
}

out := make([]*model.ApplicationInfo, 0)
// Scan all files under the repository.
err := fs.WalkDir(r.fileSystem, repoPath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
Expand All @@ -328,8 +312,7 @@ func (r *Reporter) findUnregisteredApps(repoPath, repoID string, registeredAppPa
if !model.IsApplicationConfigFile(filepath.Base(cfgRelPath)) {
return nil
}
gitPathKey := makeGitPathKey(repoID, cfgRelPath)
if _, registered := registeredAppPaths[gitPathKey]; registered {
if _, registered := registeredAppPaths[cfgRelPath]; registered {
return nil
}

Expand All @@ -342,14 +325,14 @@ func (r *Reporter) findUnregisteredApps(repoPath, repoID string, registeredAppPa
)
return nil
}
apps = append(apps, appInfo)
out = append(out, appInfo)
// Continue reading so that it can return apps as much as possible.
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to inspect files under %s: %w", repoPath, err)
}
return apps, nil
return out, nil
}

func (r *Reporter) readApplicationInfo(repoDir, repoID, appDirRelPath, cfgFilename string) (*model.ApplicationInfo, error) {
Expand Down Expand Up @@ -383,9 +366,3 @@ func (r *Reporter) readApplicationInfo(repoDir, repoID, appDirRelPath, cfgFilena
ConfigFilename: cfgFilename,
}, nil
}

// makeGitPathKey builds a unique path between repositories.
// cfgFilePath is a relative path from the repo root.
func makeGitPathKey(repoID, cfgFilePath string) string {
return fmt.Sprintf("%s:%s", repoID, cfgFilePath)
}
Loading