From ea4d0037f7d48e67c4613df27b3550133c1cad92 Mon Sep 17 00:00:00 2001 From: DanStough Date: Wed, 23 Aug 2023 16:46:19 -0400 Subject: [PATCH 1/2] feat: add v2 pod controller w/ workload lifecycle --- .../api-gateway/cache/consul_test.go | 9 +- .../catalog/to-consul/syncer_test.go | 7 +- control-plane/connect-inject/common/common.go | 25 +- .../connect-inject/common/common_test.go | 59 +- .../constants/annotations_and_labels.go | 25 +- .../connect-inject/constants/constants.go | 6 + .../endpoints/consul_client_health_checks.go | 12 +- .../consul_client_health_checks_test.go | 7 +- .../endpoints/endpoints_controller.go | 36 +- .../endpoints_controller_ent_test.go | 9 +- .../endpoints/endpoints_controller_test.go | 66 +- .../controllers/pod/pod_controller.go | 337 ++++++ .../pod/pod_controller_ent_test.go | 44 + .../controllers/pod/pod_controller_test.go | 1035 +++++++++++++++++ .../connect-inject/webhook/mesh_webhook.go | 1 + .../webhook/mesh_webhook_test.go | 50 +- control-plane/consul/resource_client.go | 3 +- control-plane/go.mod | 4 +- control-plane/go.sum | 4 +- control-plane/helper/test/test_util.go | 45 +- .../inject-connect/v2controllers.go | 63 +- .../server-acl-init/command_test.go | 17 +- 22 files changed, 1676 insertions(+), 188 deletions(-) create mode 100644 control-plane/connect-inject/controllers/pod/pod_controller.go create mode 100644 control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go create mode 100644 control-plane/connect-inject/controllers/pod/pod_controller_test.go diff --git a/control-plane/api-gateway/cache/consul_test.go b/control-plane/api-gateway/cache/consul_test.go index 59570e532f..895c59e8c9 100644 --- a/control-plane/api-gateway/cache/consul_test.go +++ b/control-plane/api-gateway/cache/consul_test.go @@ -21,11 +21,12 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/event" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul-k8s/control-plane/api-gateway/common" "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-k8s/control-plane/helper/test" - "github.com/hashicorp/consul/api" ) func Test_resourceCache_diff(t *testing.T) { @@ -1322,7 +1323,7 @@ func TestCache_Write(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) @@ -1600,7 +1601,7 @@ func Test_Run(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) @@ -2001,7 +2002,7 @@ func TestCache_Delete(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index 3fae7a3d16..ece2e1dd05 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -13,12 +13,13 @@ import ( "testing" "time" - "github.com/hashicorp/consul-k8s/control-plane/consul" - "github.com/hashicorp/consul-k8s/control-plane/helper/test" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) const ( @@ -233,7 +234,7 @@ func TestConsulSyncer_stopsGracefully(t *testing.T) { testClient := &test.TestServerClient{ Cfg: &consul.Config{APIClientConfig: &api.Config{}, HTTPPort: port}, - Watcher: test.MockConnMgrForIPAndPort(parsedURL.Host, port), + Watcher: test.MockConnMgrForIPAndPort(t, parsedURL.Host, port), } // Start the syncer. diff --git a/control-plane/connect-inject/common/common.go b/control-plane/connect-inject/common/common.go index a99d9fd12e..acee282739 100644 --- a/control-plane/connect-inject/common/common.go +++ b/control-plane/connect-inject/common/common.go @@ -8,8 +8,11 @@ import ( "strconv" "strings" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + mapset "github.com/deckarep/golang-set" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" ) // DetermineAndValidatePort behaves as follows: @@ -90,6 +93,26 @@ func ShouldOverwriteProbes(pod corev1.Pod, globalOverwrite bool) (bool, error) { return globalOverwrite, nil } +// ShouldIgnore ignores namespaces where we don't mesh-inject. +func ShouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { + // Ignores system namespaces. + if namespace == metav1.NamespaceSystem || namespace == metav1.NamespacePublic || namespace == "local-path-storage" { + return true + } + + // Ignores deny list. + if denySet.Contains(namespace) { + return true + } + + // Ignores if not in allow list or allow list is not *. + if !allowSet.Contains("*") && !allowSet.Contains(namespace) { + return true + } + + return false +} + func ConsulNodeNameFromK8sNode(nodeName string) string { return fmt.Sprintf("%s-virtual", nodeName) } diff --git a/control-plane/connect-inject/common/common_test.go b/control-plane/connect-inject/common/common_test.go index 79a9294fe2..6f623b28db 100644 --- a/control-plane/connect-inject/common/common_test.go +++ b/control-plane/connect-inject/common/common_test.go @@ -6,11 +6,13 @@ package common import ( "testing" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/namespaces" + mapset "github.com/deckarep/golang-set" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" ) func TestCommonDetermineAndValidatePort(t *testing.T) { @@ -259,3 +261,56 @@ func minimal() *corev1.Pod { }, } } + +func TestShouldIgnore(t *testing.T) { + t.Parallel() + cases := []struct { + name string + namespace string + denySet mapset.Set + allowSet mapset.Set + expected bool + }{ + { + name: "system namespace", + namespace: "kube-system", + denySet: mapset.NewSetWith(), + allowSet: mapset.NewSetWith("*"), + expected: true, + }, + { + name: "other system namespace", + namespace: "local-path-storage", + denySet: mapset.NewSetWith(), + allowSet: mapset.NewSetWith("*"), + expected: true, + }, + { + name: "any namespace allowed", + namespace: "foo", + denySet: mapset.NewSetWith(), + allowSet: mapset.NewSetWith("*"), + expected: false, + }, + { + name: "in deny list", + namespace: "foo", + denySet: mapset.NewSetWith("foo"), + allowSet: mapset.NewSetWith("*"), + expected: true, + }, + { + name: "not in allow list", + namespace: "foo", + denySet: mapset.NewSetWith(), + allowSet: mapset.NewSetWith("bar"), + expected: true, + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + actual := ShouldIgnore(tt.namespace, tt.denySet, tt.allowSet) + require.Equal(t, tt.expected, actual) + }) + } +} diff --git a/control-plane/connect-inject/constants/annotations_and_labels.go b/control-plane/connect-inject/constants/annotations_and_labels.go index 4efcc24c74..76b83eaf62 100644 --- a/control-plane/connect-inject/constants/annotations_and_labels.go +++ b/control-plane/connect-inject/constants/annotations_and_labels.go @@ -181,8 +181,12 @@ const ( // to explicitly perform the peering operation again. AnnotationPeeringVersion = "consul.hashicorp.com/peering-version" + // LegacyAnnotationConsulK8sVersion is the current version of this binary. + // TODO: remove this annotation in a future release. + LegacyAnnotationConsulK8sVersion = "consul.hashicorp.com/connect-k8s-version" + // AnnotationConsulK8sVersion is the current version of this binary. - AnnotationConsulK8sVersion = "consul.hashicorp.com/connect-k8s-version" + AnnotationConsulK8sVersion = "consul.hashicorp.com/consul-k8s-version" // LabelServiceIgnore is a label that can be added to a service to prevent it from being // registered with Consul. @@ -202,6 +206,25 @@ const ( ManagedByValue = "consul-k8s-endpoints-controller" ) +// ******************** +// V2 Exclusive Annotations & Labels +// ******************** + +const ( + // AnnotationMeshInject is the key of the annotation that controls whether + // V2 mesh injection is explicitly enabled or disabled for a pod using. + // be set to a truthy or falsy value, as parseable by strconv.ParseBool. + AnnotationMeshInject = "consul.hashicorp.com/mesh-inject" + + // KeyMeshInjectStatus is the key of the annotation that is added to + // a pod after an injection is done. + KeyMeshInjectStatus = "consul.hashicorp.com/mesh-inject-status" + + // ManagedByPodValue is used in Consul metadata to identify the manager + // of this resource. + ManagedByPodValue = "consul-k8s-pod-controller" +) + // Annotations used by Prometheus. const ( AnnotationPrometheusScrape = "prometheus.io/scrape" diff --git a/control-plane/connect-inject/constants/constants.go b/control-plane/connect-inject/constants/constants.go index ca6fe23606..506654dfe1 100644 --- a/control-plane/connect-inject/constants/constants.go +++ b/control-plane/connect-inject/constants/constants.go @@ -7,6 +7,12 @@ const ( // ConsulCAFile is the location of the Consul CA file inside the injected pod. ConsulCAFile = "/consul/connect-inject/consul-ca.pem" + // DefaultConsulNS is the default Consul namespace name. + DefaultConsulNS = "default" + + // DefaultConsulPartition is the default Consul partition name. + DefaultConsulPartition = "default" + // ProxyDefaultInboundPort is the default inbound port for the proxy. ProxyDefaultInboundPort = 20000 diff --git a/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks.go b/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks.go index c71ee9ba55..6d78654989 100644 --- a/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks.go +++ b/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks.go @@ -6,12 +6,13 @@ package endpoints import ( "fmt" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/consul" "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-version" corev1 "k8s.io/api/core/v1" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/consul" ) const minSupportedConsulDataplaneVersion = "v1.0.0-beta1" @@ -19,7 +20,12 @@ const minSupportedConsulDataplaneVersion = "v1.0.0-beta1" // isConsulDataplaneSupported returns true if the consul-k8s version on the pod supports // consul-dataplane architecture of Consul. func isConsulDataplaneSupported(pod corev1.Pod) bool { - if anno, ok := pod.Annotations[constants.AnnotationConsulK8sVersion]; ok { + anno, ok := pod.Annotations[constants.LegacyAnnotationConsulK8sVersion] + if !ok { + anno, ok = pod.Annotations[constants.AnnotationConsulK8sVersion] + } + + if ok { consulK8sVersion, err := version.NewVersion(anno) if err != nil { // Only consul-k8s v1.0.0+ (including pre-release versions) have the version annotation. So it would be diff --git a/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks_test.go b/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks_test.go index 36ad222d68..5aa7448ef3 100644 --- a/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks_test.go +++ b/control-plane/connect-inject/controllers/endpoints/consul_client_health_checks_test.go @@ -7,14 +7,15 @@ import ( "testing" logrtest "github.com/go-logr/logr/testr" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/helper/test" "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) func TestIsConsulDataplaneSupported(t *testing.T) { @@ -46,7 +47,7 @@ func TestIsConsulDataplaneSupported(t *testing.T) { }, } if version != "" { - pod.ObjectMeta.Annotations[constants.AnnotationConsulK8sVersion] = version + pod.ObjectMeta.Annotations[constants.LegacyAnnotationConsulK8sVersion] = version } require.Equal(t, c.expIsConsulDataplaneSupported, isConsulDataplaneSupported(pod)) diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go index 3f139c2662..a9f10f970b 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller.go @@ -13,22 +13,22 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/go-logr/logr" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" - "github.com/hashicorp/consul-k8s/control-plane/consul" - "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" - "github.com/hashicorp/consul-k8s/control-plane/namespaces" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/helper/parsetags" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" ) const ( @@ -142,7 +142,7 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu var serviceEndpoints corev1.Endpoints // Ignore the request if the namespace of the endpoint is not allowed. - if shouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) { + if common.ShouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) { return ctrl.Result{}, nil } @@ -1287,26 +1287,6 @@ func (r *Controller) processLabeledUpstream(pod corev1.Pod, rawUpstream string) return upstream, nil } -// shouldIgnore ignores namespaces where we don't connect-inject. -func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool { - // Ignores system namespaces. - if namespace == metav1.NamespaceSystem || namespace == metav1.NamespacePublic || namespace == "local-path-storage" { - return true - } - - // Ignores deny list. - if denySet.Contains(namespace) { - return true - } - - // Ignores if not in allow list or allow list is not *. - if !allowSet.Contains("*") && !allowSet.Contains(namespace) { - return true - } - - return false -} - // consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace // depending on Consul Namespaces being enabled and the value of namespace mirroring. func (r *Controller) consulNamespace(namespace string) string { diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_ent_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_ent_test.go index e0c7f034b1..49150dec6a 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_ent_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_ent_test.go @@ -14,9 +14,6 @@ import ( logrtest "github.com/go-logr/logr/testing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/helper/test" - "github.com/hashicorp/consul-k8s/control-plane/namespaces" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" @@ -26,6 +23,10 @@ import ( "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" ) // TestReconcileCreateEndpoint tests the logic to create service instances in Consul from the addresses in the Endpoints @@ -2121,7 +2122,7 @@ func createPodWithNamespace(name, namespace, ip string, inject bool, managedByEn Namespace: namespace, Labels: map[string]string{}, Annotations: map[string]string{ - constants.AnnotationConsulK8sVersion: "1.0.0", + constants.LegacyAnnotationConsulK8sVersion: "1.0.0", }, }, Status: corev1.PodStatus{ diff --git a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go index 2cec69dd0a..380db5a0b6 100644 --- a/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go +++ b/control-plane/connect-inject/controllers/endpoints/endpoints_controller_test.go @@ -13,9 +13,6 @@ import ( logrtest "github.com/go-logr/logr/testr" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" - "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" - "github.com/hashicorp/consul-k8s/control-plane/helper/test" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" @@ -27,6 +24,10 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" ) const ( @@ -34,59 +35,6 @@ const ( consulNodeName = "test-node-virtual" ) -func TestShouldIgnore(t *testing.T) { - t.Parallel() - cases := []struct { - name string - namespace string - denySet mapset.Set - allowSet mapset.Set - expected bool - }{ - { - name: "system namespace", - namespace: "kube-system", - denySet: mapset.NewSetWith(), - allowSet: mapset.NewSetWith("*"), - expected: true, - }, - { - name: "other system namespace", - namespace: "local-path-storage", - denySet: mapset.NewSetWith(), - allowSet: mapset.NewSetWith("*"), - expected: true, - }, - { - name: "any namespace allowed", - namespace: "foo", - denySet: mapset.NewSetWith(), - allowSet: mapset.NewSetWith("*"), - expected: false, - }, - { - name: "in deny list", - namespace: "foo", - denySet: mapset.NewSetWith("foo"), - allowSet: mapset.NewSetWith("*"), - expected: true, - }, - { - name: "not in allow list", - namespace: "foo", - denySet: mapset.NewSetWith(), - allowSet: mapset.NewSetWith("bar"), - expected: true, - }, - } - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - actual := shouldIgnore(tt.namespace, tt.denySet, tt.allowSet) - require.Equal(t, tt.expected, actual) - }) - } -} - func TestHasBeenInjected(t *testing.T) { t.Parallel() cases := []struct { @@ -3715,7 +3663,7 @@ func TestReconcileUpdateEndpoint_LegacyService(t *testing.T) { k8sObjects: func() []runtime.Object { pod1 := createServicePod("pod1", "1.2.3.4", true, true) pod1.Status.HostIP = "127.0.0.1" - pod1.Annotations[constants.AnnotationConsulK8sVersion] = "0.99.0" // We want a version less than 1.0.0. + pod1.Annotations[constants.LegacyAnnotationConsulK8sVersion] = "0.99.0" // We want a version less than 1.0.0. endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -3780,7 +3728,7 @@ func TestReconcileUpdateEndpoint_LegacyService(t *testing.T) { k8sObjects: func() []runtime.Object { pod1 := createServicePod("pod1", "1.2.3.4", true, true) pod1.Status.HostIP = "127.0.0.1" - pod1.Annotations[constants.AnnotationConsulK8sVersion] = "0.99.0" // We want a version less than 1.0.0. + pod1.Annotations[constants.LegacyAnnotationConsulK8sVersion] = "0.99.0" // We want a version less than 1.0.0. endpoint := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "service-updated", @@ -6624,7 +6572,7 @@ func createServicePod(name, ip string, inject bool, managedByEndpointsController Namespace: "default", Labels: map[string]string{}, Annotations: map[string]string{ - constants.AnnotationConsulK8sVersion: "1.0.0", + constants.LegacyAnnotationConsulK8sVersion: "1.0.0", }, }, Status: corev1.PodStatus{ diff --git a/control-plane/connect-inject/controllers/pod/pod_controller.go b/control-plane/connect-inject/controllers/pod/pod_controller.go new file mode 100644 index 0000000000..f4e3f56053 --- /dev/null +++ b/control-plane/connect-inject/controllers/pod/pod_controller.go @@ -0,0 +1,337 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package pod + +import ( + "context" + "fmt" + "strconv" + + mapset "github.com/deckarep/golang-set" + "github.com/go-logr/logr" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/go-multierror" + "google.golang.org/protobuf/types/known/anypb" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/common" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/namespaces" +) + +const ( + metaKeyManagedBy = "managed-by" + tokenMetaPodNameKey = "pod" +) + +type Controller struct { + client.Client + // ConsulClientConfig is the config for the Consul API client. + ConsulClientConfig *consul.Config + // ConsulServerConnMgr is the watcher for the Consul server addresses. + ConsulServerConnMgr consul.ServerConnectionManager + // Only pods in the AllowK8sNamespacesSet are reconciled. + AllowK8sNamespacesSet mapset.Set + // Pods in the DenyK8sNamespacesSet are ignored. + DenyK8sNamespacesSet mapset.Set + // EnableConsulPartitions indicates that a user is running Consul Enterprise + EnableConsulPartitions bool + // ConsulPartition is the Consul Partition to which this controller belongs + ConsulPartition string + // EnableConsulNamespaces indicates that a user is running Consul Enterprise + EnableConsulNamespaces bool + // ConsulDestinationNamespace is the name of the Consul namespace to create + // all config entries in. If EnableNSMirroring is true this is ignored. + ConsulDestinationNamespace string + // EnableNSMirroring causes Consul namespaces to be created to match the + // k8s namespace of any config entry custom resource. Config entries will + // be created in the matching Consul namespace. + EnableNSMirroring bool + // NSMirroringPrefix is an optional prefix that can be added to the Consul + // namespaces created while mirroring. For example, if it is set to "k8s-", + // then the k8s `default` namespace will be mirrored in Consul's + // `k8s-default` namespace. + NSMirroringPrefix string + + // TODO: EnableWANFederation + + // AuthMethod is the name of the Kubernetes Auth Method that + // was used to login with Consul. The Endpoints controller + // will delete any tokens associated with this auth method + // whenever service instances are deregistered. + AuthMethod string + + // EnableTelemetryCollector controls whether the proxy service should be registered + // with config to enable telemetry forwarding. + EnableTelemetryCollector bool + + MetricsConfig metrics.Config + + Log logr.Logger + + // ResourceClient is a gRPC client for the resource service. It is public for testing purposes + ResourceClient pbresource.ResourceServiceClient +} + +// TODO(dans): logs, logs, logs + +// Reconcile reads the state of an Endpoints object for a Kubernetes Service and reconciles Consul services which +// correspond to the Kubernetes Service. These events are driven by changes to the Pods backing the Kube service. +func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var errs error + var pod corev1.Pod + + // Ignore the request if the namespace of the endpoint is not allowed. + // Strictly speaking, this is not required because the mesh webhook also knows valid namespaces + // for injection, but it will somewhat reduce the amount of unnecessary deletions for non-injected + // pods + if common.ShouldIgnore(req.Namespace, r.DenyK8sNamespacesSet, r.AllowK8sNamespacesSet) { + return ctrl.Result{}, nil + } + + rc, err := consul.NewResourceServiceClient(r.ConsulServerConnMgr) + if err != nil { + r.Log.Error(err, "failed to create resource client", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err + } + r.ResourceClient = rc + + err = r.Client.Get(ctx, req.NamespacedName, &pod) + + // If the pod object has been deleted (and we get an IsNotFound error), + // we need to remove the Workload from Consul. + if k8serrors.IsNotFound(err) { + + if err := r.deleteWorkload(ctx, req.NamespacedName); err != nil { + errs = multierror.Append(errs, err) + } + + // TODO: delete explicit upstreams + //if err := r.deleteUpstreams(ctx, pod); err != nil { + // errs = multierror.Append(errs, err) + //} + + // TODO(dans): delete proxyConfiguration + //if err := r.deleteProxyConfiguration(ctx, pod); err != nil { + // errs = multierror.Append(errs, err) + //} + + // TODO: clean up ACL Tokens + + // TODO(dans): delete health status, since we don't have finalizers + //if err := r.deleteHealthStatus(ctx, pod); err != nil { + // errs = multierror.Append(errs, err) + //} + + return ctrl.Result{}, errs + } else if err != nil { + r.Log.Error(err, "failed to get Pod", "name", req.Name, "ns", req.Namespace) + return ctrl.Result{}, err + } + + r.Log.Info("retrieved", "name", pod.Name, "ns", pod.Namespace) + + if hasBeenInjected(pod) { + if err := r.writeWorkload(ctx, pod); err != nil { + errs = multierror.Append(errs, err) + } + + // TODO(dans): create proxyConfiguration + + // TODO: create explicit upstreams + //if err := r.writeUpstreams(ctx, pod); err != nil { + // errs = multierror.Append(errs, err) + //} + + // TODO(dans): write health status + //if err := r.writeHealthStatus(ctx, pod); err != nil { + // errs = multierror.Append(errs, err) + //} + } + + return ctrl.Result{}, errs +} + +func (r *Controller) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + Complete(r) +} + +// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected. +func hasBeenInjected(pod corev1.Pod) bool { + if anno, ok := pod.Annotations[constants.KeyMeshInjectStatus]; ok && anno == constants.Injected { + return true + } + return false +} + +func (r *Controller) deleteWorkload(ctx context.Context, pod types.NamespacedName) error { + req := &pbresource.DeleteRequest{ + Id: getWorkloadID(pod.Name, r.getConsulNamespace(pod.Namespace), r.getPartition()), + } + + _, err := r.ResourceClient.Delete(ctx, req) + return err +} + +//func (r *Controller) deleteHealthStatus(ctx context.Context, pod corev1.Pod) error { +// return nil +//} + +func (r *Controller) writeWorkload(ctx context.Context, pod corev1.Pod) error { + + // TODO: we should add some validation on the required fields here + // e.g. what if token automount is disabled and there is not SA. The API call + // will fail with no indication to the user other than controller logs + ports, workloadPorts := getWorkloadPorts(pod) + + var node corev1.Node + // Ignore errors because we don't want failures to block running services. + _ = r.Client.Get(context.Background(), types.NamespacedName{Name: pod.Spec.NodeName, Namespace: pod.Namespace}, &node) + locality := parseLocality(node) + + workload := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: pod.Status.PodIP, Ports: ports}, + }, + Identity: pod.Spec.ServiceAccountName, + Locality: locality, + NodeName: common.ConsulNodeNameFromK8sNode(pod.Spec.NodeName), + Ports: workloadPorts, + } + + // TODO(dans): replace with common.ToProtoAny when available + proto, err := anypb.New(workload) + if err != nil { + return fmt.Errorf("could not serialize workload: %w", err) + } + + // TODO: allow custom workload metadata + meta := map[string]string{ + constants.MetaKeyKubeNS: pod.Namespace, + metaKeyManagedBy: constants.ManagedByPodValue, + } + + req := &pbresource.WriteRequest{ + Resource: &pbresource.Resource{ + Id: getWorkloadID(pod.GetName(), r.getConsulNamespace(pod.Namespace), r.getPartition()), + Metadata: meta, + Data: proto, + }, + } + _, err = r.ResourceClient.Write(ctx, req) + return err +} + +//func (r *Controller) writeHealthStatus(pod corev1.Pod) error { +// return nil +//} + +// TODO(dans): delete ACL token for workload +// deleteACLTokensForServiceInstance finds the ACL tokens that belongs to the service instance and deletes it from Consul. +// It will only check for ACL tokens that have been created with the auth method this controller +// has been configured with and will only delete tokens for the provided podName. +// func (r *Controller) deleteACLTokensForWorkload(apiClient *api.Client, svc *api.AgentService, k8sNS, podName string) error { + +// TODO: add support for explicit upstreams +//func (r *Controller) writeUpstreams(pod corev1.Pod) error + +// consulNamespace returns the Consul destination namespace for a provided Kubernetes namespace +// depending on Consul Namespaces being enabled and the value of namespace mirroring. +func (r *Controller) getConsulNamespace(kubeNamespace string) string { + ns := namespaces.ConsulNamespace( + kubeNamespace, + r.EnableConsulNamespaces, + r.ConsulDestinationNamespace, + r.EnableNSMirroring, + r.NSMirroringPrefix, + ) + + // TODO: remove this if and when the default namespace of resources change. + if ns == "" { + ns = constants.DefaultConsulNS + } + return ns +} + +func (r *Controller) getPartition() string { + if !r.EnableConsulPartitions || r.ConsulPartition == "" { + return constants.DefaultConsulPartition + } + return r.ConsulPartition +} + +func getWorkloadPorts(pod corev1.Pod) ([]string, map[string]*pbcatalog.WorkloadPort) { + ports := make([]string, 0) + workloadPorts := map[string]*pbcatalog.WorkloadPort{} + + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + name := port.Name + if name == "" { + name = strconv.Itoa(int(port.ContainerPort)) + } + + // TODO: error check reserved "mesh" keyword and 20000 + + if port.Protocol != corev1.ProtocolTCP { + // TODO: also throw an error here + continue + } + + ports = append(ports, name) + workloadPorts[name] = &pbcatalog.WorkloadPort{ + Port: uint32(port.ContainerPort), + + // We leave the protocol unspecified so that it can be inherited from the Service appProtocol + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + } + } + } + + ports = append(ports, "mesh") + workloadPorts["mesh"] = &pbcatalog.WorkloadPort{ + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + } + + return ports, workloadPorts +} + +func parseLocality(node corev1.Node) *pbcatalog.Locality { + region := node.Labels[corev1.LabelTopologyRegion] + zone := node.Labels[corev1.LabelTopologyZone] + + if region == "" { + return nil + } + + return &pbcatalog.Locality{ + Region: region, + Zone: zone, + } +} + +func getWorkloadID(name, namespace, partition string) *pbresource.ID { + return &pbresource.ID{ + Name: name, + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Workload", + }, + Tenancy: &pbresource.Tenancy{ + Partition: partition, + Namespace: namespace, + }, + } +} diff --git a/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go b/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go new file mode 100644 index 0000000000..802cb9d910 --- /dev/null +++ b/control-plane/connect-inject/controllers/pod/pod_controller_ent_test.go @@ -0,0 +1,44 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +//go:build enterprise + +package pod + +import "testing" + +// TODO(dans) +// Tests creating a Pod object in a non-default NS and Partition with namespaces set to mirroring +func TestReconcileCreatePodWithMirrorNamespaces(t *testing.T) { + +} + +// TODO(dans) +// Tests updating a Pod object in a non-default NS and Partition with namespaces set to mirroring +func TestReconcileUpdatePodWithMirrorNamespaces(t *testing.T) { + +} + +// TODO(dans) +// Tests deleting a Pod object in a non-default NS and Partition with namespaces set to mirroring +func TestReconcileDeletePodWithMirrorNamespaces(t *testing.T) { + +} + +// TODO(dans) +// Tests creating a Pod object in a non-default NS and Partition with namespaces set to a destination +func TestReconcileCreatePodWithDestinationNamespace(t *testing.T) { + +} + +// TODO(dans) +// Tests updating a Pod object in a non-default NS and Partition with namespaces set to a destination +func TestReconcileUpdatePodWithDestinationNamespace(t *testing.T) { + +} + +// TODO(dans) +// Tests deleting a Pod object in a non-default NS and Partition with namespaces set to a destination +func TestReconcileDeletePodWithDestinationNamespace(t *testing.T) { + +} diff --git a/control-plane/connect-inject/controllers/pod/pod_controller_test.go b/control-plane/connect-inject/controllers/pod/pod_controller_test.go new file mode 100644 index 0000000000..52a054581b --- /dev/null +++ b/control-plane/connect-inject/controllers/pod/pod_controller_test.go @@ -0,0 +1,1035 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package pod + +import ( + "context" + "testing" + + mapset "github.com/deckarep/golang-set" + logrtest "github.com/go-logr/logr/testr" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants" + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/metrics" + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" +) + +const ( + nodeName = "test-node" + localityNodeName = "test-node-w-locality" + consulNodeName = "test-node-virtual" + consulLocalityNodeName = "test-node-w-locality-virtual" + consulNodeAddress = "127.0.0.1" +) + +func TestHasBeenInjected(t *testing.T) { + t.Parallel() + cases := []struct { + name string + pod func() corev1.Pod + expected bool + }{ + { + name: "Pod with injected annotation", + pod: func() corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", "foo", true, true) + return *pod1 + }, + expected: true, + }, + { + name: "Pod without injected annotation", + pod: func() corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", "foo", false, true) + return *pod1 + }, + expected: false, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + + actual := hasBeenInjected(tt.pod()) + require.Equal(t, tt.expected, actual) + }) + } +} + +func TestParseLocality(t *testing.T) { + t.Run("no labels", func(t *testing.T) { + n := corev1.Node{} + require.Nil(t, parseLocality(n)) + }) + + t.Run("zone only", func(t *testing.T) { + n := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + corev1.LabelTopologyZone: "us-west-1a", + }, + }, + } + require.Nil(t, parseLocality(n)) + }) + + t.Run("everything", func(t *testing.T) { + n := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + corev1.LabelTopologyRegion: "us-west-1", + corev1.LabelTopologyZone: "us-west-1a", + }, + }, + } + require.True(t, proto.Equal(&pbcatalog.Locality{Region: "us-west-1", Zone: "us-west-1a"}, parseLocality(n))) + }) +} + +func TestWorkloadWrite(t *testing.T) { + t.Parallel() + + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ + Name: metav1.NamespaceDefault, + Namespace: metav1.NamespaceDefault, + }} + node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + localityNode := corev1.Node{ObjectMeta: metav1.ObjectMeta{ + Name: localityNodeName, + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{ + corev1.LabelTopologyRegion: "us-east1", + corev1.LabelTopologyZone: "us-east1-b", + }, + }} + + type testCase struct { + name string + pod *corev1.Pod + podModifier func(pod *corev1.Pod) + expectedWorkload *pbcatalog.Workload + } + + run := func(t *testing.T, tc testCase) { + if tc.podModifier != nil { + tc.podModifier(tc.pod) + } + + k8sObjects := []runtime.Object{ + &ns, + &node, + &localityNode, + } + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + resourceClient, err := consul.NewResourceServiceClient(testClient.Watcher) + require.NoError(t, err) + + // Create the pod controller. + pc := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ResourceClient: resourceClient, + } + + err = pc.writeWorkload(context.Background(), *tc.pod) + require.NoError(t, err) + + req := &pbresource.ReadRequest{Id: &pbresource.ID{ + Name: tc.pod.GetName(), + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Workload", + }, + Tenancy: &pbresource.Tenancy{ + Partition: constants.DefaultConsulPartition, + Namespace: metav1.NamespaceDefault, + }, + }} + actualRes, err := resourceClient.Read(context.Background(), req) + require.NoError(t, err) + require.NotNil(t, actualRes) + + require.Equal(t, tc.pod.GetName(), actualRes.GetResource().GetId().GetName()) + require.Equal(t, constants.DefaultConsulNS, actualRes.GetResource().GetId().GetTenancy().GetNamespace()) + require.Equal(t, constants.DefaultConsulPartition, actualRes.GetResource().GetId().GetTenancy().GetPartition()) + + require.NotNil(t, actualRes.GetResource().GetData()) + + actualWorkload := &pbcatalog.Workload{} + err = actualRes.GetResource().GetData().UnmarshalTo(actualWorkload) + require.NoError(t, err) + + require.True(t, proto.Equal(actualWorkload, tc.expectedWorkload)) + } + + testCases := []testCase{ + { + name: "multi-port single-container", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + { + name: "multi-port multi-container", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + podModifier: func(pod *corev1.Pod) { + container := corev1.Container{ + Name: "logger", + Ports: []corev1.ContainerPort{ + { + Name: "agent", + Protocol: corev1.ProtocolTCP, + ContainerPort: 6666, + }, + }, + } + pod.Spec.Containers = append(pod.Spec.Containers, container) + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "agent", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "agent": { + Port: 6666, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + { + name: "pod with locality", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + podModifier: func(pod *corev1.Pod) { + pod.Spec.NodeName = localityNodeName + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + Locality: &pbcatalog.Locality{ + Region: "us-east1", + Zone: "us-east1-b", + }, + NodeName: consulLocalityNodeName, + Identity: "foo", + }, + }, + { + name: "pod with unnamed ports", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + podModifier: func(pod *corev1.Pod) { + pod.Spec.Containers[0].Ports[0].Name = "" + pod.Spec.Containers[0].Ports[1].Name = "" + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"80", "8080", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "80": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "8080": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + { + name: "pod with no ports", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + podModifier: func(pod *corev1.Pod) { + pod.Spec.Containers[0].Ports = nil + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestWorkloadDelete(t *testing.T) { + t.Parallel() + + type testCase struct { + name string + pod *corev1.Pod + existingWorkload *pbcatalog.Workload + } + + run := func(t *testing.T, tc testCase) { + fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + resourceClient, err := consul.NewResourceServiceClient(testClient.Watcher) + require.NoError(t, err) + + // Create the pod controller. + pc := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ResourceClient: resourceClient, + } + + workload, err := anypb.New(tc.existingWorkload) + require.NoError(t, err) + + workloadID := getWorkloadID(tc.pod.GetName(), metav1.NamespaceDefault, constants.DefaultConsulPartition) + writeReq := &pbresource.WriteRequest{ + Resource: &pbresource.Resource{ + Id: workloadID, + Data: workload, + }, + } + + _, err = resourceClient.Write(context.Background(), writeReq) + require.NoError(t, err) + test.ResourceHasPersisted(t, resourceClient, workloadID) + + reconcileReq := types.NamespacedName{ + Namespace: metav1.NamespaceDefault, + Name: tc.pod.GetName(), + } + err = pc.deleteWorkload(context.Background(), reconcileReq) + require.NoError(t, err) + + readReq := &pbresource.ReadRequest{Id: &pbresource.ID{ + Name: tc.pod.GetName(), + Type: &pbresource.Type{ + Group: "catalog", + GroupVersion: "v1alpha1", + Kind: "Workload", + }, + Tenancy: &pbresource.Tenancy{ + Partition: constants.DefaultConsulPartition, + Namespace: metav1.NamespaceDefault, + }, + }} + _, err = resourceClient.Read(context.Background(), readReq) + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.NotFound, s.Code()) + } + + testCases := []testCase{ + { + name: "basic pod delete", + pod: createPod("foo", "10.0.0.1", "foo", true, true), + existingWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +// TODO +// func TestHealthStatusWrite(t *testing.T) + +// TODO +// func TestHealthStatusDelete(t *testing.T) + +// TODO +// func TestUpstreamsWrite(t *testing.T) + +// TODO +// func TestUpstreamsDelete(t *testing.T) + +// TODO +// func TestDeleteACLTokens(t *testing.T) + +// TestReconcileCreatePod ensures that a new pod reconciliation fans out to create +// the appropriate Consul resources. Translation details from pod to Consul workload are +// tested at the relevant private functions. Any error states that are also tested here. +func TestReconcileCreatePod(t *testing.T) { + t.Parallel() + + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ + Name: metav1.NamespaceDefault, + Namespace: metav1.NamespaceDefault, + }} + node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + + type testCase struct { + name string + podName string // This needs to be aligned with the pod created in `k8sObjects` + namespace string // Defaults to metav1.NamespaceDefault if empty. Should be aligned with the ns in the pod + + k8sObjects func() []runtime.Object // testing node is injected separately + expectedWorkload *pbcatalog.Workload + //expectedHealthStatus *pbcatalog.HealthStatus + //expectedProxyConfiguration *pbmesh.ProxyConfiguration + //expectedUpstreams *pbmesh.Upstreams + + metricsEnabled bool + telemetryEnabled bool + + expErr string + } + + run := func(t *testing.T, tc testCase) { + k8sObjects := []runtime.Object{ + &ns, + &node, + } + if tc.k8sObjects != nil { + k8sObjects = append(k8sObjects, tc.k8sObjects()...) + } + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + resourceClient, err := consul.NewResourceServiceClient(testClient.Watcher) + require.NoError(t, err) + + // Create the pod controller. + pc := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + if tc.metricsEnabled { + pc.MetricsConfig = metrics.Config{ + DefaultEnableMetrics: true, + EnableGatewayMetrics: true, + } + } + pc.EnableTelemetryCollector = tc.telemetryEnabled + + namespace := tc.namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + + namespacedName := types.NamespacedName{ + Namespace: namespace, + Name: tc.podName, + } + + resp, err := pc.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + if tc.expErr != "" { + require.EqualError(t, err, tc.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + expectedWorkloadMatches(t, resourceClient, tc.podName, tc.expectedWorkload) + // TODO(dans): compare the following to expected values + // expectedHealthStatus + // expectedProxyConfiguration + // expectedUpstreams + } + + testCases := []testCase{ + { + name: "vanilla new pod", + podName: "foo", + k8sObjects: func() []runtime.Object { + pod := createPod("foo", "10.0.0.1", "foo", true, true) + return []runtime.Object{pod} + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + { + name: "pod in ignored namespace", + podName: "foo", + namespace: metav1.NamespaceSystem, + k8sObjects: func() []runtime.Object { + pod := createPod("foo", "10.0.0.1", "foo", true, true) + pod.ObjectMeta.Namespace = metav1.NamespaceSystem + return []runtime.Object{pod} + }, + }, + // TODO(dans): NotHealthyPod + // TODO(dans): tproxy + Metrics + Telemetry + // TODO: explicit upstreams + // TODO: at least one error cases + // TODO: make sure multi-error accumulates errors + // TODO: injection annotation added + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +// TestReconcileUpdatePod test updating a Pod object when there is already matching resources in Consul. +func TestReconcileUpdatePod(t *testing.T) { + t.Parallel() + + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ + Name: metav1.NamespaceDefault, + Namespace: metav1.NamespaceDefault, + }} + node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + + type testCase struct { + name string + podName string // This needs to be aligned with the pod created in `k8sObjects` + namespace string // Defaults to metav1.NamespaceDefault if empty. Should be aligned with the ns in the pod + + k8sObjects func() []runtime.Object // testing node is injected separately + + existingWorkload *pbcatalog.Workload + //existingHealthStatus *pbcatalog.HealthStatus + //existingProxyConfiguration *pbmesh.ProxyConfiguration + //existingUpstreams *pbmesh.Upstreams + + expectedWorkload *pbcatalog.Workload + //expectedHealthStatus *pbcatalog.HealthStatus + //expectedProxyConfiguration *pbmesh.ProxyConfiguration + //expectedUpstreams *pbmesh.Upstreams + + metricsEnabled bool + telemetryEnabled bool + + expErr string + } + + run := func(t *testing.T, tc testCase) { + k8sObjects := []runtime.Object{ + &ns, + &node, + } + if tc.k8sObjects != nil { + k8sObjects = append(k8sObjects, tc.k8sObjects()...) + } + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + resourceClient, err := consul.NewResourceServiceClient(testClient.Watcher) + require.NoError(t, err) + + // Create the pod controller. + pc := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + if tc.metricsEnabled { + pc.MetricsConfig = metrics.Config{ + DefaultEnableMetrics: true, + EnableGatewayMetrics: true, + } + } + pc.EnableTelemetryCollector = tc.telemetryEnabled + + namespace := tc.namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + + loadResource( + t, + resourceClient, + getWorkloadID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), + tc.existingWorkload, + ) + + // TODO(dans): load the existing resources + // loadHealthStatus + // loadProxyConfiguration + // loadUpstreams + + namespacedName := types.NamespacedName{ + Namespace: namespace, + Name: tc.podName, + } + + resp, err := pc.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + if tc.expErr != "" { + require.EqualError(t, err, tc.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + expectedWorkloadMatches(t, resourceClient, tc.podName, tc.expectedWorkload) + // TODO(dans): compare the following to expected values + // expectedHealthStatus + // expectedProxyConfiguration + // expectedUpstreams + } + + testCases := []testCase{ + { + name: "pod update ports", + podName: "foo", + k8sObjects: func() []runtime.Object { + pod := createPod("foo", "10.0.0.1", "foo", true, true) + return []runtime.Object{pod} + }, + existingWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + expectedWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + // TODO(dans): Pod Health to Unhealthy + // TODO(dans): update tproxy + Metrics + Telemetry + // TODO: update explicit upstreams + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +// Tests deleting a Pod object, with and without matching Consul resources. +func TestReconcileDeletePod(t *testing.T) { + t.Parallel() + + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{ + Name: metav1.NamespaceDefault, + Namespace: metav1.NamespaceDefault, + }} + node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} + + type testCase struct { + name string + podName string // This needs to be aligned with the pod created in `k8sObjects` + namespace string // Defaults to metav1.NamespaceDefault if empty. Should be aligned with the ns in the pod + + k8sObjects func() []runtime.Object // testing node is injected separately + + existingWorkload *pbcatalog.Workload + //existingHealthStatus *pbcatalog.HealthStatus + //existingProxyConfiguration *pbmesh.ProxyConfiguration + //existingUpstreams *pbmesh.Upstreams + + expectedWorkload *pbcatalog.Workload + //expectedHealthStatus *pbcatalog.HealthStatus + //expectedProxyConfiguration *pbmesh.ProxyConfiguration + //expectedUpstreams *pbmesh.Upstreams + + aclsEnabled bool + metricsEnabled bool + telemetryEnabled bool + + expErr string + } + + run := func(t *testing.T, tc testCase) { + k8sObjects := []runtime.Object{ + &ns, + &node, + } + if tc.k8sObjects != nil { + k8sObjects = append(k8sObjects, tc.k8sObjects()...) + } + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test consulServer server. + testClient := test.TestServerWithMockConnMgrWatcher(t, func(c *testutil.TestServerConfig) { + c.Experiments = []string{"resource-apis"} + }) + resourceClient, err := consul.NewResourceServiceClient(testClient.Watcher) + require.NoError(t, err) + + // Create the pod controller. + pc := &Controller{ + Client: fakeClient, + Log: logrtest.New(t), + ConsulClientConfig: testClient.Cfg, + ConsulServerConnMgr: testClient.Watcher, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + } + if tc.metricsEnabled { + pc.MetricsConfig = metrics.Config{ + DefaultEnableMetrics: true, + EnableGatewayMetrics: true, + } + } + if tc.aclsEnabled { + pc.AuthMethod = test.AuthMethod + } + pc.EnableTelemetryCollector = tc.telemetryEnabled + + namespace := tc.namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + + loadResource( + t, + resourceClient, + getWorkloadID(tc.podName, constants.DefaultConsulNS, constants.DefaultConsulPartition), + tc.existingWorkload, + ) + + // TODO(dans): load the existing resources + // loadHealthStatus + // loadProxyConfiguration + // loadUpstreams + + namespacedName := types.NamespacedName{ + Namespace: namespace, + Name: tc.podName, + } + + resp, err := pc.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + if tc.expErr != "" { + require.EqualError(t, err, tc.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + expectedWorkloadMatches(t, resourceClient, tc.podName, tc.expectedWorkload) + // TODO(dans): compare the following to expected values + // expectedHealthStatus + // expectedProxyConfiguration + // expectedUpstreams + } + + testCases := []testCase{ + { + name: "vanilla delete pod", + podName: "foo", + existingWorkload: &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "10.0.0.1", Ports: []string{"public", "admin", "mesh"}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + "public": { + Port: 80, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "admin": { + Port: 8080, + Protocol: pbcatalog.Protocol_PROTOCOL_UNSPECIFIED, + }, + "mesh": { + Port: constants.ProxyDefaultInboundPort, + Protocol: pbcatalog.Protocol_PROTOCOL_MESH, + }, + }, + NodeName: consulNodeName, + Identity: "foo", + }, + }, + // TODO: enable ACLs and make sure they are deleted + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func createPod(name, ip string, identity string, inject bool, ready bool) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + Annotations: map[string]string{ + constants.AnnotationConsulK8sVersion: "1.3.0", + }, + }, + Status: corev1.PodStatus{ + PodIP: ip, + HostIP: consulNodeAddress, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", + Ports: []corev1.ContainerPort{ + { + Name: "public", + Protocol: corev1.ProtocolTCP, + ContainerPort: 80, + }, + { + Name: "admin", + Protocol: corev1.ProtocolTCP, + ContainerPort: 8080, + }, + }, + }, + }, + NodeName: nodeName, + ServiceAccountName: identity, + }, + } + if ready { + pod.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + } + } else { + pod.Status.Conditions = []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + } + } + + if inject { + pod.Labels[constants.KeyMeshInjectStatus] = constants.Injected + pod.Annotations[constants.KeyMeshInjectStatus] = constants.Injected + } + return pod +} + +func expectedWorkloadMatches(t *testing.T, client pbresource.ResourceServiceClient, name string, expectedWorkload *pbcatalog.Workload) { + req := &pbresource.ReadRequest{Id: getWorkloadID(name, metav1.NamespaceDefault, constants.DefaultConsulPartition)} + + res, err := client.Read(context.Background(), req) + + if expectedWorkload == nil { + require.Error(t, err) + s, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.NotFound, s.Code()) + return + } + + require.NoError(t, err) + require.NotNil(t, res) + + require.Equal(t, name, res.GetResource().GetId().GetName()) + require.Equal(t, constants.DefaultConsulNS, res.GetResource().GetId().GetTenancy().GetNamespace()) + require.Equal(t, constants.DefaultConsulPartition, res.GetResource().GetId().GetTenancy().GetPartition()) + + require.NotNil(t, res.GetResource().GetData()) + + actualWorkload := &pbcatalog.Workload{} + err = res.GetResource().GetData().UnmarshalTo(actualWorkload) + require.NoError(t, err) + + require.True(t, proto.Equal(actualWorkload, expectedWorkload)) +} + +func loadResource(t *testing.T, client pbresource.ResourceServiceClient, id *pbresource.ID, proto proto.Message) { + if id == nil || proto == nil { + return + } + + data, err := anypb.New(proto) + require.NoError(t, err) + + resource := &pbresource.Resource{ + Id: id, + Data: data, + } + + req := &pbresource.WriteRequest{Resource: resource} + _, err = client.Write(context.Background(), req) + require.NoError(t, err) + test.ResourceHasPersisted(t, client, id) +} diff --git a/control-plane/connect-inject/webhook/mesh_webhook.go b/control-plane/connect-inject/webhook/mesh_webhook.go index 523200b96c..24c26d4f42 100644 --- a/control-plane/connect-inject/webhook/mesh_webhook.go +++ b/control-plane/connect-inject/webhook/mesh_webhook.go @@ -620,6 +620,7 @@ func (w *MeshWebhook) defaultAnnotations(pod *corev1.Pod, podJson string) error } } pod.Annotations[constants.AnnotationOriginalPod] = podJson + pod.Annotations[constants.LegacyAnnotationConsulK8sVersion] = version.GetHumanVersion() pod.Annotations[constants.AnnotationConsulK8sVersion] = version.GetHumanVersion() return nil diff --git a/control-plane/connect-inject/webhook/mesh_webhook_test.go b/control-plane/connect-inject/webhook/mesh_webhook_test.go index 64dbd21c9a..4345403bcd 100644 --- a/control-plane/connect-inject/webhook/mesh_webhook_test.go +++ b/control-plane/connect-inject/webhook/mesh_webhook_test.go @@ -248,7 +248,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -341,7 +341,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -396,7 +396,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -470,7 +470,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -530,7 +530,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -615,7 +615,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -752,7 +752,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "replace", @@ -815,7 +815,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -874,7 +874,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -959,7 +959,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -1029,7 +1029,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, { Operation: "add", @@ -1101,7 +1101,7 @@ func TestHandlerHandle(t *testing.T) { }, { Operation: "add", - Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, // Note: no DNS policy/config additions. }, @@ -1357,8 +1357,8 @@ func TestHandlerDefaultAnnotations(t *testing.T) { "empty", &corev1.Pod{}, map[string]string{ - constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":null},\"status\":{}}", - constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":null},\"status\":{}}", + constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1378,8 +1378,8 @@ func TestHandlerDefaultAnnotations(t *testing.T) { }, }, map[string]string{ - constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", - constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", + constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1405,9 +1405,9 @@ func TestHandlerDefaultAnnotations(t *testing.T) { }, }, map[string]string{ - "consul.hashicorp.com/connect-service": "foo", - constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null,\"annotations\":{\"consul.hashicorp.com/connect-service\":\"foo\"}},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", - constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), + "consul.hashicorp.com/connect-service": "foo", + constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null,\"annotations\":{\"consul.hashicorp.com/connect-service\":\"foo\"}},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", + constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", @@ -1434,9 +1434,9 @@ func TestHandlerDefaultAnnotations(t *testing.T) { }, }, map[string]string{ - constants.AnnotationPort: "http", - constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"name\":\"http\",\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", - constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationPort: "http", + constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"name\":\"http\",\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", + constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1461,9 +1461,9 @@ func TestHandlerDefaultAnnotations(t *testing.T) { }, }, map[string]string{ - constants.AnnotationPort: "8080", - constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", - constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationPort: "8080", + constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", + constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, diff --git a/control-plane/consul/resource_client.go b/control-plane/consul/resource_client.go index edb2d2fce9..82c24af34f 100644 --- a/control-plane/consul/resource_client.go +++ b/control-plane/consul/resource_client.go @@ -6,14 +6,13 @@ package consul import ( "fmt" - "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/consul/proto-public/pbresource" ) // NewResourceServiceClient creates a pbresource.ResourceServiceClient for creating V2 Consul resources. // It is initialized with a consul-server-connection-manager Watcher to continuously find Consul // server addresses. -func NewResourceServiceClient(watcher *discovery.Watcher) (pbresource.ResourceServiceClient, error) { +func NewResourceServiceClient(watcher ServerConnectionManager) (pbresource.ResourceServiceClient, error) { // We recycle the GRPC connection from the discovery client because it // should have all the necessary dial options, including the resolver that diff --git a/control-plane/go.mod b/control-plane/go.mod index 55918dceb4..3589eac8b7 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -13,7 +13,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/hashicorp/consul/api v1.10.1-0.20230821180813-217d305b38d5 - github.com/hashicorp/consul/proto-public v0.1.2-0.20230821180813-217d305b38d5 // this points to a commit on Consul main + github.com/hashicorp/consul/proto-public v0.1.2-0.20230829221456-f8812eddf1ef // this points to a commit on Consul main github.com/hashicorp/consul/sdk v0.14.1 github.com/hashicorp/go-bexpr v0.1.11 github.com/hashicorp/go-discover v0.0.0-20230519164032-214571b6a530 @@ -48,6 +48,7 @@ require ( require ( github.com/hashicorp/consul-k8s/control-plane/cni v0.0.0-20230825213844-4ea04860c5ed github.com/hashicorp/consul-server-connection-manager v0.1.4 + google.golang.org/grpc v1.55.0 google.golang.org/protobuf v1.30.0 ) @@ -164,7 +165,6 @@ require ( google.golang.org/api v0.114.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/grpc v1.55.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/resty.v1 v1.12.0 // indirect gopkg.in/square/go-jose.v2 v2.5.1 // indirect diff --git a/control-plane/go.sum b/control-plane/go.sum index 7868dc727b..1a16c25165 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -265,8 +265,8 @@ github.com/hashicorp/consul-server-connection-manager v0.1.4 h1:wrcSRV6WGXFBNpNb github.com/hashicorp/consul-server-connection-manager v0.1.4/go.mod h1:LMqHkALoLP0HUQKOG21xXYr0YPUayIQIHNTlmxG100E= github.com/hashicorp/consul/api v1.10.1-0.20230821180813-217d305b38d5 h1:TTTgXv9YeaRnODyFP1k2b2Nq5RIGrUUgI5SkDhuSNwM= github.com/hashicorp/consul/api v1.10.1-0.20230821180813-217d305b38d5/go.mod h1:NZJGRFYruc/80wYowkPFCp1LbGmJC9L8izrwfyVx/Wg= -github.com/hashicorp/consul/proto-public v0.1.2-0.20230821180813-217d305b38d5 h1:mN5hKOn+G5fQBuXjdne/HluE4FhesUxscZEblqP4OSQ= -github.com/hashicorp/consul/proto-public v0.1.2-0.20230821180813-217d305b38d5/go.mod h1:ENwzmloQTUPAYPu7nC1mli3VY0Ny9QNi/FSzJ+KlZD0= +github.com/hashicorp/consul/proto-public v0.1.2-0.20230829221456-f8812eddf1ef h1:Vt5NSnXc+RslTxXH2pz7dCb3hnE33CD2TrBP5AIQtMg= +github.com/hashicorp/consul/proto-public v0.1.2-0.20230829221456-f8812eddf1ef/go.mod h1:ENwzmloQTUPAYPu7nC1mli3VY0Ny9QNi/FSzJ+KlZD0= github.com/hashicorp/consul/sdk v0.4.1-0.20230825164720-ecdcde430924 h1:gkb6/ix0Tg1Th5FTjyq4QklLgrtIVQ/TUB0kbhIcPsY= github.com/hashicorp/consul/sdk v0.4.1-0.20230825164720-ecdcde430924/go.mod h1:vFt03juSzocLRFo59NkeQHHmQa6+g7oU0pfzdI1mUhg= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= diff --git a/control-plane/helper/test/test_util.go b/control-plane/helper/test/test_util.go index e29e44de59..b3544a38bb 100644 --- a/control-plane/helper/test/test_util.go +++ b/control-plane/helper/test/test_util.go @@ -4,6 +4,7 @@ package test import ( + "context" "fmt" "net" "net/http" @@ -13,12 +14,16 @@ import ( "testing" "time" - "github.com/hashicorp/consul-k8s/control-plane/consul" - "github.com/hashicorp/consul-k8s/control-plane/helper/cert" "github.com/hashicorp/consul-server-connection-manager/discovery" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/hashicorp/consul-k8s/control-plane/consul" + "github.com/hashicorp/consul-k8s/control-plane/helper/cert" ) const ( @@ -62,20 +67,29 @@ func TestServerWithMockConnMgrWatcher(t *testing.T, callback testutil.ServerConf TestServer: consulServer, APIClient: client, Cfg: consulConfig, - Watcher: MockConnMgrForIPAndPort("127.0.0.1", cfg.Ports.GRPC), + Watcher: MockConnMgrForIPAndPort(t, "127.0.0.1", cfg.Ports.GRPC), } } -func MockConnMgrForIPAndPort(ip string, port int) *consul.MockServerConnectionManager { +func MockConnMgrForIPAndPort(t *testing.T, ip string, port int) *consul.MockServerConnectionManager { parsedIP := net.ParseIP(ip) connMgr := &consul.MockServerConnectionManager{} + + conn, err := grpc.DialContext( + context.Background(), + fmt.Sprintf("%s:%d", parsedIP, port), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + mockState := discovery.State{ Address: discovery.Addr{ TCPAddr: net.TCPAddr{ IP: parsedIP, Port: port, }, - }} + }, + GRPCConn: conn, + } connMgr.On("State").Return(mockState, nil) connMgr.On("Run").Return(nil) connMgr.On("Stop").Return(nil) @@ -257,6 +271,27 @@ func SetupK8sAuthMethodWithNamespaces(t *testing.T, consulClient *api.Client, se require.NoError(t, err) } +// ResourceHasPersisted checks that a recently written resource exists in the Consul +// state store with a valid version. This must be true before a resource is overwritten +// or deleted. +func ResourceHasPersisted(t *testing.T, client pbresource.ResourceServiceClient, id *pbresource.ID) { + req := &pbresource.ReadRequest{Id: id} + + require.Eventually(t, func() bool { + res, err := client.Read(context.Background(), req) + if err != nil { + return false + } + + if res.GetResource().GetVersion() == "" { + return false + } + + return true + }, 5*time.Second, + time.Second) +} + func TokenReviewsResponse(name, ns string) string { return fmt.Sprintf(`{ "kind": "TokenReview", diff --git a/control-plane/subcommand/inject-connect/v2controllers.go b/control-plane/subcommand/inject-connect/v2controllers.go index d851f71cb8..a9f29cd6a5 100644 --- a/control-plane/subcommand/inject-connect/v2controllers.go +++ b/control-plane/subcommand/inject-connect/v2controllers.go @@ -7,22 +7,21 @@ import ( "context" "github.com/hashicorp/consul-server-connection-manager/discovery" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/hashicorp/consul-k8s/control-plane/connect-inject/controllers/pod" + "github.com/hashicorp/consul-k8s/control-plane/subcommand/flags" ) func (c *Command) configureV2Controllers(ctx context.Context, mgr manager.Manager, watcher *discovery.Watcher) error { - //resourceClient, err := consul.NewResourceServiceClient(watcher) - //if err != nil { - // return fmt.Errorf("unable to create Consul resource service client: %w", err) - //} + // Create Consul API config object. + consulConfig := c.consul.ConsulClientConfig() - //// Create Consul API config object. - //consulConfig := c.consul.ConsulClientConfig() - // - ////Convert allow/deny lists to sets. - //allowK8sNamespaces := flags.ToSet(c.flagAllowK8sNamespacesList) - //denyK8sNamespaces := flags.ToSet(c.flagDenyK8sNamespacesList) + //Convert allow/deny lists to sets. + allowK8sNamespaces := flags.ToSet(c.flagAllowK8sNamespacesList) + denyK8sNamespaces := flags.ToSet(c.flagDenyK8sNamespacesList) //lifecycleConfig := lifecycle.Config{ // DefaultEnableProxyLifecycle: c.flagDefaultEnableSidecarProxyLifecycle, @@ -41,32 +40,24 @@ func (c *Command) configureV2Controllers(ctx context.Context, mgr manager.Manage // DefaultPrometheusScrapePath: c.flagDefaultPrometheusScrapePath, //} - // TODO(dans): Pods Controller - //if err := (&pod.Controller{ - // Client: mgr.GetClient(), - // ConsulClientConfig: consulConfig, - // ConsulServerConnMgr: watcher, - // ConsulResourceServiceClient: client, - // AllowK8sNamespacesSet: allowK8sNamespaces, - // DenyK8sNamespacesSet: denyK8sNamespaces, - // MetricsConfig: metricsConfig, - // EnableConsulPartitions: c.flagEnablePartitions, - // EnableConsulNamespaces: c.flagEnableNamespaces, - // ConsulDestinationNamespace: c.flagConsulDestinationNamespace, - // EnableNSMirroring: c.flagEnableK8SNSMirroring, - // NSMirroringPrefix: c.flagK8SNSMirroringPrefix, - // EnableTransparentProxy: c.flagDefaultEnableTransparentProxy, - // TProxyOverwriteProbes: c.flagTransparentProxyDefaultOverwriteProbes, - // AuthMethod: c.flagACLAuthMethod, - // NodeMeta: c.flagNodeMeta, - // Log: ctrl.Log.WithName("controller").WithName("pods"), - // Scheme: mgr.GetScheme(), - // EnableTelemetryCollector: c.flagEnableTelemetryCollector, - // Context: ctx, - //}).SetupWithManager(mgr); err != nil { - // setupLog.Error(err, "unable to create controller", "controller", pod.Controller{}) - // return err - //} + if err := (&pod.Controller{ + Client: mgr.GetClient(), + ConsulClientConfig: consulConfig, + ConsulServerConnMgr: watcher, + AllowK8sNamespacesSet: allowK8sNamespaces, + DenyK8sNamespacesSet: denyK8sNamespaces, + EnableConsulPartitions: c.flagEnablePartitions, + EnableConsulNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableNSMirroring: c.flagEnableK8SNSMirroring, + NSMirroringPrefix: c.flagK8SNSMirroringPrefix, + ConsulPartition: c.consul.Partition, + AuthMethod: c.flagACLAuthMethod, + Log: ctrl.Log.WithName("controller").WithName("pods"), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", pod.Controller{}) + return err + } // TODO: V2 Endpoints Controller diff --git a/control-plane/subcommand/server-acl-init/command_test.go b/control-plane/subcommand/server-acl-init/command_test.go index 68ce4c1e02..d7937d4946 100644 --- a/control-plane/subcommand/server-acl-init/command_test.go +++ b/control-plane/subcommand/server-acl-init/command_test.go @@ -18,9 +18,6 @@ import ( "testing" "time" - "github.com/hashicorp/consul-k8s/control-plane/helper/cert" - "github.com/hashicorp/consul-k8s/control-plane/helper/test" - "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" @@ -32,6 +29,10 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + + "github.com/hashicorp/consul-k8s/control-plane/helper/cert" + "github.com/hashicorp/consul-k8s/control-plane/helper/test" + "github.com/hashicorp/consul-k8s/control-plane/subcommand/common" ) var ns = "default" @@ -1119,7 +1120,7 @@ func TestRun_NoLeader(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), } done := make(chan bool) @@ -1375,7 +1376,7 @@ func TestRun_ClientPolicyAndBindingRuleRetry(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), } responseCode := cmd.Run([]string{ "-timeout=1m", @@ -1524,7 +1525,7 @@ func TestRun_AlreadyBootstrapped(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), } responseCode := cmd.Run(cmdArgs) @@ -1709,7 +1710,7 @@ func TestRun_SkipBootstrapping_WhenServersAreDisabled(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), backend: &FakeSecretsBackend{bootstrapToken: bootToken}, } responseCode := cmd.Run([]string{ @@ -1753,7 +1754,7 @@ func TestRun_Timeout(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort("localhost", 12345), + watcher: test.MockConnMgrForIPAndPort(t, "localhost", 12345), } responseCode := cmd.Run([]string{ From 208a9c4903dca1cd540743a9b1a4387c55b6ddd2 Mon Sep 17 00:00:00 2001 From: DanStough Date: Thu, 31 Aug 2023 14:42:15 -0400 Subject: [PATCH 2/2] test fix --- .../api-gateway/cache/consul_test.go | 6 +- .../catalog/to-consul/syncer_test.go | 2 +- .../webhook/mesh_webhook_test.go | 57 +++++++++++++++++++ control-plane/helper/test/test_util.go | 22 ++++--- .../server-acl-init/command_test.go | 10 ++-- 5 files changed, 79 insertions(+), 18 deletions(-) diff --git a/control-plane/api-gateway/cache/consul_test.go b/control-plane/api-gateway/cache/consul_test.go index 895c59e8c9..3a4423f6b4 100644 --- a/control-plane/api-gateway/cache/consul_test.go +++ b/control-plane/api-gateway/cache/consul_test.go @@ -1323,7 +1323,7 @@ func TestCache_Write(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) @@ -1601,7 +1601,7 @@ func Test_Run(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) @@ -2002,7 +2002,7 @@ func TestCache_Delete(t *testing.T) { GRPCPort: port, APITimeout: 0, }, - ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + ConsulServerConnMgr: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), NamespacesEnabled: false, Logger: logrtest.NewTestLogger(t), }) diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index ece2e1dd05..ab2cfee0a2 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -234,7 +234,7 @@ func TestConsulSyncer_stopsGracefully(t *testing.T) { testClient := &test.TestServerClient{ Cfg: &consul.Config{APIClientConfig: &api.Config{}, HTTPPort: port}, - Watcher: test.MockConnMgrForIPAndPort(t, parsedURL.Host, port), + Watcher: test.MockConnMgrForIPAndPort(t, parsedURL.Host, port, false), } // Start the syncer. diff --git a/control-plane/connect-inject/webhook/mesh_webhook_test.go b/control-plane/connect-inject/webhook/mesh_webhook_test.go index 4345403bcd..2b71c08500 100644 --- a/control-plane/connect-inject/webhook/mesh_webhook_test.go +++ b/control-plane/connect-inject/webhook/mesh_webhook_test.go @@ -250,6 +250,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/spec/volumes", @@ -343,6 +347,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -398,6 +406,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -472,6 +484,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -532,6 +548,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -617,6 +637,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -754,6 +778,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "replace", Path: "/spec/containers/0/livenessProbe/httpGet/port", @@ -817,6 +845,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -876,6 +908,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -961,6 +997,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/labels", @@ -1031,6 +1071,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/spec/dnsPolicy", @@ -1103,6 +1147,10 @@ func TestHandlerHandle(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), + }, // Note: no DNS policy/config additions. }, }, @@ -1275,6 +1323,10 @@ func TestHandlerHandle_ValidateOverwriteProbes(t *testing.T) { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationOriginalPod), }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(constants.LegacyAnnotationConsulK8sVersion), + }, { Operation: "add", Path: "/metadata/annotations/" + escapeJSONPointer(constants.AnnotationConsulK8sVersion), @@ -1359,6 +1411,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { map[string]string{ constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":null},\"status\":{}}", constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1380,6 +1433,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { map[string]string{ constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1408,6 +1462,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { "consul.hashicorp.com/connect-service": "foo", constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null,\"annotations\":{\"consul.hashicorp.com/connect-service\":\"foo\"}},\"spec\":{\"containers\":[{\"name\":\"web\",\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", @@ -1437,6 +1492,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { constants.AnnotationPort: "http", constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"name\":\"http\",\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, @@ -1464,6 +1520,7 @@ func TestHandlerDefaultAnnotations(t *testing.T) { constants.AnnotationPort: "8080", constants.AnnotationOriginalPod: "{\"metadata\":{\"creationTimestamp\":null},\"spec\":{\"containers\":[{\"name\":\"web\",\"ports\":[{\"containerPort\":8080}],\"resources\":{}},{\"name\":\"web-side\",\"resources\":{}}]},\"status\":{}}", constants.LegacyAnnotationConsulK8sVersion: version.GetHumanVersion(), + constants.AnnotationConsulK8sVersion: version.GetHumanVersion(), }, "", }, diff --git a/control-plane/helper/test/test_util.go b/control-plane/helper/test/test_util.go index b3544a38bb..915c65704e 100644 --- a/control-plane/helper/test/test_util.go +++ b/control-plane/helper/test/test_util.go @@ -67,20 +67,14 @@ func TestServerWithMockConnMgrWatcher(t *testing.T, callback testutil.ServerConf TestServer: consulServer, APIClient: client, Cfg: consulConfig, - Watcher: MockConnMgrForIPAndPort(t, "127.0.0.1", cfg.Ports.GRPC), + Watcher: MockConnMgrForIPAndPort(t, "127.0.0.1", cfg.Ports.GRPC, true), } } -func MockConnMgrForIPAndPort(t *testing.T, ip string, port int) *consul.MockServerConnectionManager { +func MockConnMgrForIPAndPort(t *testing.T, ip string, port int, enableGRPCConn bool) *consul.MockServerConnectionManager { parsedIP := net.ParseIP(ip) connMgr := &consul.MockServerConnectionManager{} - conn, err := grpc.DialContext( - context.Background(), - fmt.Sprintf("%s:%d", parsedIP, port), - grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - mockState := discovery.State{ Address: discovery.Addr{ TCPAddr: net.TCPAddr{ @@ -88,7 +82,17 @@ func MockConnMgrForIPAndPort(t *testing.T, ip string, port int) *consul.MockServ Port: port, }, }, - GRPCConn: conn, + } + + // If the connection is enabled, some tests will receive extra HTTP API calls where + // the server is being dialed. + if enableGRPCConn { + conn, err := grpc.DialContext( + context.Background(), + fmt.Sprintf("%s:%d", parsedIP, port), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + mockState.GRPCConn = conn } connMgr.On("State").Return(mockState, nil) connMgr.On("Run").Return(nil) diff --git a/control-plane/subcommand/server-acl-init/command_test.go b/control-plane/subcommand/server-acl-init/command_test.go index d7937d4946..c7d44c0695 100644 --- a/control-plane/subcommand/server-acl-init/command_test.go +++ b/control-plane/subcommand/server-acl-init/command_test.go @@ -1120,7 +1120,7 @@ func TestRun_NoLeader(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), } done := make(chan bool) @@ -1376,7 +1376,7 @@ func TestRun_ClientPolicyAndBindingRuleRetry(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), } responseCode := cmd.Run([]string{ "-timeout=1m", @@ -1525,7 +1525,7 @@ func TestRun_AlreadyBootstrapped(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), } responseCode := cmd.Run(cmdArgs) @@ -1710,7 +1710,7 @@ func TestRun_SkipBootstrapping_WhenServersAreDisabled(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port), + watcher: test.MockConnMgrForIPAndPort(t, serverURL.Hostname(), port, false), backend: &FakeSecretsBackend{bootstrapToken: bootToken}, } responseCode := cmd.Run([]string{ @@ -1754,7 +1754,7 @@ func TestRun_Timeout(t *testing.T) { cmd := Command{ UI: ui, clientset: k8s, - watcher: test.MockConnMgrForIPAndPort(t, "localhost", 12345), + watcher: test.MockConnMgrForIPAndPort(t, "localhost", 12345, false), } responseCode := cmd.Run([]string{