Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 56 additions & 49 deletions pkg/operator/hostendpointscontroller/host_endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
operatorv1 "github.com/openshift/api/operator/v1"
configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1"
configv1listers "github.com/openshift/client-go/config/listers/config/v1"
"github.com/openshift/cluster-etcd-operator/pkg/operator/operatorclient"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
Expand Down Expand Up @@ -41,7 +42,7 @@ const (
type HostEndpointsController struct {
operatorClient v1helpers.OperatorClient
infrastructureLister configv1listers.InfrastructureLister
podLister corev1listers.PodLister
nodeLister corev1listers.NodeLister
endpointsLister corev1listers.EndpointsLister
endpointsClient corev1client.EndpointsGetter

Expand All @@ -54,32 +55,33 @@ func NewHostEndpointsController(
operatorClient v1helpers.OperatorClient,
eventRecorder events.Recorder,
kubeClient kubernetes.Interface,
kubeInformersForNamespaces operatorv1helpers.KubeInformersForNamespaces,
kubeInformers operatorv1helpers.KubeInformersForNamespaces,
infrastructureInformer configv1informers.InfrastructureInformer,
) *HostEndpointsController {
kubeInformersForTargetNamespace := kubeInformersForNamespaces.InformersFor("openshift-etcd")
kubeInformersForTargetNamespace := kubeInformers.InformersFor(operatorclient.TargetNamespace)
endpointsInformer := kubeInformersForTargetNamespace.Core().V1().Endpoints()
podInformer := kubeInformersForTargetNamespace.Core().V1().Pods()
kubeInformersForCluster := kubeInformers.InformersFor("")
nodeInformer := kubeInformersForCluster.Core().V1().Nodes()

c := &HostEndpointsController{
eventRecorder: eventRecorder.WithComponentSuffix("host-etcd-endpoints-controller"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "HostEndpointsController"),
cachesToSync: []cache.InformerSynced{
operatorClient.Informer().HasSynced,
endpointsInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
nodeInformer.Informer().HasSynced,
infrastructureInformer.Informer().HasSynced,
},
operatorClient: operatorClient,
infrastructureLister: infrastructureInformer.Lister(),
podLister: podInformer.Lister(),
nodeLister: nodeInformer.Lister(),
endpointsLister: endpointsInformer.Lister(),
endpointsClient: kubeClient.CoreV1(),
}
operatorClient.Informer().AddEventHandler(c.eventHandler())
endpointsInformer.Informer().AddEventHandler(c.eventHandler())
infrastructureInformer.Informer().AddEventHandler(c.eventHandler())
podInformer.Informer().AddEventHandler(c.eventHandler())
nodeInformer.Informer().AddEventHandler(c.eventHandler())
return c
}

Expand Down Expand Up @@ -112,74 +114,79 @@ func (c *HostEndpointsController) sync() error {
}

func (c *HostEndpointsController) syncHostEndpoints() error {
required := hostEndpointsAsset()

discoveryDomain, err := c.getEtcdDiscoveryDomain()
if err != nil {
return fmt.Errorf("unable to determine etcd discovery domain: %v", err)
}

// list etcd member pods
pods, err := c.podLister.List(labels.Set{"k8s-app": "etcd"}.AsSelector())
if required.Annotations == nil {
required.Annotations = map[string]string{}
}
required.Annotations["alpha.installer.openshift.io/dns-suffix"] = discoveryDomain

// get dns names of ready etcd member pods
var addresses []string
for _, pod := range pods {
var ready bool
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady {
ready = condition.Status == corev1.ConditionTrue
// create endpoint addresses for each node
nodes, err := c.nodeLister.List(labels.Set{"node-role.kubernetes.io/master": ""}.AsSelector())
if err != nil {
return fmt.Errorf("unable to list expected etcd member nodes: %v", err)
}
endpointAddresses := []corev1.EndpointAddress{}
for _, node := range nodes {
var nodeInternalIP string
for _, nodeAddress := range node.Status.Addresses {
if nodeAddress.Type == corev1.NodeInternalIP {
nodeInternalIP = nodeAddress.Address
break
}
}
if ready {
dnsName, err := c.getEtcdDNSName(discoveryDomain, pod.Status.HostIP)
if err != nil {
return fmt.Errorf("unable to determine dns name for etcd member on node %s: %v", pod.Spec.NodeName, err)
}
addresses = append(addresses, dnsName)
if len(nodeInternalIP) == 0 {
return fmt.Errorf("unable to determine internal ip address for node %s", node.Name)
}
dnsName, err := c.getEtcdDNSName(discoveryDomain, nodeInternalIP)
if err != nil {
return fmt.Errorf("unable to determine etcd member dns name for node %s: %v", node.Name, err)
}
}

if len(addresses) == 0 {
return fmt.Errorf("no etcd member pods are ready")
}

required := hostEndpointsAsset()

if required.Annotations == nil {
required.Annotations = map[string]string{}
}
required.Annotations["alpha.installer.openshift.io/dns-suffix"] = discoveryDomain

sort.Strings(addresses)
for i, address := range addresses {
required.Subsets[0].Addresses = append(required.Subsets[0].Addresses, corev1.EndpointAddress{
Hostname: strings.TrimSuffix(address, "."+discoveryDomain),
IP: net.IPv4(byte(192), byte(0), byte(2), byte(i+1)).String(),
endpointAddresses = append(endpointAddresses, corev1.EndpointAddress{
IP: nodeInternalIP,
Hostname: strings.TrimSuffix(dnsName, "."+discoveryDomain),
NodeName: &node.Name,
})
}
sort.Slice(endpointAddresses, func(i, j int) bool {
return (endpointAddresses)[i].Hostname < (endpointAddresses)[j].Hostname
})

// if etcd-bootstrap exists, keep it (at the end of the list)
existing, err := c.endpointsLister.Endpoints("openshift-etcd").Get("host-etcd")
if err != nil && !errors.IsNotFound(err) {
return err
}
if !errors.IsNotFound(err) {
existing, err := c.endpointsLister.Endpoints(operatorclient.TargetNamespace).Get("host-etcd")
switch {
case errors.IsNotFound(err):
// do nothing with not found because we don't want to clobber the results
case err != nil:
return nil
default:
for _, endpointAddress := range existing.Subsets[0].Addresses {
if endpointAddress.Hostname == "etcd-bootstrap" {
required.Subsets[0].Addresses = append(required.Subsets[0].Addresses, *endpointAddress.DeepCopy())
endpointAddresses = append(endpointAddresses, *endpointAddress.DeepCopy())
break
}
}
}

required.Subsets[0].Addresses = endpointAddresses
if len(required.Subsets[0].Addresses) == 0 {
return fmt.Errorf("no etcd member nodes are ready")
}

return c.applyEndpoints(required)
}

func hostEndpointsAsset() *corev1.Endpoints {
return &corev1.Endpoints{
ObjectMeta: v1.ObjectMeta{
Name: "host-etcd",
Namespace: "openshift-etcd",
Namespace: operatorclient.TargetNamespace,
},
Subsets: []corev1.EndpointSubset{
{
Expand Down Expand Up @@ -243,9 +250,9 @@ func reverseLookup(service, proto, name, ip string) (string, error) {
}

func (c *HostEndpointsController) applyEndpoints(required *corev1.Endpoints) error {
existing, err := c.endpointsLister.Endpoints("openshift-etcd").Get("host-etcd")
existing, err := c.endpointsLister.Endpoints(operatorclient.TargetNamespace).Get("host-etcd")
if errors.IsNotFound(err) {
_, err := c.endpointsClient.Endpoints("openshift-etcd").Create(required)
_, err := c.endpointsClient.Endpoints(operatorclient.TargetNamespace).Create(required)
if err != nil {
c.eventRecorder.Warningf("EndpointsCreateFailed", "Failed to create endpoints/%s -n %s: %v", required.Name, required.Namespace, err)
return err
Expand Down Expand Up @@ -273,7 +280,7 @@ func (c *HostEndpointsController) applyEndpoints(required *corev1.Endpoints) err
if klog.V(4) {
klog.Infof("Endpoints %q changes: %v", required.Namespace+"/"+required.Name, jsonPatch)
}
_, err = c.endpointsClient.Endpoints("openshift-etcd").Update(toWrite)
_, err = c.endpointsClient.Endpoints(operatorclient.TargetNamespace).Update(toWrite)
if err != nil {
c.eventRecorder.Warningf("EndpointsUpdateFailed", "Failed to update endpoints/%s -n %s: %v", required.Name, required.Namespace, err)
return err
Expand Down