diff --git a/pkg/app/piped/apistore/environmentstore/BUILD.bazel b/pkg/app/piped/apistore/environmentstore/BUILD.bazel index 320f9cb4e7..3ab4918141 100644 --- a/pkg/app/piped/apistore/environmentstore/BUILD.bazel +++ b/pkg/app/piped/apistore/environmentstore/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/cache:go_default_library", "//pkg/model:go_default_library", "@org_golang_google_grpc//:go_default_library", + "@org_golang_x_sync//singleflight:go_default_library", "@org_uber_go_zap//:go_default_library", ], ) diff --git a/pkg/app/piped/apistore/environmentstore/store.go b/pkg/app/piped/apistore/environmentstore/store.go index 389a24392b..db7fd268b7 100644 --- a/pkg/app/piped/apistore/environmentstore/store.go +++ b/pkg/app/piped/apistore/environmentstore/store.go @@ -16,9 +16,11 @@ package environmentstore import ( "context" + "fmt" "time" "go.uber.org/zap" + "golang.org/x/sync/singleflight" "google.golang.org/grpc" "github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice" @@ -26,6 +28,10 @@ import ( "github.com/pipe-cd/pipe/pkg/model" ) +const ( + defaultAPITimeout = time.Minute +) + type apiClient interface { GetEnvironment(ctx context.Context, in *pipedservice.GetEnvironmentRequest, opts ...grpc.CallOption) (*pipedservice.GetEnvironmentResponse, error) } @@ -33,48 +39,58 @@ type apiClient interface { // Lister helps list and get Environment. // All objects returned here must be treated as read-only. type Lister interface { - // Get retrieves a specifiec Environment for the given id. - Get(id string) (*model.Environment, bool) + Get(ctx context.Context, id string) (*model.Environment, error) } type Store struct { - apiClient apiClient - cache cache.Cache - apiTimeout time.Duration - logger *zap.Logger + apiClient apiClient + cache cache.Cache + callGroup *singleflight.Group + logger *zap.Logger } func NewStore(apiClient apiClient, cache cache.Cache, logger *zap.Logger) *Store { return &Store{ - apiClient: apiClient, - cache: cache, - apiTimeout: 10 * time.Second, - logger: logger.Named("environmentstore"), + apiClient: apiClient, + cache: cache, + callGroup: &singleflight.Group{}, + logger: logger.Named("environmentstore"), } } -func (s *Store) Get(id string) (*model.Environment, bool) { +func (s *Store) Get(ctx context.Context, id string) (*model.Environment, error) { env, err := s.cache.Get(id) if err == nil { - return env.(*model.Environment), true + return env.(*model.Environment), nil } - ctx, cancel := context.WithTimeout(context.Background(), s.apiTimeout) + // Ensure that timeout is configured. + ctx, cancel := context.WithTimeout(ctx, defaultAPITimeout) defer cancel() - resp, err := s.apiClient.GetEnvironment(ctx, &pipedservice.GetEnvironmentRequest{ - Id: id, + // Ensure that only one RPC call is executed for the given key at a time + // and the newest data is stored in the cache. + data, err, _ := s.callGroup.Do(id, func() (interface{}, error) { + req := &pipedservice.GetEnvironmentRequest{ + Id: id, + } + resp, err := s.apiClient.GetEnvironment(ctx, req) + if err != nil { + s.logger.Warn("failed to get environment from control plane", + zap.String("env", id), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to get environment %s, %w", id, err) + } + + if err := s.cache.Put(id, resp.Environment); err != nil { + s.logger.Warn("failed to put environment to cache", zap.Error(err)) + } + return resp.Environment, nil }) - if err != nil { - s.logger.Warn("unable to get environment from control plane", - zap.String("env", id), - zap.Error(err), - ) - return nil, false - } - if err := s.cache.Put(id, resp.Environment); err != nil { - s.logger.Warn("unable to put environment to cache", zap.Error(err)) + if err != nil { + return nil, err } - return resp.Environment, true + return data.(*model.Environment), nil } diff --git a/pkg/app/piped/controller/controller.go b/pkg/app/piped/controller/controller.go index 567b1a19cc..9c4abd6ed8 100644 --- a/pkg/app/piped/controller/controller.go +++ b/pkg/app/piped/controller/controller.go @@ -77,7 +77,7 @@ type applicationLister interface { } type environmentLister interface { - Get(id string) (*model.Environment, bool) + Get(ctx context.Context, id string) (*model.Environment, error) } type liveResourceLister interface { @@ -405,14 +405,14 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( } } - var envName string - if env, ok := c.environmentLister.Get(d.EnvId); ok { - envName = env.Name + env, err := c.environmentLister.Get(ctx, d.EnvId) + if err != nil { + return nil, err } planner := newPlanner( d, - envName, + env.Name, commit, workingDir, c.apiClient, @@ -551,15 +551,15 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) } logger.Info("created working directory for scheduler", zap.String("working-dir", workingDir)) - var envName string - if env, ok := c.environmentLister.Get(d.EnvId); ok { - envName = env.Name + env, err := c.environmentLister.Get(ctx, d.EnvId) + if err != nil { + return nil, err } // Create a new scheduler and append to the list for tracking. scheduler := newScheduler( d, - envName, + env.Name, workingDir, c.apiClient, c.gitClient, diff --git a/pkg/app/piped/trigger/deployment.go b/pkg/app/piped/trigger/deployment.go index 2acb431956..ac82d06d6a 100644 --- a/pkg/app/piped/trigger/deployment.go +++ b/pkg/app/piped/trigger/deployment.go @@ -44,15 +44,15 @@ func (t *Trigger) triggerDeployment( if err != nil { return } - var envName string - if env, ok := t.environmentLister.Get(deployment.EnvId); ok { - envName = env.Name + env, err := t.environmentLister.Get(ctx, deployment.EnvId) + if err != nil { + return } t.notifier.Notify(model.NotificationEvent{ Type: model.NotificationEventType_EVENT_DEPLOYMENT_TRIGGERED, Metadata: &model.NotificationEventDeploymentTriggered{ Deployment: deployment, - EnvName: envName, + EnvName: env.Name, }, }) }() diff --git a/pkg/app/piped/trigger/trigger.go b/pkg/app/piped/trigger/trigger.go index c883aced40..ce33a68426 100644 --- a/pkg/app/piped/trigger/trigger.go +++ b/pkg/app/piped/trigger/trigger.go @@ -64,7 +64,7 @@ type commandLister interface { } type environmentLister interface { - Get(id string) (*model.Environment, bool) + Get(ctx context.Context, id string) (*model.Environment, error) } type notifier interface {