Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/app/piped/apistore/environmentstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/cache:go_default_library",
"//pkg/model:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_sync//singleflight:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)
Expand Down
66 changes: 41 additions & 25 deletions pkg/app/piped/apistore/environmentstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,65 +16,81 @@ package environmentstore

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"

"github.com/pipe-cd/pipe/pkg/app/api/service/pipedservice"
"github.com/pipe-cd/pipe/pkg/cache"
"github.com/pipe-cd/pipe/pkg/model"
)

const (
defaultAPITimeout = time.Minute
)

type apiClient interface {
GetEnvironment(ctx context.Context, in *pipedservice.GetEnvironmentRequest, opts ...grpc.CallOption) (*pipedservice.GetEnvironmentResponse, error)
}

// Lister helps list and get Environment.
// All objects returned here must be treated as read-only.
type Lister interface {
// Get retrieves a specifiec Environment for the given id.
Get(id string) (*model.Environment, bool)
Get(ctx context.Context, id string) (*model.Environment, error)
}

type Store struct {
apiClient apiClient
cache cache.Cache
apiTimeout time.Duration
logger *zap.Logger
apiClient apiClient
cache cache.Cache
callGroup *singleflight.Group
logger *zap.Logger
}

func NewStore(apiClient apiClient, cache cache.Cache, logger *zap.Logger) *Store {
return &Store{
apiClient: apiClient,
cache: cache,
apiTimeout: 10 * time.Second,
logger: logger.Named("environmentstore"),
apiClient: apiClient,
cache: cache,
callGroup: &singleflight.Group{},
logger: logger.Named("environmentstore"),
}
}

func (s *Store) Get(id string) (*model.Environment, bool) {
func (s *Store) Get(ctx context.Context, id string) (*model.Environment, error) {
env, err := s.cache.Get(id)
if err == nil {
return env.(*model.Environment), true
return env.(*model.Environment), nil
}

ctx, cancel := context.WithTimeout(context.Background(), s.apiTimeout)
// Ensure that timeout is configured.
ctx, cancel := context.WithTimeout(ctx, defaultAPITimeout)
defer cancel()

resp, err := s.apiClient.GetEnvironment(ctx, &pipedservice.GetEnvironmentRequest{
Id: id,
// Ensure that only one RPC call is executed for the given key at a time
// and the newest data is stored in the cache.
data, err, _ := s.callGroup.Do(id, func() (interface{}, error) {
req := &pipedservice.GetEnvironmentRequest{
Id: id,
}
resp, err := s.apiClient.GetEnvironment(ctx, req)
if err != nil {
s.logger.Warn("failed to get environment from control plane",
zap.String("env", id),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get environment %s, %w", id, err)
}

if err := s.cache.Put(id, resp.Environment); err != nil {
s.logger.Warn("failed to put environment to cache", zap.Error(err))
}
return resp.Environment, nil
})
if err != nil {
s.logger.Warn("unable to get environment from control plane",
zap.String("env", id),
zap.Error(err),
)
return nil, false
}

if err := s.cache.Put(id, resp.Environment); err != nil {
s.logger.Warn("unable to put environment to cache", zap.Error(err))
if err != nil {
return nil, err
}
return resp.Environment, true
return data.(*model.Environment), nil
}
18 changes: 9 additions & 9 deletions pkg/app/piped/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type applicationLister interface {
}

type environmentLister interface {
Get(id string) (*model.Environment, bool)
Get(ctx context.Context, id string) (*model.Environment, error)
}

type liveResourceLister interface {
Expand Down Expand Up @@ -405,14 +405,14 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
}
}

var envName string
if env, ok := c.environmentLister.Get(d.EnvId); ok {
envName = env.Name
env, err := c.environmentLister.Get(ctx, d.EnvId)
if err != nil {
return nil, err
}

planner := newPlanner(
d,
envName,
env.Name,
commit,
workingDir,
c.apiClient,
Expand Down Expand Up @@ -551,15 +551,15 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
}
logger.Info("created working directory for scheduler", zap.String("working-dir", workingDir))

var envName string
if env, ok := c.environmentLister.Get(d.EnvId); ok {
envName = env.Name
env, err := c.environmentLister.Get(ctx, d.EnvId)
if err != nil {
return nil, err
}

// Create a new scheduler and append to the list for tracking.
scheduler := newScheduler(
d,
envName,
env.Name,
workingDir,
c.apiClient,
c.gitClient,
Expand Down
8 changes: 4 additions & 4 deletions pkg/app/piped/trigger/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func (t *Trigger) triggerDeployment(
if err != nil {
return
}
var envName string
if env, ok := t.environmentLister.Get(deployment.EnvId); ok {
envName = env.Name
env, err := t.environmentLister.Get(ctx, deployment.EnvId)
if err != nil {
return
}
t.notifier.Notify(model.NotificationEvent{
Type: model.NotificationEventType_EVENT_DEPLOYMENT_TRIGGERED,
Metadata: &model.NotificationEventDeploymentTriggered{
Deployment: deployment,
EnvName: envName,
EnvName: env.Name,
},
})
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type commandLister interface {
}

type environmentLister interface {
Get(id string) (*model.Environment, bool)
Get(ctx context.Context, id string) (*model.Environment, error)
}

type notifier interface {
Expand Down