-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
152 lines (131 loc) · 5 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"os"
"os/signal"
"syscall"
log "github.com/sirupsen/logrus"
versionedclient "istio.io/client-go/pkg/clientset/versioned"
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)
var (
	// serviceLabel is the label key that opts a Service into this controller.
	// It is passed as a label-existence selector when listing/watching, so any
	// Service carrying the key (regardless of value) is picked up. The label's
	// value uses the format <namespace>.<name> to identify the VirtualService.
	serviceLabel string = "virtualservice.httproute/VirtualServiceName"
	// serviceAnnotation names the annotation holding the port number for the
	// HTTP route. NOTE(review): not referenced in this file — presumably read
	// by the controller/handler elsewhere in this package; verify there.
	serviceAnnotation string = "virtualservice.httproute/PortNumber"
)
// retrieve the Kubernetes cluster client from outside of the cluster
func getKubernetesClients() (*kubernetes.Clientset, *versionedclient.Clientset) {
// construct the path to resolve to `~/.kube/config`
kubeConfigPath := "" // os.Getenv("HOME") + "/.kube/config"
// create the config from the path
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
log.Fatalf("getClusterConfig: %v", err)
}
// generate the client based off of the config
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create kubernetes client: %v", err)
}
istioClient, err := versionedclient.NewForConfig(config)
if err != nil {
log.Fatalf("Failed to create istio client: %v", err)
}
log.Info("Successfully constructed k8s client")
return client, istioClient
}
// main wires together the informer, work queue, deleted-object indexer and
// controller, then blocks until SIGTERM/SIGINT for a graceful shutdown.
func main() {
	// get the Kubernetes client for connectivity
	client, istioClient := getKubernetesClients()
	// watch services across every namespace
	namespace := meta_v1.NamespaceAll
	// deletedIndexer remembers the final state of deleted services so the
	// handler can still inspect them after the apiserver forgets them
	deletedIndexer := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{})
	// create a new queue so that when the informer gets a resource that is
	// either a result of listing or watching, we can add an identifying key
	// to the queue so that it can be handled in the handler
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	// create the informer so that we can not only list resources
	// but also watch them for all services in all namespaces
	informer := cache.NewSharedIndexInformer(
		// the ListWatch contains the two functions our informer requires:
		// ListFunc/WatchFunc to take care of listing and watching the
		// resources we want to handle
		&cache.ListWatch{
			ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
				// restrict to services that carry the opt-in label
				options.LabelSelector = serviceLabel
				// list all of the services (core resource)
				return client.CoreV1().Services(namespace).List(options)
			},
			WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = serviceLabel
				// watch all of the services (core resource)
				return client.CoreV1().Services(namespace).Watch(options)
			},
		},
		&api_v1.Service{}, // the target type (Service)
		0,                 // no resync (period of 0)
		cache.Indexers{},
	)
	// add event handlers for the three event types: add, update, delete.
	// In every handler the key derivation error is checked BEFORE the key is
	// used (previously an empty key was logged and failures were silent).
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// convert the resource object into a key (in this case
			// we are just doing it in the format of 'namespace/name')
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err != nil {
				log.Errorf("Failed to derive key for added object: %v", err)
				return
			}
			log.Infof("Add service: %s", key)
			// add the key to the queue for the handler to get
			queue.Add(key)
			// a re-add supersedes any previously remembered deletion
			if err := deletedIndexer.Delete(obj); err != nil {
				log.Warnf("Failed to remove %s from deleted indexer: %v", key, err)
			}
			log.Infof(" Queue len: %d", queue.Len())
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(newObj)
			if err != nil {
				log.Errorf("Failed to derive key for updated object: %v", err)
				return
			}
			log.Infof("Update service: %s", key)
			queue.Add(key)
			log.Infof(" Queue len: %d", queue.Len())
		},
		DeleteFunc: func(obj interface{}) {
			// DeletionHandlingMetaNamespaceKeyFunc is a helper function that
			// handles the DeletedFinalStateUnknown tombstone case where a
			// resource was deleted but it is still contained in the index;
			// it then in turn calls MetaNamespaceKeyFunc
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err != nil {
				log.Errorf("Failed to derive key for deleted object: %v", err)
				return
			}
			log.Infof("Delete service: %s", key)
			queue.Add(key)
			// keep the final state around for the handler
			if err := deletedIndexer.Add(obj); err != nil {
				log.Warnf("Failed to add %s to deleted indexer: %v", key, err)
			}
			log.Infof(" Queue len: %d", queue.Len())
		},
	})
	// construct the Controller object which has all of the necessary
	// components to handle logging, connections, informing (listing and
	// watching), the deleted indexer, the queue, and the handler
	controller := NewController(queue, informer, deletedIndexer, client, istioClient)
	// use a channel to synchronize the finalization for a graceful shutdown
	stopCh := make(chan struct{})
	defer close(stopCh)
	// run the controller loop to process items
	go controller.Run(stopCh)
	// block until an OS signal asks us to terminate, then shut down
	sigTerm := make(chan os.Signal, 1)
	signal.Notify(sigTerm, syscall.SIGTERM, syscall.SIGINT)
	<-sigTerm
	log.Info("Shutting down....")
}