Skip to content

Commit

Permalink
fix(server): don't use cluster scope list + watch in namespaced mode. F…
Browse files Browse the repository at this point in the history
…ixes #13177 (#13189)

Signed-off-by: Jiacheng Xu <[email protected]>
(cherry picked from commit 8f3860d)
  • Loading branch information
jiachengxu authored and agilgur5 committed Jun 17, 2024
1 parent 9481bb0 commit c2204ae
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/apiclient/argo-kube-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig,
func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient {
wfArchive := sqldb.NullWorkflowArchive
wfLister := store.NewKubeLister(a.wfClient)
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil)}}
return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfArchive, a.wfClient, wfLister, nil, nil)}}
}

func (a *argoKubeClient) NewCronWorkflowServiceClient() (cronworkflow.CronWorkflowServiceClient, error) {
Expand Down
11 changes: 6 additions & 5 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ func init() {
}
}

func getResourceCacheNamespace(opts ArgoServerOpts) string {
if opts.ManagedNamespace != "" {
return opts.ManagedNamespace
func getResourceCacheNamespace(managedNamespace string) string {
if managedNamespace != "" {
return managedNamespace
}
return v1.NamespaceAll
}
Expand All @@ -145,7 +145,7 @@ func NewArgoServer(ctx context.Context, opts ArgoServerOpts) (*argoServer, error
}
if ssoIf.IsRBACEnabled() {
// resourceCache is only used for SSO RBAC
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, getResourceCacheNamespace(opts))
resourceCache = cache.NewResourceCache(opts.Clients.Kubernetes, getResourceCacheNamespace(opts.ManagedNamespace))
resourceCache.Run(ctx.Done())
}
log.Info("SSO enabled")
Expand Down Expand Up @@ -235,7 +235,8 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore)
resourceCacheNamespace := getResourceCacheNamespace(as.managedNamespace)
workflowServer := workflow.NewWorkflowServer(instanceIDService, offloadRepo, wfArchive, as.clients.Workflow, wfStore, wfStore, &resourceCacheNamespace)
grpcServer := as.newGRPCServer(instanceIDService, workflowServer, wfArchiveServer, eventServer, config.Links, config.Columns, config.NavColor)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
8 changes: 4 additions & 4 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ type workflowServer struct {
var _ workflowpkg.WorkflowServiceServer = &workflowServer{}

// NewWorkflowServer returns a new WorkflowServer
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore) *workflowServer {
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, wfClientSet versioned.Interface, wfLister store.WorkflowLister, wfStore store.WorkflowStore, namespace *string) *workflowServer {
ws := &workflowServer{
instanceIDService: instanceIDService,
offloadNodeStatusRepo: offloadNodeStatusRepo,
hydrator: hydrator.New(offloadNodeStatusRepo),
wfArchive: wfArchive,
wfLister: wfLister,
}
if wfStore != nil {
if wfStore != nil && namespace != nil {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).List(context.Background(), options)
return wfClientSet.ArgoprojV1alpha1().Workflows(*namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return wfClientSet.ArgoprojV1alpha1().Workflows(metav1.NamespaceAll).Watch(context.Background(), options)
return wfClientSet.ArgoprojV1alpha1().Workflows(*namespace).Watch(context.Background(), options)
},
}
wfReflector := cache.NewReflector(lw, &wfv1.Workflow{}, wfStore, reSyncDuration)
Expand Down
3 changes: 2 additions & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,8 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) {
if err = wfStore.Add(&wfObj5); err != nil {
panic(err)
}
server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore)
namespaceAll := metav1.NamespaceAll
server := NewWorkflowServer(instanceIdSvc, offloadNodeStatusRepo, archivedRepo, wfClientset, wfStore, wfStore, &namespaceAll)
return server, ctx
}

Expand Down

0 comments on commit c2204ae

Please sign in to comment.