Skip to content

Commit 3840795

Browse files
committed
extraresources: add support in the reconciler
Add support in the reconciler code to fetch secondary-scheduler extra resources using the `/bin/get-manifests` entrypoint and an auxiliary deployment. For details, see: openshift#9 (comment) Signed-off-by: Francesco Romani <[email protected]>
1 parent 172582c commit 3840795

File tree

2 files changed

+216
-1
lines changed

2 files changed

+216
-1
lines changed

pkg/operator/extra_resources.go

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package operator
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"strings"
8+
9+
appsv1 "k8s.io/api/apps/v1"
10+
v1 "k8s.io/api/core/v1"
11+
rbacv1 "k8s.io/api/rbac/v1"
12+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/apimachinery/pkg/labels"
14+
"k8s.io/apimachinery/pkg/runtime"
15+
"k8s.io/client-go/kubernetes"
16+
"k8s.io/client-go/kubernetes/scheme"
17+
18+
"github.com/openshift/library-go/pkg/operator/events"
19+
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
20+
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
21+
"github.com/openshift/library-go/pkg/operator/resource/resourceread"
22+
23+
"github.com/openshift/secondary-scheduler-operator/bindata"
24+
secondaryschedulersv1 "github.com/openshift/secondary-scheduler-operator/pkg/apis/secondaryscheduler/v1"
25+
)
26+
27+
const (
28+
labelExtraResource = "app"
29+
)
30+
31+
type K8sObject interface {
32+
metav1.Object
33+
runtime.Object
34+
}
35+
36+
func (c *TargetConfigReconciler) manageExtraResourcesDeployment(secondaryScheduler *secondaryschedulersv1.SecondaryScheduler, forceDeployment bool) (*appsv1.Deployment, bool, error) {
37+
required := resourceread.ReadDeploymentV1OrDie(bindata.MustAsset("assets/secondary-scheduler/deployment-extra.yaml"))
38+
required.Name = fmt.Sprintf("%s-extra-resources", secondaryScheduler.Name)
39+
required.Namespace = secondaryScheduler.Namespace
40+
required.OwnerReferences = []metav1.OwnerReference{
41+
{
42+
APIVersion: "v1",
43+
Kind: "SecondaryScheduler",
44+
Name: secondaryScheduler.Name,
45+
UID: secondaryScheduler.UID,
46+
},
47+
}
48+
49+
images := map[string]string{
50+
"${IMAGE}": secondaryScheduler.Spec.SchedulerImage,
51+
}
52+
for i := range required.Spec.Template.Spec.Containers {
53+
for pat, img := range images {
54+
if required.Spec.Template.Spec.Containers[i].Image == pat {
55+
required.Spec.Template.Spec.Containers[i].Image = img
56+
break
57+
}
58+
}
59+
}
60+
61+
// TODO
62+
required.Spec.Replicas = newInt32(1)
63+
// FIXME: this method will disappear in 4.6 so we need to fix this ASAP
64+
return resourceapply.ApplyDeploymentWithForce(
65+
c.kubeClient.AppsV1(),
66+
c.eventRecorder,
67+
required,
68+
resourcemerge.ExpectedDeploymentGeneration(required, secondaryScheduler.Status.Generations),
69+
forceDeployment)
70+
}
71+
72+
func newInt32(v int32) *int32 {
73+
return &v
74+
}
75+
76+
func (c *TargetConfigReconciler) manageExtraResourcesObjects(dp *appsv1.Deployment) ([]K8sObject, error) {
77+
extraResPod, err := findExtraResourcesPod(c.kubeClient, dp)
78+
if err != nil {
79+
return nil, err
80+
}
81+
return getExtraResourcesObjects(c.kubeClient, extraResPod)
82+
}
83+
84+
func findExtraResourcesPod(kubeClient kubernetes.Interface, dp *appsv1.Deployment) (*v1.Pod, error) {
85+
val, ok := dp.Spec.Template.ObjectMeta.Labels[labelExtraResource]
86+
if !ok {
87+
return nil, fmt.Errorf("label %q not found", labelExtraResource)
88+
}
89+
90+
sel, err := labels.Parse(fmt.Sprintf("%s=%s", labelExtraResource, val))
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
pods, err := kubeClient.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: sel.String()})
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
if len(pods.Items) == 0 {
101+
return nil, resourceNotReady
102+
}
103+
if len(pods.Items) > 1 {
104+
return nil, fmt.Errorf("expected 1 pod in deployment %s/%s found %d", dp.Namespace, dp.Name, len(pods.Items))
105+
}
106+
return &pods.Items[0], nil
107+
}
108+
109+
func getExtraResourcesObjects(kubeClient kubernetes.Interface, pod *v1.Pod) ([]K8sObject, error) {
110+
rc, err := kubeClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &v1.PodLogOptions{}).Stream(context.TODO())
111+
if err != nil {
112+
return nil, err
113+
}
114+
defer rc.Close()
115+
buf := new(bytes.Buffer)
116+
buf.ReadFrom(rc)
117+
118+
data := buf.String()
119+
decoder := scheme.Codecs.UniversalDeserializer()
120+
var extraObjs []K8sObject
121+
for _, resourceYAML := range strings.Split(data, "---") {
122+
if len(resourceYAML) == 0 {
123+
continue
124+
}
125+
obj, gvk, err := decoder.Decode([]byte(resourceYAML), nil, nil)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
// if objects are supported, coerce them in the namespace managed by the operator.
131+
if gvk.Group == "" && gvk.Version == "v1" && gvk.Kind == "ConfigMap" {
132+
cm := obj.(*v1.ConfigMap)
133+
cm.Namespace = pod.Namespace
134+
extraObjs = append(extraObjs, cm)
135+
} else if gvk.Group == "rbac.authorization.k8s.io" && gvk.Version == "v1" && gvk.Kind == "ClusterRole" {
136+
cr := obj.(*rbacv1.ClusterRole)
137+
cr.Namespace = pod.Namespace
138+
extraObjs = append(extraObjs, cr)
139+
} else if gvk.Group == "rbac.authorization.k8s.io" && gvk.Version == "v1" && gvk.Kind == "ClusterRoleBinding" {
140+
crb := obj.(*rbacv1.ClusterRoleBinding)
141+
if err := validateClusterRoleBinding(crb); err != nil {
142+
return nil, err
143+
}
144+
crb.Namespace = pod.Namespace
145+
extraObjs = append(extraObjs, obj.(*rbacv1.ClusterRoleBinding))
146+
} else {
147+
return nil, fmt.Errorf("unsupported object %T %s/%s %s", obj, gvk.Group, gvk.Version, gvk.Kind)
148+
}
149+
}
150+
return extraObjs, nil
151+
}
152+
153+
func validateClusterRoleBinding(crb *rbacv1.ClusterRoleBinding) error {
154+
if len(crb.Subjects) > 1 {
155+
return fmt.Errorf("unsupported ClusterRoleBinding with more than 1 subject")
156+
}
157+
if crb.Subjects[0].Kind != "ServiceAccount" {
158+
return fmt.Errorf("unsupported subject kind for ClusterRoleBinding: %q", crb.Subjects[0].Kind)
159+
}
160+
return nil
161+
}
162+
163+
func applyExtraObjects(kubeClient kubernetes.Interface, eventRecorder events.Recorder, serviceAccount *v1.ServiceAccount, extraObjs []K8sObject) error {
164+
for _, extraObj := range extraObjs {
165+
switch extraObj.(type) {
166+
case *v1.ConfigMap:
167+
if _, _, err := resourceapply.ApplyConfigMap(kubeClient.CoreV1(), eventRecorder, extraObj.(*v1.ConfigMap)); err != nil {
168+
return err
169+
}
170+
case *rbacv1.ClusterRole:
171+
if _, _, err := resourceapply.ApplyClusterRole(kubeClient.RbacV1(), eventRecorder, extraObj.(*rbacv1.ClusterRole)); err != nil {
172+
return err
173+
}
174+
case *rbacv1.ClusterRoleBinding:
175+
crb := extraObj.(*rbacv1.ClusterRoleBinding)
176+
crb.Subjects[0].Name = serviceAccount.Name
177+
crb.Subjects[0].Namespace = serviceAccount.Namespace
178+
if _, _, err := resourceapply.ApplyClusterRoleBinding(kubeClient.RbacV1(), eventRecorder, crb); err != nil {
179+
return err
180+
}
181+
default:
182+
return fmt.Errorf("unsupported extra object %T", extraObj)
183+
}
184+
}
185+
return nil
186+
}

pkg/operator/target_config_reconciler.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const (
4545
// compatibility with 3.11
4646
var secondarySchedulerConfigMap = "secondary-scheduler"
4747

48+
var resourceNotReady = fmt.Errorf("resource not ready")
49+
4850
type TargetConfigReconciler struct {
4951
ctx context.Context
5052
operatorClient operatorconfigclientv1.SecondaryschedulersV1Interface
@@ -94,14 +96,37 @@ func (c TargetConfigReconciler) sync() error {
9496
return err
9597
}
9698

97-
if _, _, err := c.manageServiceAccount(secondaryScheduler); err != nil {
99+
serviceAccount, _, err := c.manageServiceAccount(secondaryScheduler)
100+
if err != nil {
98101
return err
99102
}
100103

104+
var extraObjects []K8sObject
105+
if secondaryScheduler.Spec.ExtraResources {
106+
var err error
107+
dp, _, err := c.manageExtraResourcesDeployment(secondaryScheduler, false)
108+
if err != nil {
109+
return err
110+
}
111+
_, _, err = v1helpers.UpdateStatus(c.secondarySchedulerClient, func(status *operatorv1.OperatorStatus) error {
112+
resourcemerge.SetDeploymentGeneration(&status.Generations, dp)
113+
return nil
114+
})
115+
116+
extraObjects, err = c.manageExtraResourcesObjects(dp)
117+
if err != nil {
118+
return err
119+
}
120+
}
121+
101122
if _, _, err := c.manageClusterRoleBinding(secondaryScheduler); err != nil {
102123
return err
103124
}
104125

126+
if err := applyExtraObjects(c.kubeClient, c.eventRecorder, serviceAccount, extraObjects); err != nil {
127+
return err
128+
}
129+
105130
deployment, _, err := c.manageDeployment(secondaryScheduler, forceDeployment)
106131
if err != nil {
107132
return err
@@ -271,6 +296,10 @@ func (c *TargetConfigReconciler) processNextWorkItem() bool {
271296

272297
err := c.sync()
273298
if err == nil {
299+
if err == resourceNotReady {
300+
c.queue.AddAfter(dsKey, 10*time.Second)
301+
return true
302+
}
274303
c.queue.Forget(dsKey)
275304
return true
276305
}

0 commit comments

Comments
 (0)