Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/3736.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
control-plane: fix an issue where ACL token cleanup did not respect a pod's GracefulShutdownPeriodSeconds and
tokens were invalidated immediately when a pod entered the Terminating state.
```
13 changes: 9 additions & 4 deletions acceptance/framework/consul/helm_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil/retry"

"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul-k8s/acceptance/framework/portforward"
"github.com/hashicorp/consul-k8s/control-plane/api/v1alpha1"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil/retry"
)

// HelmCluster implements Cluster and uses Helm
Expand Down Expand Up @@ -752,6 +753,10 @@ func defaultValues() map[string]string {
// (false positive).
"dns.enabled": "false",

// Adjust the default value from 30s to 1s since we have several tests that verify tokens are cleaned up,
// and many of them are using the default retryer (7s max).
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1",

// Enable trace logs for servers and clients.
"server.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
"client.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
Expand Down
14 changes: 9 additions & 5 deletions acceptance/framework/consul/helm_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"testing"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/controller-runtime/pkg/client"
runtimefake "sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/hashicorp/consul-k8s/acceptance/framework/config"
"github.com/hashicorp/consul-k8s/acceptance/framework/environment"
)

// Test that if TestConfig has values that need to be provided
Expand All @@ -33,7 +34,8 @@ func TestNewHelmCluster(t *testing.T) {
"global.image": "test-config-image",
"global.logLevel": "debug",
"server.replicas": "1",
"connectInject.transparentProxy.defaultEnabled": "false",
"connectInject.transparentProxy.defaultEnabled": "false",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "1",
"dns.enabled": "false",
"server.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
"client.extraConfig": `"{\"log_level\": \"TRACE\"}"`,
Expand All @@ -46,7 +48,8 @@ func TestNewHelmCluster(t *testing.T) {
"global.logLevel": "debug",
"server.bootstrapExpect": "3",
"server.replicas": "3",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3",
"dns.enabled": "true",
"server.extraConfig": `"{\"foo\": \"bar\"}"`,
"client.extraConfig": `"{\"foo\": \"bar\"}"`,
Expand All @@ -57,7 +60,8 @@ func TestNewHelmCluster(t *testing.T) {
"global.logLevel": "debug",
"server.bootstrapExpect": "3",
"server.replicas": "3",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.transparentProxy.defaultEnabled": "true",
"connectInject.sidecarProxy.lifecycle.defaultShutdownGracePeriodSeconds": "3",
"dns.enabled": "true",
"server.extraConfig": `"{\"foo\": \"bar\"}"`,
"client.extraConfig": `"{\"foo\": \"bar\"}"`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"net"
"regexp"
"strconv"
"strings"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/go-logr/logr"
Expand All @@ -25,6 +27,7 @@ import (

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/common"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/lifecycle"
"github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/helper/parsetags"
Expand Down Expand Up @@ -94,6 +97,8 @@ type Controller struct {
// any created Consul namespaces to allow cross namespace service discovery.
// Only necessary if ACLs are enabled.
CrossNSACLPolicy string
// Lifecycle config set graceful startup/shutdown defaults for pods.
LifecycleConfig lifecycle.Config
// ReleaseName is the Consul Helm installation release.
ReleaseName string
// ReleaseNamespace is the namespace where Consul is installed.
Expand Down Expand Up @@ -153,18 +158,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

err = r.Client.Get(ctx, req.NamespacedName, &serviceEndpoints)
// endpointPods holds a set of all pods this endpoints object is currently pointing to.
// We use this later when we reconcile ACL tokens to decide whether an ACL token in Consul
// is for a pod that no longer exists.
endpointPods := mapset.NewSet()

// If the endpoints object has been deleted (and we get an IsNotFound
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterService handles
// the case where the Consul service name is different from the Kubernetes service name.
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
} else if err != nil {
r.Log.Error(err, "failed to get Endpoints", "name", req.Name, "ns", req.Namespace)
return ctrl.Result{}, err
Expand All @@ -177,8 +178,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if isLabeledIgnore(serviceEndpoints.Labels) {
// We always deregister the service to handle the case where a user has registered the service, then added the label later.
r.Log.Info("ignoring endpoint labeled with `consul.hashicorp.com/service-ignore: \"true\"`", "name", req.Name, "namespace", req.Namespace)
err = r.deregisterService(apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{}, err
requeueAfter, err := r.deregisterService(ctx, apiClient, req.Name, req.Namespace, nil)
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
Expand Down Expand Up @@ -213,12 +214,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if hasBeenInjected(pod) {
endpointPods.Add(address.TargetRef.Name)
if isConsulDataplaneSupported(pod) {
if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
if err = r.registerServicesAndHealthCheck(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
} else {
r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState)
Expand All @@ -241,11 +243,12 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if isGateway(pod) {
endpointPods.Add(address.TargetRef.Name)
if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus, endpointAddressMap); err != nil {
if err = r.registerGateway(apiClient, pod, serviceEndpoints, healthStatus); err != nil {
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
}
}
}
Expand All @@ -254,12 +257,13 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
if err = r.deregisterService(apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap); err != nil {
requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap)
if err != nil {
r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}

return ctrl.Result{}, errs
return ctrl.Result{RequeueAfter: requeueAfter}, errs
}

func (r *Controller) Logger(name types.NamespacedName) logr.Logger {
Expand All @@ -274,10 +278,7 @@ func (r *Controller) SetupWithManager(mgr ctrl.Manager) error {

// registerServicesAndHealthCheck creates Consul registrations for the service and proxy and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
Expand Down Expand Up @@ -335,10 +336,7 @@ func parseLocality(node corev1.Node) *api.Locality {

// registerGateway creates Consul registrations for the Connect Gateways and registers them with Consul.
// It also upserts a Kubernetes health check for the service based on whether the endpoint address is ready.
func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string, endpointAddressMap map[string]bool) error {
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true

func (r *Controller) registerGateway(apiClient *api.Client, pod corev1.Pod, serviceEndpoints corev1.Endpoints, healthStatus string) error {
var managedByEndpointsController bool
if raw, ok := pod.Labels[constants.KeyManagedBy]; ok && raw == constants.ManagedByValue {
managedByEndpointsController = true
Expand Down Expand Up @@ -934,40 +932,54 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvcNamespace string, endpointsAddressesMap map[string]bool) error {
// If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance
// will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic and
// this function will return a requeueAfter duration. This can be used to requeue the event at the longest shutdown time
// interval to clean up these instances after they have exited.
func (r *Controller) deregisterService(
ctx context.Context,
apiClient *api.Client,
k8sSvcName string,
k8sSvcNamespace string,
endpointsAddressesMap map[string]bool) (time.Duration, error) {

// Get services matching metadata from Consul
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get service instances", "name", k8sSvcName)
return err
return 0, err
}

var errs error
var requeueAfter time.Duration
for _, svc := range serviceInstances {
// We need to get services matching "k8s-service-name" and "k8s-namespace" metadata.
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
var serviceDeregistered bool
if endpointsAddressesMap != nil {
if _, ok := endpointsAddressesMap[svc.ServiceAddress]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
}, nil)
if err != nil {
// Do not exit right away as there might be other services that need to be deregistered.
r.Log.Error(err, "failed to deregister service instance", "id", svc.ServiceID)
errs = multierror.Append(errs, err)
} else {
serviceDeregistered = true
}

if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) {
// If graceful shutdown is enabled, continue to the next service instance and
// mark that an event requeue is needed. We should requeue at the longest time interval
// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
// should prevent routing during gracefulShutdown.
podShutdownDuration, err := r.getGracefulShutdownAndUpdatePodCheck(ctx, apiClient, svc, k8sSvcNamespace)
if err != nil {
r.Log.Error(err, "failed to get pod shutdown duration", "svc", svc.ServiceName)
errs = multierror.Append(errs, err)
}
} else {

// set requeue response, then continue to the next service instance
if podShutdownDuration > requeueAfter {
requeueAfter = podShutdownDuration
}
if podShutdownDuration > 0 {
continue
}

// If the service address is not in the Endpoints addresses, deregister it.
r.Log.Info("deregistering service from consul", "svc", svc.ServiceID)
_, err := apiClient.Catalog().Deregister(&api.CatalogDeregistration{
_, err = apiClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
Namespace: svc.Namespace,
Expand Down Expand Up @@ -999,8 +1011,87 @@ func (r *Controller) deregisterService(apiClient *api.Client, k8sSvcName, k8sSvc
}
}

return errs
if requeueAfter > 0 {
r.Log.Info("re-queueing event for graceful shutdown", "name", k8sSvcName, "k8sNamespace", k8sSvcNamespace, "requeueAfter", requeueAfter)
}

return requeueAfter, errs
}

// getGracefulShutdownAndUpdatePodCheck checks if the pod is in the process of being terminated and if so, updates the
// health status of the service to critical. It returns the duration for which the pod should be re-queued (which is the pod's
// gracefulShutdownPeriod setting, padded by 20%), or 0 when the pod is gone or graceful shutdown does not apply.
func (r *Controller) getGracefulShutdownAndUpdatePodCheck(ctx context.Context, apiClient *api.Client, svc *api.CatalogService, k8sNamespace string) (time.Duration, error) {
	// Get the pod, and check if it is still running. We do this to defer ACL/node cleanup for pods that are
	// in graceful termination.
	podName := svc.ServiceMeta[constants.MetaKeyPodName]
	if podName == "" {
		// Service instance is not backed by a pod; nothing to defer.
		return 0, nil
	}

	var pod corev1.Pod
	err := r.Client.Get(ctx, types.NamespacedName{Name: podName, Namespace: k8sNamespace}, &pod)
	if k8serrors.IsNotFound(err) {
		// Pod is fully gone; the caller may deregister the instance immediately.
		return 0, nil
	}
	if err != nil {
		r.Log.Error(err, "failed to get terminating pod", "name", podName, "k8sNamespace", k8sNamespace)
		return 0, fmt.Errorf("failed to get terminating pod %s/%s: %w", k8sNamespace, podName, err)
	}

	shutdownSeconds, err := r.getGracefulShutdownPeriodSecondsForPod(pod)
	if err != nil {
		// Fix: previously the whole pod struct was logged under the "name" key ("name", pod);
		// log the pod's name instead so log output is readable and consistent.
		r.Log.Error(err, "failed to get graceful shutdown period for pod", "name", podName, "k8sNamespace", k8sNamespace)
		return 0, fmt.Errorf("failed to get graceful shutdown period for pod %s/%s: %w", k8sNamespace, podName, err)
	}

	if shutdownSeconds <= 0 {
		// No graceful shutdown configured for this pod.
		return 0, nil
	}

	// Update the health status of the service to critical so that we can drain inbound traffic.
	// We don't need to handle the proxy service since that will be reconciled looping through all the service instances.
	serviceRegistration := &api.CatalogRegistration{
		Node:    common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName),
		Address: pod.Status.HostIP,
		// Service is nil since we are patching the health status.
		Check: &api.AgentCheck{
			CheckID:   consulHealthCheckID(pod.Namespace, svc.ServiceID),
			Name:      constants.ConsulKubernetesCheckName,
			Type:      constants.ConsulKubernetesCheckType,
			Status:    api.HealthCritical,
			ServiceID: svc.ServiceID,
			Output:    fmt.Sprintf("Pod \"%s/%s\" is terminating", pod.Namespace, podName),
			Namespace: r.consulNamespace(pod.Namespace),
		},
		SkipNodeUpdate: true,
	}

	r.Log.Info("updating health status of service with Consul to critical in order to drain inbound traffic", "name", svc.ServiceName,
		"id", svc.ServiceID, "pod", podName, "k8sNamespace", pod.Namespace)
	if _, err := apiClient.Catalog().Register(serviceRegistration, nil); err != nil {
		r.Log.Error(err, "failed to update service health status to critical", "name", svc.ServiceName, "pod", podName)
		return 0, fmt.Errorf("failed to update service health status for pod %s/%s to critical: %w", pod.Namespace, podName, err)
	}

	// Return the duration for which the pod should be re-queued. We add 20% to shutdownSeconds to account for
	// any potential delay in the pod being killed.
	return time.Duration(shutdownSeconds+int(math.Ceil(float64(shutdownSeconds)*0.2))) * time.Second, nil
}

// getGracefulShutdownPeriodSecondsForPod returns the graceful shutdown period for the pod. If one is not specified,
// either through the controller configuration or pod annotations, it returns 0.
func (r *Controller) getGracefulShutdownPeriodSecondsForPod(pod corev1.Pod) (int, error) {
	enabled, err := r.LifecycleConfig.EnableProxyLifecycle(pod)
	if err != nil {
		// Fix: error message previously read "failed to get parse proxy lifecycle configuration".
		return 0, fmt.Errorf("failed to parse proxy lifecycle configuration for pod %s/%s: %w", pod.Namespace, pod.Name, err)
	}
	// Graceful shutdown only applies when sidecar proxy lifecycle management is enabled.
	if !enabled {
		return 0, nil
	}

	return r.LifecycleConfig.ShutdownGracePeriodSeconds(pod)
}

// deregisterNode removes a node if it does not have any associated services attached to it.
Expand Down Expand Up @@ -1529,3 +1620,11 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int {
}
return -1
}

// addressIsMissingFromEndpointsMap reports whether address is absent from
// endpointsAddressesMap. A nil map means "deregister everything", so every
// address is treated as missing in that case.
func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
	if endpointsAddressesMap == nil {
		return true
	}
	if _, present := endpointsAddressesMap[address]; present {
		return false
	}
	return true
}
Loading