-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathreconcile_services.go
134 lines (103 loc) · 4.19 KB
/
reconcile_services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright DataStax, Inc.
// Please see the included license file for details.
package reconciliation
import (
api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
"github.com/k8ssandra/cass-operator/pkg/internal/result"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"github.com/k8ssandra/cass-operator/pkg/utils"
)
// CreateHeadlessServices creates every service currently queued in rc.Services.
//
// For each queued service it sets the operator progress status to
// api.ProgressUpdating, issues the create call through the reconciliation
// client, and records a "CreatedResource" event on the datacenter. The first
// failure stops the loop and is returned as result.Error; when every service
// was created (or none were queued) result.Continue() is returned.
func (rc *ReconciliationContext) CreateHeadlessServices() result.ReconcileResult {
	reqLog, kube := rc.ReqLogger, rc.Client

	for _, svc := range rc.Services {
		reqLog.Info(
			"Creating a new headless service",
			"serviceNamespace", svc.Namespace,
			"serviceName", svc.Name)

		statusErr := setOperatorProgressStatus(rc, api.ProgressUpdating)
		if statusErr != nil {
			return result.Error(statusErr)
		}

		createErr := kube.Create(rc.Ctx, svc)
		if createErr != nil {
			reqLog.Error(createErr, "Could not create headless service")
			return result.Error(createErr)
		}

		rc.Recorder.Eventf(rc.Datacenter, "Normal", "CreatedResource", "Created service %s", svc.Name)
	}

	// Creating services used to end the reconcile call here; instead we let
	// reconciliation carry on to the next resources.
	return result.Continue()
}
// CheckHeadlessServices makes sure the services of the datacenter exist and
// match their desired definitions.
//
// It builds the desired services (adding the NodePort service when
// dc.IsNodePortEnabled() reports true), sets the datacenter as controller
// reference on each of them, and then looks each one up in the cluster:
//   - not found: the service is queued and created at the end through
//     CreateHeadlessServices;
//   - found, but ResourcesHaveSameHash reports a difference: the existing
//     service is overwritten with the desired one, after merging labels and
//     annotations and carrying over the ClusterIP and resource version;
//   - found and unchanged: nothing is done.
//
// A failure to set the controller reference, to look up a service (other than
// not-found) or to update one is logged and returned as result.Error.
// Otherwise the outcome of the create step is returned, or result.Continue()
// when nothing had to be created.
func (rc *ReconciliationContext) CheckHeadlessServices() result.ReconcileResult {
	reqLog, datacenter, kube := rc.ReqLogger, rc.Datacenter, rc.Client

	reqLog.Info("reconcile_services::ReconcileHeadlessServices")

	// Desired state: the services every datacenter gets, plus the optional
	// NodePort service.
	wanted := []*corev1.Service{
		newServiceForCassandraDatacenter(datacenter),
		newSeedServiceForCassandraDatacenter(datacenter),
		newAllPodsServiceForCassandraDatacenter(datacenter),
		newAdditionalSeedServiceForCassandraDatacenter(datacenter),
	}
	if datacenter.IsNodePortEnabled() {
		wanted = append(wanted, newNodePortServiceForCassandraDatacenter(datacenter))
	}

	// Services that do not exist yet; they are created once the loop is done.
	var missing []*corev1.Service

	for _, desired := range wanted {
		// The CassandraDatacenter owns and controls each of its services.
		if err := setControllerReference(datacenter, desired, rc.Scheme); err != nil {
			reqLog.Error(err, "Could not set controller reference for headless service")
			return result.Error(err)
		}

		// Look for an existing service with the same name and namespace.
		key := types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}
		existing := &corev1.Service{}
		if err := kube.Get(rc.Ctx, key, existing); err != nil {
			if errors.IsNotFound(err) {
				// Absent: remember it for creation below.
				missing = append(missing, desired)
				continue
			}

			// Any other lookup error aborts the reconcile.
			reqLog.Error(err, "Could not get headless seed service",
				"name", key,
			)
			return result.Error(err)
		}

		// Present and identical to what we want: nothing to update.
		if utils.ResourcesHaveSameHash(existing, desired) {
			continue
		}

		version := existing.GetResourceVersion()

		// Keep labels and annotations that were added to the service after it
		// was created by merging them with the desired ones.
		desired.Labels = utils.MergeMap(map[string]string{}, existing.Labels, desired.Labels)
		desired.Annotations = utils.MergeMap(map[string]string{}, existing.Annotations, desired.Annotations)

		// The ClusterIP may have been updated for the NodePort service, so it
		// has to be preserved. Copying it should not break any of the other
		// services either.
		desired.Spec.ClusterIP = existing.Spec.ClusterIP

		reqLog.Info("Updating service",
			"service", existing,
			"desired", desired)

		// Overwrite the fetched object with the desired one, then restore the
		// resource version that was read from the cluster before updating.
		desired.DeepCopyInto(existing)
		existing.SetResourceVersion(version)

		if err := kube.Update(rc.Ctx, existing); err != nil {
			reqLog.Error(err, "Unable to update service",
				"service", existing)
			return result.Error(err)
		}
	}

	if len(missing) == 0 {
		return result.Continue()
	}

	rc.Services = missing
	return rc.CreateHeadlessServices()
}