Skip to content

Commit

Permalink
[cleanup] remove FF for supervisor publicapi (#19894)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustard-mh authored Jun 14, 2024
1 parent 8e0da64 commit 7ff5eda
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 163 deletions.
20 changes: 5 additions & 15 deletions components/common-go/experiments/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ import (
)

const (
OIDCServiceEnabledFlag = "oidcServiceEnabled"
SupervisorPersistServerAPIChannelWhenStartFlag = "supervisor_persist_serverapi_channel_when_start"
SupervisorUsePublicAPIFlag = "supervisor_experimental_publicapi"
ServiceWaiterSkipComponentsFlag = "service_waiter_skip_components"
IdPClaimKeysFlag = "idp_claim_keys"
SetJavaXmxFlag = "supervisor_set_java_xmx"
SetJavaProcessorCount = "supervisor_set_java_processor_count"
OIDCServiceEnabledFlag = "oidcServiceEnabled"
ServiceWaiterSkipComponentsFlag = "service_waiter_skip_components"
IdPClaimKeysFlag = "idp_claim_keys"
SetJavaXmxFlag = "supervisor_set_java_xmx"
SetJavaProcessorCount = "supervisor_set_java_processor_count"
)

func GetIdPClaimKeys(ctx context.Context, client Client, attributes Attributes) []string {
Expand All @@ -31,14 +29,6 @@ func IsOIDCServiceEnabled(ctx context.Context, client Client, attributes Attribu
return client.GetBoolValue(ctx, OIDCServiceEnabledFlag, false, attributes)
}

func SupervisorPersistServerAPIChannelWhenStart(ctx context.Context, client Client, attributes Attributes) bool {
return client.GetBoolValue(ctx, SupervisorPersistServerAPIChannelWhenStartFlag, true, attributes)
}

func SupervisorUsePublicAPI(ctx context.Context, client Client, attributes Attributes) bool {
return client.GetBoolValue(ctx, SupervisorUsePublicAPIFlag, false, attributes)
}

func IsSetJavaXmx(ctx context.Context, client Client, attributes Attributes) bool {
return client.GetBoolValue(ctx, SetJavaXmxFlag, false, attributes)
}
Expand Down
8 changes: 2 additions & 6 deletions components/supervisor/pkg/serverapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
)

const (
ServerTypeServerAPI = "server-api"
ServerTypePublicAPI = "public-api"
)

Expand All @@ -44,12 +43,9 @@ func NewClientMetrics() *ClientMetrics {
}
}

func (c *ClientMetrics) ProcessMetrics(usePublicAPI bool, method string, err error, startTime time.Time) {
func (c *ClientMetrics) ProcessMetrics(method string, err error, startTime time.Time) {
code := status.Code(normalizeError(err))
server := ServerTypeServerAPI
if usePublicAPI {
server = ServerTypePublicAPI
}
server := ServerTypePublicAPI
c.clientHandledCounter.WithLabelValues(method, server, code.String()).Inc()
c.clientHandledHistogram.WithLabelValues(method, server, code.String()).Observe(time.Since(startTime).Seconds())
}
Expand Down
157 changes: 16 additions & 141 deletions components/supervisor/pkg/serverapi/publicapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/gitpod-io/gitpod/common-go/experiments"
"github.com/gitpod-io/gitpod/common-go/log"
v1 "github.com/gitpod-io/gitpod/components/public-api/go/experimental/v1"
gitpod "github.com/gitpod-io/gitpod/gitpod-protocol"
Expand Down Expand Up @@ -58,21 +56,12 @@ type ServiceConfig struct {
}

type Service struct {
cfg *ServiceConfig
experiments experiments.Client

cfg *ServiceConfig
token string

// gitpodService server API
gitpodService gitpod.APIInterface
// publicAPIConn public API publicAPIConn
publicAPIConn *grpc.ClientConn

// usingPublicAPI is using atomic type to avoid reconnect when configcat value change
usingPublicAPI atomic.Bool
// onUsingPublicAPI which will only used in instanceUpdate config change notify
onUsingPublicAPI chan struct{}

// subs is the subscribers of workspaceUpdates
subs map[chan *gitpod.WorkspaceInstance]struct{}
subMutex sync.Mutex
Expand All @@ -82,7 +71,7 @@ type Service struct {

var _ APIInterface = (*Service)(nil)

func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.TokenServiceServer, exps experiments.Client) *Service {
func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.TokenServiceServer) *Service {
tknres, err := tknsrv.GetToken(context.Background(), &api.GetTokenRequest{
Kind: KindGitpod,
Host: cfg.Host,
Expand All @@ -97,40 +86,19 @@ func NewServerApiService(ctx context.Context, cfg *ServiceConfig, tknsrv api.Tok
log.WithError(err).Error("cannot get token for Gitpod API")
return nil
}
// server api
gitpodService, err := gitpod.ConnectToServer(cfg.Endpoint, gitpod.ConnectToServerOpts{
Token: tknres.Token,
Log: log.Log,
ExtraHeaders: map[string]string{
"User-Agent": "gitpod/supervisor",
"X-Workspace-Instance-Id": cfg.InstanceID,
"X-Client-Version": cfg.SupervisorVersion,
},
})
if err != nil {
log.WithError(err).Error("cannot connect to Gitpod API")
return nil
}

service := &Service{
token: tknres.Token,
gitpodService: gitpodService,
cfg: cfg,
experiments: exps,
apiMetrics: NewClientMetrics(),
onUsingPublicAPI: make(chan struct{}),
subs: make(map[chan *gitpod.WorkspaceInstance]struct{}),
token: tknres.Token,
cfg: cfg,
apiMetrics: NewClientMetrics(),
subs: make(map[chan *gitpod.WorkspaceInstance]struct{}),
}

// public api
service.tryConnToPublicAPI(ctx)

service.usingPublicAPI.Store(experiments.SupervisorUsePublicAPI(ctx, service.experiments, experiments.Attributes{
UserID: cfg.OwnerID,
}))
// start to listen on real instance updates
go service.onWorkspaceUpdates(ctx)
go service.observeConfigcatValue(ctx)

return service
}
Expand Down Expand Up @@ -164,51 +132,14 @@ func (s *Service) tryConnToPublicAPI(ctx context.Context) {
}
}

func (s *Service) observeConfigcatValue(ctx context.Context) {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
usePublicAPI := experiments.SupervisorUsePublicAPI(ctx, s.experiments, experiments.Attributes{
UserID: s.cfg.OwnerID,
})
if prev := s.usingPublicAPI.Swap(usePublicAPI); prev != usePublicAPI {
if usePublicAPI {
log.Info("switch to use PublicAPI")
} else {
log.Info("switch to use ServerAPI")
}
select {
case s.onUsingPublicAPI <- struct{}{}:
default:
}
}
}
}
}

func (s *Service) usePublicAPI(ctx context.Context) bool {
if s.publicAPIConn == nil {
return false
}
return s.usingPublicAPI.Load()
}

func (s *Service) GetToken(ctx context.Context, query *gitpod.GetTokenSearchOptions) (res *gitpod.Token, err error) {
if s == nil {
return nil, errNotConnected
}
startTime := time.Now()
usePublicApi := s.usePublicAPI(ctx)
defer func() {
s.apiMetrics.ProcessMetrics(usePublicApi, "GetToken", err, startTime)
s.apiMetrics.ProcessMetrics("GetToken", err, startTime)
}()
if !usePublicApi {
return s.gitpodService.GetToken(ctx, query)
}

service := v1.NewUserServiceClient(s.publicAPIConn)
resp, err := service.GetGitToken(ctx, &v1.GetGitTokenRequest{
Expand All @@ -234,14 +165,10 @@ func (s *Service) UpdateGitStatus(ctx context.Context, status *gitpod.WorkspaceI
return errNotConnected
}
startTime := time.Now()
usePublicApi := s.usePublicAPI(ctx)
defer func() {
s.apiMetrics.ProcessMetrics(usePublicApi, "UpdateGitStatus", err, startTime)
s.apiMetrics.ProcessMetrics("UpdateGitStatus", err, startTime)
}()
workspaceID := s.cfg.WorkspaceID
if !usePublicApi {
return s.gitpodService.UpdateGitStatus(ctx, workspaceID, status)
}
service := v1.NewIDEClientServiceClient(s.publicAPIConn)
payload := &v1.UpdateGitStatusRequest{
WorkspaceId: workspaceID,
Expand All @@ -267,14 +194,10 @@ func (s *Service) OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePo
return nil, errNotConnected
}
startTime := time.Now()
usePublicApi := s.usePublicAPI(ctx)
defer func() {
s.apiMetrics.ProcessMetrics(usePublicApi, "OpenPort", err, startTime)
s.apiMetrics.ProcessMetrics("OpenPort", err, startTime)
}()
workspaceID := s.cfg.WorkspaceID
if !usePublicApi {
return s.gitpodService.OpenPort(ctx, workspaceID, port)
}
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)

payload := &v1.UpdatePortRequest{
Expand Down Expand Up @@ -306,17 +229,13 @@ func (s *Service) OpenPort(ctx context.Context, port *gitpod.WorkspaceInstancePo
// onWorkspaceUpdates listen to server and public API workspaceUpdates and publish to subscribers once Service created.
func (s *Service) onWorkspaceUpdates(ctx context.Context) {
errChan := make(chan error)
processUpdate := func(usePublicAPI bool) context.CancelFunc {
processUpdate := func() context.CancelFunc {
childCtx, cancel := context.WithCancel(ctx)
if usePublicAPI {
go s.publicAPIWorkspaceUpdate(childCtx, errChan)
} else {
go s.serverWorkspaceUpdate(childCtx, errChan)
}
go s.publicAPIWorkspaceUpdate(childCtx, errChan)
return cancel
}
go func() {
cancel := processUpdate(s.usePublicAPI(ctx))
cancel := processUpdate()
defer func() {
cancel()
}()
Expand All @@ -329,10 +248,7 @@ func (s *Service) onWorkspaceUpdates(ctx context.Context) {
return
case <-ticker.C:
cancel()
cancel = processUpdate(s.usePublicAPI(ctx))
case <-s.onUsingPublicAPI:
cancel()
cancel = processUpdate(s.usePublicAPI(ctx))
cancel = processUpdate()
case err := <-errChan:
if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) {
continue
Expand All @@ -344,7 +260,7 @@ func (s *Service) onWorkspaceUpdates(ctx context.Context) {
log.WithField("method", "WorkspaceUpdates").WithError(err).Error("failed to listen")
cancel()
time.Sleep(time.Second * 2)
cancel = processUpdate(s.usePublicAPI(ctx))
cancel = processUpdate()
}
}
}()
Expand Down Expand Up @@ -378,7 +294,7 @@ func (s *Service) publicAPIWorkspaceUpdate(ctx context.Context, errChan chan err
var err error
defer func() {
if err != nil {
s.apiMetrics.ProcessMetrics(true, "WorkspaceUpdates", err, startTime)
s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)
}
}()
service := v1.NewWorkspacesServiceClient(s.publicAPIConn)
Expand All @@ -401,7 +317,7 @@ func (s *Service) publicAPIWorkspaceUpdate(ctx context.Context, errChan chan err
}
startTime := time.Now()
defer func() {
s.apiMetrics.ProcessMetrics(true, "WorkspaceUpdates", err, startTime)
s.apiMetrics.ProcessMetrics("WorkspaceUpdates", err, startTime)
}()
var data *v1.StreamWorkspaceStatusResponse
for {
Expand All @@ -425,47 +341,6 @@ func (s *Service) publicAPIWorkspaceUpdate(ctx context.Context, errChan chan err
}
}

func (s *Service) serverWorkspaceUpdate(ctx context.Context, errChan chan error) {
workspaceID := s.cfg.WorkspaceID
ch, err := backoff.RetryWithData(func() (<-chan *gitpod.WorkspaceInstance, error) {
startTime := time.Now()
ch, err := s.gitpodService.WorkspaceUpdates(ctx, workspaceID)
defer func() {
if err != nil {
s.apiMetrics.ProcessMetrics(false, "WorkspaceUpdates", err, startTime)
}
}()
if err != nil {
log.WithError(err).Info("backoff failed to listen to serverAPI WorkspaceUpdates, try again")
}
return ch, err
}, backoff.WithContext(ConnBackoff, ctx))
if err != nil {
// we don't care about ctx canceled
if ctx.Err() != nil {
return
}
log.WithField("method", "WorkspaceUpdates").WithError(err).Error("failed to call serverAPI")
errChan <- err
return
}
startTime := time.Now()
defer func() {
s.apiMetrics.ProcessMetrics(false, "WorkspaceUpdates", ctx.Err(), startTime)
}()
for update := range ch {
s.subMutex.Lock()
for sub := range s.subs {
sub <- update
}
s.subMutex.Unlock()
}
if ctx.Err() != nil {
return
}
errChan <- io.EOF
}

var ConnBackoff = &backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
RandomizationFactor: 0.5,
Expand Down
2 changes: 1 addition & 1 deletion components/supervisor/pkg/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func Run(options ...RunOption) {
OwnerID: cfg.OwnerId,
SupervisorVersion: Version,
ConfigcatEnabled: cfg.ConfigcatEnabled,
}, tokenService, exps)
}, tokenService)
}

if cfg.GetDesktopIDE() != nil {
Expand Down

0 comments on commit 7ff5eda

Please sign in to comment.