Skip to content

Commit 6de3f25

Browse files
shriramsharmapsikka1
authored andcommitted
Fix for concurrent map iteration and map write (istio-ecosystem#197)
* fixes istio-ecosystem#196 Added `.Range()` and `.Copy() func to avoid concurrent map read and write. Signed-off-by: Shriram Sharma <[email protected]> * fixed the linting errors Signed-off-by: Shriram Sharma <[email protected]> Co-authored-by: Shriram Sharma <[email protected]> Signed-off-by: psikka1 <[email protected]>
1 parent 739623c commit 6de3f25

File tree

8 files changed

+389
-204
lines changed

8 files changed

+389
-204
lines changed

admiral/pkg/apis/admiral/routes/handlers.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package routes
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/gorilla/mux"
7-
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
8-
"istio.io/client-go/pkg/apis/networking/v1alpha3"
96
"log"
107
"net/http"
118
"strings"
9+
10+
"github.com/gorilla/mux"
11+
"github.com/istio-ecosystem/admiral/admiral/pkg/clusters"
12+
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
13+
"istio.io/client-go/pkg/apis/networking/v1alpha3"
1214
)
1315

1416
type RouteOpts struct {
@@ -126,16 +128,18 @@ func (opts *RouteOpts) GetServiceEntriesByIdentity(w http.ResponseWriter, r *htt
126128

127129
if identity != "" {
128130

129-
for cname, serviceCluster := range opts.RemoteRegistry.AdmiralCache.SeClusterCache.Map() {
131+
m := opts.RemoteRegistry.AdmiralCache.SeClusterCache
132+
133+
m.Range(func(cname string, serviceCluster *common.Map) {
130134
if strings.Contains(cname, identity) {
131135
var identityServiceEntry IdentityServiceEntry
132136
identityServiceEntry.Cname = cname
133-
for _, clusterId := range serviceCluster.Map() {
134-
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterId)
135-
}
137+
serviceCluster.Range(func(k string, clusterID string) {
138+
identityServiceEntry.ClusterNames = append(identityServiceEntry.ClusterNames, clusterID)
139+
})
136140
response = append(response, identityServiceEntry)
137141
}
138-
}
142+
})
139143
out, err := json.Marshal(response)
140144
if err != nil {
141145
log.Printf("Failed to marshall response GetServiceEntriesByIdentity call")

admiral/pkg/clusters/handler.go

+54-52
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@ package clusters
33
import (
44
"bytes"
55
"fmt"
6+
"reflect"
7+
"sort"
8+
"strings"
9+
"time"
10+
611
argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
712
"github.com/gogo/protobuf/types"
813
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
9-
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
14+
v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
1015
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
1116
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
1217
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/util"
@@ -16,10 +21,6 @@ import (
1621
k8sAppsV1 "k8s.io/api/apps/v1"
1722
k8sV1 "k8s.io/api/core/v1"
1823
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
19-
"reflect"
20-
"sort"
21-
"strings"
22-
"time"
2324
)
2425

2526
const ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
@@ -45,8 +46,8 @@ type SidecarHandler struct {
4546
}
4647

4748
type WeightedService struct {
48-
Weight int32
49-
Service *k8sV1.Service
49+
Weight int32
50+
Service *k8sV1.Service
5051
}
5152

5253
func updateIdentityDependencyCache(sourceIdentity string, identityDependencyCache *common.MapOfMaps, dr *v1.Dependency) {
@@ -70,9 +71,9 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr
7071
processGtp = false
7172
}
7273
outlierDetection := &v1alpha32.OutlierDetection{
73-
BaseEjectionTime: &types.Duration{Seconds: 300},
74+
BaseEjectionTime: &types.Duration{Seconds: 300},
7475
Consecutive_5XxErrors: &types.UInt32Value{Value: uint32(10)},
75-
Interval: &types.Duration{Seconds: 60},
76+
Interval: &types.Duration{Seconds: 60},
7677
}
7778
if gtpTrafficPolicy != nil && processGtp {
7879
var loadBalancerSettings = &v1alpha32.LoadBalancerSettings{
@@ -108,52 +109,52 @@ func getDestinationRule(host string, locality string, gtpTrafficPolicy *model.Tr
108109

109110
func (se *ServiceEntryHandler) Added(obj *v1alpha3.ServiceEntry) {
110111
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
111-
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
112+
log.Infof(LogFormat, "Add", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
112113
return
113114
}
114115
}
115116

116117
func (se *ServiceEntryHandler) Updated(obj *v1alpha3.ServiceEntry) {
117118
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
118-
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
119+
log.Infof(LogFormat, "Update", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
119120
return
120121
}
121122
}
122123

123124
func (se *ServiceEntryHandler) Deleted(obj *v1alpha3.ServiceEntry) {
124125
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
125-
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
126+
log.Infof(LogFormat, "Delete", "ServiceEntry", obj.Name, se.ClusterID, "Skipping resource from namespace="+obj.Namespace)
126127
return
127128
}
128129
}
129130

130131
func (dh *DestinationRuleHandler) Added(obj *v1alpha3.DestinationRule) {
131132
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
132-
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
133+
log.Infof(LogFormat, "Add", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
133134
return
134135
}
135136
handleDestinationRuleEvent(obj, dh, common.Add, common.DestinationRule)
136137
}
137138

138139
func (dh *DestinationRuleHandler) Updated(obj *v1alpha3.DestinationRule) {
139140
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
140-
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
141+
log.Infof(LogFormat, "Update", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
141142
return
142143
}
143144
handleDestinationRuleEvent(obj, dh, common.Update, common.DestinationRule)
144145
}
145146

146147
func (dh *DestinationRuleHandler) Deleted(obj *v1alpha3.DestinationRule) {
147148
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
148-
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
149+
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, dh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
149150
return
150151
}
151152
handleDestinationRuleEvent(obj, dh, common.Delete, common.DestinationRule)
152153
}
153154

154155
func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
155156
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
156-
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
157+
log.Infof(LogFormat, "Add", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
157158
return
158159
}
159160
err := handleVirtualServiceEvent(obj, vh, common.Add, common.VirtualService)
@@ -164,7 +165,7 @@ func (vh *VirtualServiceHandler) Added(obj *v1alpha3.VirtualService) {
164165

165166
func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
166167
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
167-
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
168+
log.Infof(LogFormat, "Update", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
168169
return
169170
}
170171
err := handleVirtualServiceEvent(obj, vh, common.Update, common.VirtualService)
@@ -175,7 +176,7 @@ func (vh *VirtualServiceHandler) Updated(obj *v1alpha3.VirtualService) {
175176

176177
func (vh *VirtualServiceHandler) Deleted(obj *v1alpha3.VirtualService) {
177178
if IgnoreIstioResource(obj.Spec.ExportTo, obj.Annotations, obj.Namespace) {
178-
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace=" + obj.Namespace)
179+
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, vh.ClusterID, "Skipping resource from namespace="+obj.Namespace)
179180
return
180181
}
181182
err := handleVirtualServiceEvent(obj, vh, common.Delete, common.VirtualService)
@@ -192,7 +193,7 @@ func (dh *SidecarHandler) Deleted(obj *v1alpha3.Sidecar) {}
192193

193194
func IgnoreIstioResource(exportTo []string, annotations map[string]string, namespace string) bool {
194195

195-
if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
196+
if len(annotations) > 0 && annotations[common.AdmiralIgnoreAnnotation] == "true" {
196197
return true
197198
}
198199

@@ -225,9 +226,9 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
225226

226227
r := dh.RemoteRegistry
227228

228-
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host)
229+
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()
229230

230-
if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
231+
if len(dependentClusters) > 0 {
231232

232233
log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")
233234

@@ -240,7 +241,7 @@ func handleDestinationRuleEvent(obj *v1alpha3.DestinationRule, dh *DestinationRu
240241

241242
allDependentClusters := make(map[string]string)
242243

243-
util.MapCopy(allDependentClusters, dependentClusters.Map())
244+
util.MapCopy(allDependentClusters, dependentClusters)
244245

245246
allDependentClusters[clusterId] = clusterId
246247

@@ -416,7 +417,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
416417
}
417418

418419
//check if this virtual service is used by Argo rollouts for canary strategy, if so, update the corresponding SE with appropriate weights
419-
if common.GetAdmiralParams().ArgoRolloutsEnabled {
420+
if common.GetAdmiralParams().ArgoRolloutsEnabled {
420421
rollouts, err := vh.RemoteRegistry.RemoteControllers[clusterId].RolloutController.RolloutClient.Rollouts(obj.Namespace).List(v12.ListOptions{})
421422

422423
if err != nil {
@@ -432,11 +433,11 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
432433
}
433434
}
434435

435-
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0])
436+
dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(virtualService.Hosts[0]).Copy()
436437

437-
if dependentClusters != nil && len(dependentClusters.Map()) > 0 {
438+
if len(dependentClusters) > 0 {
438439

439-
for _, dependentCluster := range dependentClusters.Map() {
440+
for _, dependentCluster := range dependentClusters {
440441

441442
rc := r.RemoteControllers[dependentCluster]
442443

@@ -480,7 +481,7 @@ func handleVirtualServiceEvent(obj *v1alpha3.VirtualService, vh *VirtualServiceH
480481
}
481482
return nil
482483
} else {
483-
log.Infof(LogFormat,"Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
484+
log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "No dependent clusters found")
484485
}
485486

486487
//copy the VirtualService `as is` if they are not generated by Admiral (not in CnameDependentClusterCache)
@@ -510,7 +511,7 @@ func addUpdateVirtualService(obj *v1alpha3.VirtualService, exist *v1alpha3.Virtu
510511
if obj.Annotations == nil {
511512
obj.Annotations = map[string]string{}
512513
}
513-
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
514+
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
514515
if exist == nil || len(exist.Spec.Hosts) == 0 {
515516
obj.Namespace = namespace
516517
obj.ResourceVersion = ""
@@ -537,20 +538,20 @@ func addUpdateServiceEntry(obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEn
537538
if obj.Annotations == nil {
538539
obj.Annotations = map[string]string{}
539540
}
540-
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
541+
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
541542
if exist == nil || exist.Spec.Hosts == nil {
542543
obj.Namespace = namespace
543544
obj.ResourceVersion = ""
544545
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(obj)
545546
op = "Add"
546-
log.Infof(LogFormat + " SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
547+
log.Infof(LogFormat+" SE=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "New SE", obj.Spec.String())
547548
} else {
548549
exist.Labels = obj.Labels
549550
exist.Annotations = obj.Annotations
550551
op = "Update"
551552
skipUpdate, diff := skipDestructiveUpdate(rc, obj, exist)
552553
if diff != "" {
553-
log.Infof(LogFormat + " diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
554+
log.Infof(LogFormat+" diff=%s", op, "ServiceEntry", obj.Name, rc.ClusterID, "Diff in update", diff)
554555
}
555556
if skipUpdate {
556557
log.Infof(LogFormat, op, "ServiceEntry", obj.Name, rc.ClusterID, "Update skipped as it was destructive during Admiral's bootup phase")
@@ -573,7 +574,7 @@ func skipDestructiveUpdate(rc *RemoteController, new *v1alpha3.ServiceEntry, old
573574
skipDestructive = false
574575
destructive, diff := getServiceEntryDiff(new, old)
575576
//do not update SEs during bootup phase if they are destructive
576-
if time.Since(rc.StartTime) < (2 * common.GetAdmiralParams().CacheRefreshDuration) && destructive {
577+
if time.Since(rc.StartTime) < (2*common.GetAdmiralParams().CacheRefreshDuration) && destructive {
577578
skipDestructive = true
578579
}
579580

@@ -603,17 +604,17 @@ func getServiceEntryDiff(new *v1alpha3.ServiceEntry, old *v1alpha3.ServiceEntry)
603604
found[nEndpoint.Address] = "1"
604605
if !reflect.DeepEqual(val, nEndpoint) {
605606
destructive = true
606-
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
607+
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Update", val.String(), nEndpoint.String()))
607608
}
608609
} else {
609-
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
610+
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Add", "", nEndpoint.String()))
610611
}
611612
}
612613

613614
for key := range oldEndpointMap {
614615
if _, ok := found[key]; !ok {
615616
destructive = true
616-
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
617+
buffer.WriteString(fmt.Sprintf(format, "endpoint", "Delete", oldEndpointMap[key].String(), ""))
617618
}
618619
}
619620

@@ -638,7 +639,7 @@ func addUpdateDestinationRule(obj *v1alpha3.DestinationRule, exist *v1alpha3.Des
638639
if obj.Annotations == nil {
639640
obj.Annotations = map[string]string{}
640641
}
641-
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
642+
obj.Annotations["app.kubernetes.io/created-by"] = "admiral"
642643
if exist == nil || exist.Name == "" || exist.Spec.Host == "" {
643644
obj.Namespace = namespace
644645
obj.ResourceVersion = ""
@@ -714,22 +715,24 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
714715
return matchedService
715716
}
716717

717-
func getDependentClusters(dependents *common.Map, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
718+
func getDependentClusters(dependents map[string]string, identityClusterCache *common.MapOfMaps, sourceServices map[string]*k8sV1.Service) map[string]string {
718719
var dependentClusters = make(map[string]string)
719-
//TODO optimize this map construction
720-
if dependents != nil {
721-
for identity, clusters := range identityClusterCache.Map() {
722-
for depIdentity := range dependents.Map() {
723-
if identity == depIdentity {
724-
for _, clusterId := range clusters.Map() {
725-
_, ok := sourceServices[clusterId]
726-
if !ok {
727-
dependentClusters[clusterId] = clusterId
728-
}
729-
}
730-
}
731-
}
720+
721+
if dependents == nil {
722+
return dependentClusters
723+
}
724+
725+
for depIdentity := range dependents {
726+
clusters := identityClusterCache.Get(depIdentity)
727+
if clusters == nil {
728+
continue
732729
}
730+
clusters.Range(func(k string, clusterID string) {
731+
_, ok := sourceServices[clusterID]
732+
if !ok {
733+
dependentClusters[clusterID] = clusterID
734+
}
735+
})
733736
}
734737
return dependentClusters
735738
}
@@ -761,7 +764,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
761764
return nil
762765
}
763766

764-
var canaryService, stableService, virtualServiceRouteName string
767+
var canaryService, stableService, virtualServiceRouteName string
765768

766769
var istioCanaryWeights = make(map[string]int32)
767770

@@ -827,7 +830,6 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
827830

828831
var matchedServices = make(map[string]*WeightedService)
829832

830-
831833
//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
832834
var servicesInNamespace = cachedService.Service[rollout.Namespace]
833835

0 commit comments

Comments
 (0)