Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions control-plane/api-gateway/cache/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func (r *GatewayCache) ServicesFor(ref api.ResourceReference) []api.CatalogServi
return r.data[common.NormalizeMeta(ref)]
}

func (r *GatewayCache) FetchServicesFor(ctx context.Context, ref api.ResourceReference) ([]api.CatalogService, error) {
client, err := consul.NewClientFromConnMgr(r.config, r.serverMgr)
if err != nil {
return nil, err
}

opts := &api.QueryOptions{}
if ref.Namespace != "" {
opts.Namespace = ref.Namespace
}

services, _, err := client.Catalog().Service(ref.Name, "", opts.WithContext(ctx))
if err != nil {
return nil, err
}
return common.DerefAll(services), nil
}

func (r *GatewayCache) EnsureSubscribed(ref api.ResourceReference, resource types.NamespacedName) {
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down
50 changes: 39 additions & 11 deletions control-plane/api-gateway/controllers/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}
r.gatewayCache.RemoveSubscription(nonNormalizedConsulKey)
// make sure we have deregister all services even if they haven't
// hit cache yet
if err := r.deregisterAllServices(ctx, consulKey); err != nil {
log.Error(err, "error deregistering services")
return ctrl.Result{}, err
}
}

for _, deletion := range updates.Consul.Deletions {
Expand All @@ -235,19 +241,24 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

for _, registration := range updates.Consul.Registrations {
log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID)
if err := r.cache.Register(ctx, registration); err != nil {
log.Error(err, "error registering service")
return ctrl.Result{}, err
if updates.UpsertGatewayDeployment {
// We only do some registration/deregistraion if we still have a valid gateway
// otherwise, we've already deregistered everything related to the gateway, so
// no need to do any of the following.
for _, registration := range updates.Consul.Registrations {
log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID)
if err := r.cache.Register(ctx, registration); err != nil {
log.Error(err, "error registering service")
return ctrl.Result{}, err
}
}
}

for _, deregistration := range updates.Consul.Deregistrations {
log.Info("deregistering service in Consul", "id", deregistration.ServiceID)
if err := r.cache.Deregister(ctx, deregistration); err != nil {
log.Error(err, "error deregistering service")
return ctrl.Result{}, err
for _, deregistration := range updates.Consul.Deregistrations {
log.Info("deregistering service in Consul", "id", deregistration.ServiceID)
if err := r.cache.Deregister(ctx, deregistration); err != nil {
log.Error(err, "error deregistering service")
return ctrl.Result{}, err
}
}
}

Expand All @@ -270,6 +281,23 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

func (r *GatewayController) deregisterAllServices(ctx context.Context, consulKey api.ResourceReference) error {
services, err := r.gatewayCache.FetchServicesFor(ctx, consulKey)
if err != nil {
return err
}
for _, service := range services {
if err := r.cache.Deregister(ctx, api.CatalogDeregistration{
Node: service.Node,
ServiceID: service.ServiceID,
Namespace: service.Namespace,
}); err != nil {
return err
}
}
return nil
}

func (r *GatewayController) updateAndResetStatus(ctx context.Context, o client.Object) error {
// we create a copy so that we can re-update its status if need be
status := reflect.ValueOf(o.DeepCopyObject()).Elem().FieldByName("Status")
Expand Down