Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support secret controller #284

Merged
merged 7 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ module github.com/apache/apisix-ingress-controller
go 1.13

require (
github.com/gavv/httpexpect/v2 v2.2.0 // indirect
github.com/gin-gonic/gin v1.6.3
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/gruntwork-io/terratest v0.32.8 // indirect
github.com/hashicorp/go-memdb v1.0.4
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.11 // indirect
Expand Down
287 changes: 0 additions & 287 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/apisix/ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (s *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error {
log.Debugw("try to delete ssl",
zap.String("id", obj.ID),
zap.String("cluster", s.clusterName),
zap.String("fullName", obj.FullName),
zap.String("url", s.url),
)
if err := s.cluster.HasSynced(ctx); err != nil {
Expand Down
39 changes: 38 additions & 1 deletion pkg/ingress/controller/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controller

import (
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,6 +35,13 @@ import (
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/seven/state"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

var (
// the struct of secretSSLMap is a map[secretKey string]map[sslKey string]bool
// the xxxKey is format as namespace + "/" + name
secretSSLMap = sync.Map{}
)

type ApisixTLSController struct {
Expand Down Expand Up @@ -153,8 +161,37 @@ func (c *ApisixTLSController) syncHandler(tqo *TlsQueueObj) error {
// sync to apisix
log.Debug(tls)
log.Debug(tqo)
return state.SyncSsl(tls, tqo.Ope)
err = state.SyncSsl(tls, tqo.Ope)
// sync SyncSecretSSL
secretKey := fmt.Sprintf("%s_%s", apisixTls.Spec.Secret.Namespace, apisixTls.Spec.Secret.Name)
SyncSecretSSL(secretKey, tls, tqo.Ope)
return err
}
}

// SyncSecretSSL sync the secretSSLMap
// the struct of secretSSLMap is a map[secretKey string]map[sslKey string]bool
// the xxxKey is format as namespace + "_" + name
func SyncSecretSSL(key string, ssl *v1.Ssl, operator string) {
ssls, ok := secretSSLMap.Load(key)
if ok {
sslMap := ssls.(sync.Map)
switch operator {
case state.Delete:
sslMap.Delete(ssl.ID)
secretSSLMap.Store(key, sslMap)
default:
sslMap.Store(ssl.ID, ssl)
secretSSLMap.Store(key, sslMap)
}
} else {
if operator != state.Delete {
sslMap := sync.Map{}
sslMap.Store(ssl.ID, ssl)
secretSSLMap.Store(key, sslMap)
}
}

}

func (c *ApisixTLSController) addFunc(obj interface{}) {
Expand Down
17 changes: 15 additions & 2 deletions pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,18 @@ type Controller struct {
svcLister listerscorev1.ServiceLister
ingressLister kube.IngressLister
ingressInformer cache.SharedIndexInformer
secretInformer cache.SharedIndexInformer
secretLister listerscorev1.SecretLister
apisixUpstreamInformer cache.SharedIndexInformer
apisixUpstreamLister listersv1.ApisixUpstreamLister
apisixRouteLister kube.ApisixRouteLister
apisixRouteInformer cache.SharedIndexInformer

// resource controllers
endpointsController *endpointsController
ingressController *ingressController
endpointsController *endpointsController
ingressController *ingressController
secretController *secretController

apisixUpstreamController *apisixUpstreamController
apisixRouteController *apisixRouteController
}
Expand Down Expand Up @@ -159,6 +163,8 @@ func NewController(cfg *config.Config) (*Controller, error) {
svcLister: kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
ingressLister: ingressLister,
ingressInformer: ingressInformer,
secretInformer: kube.CoreSharedInformerFactory.Core().V1().Secrets().Informer(),
secretLister: kube.CoreSharedInformerFactory.Core().V1().Secrets().Lister(),
apisixRouteInformer: apisixRouteInformer,
apisixRouteLister: apisixRouteLister,
apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
Expand All @@ -174,6 +180,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
c.apisixUpstreamController = c.newApisixUpstreamController()
c.apisixRouteController = c.newApisixRouteController()
c.ingressController = c.newIngressController()
c.secretController = c.newSecretController()

return c, nil
}
Expand Down Expand Up @@ -307,6 +314,12 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.apisixRouteController.run(ctx)
})
c.goAttach(func() {
c.secretInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.secretController.run(ctx)
})

ac := &Api6Controller{
KubeClientSet: c.clientset,
Expand Down
226 changes: 226 additions & 0 deletions pkg/ingress/controller/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/seven/state"
"github.com/apache/apisix-ingress-controller/pkg/types"
apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

type secretController struct {
controller *Controller
workqueue workqueue.RateLimitingInterface
workers int
}

func (c *Controller) newSecretController() *secretController {
ctl := &secretController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "Secrets"),
workers: 1,
}

ctl.controller.secretInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
DeleteFunc: ctl.onDelete,
},
)

return ctl
}

func (c *secretController) run(ctx context.Context) {
log.Info("secret controller started")
defer log.Info("secret controller exited")

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok {
log.Error("informers sync failed")
return
}

handler := func() {
for {
obj, shutdown := c.workqueue.Get()
if shutdown {
return
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
event := obj.(*types.Event)
if key, ok := event.Object.(string); !ok {
c.workqueue.Forget(obj)
return fmt.Errorf("expected Secret in workqueue but got %#v", obj)
} else {
if err := c.sync(ctx, event); err != nil {
c.workqueue.AddRateLimited(obj)
log.Errorf("sync secret with ssl %s failed", key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
c.workqueue.Forget(obj)
return nil
}
}(obj)
if err != nil {
runtime.HandleError(err)
}
}
}

for i := 0; i < c.workers; i++ {
go handler()
}

<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
key := ev.Object.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Errorf("invalid resource key: %s", key)
return err
}
sec, err := c.controller.secretLister.Secrets(namespace).Get(name)

secretMapkey := namespace + "_" + name
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get Secret",
zap.String("key", secretMapkey),
zap.Error(err),
)
return err
}

if ev.Type != types.EventDelete {
log.Warnw("Secret was deleted before it can be delivered",
zap.String("key", secretMapkey),
)
return nil
}
}
if ev.Type == types.EventDelete {
if sec != nil {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnw("discard the stale secret delete event since the resource still exists",
zap.String("key", secretMapkey),
)
return nil
}
sec = ev.Tombstone.(*corev1.Secret)
}
// sync SSL in APISIX which is store in secretSSLMap
// FixMe Need to update the status of CRD ApisixTls
ssls, ok := secretSSLMap.Load(secretMapkey)
if ok {
sslMap := ssls.(sync.Map)
sslMap.Range(func(_, v interface{}) bool {
ssl := v.(*apisixv1.Ssl)
ssl.FullName = ssl.ID
err = state.SyncSsl(ssl, ev.Type.String())
if err != nil {
return false
}
return true
})
}
return err
}

func (c *secretController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret object with bad namespace/name: %s, ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}

c.workqueue.AddRateLimited(&types.Event{
Type: types.EventAdd,
Object: key,
})
}

func (c *secretController) onUpdate(prev, curr interface{}) {
prevSec := prev.(*corev1.Secret)
currSec := curr.(*corev1.Secret)

if prevSec.GetResourceVersion() == currSec.GetResourceVersion() {
return
}
key, err := cache.MetaNamespaceKeyFunc(currSec)
if err != nil {
log.Errorf("found secrets object with bad namespace/name: %s, ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
c.workqueue.AddRateLimited(&types.Event{
Type: types.EventUpdate,
Object: key,
})
}

func (c *secretController) onDelete(obj interface{}) {
sec, ok := obj.(*corev1.Secret)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("found secrets: %+v in bad tombstone state", obj)
return
}
sec = tombstone.Obj.(*corev1.Secret)
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found secret resource with bad meta namesapce key: %s", err)
return
}
// FIXME Refactor Controller.namespaceWatching to just use
// namespace after all controllers use the same way to fetch
// the object.
if !c.controller.namespaceWatching(key) {
return
}
c.workqueue.AddRateLimited(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: sec,
})
}
2 changes: 0 additions & 2 deletions pkg/seven/state/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,9 @@ func SyncSsl(ssl *v1.Ssl, method string) error {
_, err := conf.Client.Cluster(cluster).SSL().Create(context.TODO(), ssl)
return err
case Update:
// FIXME we don't know the full name of SSL.
_, err := conf.Client.Cluster(cluster).SSL().Update(context.TODO(), ssl)
return err
case Delete:
// FIXME we don't know the full name of SSL.
return conf.Client.Cluster(cluster).SSL().Delete(context.TODO(), ssl)
}
return nil
Expand Down
Loading