Skip to content

Commit

Permalink
Merge pull request #160 from nginxinc/add-events
Browse files Browse the repository at this point in the history
Add support for events
  • Loading branch information
pleshakov authored Jul 30, 2017
2 parents 6702ee6 + f7111a1 commit 7eaa2f1
Show file tree
Hide file tree
Showing 23 changed files with 4,250 additions and 33 deletions.
21 changes: 21 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 57 additions & 8 deletions nginx-controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
core_v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api_v1 "k8s.io/client-go/pkg/api/v1"
Expand Down Expand Up @@ -60,6 +63,7 @@ type LoadBalancerController struct {
cnf *nginx.Configurator
watchNginxConfigMaps bool
nginxPlus bool
recorder record.EventRecorder
}

var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
Expand All @@ -73,6 +77,14 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, resyncPeriod tim
nginxPlus: nginxPlus,
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&core_v1.EventSinkImpl{
Interface: core_v1.New(kubeClient.Core().RESTClient()).Events(""),
})
lbc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme,
api_v1.EventSource{Component: "nginx-ingress-controller"})

lbc.ingQueue = NewTaskQueue(lbc.syncIng)
lbc.endpQueue = NewTaskQueue(lbc.syncEndp)

Expand Down Expand Up @@ -289,15 +301,17 @@ func (lbc *LoadBalancerController) syncEndp(key string) {
}
ingEx, err := lbc.createIngress(&ing)
if err != nil {
glog.Warningf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err)
glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err)
continue
}
glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Name, ing.Namespace)
name := ing.Namespace + "-" + ing.Name
lbc.cnf.UpdateEndpoints(name, ingEx)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
}
}
}

}

func (lbc *LoadBalancerController) syncCfgm(key string) {
Expand Down Expand Up @@ -498,14 +512,38 @@ func (lbc *LoadBalancerController) syncCfgm(key string) {
}

}
lbc.cnf.UpdateConfig(cfg)

var ingExes []*nginx.IngressEx
ings, _ := lbc.ingLister.List()
for _, ing := range ings.Items {
if !isNginxIngress(&ing) {
for i, _ := range ings.Items {
if !isNginxIngress(&ings.Items[i]) {
continue
}
lbc.ingQueue.enqueue(&ing)
ingEx, err := lbc.createIngress(&ings.Items[i])
if err != nil {
continue
}

ingExes = append(ingExes, ingEx)
}

if err := lbc.cnf.UpdateConfig(cfg, ingExes); err != nil {
if cfgmExists {
cfgm := obj.(*api_v1.ConfigMap)
lbc.recorder.Eventf(cfgm, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration from %v was updated, but not applied: %v", key, err)
}
for _, ingEx := range ingExes {
lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v was updated, but not applied: %v",
ingEx.Ingress.Name, ingEx.Ingress.Namespace, err)
}
} else {
if cfgmExists {
cfgm := obj.(*api_v1.ConfigMap)
lbc.recorder.Eventf(cfgm, api_v1.EventTypeNormal, "Updated", "Configuration from %v was updated", key)
}
for _, ingEx := range ingExes {
lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeNormal, "Updated", "Configuration for %v/%v was updated", ingEx.Ingress.Name, ingEx.Ingress.Namespace)
}
}
}

Expand All @@ -523,17 +561,26 @@ func (lbc *LoadBalancerController) syncIng(key string) {

if !ingExists {
glog.V(2).Infof("Deleting Ingress: %v\n", key)
lbc.cnf.DeleteIngress(name)
err := lbc.cnf.DeleteIngress(name)
if err != nil {
glog.Errorf("Error when deleting configuration for %v: %v", name, err)
}
} else {
glog.V(2).Infof("Adding or Updating Ingress: %v\n", key)

ing := obj.(*extensions.Ingress)
ingEx, err := lbc.createIngress(ing)
if err != nil {
lbc.ingQueue.requeueAfter(key, err, 5*time.Second)
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}
lbc.cnf.AddOrUpdateIngress(name, ingEx)
err = lbc.cnf.AddOrUpdateIngress(name, ingEx)
if err != nil {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v was added or updated, but not applied: %v", key, err)
} else {
lbc.recorder.Eventf(ing, api_v1.EventTypeNormal, "AddedOrUpdated", "Configuration for %v was added or updated", key)
}
}
}

Expand Down Expand Up @@ -591,6 +638,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace)
if err != nil {
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err)
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = []string{}
} else {
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps
}
Expand All @@ -605,6 +653,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace)
if err != nil {
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err)
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = []string{}
} else {
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps
}
Expand Down
60 changes: 39 additions & 21 deletions nginx-controller/nginx/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/golang/glog"
"github.com/nginxinc/kubernetes-ingress/nginx-controller/nginx/plus"
Expand Down Expand Up @@ -38,21 +37,22 @@ func (cnf *Configurator) AddOrUpdateDHParam(content string) (string, error) {
}

// AddOrUpdateIngress adds or updates NGINX configuration for an Ingress resource
func (cnf *Configurator) AddOrUpdateIngress(name string, ingEx *IngressEx) {
func (cnf *Configurator) AddOrUpdateIngress(name string, ingEx *IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

cnf.addOrUpdateIngress(name, ingEx)

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error when adding or updating ingress %v: %v", name, err)
}
return nil
}

func (cnf *Configurator) addOrUpdateIngress(name string, ingEx *IngressEx) {
pems := cnf.updateCertificates(ingEx)
nginxCfg := cnf.generateNginxCfg(ingEx, pems)
cnf.nginx.AddOrUpdateIngress(name, nginxCfg)
if err := cnf.nginx.Reload(); err != nil {
glog.Errorf("Error when adding or updating ingress %q: %q", name, err)
} else {
if cnf.isPlus() {
time.Sleep(500 * time.Millisecond)
cnf.updatePlusEndpoints(name, ingEx)
}
}
}

func (cnf *Configurator) updateCertificates(ingEx *IngressEx) map[string]string {
Expand Down Expand Up @@ -439,12 +439,14 @@ func createLocation(path string, upstream Upstream, cfg *Config, websocket bool,
}

func (cnf *Configurator) createUpstream(ingEx *IngressEx, name string, backend *extensions.IngressBackend, namespace string, stickyCookie string) Upstream {
var ups Upstream

if cnf.isPlus() {
return Upstream{Name: name, StickyCookie: stickyCookie}
ups = Upstream{Name: name, StickyCookie: stickyCookie}
} else {
ups = NewUpstreamWithDefaultServer(name)
}

ups := NewUpstreamWithDefaultServer(name)

endps, exists := ingEx.Endpoints[backend.ServiceName+backend.ServicePort.String()]
if exists {
var upsServers []UpstreamServer
Expand Down Expand Up @@ -482,26 +484,32 @@ func upstreamMapToSlice(upstreams map[string]Upstream) []Upstream {
}

// DeleteIngress deletes NGINX configuration for an Ingress resource
func (cnf *Configurator) DeleteIngress(name string) {
func (cnf *Configurator) DeleteIngress(name string) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

cnf.nginx.DeleteIngress(name)
if err := cnf.nginx.Reload(); err != nil {
glog.Errorf("Error when removing ingress %q: %q", name, err)
return fmt.Errorf("Error when removing ingress %v: %v", name, err)
}
return nil
}

// UpdateEndpoints updates endpoints in NGINX configuration for an Ingress resource
func (cnf *Configurator) UpdateEndpoints(name string, ingEx *IngressEx) {
if cnf.isPlus() {
cnf.lock.Lock()
defer cnf.lock.Unlock()
func (cnf *Configurator) UpdateEndpoints(name string, ingEx *IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

if cnf.isPlus() {
cnf.addOrUpdateIngress(name, ingEx)
cnf.updatePlusEndpoints(name, ingEx)
} else {
cnf.AddOrUpdateIngress(name, ingEx)
cnf.addOrUpdateIngress(name, ingEx)
if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error reloading NGINX when updating endpoints for %v: %v", name, err)
}
}
return nil
}

func (cnf *Configurator) updatePlusEndpoints(name string, ingEx *IngressEx) {
Expand Down Expand Up @@ -533,7 +541,7 @@ func (cnf *Configurator) updatePlusEndpoints(name string, ingEx *IngressEx) {
}

// UpdateConfig updates NGINX Configuration parameters
func (cnf *Configurator) UpdateConfig(config *Config) {
func (cnf *Configurator) UpdateConfig(config *Config, ingExes []*IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

Expand All @@ -550,6 +558,16 @@ func (cnf *Configurator) UpdateConfig(config *Config) {
}

cnf.nginx.UpdateMainConfigFile(mainCfg)

for _, ingEx := range ingExes {
cnf.addOrUpdateIngress(ingEx.Ingress.Namespace+"-"+ingEx.Ingress.Name, ingEx)
}

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error when updating config from ConfigMap: %v", err)
}

return nil
}

func (cnf *Configurator) isPlus() bool {
Expand Down
6 changes: 3 additions & 3 deletions nginx-controller/nginx/plus/nginx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (client *NginxClient) AddHTTPServer(upstream string, server string) error {
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to add %v server to %v upstream: got expected 200 response, got %v", server, upstream, resp.StatusCode)
return fmt.Errorf("Failed to add %v server to %v upstream: expected 200 response, got %v", server, upstream, resp.StatusCode)
}

return nil
Expand All @@ -153,8 +153,8 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to add %v server to %v upstream: got expected 200 response, got %v", server, upstream, resp.StatusCode)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("Failed to add %v server to %v upstream: expected 200 or 204 response, got %v", server, upstream, resp.StatusCode)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion nginx-controller/nginx/templates/nginx-plus.ingress.tmpl
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{range $upstream := .Upstreams}}
upstream {{$upstream.Name}} {
zone {{$upstream.Name}} 256k;
{{range $server := $upstream.UpstreamServers}}
server {{$server.Address}}:{{$server.Port}};{{end}}
{{if $upstream.StickyCookie}}
sticky cookie {{$upstream.StickyCookie}};
{{end}}
state /var/lib/nginx/state/{{$upstream.Name}}.state;
}{{end}}

{{range $server := .Servers}}
Expand Down
Loading

0 comments on commit 7eaa2f1

Please sign in to comment.