diff --git a/PROJECT b/PROJECT index df71ff42ef4..80b37678763 100644 --- a/PROJECT +++ b/PROJECT @@ -17,4 +17,10 @@ resources: - group: mutations kind: Assign version: v1alpha1 +- group: status + kind: ConnectionPodStatus + version: v1alpha1 +- group: connection + kind: Connection + version: v1alpha1 version: "2" diff --git a/apis/addtoscheme_connection_v1alpha1.go b/apis/addtoscheme_connection_v1alpha1.go new file mode 100644 index 00000000000..4e4cd66fccb --- /dev/null +++ b/apis/addtoscheme_connection_v1alpha1.go @@ -0,0 +1,25 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" +) + +func init() { + // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back + AddToSchemes = append(AddToSchemes, v1alpha1.AddToScheme) +} diff --git a/apis/addtoscheme_status_v1alpha1.go b/apis/addtoscheme_status_v1alpha1.go new file mode 100644 index 00000000000..62ff8f83afd --- /dev/null +++ b/apis/addtoscheme_status_v1alpha1.go @@ -0,0 +1,25 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" +) + +func init() { + // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back + AddToSchemes = append(AddToSchemes, v1alpha1.AddToScheme) +} diff --git a/apis/connection/v1alpha1/connection_types.go b/apis/connection/v1alpha1/connection_types.go new file mode 100644 index 00000000000..303733ef7de --- /dev/null +++ b/apis/connection/v1alpha1/connection_types.go @@ -0,0 +1,67 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// ConnectionSpec defines the desired state of Connection +type ConnectionSpec struct { + // +kubebuilder:validation:Required + // Driver is the name of one of the expected drivers i.e. dapr, disk + Driver string `json:"driver"` + // +kubebuilder:validation:Required + // +kubebuilder:validation:Schemaless + // +kubebuilder:validation:XPreserveUnknownFields + Config *types.Anything `json:"config"` +} + +// ConnectionStatus defines the observed state of Connection +type ConnectionStatus struct { + ByPod []statusv1alpha1.ConnectionPodStatusStatus `json:"byPod,omitempty"` +} + +// +kubebuilder:resource:scope=Namespaced +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:storageversion +// Connection is the Schema for the connections API +type Connection struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec ConnectionSpec `json:"spec,omitempty"` + Status ConnectionStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// ConnectionList contains a list of Connection +type ConnectionList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Connection `json:"items"` +} + +func init() { + SchemeBuilder.Register(&Connection{}, &ConnectionList{}) +} diff --git a/apis/connection/v1alpha1/groupversion_info.go b/apis/connection/v1alpha1/groupversion_info.go new file mode 100644 index 00000000000..713e8d50d10 --- /dev/null +++ b/apis/connection/v1alpha1/groupversion_info.go @@ -0,0 +1,35 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha1 contains API Schema definitions for the connection v1alpha1 API group +// +kubebuilder:object:generate=true +// +groupName=connection.gatekeeper.sh +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // GroupVersion is group version used to register these objects + GroupVersion = schema.GroupVersion{Group: "connection.gatekeeper.sh", Version: "v1alpha1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/apis/connection/v1alpha1/zz_generated.deepcopy.go b/apis/connection/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..a0a504967cb --- /dev/null +++ b/apis/connection/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,125 @@ +//go:build !ignore_autogenerated + +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopyInto(out *Connection) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Connection. +func (in *Connection) DeepCopy() *Connection { + if in == nil { + return nil + } + out := new(Connection) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Connection) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionList) DeepCopyInto(out *ConnectionList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Connection, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionList. +func (in *ConnectionList) DeepCopy() *ConnectionList { + if in == nil { + return nil + } + out := new(ConnectionList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ConnectionList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionSpec) DeepCopyInto(out *ConnectionSpec) { + *out = *in + if in.Config != nil { + in, out := &in.Config, &out.Config + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionSpec. +func (in *ConnectionSpec) DeepCopy() *ConnectionSpec { + if in == nil { + return nil + } + out := new(ConnectionSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionStatus) DeepCopyInto(out *ConnectionStatus) { + *out = *in + if in.ByPod != nil { + in, out := &in.ByPod, &out.ByPod + *out = make([]statusv1alpha1.ConnectionPodStatusStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionStatus. +func (in *ConnectionStatus) DeepCopy() *ConnectionStatus { + if in == nil { + return nil + } + out := new(ConnectionStatus) + in.DeepCopyInto(out) + return out +} diff --git a/apis/status/v1alpha1/connectionpodstatus_types.go b/apis/status/v1alpha1/connectionpodstatus_types.go new file mode 100644 index 00000000000..b48d34167de --- /dev/null +++ b/apis/status/v1alpha1/connectionpodstatus_types.go @@ -0,0 +1,105 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! +// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. + +// ConnectionPodStatusStatus defines the observed state of ConnectionPodStatus +type ConnectionPodStatusStatus struct { + // ID is the unique identifier for the pod that wrote the status + ID string `json:"id,omitempty"` + ConnectionUID types.UID `json:"connectionUID,omitempty"` + Operations []string `json:"operations,omitempty"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + // Indicator for alive connection with at least one successful publish + Active bool `json:"active,omitempty"` + Errors []*ConnectionError `json:"errors,omitempty"` +} + +type ConnectionError struct { + Type connectionErrorType `json:"type"` + Message string `json:"message"` +} + +type connectionErrorType string + +const ( + UpsertConnectionError connectionErrorType = "UpsertConnection" + PublishError connectionErrorType = "Publish" +) + +// +kubebuilder:object:root=true +// ConnectionPodStatus is the Schema for the connectionpodstatuses API +type ConnectionPodStatus struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + // No spec field is defined here, as this is a status-only resource. + Status ConnectionPodStatusStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true +// ConnectionPodStatusList contains a list of ConnectionPodStatus +type ConnectionPodStatusList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []ConnectionPodStatus `json:"items"` +} + +// NewConnectionStatusForPod returns a connection status object +// that has been initialized with the bare minimum of fields to make it functional +// with the connection status controller. +func NewConnectionStatusForPod(pod *corev1.Pod, connectionNamespace, connectionName string, scheme *runtime.Scheme) (*ConnectionPodStatus, error) { + obj := &ConnectionPodStatus{} + name, err := KeyForConnection(pod.Name, connectionNamespace, connectionName) + if err != nil { + return nil, err + } + obj.SetName(name) + obj.SetNamespace(util.GetNamespace()) + obj.Status.ID = pod.Name + obj.Status.Operations = operations.AssignedStringList() + obj.SetLabels(map[string]string{ + v1beta1.ConnectionNameLabel: connectionName, + v1beta1.PodLabel: pod.Name, + }) + + if err := controllerutil.SetOwnerReference(pod, obj, scheme); err != nil { + return nil, err + } + + return obj, nil +} + +// KeyForConnection returns a unique status object name given the Pod ID and a connection object. +func KeyForConnection(id string, connectionNamespace string, connectionName string) (string, error) { + return v1beta1.DashPacker(id, connectionNamespace, connectionName) +} + +func init() { + SchemeBuilder.Register(&ConnectionPodStatus{}, &ConnectionPodStatusList{}) +} diff --git a/apis/status/v1alpha1/connectionpodstatus_types_test.go b/apis/status/v1alpha1/connectionpodstatus_types_test.go new file mode 100644 index 00000000000..ce8adf296a8 --- /dev/null +++ b/apis/status/v1alpha1/connectionpodstatus_types_test.go @@ -0,0 +1,70 @@ +package v1alpha1_test + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func TestNewConnectionStatusForPod(t *testing.T) { + const podName = "some-gk-pod" + const podNS = "a-gk-namespace" + const connectionName = "audit" + const connectionNamespace = "a-gk-ns" + + testutils.Setenv(t, "POD_NAMESPACE", podNS) + + scheme := runtime.NewScheme() + err := v1beta1.AddToScheme(scheme) + if err != nil { + t.Fatal(err) + } + + err = corev1.AddToScheme(scheme) + if err != nil { + t.Fatal(err) + } + + pod := fakes.Pod( + fakes.WithNamespace(podNS), + fakes.WithName(podName), + ) + + expectedStatus := &v1alpha1.ConnectionPodStatus{} + expectedStatus.SetName("some--gk--pod-a--gk--ns-audit") + expectedStatus.SetNamespace(podNS) + expectedStatus.Status.ID = podName + expectedStatus.Status.Operations = operations.AssignedStringList() + expectedStatus.SetLabels(map[string]string{ + v1beta1.ConnectionNameLabel: connectionName, + v1beta1.PodLabel: podName, + }) + + err = controllerutil.SetOwnerReference(pod, expectedStatus, scheme) + if err != nil { + t.Fatal(err) + } + + status, err := v1alpha1.NewConnectionStatusForPod(pod, connectionNamespace, connectionName, scheme) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(expectedStatus, status); diff != "" { + t.Fatal(diff) + } + n, err := v1alpha1.KeyForConnection(podName, connectionNamespace, connectionName) + if err != nil { + t.Fatal(err) + } + if status.Name != n { + t.Fatal("got status.Name != n, want equal") + } +} diff --git a/apis/status/v1alpha1/groupversion_info.go b/apis/status/v1alpha1/groupversion_info.go new file mode 100644 index 00000000000..0519ee74c89 --- /dev/null +++ b/apis/status/v1alpha1/groupversion_info.go @@ -0,0 +1,35 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha1 contains API Schema definitions for the status v1alpha1 API group +// +kubebuilder:object:generate=true +// +groupName=status.gatekeeper.sh +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // GroupVersion is group version used to register these objects. + GroupVersion = schema.GroupVersion{Group: "status.gatekeeper.sh", Version: "v1alpha1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme. + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/apis/status/v1alpha1/zz_generated.deepcopy.go b/apis/status/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..43fa977fe9a --- /dev/null +++ b/apis/status/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,128 @@ +//go:build !ignore_autogenerated + +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionError) DeepCopyInto(out *ConnectionError) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionError. +func (in *ConnectionError) DeepCopy() *ConnectionError { + if in == nil { + return nil + } + out := new(ConnectionError) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPodStatus) DeepCopyInto(out *ConnectionPodStatus) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPodStatus. +func (in *ConnectionPodStatus) DeepCopy() *ConnectionPodStatus { + if in == nil { + return nil + } + out := new(ConnectionPodStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ConnectionPodStatus) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPodStatusList) DeepCopyInto(out *ConnectionPodStatusList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ConnectionPodStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPodStatusList. +func (in *ConnectionPodStatusList) DeepCopy() *ConnectionPodStatusList { + if in == nil { + return nil + } + out := new(ConnectionPodStatusList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ConnectionPodStatusList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPodStatusStatus) DeepCopyInto(out *ConnectionPodStatusStatus) { + *out = *in + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]*ConnectionError, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(ConnectionError) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPodStatusStatus. +func (in *ConnectionPodStatusStatus) DeepCopy() *ConnectionPodStatusStatus { + if in == nil { + return nil + } + out := new(ConnectionPodStatusStatus) + in.DeepCopyInto(out) + return out +} diff --git a/apis/status/v1beta1/labels.go b/apis/status/v1beta1/labels.go index 61f3a7f384f..1cd9dfcb390 100644 --- a/apis/status/v1beta1/labels.go +++ b/apis/status/v1beta1/labels.go @@ -10,4 +10,5 @@ const ( MutatorNameLabel = "internal.gatekeeper.sh/mutator-name" MutatorKindLabel = "internal.gatekeeper.sh/mutator-kind" PodLabel = "internal.gatekeeper.sh/pod" + ConnectionNameLabel = "internal.gatekeeper.sh/connection-name" ) diff --git a/cmd/build/helmify/kustomization.yaml b/cmd/build/helmify/kustomization.yaml index d229b3a4eee..1fbf3cbffc3 100644 --- a/cmd/build/helmify/kustomization.yaml +++ b/cmd/build/helmify/kustomization.yaml @@ -94,6 +94,18 @@ patchesJson6902: kind: CustomResourceDefinition name: providers.externaldata.gatekeeper.sh path: labels_patch.yaml + - target: + group: apiextensions.k8s.io + version: v1 + kind: CustomResourceDefinition + name: connections.connection.gatekeeper.sh + path: labels_patch.yaml + - target: + group: apiextensions.k8s.io + version: v1 + kind: CustomResourceDefinition + name: connectionpodstatuses.status.gatekeeper.sh + path: labels_patch.yaml # these are defined in the chart values rather than hard-coded - target: kind: Deployment diff --git a/cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-config.yaml b/cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-connection.yaml similarity index 54% rename from cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-config.yaml rename to cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-connection.yaml index 08244a8bb50..9109db9e0e2 100644 --- a/cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-config.yaml +++ b/cmd/build/helmify/static/templates/gatekeeper-audit-violation-export-connection.yaml @@ -1,15 +1,13 @@ --- {{- if and (.Values.enableViolationExport) (eq (.Values.exportBackend | default "" | lower) "disk") }} -apiVersion: v1 -kind: ConfigMap +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection metadata: name: '{{ .Values.audit.connection }}' namespace: '{{ .Release.Namespace }}' -data: +spec: driver: '{{ .Values.exportBackend }}' - config: | - { - "path": "{{ .Values.audit.exportVolumeMount.path }}", - "maxAuditResults": {{ .Values.audit.exportConfig.maxAuditResults }} - } + config: + path: "{{ .Values.audit.exportVolumeMount.path }}" + maxAuditResults: {{ .Values.audit.exportConfig.maxAuditResults }} {{- end }} diff --git a/config/crd/bases/connection.gatekeeper.sh_connections.yaml b/config/crd/bases/connection.gatekeeper.sh_connections.yaml new file mode 100644 index 00000000000..f1cab3dfb4a --- /dev/null +++ b/config/crd/bases/connection.gatekeeper.sh_connections.yaml @@ -0,0 +1,100 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the connections API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec defines the desired state of Connection + properties: + config: + x-kubernetes-preserve-unknown-fields: true + driver: + description: Driver is the name of one of the expected drivers i.e. + dapr, disk + type: string + required: + - config + - driver + type: object + status: + description: ConnectionStatus defines the observed state of Connection + properties: + byPod: + items: + description: ConnectionPodStatusStatus defines the observed state + of ConnectionPodStatus + properties: + active: + description: Indicator for alive connection with at least one + successful publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote + the status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/bases/status.gatekeeper.sh_connectionpodstatuses.yaml b/config/crd/bases/status.gatekeeper.sh_connectionpodstatuses.yaml new file mode 100644 index 00000000000..1b6de003171 --- /dev/null +++ b/config/crd/bases/status.gatekeeper.sh_connectionpodstatuses.yaml @@ -0,0 +1,79 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: connectionpodstatuses.status.gatekeeper.sh +spec: + group: status.gatekeeper.sh + names: + kind: ConnectionPodStatus + listKind: ConnectionPodStatusList + plural: connectionpodstatuses + singular: connectionpodstatus + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: ConnectionPodStatus is the Schema for the connectionpodstatuses + API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + status: + description: No spec field is defined here, as this is a status-only resource. + properties: + active: + description: Indicator for alive connection with at least one successful + publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote the + status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: object + served: true + storage: true diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 568b63293b9..d81472abdbf 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -14,6 +14,8 @@ resources: - bases/mutations.gatekeeper.sh_assignmetadata.yaml - bases/mutations.gatekeeper.sh_modifyset.yaml - bases/expansion.gatekeeper.sh_expansiontemplate.yaml +- bases/status.gatekeeper.sh_connectionpodstatuses.yaml +- bases/connection.gatekeeper.sh_connections.yaml # +kubebuilder:scaffold:crdkustomizeresource bases: @@ -76,6 +78,8 @@ patchesStrategicMerge: #- patches/webhook_in_constrainttemplatepodstatuses.yaml #- patches/webhook_in_assignmetadata.yaml #- patches/webhook_in_assign.yaml +#- patches/webhook_in_connections.yaml +#- patches/webhook_in_connectionpodstatuses.yaml # +kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix. @@ -85,6 +89,8 @@ patchesStrategicMerge: #- patches/cainjection_in_constrainttemplatepodstatuses.yaml #- patches/cainjection_in_assignmetadata.yaml #- patches/cainjection_in_assign.yaml +#- patches/cainjection_in_connections.yaml +#- patches/cainjection_in_connectionpodstatuses.yaml # +kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_connectionpodstatuses.yaml b/config/crd/patches/cainjection_in_connectionpodstatuses.yaml new file mode 100644 index 00000000000..9f1f9c70ae9 --- /dev/null +++ b/config/crd/patches/cainjection_in_connectionpodstatuses.yaml @@ -0,0 +1,8 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: connectionpodstatuses.status.gatekeeper.sh diff --git a/config/crd/patches/cainjection_in_connections.yaml b/config/crd/patches/cainjection_in_connections.yaml new file mode 100644 index 00000000000..c27be5f40b7 --- /dev/null +++ b/config/crd/patches/cainjection_in_connections.yaml @@ -0,0 +1,8 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: connections.connection.gatekeeper.sh diff --git a/config/crd/patches/webhook_in_connectionpodstatuses.yaml b/config/crd/patches/webhook_in_connectionpodstatuses.yaml new file mode 100644 index 00000000000..c27bcc42c20 --- /dev/null +++ b/config/crd/patches/webhook_in_connectionpodstatuses.yaml @@ -0,0 +1,17 @@ +# The following patch enables conversion webhook for CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: connectionpodstatuses.status.gatekeeper.sh +spec: + conversion: + strategy: Webhook + webhookClientConfig: + # this is "\n" used as a placeholder, otherwise it will be rejected by the apiserver for being blank, + # but we're going to set it later using the cert-manager (or potentially a patch if not using cert-manager) + caBundle: Cg== + service: + namespace: system + name: webhook-service + path: /convert diff --git a/config/crd/patches/webhook_in_connections.yaml b/config/crd/patches/webhook_in_connections.yaml new file mode 100644 index 00000000000..41b5c792dd3 --- /dev/null +++ b/config/crd/patches/webhook_in_connections.yaml @@ -0,0 +1,17 @@ +# The following patch enables conversion webhook for CRD +# CRD conversion requires k8s 1.13 or later. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: connections.connection.gatekeeper.sh +spec: + conversion: + strategy: Webhook + webhookClientConfig: + # this is "\n" used as a placeholder, otherwise it will be rejected by the apiserver for being blank, + # but we're going to set it later using the cert-manager (or potentially a patch if not using cert-manager) + caBundle: Cg== + service: + namespace: system + name: webhook-service + path: /convert diff --git a/config/rbac/connection_editor_role.yaml b/config/rbac/connection_editor_role.yaml new file mode 100644 index 00000000000..a0dd244517e --- /dev/null +++ b/config/rbac/connection_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit connections. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: connection-editor-role +rules: +- apiGroups: + - connection.gatekeeper.sh + resources: + - connections + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - connection.gatekeeper.sh + resources: + - connections/status + verbs: + - get diff --git a/config/rbac/connection_viewer_role.yaml b/config/rbac/connection_viewer_role.yaml new file mode 100644 index 00000000000..ce0e74eccb9 --- /dev/null +++ b/config/rbac/connection_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view connections. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: connection-viewer-role +rules: +- apiGroups: + - connection.gatekeeper.sh + resources: + - connections + verbs: + - get + - list + - watch +- apiGroups: + - connection.gatekeeper.sh + resources: + - connections/status + verbs: + - get diff --git a/config/rbac/connectionpodstatus_editor_role.yaml b/config/rbac/connectionpodstatus_editor_role.yaml new file mode 100644 index 00000000000..2b6a6c9f896 --- /dev/null +++ b/config/rbac/connectionpodstatus_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit connectionpodstatuses. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: connectionpodstatus-editor-role +rules: +- apiGroups: + - status.gatekeeper.sh + resources: + - connectionpodstatuses + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - status.gatekeeper.sh + resources: + - connectionpodstatuses/status + verbs: + - get diff --git a/config/rbac/connectionpodstatus_viewer_role.yaml b/config/rbac/connectionpodstatus_viewer_role.yaml new file mode 100644 index 00000000000..3dfe78e14d7 --- /dev/null +++ b/config/rbac/connectionpodstatus_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view connectionpodstatuses. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: connectionpodstatus-viewer-role +rules: +- apiGroups: + - status.gatekeeper.sh + resources: + - connectionpodstatuses + verbs: + - get + - list + - watch +- apiGroups: + - status.gatekeeper.sh + resources: + - connectionpodstatuses/status + verbs: + - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 3b5d1f259a1..668c7dae121 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -88,6 +88,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/config/samples/connection_v1alpha1_connection.yaml b/config/samples/connection_v1alpha1_connection.yaml new file mode 100644 index 00000000000..18a7656cce5 --- /dev/null +++ b/config/samples/connection_v1alpha1_connection.yaml @@ -0,0 +1,10 @@ +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection +metadata: + name: audit-connection + namespace: gatekeeper-system +spec: + driver: "disk" + config: + path: "/tmp/violations" + maxAuditResults: 3 diff --git a/config/samples/status_v1beta1_connectionpodstatus.yaml b/config/samples/status_v1beta1_connectionpodstatus.yaml new file mode 100644 index 00000000000..7b30fecb229 --- /dev/null +++ b/config/samples/status_v1beta1_connectionpodstatus.yaml @@ -0,0 +1,7 @@ +apiVersion: status.gatekeeper.sh/v1beta1 +kind: ConnectionPodStatus +metadata: + name: connectionpodstatus-sample +spec: + # Add fields here + foo: bar diff --git a/main.go b/main.go index b9546bc4d8b..c0d1f7a64a8 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,7 @@ import ( frameworksexternaldata "github.com/open-policy-agent/frameworks/constraint/pkg/externaldata" api "github.com/open-policy-agent/gatekeeper/v3/apis" configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" expansionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/expansion/v1alpha1" expansionv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/expansion/v1beta1" mutationsv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1alpha1" @@ -133,6 +134,7 @@ func init() { _ = mutationsv1beta1.AddToScheme(scheme) _ = expansionv1alpha1.AddToScheme(scheme) _ = expansionv1beta1.AddToScheme(scheme) + _ = connectionv1alpha1.AddToScheme(scheme) // +kubebuilder:scaffold:scheme flag.Var(disabledBuiltins, "disable-opa-builtin", "disable opa built-in function, this flag can be declared more than once.") @@ -580,6 +582,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness. CacheLister: auditCache, ExpansionSystem: expansionSystem, ExportSystem: exportSystem, + GetPod: opts.GetPod, } if err := audit.AddToManager(mgr, &auditDeps); err != nil { setupLog.Error(err, "unable to register audit with the manager") diff --git a/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml b/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml new file mode 100644 index 00000000000..b69ed3f12af --- /dev/null +++ b/manifest_staging/charts/gatekeeper/crds/connection-customresourcedefinition.yaml @@ -0,0 +1,99 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the connections API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec defines the desired state of Connection + properties: + config: + x-kubernetes-preserve-unknown-fields: true + driver: + description: Driver is the name of one of the expected drivers i.e. dapr, disk + type: string + required: + - config + - driver + type: object + status: + description: ConnectionStatus defines the observed state of Connection + properties: + byPod: + items: + description: ConnectionPodStatusStatus defines the observed state of ConnectionPodStatus + properties: + active: + description: Indicator for alive connection with at least one successful publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote the status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/manifest_staging/charts/gatekeeper/crds/connectionpodstatus-customresourcedefinition.yaml b/manifest_staging/charts/gatekeeper/crds/connectionpodstatus-customresourcedefinition.yaml new file mode 100644 index 00000000000..de6f25f583e --- /dev/null +++ b/manifest_staging/charts/gatekeeper/crds/connectionpodstatus-customresourcedefinition.yaml @@ -0,0 +1,79 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connectionpodstatuses.status.gatekeeper.sh +spec: + group: status.gatekeeper.sh + names: + kind: ConnectionPodStatus + listKind: ConnectionPodStatusList + plural: connectionpodstatuses + singular: connectionpodstatus + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: ConnectionPodStatus is the Schema for the connectionpodstatuses API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + status: + description: No spec field is defined here, as this is a status-only resource. + properties: + active: + description: Indicator for alive connection with at least one successful publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote the status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: object + served: true + storage: true diff --git a/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-config.yaml b/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-connection.yaml similarity index 54% rename from manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-config.yaml rename to manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-connection.yaml index 08244a8bb50..9109db9e0e2 100644 --- a/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-config.yaml +++ b/manifest_staging/charts/gatekeeper/templates/gatekeeper-audit-violation-export-connection.yaml @@ -1,15 +1,13 @@ --- {{- if and (.Values.enableViolationExport) (eq (.Values.exportBackend | default "" | lower) "disk") }} -apiVersion: v1 -kind: ConfigMap +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection metadata: name: '{{ .Values.audit.connection }}' namespace: '{{ .Release.Namespace }}' -data: +spec: driver: '{{ .Values.exportBackend }}' - config: | - { - "path": "{{ .Values.audit.exportVolumeMount.path }}", - "maxAuditResults": {{ .Values.audit.exportConfig.maxAuditResults }} - } + config: + path: "{{ .Values.audit.exportVolumeMount.path }}" + maxAuditResults: {{ .Values.audit.exportConfig.maxAuditResults }} {{- end }} diff --git a/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml b/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml index b0697398631..a95f1538a8e 100644 --- a/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml +++ b/manifest_staging/charts/gatekeeper/templates/gatekeeper-manager-role-clusterrole.yaml @@ -95,6 +95,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/manifest_staging/deploy/gatekeeper.yaml b/manifest_staging/deploy/gatekeeper.yaml index cccbf3fbffb..82fa4a11d83 100644 --- a/manifest_staging/deploy/gatekeeper.yaml +++ b/manifest_staging/deploy/gatekeeper.yaml @@ -2611,6 +2611,184 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connectionpodstatuses.status.gatekeeper.sh +spec: + group: status.gatekeeper.sh + names: + kind: ConnectionPodStatus + listKind: ConnectionPodStatusList + plural: connectionpodstatuses + singular: connectionpodstatus + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: ConnectionPodStatus is the Schema for the connectionpodstatuses API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + status: + description: No spec field is defined here, as this is a status-only resource. + properties: + active: + description: Indicator for alive connection with at least one successful publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote the status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: object + served: true + storage: true +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + labels: + gatekeeper.sh/system: "yes" + name: connections.connection.gatekeeper.sh +spec: + group: connection.gatekeeper.sh + names: + kind: Connection + listKind: ConnectionList + plural: connections + singular: connection + preserveUnknownFields: false + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: Connection is the Schema for the connections API + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: ConnectionSpec defines the desired state of Connection + properties: + config: + x-kubernetes-preserve-unknown-fields: true + driver: + description: Driver is the name of one of the expected drivers i.e. dapr, disk + type: string + required: + - config + - driver + type: object + status: + description: ConnectionStatus defines the observed state of Connection + properties: + byPod: + items: + description: ConnectionPodStatusStatus defines the observed state of ConnectionPodStatus + properties: + active: + description: Indicator for alive connection with at least one successful publish + type: boolean + connectionUID: + description: |- + UID is a type that holds unique ID values, including UUIDs. Because we + don't ONLY use UUIDs, this is an alias to string. Being a type captures + intent and helps make sure that UIDs and names do not get conflated. + type: string + errors: + items: + properties: + message: + type: string + type: + type: string + required: + - message + - type + type: object + type: array + id: + description: ID is the unique identifier for the pod that wrote the status + type: string + observedGeneration: + format: int64 + type: integer + operations: + items: + type: string + type: array + type: object + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: annotations: controller-gen.kubebuilder.io/version: v0.14.0 @@ -4887,6 +5065,18 @@ rules: - get - patch - update +- apiGroups: + - connection.gatekeeper.sh + resources: + - '*' + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - constraints.gatekeeper.sh resources: diff --git a/pkg/audit/controller.go b/pkg/audit/controller.go index d01b9bf9bca..3897f08c739 100644 --- a/pkg/audit/controller.go +++ b/pkg/audit/controller.go @@ -13,10 +13,13 @@ limitations under the License. package audit import ( + "context" + constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" "github.com/open-policy-agent/gatekeeper/v3/pkg/export" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -26,6 +29,7 @@ type Dependencies struct { CacheLister *CacheLister ExpansionSystem *expansion.System ExportSystem *export.System + GetPod func(context.Context) (*corev1.Pod, error) } // AddToManager adds audit manager to the Manager. diff --git a/pkg/audit/manager.go b/pkg/audit/manager.go index e3ef3e28dc5..caa556f270e 100644 --- a/pkg/audit/manager.go +++ b/pkg/audit/manager.go @@ -17,6 +17,7 @@ import ( "github.com/go-logr/logr" constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export" "github.com/open-policy-agent/gatekeeper/v3/pkg/expansion" @@ -55,8 +56,6 @@ const ( defaultConstraintViolationsLimit = 20 defaultListLimit = 500 defaultAPICacheDir = "/tmp/audit" - defaultConnection = "audit-connection" - defaultChannel = "audit-channel" ) var ( @@ -68,8 +67,6 @@ var ( auditEventsInvolvedNamespace = flag.Bool("audit-events-involved-namespace", false, "emit audit events for each violation in the involved objects namespace, the default (false) generates events in the namespace Gatekeeper is installed in. Audit events from cluster-scoped resources will still follow the default behavior") auditMatchKindOnly = flag.Bool("audit-match-kind-only", false, "only use kinds specified in all constraints for auditing cluster resources. if kind is not specified in any of the constraints, it will audit all resources (same as setting this flag to false)") apiCacheDir = flag.String("api-cache-dir", defaultAPICacheDir, "The directory where audit from api server cache are stored, defaults to /tmp/audit") - auditConnection = flag.String("audit-connection", defaultConnection, "(alpha) Connection name for exporting audit violation messages. Defaults to audit-connection") - auditChannel = flag.String("audit-channel", defaultChannel, "(alpha) Channel name for exporting audit violation messages. Defaults to audit-channel") emptyAuditResults = newLimitQueue(0) logStatsAudit = flag.Bool("log-stats-audit", false, "(alpha) log stats metrics for the audit run") ) @@ -93,6 +90,9 @@ type Manager struct { expansionSystem *expansion.System exportSystem *export.System + + // returns the running pod injected by the main controller + getPod func(context.Context) (*corev1.Pod, error) } // StatusViolation represents each violation under status. @@ -249,6 +249,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) { auditCache: deps.CacheLister, expansionSystem: deps.ExpansionSystem, exportSystem: deps.ExportSystem, + getPod: deps.GetPod, } return am, nil } @@ -259,11 +260,16 @@ func (am *Manager) audit(ctx context.Context) error { timestamp := startTime.UTC().Format(time.RFC3339) am.log = log.WithValues(logging.AuditID, timestamp) logStart(am.log) - exportErrorMap := make(map[string]error) - if *exportController.ExportEnabled { - if err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, exportutil.ExportMsg{Message: exportutil.AuditStartedMsg, ID: timestamp}); err != nil { + auditExportPublishingState := auditExportPublishingState{ + SuccessCount: 0, + Errors: make(map[string]error), + } + if *exportutil.ExportEnabled { + if err := am.exportSystem.Publish(context.Background(), *exportutil.AuditConnection, *exportutil.AuditChannel, exportutil.ExportMsg{Message: exportutil.AuditStartedMsg, ID: timestamp}); err != nil { am.log.Error(err, "failed to export audit start message") - exportErrorMap[strings.Split(err.Error(), ":")[0]] = err + auditExportPublishingState.Errors[strings.Split(err.Error(), ":")[0]] = err + } else { + auditExportPublishingState.SuccessCount++ } } // record audit latency @@ -277,13 +283,15 @@ func (am *Manager) audit(ctx context.Context) error { if err := am.reporter.reportRunEnd(endTime); err != nil { am.log.Error(err, "failed to report run end time") } - if *exportController.ExportEnabled { - if err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, exportutil.ExportMsg{Message: exportutil.AuditCompletedMsg, ID: timestamp}); err != nil { - exportErrorMap[strings.Split(err.Error(), ":")[0]] = err + if *exportutil.ExportEnabled { + if err := am.exportSystem.Publish(context.Background(), *exportutil.AuditConnection, *exportutil.AuditChannel, exportutil.ExportMsg{Message: exportutil.AuditCompletedMsg, ID: timestamp}); err != nil { + am.log.Error(err, "failed to export audit end message") + auditExportPublishingState.Errors[strings.Split(err.Error(), ":")[0]] = err + } else { + auditExportPublishingState.SuccessCount++ } - } - for _, v := range exportErrorMap { - am.log.Error(v, "failed to export audit violation") + // At the end of the Audit update the Connection status with any errors collected during publishing + reportExportConnectionErrors(ctx, auditExportPublishingState, am.log, am.mgr.GetClient(), am.mgr.GetScheme(), am.getPod) } }() @@ -328,10 +336,10 @@ func (am *Manager) audit(ctx context.Context) error { am.log.Error(err, "Auditing") } - am.addAuditResponsesToUpdateLists(updateLists, res, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, exportErrorMap) + am.addAuditResponsesToUpdateLists(updateLists, res, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, auditExportPublishingState) } else { am.log.Info("Auditing via discovery client") - err := am.auditResources(ctx, constraintsGVKs, updateLists, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, exportErrorMap) + err := am.auditResources(ctx, constraintsGVKs, updateLists, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, auditExportPublishingState) if err != nil { return err } @@ -365,7 +373,7 @@ func (am *Manager) auditResources( totalViolationsPerConstraint map[util.KindVersionName]int64, totalViolationsPerEnforcementAction map[util.EnforcementAction]int64, timestamp string, - exportErrorMap map[string]error, + auditExportPublishingState auditExportPublishingState, ) error { // delete all from cache dir before starting audit err := am.removeAllFromDir(*apiCacheDir, *auditChunkSize) @@ -553,7 +561,7 @@ func (am *Manager) auditResources( } // Loop through all subDirs to review all files for this kind. am.log.V(logging.DebugLevel).Info("Reviewing objects for GVK", "group", gv.Group, "version", gv.Version, "kind", kind) - err = am.reviewObjects(ctx, kind, folderCount, namespaceCache, updateLists, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, exportErrorMap) + err = am.reviewObjects(ctx, kind, folderCount, namespaceCache, updateLists, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, auditExportPublishingState) if err != nil { errs = append(errs, err) continue @@ -655,7 +663,7 @@ func (am *Manager) reviewObjects(ctx context.Context, kind string, folderCount i totalViolationsPerConstraint map[util.KindVersionName]int64, totalViolationsPerEnforcementAction map[util.EnforcementAction]int64, timestamp string, - exportErrorMap map[string]error, + auditExportPublishingState auditExportPublishingState, ) error { for i := 0; i < folderCount; i++ { // cache directory structure: @@ -740,7 +748,7 @@ func (am *Manager) reviewObjects(ctx context.Context, kind string, folderCount i if len(resp.Results()) > 0 { results := ToResults(&augmentedObj.Object, resp) - am.addAuditResponsesToUpdateLists(updateLists, results, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, exportErrorMap) + am.addAuditResponsesToUpdateLists(updateLists, results, totalViolationsPerConstraint, totalViolationsPerEnforcementAction, timestamp, auditExportPublishingState) } } } @@ -860,7 +868,7 @@ func (am *Manager) addAuditResponsesToUpdateLists( totalViolationsPerConstraint map[util.KindVersionName]int64, totalViolationsPerEnforcementAction map[util.EnforcementAction]int64, timestamp string, - exportErrorMap map[string]error, + auditExportPublishingState auditExportPublishingState, ) { for _, r := range res { constraint := r.Constraint @@ -899,10 +907,11 @@ func (am *Manager) addAuditResponsesToUpdateLists( details := r.Metadata["details"] labels := r.obj.GetLabels() logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels) - if *exportController.ExportEnabled { - err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)) - if err != nil { - exportErrorMap[strings.Split(err.Error(), ":")[0]] = err + if *exportutil.ExportEnabled { + if err := am.exportSystem.Publish(context.Background(), *exportutil.AuditConnection, *exportutil.AuditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp)); err != nil { + auditExportPublishingState.Errors[strings.Split(err.Error(), ":")[0]] = err + } else { + auditExportPublishingState.SuccessCount++ } } if *emitAuditEvents { @@ -1273,3 +1282,34 @@ func mergeErrors(errs []error) error { } return errors.New(sb.String()) } + +type auditExportPublishingState struct { + SuccessCount int + Errors map[string]error +} + +// Write the export errors to the ConnectionPodStatus. +func reportExportConnectionErrors( + ctx context.Context, + auditExportPublishingState auditExportPublishingState, + logger logr.Logger, + client client.Client, + scheme *runtime.Scheme, + getPod func(context.Context) (*corev1.Pod, error), +) { + exportErrors := []*statusv1alpha1.ConnectionError{} + for staticErrMsg, v := range auditExportPublishingState.Errors { + logger.Error(v, "failed to export audit violation") + exportErrors = append(exportErrors, &statusv1alpha1.ConnectionError{ + Type: statusv1alpha1.PublishError, + Message: staticErrMsg, + }) + } + + // Connection is considered active if there were any successful publishes + activeConnection := auditExportPublishingState.SuccessCount > 0 + + if err := exportController.UpdateOrCreateConnectionPodStatus(ctx, client, client, scheme, *exportutil.AuditConnection, exportErrors, &activeConnection, getPod); err != nil { + logger.Error(err, "failed to write export errors to the connection pod status") + } +} diff --git a/pkg/audit/manager_test.go b/pkg/audit/manager_test.go index 7f108722260..eb08c100306 100644 --- a/pkg/audit/manager_test.go +++ b/pkg/audit/manager_test.go @@ -3,15 +3,24 @@ package audit import ( "container/heap" "context" + "flag" "os" "reflect" "testing" + "github.com/go-logr/logr" + "github.com/onsi/gomega" constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client" "github.com/open-policy-agent/frameworks/constraint/pkg/client/drivers/rego" + "github.com/open-policy-agent/gatekeeper/v3/apis" configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/disk" + exportutil "github.com/open-policy-agent/gatekeeper/v3/pkg/export/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + anythingtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" "github.com/open-policy-agent/gatekeeper/v3/pkg/target" "github.com/open-policy-agent/gatekeeper/v3/pkg/util" "github.com/open-policy-agent/gatekeeper/v3/pkg/wildcard" @@ -22,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -615,3 +625,112 @@ func Test_readUnstructured(t *testing.T) { } }) } + +func Test_reportExportConnectionErrors(t *testing.T) { + // Setup + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export", "true"})) + g := gomega.NewGomegaWithT(t) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + pod, _ := getPod(context.Background()) + + tests := []struct { + name string + successCount int + errorsMap map[string]error + wantActiveConn bool + wantLogMsgs []string + }{ + { + name: "no errors, no successes", + successCount: 0, + errorsMap: map[string]error{}, + wantActiveConn: false, + }, + { + name: "some errors, no successes", + successCount: 0, + errorsMap: map[string]error{ + "static err 1": errors.New("export error thrown 1"), + "static err 2": errors.New("export error thrown 2"), + }, + wantActiveConn: false, + }, + { + name: "some errors, some successes", + successCount: 2, + errorsMap: map[string]error{ + "static err 1": errors.New("export error thrown 1"), + }, + wantActiveConn: true, + }, + { + name: "no errors, some successes", + successCount: 1, + errorsMap: map[string]error{}, + wantActiveConn: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(_ *testing.T) { + if err := apis.AddToScheme(scheme.Scheme); err != nil { + g.Expect(err).ToNot(gomega.HaveOccurred(), "Failed to add scheme") + } + + auditExportPublishingState := auditExportPublishingState{ + SuccessCount: test.successCount, + Errors: test.errorsMap, + } + + client := fake.NewClientBuilder().WithScheme(scheme.Scheme).Build() + + // Create Connection object for setup + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: *exportutil.AuditConnection, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + + g.Expect(client.Create(context.Background(), &connObj)).Should(gomega.Succeed(), "Failed to create Connection object") + + // Validate the operation is idempotent by re-running + for i := 0; i < 2; i++ { + reportExportConnectionErrors(context.Background(), auditExportPublishingState, logr.Logger{}, client, scheme.Scheme, getPod) + + // Await the ConnectionPodStatus + connPodStatusName, _ := statusv1alpha1.KeyForConnection(pod.Name, connObj.Namespace, connObj.Name) + var connPodStatus statusv1alpha1.ConnectionPodStatus + g.Eventually(func(g gomega.Gomega) { + g.Expect(client.Get(context.Background(), types.NamespacedName{ + Namespace: util.GetNamespace(), + Name: connPodStatusName, + }, &connPodStatus)).Should(gomega.Succeed(), "Status should exist after creation") + }).Should(gomega.Succeed()) + + // Assert the ConnectionPodStatus expected + g.Expect(connPodStatus.Status.Active).To(gomega.Equal(test.wantActiveConn), "Active status unexpected") + g.Expect(len(connPodStatus.Status.Errors)).To(gomega.Equal(len(test.errorsMap)), "Length of errors unexpected") + expected := make([]*statusv1alpha1.ConnectionError, 0, len(test.errorsMap)) + for key := range test.errorsMap { + expected = append(expected, &statusv1alpha1.ConnectionError{ + Type: statusv1alpha1.PublishError, + Message: key, + }) + } + + g.Expect(connPodStatus.Status.Errors).To(gomega.ConsistOf(expected), "Error slice unexpected") + } + }) + } +} diff --git a/pkg/controller/add_connectionstatus.go b/pkg/controller/add_connectionstatus.go new file mode 100644 index 00000000000..f70ab5ea549 --- /dev/null +++ b/pkg/controller/add_connectionstatus.go @@ -0,0 +1,24 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/connectionstatus" +) + +func init() { + Injectors = append(Injectors, &connectionstatus.Adder{}) +} diff --git a/pkg/controller/connectionstatus/connectionstatus_controller.go b/pkg/controller/connectionstatus/connectionstatus_controller.go new file mode 100644 index 00000000000..ebc58a59d78 --- /dev/null +++ b/pkg/controller/connectionstatus/connectionstatus_controller.go @@ -0,0 +1,237 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectionstatus + +import ( + "context" + "fmt" + "sort" + + "github.com/go-logr/logr" + "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" + "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + "github.com/open-policy-agent/gatekeeper/v3/pkg/watch" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +var log = logf.Log.WithName("controller").WithValues(logging.Process, "connection_status_controller") + +type Adder struct { + WatchManager *watch.Manager +} + +func (a *Adder) InjectTracker(_ *readiness.Tracker) {} + +// Add creates a new connection Status Controller and adds it to the Manager. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func (a *Adder) Add(mgr manager.Manager) error { + if !operations.IsAssigned(operations.Status) { + return nil + } + r := newReconciler(mgr) + return add(mgr, r) +} + +// newReconciler returns a new reconcile.Reconciler. +func newReconciler(mgr manager.Manager) *ReconcileConnectionStatus { + return &ReconcileConnectionStatus{ + // Separate reader and writer because manager's default client bypasses the cache for unstructured resources. + writer: mgr.GetClient(), + statusClient: mgr.GetClient(), + reader: mgr.GetCache(), + scheme: mgr.GetScheme(), + log: log, + } +} + +// PodStatusToConnectionMapper correlates a ConnectionPodStatus with its corresponding Connection. +// `selfOnly` tells the mapper to only map statuses corresponding to the current pod. +func PodStatusToConnectionMapper(selfOnly bool) handler.TypedMapFunc[*statusv1alpha1.ConnectionPodStatus, reconcile.Request] { + return func(_ context.Context, obj *statusv1alpha1.ConnectionPodStatus) []reconcile.Request { + labels := obj.GetLabels() + connObjName, ok := labels[statusv1beta1.ConnectionNameLabel] + if !ok { + log.Error(fmt.Errorf("connection status resource with no mapping label: %s", obj.GetName()), "missing label while attempting to map a connection status resource") + return nil + } + if selfOnly { + pod, ok := labels[statusv1beta1.PodLabel] + if !ok { + log.Error(fmt.Errorf("connection status resource with no pod label: %s", obj.GetName()), "missing label while attempting to map a connection status resource") + } + // Do not attempt to reconcile the resource when other pods have changed their status + if pod != util.GetPodName() { + return nil + } + } + + return []reconcile.Request{{NamespacedName: types.NamespacedName{ + Name: connObjName, + Namespace: obj.Namespace, + }}} + } +} + +// Add creates a new connection status Controller and adds it to the Manager. The Manager will set fields on the Controller +// and Start it when the Manager is Started. +func add(mgr manager.Manager, r reconcile.Reconciler) error { + c, err := controller.New("connection-status-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + + err = c.Watch( + source.Kind( + mgr.GetCache(), &statusv1alpha1.ConnectionPodStatus{}, + handler.TypedEnqueueRequestsFromMapFunc(PodStatusToConnectionMapper(false)), + predicate.TypedFuncs[*statusv1alpha1.ConnectionPodStatus]{ + CreateFunc: func(e event.TypedCreateEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + UpdateFunc: func(e event.TypedUpdateEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.ObjectNew.GetNamespace() == util.GetNamespace() + }, + DeleteFunc: func(e event.TypedDeleteEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + GenericFunc: func(e event.TypedGenericEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + }, + ), + ) + if err != nil { + return err + } + + err = c.Watch( + source.Kind( + mgr.GetCache(), &v1alpha1.Connection{}, + &handler.TypedEnqueueRequestForObject[*v1alpha1.Connection]{}, + predicate.TypedFuncs[*v1alpha1.Connection]{ + CreateFunc: func(e event.TypedCreateEvent[*v1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + UpdateFunc: func(e event.TypedUpdateEvent[*v1alpha1.Connection]) bool { + return e.ObjectNew.GetNamespace() == util.GetNamespace() + }, + DeleteFunc: func(e event.TypedDeleteEvent[*v1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + GenericFunc: func(e event.TypedGenericEvent[*v1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + }, + ), + ) + if err != nil { + return err + } + + return nil +} + +var _ reconcile.Reconciler = &ReconcileConnectionStatus{} + +// ReconcileConnectionStatus provides the dependencies required to reconcile the status of a Connection resource. +type ReconcileConnectionStatus struct { + reader client.Reader + writer client.Writer + statusClient client.StatusClient + + scheme *runtime.Scheme + log logr.Logger +} + +// +kubebuilder:rbac:groups=connection.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=status.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete + +// Reconcile reads the state of the cluster for a Connection object and makes changes based on the ConnectionPodStatuses. +func (r *ReconcileConnectionStatus) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + log.Info("Reconcile request", "namespace", request.Namespace, "name", request.Name) + + connObj := &v1alpha1.Connection{} + err := r.reader.Get(ctx, request.NamespacedName, connObj) + if err != nil { + // If the Connection does not exist then we are done + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + + sObjs := &statusv1alpha1.ConnectionPodStatusList{} + if err := r.reader.List( + ctx, + sObjs, + client.MatchingLabels{statusv1beta1.ConnectionNameLabel: request.Name}, + client.InNamespace(util.GetNamespace()), + ); err != nil { + return reconcile.Result{}, err + } + statusObjs := make(sortableStatuses, len(sObjs.Items)) + copy(statusObjs, sObjs.Items) + sort.Sort(statusObjs) + + var s []statusv1alpha1.ConnectionPodStatusStatus + + for i := range statusObjs { + // Don't report status if it's not for the correct object. This can happen + // if a watch gets interrupted, causing the status to be deleted out from underneath it + if statusObjs[i].Status.ConnectionUID != connObj.GetUID() { + continue + } + s = append(s, statusObjs[i].Status) + } + + connObj.Status.ByPod = s + + // Update the status of the Connection resource + if err := r.statusClient.Status().Update(ctx, connObj); err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{}, nil +} + +type sortableStatuses []statusv1alpha1.ConnectionPodStatus + +func (s sortableStatuses) Len() int { + return len(s) +} + +func (s sortableStatuses) Less(i, j int) bool { + return s[i].Status.ID < s[j].Status.ID +} + +func (s sortableStatuses) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/pkg/controller/connectionstatus/connectionstatus_controller_suite_test.go b/pkg/controller/connectionstatus/connectionstatus_controller_suite_test.go new file mode 100644 index 00000000000..06ba74e6d04 --- /dev/null +++ b/pkg/controller/connectionstatus/connectionstatus_controller_suite_test.go @@ -0,0 +1,84 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connectionstatus + +import ( + stdlog "log" + "os" + "path/filepath" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/apis" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var cfg *rest.Config + +func TestMain(m *testing.M) { + testEnv := &envtest.Environment{ + CRDDirectoryPaths: []string{ + filepath.Join("..", "..", "..", "config", "crd", "bases"), + }, + ErrorIfCRDPathMissing: true, + } + // TODO(ritazh): remove when vap is GAed in k/k + args := testEnv.ControlPlane.GetAPIServer().Configure() + args.Append("runtime-config", "api/all=true") + args.Append("feature-gates", "ValidatingAdmissionPolicy=true") + + if err := apis.AddToScheme(scheme.Scheme); err != nil { + stdlog.Fatal(err) + } + + // Retrieve the first found binary directory to allow debugging tests from VS Code + if getFirstFoundEnvTestBinaryDir() != "" { + testEnv.BinaryAssetsDirectory = getFirstFoundEnvTestBinaryDir() + } + + var err error + if cfg, err = testEnv.Start(); err != nil { + stdlog.Fatal(err) + } + + if err := testutils.CreateGatekeeperNamespace(cfg); err != nil { + stdlog.Printf("creating namespace: %v", err) + } + + code := m.Run() + if err = testEnv.Stop(); err != nil { + stdlog.Printf("error while trying to stop server: %v", err) + } + os.Exit(code) +} + +func getFirstFoundEnvTestBinaryDir() string { + basePath := filepath.Join("..", "..", "..", ".tmp", "bin", "k8s") + entries, err := os.ReadDir(basePath) + if err != nil { + logf.Log.Error(err, "Failed to read directory", "path", basePath) + return "" + } + for _, entry := range entries { + if entry.IsDir() { + return filepath.Join(basePath, entry.Name()) + } + } + return "" +} diff --git a/pkg/controller/connectionstatus/connectionstatus_controller_test.go b/pkg/controller/connectionstatus/connectionstatus_controller_test.go new file mode 100644 index 00000000000..3b67f86d828 --- /dev/null +++ b/pkg/controller/connectionstatus/connectionstatus_controller_test.go @@ -0,0 +1,192 @@ +package connectionstatus + +import ( + "context" + "testing" + "time" + + "github.com/onsi/gomega" + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/disk" + exportutil "github.com/open-policy-agent/gatekeeper/v3/pkg/export/util" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + anythingtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + testclient "github.com/open-policy-agent/gatekeeper/v3/test/clients" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// Test operations on Connection and ConnectionPodStatus handled by controller and reflected on Connection status +func TestReconcile_E2E(t *testing.T) { + // Setup + const timeout = time.Second * 20 + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + k8sClient := testclient.NewRetryClient(mgr.GetClient()) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + pod, _ := getPod(ctx) + + t.Run("Reconcile called and updates Connection status", func(t *testing.T) { + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: *exportutil.AuditConnection, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: *exportutil.AuditConnection, + Namespace: util.GetNamespace(), + } + + // Wrap the controller Reconciler so it writes each request to a map when it is finished reconciling + originalReconciler := newReconciler(mgr) + wrappedReconciler, requests := testutils.SetupTestReconcile(originalReconciler) + // Register the controller with the manager + require.NoError(t, add(mgr, wrappedReconciler)) + // Start the manager and let it run in the background + testutils.StartManager(ctx, t, mgr) + + // Test setup - Create the connection object + g.Expect(k8sClient.Create(ctx, &connObj)).Should(gomega.Succeed()) + + // Await for the reconcile request to finish + g.Eventually(func() bool { + // Use the Connection object for Reconcile request because of the Connection mapper + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Connection object should now exist + connObj = connectionv1alpha1.Connection{} + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj) + g.Expect(err).Should(gomega.BeNil()) + }).WithTimeout(timeout).Should(gomega.Succeed(), "Connection object should exist after creation") + + // Next create the ConnectionPodStatus object which should trigger the reconcile request + connPodStatusObjName, _ := statusv1alpha1.KeyForConnection(pod.Name, connObj.Namespace, connObj.Name) + typeConnectionPodStatusNamespacedName := types.NamespacedName{ + Name: connPodStatusObjName, + Namespace: util.GetNamespace(), + } + connPodStatusObj := statusv1alpha1.ConnectionPodStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: connPodStatusObjName, + Namespace: util.GetNamespace(), + Labels: map[string]string{ + statusv1beta1.ConnectionNameLabel: connObj.Name, + }, + }, + Status: statusv1alpha1.ConnectionPodStatusStatus{ + Active: false, + Errors: []*statusv1alpha1.ConnectionError{}, + ObservedGeneration: connObj.GetGeneration(), + ConnectionUID: connObj.GetUID(), + ID: pod.Name, + }, + } + + // Now create the connection pod status object which should trigger the reconcile request + g.Expect(k8sClient.Create(ctx, &connPodStatusObj)).Should(gomega.Succeed(), "Creating the connection pod status object should succeed") + + // Await for the reconcile request to finish + g.Eventually(func() bool { + // Use the Connection object for Reconcile request because of the Connection mapper + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert ConnectionPodStatus object creation + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeConnectionPodStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObj.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObj.Name), "Status should have the correct connection name label") + g.Expect(connPodStatusObj.Status.Errors).Should(gomega.BeEmpty(), "Status should not have an error after creation") + g.Expect(connPodStatusObj.Status.ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should match the connection object generation") + g.Expect(connPodStatusObj.Status.ID).Should(gomega.Equal(pod.Name), "ID should match the pod name") + g.Expect(connPodStatusObj.Status.ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should match the connection object UID") + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Assert Connection object and its status + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj) + g.Expect(err).Should(gomega.Succeed(), "Conn should exist after updating the connection object") + g.Expect(len(connObj.Status.ByPod)).Should(gomega.Equal(1), "Connection object status should have one entry") + g.Expect(connObj.Status.ByPod[0].Errors).Should(gomega.BeEmpty(), "Status should not have an error after updating the connection object") + g.Expect(connObj.Status.ByPod[0].ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should get updated to match the latest connection object generation after update") + g.Expect(connObj.Status.ByPod[0].ID).Should(gomega.Equal(pod.Name), "ID should still match the pod name after update") + g.Expect(connObj.Status.ByPod[0].ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should still match the connection object UID after update") + g.Expect(connObj.Status.ByPod[0].Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Test Update of the Connection object + connObj.Spec.Config = &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "new-value", + "maxAuditResults": float64(3), + }} + g.Expect(k8sClient.Update(ctx, &connObj)).Should(gomega.Succeed(), "Updating the Connection object should succeed") + + // Also update the ConnectionPodStatus object to reflect the new generation + connPodStatusObj.Status.ObservedGeneration = connObj.GetGeneration() + g.Expect(k8sClient.Update(ctx, &connPodStatusObj)).Should(gomega.Succeed(), "Updating the ConnectionPodStatus object should succeed") + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert Connection status after update + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj) + g.Expect(err).Should(gomega.Succeed(), "Connection object should exist after updating") + g.Expect(len(connObj.Status.ByPod)).Should(gomega.Equal(1), "Connection object status should have one entry") + g.Expect(connObj.Status.ByPod[0].Errors).Should(gomega.BeEmpty(), "Status should not have an error after updating the Connection object") + g.Expect(connObj.Status.ByPod[0].ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should get updated to match the latest connection object generation after update") + g.Expect(connObj.Status.ByPod[0].ID).Should(gomega.Equal(pod.Name), "ID should still match the pod name after update") + g.Expect(connObj.Status.ByPod[0].ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should still match the Connection object UID after update") + g.Expect(connObj.Status.ByPod[0].Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Test Delete of the Connection object + g.Expect(k8sClient.Delete(ctx, &connObj)).Should(gomega.Succeed(), "Deleting the connection object should succeed") + // If Connection object deleted the pod status not necessarily deleted it wil persist + g.Eventually(func() bool { + err := k8sClient.Get(ctx, typeConnectionPodStatusNamespacedName, &connPodStatusObj) + if err != nil && apierrors.IsNotFound(err) { + return true + } + return false + }).WithTimeout(timeout).Should(gomega.Equal(false), "Connection pod status object still exists even after Connection object deleted") + + // Cleanup the Connection and ConnectionPodStatus objects if they exist at the end + defer func() { + k8sClient.Delete(ctx, &connObj) // nolint:errcheck + k8sClient.Delete(ctx, &connPodStatusObj) // nolint:errcheck + }() + }) +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index add363bfa49..9e9a51868d5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -57,7 +57,7 @@ type GetPodInjector interface { } type ExportInjector interface { - InjectExportSystem(exportSystem *export.System) + InjectExportSystem(exportSystem export.Exporter) } type DataClientInjector interface { diff --git a/pkg/controller/export/export_config_controller.go b/pkg/controller/export/export_config_controller.go deleted file mode 100644 index 1e99f983536..00000000000 --- a/pkg/controller/export/export_config_controller.go +++ /dev/null @@ -1,133 +0,0 @@ -package export - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "strings" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/export" - "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" - "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" - "github.com/open-policy-agent/gatekeeper/v3/pkg/util" - corev1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" -) - -var ( - ExportEnabled = flag.Bool("enable-violation-export", false, "(alpha) Enable exporting violations to external systems") - log = logf.Log.WithName("controller").WithValues(logging.Process, "export_controller") -) - -type Adder struct { - ExportSystem *export.System -} - -func (a *Adder) Add(mgr manager.Manager) error { - if !*ExportEnabled { - return nil - } - log.Info("Warning: Alpha flag enable-violation-export is set to true. This flag may change in the future.") - r := newReconciler(mgr, a.ExportSystem) - return add(mgr, r) -} - -func (a *Adder) InjectTracker(_ *readiness.Tracker) {} - -func (a *Adder) InjectExportSystem(exportSystem *export.System) { - a.ExportSystem = exportSystem -} - -type Reconciler struct { - client.Client - scheme *runtime.Scheme - system *export.System -} - -func newReconciler(mgr manager.Manager, system *export.System) *Reconciler { - return &Reconciler{ - Client: mgr.GetClient(), - scheme: mgr.GetScheme(), - system: system, - } -} - -func add(mgr manager.Manager, r reconcile.Reconciler) error { - c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r}) - if err != nil { - return err - } - - return c.Watch( - source.Kind(mgr.GetCache(), &corev1.ConfigMap{}, - &handler.TypedEnqueueRequestForObject[*corev1.ConfigMap]{}, - predicate.TypedFuncs[*corev1.ConfigMap]{ - CreateFunc: func(e event.TypedCreateEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - UpdateFunc: func(e event.TypedUpdateEvent[*corev1.ConfigMap]) bool { - return e.ObjectNew.GetNamespace() == util.GetNamespace() - }, - DeleteFunc: func(e event.TypedDeleteEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - GenericFunc: func(e event.TypedGenericEvent[*corev1.ConfigMap]) bool { - return e.Object.GetNamespace() == util.GetNamespace() - }, - }, - )) -} - -func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - log.Info("Reconcile", "request", request, "namespace", request.Namespace, "name", request.Name) - - deleted := false - cfg := &corev1.ConfigMap{} - err := r.Get(ctx, request.NamespacedName, cfg) - if err != nil { - if !k8serrors.IsNotFound(err) { - return reconcile.Result{}, err - } - deleted = true - } - - if deleted { - err := r.system.CloseConnection(request.Name) - if err != nil { - return reconcile.Result{Requeue: true}, err - } - log.Info("removed connection", "name", request.Name) - return reconcile.Result{}, nil - } - - if len(cfg.Data) == 0 { - return reconcile.Result{}, fmt.Errorf("data missing in configmap %s, unable to configure exporter", request.NamespacedName) - } - if _, ok := cfg.Data["driver"]; !ok { - return reconcile.Result{}, fmt.Errorf("missing driver field in configmap %s, unable to configure exporter", request.NamespacedName) - } - var config interface{} - err = json.Unmarshal([]byte(cfg.Data["config"]), &config) - if err != nil { - return reconcile.Result{}, err - } - - err = r.system.UpsertConnection(ctx, config, request.Name, strings.ToLower(cfg.Data["driver"])) - if err != nil { - return reconcile.Result{}, err - } - - log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"]) - return reconcile.Result{}, nil -} diff --git a/pkg/controller/export/export_config_controller_test.go b/pkg/controller/export/export_config_controller_test.go deleted file mode 100644 index 1a7d521b8ce..00000000000 --- a/pkg/controller/export/export_config_controller_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package export - -import ( - "context" - "flag" - "fmt" - "testing" - - "github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr" - "github.com/open-policy-agent/gatekeeper/v3/pkg/util" - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/reconcile" -) - -func TestReconcile(t *testing.T) { - // Create a fake client with some data - scheme := runtime.NewScheme() - err := corev1.AddToScheme(scheme) - if err != nil { - t.Fatalf("Unexpected error parsing flag: %v", err) - } - - err = flag.CommandLine.Parse([]string{"--enable-violation-export", "true"}) - if err != nil { - t.Fatalf("Unexpected error parsing flag: %v", err) - } - - request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: util.GetNamespace(), Name: dapr.Name}} - - ctx := context.Background() - testCases := []struct { - name string - config *corev1.ConfigMap - wantErr bool - errorMsg string - }{ - { - name: "invalid configmap", - config: &corev1.ConfigMap{ - ObjectMeta: v1.ObjectMeta{ - Name: dapr.Name, - Namespace: util.GetNamespace(), - }, - }, - wantErr: true, - errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure exporter", request.NamespacedName), - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - client := fake.NewClientBuilder().WithScheme(scheme).WithObjects(tc.config).Build() - r := &Reconciler{ - Client: client, - scheme: scheme, - } - - _, err := r.Reconcile(ctx, request) - if tc.wantErr { - assert.Equal(t, err.Error(), tc.errorMsg) - } - }) - } -} diff --git a/pkg/controller/export/export_connection_controller.go b/pkg/controller/export/export_connection_controller.go new file mode 100644 index 00000000000..284e59ce2e8 --- /dev/null +++ b/pkg/controller/export/export_connection_controller.go @@ -0,0 +1,316 @@ +package export + +import ( + "context" + "fmt" + + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/connectionstatus" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" + exportutil "github.com/open-policy-agent/gatekeeper/v3/pkg/export/util" + "github.com/open-policy-agent/gatekeeper/v3/pkg/logging" + "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +var log = logf.Log.WithName("controller").WithValues(logging.Process, "export_controller") + +type Adder struct { + ExportSystem export.Exporter + // GetPod returns an instance of the currently running Gatekeeper pod + GetPod func(context.Context) (*corev1.Pod, error) +} + +func (a *Adder) Add(mgr manager.Manager) error { + r := newReconciler(mgr, a.ExportSystem, *exportutil.AuditConnection, a.GetPod) + if r == nil { + log.Info("Export functionality is disabled, skipping export connection controller setup") + return nil + } + return add(mgr, r) +} + +func (a *Adder) InjectTracker(_ *readiness.Tracker) {} + +func (a *Adder) InjectExportSystem(exportSystem export.Exporter) { + a.ExportSystem = exportSystem +} + +func (a *Adder) InjectGetPod(getPod func(ctx context.Context) (*corev1.Pod, error)) { + a.GetPod = getPod +} + +type Reconciler struct { + reader client.Reader + writer client.Writer + scheme *runtime.Scheme + system export.Exporter + // TODO: Refactor this once multiple connections are supported, for now this helps with injecting dependency for tests + auditConnectionName string + getPod func(context.Context) (*corev1.Pod, error) +} + +func newReconciler(mgr manager.Manager, system export.Exporter, auditConnectionName string, getPod func(context.Context) (*corev1.Pod, error)) *Reconciler { + if !*exportutil.ExportEnabled { + log.Info("Export is disabled via flag") + return nil + } + + log.Info("Warning: Alpha flag enable-violation-export is set to true. This flag may change in the future.") + + return &Reconciler{ + reader: mgr.GetCache(), + writer: mgr.GetClient(), + scheme: mgr.GetScheme(), + system: system, + auditConnectionName: auditConnectionName, + getPod: getPod, + } +} + +func add(mgr manager.Manager, r reconcile.Reconciler) error { + c, err := controller.New("export-connection-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return err + } + err = c.Watch( + source.Kind( + mgr.GetCache(), &connectionv1alpha1.Connection{}, + &handler.TypedEnqueueRequestForObject[*connectionv1alpha1.Connection]{}, + predicate.TypedFuncs[*connectionv1alpha1.Connection]{ + CreateFunc: func(e event.TypedCreateEvent[*connectionv1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + UpdateFunc: func(e event.TypedUpdateEvent[*connectionv1alpha1.Connection]) bool { + return e.ObjectNew.GetNamespace() == util.GetNamespace() + }, + DeleteFunc: func(e event.TypedDeleteEvent[*connectionv1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + GenericFunc: func(e event.TypedGenericEvent[*connectionv1alpha1.Connection]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + }, + ), + ) + if err != nil { + return err + } + + err = c.Watch( + source.Kind( + mgr.GetCache(), &statusv1alpha1.ConnectionPodStatus{}, + handler.TypedEnqueueRequestsFromMapFunc(connectionstatus.PodStatusToConnectionMapper(true)), + predicate.TypedFuncs[*statusv1alpha1.ConnectionPodStatus]{ + CreateFunc: func(e event.TypedCreateEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + UpdateFunc: func(e event.TypedUpdateEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.ObjectNew.GetNamespace() == util.GetNamespace() + }, + DeleteFunc: func(e event.TypedDeleteEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + GenericFunc: func(e event.TypedGenericEvent[*statusv1alpha1.ConnectionPodStatus]) bool { + return e.Object.GetNamespace() == util.GetNamespace() + }, + }, + ), + ) + if err != nil { + return err + } + + return nil +} + +// +kubebuilder:rbac:groups=connection.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=status.gatekeeper.sh,resources=*,verbs=get;list;watch;create;update;patch;delete +func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + log.Info("Reconcile request", "namespace", request.Namespace, "name", request.Name) + + deleted := false + connObj := &connectionv1alpha1.Connection{} + err := r.reader.Get(ctx, request.NamespacedName, connObj) + if err != nil { + if !apierrors.IsNotFound(err) { + return reconcile.Result{}, err + } + deleted = true + } + + if deleted { + err := r.system.CloseConnection(request.Name) + if err != nil { + return reconcile.Result{Requeue: true}, deleteStatus(ctx, r.writer, request.Namespace, request.Name, r.getPod) + } + log.Info("removed connection", "name", request.Name) + return reconcile.Result{}, deleteStatus(ctx, r.writer, request.Namespace, request.Name, r.getPod) + } + + if request.Name != r.auditConnectionName { + err := fmt.Errorf("error unsupported connection name %s. Connection name should align with flag --audit-connection set or defaulted to '%s'", request.Name, r.auditConnectionName) + log.Error(err, "unsupported connection", "namespace", request.Namespace) + exportErrors := []*statusv1alpha1.ConnectionError{{Type: statusv1alpha1.UpsertConnectionError, Message: err.Error()}} + resetActiveConnection := false + return reconcile.Result{}, updateOrCreateConnectionPodStatus(ctx, r.reader, r.writer, r.scheme, connObj, exportErrors, &resetActiveConnection, r.getPod) + } + + err = r.system.UpsertConnection(ctx, connObj.Spec.Config.Value, request.Name, connObj.Spec.Driver) + if err != nil { + // Reset the active connection status to false if UpsertConnection fails + activeConnection := false + return reconcile.Result{Requeue: true}, updateOrCreateConnectionPodStatus(ctx, r.reader, r.writer, r.scheme, connObj, []*statusv1alpha1.ConnectionError{{Type: statusv1alpha1.UpsertConnectionError, Message: err.Error()}}, &activeConnection, r.getPod) + } + + log.Info("Connection upsert successful", "name", request.Name, "driver", connObj.Spec.Driver) + return reconcile.Result{}, updateOrCreateConnectionPodStatus(ctx, r.reader, r.writer, r.scheme, connObj, []*statusv1alpha1.ConnectionError{}, nil, r.getPod) +} + +func UpdateOrCreateConnectionPodStatus( + ctx context.Context, + reader client.Reader, + writer client.Writer, + scheme *runtime.Scheme, + connObjName string, + exportErrors []*statusv1alpha1.ConnectionError, + activeConnection *bool, + getPod func(context.Context) (*corev1.Pod, error), +) error { + // Since the caller from Audit won't have an incoming request + // use the connection name from the audit connection flag as the predetermined connection name + request := types.NamespacedName{ + Namespace: util.GetNamespace(), + Name: connObjName, + } + connObj := &connectionv1alpha1.Connection{} + err := reader.Get(ctx, request, connObj) + if err != nil { + return err + } + return updateOrCreateConnectionPodStatus(ctx, reader, writer, scheme, connObj, exportErrors, activeConnection, getPod) +} + +func updateOrCreateConnectionPodStatus(ctx context.Context, + reader client.Reader, + writer client.Writer, + scheme *runtime.Scheme, + connObj *connectionv1alpha1.Connection, + exportErrors []*statusv1alpha1.ConnectionError, + activeConnection *bool, + getPod func(context.Context) (*corev1.Pod, error), +) error { + pod, err := getPod(ctx) + if err != nil { + return fmt.Errorf("getting reconciler pod: %w", err) + } + + // Check if it exists already + statusNS := pod.Namespace + statusName, err := statusv1alpha1.KeyForConnection(pod.Name, connObj.GetNamespace(), connObj.GetName()) + if err != nil { + return fmt.Errorf("getting key for connection: %w", err) + } + shouldCreate := true + connPodStatusObj := &statusv1alpha1.ConnectionPodStatus{} + + err = reader.Get(ctx, types.NamespacedName{Namespace: statusNS, Name: statusName}, connPodStatusObj) + + existingActiveConnection := false + switch { + case err == nil: + shouldCreate = false + // ConnectionPodStatus object exists so get the existing active state + existingActiveConnection = connPodStatusObj.Status.Active + case apierrors.IsNotFound(err): + if connPodStatusObj, err = newConnectionPodStatus(scheme, pod, connObj); err != nil { + return fmt.Errorf("creating new connection connPodStatusObj: %w", err) + } + default: + return fmt.Errorf("getting connection object status in name %s, namespace %s: %w", connObj.GetName(), connObj.GetNamespace(), err) + } + + // nil indicates expected active Connection state is unknown by caller during Upsert + if activeConnection == nil && connPodStatusObj.Status.ObservedGeneration != connObj.GetGeneration() { + // Reset the active connection state when there are updates to the Connection object to ensure the active state is only true when the Publish succeeds for the current Connection + resetActiveConnection := false + activeConnection = &resetActiveConnection + } else if activeConnection == nil { + // Trust the existing object when the Connection hasn't change - since active can only be true when Publish succeeds, we don't want to potentially reset active state between every Audit causing thrashing + activeConnection = &existingActiveConnection + } + connPodStatusObj.Status.Active = *activeConnection + + // ObservedGeneration is used to track the generation of the Connection object + connPodStatusObj.Status.ObservedGeneration = connObj.GetGeneration() + + setStatusErrors(connPodStatusObj, exportErrors) + + if shouldCreate { + log.Info("Creating new ConnectionPodStatus object", "name", connPodStatusObj.GetName(), "active", connPodStatusObj.Status.Active) + return writer.Create(ctx, connPodStatusObj) + } + log.Info("Updating existing ConnectionPodStatus object", "name", connPodStatusObj.GetName(), "active", connPodStatusObj.Status.Active) + return writer.Update(ctx, connPodStatusObj) +} + +func deleteStatus(ctx context.Context, + writer client.Writer, + connectionNamespace string, + connectionName string, + getPod func(context.Context) (*corev1.Pod, error), +) error { + connPodStatusObj := &statusv1alpha1.ConnectionPodStatus{} + pod, err := getPod(ctx) + if err != nil { + return fmt.Errorf("getting reconciler pod: %w", err) + } + sName, err := statusv1alpha1.KeyForConnection(pod.Name, connectionNamespace, connectionName) + if err != nil { + return fmt.Errorf("getting key for connection: %w", err) + } + connPodStatusObj.SetName(sName) + connPodStatusObj.SetNamespace(util.GetNamespace()) + if err := writer.Delete(ctx, connPodStatusObj); err != nil && !apierrors.IsNotFound(err) { + return err + } + return nil +} + +func newConnectionPodStatus(scheme *runtime.Scheme, + pod *corev1.Pod, + connObj *connectionv1alpha1.Connection, +) (*statusv1alpha1.ConnectionPodStatus, error) { + connPodStatusObj, err := statusv1alpha1.NewConnectionStatusForPod(pod, connObj.GetNamespace(), connObj.GetName(), scheme) + if err != nil { + return nil, fmt.Errorf("creating status for pod: %w", err) + } + connPodStatusObj.Status.ConnectionUID = connObj.GetUID() + + return connPodStatusObj, nil +} + +func setStatusErrors( + connPodStatusObj *statusv1alpha1.ConnectionPodStatus, + exportErrors []*statusv1alpha1.ConnectionError, +) { + if len(exportErrors) == 0 { + connPodStatusObj.Status.Errors = nil + return + } + connPodStatusObj.Status.Errors = exportErrors +} diff --git a/pkg/controller/export/export_connection_controller_suite_test.go b/pkg/controller/export/export_connection_controller_suite_test.go new file mode 100644 index 00000000000..3266343e0fe --- /dev/null +++ b/pkg/controller/export/export_connection_controller_suite_test.go @@ -0,0 +1,89 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package export + +import ( + "flag" + stdlog "log" + "os" + "path/filepath" + "testing" + + "github.com/open-policy-agent/gatekeeper/v3/apis" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var cfg *rest.Config + +func TestMain(m *testing.M) { + testEnv := &envtest.Environment{ + CRDDirectoryPaths: []string{ + filepath.Join("..", "..", "..", "config", "crd", "bases"), + }, + ErrorIfCRDPathMissing: true, + } + // TODO(ritazh): remove when vap is GAed in k/k + args := testEnv.ControlPlane.GetAPIServer().Configure() + args.Append("runtime-config", "api/all=true") + args.Append("feature-gates", "ValidatingAdmissionPolicy=true") + + if err := apis.AddToScheme(scheme.Scheme); err != nil { + stdlog.Fatal(err) + } + + // Retrieve the first found binary directory to allow debugging tests from VS Code + if getFirstFoundEnvTestBinaryDir() != "" { + testEnv.BinaryAssetsDirectory = getFirstFoundEnvTestBinaryDir() + } + + var err error + if cfg, err = testEnv.Start(); err != nil { + stdlog.Fatal(err) + } + + if err := testutils.CreateGatekeeperNamespace(cfg); err != nil { + stdlog.Printf("creating namespace: %v", err) + } + + if err := flag.CommandLine.Parse([]string{"--enable-violation-export", "true"}); err != nil { + stdlog.Fatal(err) + } + + code := m.Run() + if err = testEnv.Stop(); err != nil { + stdlog.Printf("error while trying to stop server: %v", err) + } + os.Exit(code) +} + +func getFirstFoundEnvTestBinaryDir() string { + basePath := filepath.Join("..", "..", "..", ".tmp", "bin", "k8s") + entries, err := os.ReadDir(basePath) + if err != nil { + logf.Log.Error(err, "Failed to read directory", "path", basePath) + return "" + } + for _, entry := range entries { + if entry.IsDir() { + return filepath.Join(basePath, entry.Name()) + } + } + return "" +} diff --git a/pkg/controller/export/export_connection_controller_test.go b/pkg/controller/export/export_connection_controller_test.go new file mode 100644 index 00000000000..c09a90ea587 --- /dev/null +++ b/pkg/controller/export/export_connection_controller_test.go @@ -0,0 +1,737 @@ +package export + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + "time" + + "github.com/onsi/gomega" + connectionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/connection/v1alpha1" + statusv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1alpha1" + statusv1beta1 "github.com/open-policy-agent/gatekeeper/v3/apis/status/v1beta1" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export" + "github.com/open-policy-agent/gatekeeper/v3/pkg/export/disk" + "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + anythingtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types" + "github.com/open-policy-agent/gatekeeper/v3/pkg/util" + testclient "github.com/open-policy-agent/gatekeeper/v3/test/clients" + "github.com/open-policy-agent/gatekeeper/v3/test/testutils" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// Test notes - we use a separate audit connection name for each test to avoid race conditions +// between tests sharing the same testenv etcd objects that with the same audit connection name would otherwise cause conflicts + +const timeout = time.Second * 20 + +// Note: For this test we check the ConnectionPodStatus resource that is created +// by the controller, and not the Connection status itself, to isolate test boundaries +// since updating the Connection status is handled by a separate controller +func TestReconcile_E2E(t *testing.T) { + // Setup + auditConnectionName := "audit-connection-1" + auditConnectionNameFlag := fmt.Sprintf("--audit-connection=%s", auditConnectionName) + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export=true", auditConnectionNameFlag}), "parsing flags") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + k8sClient := testclient.NewRetryClient(mgr.GetClient()) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + // Wrap the controller Reconciler so it writes each request to a map when it is finished reconciling + originalReconciler := newReconciler(mgr, export.NewSystem(), auditConnectionName, getPod) + wrappedReconciler, requests := testutils.SetupTestReconcile(originalReconciler) + // Register the controller with the manager + require.NoError(t, add(mgr, wrappedReconciler)) + // Start the manager and let it run in the background + testutils.StartManager(ctx, t, mgr) + + t.Run("Reconcile called for new Connection create, then update, and finally delete, all with expected operations and ConnectionPodStatus updates", func(_ *testing.T) { + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + } + + // Connection object should not exist at the beginning of the test + g.Expect(k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj)).ShouldNot(gomega.Succeed(), "Resource should not exist before creation") + + // Test setup create the Connection object + g.Expect(k8sClient.Create(ctx, &connObj)).Should(gomega.Succeed()) + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert ConnectionPodStatus + connPodStatusObj := statusv1alpha1.ConnectionPodStatus{} + pod, _ := getPod(ctx) + connPodStatusName, _ := statusv1alpha1.KeyForConnection(pod.Name, connObj.Namespace, connObj.Name) + typeStatusNamespacedName := types.NamespacedName{ + Name: connPodStatusName, + Namespace: util.GetNamespace(), + } + var generationOnCreate int64 + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObj.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObj.Name), "Status should have the correct connection name label") + g.Expect(connPodStatusObj.Status.Errors).Should(gomega.BeEmpty(), "Status should not have an error after creation") + generationOnCreate = connObj.GetGeneration() + g.Expect(connPodStatusObj.Status.ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should match the connection object generation") + g.Expect(connPodStatusObj.Status.ID).Should(gomega.Equal(pod.Name), "ID should match the pod name") + g.Expect(connPodStatusObj.Status.ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should match the connection object UID") + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Test Update of the connection object + connObj.Spec.Config.Value = map[string]interface{}{ + "path": "new-value", + "maxAuditResults": float64(3), + } + // Set the status to active to simulate an update to the Connection when a Publish operation was already performed marking active true + connPodStatusObj.Status.Active = true + g.Expect(k8sClient.Update(ctx, &connObj)).Should(gomega.Succeed(), "Updating the connection object should succeed") + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue(), "Reconcile request should finish after updating the connection object") + + // Assert the Connection object after the Connection update + g.Eventually(func(g gomega.Gomega) { + // Get the latest connection object + err := k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj) + g.Expect(err).Should(gomega.Succeed(), "Connection object should exist after update") + g.Expect(connObj.Spec.Config.Value).Should(gomega.Equal(map[string]interface{}{"path": "new-value", "maxAuditResults": float64(3)}), "Connection object should have the updated config value after update") + g.Expect(connObj.GetGeneration()).Should(gomega.Not(gomega.Equal(generationOnCreate)), "Connection object generation should have changed after update") + g.Expect(connObj.Status.ByPod).Should(gomega.BeNil(), "Connection object status should be nil after update, as the controller does not set it") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Assert the ConnectionPodStatus after the Connection update + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should still exist after updating the connection object") + g.Expect(connPodStatusObj.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObj.Name), "Status should still have the correct Connection name label after update") + g.Expect(connPodStatusObj.Status.Errors).Should(gomega.BeEmpty(), "Status should not have an error after updating the connection object") + g.Expect(connPodStatusObj.Status.ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should get updated to match the latest Connection object generation after update") + g.Expect(connPodStatusObj.Status.ObservedGeneration).ShouldNot(gomega.Equal(generationOnCreate), "Observed generation should have changed after update") + g.Expect(connPodStatusObj.Status.ID).Should(gomega.Equal(pod.Name), "ID should still match the pod name after update") + g.Expect(connPodStatusObj.Status.ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should still match the Connection object UID after update") + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeFalse(), "Active status should be false after the connection was updated, as no new publish operations were performed for this connection observedGeneration") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Clear the previous request with the same name to avoid false positives now only load the latest + requests.Clear() + + // Test Delete of the connection object + g.Expect(k8sClient.Delete(ctx, &connObj)).Should(gomega.Succeed(), "Deleting the connection object should succeed") + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue(), "Reconcile request should finish after deleting the connection object") + + // Assert the Connection and ConnectionPodStatus object after deleting the Connection object + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connObj) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection obj cleaned up after deleting the connection object") + err = k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObj) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection pod status should get cleaned up after deleting the connection object") + }).WithTimeout(timeout) + + // Cleanup the Connection object if it exists at the end + defer func() { + k8sClient.Delete(ctx, &connObj) // nolint:errcheck + k8sClient.Delete(ctx, &connPodStatusObj) // nolint:errcheck + }() + }) +} + +// Mocks ExportSystem to simulate the export system behavior failures and impact on the controller +func TestReconcile_ExportSystem_Failures(t *testing.T) { + // Setup + auditConnectionName := "audit-connection-2" + auditConnectionNameFlag := fmt.Sprintf("--audit-connection=%s", auditConnectionName) + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export=true", auditConnectionNameFlag}), "parsing flags") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + + t.Run("Reconcile called for Connection create, upsert fails, and status error", func(t *testing.T) { + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + } + + mockErrStr := "mock error for upsert connection" + mockErr := fmt.Errorf("%s", mockErrStr) + fakeExportSystem := &FakeExportSystem{ + UpsertConnectionError: mockErr, + } + + directK8sClient, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme()}) + require.NoError(t, err, "Failed to create direct k8s client") + reconciler := Reconciler{ + reader: directK8sClient, + writer: directK8sClient, + scheme: mgr.GetScheme(), + system: fakeExportSystem, + auditConnectionName: auditConnectionName, + getPod: getPod, + } + + // Test setup Create the connection object + g.Expect(directK8sClient.Create(ctx, &connObj)).Should(gomega.Succeed()) + + // Call Reconcile directly to assert the behavior on failures without having controller go through requeues + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeConnectionNamespacedName}) + // The system upsert error causes a requeue but the error doesn't get returned only the status update errors do + g.Expect(result.Requeue).Should(gomega.Equal(true), "Reconcile should requeue after an error") // nolint:staticcheck + g.Expect(err).Should(gomega.BeNil(), "Reconcile should not return an error on initial creation") + + // Assert the ConnectionPodStatus - Errors should be present after unsuccessful upsert + connPodStatusObj := statusv1alpha1.ConnectionPodStatus{} + pod, _ := getPod(ctx) + connPodStatusName, _ := statusv1alpha1.KeyForConnection(pod.Name, connObj.Namespace, connObj.Name) + typeConnPodStatusNamespacedName := types.NamespacedName{ + Name: connPodStatusName, + Namespace: util.GetNamespace(), + } + g.Eventually(func(g gomega.Gomega) { + err := directK8sClient.Get(ctx, typeConnPodStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObj.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObj.Name), "Status should have the correct connection name label") + g.Expect(connPodStatusObj.Status.Errors[0].Message).Should(gomega.Equal(mockErrStr), "Status should have an error with expected message after creation") + g.Expect(connPodStatusObj.Status.Errors[0].Type).Should(gomega.Equal(statusv1alpha1.UpsertConnectionError), "Status should have an error with expected type after creation") + g.Expect(connPodStatusObj.Status.ObservedGeneration).Should(gomega.Equal(connObj.GetGeneration()), "Observed generation should match the connection object generation") + g.Expect(connPodStatusObj.Status.ID).Should(gomega.Equal(pod.Name), "ID should match the pod name") + g.Expect(connPodStatusObj.Status.ConnectionUID).Should(gomega.Equal(connObj.GetUID()), "ConnectionPodStatus UID should match the connection object UID") + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeFalse(), "No publish operations has been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + g.Expect(fakeExportSystem.UpsertConnectionCalledCount).Should(gomega.Equal(1), "UpsertConnection count") + g.Expect(fakeExportSystem.CloseConnectionCalledCount).Should(gomega.Equal(0), "CloseConnection count") + g.Expect(fakeExportSystem.PublishCalledCount).Should(gomega.Equal(0), "Publish count") + + // Delete which should trigger CloseConnection and assert the behavior even on closed connection failures + g.Expect(directK8sClient.Delete(ctx, &connObj)).Should(gomega.Succeed()) + mockErrStr = "mock error for close connection" + mockErr = fmt.Errorf("%s", mockErrStr) + fakeExportSystem = &FakeExportSystem{ + CloseConnectionError: mockErr, + } + reconciler.system = fakeExportSystem + result, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeConnectionNamespacedName}) + // The system connection error causes a requeue but the error doesn't get returned only the status update errors do + g.Expect(result.Requeue).Should(gomega.Equal(true), "Reconcile should requeue after an error") // nolint:staticcheck + g.Expect(err).Should(gomega.BeNil(), "Reconcile should not return an error on initial creation") + g.Expect(fakeExportSystem.UpsertConnectionCalledCount).Should(gomega.Equal(0), "UpsertConnection count") + g.Expect(fakeExportSystem.CloseConnectionCalledCount).Should(gomega.Equal(1), "CloseConnection count") + g.Expect(fakeExportSystem.PublishCalledCount).Should(gomega.Equal(0), "Publish count") + + // Assert the Connection object + g.Eventually(func() bool { + err := directK8sClient.Get(ctx, typeConnectionNamespacedName, &connObj) + if err != nil && apierrors.IsNotFound(err) { + return true + } + return false + }).WithTimeout(timeout).Should(gomega.Equal(true), "Resource should not exist after deletion") + + // Assert the ConnectionPodStatus object + g.Eventually(func() bool { + err := directK8sClient.Get(ctx, typeConnectionNamespacedName, &connPodStatusObj) + if err != nil && apierrors.IsNotFound(err) { + return true + } + return false + }).WithTimeout(timeout).Should(gomega.Equal(true), "Resource should not exist after deletion") + + // Cleanup the Connection object if it exists at the end + defer func() { + directK8sClient.Delete(ctx, &connObj) // nolint:errcheck + directK8sClient.Delete(ctx, &connPodStatusObj) // nolint:errcheck + }() + }) +} + +// Mock K8s client to simulate the client failures and impact on the controller +func TestReconcile_Client_Failures(t *testing.T) { + // Setup + auditConnectionName := "audit-connection-3" + auditConnectionNameFlag := fmt.Sprintf("--audit-connection=%s", auditConnectionName) + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export=true", auditConnectionNameFlag}), "parsing flags") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + + t.Run("Test GET returns error causes requeue", func(t *testing.T) { + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + } + + mockErrStr := "mock error for upsert connection" + mockErr := fmt.Errorf("%s", mockErrStr) + fakeExportSystem := &FakeExportSystem{ + UpsertConnectionError: mockErr, + } + + directK8sClient, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme()}) + require.NoError(t, err, "Failed to create direct k8s client") + mockErr = fmt.Errorf("mock get error") + fakeClient := &FakeClient{ + Client: directK8sClient, + getErr: mockErr, + } + reconciler := Reconciler{ + reader: fakeClient, + writer: fakeClient, + scheme: mgr.GetScheme(), + system: fakeExportSystem, + auditConnectionName: auditConnectionName, + getPod: getPod, + } + + // Test setup Create the Connection object + g.Expect(directK8sClient.Create(ctx, &connObj)).Should(gomega.Succeed()) + + // Call Reconcile directly to assert the behavior on failures without having controller go through requeues + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeConnectionNamespacedName}) + g.Expect(result.Requeue).Should(gomega.Equal(false), "Reconcile should not requeue after the GET error") // nolint:staticcheck + g.Expect(err).Should(gomega.Equal(mockErr), "Reconcile should return an error") + + // Cleanup the Connection object if it exists at the end + defer func() { + directK8sClient.Delete(ctx, &connObj) // nolint:errcheck + }() + }) +} + +func TestReconcile_ConnectionPodStatus(t *testing.T) { + // Setup + auditConnectionName := "audit-connection-4" + auditConnectionNameFlag := fmt.Sprintf("--audit-connection=%s", auditConnectionName) + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export=true", auditConnectionNameFlag}), "parsing flags") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + k8sClient := testclient.NewRetryClient(mgr.GetClient()) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + // Required for the test PodToConnectionMapper to pickup the test pod name + os.Setenv("POD_NAME", "no-pod") + + // Wrap the controller Reconciler so it writes each request to a map when it is finished reconciling + originalReconciler := newReconciler(mgr, export.NewSystem(), auditConnectionName, getPod) + wrappedReconciler, requests := testutils.SetupTestReconcile(originalReconciler) + // Register the controller with the manager + require.NoError(t, add(mgr, wrappedReconciler)) + // Start the manager and let it run in the background + testutils.StartManager(ctx, t, mgr) + + t.Run("Reconcile called when ConnectionPodStatus updated on the side and reconciled back to expected state", func(_ *testing.T) { + connObj := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: auditConnectionName, + Namespace: util.GetNamespace(), + } + + // Connection object should not exist at the beginning of the test + g.Expect(k8sClient.Get(ctx, typeConnectionNamespacedName, &connObj)).ShouldNot(gomega.Succeed(), "Resource should not exist before creation") + + // Test setup create the Connection object + g.Expect(k8sClient.Create(ctx, &connObj)).Should(gomega.Succeed()) + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert the ConnectionPodStatus + connPodStatusObj := statusv1alpha1.ConnectionPodStatus{} + pod, _ := getPod(ctx) + connPodStatusName, _ := statusv1alpha1.KeyForConnection(pod.Name, connObj.Namespace, connObj.Name) + typeStatusNamespacedName := types.NamespacedName{ + Name: connPodStatusName, + Namespace: util.GetNamespace(), + } + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Update on the side to force the reconcile to be called + connPodStatusObj.Status.Errors = []*statusv1alpha1.ConnectionError{ + { + Type: statusv1alpha1.UpsertConnectionError, + Message: "Mock error for testing", + }, + } + connPodStatusObj.Status.Active = true + + // Clear the previous request with the same name to avoid false positives now only load the latest + requests.Clear() + + g.Expect(k8sClient.Update(ctx, &connPodStatusObj)).Should(gomega.Succeed(), "Updating the connection pod status should succeed") + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue(), "Reconcile request should finish after updating the connection pod status") + + // Assert the ConnectionPodStatus + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObj) + g.Expect(err).Should(gomega.Succeed(), "Status should still exist after updating the connection pod status") + // active will stay at the updated state of true + // since it's publishing status can only be reliably set during Publishing or when an Upsert fails on successful Upsert we trust the existing state + g.Expect(connPodStatusObj.Status.Active).Should(gomega.BeTrue(), "Active status was true after updating the connection pod status and should stay true") + // Errors should get reset since we will have performed a successful Upsert and for that generation of the Connection object the Errors should get Reconcile back to empty + g.Expect(connPodStatusObj.Status.Errors).Should(gomega.BeEmpty(), "Status should have an error after updating the connection pod status") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Cleanup the Connection and ConnectionPodStatus object if it exists at the end + defer func() { + k8sClient.Delete(ctx, &connPodStatusObj) // nolint:errcheck + k8sClient.Delete(ctx, &connObj) // nolint:errcheck + }() + }) +} + +func TestReconcile_UnsupportedConnectionName(t *testing.T) { + // Setup + auditConnectionNameGood := "audit-connection-good" + auditConnectionNameFlag := fmt.Sprintf("--audit-connection=%s", auditConnectionNameGood) + require.NoError(t, flag.CommandLine.Parse([]string{"--enable-violation-export=true", auditConnectionNameFlag}), "parsing flags") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + g := gomega.NewGomegaWithT(t) + mgr, _ := testutils.SetupManager(t, cfg) + k8sClient := testclient.NewRetryClient(mgr.GetClient()) + getPod := func(_ context.Context) (*corev1.Pod, error) { + pod := fakes.Pod(fakes.WithNamespace("gatekeeper-system"), fakes.WithName("no-pod")) + return pod, nil + } + // Wrap the controller Reconciler so it writes each request to a map when it is finished reconciling + originalReconciler := newReconciler(mgr, export.NewSystem(), auditConnectionNameGood, getPod) + wrappedReconciler, requests := testutils.SetupTestReconcile(originalReconciler) + // Register the controller with the manager + require.NoError(t, add(mgr, wrappedReconciler)) + // Start the manager and let it run in the background + testutils.StartManager(ctx, t, mgr) + + t.Run("Reconcile called for new Connection create for an unsupported connection name and the ConnectionPodStatus has an UpsertError and doesn't impact Create for a valid Connection object", func(_ *testing.T) { + auditConnectionNameBad := "audit-connection-bad" + + connObjBad := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionNameBad, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + typeConnectionNamespacedName := types.NamespacedName{ + Name: auditConnectionNameBad, + Namespace: util.GetNamespace(), + } + + // Connection object should not exist at the beginning of the test + g.Expect(k8sClient.Get(ctx, typeConnectionNamespacedName, &connObjBad)).ShouldNot(gomega.Succeed(), "Resource should not exist before creation") + + // Test setup create the Connection object + g.Expect(k8sClient.Create(ctx, &connObjBad)).Should(gomega.Succeed()) + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert ConnectionPodStatus + connPodStatusObjBad := statusv1alpha1.ConnectionPodStatus{} + pod, _ := getPod(ctx) + connPodStatusNameBad, _ := statusv1alpha1.KeyForConnection(pod.Name, connObjBad.Namespace, connObjBad.Name) + typeStatusNamespacedNameBad := types.NamespacedName{ + Name: connPodStatusNameBad, + Namespace: util.GetNamespace(), + } + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedNameBad, &connPodStatusObjBad) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObjBad.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObjBad.Name), "Status should have the correct connection name label") + g.Expect(connPodStatusObjBad.Status.Errors).ShouldNot(gomega.BeEmpty(), "Status should have an error after creation for unsupported connection name") + g.Expect(connPodStatusObjBad.Status.Errors[0].Message).Should(gomega.ContainSubstring("unsupported"), "Status should have an error with expected message for unsupported connection name") + g.Expect(connPodStatusObjBad.Status.Errors[0].Type).Should(gomega.Equal(statusv1alpha1.UpsertConnectionError), "Status should have an error with expected type for unsupported connection name") + g.Expect(connPodStatusObjBad.Status.ObservedGeneration).Should(gomega.Equal(connObjBad.GetGeneration()), "Observed generation should match the connection object generation") + g.Expect(connPodStatusObjBad.Status.ID).Should(gomega.Equal(pod.Name), "ID should match the pod name") + g.Expect(connPodStatusObjBad.Status.ConnectionUID).Should(gomega.Equal(connObjBad.GetUID()), "ConnectionPodStatus UID should match the connection object UID") + g.Expect(connPodStatusObjBad.Status.Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Test Delete of the connection object + g.Expect(k8sClient.Delete(ctx, &connObjBad)).Should(gomega.Succeed(), "Deleting the connection object should succeed") + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue(), "Reconcile request should finish after deleting the connection object") + + // Assert the Connection and ConnectionPodStatus object after deleting the Connection object + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedNameBad, &connObjBad) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection obj cleaned up after deleting the connection object") + err = k8sClient.Get(ctx, typeStatusNamespacedNameBad, &connPodStatusObjBad) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection pod status should get cleaned up after deleting the connection object") + }).WithTimeout(timeout) + + // Clear the previous request to avoid false positives now only load the latest + requests.Clear() + + // Create a valid Connection object to ensure the controller can handle both valid and invalid connection names + connObjGood := connectionv1alpha1.Connection{ + ObjectMeta: metav1.ObjectMeta{ + Name: auditConnectionNameGood, + Namespace: util.GetNamespace(), + }, + Spec: connectionv1alpha1.ConnectionSpec{ + Driver: disk.Name, + Config: &anythingtypes.Anything{Value: map[string]interface{}{ + "path": "value", + "maxAuditResults": float64(3), + }}, + }, + } + + typeConnectionNamespacedNameGood := types.NamespacedName{ + Name: auditConnectionNameGood, + Namespace: util.GetNamespace(), + } + + // Connection object should not exist at the beginning of the test + g.Expect(k8sClient.Get(ctx, typeConnectionNamespacedNameGood, &connObjGood)).ShouldNot(gomega.Succeed(), "Resource should not exist before creation") + + // Test setup create the Connection object + g.Expect(k8sClient.Create(ctx, &connObjGood)).Should(gomega.Succeed()) + + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue()) + + // Assert ConnectionPodStatus + connPodStatusObjGood := statusv1alpha1.ConnectionPodStatus{} + connPodStatusNameGood, _ := statusv1alpha1.KeyForConnection(pod.Name, connObjGood.Namespace, connObjGood.Name) + typeStatusNamespacedName := types.NamespacedName{ + Name: connPodStatusNameGood, + Namespace: util.GetNamespace(), + } + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObjGood) + g.Expect(err).Should(gomega.Succeed(), "Status should exist after creation") + g.Expect(connPodStatusObjGood.GetLabels()).Should(gomega.HaveKeyWithValue(statusv1beta1.ConnectionNameLabel, connObjGood.Name), "Status should have the correct connection name label") + g.Expect(connPodStatusObjGood.Status.Errors).Should(gomega.BeEmpty(), "Status should not have an error after creation for supported connection name") + g.Expect(connPodStatusObjGood.Status.ObservedGeneration).Should(gomega.Equal(connObjGood.GetGeneration()), "Observed generation should match the connection object generation") + g.Expect(connPodStatusObjGood.Status.ID).Should(gomega.Equal(pod.Name), "ID should match the pod name") + g.Expect(connPodStatusObjGood.Status.ConnectionUID).Should(gomega.Equal(connObjGood.GetUID()), "ConnectionPodStatus UID should match the connection object UID") + g.Expect(connPodStatusObjGood.Status.Active).Should(gomega.BeFalse(), "No publish operations have been performed yet, so active status should be false") + }).WithTimeout(timeout).Should(gomega.Succeed()) + + // Test Delete of the connection object + g.Expect(k8sClient.Delete(ctx, &connObjGood)).Should(gomega.Succeed(), "Deleting the connection object should succeed") + // Await for the reconcile request to finish + g.Eventually(func() bool { + expectedReq := reconcile.Request{NamespacedName: typeConnectionNamespacedName} + _, finished := requests.Load(expectedReq) + return finished + }).WithTimeout(timeout).Should(gomega.BeTrue(), "Reconcile request should finish after deleting the connection object") + + // Assert the Connection and ConnectionPodStatus object after deleting the Connection object + g.Eventually(func(g gomega.Gomega) { + err := k8sClient.Get(ctx, typeStatusNamespacedName, &connObjGood) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection obj cleaned up after deleting the connection object") + err = k8sClient.Get(ctx, typeStatusNamespacedName, &connPodStatusObjGood) + g.Expect(err).ShouldNot(gomega.Succeed(), "Connection pod status should get cleaned up after deleting the connection object") + }).WithTimeout(timeout) + + // Cleanup the Connection related objects if they exists at the end + defer func() { + k8sClient.Delete(ctx, &connObjBad) // nolint:errcheck + k8sClient.Delete(ctx, &connObjGood) // nolint:errcheck + k8sClient.Delete(ctx, &connPodStatusObjBad) // nolint:errcheck + k8sClient.Delete(ctx, &connPodStatusObjGood) // nolint:errcheck + }() + }) +} + +type FakeClient struct { + client.Client + + getErr error + updateErr error + deleteErr error + createErr error +} + +func (f *FakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if f.getErr != nil { + return f.getErr + } + return f.Client.Get(ctx, key, obj, opts...) +} + +func (f *FakeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if f.updateErr != nil { + return f.updateErr + } + return f.Client.Update(ctx, obj, opts...) +} + +func (f *FakeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + if f.deleteErr != nil { + return f.deleteErr + } + return f.Client.Delete(ctx, obj, opts...) +} + +func (f *FakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + if f.createErr != nil { + return f.createErr + } + return f.Client.Create(ctx, obj, opts...) +} + +type FakeExportSystem struct { + PublishCalledCount int + PublishError error + UpsertConnectionCalledCount int + UpsertConnectionError error + CloseConnectionCalledCount int + CloseConnectionError error +} + +func (f *FakeExportSystem) Publish(_ context.Context, _ string, _ string, _ interface{}) error { + f.PublishCalledCount++ + if f.PublishError != nil { + return f.PublishError + } + return nil +} + +func (f *FakeExportSystem) UpsertConnection(_ context.Context, _ interface{}, _ string, _ string) error { + f.UpsertConnectionCalledCount++ + if f.UpsertConnectionError != nil { + return f.UpsertConnectionError + } + return nil +} + +func (f *FakeExportSystem) CloseConnection(_ string) error { + f.CloseConnectionCalledCount++ + if f.CloseConnectionError != nil { + return f.CloseConnectionError + } + return nil +} diff --git a/pkg/export/system.go b/pkg/export/system.go index 74130e19961..b7a69d5d486 100644 --- a/pkg/export/system.go +++ b/pkg/export/system.go @@ -15,6 +15,12 @@ var SupportedDrivers = map[string]driver.Driver{ disk.Name: disk.Connections, } +type Exporter interface { + Publish(ctx context.Context, connectionName string, subject string, msg interface{}) error + UpsertConnection(ctx context.Context, config interface{}, connectionName string, newDriver string) error + CloseConnection(connectionName string) error +} + type System struct { mux sync.RWMutex connectionToDriver map[string]string diff --git a/pkg/export/util/util.go b/pkg/export/util/util.go index c78488e5e68..d04226cc803 100644 --- a/pkg/export/util/util.go +++ b/pkg/export/util/util.go @@ -1,5 +1,18 @@ package util +import "flag" + +const ( + defaultConnection = "audit-connection" + defaultChannel = "audit-channel" +) + +var ( + ExportEnabled = flag.Bool("enable-violation-export", false, "(alpha) Enable exporting violations to external systems") + AuditConnection = flag.String("audit-connection", defaultConnection, "(alpha) Connection name for exporting audit violation messages. Defaults to audit-connection") + AuditChannel = flag.String("audit-channel", defaultChannel, "(alpha) Channel name for exporting audit violation messages. Defaults to audit-channel") +) + // ExportMsg represents export message for each violation. type ExportMsg struct { ID string `json:"id,omitempty"` diff --git a/test/export/fake-reader/export_config.yaml b/test/export/fake-reader/export_config.yaml deleted file mode 100644 index a56339c805f..00000000000 --- a/test/export/fake-reader/export_config.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: audit - namespace: gatekeeper-system -data: - driver: "disk" - config: | - { - "path": "/tmp/violations", - "maxAuditResults": 3 - } diff --git a/test/export/fake-reader/export_connection.yaml b/test/export/fake-reader/export_connection.yaml new file mode 100644 index 00000000000..18a7656cce5 --- /dev/null +++ b/test/export/fake-reader/export_connection.yaml @@ -0,0 +1,10 @@ +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection +metadata: + name: audit-connection + namespace: gatekeeper-system +spec: + driver: "disk" + config: + path: "/tmp/violations" + maxAuditResults: 3 diff --git a/website/docs/export-driver-walkthrough.md b/website/docs/export-driver-walkthrough.md index 4a79c1bbe58..5792546e59e 100644 --- a/website/docs/export-driver-walkthrough.md +++ b/website/docs/export-driver-walkthrough.md @@ -55,7 +55,7 @@ func (r *Foo) Publish(ctx context.Context, connectionName string, data interface ... } -func (r *Foo) loseConnection(connectionName string) error { +func (r *Foo) CloseConnection(connectionName string) error { ... } @@ -77,24 +77,22 @@ var SupportedDrivers = map[string]driver.Driver{ } ``` -And thats it! Exporter system will take the newly added driver into account and whenever a configMap to establish connection to export message is created. +And thats it! Exporter system will take the newly added driver into account and a `Connection` custom resource to establish connection to export message is created. ### How to establish connections to different backend -To enable audit to use this driver to publish messages, a connection configMap with appropriate `config` and `driver` is needed. For example, +To enable audit to use this driver to publish messages, a `Connection` custom resource with appropriate `config` and `driver` is needed. For example, ```yaml -apiVersion: v1 -kind: ConfigMap +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection metadata: - name: audit + name: audit-connection namespace: gatekeeper-system -data: +spec: driver: "foo" - config: | - { - - } + config: + key: value ``` -> The `data.driver` field must exist and must match one of the keys of the `SupportedDrivers` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `{"component": "pubsub"}`. +> The `data.driver` field must exist and must match one of the keys of the `SupportedDrivers` map that was defined earlier to use the corresponding driver. The `data.config` field in the configuration can vary depending on the driver being used. For dapr driver, `data.config` must be `component: "pubsub"`. diff --git a/website/docs/export.md b/website/docs/export.md index f4b93106aae..f7d86323363 100644 --- a/website/docs/export.md +++ b/website/docs/export.md @@ -21,30 +21,63 @@ Install prerequisites such as a pubsub tool, a message broker etc. In the audit deployment, set the `--enable-violation-export` flag to `true` to export audit violations. Additionally, use `--audit-connection` (defaults to `audit-connection`) and `--audit-channel`(defaults to `audit-channel`) flags to allow audit to export violations using desired connection onto desired channel. `--audit-connection` must be set to the name of the connection config, and `--audit-channel` must be set to name of the channel where violations should get published. -A ConfigMap that contains `driver` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to export messages: +A `Connection` custom resource with `spec` that contains `driver` and `config` fields are required to establish connection for sending violations over the channel. Following is an example to establish a connection that uses Dapr to export messages: ```yaml -apiVersion: v1 -kind: ConfigMap +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection metadata: name: audit-connection namespace: gatekeeper-system -data: +spec: driver: "dapr" - config: | - { - "component": "pubsub" - } + config: + component: "pubsub" ``` - -- `driver` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr` -- `config` field is a json object that configures how the connection is made. E.g. which queue messages should be sent to. +- `driver` field determines which tool/driver should be used to establish a connection. Valid values are: `dapr`, `disk` +- `config` field is an object that configures how the connection is made. E.g. which queue messages should be sent to. #### Available drivers - Dapr: Export violations using pubsub model provided with [Dapr](https://dapr.io/) - Disk: Export violations to file system. +#### Status +Upon controller ingestion, the `Connection` will reflect the state of the export connection on its `status` sub resource. + +```yaml +apiVersion: connection.gatekeeper.sh/v1alpha1 +kind: Connection +metadata: + name: audit-connection + namespace: gatekeeper-system +spec: + driver: "dapr" + config: + component: "pubsub" +status: + byPod: + ID: "pod-id" + ConnectionUID: "connection-id" + Active: {true | false} + Errors: + - Type: UpsertConnection + Message: "Error message" + - Type: Publish + Message: "Error message" +``` + +The following table describes each property in the `status.byPod` section: + +| Property | Type | Description | +|----------|------|-------------| +| `ID` | string | Unique identifier for the pod handling the connection | +| `ConnectionUID` | string | Unique identifier for the specific connection instance | +| `Active` | boolean | Indicates whether the connection had at least one successful publishing and is currently active and operational (`true`) or inactive (`false`) | +| `Errors` | array | List of error objects containing information about any issues with the connection | +| `Errors[].Type` | string | Type of error encountered (e.g., `UpsertConnection`, `PublishingError`) | +| `Errors[].Message` | string | Human-readable description of the error | + ### Quick start with exporting violations using Dapr and Redis #### Prerequisites for Dapr @@ -179,21 +212,19 @@ data: ```shell kubectl apply -f - <