Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/machine-config-operator/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func runStartCmd(cmd *cobra.Command, args []string) {
controller := operator.New(
componentNamespace, componentName,
startOpts.imagesFile,
ctrlctx.NamespacedInformerFactory.Machineconfiguration().V1().MCOConfigs(),
ctrlctx.NamespacedInformerFactory.Machineconfiguration().V1().MachineConfigPools(),
ctrlctx.NamespacedInformerFactory.Machineconfiguration().V1().ControllerConfigs(),
ctrlctx.NamespacedInformerFactory.Machineconfiguration().V1().MachineConfigs(),
Expand All @@ -67,6 +66,7 @@ func runStartCmd(cmd *cobra.Command, args []string) {
ctrlctx.KubeNamespacedInformerFactory.Rbac().V1().ClusterRoles(),
ctrlctx.KubeNamespacedInformerFactory.Rbac().V1().ClusterRoleBindings(),
ctrlctx.KubeNamespacedInformerFactory.Core().V1().ConfigMaps(),
ctrlctx.KubeInformerFactory.Core().V1().ConfigMaps(),
ctrlctx.ConfigInformerFactory.Config().V1().Infrastructures(),
ctrlctx.ConfigInformerFactory.Config().V1().Networks(),
ctrlctx.ClientBuilder.MachineConfigClientOrDie(componentName),
Expand Down
81 changes: 60 additions & 21 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package operator
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/golang/glog"

configclientset "github.com/openshift/client-go/config/clientset/versioned"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextinformersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1beta1"
apiextlistersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
Expand All @@ -25,6 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisterv1 "k8s.io/client-go/listers/apps/v1"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -73,31 +75,37 @@ type Operator struct {
syncHandler func(ic string) error

crdLister apiextlistersv1beta1.CustomResourceDefinitionLister
mcoconfigLister mcfglistersv1.MCOConfigLister
mcpLister mcfglistersv1.MachineConfigPoolLister
ccLister mcfglistersv1.ControllerConfigLister
mcLister mcfglistersv1.MachineConfigLister
deployLister appslisterv1.DeploymentLister
daemonsetLister appslisterv1.DaemonSetLister
infraLister configlistersv1.InfrastructureLister
networkLister configlistersv1.NetworkLister
mcoCmLister corelisterv1.ConfigMapLister
clusterCmLister corelisterv1.ConfigMapLister

crdListerSynced cache.InformerSynced
mcoconfigListerSynced cache.InformerSynced
deployListerSynced cache.InformerSynced
daemonsetListerSynced cache.InformerSynced
infraListerSynced cache.InformerSynced
networkListerSynced cache.InformerSynced
mcpListerSynced cache.InformerSynced
ccListerSynced cache.InformerSynced
mcListerSynced cache.InformerSynced
mcoCmListerSynced cache.InformerSynced
clusterCmListerSynced cache.InformerSynced

// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface

stopCh <-chan struct{}
}

// New returns a new machine config operator.
func New(
namespace, name string,
imagesFile string,
mcoconfigInformer mcfginformersv1.MCOConfigInformer,
mcpInformer mcfginformersv1.MachineConfigPoolInformer,
ccInformer mcfginformersv1.ControllerConfigInformer,
mcInformer mcfginformersv1.MachineConfigInformer,
Expand All @@ -108,7 +116,8 @@ func New(
daemonsetInformer appsinformersv1.DaemonSetInformer,
clusterRoleInformer rbacinformersv1.ClusterRoleInformer,
clusterRoleBindingInformer rbacinformersv1.ClusterRoleBindingInformer,
cmInformer coreinformersv1.ConfigMapInformer,
mcoCmInformer coreinformersv1.ConfigMapInformer,
clusterCmInfomer coreinformersv1.ConfigMapInformer,
infraInformer configinformersv1.InfrastructureInformer,
networkInformer configinformersv1.NetworkInformer,
client mcfgclientset.Interface,
Expand All @@ -133,27 +142,35 @@ func New(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineconfigoperator"),
}

mcoconfigInformer.Informer().AddEventHandler(optr.eventHandler())
controllerConfigInformer.Informer().AddEventHandler(optr.eventHandler())
serviceAccountInfomer.Informer().AddEventHandler(optr.eventHandler())
crdInformer.Informer().AddEventHandler(optr.eventHandler())
deployInformer.Informer().AddEventHandler(optr.eventHandler())
daemonsetInformer.Informer().AddEventHandler(optr.eventHandler())
clusterRoleInformer.Informer().AddEventHandler(optr.eventHandler())
clusterRoleBindingInformer.Informer().AddEventHandler(optr.eventHandler())
cmInformer.Informer().AddEventHandler(optr.eventHandler())
infraInformer.Informer().AddEventHandler(optr.eventHandler())
networkInformer.Informer().AddEventHandler(optr.eventHandler())
for _, i := range []cache.SharedIndexInformer{
controllerConfigInformer.Informer(),
serviceAccountInfomer.Informer(),
crdInformer.Informer(),
deployInformer.Informer(),
daemonsetInformer.Informer(),
clusterRoleInformer.Informer(),
clusterRoleBindingInformer.Informer(),
mcoCmInformer.Informer(),
infraInformer.Informer(),
networkInformer.Informer(),
} {
i.AddEventHandler(optr.eventHandler())
}

optr.syncHandler = optr.sync

optr.clusterCmLister = clusterCmInfomer.Lister()
optr.clusterCmListerSynced = clusterCmInfomer.Informer().HasSynced
optr.mcoCmLister = mcoCmInformer.Lister()
optr.mcoCmListerSynced = mcoCmInformer.Informer().HasSynced
optr.crdLister = crdInformer.Lister()
optr.crdListerSynced = crdInformer.Informer().HasSynced
optr.mcoconfigLister = mcoconfigInformer.Lister()
optr.mcoconfigListerSynced = mcoconfigInformer.Informer().HasSynced
optr.mcpLister = mcpInformer.Lister()
optr.mcpListerSynced = mcpInformer.Informer().HasSynced
optr.ccLister = ccInformer.Lister()
optr.ccListerSynced = ccInformer.Informer().HasSynced
optr.mcLister = mcInformer.Lister()
optr.mcListerSynced = mcInformer.Informer().HasSynced
optr.deployLister = deployInformer.Lister()
optr.deployListerSynced = deployInformer.Informer().HasSynced
optr.daemonsetLister = daemonsetInformer.Lister()
Expand Down Expand Up @@ -189,15 +206,30 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {

if !cache.WaitForCacheSync(stopCh,
optr.crdListerSynced,
optr.mcoconfigListerSynced,
optr.deployListerSynced,
optr.daemonsetListerSynced,
optr.infraListerSynced,
optr.mcoCmListerSynced,
optr.clusterCmListerSynced,
optr.networkListerSynced) {
glog.Error("failed to sync caches")
return
}

// these can only be synced after CRDs are installed
if !optr.inClusterBringup {
if !cache.WaitForCacheSync(stopCh,
optr.mcpListerSynced,
optr.ccListerSynced,
optr.mcListerSynced,
) {
glog.Error("failed to sync caches")
return
}
}

optr.stopCh = stopCh

for i := 0; i < workers; i++ {
go wait.Until(optr.worker, time.Second, stopCh)
}
Expand Down Expand Up @@ -261,6 +293,13 @@ func (optr *Operator) sync(key string) error {
return err
}

if optr.inClusterBringup {
// sync now our own informers after having installed the CRDs
if !cache.WaitForCacheSync(optr.stopCh, optr.mcpListerSynced, optr.mcListerSynced, optr.ccListerSynced) {
return errors.New("failed to sync caches for informers")
}
}

namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
Expand Down Expand Up @@ -342,15 +381,15 @@ func (optr *Operator) sync(key string) error {
}

func (optr *Operator) getOsImageURL(namespace string) (string, error) {
cm, err := optr.kubeClient.CoreV1().ConfigMaps(namespace).Get(osImageConfigMapName, metav1.GetOptions{})
cm, err := optr.mcoCmLister.ConfigMaps(namespace).Get(osImageConfigMapName)
if err != nil {
return "", err
}
return cm.Data["osImageURL"], nil
}

func (optr *Operator) getCAsFromConfigMap(namespace, name, key string) ([]byte, error) {
cm, err := optr.kubeClient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
cm, err := optr.clusterCmLister.ConfigMaps(namespace).Get(name)
if err != nil {
return nil, err
}
Expand Down