diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 1273054e84d..46680cb3ec6 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -876,6 +876,7 @@ func registerControllers(mgr manager.Manager, params operator.Parameters, access {name: "AGENT-KB", registerFunc: associationctl.AddAgentKibana}, {name: "AGENT-FS", registerFunc: associationctl.AddAgentFleetServer}, {name: "EMS-ES", registerFunc: associationctl.AddMapsES}, + {name: "LOGSTASH-ES", registerFunc: associationctl.AddLogstashES}, {name: "ES-MONITORING", registerFunc: associationctl.AddEsMonitoring}, {name: "KB-MONITORING", registerFunc: associationctl.AddKbMonitoring}, {name: "BEAT-MONITORING", registerFunc: associationctl.AddBeatMonitoring}, diff --git a/config/crds/v1/all-crds.yaml b/config/crds/v1/all-crds.yaml index b947c97e262..634650e77e0 100644 --- a/config/crds/v1/all-crds.yaml +++ b/config/crds/v1/all-crds.yaml @@ -9119,6 +9119,45 @@ spec: count: format: int32 type: integer + elasticsearchRefs: + description: ElasticsearchRefs are references to Elasticsearch clusters + running in the same Kubernetes cluster. + items: + description: ElasticsearchCluster is a named reference to an Elasticsearch + cluster which can be used in a Logstash pipeline. + properties: + clusterName: + minLength: 1 + type: string + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If empty, defaults + to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing Kubernetes + secret that contains connection information for associating + an Elastic resource not managed by the operator. The referenced + secret must contain the following: - `url`: the URL to reach + the Elastic resource - `username`: the username of the user + to be authenticated to the Elastic resource - `password`: + the password of the user to be authenticated to the Elastic + resource - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the other fields + name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing Kubernetes + service which is used to make requests to the referenced object. + It has to be in the same namespace as the referenced resource. + If left empty, the default HTTP service of the referenced + resource is used. + type: string + type: object + type: array image: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. @@ -9747,6 +9786,13 @@ spec: availableNodes: format: int32 type: integer + elasticsearchAssociationsStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: ElasticsearchAssociationStatus is the status of any auto-linking + to Elasticsearch clusters. + type: object expectedNodes: format: int32 type: integer diff --git a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml index c693c339a01..3161e4b0fb6 100644 --- a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml +++ b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml @@ -73,6 +73,45 @@ spec: count: format: int32 type: integer + elasticsearchRefs: + description: ElasticsearchRefs are references to Elasticsearch clusters + running in the same Kubernetes cluster. + items: + description: ElasticsearchCluster is a named reference to an Elasticsearch + cluster which can be used in a Logstash pipeline. + properties: + clusterName: + minLength: 1 + type: string + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If empty, defaults + to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing Kubernetes + secret that contains connection information for associating + an Elastic resource not managed by the operator. The referenced + secret must contain the following: - `url`: the URL to reach + the Elastic resource - `username`: the username of the user + to be authenticated to the Elastic resource - `password`: + the password of the user to be authenticated to the Elastic + resource - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the other fields + name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing Kubernetes + service which is used to make requests to the referenced object. + It has to be in the same namespace as the referenced resource. + If left empty, the default HTTP service of the referenced + resource is used. + type: string + type: object + type: array image: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. @@ -8112,6 +8151,13 @@ spec: availableNodes: format: int32 type: integer + elasticsearchAssociationsStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: ElasticsearchAssociationStatus is the status of any auto-linking + to Elasticsearch clusters. + type: object expectedNodes: format: int32 type: integer diff --git a/config/samples/logstash/logstash_es.yaml b/config/samples/logstash/logstash_es.yaml new file mode 100644 index 00000000000..96e5ba42756 --- /dev/null +++ b/config/samples/logstash/logstash_es.yaml @@ -0,0 +1,37 @@ +apiVersion: elasticsearch.k8s.elastic.co/v1 +kind: Elasticsearch +metadata: + name: elasticsearch-sample +spec: + version: 8.7.0 + nodeSets: + - name: default + count: 2 + config: + node.store.allow_mmap: false +--- +apiVersion: logstash.k8s.elastic.co/v1alpha1 +kind: Logstash +metadata: + name: logstash-sample +spec: + count: 1 + version: 8.7.0 + elasticsearchRefs: + - clusterName: production + name: elasticsearch-sample +# secretName: external-cloud-es-ref + pipelines: + - pipeline.id: main + config.string: | + input { exec { command => 'uptime' interval => 10 } } + output { + elasticsearch { + hosts => [ "${PRODUCTION_ES_HOSTS}" ] + ssl => true + cacert => "${PRODUCTION_ES_SSL_CERTIFICATE_AUTHORITY}" + user => "${PRODUCTION_ES_USER}" + password => "${PRODUCTION_ES_PASSWORD}" + } + } +--- \ No newline at end of file diff --git a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml index ef2b2fb2943..296d6553f3e 100644 --- a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml +++ b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml @@ -9173,6 +9173,45 @@ spec: count: format: int32 type: integer + elasticsearchRefs: + description: ElasticsearchRefs are references to Elasticsearch clusters + running in the same Kubernetes cluster. + items: + description: ElasticsearchCluster is a named reference to an Elasticsearch + cluster which can be used in a Logstash pipeline. + properties: + clusterName: + minLength: 1 + type: string + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If empty, defaults + to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing Kubernetes + secret that contains connection information for associating + an Elastic resource not managed by the operator. The referenced + secret must contain the following: - `url`: the URL to reach + the Elastic resource - `username`: the username of the user + to be authenticated to the Elastic resource - `password`: + the password of the user to be authenticated to the Elastic + resource - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the other fields + name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing Kubernetes + service which is used to make requests to the referenced object. + It has to be in the same namespace as the referenced resource. + If left empty, the default HTTP service of the referenced + resource is used. + type: string + type: object + type: array image: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. @@ -9801,6 +9840,13 @@ spec: availableNodes: format: int32 type: integer + elasticsearchAssociationsStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: ElasticsearchAssociationStatus is the status of any auto-linking + to Elasticsearch clusters. + type: object expectedNodes: format: int32 type: integer diff --git a/docs/reference/api-docs.asciidoc b/docs/reference/api-docs.asciidoc index 040a1e30582..70d10ee67f4 100644 --- a/docs/reference/api-docs.asciidoc +++ b/docs/reference/api-docs.asciidoc @@ -611,6 +611,7 @@ ObjectSelector defines a reference to a Kubernetes object which can be an Elasti - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-agent-v1alpha1-agentspec[$$AgentSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-apm-v1-apmserverspec[$$ApmServerSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-beat-v1beta1-beatspec[$$BeatSpec$$] +- xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-elasticsearchcluster[$$ElasticsearchCluster$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-enterprisesearch-v1-enterprisesearchspec[$$EnterpriseSearchSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-enterprisesearch-v1beta1-enterprisesearchspec[$$EnterpriseSearchSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-kibana-v1-kibanaspec[$$KibanaSpec$$] @@ -1841,6 +1842,24 @@ Package v1alpha1 contains API Schema definitions for the logstash v1alpha1 API g +[id="{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-elasticsearchcluster"] +=== ElasticsearchCluster + +ElasticsearchCluster is a named reference to an Elasticsearch cluster which can be used in a Logstash pipeline. + +.Appears In: +**** +- xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstashspec[$$LogstashSpec$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`ObjectSelector`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-objectselector[$$ObjectSelector$$]__ | +| *`clusterName`* __string__ | +|=== + + [id="{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstash"] === Logstash @@ -1895,6 +1914,7 @@ LogstashSpec defines the desired state of Logstash | *`version`* __string__ | Version of the Logstash. | *`count`* __integer__ | | *`image`* __string__ | Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. +| *`elasticsearchRefs`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-elasticsearchcluster[$$ElasticsearchCluster$$] array__ | ElasticsearchRefs are references to Elasticsearch clusters running in the same Kubernetes cluster. | *`config`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$]__ | Config holds the Logstash configuration. At most one of [`Config`, `ConfigRef`] can be specified. | *`configRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | ConfigRef contains a reference to an existing Kubernetes Secret holding the Logstash configuration. Logstash settings must be specified as yaml, under a single "logstash.yml" entry. At most one of [`Config`, `ConfigRef`] can be specified. | *`pipelines`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$] array__ | Pipelines holds the Logstash Pipelines. At most one of [`Pipelines`, `PipelinesRef`] can be specified. diff --git a/pkg/apis/logstash/v1alpha1/logstash_types.go b/pkg/apis/logstash/v1alpha1/logstash_types.go index bf595c3c3a0..aefec713402 100644 --- a/pkg/apis/logstash/v1alpha1/logstash_types.go +++ b/pkg/apis/logstash/v1alpha1/logstash_types.go @@ -31,6 +31,10 @@ type LogstashSpec struct { // +kubebuilder:validation:Optional Image string `json:"image,omitempty"` + // ElasticsearchRefs are references to Elasticsearch clusters running in the same Kubernetes cluster. + // +kubebuilder:validation:Optional + ElasticsearchRefs []ElasticsearchCluster `json:"elasticsearchRefs,omitempty"` + // Config holds the Logstash configuration. At most one of [`Config`, `ConfigRef`] can be specified. // +kubebuilder:validation:Optional // +kubebuilder:pruning:PreserveUnknownFields @@ -92,6 +96,14 @@ type LogstashService struct { TLS commonv1.TLSOptions `json:"tls,omitempty"` } +// ElasticsearchCluster is a named reference to an Elasticsearch cluster which can be used in a Logstash pipeline. +type ElasticsearchCluster struct { + commonv1.ObjectSelector `json:",omitempty,inline"` + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + ClusterName string `json:"clusterName,omitempty"` +} + // LogstashStatus defines the observed state of Logstash type LogstashStatus struct { // Version of the stack resource currently running. During version upgrades, multiple versions may run @@ -109,6 +121,9 @@ type LogstashStatus struct { // controller has not yet processed the changes contained in the Logstash specification. ObservedGeneration int64 `json:"observedGeneration,omitempty"` + // ElasticsearchAssociationStatus is the status of any auto-linking to Elasticsearch clusters. + ElasticsearchAssociationsStatus commonv1.AssociationStatusMap `json:"elasticsearchAssociationsStatus,omitempty"` + // MonitoringAssociationStatus is the status of any auto-linking to monitoring Elasticsearch clusters. MonitoringAssociationStatus commonv1.AssociationStatusMap `json:"monitoringAssociationStatus,omitempty"` } @@ -131,6 +146,7 @@ type Logstash struct { Spec LogstashSpec `json:"spec,omitempty"` Status LogstashStatus `json:"status,omitempty"` + EsAssocConfs map[commonv1.ObjectSelector]commonv1.AssociationConf `json:"-"` MonitoringAssocConfs map[commonv1.ObjectSelector]commonv1.AssociationConf `json:"-"` } @@ -143,6 +159,13 @@ type LogstashList struct { Items []Logstash `json:"items"` } +func (l *Logstash) ElasticsearchRefs() []commonv1.ObjectSelector { + refs := make([]commonv1.ObjectSelector, len(l.Spec.ElasticsearchRefs)) + for i, r := range l.Spec.ElasticsearchRefs { + refs[i] = r.ObjectSelector + } + return refs +} func (l *Logstash) ServiceAccountName() string { return l.Spec.ServiceAccountName } @@ -162,7 +185,21 @@ func (l *Logstash) GetObservedGeneration() int64 { } func (l *Logstash) GetAssociations() []commonv1.Association { - var associations []commonv1.Association + associations := make( + []commonv1.Association, + 0, + len(l.Spec.ElasticsearchRefs)+len(l.Spec.Monitoring.Metrics.ElasticsearchRefs)+len(l.Spec.Monitoring.Logs.ElasticsearchRefs), + ) + + for _, ref := range l.Spec.ElasticsearchRefs { + associations = append(associations, &LogstashESAssociation{ + Logstash: l, + ElasticsearchCluster: ElasticsearchCluster{ + ObjectSelector: ref.WithDefaultNamespace(l.Namespace), + ClusterName: ref.ClusterName, + }, + }) + } for _, ref := range l.Spec.Monitoring.Metrics.ElasticsearchRefs { if ref.IsDefined() { @@ -185,7 +222,12 @@ func (l *Logstash) GetAssociations() []commonv1.Association { } func (l *Logstash) AssociationStatusMap(typ commonv1.AssociationType) commonv1.AssociationStatusMap { - if typ == commonv1.LogstashMonitoringAssociationType { + switch typ { + case commonv1.ElasticsearchAssociationType: + if len(l.Spec.ElasticsearchRefs) > 0 { + return l.Status.ElasticsearchAssociationsStatus + } + case commonv1.LogstashMonitoringAssociationType: for _, esRef := range l.Spec.Monitoring.Metrics.ElasticsearchRefs { if esRef.IsDefined() { return l.Status.MonitoringAssociationStatus @@ -203,6 +245,9 @@ func (l *Logstash) AssociationStatusMap(typ commonv1.AssociationType) commonv1.A func (l *Logstash) SetAssociationStatusMap(typ commonv1.AssociationType, status commonv1.AssociationStatusMap) error { switch typ { + case commonv1.ElasticsearchAssociationType: + l.Status.ElasticsearchAssociationsStatus = status + return nil case commonv1.LogstashMonitoringAssociationType: l.Status.MonitoringAssociationStatus = status return nil @@ -211,6 +256,57 @@ func (l *Logstash) SetAssociationStatusMap(typ commonv1.AssociationType, status } } +type LogstashESAssociation struct { + // The associated Logstash + *Logstash + ElasticsearchCluster +} + +var _ commonv1.Association = &LogstashESAssociation{} + +func (lses *LogstashESAssociation) ElasticServiceAccount() (commonv1.ServiceAccountName, error) { + return "", nil +} + +func (lses *LogstashESAssociation) Associated() commonv1.Associated { + if lses == nil { + return nil + } + if lses.Logstash == nil { + lses.Logstash = &Logstash{} + } + return lses.Logstash +} + +func (lses *LogstashESAssociation) AssociationType() commonv1.AssociationType { + return commonv1.ElasticsearchAssociationType +} + +func (lses *LogstashESAssociation) AssociationRef() commonv1.ObjectSelector { + return lses.ElasticsearchCluster.ObjectSelector +} + +func (lses *LogstashESAssociation) AssociationConfAnnotationName() string { + return commonv1.ElasticsearchConfigAnnotationName(lses.ElasticsearchCluster.ObjectSelector) +} + +func (lses *LogstashESAssociation) AssociationConf() (*commonv1.AssociationConf, error) { + return commonv1.GetAndSetAssociationConfByRef(lses, lses.ElasticsearchCluster.ObjectSelector, lses.EsAssocConfs) +} + +func (lses *LogstashESAssociation) SetAssociationConf(conf *commonv1.AssociationConf) { + if lses.EsAssocConfs == nil { + lses.EsAssocConfs = make(map[commonv1.ObjectSelector]commonv1.AssociationConf) + } + if conf != nil { + lses.EsAssocConfs[lses.ElasticsearchCluster.ObjectSelector] = *conf + } +} + +func (lses *LogstashESAssociation) AssociationID() string { + return fmt.Sprintf("%s-%s", lses.ElasticsearchCluster.ObjectSelector.Namespace, lses.ElasticsearchCluster.ObjectSelector.NameOrSecretName()) +} + type LogstashMonitoringAssociation struct { // The associated Logstash *Logstash diff --git a/pkg/apis/logstash/v1alpha1/validations.go b/pkg/apis/logstash/v1alpha1/validations.go index 19d96205008..da863cf19b5 100644 --- a/pkg/apis/logstash/v1alpha1/validations.go +++ b/pkg/apis/logstash/v1alpha1/validations.go @@ -5,6 +5,8 @@ package v1alpha1 import ( + "fmt" + "k8s.io/apimachinery/pkg/util/validation/field" commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" @@ -23,6 +25,7 @@ var ( checkNameLength, checkSupportedVersion, checkSingleConfigSource, + checkESRefsNamed, checkMonitoring, checkAssociations, checkSinglePipelineSource, @@ -72,7 +75,8 @@ func checkAssociations(l *Logstash) field.ErrorList { monitoringPath := field.NewPath("spec").Child("monitoring") err1 := commonv1.CheckAssociationRefs(monitoringPath.Child("metrics"), l.GetMonitoringMetricsRefs()...) err2 := commonv1.CheckAssociationRefs(monitoringPath.Child("logs"), l.GetMonitoringLogsRefs()...) - return append(err1, err2...) + err3 := commonv1.CheckAssociationRefs(field.NewPath("spec").Child("elasticsearchRefs"), l.ElasticsearchRefs()...) + return append(append(err1, err2...), err3...) } func checkSinglePipelineSource(a *Logstash) field.ErrorList { @@ -86,3 +90,18 @@ func checkSinglePipelineSource(a *Logstash) field.ErrorList { return nil } + +func checkESRefsNamed(l *Logstash) field.ErrorList { + var errorList field.ErrorList + for i, esRef := range l.Spec.ElasticsearchRefs { + if esRef.ClusterName == "" { + errorList = append( + errorList, + field.Required( + field.NewPath("spec").Child("elasticsearchRefs").Index(i).Child("clusterName"), + fmt.Sprintf("clusterName is a mandatory field - missing on %v", esRef.NamespacedName())), + ) + } + } + return errorList +} diff --git a/pkg/apis/logstash/v1alpha1/validations_test.go b/pkg/apis/logstash/v1alpha1/validations_test.go index d6ecd75d5a5..24851bfa753 100644 --- a/pkg/apis/logstash/v1alpha1/validations_test.go +++ b/pkg/apis/logstash/v1alpha1/validations_test.go @@ -229,3 +229,176 @@ func Test_checkSupportedVersion(t *testing.T) { }) } } + +func Test_checkEsRefsAssociations(t *testing.T) { + type args struct { + b *Logstash + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "no ref: OK", + args: args{ + b: &Logstash{}, + }, + wantErr: false, + }, + { + name: "mix secret named and named refs: OK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{SecretName: "bla"}, + ClusterName: "test", + }, + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + ClusterName: "test2", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "secret named ref with a name: NOK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{SecretName: "bla", Name: "bla"}, + ClusterName: "test", + }, + }, + }, + }, + }, + wantErr: true, + }, + { + name: "no name or secret name with namespace: NOK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Namespace: "blub"}, + ClusterName: "test", + }, + }, + }, + }, + }, + wantErr: true, + }, + { + name: "no name or secret name with serviceName: NOK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{ServiceName: "ble"}, + ClusterName: "test", + }, + }, + }, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := checkAssociations(tt.args.b) + assert.Equal(t, tt.wantErr, len(got) > 0) + }) + } +} + +func Test_checkESRefsNamed(t *testing.T) { + type args struct { + b *Logstash + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "no ref: OK", + args: args{ + b: &Logstash{}, + }, + wantErr: false, + }, + { + name: "one ref, missing clusterName: NOK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + }, + }, + }, + }, + }, + wantErr: true, + }, + { + name: "multiple refs, each with clusterName: OK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + ClusterName: "bla", + }, + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + ClusterName: "blub", + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "multiple refs, one missing clusterName: NOK", + args: args{ + b: &Logstash{ + Spec: LogstashSpec{ + ElasticsearchRefs: []ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + ClusterName: "", + }, + { + ObjectSelector: commonv1.ObjectSelector{Name: "bla", Namespace: "blub"}, + ClusterName: "default", + }, + }, + }, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := checkESRefsNamed(tt.args.b) + assert.Equal(t, tt.wantErr, len(got) > 0) + }) + } +} diff --git a/pkg/apis/logstash/v1alpha1/webhook_test.go b/pkg/apis/logstash/v1alpha1/webhook_test.go index 6e949c1c1ae..dc04599c9a8 100644 --- a/pkg/apis/logstash/v1alpha1/webhook_test.go +++ b/pkg/apis/logstash/v1alpha1/webhook_test.go @@ -25,10 +25,10 @@ func TestWebhook(t *testing.T) { Operation: admissionv1beta1.Create, Object: func(t *testing.T, uid string) []byte { t.Helper() - ent := mkLogstash(uid) - ent.Spec.Version = "8.7.0" - ent.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} - return serialize(t, ent) + ls := mkLogstash(uid) + ls.Spec.Version = "8.7.0" + ls.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} + return serialize(t, ls) }, Check: test.ValidationWebhookSucceeded, }, @@ -37,13 +37,13 @@ func TestWebhook(t *testing.T) { Operation: admissionv1beta1.Create, Object: func(t *testing.T, uid string) []byte { t.Helper() - ent := mkLogstash(uid) - ent.Spec.Version = "8.7.0" - ent.Spec.Monitoring = commonv1.Monitoring{ + ls := mkLogstash(uid) + ls.Spec.Version = "8.7.0" + ls.Spec.Monitoring = commonv1.Monitoring{ Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname"}}}, Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname"}}}, } - return serialize(t, ent) + return serialize(t, ls) }, Check: test.ValidationWebhookSucceeded, }, @@ -52,10 +52,10 @@ func TestWebhook(t *testing.T) { Operation: admissionv1beta1.Create, Object: func(t *testing.T, uid string) []byte { t.Helper() - ent := mkLogstash(uid) - ent.Spec.Version = "7.13.0" - ent.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} - return serialize(t, ent) + ls := mkLogstash(uid) + ls.Spec.Version = "7.13.0" + ls.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} + return serialize(t, ls) }, Check: test.ValidationWebhookFailed( `spec.version: Invalid value: "7.13.0": Unsupported version for Stack Monitoring. Required >= 8.7.0`, @@ -66,13 +66,13 @@ func TestWebhook(t *testing.T) { Operation: admissionv1beta1.Create, Object: func(t *testing.T, uid string) []byte { t.Helper() - ent := mkLogstash(uid) - ent.Spec.Version = "8.7.0" - ent.Spec.Monitoring = commonv1.Monitoring{ + ls := mkLogstash(uid) + ls.Spec.Version = "8.7.0" + ls.Spec.Monitoring = commonv1.Monitoring{ Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname", Name: "xx"}}}, Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname"}}}, } - return serialize(t, ent) + return serialize(t, ls) }, Check: test.ValidationWebhookFailed( `spec.monitoring.metrics: Forbidden: Invalid association reference: specify name or secretName, not both`, @@ -83,13 +83,13 @@ func TestWebhook(t *testing.T) { Operation: admissionv1beta1.Create, Object: func(t *testing.T, uid string) []byte { t.Helper() - ent := mkLogstash(uid) - ent.Spec.Version = "8.7.0" - ent.Spec.Monitoring = commonv1.Monitoring{ + ls := mkLogstash(uid) + ls.Spec.Version = "8.7.0" + ls.Spec.Monitoring = commonv1.Monitoring{ Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname"}}}, Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname", ServiceName: "xx"}}}, } - return serialize(t, ent) + return serialize(t, ls) }, Check: test.ValidationWebhookFailed( `spec.monitoring.logs: Forbidden: Invalid association reference: serviceName or namespace can only be used in combination with name, not with secretName`, diff --git a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go index 7cbc8dd427c..27ceaac3a2f 100644 --- a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go @@ -14,6 +14,22 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ElasticsearchCluster) DeepCopyInto(out *ElasticsearchCluster) { + *out = *in + out.ObjectSelector = in.ObjectSelector +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ElasticsearchCluster. +func (in *ElasticsearchCluster) DeepCopy() *ElasticsearchCluster { + if in == nil { + return nil + } + out := new(ElasticsearchCluster) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Logstash) DeepCopyInto(out *Logstash) { *out = *in @@ -21,6 +37,13 @@ func (in *Logstash) DeepCopyInto(out *Logstash) { in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) + if in.EsAssocConfs != nil { + in, out := &in.EsAssocConfs, &out.EsAssocConfs + *out = make(map[v1.ObjectSelector]v1.AssociationConf, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.MonitoringAssocConfs != nil { in, out := &in.MonitoringAssocConfs, &out.MonitoringAssocConfs *out = make(map[v1.ObjectSelector]v1.AssociationConf, len(*in)) @@ -48,6 +71,27 @@ func (in *Logstash) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LogstashESAssociation) DeepCopyInto(out *LogstashESAssociation) { + *out = *in + if in.Logstash != nil { + in, out := &in.Logstash, &out.Logstash + *out = new(Logstash) + (*in).DeepCopyInto(*out) + } + out.ElasticsearchCluster = in.ElasticsearchCluster +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LogstashESAssociation. +func (in *LogstashESAssociation) DeepCopy() *LogstashESAssociation { + if in == nil { + return nil + } + out := new(LogstashESAssociation) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogstashList) DeepCopyInto(out *LogstashList) { *out = *in @@ -121,6 +165,11 @@ func (in *LogstashService) DeepCopy() *LogstashService { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogstashSpec) DeepCopyInto(out *LogstashSpec) { *out = *in + if in.ElasticsearchRefs != nil { + in, out := &in.ElasticsearchRefs, &out.ElasticsearchRefs + *out = make([]ElasticsearchCluster, len(*in)) + copy(*out, *in) + } if in.Config != nil { in, out := &in.Config, &out.Config *out = (*in).DeepCopy() @@ -178,6 +227,13 @@ func (in *LogstashSpec) DeepCopy() *LogstashSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogstashStatus) DeepCopyInto(out *LogstashStatus) { *out = *in + if in.ElasticsearchAssociationsStatus != nil { + in, out := &in.ElasticsearchAssociationsStatus, &out.ElasticsearchAssociationsStatus + *out = make(v1.AssociationStatusMap, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.MonitoringAssociationStatus != nil { in, out := &in.MonitoringAssociationStatus, &out.MonitoringAssociationStatus *out = make(v1.AssociationStatusMap, len(*in)) diff --git a/pkg/controller/association/controller/logstash_es.go b/pkg/controller/association/controller/logstash_es.go index 6a79976b295..323f44d926e 100644 --- a/pkg/controller/association/controller/logstash_es.go +++ b/pkg/controller/association/controller/logstash_es.go @@ -4,6 +4,22 @@ package controller +import ( + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/association" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/operator" + eslabel "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/user" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/rbac" +) + const ( // LogstashAssociationLabelName marks resources created for an association originating from Logstash with the // Logstash name. @@ -15,3 +31,36 @@ const ( // with the target resource type (e.g. "elasticsearch"). LogstashAssociationLabelType = "logstashassociation.k8s.elastic.co/type" ) + +func AddLogstashES(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) error { + return association.AddAssociationController(mgr, accessReviewer, params, association.AssociationInfo{ + AssociationType: commonv1.ElasticsearchAssociationType, + AssociatedObjTemplate: func() commonv1.Associated { return &logstashv1alpha1.Logstash{} }, + ReferencedObjTemplate: func() client.Object { return &esv1.Elasticsearch{} }, + ReferencedResourceVersion: referencedElasticsearchStatusVersion, + ExternalServiceURL: getElasticsearchExternalURL, + ReferencedResourceNamer: esv1.ESNamer, + AssociationName: "logstash-es", + AssociatedShortName: "logstash", + Labels: func(associated types.NamespacedName) map[string]string { + return map[string]string{ + LogstashAssociationLabelName: associated.Name, + LogstashAssociationLabelNamespace: associated.Namespace, + LogstashAssociationLabelType: commonv1.ElasticsearchAssociationType, + } + }, + AssociationConfAnnotationNameBase: commonv1.ElasticsearchConfigAnnotationNameBase, + AssociationResourceNameLabelName: eslabel.ClusterNameLabelName, + AssociationResourceNamespaceLabelName: eslabel.ClusterNamespaceLabelName, + + ElasticsearchUserCreation: &association.ElasticsearchUserCreation{ + ElasticsearchRef: func(c k8s.Client, association commonv1.Association) (bool, commonv1.ObjectSelector, error) { + return true, association.AssociationRef(), nil + }, + UserSecretSuffix: "logstash-user", + ESUserRole: func(associated commonv1.Associated) (string, error) { + return user.LogstashUserRole, nil + }, + }, + }) +} diff --git a/pkg/controller/elasticsearch/user/reconcile_test.go b/pkg/controller/elasticsearch/user/reconcile_test.go index ec103218cd2..50183d4a679 100644 --- a/pkg/controller/elasticsearch/user/reconcile_test.go +++ b/pkg/controller/elasticsearch/user/reconcile_test.go @@ -96,6 +96,6 @@ func Test_aggregateRoles(t *testing.T) { c := k8s.NewFakeClient(sampleUserProvidedRolesSecret...) roles, err := aggregateRoles(context.Background(), c, sampleEsWithAuth, initDynamicWatches(), record.NewFakeRecorder(10)) require.NoError(t, err) - require.Len(t, roles, 54) + require.Len(t, roles, 55) require.Contains(t, roles, ProbeUserRole, ClusterManageRole, "role1", "role2") } diff --git a/pkg/controller/elasticsearch/user/roles.go b/pkg/controller/elasticsearch/user/roles.go index 4a26c9f26de..e84584d7b81 100644 --- a/pkg/controller/elasticsearch/user/roles.go +++ b/pkg/controller/elasticsearch/user/roles.go @@ -46,6 +46,8 @@ const ( FleetAdminUserRole = "eck_fleet_admin_user_role" + LogstashUserRole = "eck_logstash_user_role" + // V70 indicates version 7.0 V70 = "v70" @@ -173,6 +175,22 @@ var ( }, }, }, + LogstashUserRole: esclient.Role{ + Cluster: []string{ + "monitor", + "manage_ilm", + "read_ilm", + "manage_logstash_pipelines", + "manage_index_templates", + "cluster:admin/ingest/pipeline/get", + }, + Indices: []esclient.IndexRole{ + { + Names: []string{"logstash", "logstash-*", "ecs-logstash", "ecs-logstash-*", "logs-*", "metrics-*", "synthetics-*", "traces-*"}, + Privileges: []string{"manage", "write", "create_index", "read", "view_index_metadata"}, + }, + }, + }, } ) diff --git a/pkg/controller/logstash/config_test.go b/pkg/controller/logstash/config_test.go index 3183b9227fe..1fc33456f43 100644 --- a/pkg/controller/logstash/config_test.go +++ b/pkg/controller/logstash/config_test.go @@ -159,4 +159,4 @@ func logstashWithConfigRef(name string, cfg *commonv1.Config) v1alpha1.Logstash Config: cfg, ConfigRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: name}}}, } -} \ No newline at end of file +} diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index 79308f5f1f1..7face4fcdcb 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -95,6 +95,9 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log return results.WithError(err), params.Status } - podTemplate := buildPodTemplate(params, configHash) + podTemplate, err := buildPodTemplate(params, configHash) + if err != nil { + return results.WithError(err), params.Status + } return reconcileStatefulSet(params, podTemplate) } diff --git a/pkg/controller/logstash/env.go b/pkg/controller/logstash/env.go new file mode 100644 index 00000000000..bece7fa127a --- /dev/null +++ b/pkg/controller/logstash/env.go @@ -0,0 +1,80 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "errors" + "path/filepath" + "strings" + + corev1 "k8s.io/api/core/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/association" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/certificates" +) + +func buildEnv(params Params, esAssociations []commonv1.Association) ([]corev1.EnvVar, error) { + var envs []corev1.EnvVar //nolint:prealloc + for _, assoc := range esAssociations { + assocConf, err := assoc.AssociationConf() + if err != nil { + return nil, err + } + + credentials, err := association.ElasticsearchAuthSettings(params.Context, params.Client, assoc) + if err != nil { + return nil, err + } + + clusterName, err := getClusterName(assoc) + if err != nil { + return nil, err + } + + normalizedClusterName := normalize(clusterName) + + envs = append(envs, createEnvVar(normalizedClusterName+"_ES_HOSTS", assocConf.GetURL())) + envs = append(envs, createEnvVar(normalizedClusterName+"_ES_USER", credentials.Username)) + envs = append(envs, corev1.EnvVar{ + Name: normalizedClusterName + "_ES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: assocConf.AuthSecretName, + }, + Key: assocConf.AuthSecretKey, + }, + }, + }) + + if assocConf.GetCACertProvided() { + caPath := filepath.Join(certificatesDir(assoc), certificates.CAFileName) + envs = append(envs, createEnvVar(normalizedClusterName+"_ES_SSL_CERTIFICATE_AUTHORITY", caPath)) + } + } + + return envs, nil +} + +func getClusterName(assoc commonv1.Association) (string, error) { + lses, ok := assoc.(*v1alpha1.LogstashESAssociation) + if !ok { + return "", errors.New("cannot cast association to LogstashESAssociation") + } + return lses.ClusterName, nil +} + +func normalize(nn string) string { + return strings.ToUpper(strings.ReplaceAll(nn, "-", "_")) +} + +func createEnvVar(key string, value string) corev1.EnvVar { + return corev1.EnvVar{ + Name: key, + Value: value, + } +} diff --git a/pkg/controller/logstash/env_test.go b/pkg/controller/logstash/env_test.go new file mode 100644 index 00000000000..b58e8c62173 --- /dev/null +++ b/pkg/controller/logstash/env_test.go @@ -0,0 +1,164 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func Test_getEnvVars(t *testing.T) { + fakeLogstashUserSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "logstash-sample-default-elasticsearch-sample-logstash-user", Namespace: "default"}, + Data: map[string][]byte{"default-logstash-sample-default-elasticsearch-sample-logstash-user": []byte("1234567890")}, + } + + fakeExternalEsSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "external-cloud-es-ref", Namespace: "default"}, + Data: map[string][]byte{ + "url": []byte("https://some.gcp.cloud.es.io"), + "username": []byte("fake_user"), + "password": []byte("fake_password"), + }, + } + + params := Params{ + Logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + ElasticsearchRefs: []logstashv1alpha1.ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "elasticsearch-sample", Namespace: "default"}, + ClusterName: "production", + }, + }, + }, + }, + Client: k8s.NewFakeClient(&fakeLogstashUserSecret, &fakeExternalEsSecret), + Context: context.Background(), + } + + for _, tt := range []struct { + name string + params Params + setAssocConfs func(assocs []commonv1.Association) + wantEnvs []corev1.EnvVar + }{ + { + name: "no es ref", + params: Params{ + Logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{}, + }, + Client: k8s.NewFakeClient(), + Context: context.Background(), + }, + setAssocConfs: func(assocs []commonv1.Association) {}, + wantEnvs: []corev1.EnvVar(nil), + }, + { + name: "es ref", + params: params, + setAssocConfs: func(assocs []commonv1.Association) { + assocs[0].SetAssociationConf(&commonv1.AssociationConf{ + AuthSecretName: "logstash-sample-default-elasticsearch-sample-logstash-user", + AuthSecretKey: "default-logstash-sample-default-elasticsearch-sample-logstash-user", + CACertProvided: true, + CASecretName: "logstash-sample-logstash-es-default-elasticsearch-sample-ca", + URL: "https://elasticsearch-sample-es-http.default.svc:9200", + Version: "8.7.0", + }) + assocs[0].SetNamespace("default") + }, + wantEnvs: []corev1.EnvVar{ + {Name: "PRODUCTION_ES_HOSTS", Value: "https://elasticsearch-sample-es-http.default.svc:9200"}, + {Name: "PRODUCTION_ES_USER", Value: "default-logstash-sample-default-elasticsearch-sample-logstash-user"}, + {Name: "PRODUCTION_ES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "logstash-sample-default-elasticsearch-sample-logstash-user", + }, + Key: "default-logstash-sample-default-elasticsearch-sample-logstash-user", + }, + }, + }, + {Name: "PRODUCTION_ES_SSL_CERTIFICATE_AUTHORITY", Value: "/mnt/elastic-internal/elasticsearch-association/default/elasticsearch-sample/certs/ca.crt"}, + }, + }, + { + name: "es ref without tls", + params: params, + setAssocConfs: func(assocs []commonv1.Association) { + assocs[0].SetAssociationConf(&commonv1.AssociationConf{ + AuthSecretName: "logstash-sample-default-elasticsearch-sample-logstash-user", + AuthSecretKey: "default-logstash-sample-default-elasticsearch-sample-logstash-user", + CACertProvided: false, + URL: "http://elasticsearch-sample-es-http.default.svc:9200", + Version: "8.7.0", + }) + assocs[0].SetNamespace("default") + }, + wantEnvs: []corev1.EnvVar{ + {Name: "PRODUCTION_ES_HOSTS", Value: "http://elasticsearch-sample-es-http.default.svc:9200"}, + {Name: "PRODUCTION_ES_USER", Value: "default-logstash-sample-default-elasticsearch-sample-logstash-user"}, + {Name: "PRODUCTION_ES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "logstash-sample-default-elasticsearch-sample-logstash-user", + }, + Key: "default-logstash-sample-default-elasticsearch-sample-logstash-user", + }, + }, + }, + }, + }, + { + name: "es ref with secretName", + params: params, + setAssocConfs: func(assocs []commonv1.Association) { + assocs[0].SetAssociationConf(&commonv1.AssociationConf{ + AuthSecretName: "external-cloud-es-ref", + AuthSecretKey: "password", + CACertProvided: false, + CASecretName: "", + URL: "https://some.gcp.cloud.es.io", + Version: "8.7.0", + }) + assocs[0].SetNamespace("default") + }, + wantEnvs: []corev1.EnvVar{ + {Name: "PRODUCTION_ES_HOSTS", Value: "https://some.gcp.cloud.es.io"}, + {Name: "PRODUCTION_ES_USER", Value: "fake_user"}, + {Name: "PRODUCTION_ES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "external-cloud-es-ref", + }, + Key: "password", + }, + }, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + assocs := tt.params.Logstash.GetAssociations() + tt.setAssocConfs(assocs) + envs, err := buildEnv(params, assocs) + require.NoError(t, err) + require.Equal(t, tt.wantEnvs, envs) + }) + } +} diff --git a/pkg/controller/logstash/initcontainer.go b/pkg/controller/logstash/initcontainer.go index 8e5b6b09331..dd6051d9b11 100644 --- a/pkg/controller/logstash/initcontainer.go +++ b/pkg/controller/logstash/initcontainer.go @@ -72,4 +72,4 @@ func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { }, }, } -} \ No newline at end of file +} diff --git a/pkg/controller/logstash/logstash_controller.go b/pkg/controller/logstash/logstash_controller.go index 6c71683b513..0abf64419de 100644 --- a/pkg/controller/logstash/logstash_controller.go +++ b/pkg/controller/logstash/logstash_controller.go @@ -7,8 +7,6 @@ package logstash import ( "context" - logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -20,6 +18,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/association" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/events" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/keystore" @@ -165,6 +165,14 @@ func (r *ReconcileLogstash) doReconcile(ctx context.Context, logstash logstashv1 results := reconciler.NewResult(ctx) status := newStatus(logstash) + areAssocsConfigured, err := association.AreConfiguredIfSet(ctx, logstash.GetAssociations(), r.recorder) + if err != nil { + return results.WithError(err), status + } + if !areAssocsConfigured { + return results, status + } + // Run basic validations as a fallback in case webhook is disabled. if err := r.validate(ctx, logstash); err != nil { results = results.WithError(err) diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index e923cd10901..5fe4b148ecc 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -10,14 +10,14 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/intstr" + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + commonassociation "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/association" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/container" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/defaults" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" - "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/network" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/stackmon" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps" @@ -38,14 +38,6 @@ const ( // VersionLabelName is a label used to track the version of a Logstash Pod. VersionLabelName = "logstash.k8s.elastic.co/version" - - InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" - - // InternalConfigVolumeName is a volume which contains the generated configuration. - InternalConfigVolumeName = "elastic-internal-logstash-config" - InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" - InternalPipelineVolumeName = "elastic-internal-logstash-pipeline" - InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline" ) var ( @@ -61,38 +53,25 @@ var ( } ) -var ( - // ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container - ConfigSharedVolume = volume.SharedVolume{ - VolumeName: ConfigVolumeName, - InitContainerMountPath: InitContainerConfigVolumeMountPath, - ContainerMountPath: ConfigMountPath, - } -) - -// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. -func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { - return volume.NewSecretVolumeWithMountPath( - logstashv1alpha1.ConfigSecretName(ls.Name), - InternalConfigVolumeName, - InternalConfigVolumeMountPath, - ) -} - -// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. -func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { - return volume.NewSecretVolumeWithMountPath( - logstashv1alpha1.PipelineSecretName(ls.Name), - InternalPipelineVolumeName, - InternalPipelineVolumeMountPath, - ) -} - -func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec { +func buildPodTemplate(params Params, configHash hash.Hash32) (corev1.PodTemplateSpec, error) { defer tracing.Span(¶ms.Context)() spec := ¶ms.Logstash.Spec builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName) - vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} + + vols, err := buildVolumes(params) + if err != nil { + return corev1.PodTemplateSpec{}, err + } + + esAssociations := getEsAssociations(params) + if err := writeEsAssocToConfigHash(params, esAssociations, configHash); err != nil { + return corev1.PodTemplateSpec{}, err + } + + envs, err := buildEnv(params, esAssociations) + if err != nil { + return corev1.PodTemplateSpec{}, err + } labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{ VersionLabelName: spec.Version}) @@ -113,11 +92,12 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithReadinessProbe(readinessProbe(params.Logstash)). WithVolumeLikes(vols...). WithInitContainers(initConfigContainer(params.Logstash)). + WithEnv(envs...). WithInitContainerDefaults() - builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) + builder, err = stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) if err != nil { - return corev1.PodTemplateSpec{} + return corev1.PodTemplateSpec{}, err } // TODO integrate with api.ssl.enabled @@ -128,7 +108,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS // WithVolumeMounts(httpVol.VolumeMount()) // } - return builder.PodTemplate + return builder.PodTemplate, nil } func getDefaultContainerPorts() []corev1.ContainerPort { @@ -161,4 +141,27 @@ func readinessProbe(logstash logstashv1alpha1.Logstash) corev1.Probe { }, } return probe -} \ No newline at end of file +} + +func getEsAssociations(params Params) []commonv1.Association { + var esAssociations []commonv1.Association + + for _, assoc := range params.Logstash.GetAssociations() { + if assoc.AssociationType() == commonv1.ElasticsearchAssociationType { + esAssociations = append(esAssociations, assoc) + } + } + return esAssociations +} + +func writeEsAssocToConfigHash(params Params, esAssociations []commonv1.Association, configHash hash.Hash) error { + if esAssociations == nil { + return nil + } + + return commonassociation.WriteAssocsToConfigHash( + params.Client, + esAssociations, + configHash, + ) +} diff --git a/pkg/controller/logstash/pod_test.go b/pkg/controller/logstash/pod_test.go index cbf9604f6cf..7cfa9171a94 100644 --- a/pkg/controller/logstash/pod_test.go +++ b/pkg/controller/logstash/pod_test.go @@ -6,9 +6,8 @@ package logstash import ( "context" - "testing" - "hash/fnv" + "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -273,7 +272,9 @@ func TestNewPodTemplateSpec(t *testing.T) { Logstash: tt.logstash, } configHash := fnv.New32a() - got := buildPodTemplate(params, configHash) + got, err := buildPodTemplate(params, configHash) + + require.NoError(t, err) tt.assertions(got) }) } diff --git a/pkg/controller/logstash/sset/sset.go b/pkg/controller/logstash/sset/sset.go index 55ca4791705..b76bac47936 100644 --- a/pkg/controller/logstash/sset/sset.go +++ b/pkg/controller/logstash/sset/sset.go @@ -7,12 +7,11 @@ package sset import ( "context" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" - appsv1 "k8s.io/api/apps/v1" - "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/hash" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" diff --git a/pkg/controller/logstash/volume.go b/pkg/controller/logstash/volume.go new file mode 100644 index 00000000000..9737380fdc6 --- /dev/null +++ b/pkg/controller/logstash/volume.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "fmt" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" +) + +const ( + InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" + + // InternalConfigVolumeName is a volume which contains the generated configuration. + InternalConfigVolumeName = "elastic-internal-logstash-config" + InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" + InternalPipelineVolumeName = "elastic-internal-logstash-pipeline" + InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline" +) + +var ( + // ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container + ConfigSharedVolume = volume.SharedVolume{ + VolumeName: ConfigVolumeName, + InitContainerMountPath: InitContainerConfigVolumeMountPath, + ContainerMountPath: ConfigMountPath, + } +) + +// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + logstashv1alpha1.ConfigSecretName(ls.Name), + InternalConfigVolumeName, + InternalConfigVolumeMountPath, + ) +} + +// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + logstashv1alpha1.PipelineSecretName(ls.Name), + InternalPipelineVolumeName, + InternalPipelineVolumeMountPath, + ) +} + +func buildVolumes(params Params) ([]volume.VolumeLike, error) { + vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} + + // all volumes with CAs of direct associations + caAssocVols, err := getVolumesFromAssociations(params.Logstash.GetAssociations()) + if err != nil { + return nil, err + } + + vols = append(vols, caAssocVols...) + + return vols, nil +} + +func getVolumesFromAssociations(associations []commonv1.Association) ([]volume.VolumeLike, error) { + var vols []volume.VolumeLike //nolint:prealloc + for i, assoc := range associations { + assocConf, err := assoc.AssociationConf() + if err != nil { + return nil, err + } + if !assocConf.CAIsConfigured() { + // skip as there is no volume to mount if association has no CA configured + continue + } + caSecretName := assocConf.GetCASecretName() + vols = append(vols, volume.NewSecretVolumeWithMountPath( + caSecretName, + fmt.Sprintf("%s-certs-%d", assoc.AssociationType(), i), + certificatesDir(assoc), + )) + } + return vols, nil +} + +func certificatesDir(association commonv1.Association) string { + ref := association.AssociationRef() + return fmt.Sprintf( + "/mnt/elastic-internal/%s-association/%s/%s/certs", + association.AssociationType(), + ref.Namespace, + ref.NameOrSecretName(), + ) +} diff --git a/pkg/controller/logstash/volume_test.go b/pkg/controller/logstash/volume_test.go new file mode 100644 index 00000000000..af5f705d92e --- /dev/null +++ b/pkg/controller/logstash/volume_test.go @@ -0,0 +1,89 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "testing" + + "github.com/stretchr/testify/require" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" +) + +func Test_getVolumesFromAssociations(t *testing.T) { + // Note: we use setAssocConfs to set the AssociationConfs which are normally set in the reconciliation loop. + for _, tt := range []struct { + name string + params Params + setAssocConfs func(assocs []commonv1.Association) + wantAssociationsLength int + }{ + { + name: "es refs", + params: Params{ + Logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + ElasticsearchRefs: []logstashv1alpha1.ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "elasticsearch"}, + ClusterName: "production", + }, + { + ObjectSelector: commonv1.ObjectSelector{Name: "elasticsearch2"}, + ClusterName: "production2", + }, + }, + }, + }, + }, + setAssocConfs: func(assocs []commonv1.Association) { + assocs[0].SetAssociationConf(&commonv1.AssociationConf{ + CASecretName: "elasticsearch-es-ca", + }) + assocs[1].SetAssociationConf(&commonv1.AssociationConf{ + CASecretName: "elasticsearch2-es-ca", + }) + }, + wantAssociationsLength: 2, + }, + { + name: "one es ref with ca, another no ca", + params: Params{ + Logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + ElasticsearchRefs: []logstashv1alpha1.ElasticsearchCluster{ + { + ObjectSelector: commonv1.ObjectSelector{Name: "uat"}, + ClusterName: "uat", + }, + { + ObjectSelector: commonv1.ObjectSelector{Name: "production"}, + ClusterName: "production", + }, + }, + }, + }, + }, + setAssocConfs: func(assocs []commonv1.Association) { + assocs[0].SetAssociationConf(&commonv1.AssociationConf{ + // No CASecretName + }) + assocs[1].SetAssociationConf(&commonv1.AssociationConf{ + CASecretName: "production-es-ca", + }) + }, + wantAssociationsLength: 1, + }, + } { + t.Run(tt.name, func(t *testing.T) { + assocs := tt.params.Logstash.GetAssociations() + tt.setAssocConfs(assocs) + associations, err := getVolumesFromAssociations(assocs) + require.NoError(t, err) + require.Equal(t, tt.wantAssociationsLength, len(associations)) + }) + } +} diff --git a/test/e2e/logstash/es_output_test.go b/test/e2e/logstash/es_output_test.go new file mode 100644 index 00000000000..ef23616da62 --- /dev/null +++ b/test/e2e/logstash/es_output_test.go @@ -0,0 +1,80 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build logstash || e2e + +package logstash + +import ( + "fmt" + "strconv" + "testing" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/elasticsearch" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/logstash" +) + +// TestLogstashEsOutput Logstash ingest events to Elasticsearch. Metrics should have `events.out` > 0. +func TestLogstashEsOutput(t *testing.T) { + + es := elasticsearch.NewBuilderWithoutSuffix("test-es"). + WithESMasterDataNodes(2, elasticsearch.DefaultResources) + + b := logstash.NewBuilder("test-ls-es-out"). + WithNodeCount(1). + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "config.string": ` +input { exec { command => 'uptime' interval => 10 } } +output { + elasticsearch { + hosts => [ "${PRODUCTION_ES_HOSTS}" ] + ssl => true + cacert => "${PRODUCTION_ES_SSL_CERTIFICATE_AUTHORITY}" + user => "${PRODUCTION_ES_USER}" + password => "${PRODUCTION_ES_PASSWORD}" + } +} +`, + }, + }, + }). + WithElasticsearchRefs( + logstashv1alpha1.ElasticsearchCluster{ + ObjectSelector: es.Ref(), + ClusterName: "production", + }, + ) + + steps := test.StepsFunc(func(k *test.K8sClient) test.StepList { + return test.StepList{ + b.CheckMetricsRequest(k, + logstash.Request{ + Name: "stats events", + Path: "/_node/stats/events", + }, + logstash.Want{ + MatchFunc: map[string]func(string) bool{ + // number of events goes out should be > 0 + "events.out": func(cntStr string) bool { + cnt, err := strconv.Atoi(cntStr) + if err != nil { + fmt.Printf("failed to convert string %s to int", cntStr) + return false + } + + return cnt > 0 + }, + }, + }), + } + }) + + test.Sequence(nil, steps, es, b).RunSequential(t) +} diff --git a/test/e2e/logstash/pipeline_test.go b/test/e2e/logstash/pipeline_test.go index 594108c2871..5a9c20d737a 100644 --- a/test/e2e/logstash/pipeline_test.go +++ b/test/e2e/logstash/pipeline_test.go @@ -7,13 +7,14 @@ package logstash import ( - corev1 "k8s.io/api/core/v1" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/logstash" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // TestPipelineConfigRefLogstash PipelineRef should be able to take pipelines.yaml from Secret. @@ -62,8 +63,10 @@ func TestPipelineConfigRefLogstash(t *testing.T) { Path: "/_node/pipelines/generator", }, logstash.Want{ - Status: "green", - Match: map[string]string{"pipelines.generator.workers": "1"}, + Match: map[string]string{ + "pipelines.generator.workers": "1", + "status": "green", + }, }), test.Step{ Name: "Delete pipeline secret", @@ -141,8 +144,10 @@ func TestPipelineConfigLogstash(t *testing.T) { Path: "/_node/pipelines/split", }, logstash.Want{ - Status: "green", - Match: map[string]string{"pipelines.split.batch_size": "125"}, + Match: map[string]string{ + "pipelines.split.batch_size": "125", + "status": "green", + }, }), test.Step{ Name: "Delete pipeline secret", @@ -164,9 +169,9 @@ func TestLogstashPipelineReload(t *testing.T) { WithPipelines([]commonv1.Config{ { Data: map[string]interface{}{ - "pipeline.id": "main", + "pipeline.id": "main", "pipeline.workers": 1, - "config.string": "input { beats{ port => 5044}} output { stdout{} }", + "config.string": "input { beats{ port => 5044}} output { stdout{} }", }, }, }) @@ -175,9 +180,9 @@ func TestLogstashPipelineReload(t *testing.T) { WithPipelines([]commonv1.Config{ { Data: map[string]interface{}{ - "pipeline.id": "main", + "pipeline.id": "main", "pipeline.workers": 2, - "config.string": "input { beats{ port => 5044} } output { stdout{} }", + "config.string": "input { beats{ port => 5044} } output { stdout{} }", }, }, }). @@ -186,30 +191,34 @@ func TestLogstashPipelineReload(t *testing.T) { stepsFn := func(k *test.K8sClient) test.StepList { return test.StepList{}. WithSteps(logstashFirstPipeline.CheckK8sTestSteps(k)). - WithStep( - logstashFirstPipeline.CheckMetricsRequest(k, - logstash.Request{ - Name: "pipeline [main]", - Path: "/_node/pipelines/main", - }, - logstash.Want{ - Status: "green", - Match: map[string]string{"pipelines.main.workers": "1"}, - }), - ). + WithStep( + logstashFirstPipeline.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, + logstash.Want{ + Match: map[string]string{ + "pipelines.main.workers": "1", + "status": "green", + }, + }), + ). WithSteps(logstashSecondPipeline.MutationTestSteps(k)). WithStep( logstashSecondPipeline.CheckMetricsRequest(k, logstash.Request{ - Name: "pipeline [main]", - Path: "/_node/pipelines/main", - }, + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, logstash.Want{ - Status: "green", - Match: map[string]string{"pipelines.main.workers": "2"}, + Match: map[string]string{ + "pipelines.main.workers": "2", + "status": "green", + }, }), ) } test.Sequence(nil, stepsFn, logstashFirstPipeline).RunSequential(t) -} \ No newline at end of file +} diff --git a/test/e2e/samples_test.go b/test/e2e/samples_test.go index a654eb8923f..d0abc87900b 100644 --- a/test/e2e/samples_test.go +++ b/test/e2e/samples_test.go @@ -17,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/util/rand" commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/test/e2e/cmd/run" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/apmserver" @@ -92,8 +93,17 @@ func createBuilders(t *testing.T, decoder *helper.YAMLDecoder, sampleFile, testN WithLabel(run.TestNameLabel, fullTestName). WithPodLabel(run.TestNameLabel, fullTestName) case logstash.Builder: + esRefs := make([]logstashv1alpha1.ElasticsearchCluster, 0, len(b.Logstash.Spec.ElasticsearchRefs)) + for _, ref := range b.Logstash.Spec.ElasticsearchRefs { + esRefs = append(esRefs, logstashv1alpha1.ElasticsearchCluster{ + ObjectSelector: tweakServiceRef(ref.ObjectSelector, suffix), + ClusterName: ref.ClusterName, + }) + } + return b.WithNamespace(namespace). WithSuffix(suffix). + WithElasticsearchRefs(esRefs...). WithRestrictedSecurityContext(). WithLabel(run.TestNameLabel, fullTestName). WithPodLabel(run.TestNameLabel, fullTestName) diff --git a/test/e2e/test/helper/yaml.go b/test/e2e/test/helper/yaml.go index 2a5d5896483..57f5d2b1905 100644 --- a/test/e2e/test/helper/yaml.go +++ b/test/e2e/test/helper/yaml.go @@ -321,8 +321,18 @@ func transformToE2E(namespace, fullTestName, suffix string, transformers []Build builder = b case *logstashv1alpha1.Logstash: b := logstash.NewBuilderWithoutSuffix(decodedObj.Name) + + esRefs := make([]logstashv1alpha1.ElasticsearchCluster, 0, len(b.Logstash.Spec.ElasticsearchRefs)) + for _, ref := range b.Logstash.Spec.ElasticsearchRefs { + esRefs = append(esRefs, logstashv1alpha1.ElasticsearchCluster{ + ObjectSelector: tweakServiceRef(ref.ObjectSelector, suffix), + ClusterName: ref.ClusterName, + }) + } + b = b.WithNamespace(namespace). WithSuffix(suffix). + WithElasticsearchRefs(esRefs...). WithLabel(run.TestNameLabel, fullTestName). WithPodLabel(run.TestNameLabel, fullTestName) diff --git a/test/e2e/test/logstash/builder.go b/test/e2e/test/logstash/builder.go index 07d0ee21def..263818635d5 100644 --- a/test/e2e/test/logstash/builder.go +++ b/test/e2e/test/logstash/builder.go @@ -151,6 +151,11 @@ func (b Builder) WithVolumeMounts(mounts ...corev1.VolumeMount) Builder { return b } +func (b Builder) WithElasticsearchRefs(refs ...logstashv1alpha1.ElasticsearchCluster) Builder { + b.Logstash.Spec.ElasticsearchRefs = refs + return b +} + func (b Builder) WithMonitoring(metricsESRef commonv1.ObjectSelector, logsESRef commonv1.ObjectSelector) Builder { b.Logstash.Spec.Monitoring.Metrics.ElasticsearchRefs = []commonv1.ObjectSelector{metricsESRef} b.Logstash.Spec.Monitoring.Logs.ElasticsearchRefs = []commonv1.ObjectSelector{logsESRef} diff --git a/test/e2e/test/logstash/checks.go b/test/e2e/test/logstash/checks.go index 19524fa3fdb..0a8b14d88fd 100644 --- a/test/e2e/test/logstash/checks.go +++ b/test/e2e/test/logstash/checks.go @@ -24,10 +24,10 @@ type Request struct { } type Want struct { - Status string // Key is field path of ucfg.Config. Value is the expected string // example, pipelines.demo.batch_size : 2 - Match map[string]string + Match map[string]string + MatchFunc map[string]func(string) bool } // CheckSecrets checks that expected secrets have been created. @@ -53,6 +53,39 @@ func CheckSecrets(b Builder, k *test.K8sClient) test.Step { }, }, } + + // check ES association user/ secret + nn := k8s.ExtractNamespacedName(&b.Logstash) + lsName := nn.Name + lsNamespace := nn.Namespace + + for _, ref := range b.Logstash.Spec.ElasticsearchRefs { + esNamespace := ref.WithDefaultNamespace(lsNamespace).Namespace + expected = append(expected, + test.ExpectedSecret{ + Name: fmt.Sprintf("%s-logstash-es-%s-%s-ca", lsName, esNamespace, ref.Name), + Keys: []string{"ca.crt", "tls.crt"}, + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/cluster-name": ref.Name, + "elasticsearch.k8s.elastic.co/cluster-namespace": esNamespace, + "logstashassociation.k8s.elastic.co/name": lsName, + "logstashassociation.k8s.elastic.co/namespace": lsNamespace, + }, + }, + ) + expected = append(expected, + test.ExpectedSecret{ + Name: fmt.Sprintf("%s-%s-%s-%s-logstash-user", lsNamespace, lsName, esNamespace, ref.Name), + Keys: []string{"name", "passwordHash", "userRoles"}, + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/cluster-name": ref.Name, + "elasticsearch.k8s.elastic.co/cluster-namespace": esNamespace, + "logstashassociation.k8s.elastic.co/name": lsName, + "logstashassociation.k8s.elastic.co/namespace": lsNamespace, + }, + }, + ) + } return expected }) } @@ -93,6 +126,18 @@ func CheckStatus(b Builder, k *test.K8sClient) test.Step { } } + // elasticsearch status + expectedEsRefsInStatus := len(logstash.Spec.ElasticsearchRefs) + actualEsRefsInStatus := len(logstash.Status.ElasticsearchAssociationsStatus) + if expectedEsRefsInStatus != actualEsRefsInStatus { + return fmt.Errorf("expected %d elasticsearch associations in status but got %d", expectedEsRefsInStatus, actualEsRefsInStatus) + } + for a, s := range logstash.Status.ElasticsearchAssociationsStatus { + if s != v1.AssociationEstablished { + return fmt.Errorf("elasticsearch association %s has status %s ", a, s) + } + } + return nil }), } @@ -106,7 +151,7 @@ func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { Path: "/", }, Want{ - Status: "green", + Match: map[string]string{"status": "green"}, }), b.CheckMetricsRequest(k, Request{ @@ -114,8 +159,10 @@ func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { Path: "/_node/pipelines/main", }, Want{ - Status: "green", - Match: map[string]string{"pipelines.main.batch_size": "125"}, + Match: map[string]string{ + "pipelines.main.batch_size": "125", + "status": "green", + }, }), } } @@ -147,15 +194,6 @@ func (b Builder) CheckMetricsRequest(k *test.K8sClient, req Request, want Want) return err } - // check status - status, err := res.String("status") - if err != nil { - return err - } - if status != want.Status { - return fmt.Errorf("expected %s but got %s", want.Status, status) - } - // check expected string for k, v := range want.Match { str, err := res.String(k) @@ -166,6 +204,18 @@ func (b Builder) CheckMetricsRequest(k *test.K8sClient, req Request, want Want) return fmt.Errorf("expected %s to be %s but got %s", k, v, str) } } + + // check expected expression + for k, f := range want.MatchFunc { + str, err := res.String(k) + if err != nil { + return err + } + if !f(str) { + return fmt.Errorf("expression failed: %s got %s", k, str) + } + } + return nil }), }