diff --git a/cmd/manager/main.go b/cmd/manager/main.go index fa6cc484a84..1273054e84d 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -879,6 +879,7 @@ func registerControllers(mgr manager.Manager, params operator.Parameters, access {name: "ES-MONITORING", registerFunc: associationctl.AddEsMonitoring}, {name: "KB-MONITORING", registerFunc: associationctl.AddKbMonitoring}, {name: "BEAT-MONITORING", registerFunc: associationctl.AddBeatMonitoring}, + {name: "LOGSTASH-MONITORING", registerFunc: associationctl.AddLogstashMonitoring}, } for _, c := range assocControllers { @@ -917,6 +918,7 @@ func garbageCollectUsers(ctx context.Context, cfg *rest.Config, managedNamespace For(&beatv1beta1.BeatList{}, associationctl.BeatAssociationLabelNamespace, associationctl.BeatAssociationLabelName). For(&agentv1alpha1.AgentList{}, associationctl.AgentAssociationLabelNamespace, associationctl.AgentAssociationLabelName). For(&emsv1alpha1.ElasticMapsServerList{}, associationctl.MapsESAssociationLabelNamespace, associationctl.MapsESAssociationLabelName). + For(&logstashv1alpha1.LogstashList{}, associationctl.LogstashAssociationLabelNamespace, associationctl.LogstashAssociationLabelName). DoGarbageCollection(ctx) if err != nil { return fmt.Errorf("user garbage collector failed: %w", err) diff --git a/config/crds/v1/all-crds.yaml b/config/crds/v1/all-crds.yaml index 7f291e0942d..4f30dd94d39 100644 --- a/config/crds/v1/all-crds.yaml +++ b/config/crds/v1/all-crds.yaml @@ -9121,6 +9121,108 @@ spec: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. type: string + monitoring: + description: Monitoring enables you to collect and ship log and monitoring + data of this Logstash. Metricbeat and Filebeat are deployed in the + same Pod as sidecars and each one sends data to one or two different + Elasticsearch monitoring clusters running in the same Kubernetes + cluster. + properties: + logs: + description: Logs holds references to Elasticsearch clusters which + receive log data from an associated resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + metrics: + description: Metrics holds references to Elasticsearch clusters + which receive monitoring data from this resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. @@ -9629,6 +9731,13 @@ spec: expectedNodes: format: int32 type: integer + monitoringAssociationStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: MonitoringAssociationStatus is the status of any auto-linking + to monitoring Elasticsearch clusters. + type: object observedGeneration: description: ObservedGeneration is the most recent generation observed for this Logstash instance. It corresponds to the metadata generation, diff --git a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml index 40d31eaf89e..5d330d10e2a 100644 --- a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml +++ b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml @@ -77,6 +77,108 @@ spec: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. type: string + monitoring: + description: Monitoring enables you to collect and ship log and monitoring + data of this Logstash. Metricbeat and Filebeat are deployed in the + same Pod as sidecars and each one sends data to one or two different + Elasticsearch monitoring clusters running in the same Kubernetes + cluster. + properties: + logs: + description: Logs holds references to Elasticsearch clusters which + receive log data from an associated resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + metrics: + description: Metrics holds references to Elasticsearch clusters + which receive monitoring data from this resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. @@ -7992,6 +8094,13 @@ spec: expectedNodes: format: int32 type: integer + monitoringAssociationStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: MonitoringAssociationStatus is the status of any auto-linking + to monitoring Elasticsearch clusters. + type: object observedGeneration: description: ObservedGeneration is the most recent generation observed for this Logstash instance. It corresponds to the metadata generation, diff --git a/config/samples/logstash/logstash_stackmonitor.yaml b/config/samples/logstash/logstash_stackmonitor.yaml new file mode 100644 index 00000000000..5194e52be20 --- /dev/null +++ b/config/samples/logstash/logstash_stackmonitor.yaml @@ -0,0 +1,46 @@ +--- +apiVersion: elasticsearch.k8s.elastic.co/v1 +kind: Elasticsearch +metadata: + name: monitoring +spec: + version: 8.6.1 + nodeSets: + - name: default + count: 1 + config: + node.store.allow_mmap: false +--- +apiVersion: logstash.k8s.elastic.co/v1alpha1 +kind: Logstash +metadata: + name: logstash-sample +spec: + count: 1 + version: 8.7.0 + config: + log.level: info + api.http.host: "0.0.0.0" + queue.type: memory + podTemplate: + spec: + containers: + - name: logstash + monitoring: + metrics: + elasticsearchRefs: + - name: monitoring + logs: + elasticsearchRefs: + - name: monitoring +--- +apiVersion: kibana.k8s.elastic.co/v1 +kind: Kibana +metadata: + name: kibana-sample +spec: + version: 8.6.1 + elasticsearchRef: + name: monitoring + count: 1 +--- \ No newline at end of file diff --git a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml index 93d34492b1f..74932dc3e28 100644 --- a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml +++ b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml @@ -9175,6 +9175,108 @@ spec: description: Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. type: string + monitoring: + description: Monitoring enables you to collect and ship log and monitoring + data of this Logstash. Metricbeat and Filebeat are deployed in the + same Pod as sidecars and each one sends data to one or two different + Elasticsearch monitoring clusters running in the same Kubernetes + cluster. + properties: + logs: + description: Logs holds references to Elasticsearch clusters which + receive log data from an associated resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + metrics: + description: Metrics holds references to Elasticsearch clusters + which receive monitoring data from this resource. + properties: + elasticsearchRefs: + description: ElasticsearchRefs is a reference to a list of + monitoring Elasticsearch clusters running in the same Kubernetes + cluster. Due to existing limitations, only a single Elasticsearch + cluster is currently supported. + items: + description: ObjectSelector defines a reference to a Kubernetes + object which can be an Elastic resource managed by the + operator or a Secret describing an external Elastic resource + not managed by the operator. + properties: + name: + description: Name of an existing Kubernetes object corresponding + to an Elastic resource managed by ECK. + type: string + namespace: + description: Namespace of the Kubernetes object. If + empty, defaults to the current namespace. + type: string + secretName: + description: 'SecretName is the name of an existing + Kubernetes secret that contains connection information + for associating an Elastic resource not managed by + the operator. The referenced secret must contain the + following: - `url`: the URL to reach the Elastic resource + - `username`: the username of the user to be authenticated + to the Elastic resource - `password`: the password + of the user to be authenticated to the Elastic resource + - `ca.crt`: the CA certificate in PEM format (optional). + This field cannot be used in combination with the + other fields name, namespace or serviceName.' + type: string + serviceName: + description: ServiceName is the name of an existing + Kubernetes service which is used to make requests + to the referenced object. It has to be in the same + namespace as the referenced resource. If left empty, + the default HTTP service of the referenced resource + is used. + type: string + type: object + type: array + type: object + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. @@ -9683,6 +9785,13 @@ spec: expectedNodes: format: int32 type: integer + monitoringAssociationStatus: + additionalProperties: + description: AssociationStatus is the status of an association resource. + type: string + description: MonitoringAssociationStatus is the status of any auto-linking + to monitoring Elasticsearch clusters. + type: object observedGeneration: description: ObservedGeneration is the most recent generation observed for this Logstash instance. It corresponds to the metadata generation, diff --git a/docs/reference/api-docs.asciidoc b/docs/reference/api-docs.asciidoc index 68c4b5b0fc2..df6bf48238a 100644 --- a/docs/reference/api-docs.asciidoc +++ b/docs/reference/api-docs.asciidoc @@ -590,6 +590,7 @@ Monitoring holds references to both the metrics, and logs Elasticsearch clusters - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-beat-v1beta1-beatspec[$$BeatSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-elasticsearch-v1-elasticsearchspec[$$ElasticsearchSpec$$] - xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-kibana-v1-kibanaspec[$$KibanaSpec$$] +- xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstashspec[$$LogstashSpec$$] **** [cols="25a,75a", options="header"] @@ -1897,6 +1898,7 @@ LogstashSpec defines the desired state of Logstash | *`config`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$]__ | Config holds the Logstash configuration. At most one of [`Config`, `ConfigRef`] can be specified. | *`configRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | ConfigRef contains a reference to an existing Kubernetes Secret holding the Logstash configuration. Logstash settings must be specified as yaml, under a single "logstash.yml" entry. At most one of [`Config`, `ConfigRef`] can be specified. | *`services`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstashservice[$$LogstashService$$] array__ | Services contains details of services that Logstash should expose - similar to the HTTP layer configuration for the rest of the stack, but also applicable for more use cases than the metrics API, as logstash may need to be opened up for other services: beats, TCP, UDP, etc, inputs +| *`monitoring`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-monitoring[$$Monitoring$$]__ | Monitoring enables you to collect and ship log and monitoring data of this Logstash. Metricbeat and Filebeat are deployed in the same Pod as sidecars and each one sends data to one or two different Elasticsearch monitoring clusters running in the same Kubernetes cluster. | *`podTemplate`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#podtemplatespec-v1-core[$$PodTemplateSpec$$]__ | PodTemplate provides customisation options for the Logstash pods. | *`revisionHistoryLimit`* __integer__ | RevisionHistoryLimit is the number of revisions to retain to allow rollback in the underlying StatefulSet. | *`secureSettings`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-secretsource[$$SecretSource$$] array__ | SecureSettings is a list of references to Kubernetes Secrets containing sensitive configuration options for the Logstash. Secrets data can be then referenced in the Logstash config using the Secret's keys or as specified in `Entries` field of each SecureSetting. diff --git a/pkg/apis/beat/v1beta1/validations.go b/pkg/apis/beat/v1beta1/validations.go index 7f147ee7db6..d42148bbb69 100644 --- a/pkg/apis/beat/v1beta1/validations.go +++ b/pkg/apis/beat/v1beta1/validations.go @@ -121,5 +121,5 @@ func checkAssociations(b *Beat) field.ErrorList { } func checkMonitoring(b *Beat) field.ErrorList { - return validations.Validate(b, b.Spec.Version) + return validations.Validate(b, b.Spec.Version, validations.MinStackVersion) } diff --git a/pkg/apis/common/v1/association.go b/pkg/apis/common/v1/association.go index 721055c273f..8bcfaa2f231 100644 --- a/pkg/apis/common/v1/association.go +++ b/pkg/apis/common/v1/association.go @@ -110,6 +110,8 @@ const ( BeatAssociationType = "beat" BeatMonitoringAssociationType = "beat-monitoring" + LogstashMonitoringAssociationType = "ls-monitoring" + AssociationUnknown AssociationStatus = "" AssociationPending AssociationStatus = "Pending" AssociationEstablished AssociationStatus = "Established" diff --git a/pkg/apis/kibana/v1/webhook.go b/pkg/apis/kibana/v1/webhook.go index a304cfe9468..bc0df620985 100644 --- a/pkg/apis/kibana/v1/webhook.go +++ b/pkg/apis/kibana/v1/webhook.go @@ -123,7 +123,7 @@ func checkNoDowngrade(prev, curr *Kibana) field.ErrorList { } func checkMonitoring(k *Kibana) field.ErrorList { - errs := validations.Validate(k, k.Spec.Version) + errs := validations.Validate(k, k.Spec.Version, validations.MinStackVersion) // Kibana must be associated to an Elasticsearch when monitoring metrics are enabled if monitoring.IsMetricsDefined(k) && !k.Spec.ElasticsearchRef.IsDefined() { errs = append(errs, field.Invalid(field.NewPath("spec").Child("elasticsearchRef"), k.Spec.ElasticsearchRef, diff --git a/pkg/apis/logstash/v1alpha1/logstash_types.go b/pkg/apis/logstash/v1alpha1/logstash_types.go index 417521dbc10..843cfae5dfc 100644 --- a/pkg/apis/logstash/v1alpha1/logstash_types.go +++ b/pkg/apis/logstash/v1alpha1/logstash_types.go @@ -5,6 +5,8 @@ package v1alpha1 import ( + "fmt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,6 +14,7 @@ import ( ) const ( + LogstashContainerName = "logstash" // Kind is inferred from the struct name using reflection in SchemeBuilder.Register() // we duplicate it as a constant here for practical purposes. Kind = "Logstash" @@ -45,6 +48,12 @@ type LogstashSpec struct { // +kubebuilder:validation:Optional Services []LogstashService `json:"services,omitempty"` + // Monitoring enables you to collect and ship log and monitoring data of this Logstash. + // Metricbeat and Filebeat are deployed in the same Pod as sidecars and each one sends data to one or two different + // Elasticsearch monitoring clusters running in the same Kubernetes cluster. + // +kubebuilder:validation:Optional + Monitoring commonv1.Monitoring `json:"monitoring,omitempty"` + // PodTemplate provides customisation options for the Logstash pods. // +kubebuilder:pruning:PreserveUnknownFields PodTemplate corev1.PodTemplateSpec `json:"podTemplate,omitempty"` @@ -88,6 +97,9 @@ type LogstashStatus struct { // If the generation observed in status diverges from the generation in metadata, the Logstash // controller has not yet processed the changes contained in the Logstash specification. ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // MonitoringAssociationStatus is the status of any auto-linking to monitoring Elasticsearch clusters. + MonitoringAssociationStatus commonv1.AssociationStatusMap `json:"monitoringAssociationStatus,omitempty"` } // +kubebuilder:object:root=true @@ -106,8 +118,9 @@ type Logstash struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec LogstashSpec `json:"spec,omitempty"` - Status LogstashStatus `json:"status,omitempty"` + Spec LogstashSpec `json:"spec,omitempty"` + Status LogstashStatus `json:"status,omitempty"` + MonitoringAssocConfs map[commonv1.ObjectSelector]commonv1.AssociationConf `json:"-"` } // +kubebuilder:object:root=true @@ -137,6 +150,123 @@ func (l *Logstash) GetObservedGeneration() int64 { return l.Status.ObservedGeneration } +func (l *Logstash) GetAssociations() []commonv1.Association { + var associations []commonv1.Association + + for _, ref := range l.Spec.Monitoring.Metrics.ElasticsearchRefs { + if ref.IsDefined() { + associations = append(associations, &LogstashMonitoringAssociation{ + Logstash: l, + ref: ref.WithDefaultNamespace(l.Namespace), + }) + } + } + for _, ref := range l.Spec.Monitoring.Logs.ElasticsearchRefs { + if ref.IsDefined() { + associations = append(associations, &LogstashMonitoringAssociation{ + Logstash: l, + ref: ref.WithDefaultNamespace(l.Namespace), + }) + } + } + + return associations +} + +func (l *Logstash) AssociationStatusMap(typ commonv1.AssociationType) commonv1.AssociationStatusMap { + if typ == commonv1.LogstashMonitoringAssociationType { + for _, esRef := range l.Spec.Monitoring.Metrics.ElasticsearchRefs { + if esRef.IsDefined() { + return l.Status.MonitoringAssociationStatus + } + } + for _, esRef := range l.Spec.Monitoring.Logs.ElasticsearchRefs { + if esRef.IsDefined() { + return l.Status.MonitoringAssociationStatus + } + } + } + + return commonv1.AssociationStatusMap{} +} + +func (l *Logstash) SetAssociationStatusMap(typ commonv1.AssociationType, status commonv1.AssociationStatusMap) error { + switch typ { + case commonv1.LogstashMonitoringAssociationType: + l.Status.MonitoringAssociationStatus = status + return nil + default: + return fmt.Errorf("association type %s not known", typ) + } +} + +type LogstashMonitoringAssociation struct { + // The associated Logstash + *Logstash + // ref is the object selector of the monitoring Elasticsearch referenced in the Association + ref commonv1.ObjectSelector +} + +var _ commonv1.Association = &LogstashMonitoringAssociation{} + +func (lsmon *LogstashMonitoringAssociation) ElasticServiceAccount() (commonv1.ServiceAccountName, error) { + return "", nil +} + +func (lsmon *LogstashMonitoringAssociation) Associated() commonv1.Associated { + if lsmon == nil { + return nil + } + if lsmon.Logstash == nil { + lsmon.Logstash = &Logstash{} + } + return lsmon.Logstash +} + +func (lsmon *LogstashMonitoringAssociation) AssociationConfAnnotationName() string { + return commonv1.ElasticsearchConfigAnnotationName(lsmon.ref) +} + +func (lsmon *LogstashMonitoringAssociation) AssociationType() commonv1.AssociationType { + return commonv1.LogstashMonitoringAssociationType +} + +func (lsmon *LogstashMonitoringAssociation) AssociationRef() commonv1.ObjectSelector { + return lsmon.ref +} + +func (lsmon *LogstashMonitoringAssociation) AssociationConf() (*commonv1.AssociationConf, error) { + return commonv1.GetAndSetAssociationConfByRef(lsmon, lsmon.ref, lsmon.MonitoringAssocConfs) +} + +func (lsmon *LogstashMonitoringAssociation) SetAssociationConf(assocConf *commonv1.AssociationConf) { + if lsmon.MonitoringAssocConfs == nil { + lsmon.MonitoringAssocConfs = make(map[commonv1.ObjectSelector]commonv1.AssociationConf) + } + if assocConf != nil { + lsmon.MonitoringAssocConfs[lsmon.ref] = *assocConf + } +} + +func (lsmon *LogstashMonitoringAssociation) AssociationID() string { + return lsmon.ref.ToID() +} + +func (l *Logstash) GetMonitoringMetricsRefs() []commonv1.ObjectSelector { + return l.Spec.Monitoring.Metrics.ElasticsearchRefs +} + +func (l *Logstash) GetMonitoringLogsRefs() []commonv1.ObjectSelector { + return l.Spec.Monitoring.Logs.ElasticsearchRefs +} + +func (l *Logstash) MonitoringAssociation(esRef commonv1.ObjectSelector) commonv1.Association { + return &LogstashMonitoringAssociation{ + Logstash: l, + ref: esRef.WithDefaultNamespace(l.Namespace), + } +} + func init() { SchemeBuilder.Register(&Logstash{}, &LogstashList{}) } diff --git a/pkg/apis/logstash/v1alpha1/validations.go b/pkg/apis/logstash/v1alpha1/validations.go index 1f70d2783a7..5cddf8ca6e0 100644 --- a/pkg/apis/logstash/v1alpha1/validations.go +++ b/pkg/apis/logstash/v1alpha1/validations.go @@ -8,15 +8,23 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/stackmon/validations" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version" ) var ( + // MinStackMonVersion is the minimum version of Logstash to enable Stack Monitoring on an Elastic Stack application. + // This requirement comes from the fact that we configure Logstash to write logs to disk for Filebeat + // via the env var LOG_STYLE available from this version. + MinStackMonVersion = version.MustParse("8.7.0-SNAPSHOT") + defaultChecks = []func(*Logstash) field.ErrorList{ checkNoUnknownFields, checkNameLength, checkSupportedVersion, checkSingleConfigSource, + checkMonitoring, + checkAssociations, } updateChecks = []func(old, curr *Logstash) field.ErrorList{ @@ -24,16 +32,16 @@ var ( } ) -func checkNoUnknownFields(a *Logstash) field.ErrorList { - return commonv1.NoUnknownFields(a, a.ObjectMeta) +func checkNoUnknownFields(l *Logstash) field.ErrorList { + return commonv1.NoUnknownFields(l, l.ObjectMeta) } -func checkNameLength(a *Logstash) field.ErrorList { - return commonv1.CheckNameLength(a) +func checkNameLength(l *Logstash) field.ErrorList { + return commonv1.CheckNameLength(l) } -func checkSupportedVersion(a *Logstash) field.ErrorList { - return commonv1.CheckSupportedStackVersion(a.Spec.Version, version.SupportedLogstashVersions) +func checkSupportedVersion(l *Logstash) field.ErrorList { + return commonv1.CheckSupportedStackVersion(l.Spec.Version, version.SupportedLogstashVersions) } func checkNoDowngrade(prev, curr *Logstash) field.ErrorList { @@ -43,8 +51,8 @@ func checkNoDowngrade(prev, curr *Logstash) field.ErrorList { return commonv1.CheckNoDowngrade(prev.Spec.Version, curr.Spec.Version) } -func checkSingleConfigSource(a *Logstash) field.ErrorList { - if a.Spec.Config != nil && a.Spec.ConfigRef != nil { +func checkSingleConfigSource(l *Logstash) field.ErrorList { + if l.Spec.Config != nil && l.Spec.ConfigRef != nil { msg := "Specify at most one of [`config`, `configRef`], not both" return field.ErrorList{ field.Forbidden(field.NewPath("spec").Child("config"), msg), @@ -54,3 +62,14 @@ func checkSingleConfigSource(a *Logstash) field.ErrorList { return nil } + +func checkMonitoring(l *Logstash) field.ErrorList { + return validations.Validate(l, l.Spec.Version, MinStackMonVersion) +} + +func checkAssociations(l *Logstash) field.ErrorList { + monitoringPath := field.NewPath("spec").Child("monitoring") + err1 := commonv1.CheckAssociationRefs(monitoringPath.Child("metrics"), l.GetMonitoringMetricsRefs()...) + err2 := commonv1.CheckAssociationRefs(monitoringPath.Child("logs"), l.GetMonitoringLogsRefs()...) + return append(err1, err2...) +} diff --git a/pkg/apis/logstash/v1alpha1/webhook_test.go b/pkg/apis/logstash/v1alpha1/webhook_test.go new file mode 100644 index 00000000000..6e949c1c1ae --- /dev/null +++ b/pkg/apis/logstash/v1alpha1/webhook_test.go @@ -0,0 +1,124 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package v1alpha1_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + admissionv1beta1 "k8s.io/api/admission/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/test" +) + +func TestWebhook(t *testing.T) { + testCases := []test.ValidationWebhookTestCase{ + { + Name: "simple-stackmon-ref", + Operation: admissionv1beta1.Create, + Object: func(t *testing.T, uid string) []byte { + t.Helper() + ent := mkLogstash(uid) + ent.Spec.Version = "8.7.0" + ent.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} + return serialize(t, ent) + }, + Check: test.ValidationWebhookSucceeded, + }, + { + Name: "multiple-stackmon-ref", + Operation: admissionv1beta1.Create, + Object: func(t *testing.T, uid string) []byte { + t.Helper() + ent := mkLogstash(uid) + ent.Spec.Version = "8.7.0" + ent.Spec.Monitoring = commonv1.Monitoring{ + Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname"}}}, + Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname"}}}, + } + return serialize(t, ent) + }, + Check: test.ValidationWebhookSucceeded, + }, + { + Name: "invalid-version-for-stackmon", + Operation: admissionv1beta1.Create, + Object: func(t *testing.T, uid string) []byte { + t.Helper() + ent := mkLogstash(uid) + ent.Spec.Version = "7.13.0" + ent.Spec.Monitoring = commonv1.Monitoring{Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{Name: "esmonname", Namespace: "esmonns"}}}} + return serialize(t, ent) + }, + Check: test.ValidationWebhookFailed( + `spec.version: Invalid value: "7.13.0": Unsupported version for Stack Monitoring. Required >= 8.7.0`, + ), + }, + { + Name: "invalid-stackmon-ref-with-name", + Operation: admissionv1beta1.Create, + Object: func(t *testing.T, uid string) []byte { + t.Helper() + ent := mkLogstash(uid) + ent.Spec.Version = "8.7.0" + ent.Spec.Monitoring = commonv1.Monitoring{ + Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname", Name: "xx"}}}, + Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname"}}}, + } + return serialize(t, ent) + }, + Check: test.ValidationWebhookFailed( + `spec.monitoring.metrics: Forbidden: Invalid association reference: specify name or secretName, not both`, + ), + }, + { + Name: "invalid-stackmon-ref-with-service-name", + Operation: admissionv1beta1.Create, + Object: func(t *testing.T, uid string) []byte { + t.Helper() + ent := mkLogstash(uid) + ent.Spec.Version = "8.7.0" + ent.Spec.Monitoring = commonv1.Monitoring{ + Metrics: commonv1.MetricsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es1monname"}}}, + Logs: commonv1.LogsMonitoring{ElasticsearchRefs: []commonv1.ObjectSelector{{SecretName: "es2monname", ServiceName: "xx"}}}, + } + return serialize(t, ent) + }, + Check: test.ValidationWebhookFailed( + `spec.monitoring.logs: Forbidden: Invalid association reference: serviceName or namespace can only be used in combination with name, not with secretName`, + ), + }, + } + + validator := &v1alpha1.Logstash{} + gvk := metav1.GroupVersionKind{Group: v1alpha1.GroupVersion.Group, Version: v1alpha1.GroupVersion.Version, Kind: v1alpha1.Kind} + test.RunValidationWebhookTests(t, gvk, validator, testCases...) +} + +func mkLogstash(uid string) *v1alpha1.Logstash { + return &v1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "webhook-test", + UID: types.UID(uid), + }, + Spec: v1alpha1.LogstashSpec{ + Version: "8.6.0", + }, + } +} + +func serialize(t *testing.T, k *v1alpha1.Logstash) []byte { + t.Helper() + + objBytes, err := json.Marshal(k) + require.NoError(t, err) + + return objBytes +} diff --git a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go index e4863e04546..0d921a9c679 100644 --- a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go @@ -20,7 +20,14 @@ func (in *Logstash) DeepCopyInto(out *Logstash) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) + if in.MonitoringAssocConfs != nil { + in, out := &in.MonitoringAssocConfs, &out.MonitoringAssocConfs + *out = make(map[v1.ObjectSelector]v1.AssociationConf, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Logstash. @@ -73,6 +80,27 @@ func (in *LogstashList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LogstashMonitoringAssociation) DeepCopyInto(out *LogstashMonitoringAssociation) { + *out = *in + if in.Logstash != nil { + in, out := &in.Logstash, &out.Logstash + *out = new(Logstash) + (*in).DeepCopyInto(*out) + } + out.ref = in.ref +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LogstashMonitoringAssociation. +func (in *LogstashMonitoringAssociation) DeepCopy() *LogstashMonitoringAssociation { + if in == nil { + return nil + } + out := new(LogstashMonitoringAssociation) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogstashService) DeepCopyInto(out *LogstashService) { *out = *in @@ -109,6 +137,7 @@ func (in *LogstashSpec) DeepCopyInto(out *LogstashSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + in.Monitoring.DeepCopyInto(&out.Monitoring) in.PodTemplate.DeepCopyInto(&out.PodTemplate) if in.RevisionHistoryLimit != nil { in, out := &in.RevisionHistoryLimit, &out.RevisionHistoryLimit @@ -137,6 +166,13 @@ func (in *LogstashSpec) DeepCopy() *LogstashSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogstashStatus) DeepCopyInto(out *LogstashStatus) { *out = *in + if in.MonitoringAssociationStatus != nil { + in, out := &in.MonitoringAssociationStatus, &out.MonitoringAssociationStatus + *out = make(v1.AssociationStatusMap, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LogstashStatus. diff --git a/pkg/controller/association/controller/logstash_es.go b/pkg/controller/association/controller/logstash_es.go new file mode 100644 index 00000000000..6a79976b295 --- /dev/null +++ b/pkg/controller/association/controller/logstash_es.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package controller + +const ( + // LogstashAssociationLabelName marks resources created for an association originating from Logstash with the + // Logstash name. + LogstashAssociationLabelName = "logstashassociation.k8s.elastic.co/name" + // LogstashAssociationLabelNamespace marks resources created for an association originating from Logstash with the + // Logstash namespace. + LogstashAssociationLabelNamespace = "logstashassociation.k8s.elastic.co/namespace" + // LogstashAssociationLabelType marks resources created for an association originating from Logstash + // with the target resource type (e.g. "elasticsearch"). + LogstashAssociationLabelType = "logstashassociation.k8s.elastic.co/type" +) diff --git a/pkg/controller/association/controller/logstash_monitoring.go b/pkg/controller/association/controller/logstash_monitoring.go new file mode 100644 index 00000000000..c97398d3c1f --- /dev/null +++ b/pkg/controller/association/controller/logstash_monitoring.go @@ -0,0 +1,57 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package controller + +import ( + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/association" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/operator" + eslabel "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/user" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/rbac" +) + +// AddLogstashMonitoring reconciles an association between Logstash and Elasticsearch clusters for Stack Monitoring. +// Beats are configured to collect monitoring metrics and logs data of the associated Logstash and send +// them to the Elasticsearch referenced in the association. +func AddLogstashMonitoring(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operator.Parameters) error { + return association.AddAssociationController(mgr, accessReviewer, params, association.AssociationInfo{ + AssociatedObjTemplate: func() commonv1.Associated { return &logstashv1alpha1.Logstash{} }, + ReferencedObjTemplate: func() client.Object { return &esv1.Elasticsearch{} }, + ReferencedResourceVersion: referencedElasticsearchStatusVersion, + ExternalServiceURL: getElasticsearchExternalURL, + AssociationType: commonv1.LogstashMonitoringAssociationType, + ReferencedResourceNamer: esv1.ESNamer, + AssociationName: "ls-monitoring", + AssociatedShortName: "ls-mon", + Labels: func(associated types.NamespacedName) map[string]string { + return map[string]string{ + LogstashAssociationLabelName: associated.Name, + LogstashAssociationLabelNamespace: associated.Namespace, + LogstashAssociationLabelType: commonv1.LogstashMonitoringAssociationType, + } + }, + AssociationConfAnnotationNameBase: commonv1.ElasticsearchConfigAnnotationNameBase, + AssociationResourceNameLabelName: eslabel.ClusterNameLabelName, + AssociationResourceNamespaceLabelName: eslabel.ClusterNamespaceLabelName, + + ElasticsearchUserCreation: &association.ElasticsearchUserCreation{ + ElasticsearchRef: func(c k8s.Client, association commonv1.Association) (bool, commonv1.ObjectSelector, error) { + return true, association.AssociationRef(), nil + }, + UserSecretSuffix: "beat-ls-mon-user", + ESUserRole: func(associated commonv1.Associated) (string, error) { + return user.StackMonitoringUserRole, nil + }, + }, + }) +} diff --git a/pkg/controller/common/stackmon/validations/validations.go b/pkg/controller/common/stackmon/validations/validations.go index 45a435ebe83..c83c2a865e7 100644 --- a/pkg/controller/common/stackmon/validations/validations.go +++ b/pkg/controller/common/stackmon/validations/validations.go @@ -31,12 +31,12 @@ var ( // Validate validates that the resource version is supported for Stack Monitoring and that there is exactly one // Elasticsearch reference defined to send monitoring data when Stack Monitoring is defined -func Validate(resource monitoring.HasMonitoring, version string) field.ErrorList { +func Validate(resource monitoring.HasMonitoring, version string, minVersion version.Version) field.ErrorList { var errs field.ErrorList if monitoring.IsDefined(resource) { - err := IsSupportedVersion(version) + err := IsSupportedVersion(version, minVersion) if err != nil { - finalMinStackVersion, _ := semver.FinalizeVersion(MinStackVersion.String()) // discards prerelease suffix + finalMinStackVersion, _ := semver.FinalizeVersion(minVersion.String()) // discards prerelease suffix errs = append(errs, field.Invalid(field.NewPath("spec").Child("version"), version, fmt.Sprintf(UnsupportedVersionMsg, finalMinStackVersion))) } @@ -54,14 +54,14 @@ func Validate(resource monitoring.HasMonitoring, version string) field.ErrorList return errs } -// IsSupportedVersion returns true if the resource version is supported for Stack Monitoring, else returns false -func IsSupportedVersion(v string) error { +// IsSupportedVersion returns error if the resource version is not supported for Stack Monitoring +func IsSupportedVersion(v string, minVersion version.Version) error { ver, err := version.Parse(v) if err != nil { return err } - if ver.LT(MinStackVersion) { - return fmt.Errorf("unsupported version for Stack Monitoring: required >= %s", MinStackVersion) + if ver.LT(minVersion) { + return fmt.Errorf("unsupported version for Stack Monitoring: required >= %s", minVersion) } return nil } diff --git a/pkg/controller/common/stackmon/validations/validations_test.go b/pkg/controller/common/stackmon/validations/validations_test.go index 0679d08b0c6..e3e2c93b81c 100644 --- a/pkg/controller/common/stackmon/validations/validations_test.go +++ b/pkg/controller/common/stackmon/validations/validations_test.go @@ -97,7 +97,7 @@ func TestValidate(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - err := Validate(&tc.es, tc.es.Spec.Version) + err := Validate(&tc.es, tc.es.Spec.Version, MinStackVersion) if len(err) > 0 { require.True(t, tc.isErr) } else { diff --git a/pkg/controller/elasticsearch/validation/validations.go b/pkg/controller/elasticsearch/validation/validations.go index cccfbb30baa..570e5ac3876 100644 --- a/pkg/controller/elasticsearch/validation/validations.go +++ b/pkg/controller/elasticsearch/validation/validations.go @@ -330,7 +330,7 @@ func currentVersion(current esv1.Elasticsearch) (version.Version, *field.Error) } func validMonitoring(es esv1.Elasticsearch) field.ErrorList { - return stackmon.Validate(&es, es.Spec.Version) + return stackmon.Validate(&es, es.Spec.Version, stackmon.MinStackVersion) } func validAssociations(es esv1.Elasticsearch) field.ErrorList { diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index 144b7a9711a..9fec5b89e4f 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/stackmon" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log" ) @@ -78,6 +79,11 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log configHash := fnv.New32a() + // reconcile beats config secrets if Stack Monitoring is defined + if err := stackmon.ReconcileConfigSecrets(params.Context, params.Client, params.Logstash); err != nil { + return results.WithError(err), params.Status + } + if err := reconcileConfig(params, configHash); err != nil { return results.WithError(err), params.Status } diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index a1d71bcf6ae..00ff46e8237 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -20,12 +20,11 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/network" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/stackmon" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps" ) const ( - ContainerName = "logstash" - ConfigVolumeName = "config" ConfigMountPath = "/usr/share/logstash/config" @@ -55,7 +54,7 @@ var ( func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec { defer tracing.Span(¶ms.Context)() spec := ¶ms.Logstash.Spec - builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), ContainerName) + builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName) vols := []volume.VolumeLike{ // volume with logstash configuration file volume.NewSecretVolume( @@ -85,6 +84,11 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithReadinessProbe(readinessProbe(false)). WithVolumeLikes(vols...) + builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) + if err != nil { + return corev1.PodTemplateSpec{} + } + // TODO integrate with api.ssl.enabled // if params.Logstash.Spec.HTTP.TLS.Enabled() { // httpVol := certificates.HTTPCertSecretVolume(logstashv1alpha1.Namer, params.Logstash.Name) diff --git a/pkg/controller/logstash/stackmon/beat_config.go b/pkg/controller/logstash/stackmon/beat_config.go new file mode 100644 index 00000000000..12e45eeb8d3 --- /dev/null +++ b/pkg/controller/logstash/stackmon/beat_config.go @@ -0,0 +1,61 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package stackmon + +import ( + "context" + _ "embed" // for the beats config files + + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/stackmon/monitoring" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +var ( + // metricbeatConfigTemplate is a configuration template for Metricbeat to collect monitoring data about Kibana + //go:embed metricbeat.tpl.yml + metricbeatConfigTemplate string + + // filebeatConfig is a static configuration for Filebeat to collect Kibana logs + //go:embed filebeat.yml + filebeatConfig string +) + +// ReconcileConfigSecrets reconciles the secrets holding beats configuration +func ReconcileConfigSecrets(ctx context.Context, client k8s.Client, logstash logstashv1alpha1.Logstash) error { + isMonitoringReconcilable, err := monitoring.IsReconcilable(&logstash) + if err != nil { + return err + } + if !isMonitoringReconcilable { + return nil + } + + if monitoring.IsMetricsDefined(&logstash) { + b, err := Metricbeat(ctx, client, logstash) + if err != nil { + return err + } + + if _, err := reconciler.ReconcileSecret(ctx, client, b.ConfigSecret, &logstash); err != nil { + return err + } + } + + if monitoring.IsLogsDefined(&logstash) { + b, err := Filebeat(ctx, client, logstash) + if err != nil { + return err + } + + if _, err := reconciler.ReconcileSecret(ctx, client, b.ConfigSecret, &logstash); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/controller/logstash/stackmon/filebeat.yml b/pkg/controller/logstash/stackmon/filebeat.yml new file mode 100644 index 00000000000..314ef3c1f27 --- /dev/null +++ b/pkg/controller/logstash/stackmon/filebeat.yml @@ -0,0 +1,19 @@ +filebeat.modules: + - module: logstash + log: + enabled: true + var.paths: + - "/usr/share/logstash/logs/logstash-plain.log" + - "/usr/share/logstash/logs/logstash-json.log" + - "/usr/share/logstash/logs/logstash-deprecation.log" + slowlog: + enabled: true + var.paths: + - "/usr/share/logstash/logs/logstash-slowlog-plain.log" + - "/usr/share/logstash/logs/logstash-slowlog-json.log" + +processors: + - add_cloud_metadata: {} + - add_host_metadata: {} + +# Elasticsearch output configuration is generated \ No newline at end of file diff --git a/pkg/controller/logstash/stackmon/ls_config.go b/pkg/controller/logstash/stackmon/ls_config.go new file mode 100644 index 00000000000..bd8f9c096ed --- /dev/null +++ b/pkg/controller/logstash/stackmon/ls_config.go @@ -0,0 +1,14 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package stackmon + +import ( + corev1 "k8s.io/api/core/v1" +) + +// fileLogStyleEnvVar returns the environment variable to configure the Logstash container to write logs to disk +func fileLogStyleEnvVar() corev1.EnvVar { + return corev1.EnvVar{Name: "LOG_STYLE", Value: "file"} +} diff --git a/pkg/controller/logstash/stackmon/metricbeat.tpl.yml b/pkg/controller/logstash/stackmon/metricbeat.tpl.yml new file mode 100644 index 00000000000..cbc84c40a94 --- /dev/null +++ b/pkg/controller/logstash/stackmon/metricbeat.tpl.yml @@ -0,0 +1,13 @@ +metricbeat.modules: + - module: logstash + metricsets: + - node + - node_stats + period: 10s + hosts: ["{{ .URL }}"] + xpack.enabled: true +processors: + - add_cloud_metadata: {} + - add_host_metadata: {} + +# Elasticsearch output configuration is generated diff --git a/pkg/controller/logstash/stackmon/sidecar.go b/pkg/controller/logstash/stackmon/sidecar.go new file mode 100644 index 00000000000..c178de44fe9 --- /dev/null +++ b/pkg/controller/logstash/stackmon/sidecar.go @@ -0,0 +1,110 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package stackmon + +import ( + "context" + "fmt" + "hash/fnv" + + corev1 "k8s.io/api/core/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/defaults" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/stackmon" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/stackmon/monitoring" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/network" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +const ( + // cfgHashAnnotation is used to store a hash of the Metricbeat and Filebeat configurations. + cfgHashAnnotation = "logstash.k8s.elastic.co/monitoring-config-hash" + + logstashLogsVolumeName = "logstash-logs" + logstashLogsMountPath = "/usr/share/logstash/logs" +) + +func Metricbeat(ctx context.Context, client k8s.Client, logstash logstashv1alpha1.Logstash) (stackmon.BeatSidecar, error) { + metricbeat, err := stackmon.NewMetricBeatSidecar( + ctx, + client, + commonv1.LogstashMonitoringAssociationType, + &logstash, + logstash.Spec.Version, + metricbeatConfigTemplate, + logstashv1alpha1.Namer, + fmt.Sprintf("%s://localhost:%d", "http" /*logstash.Spec.HTTP.Protocol()*/, network.HTTPPort), + //TODO: integrate username password with Logstash metrics API + "", /* no username for metrics API */ + "", /* no password for metrics API */ + false, + ) + if err != nil { + return stackmon.BeatSidecar{}, err + } + return metricbeat, nil +} + +func Filebeat(ctx context.Context, client k8s.Client, logstash logstashv1alpha1.Logstash) (stackmon.BeatSidecar, error) { + return stackmon.NewFileBeatSidecar(ctx, client, &logstash, logstash.Spec.Version, filebeatConfig, nil) +} + +// WithMonitoring updates the Logstash Pod template builder to deploy Metricbeat and Filebeat in sidecar containers +// in the Logstash pod and injects the volumes for the beat configurations and the ES CA certificates. +func WithMonitoring(ctx context.Context, client k8s.Client, builder *defaults.PodTemplateBuilder, logstash logstashv1alpha1.Logstash) (*defaults.PodTemplateBuilder, error) { + isMonitoringReconcilable, err := monitoring.IsReconcilable(&logstash) + if err != nil { + return nil, err + } + if !isMonitoringReconcilable { + return builder, nil + } + + configHash := fnv.New32a() + var volumes []corev1.Volume + + if monitoring.IsMetricsDefined(&logstash) { + b, err := Metricbeat(ctx, client, logstash) + if err != nil { + return nil, err + } + + volumes = append(volumes, b.Volumes...) + builder.WithContainers(b.Container) + configHash.Write(b.ConfigHash.Sum(nil)) + } + + if monitoring.IsLogsDefined(&logstash) { + // Set environment variable to tell Logstash container to write logs to disk + builder.WithEnv(fileLogStyleEnvVar()) + + b, err := Filebeat(ctx, client, logstash) + if err != nil { + return nil, err + } + + // create a logs volume shared between Logstash and Filebeat + // TODO: revisit log volume when persistent storage is added + logsVolume := volume.NewEmptyDirVolume(logstashLogsVolumeName, logstashLogsMountPath) + volumes = append(volumes, logsVolume.Volume()) + filebeat := b.Container + filebeat.VolumeMounts = append(filebeat.VolumeMounts, logsVolume.VolumeMount()) + builder.WithVolumeMounts(logsVolume.VolumeMount()) + + volumes = append(volumes, b.Volumes...) + builder.WithContainers(filebeat) + configHash.Write(b.ConfigHash.Sum(nil)) + } + + // add the config hash annotation to ensure pod rotation when an ES password or a CA are rotated + builder.WithAnnotations(map[string]string{cfgHashAnnotation: fmt.Sprint(configHash.Sum32())}) + // inject all volumes + builder.WithVolumes(volumes...) + + return builder, nil +} diff --git a/pkg/controller/logstash/stackmon/sidecar_test.go b/pkg/controller/logstash/stackmon/sidecar_test.go new file mode 100644 index 00000000000..627486e106e --- /dev/null +++ b/pkg/controller/logstash/stackmon/sidecar_test.go @@ -0,0 +1,179 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package stackmon + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/defaults" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/stackmon/monitoring" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func TestWithMonitoring(t *testing.T) { + sampleLs := logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sample", + Namespace: "aerospace", + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.0", + }, + } + monitoringEsRef := []commonv1.ObjectSelector{{Name: "monitoring", Namespace: "observability"}} + logsEsRef := []commonv1.ObjectSelector{{Name: "logs", Namespace: "observability"}} + + fakeMetricsBeatUserSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "sample-observability-monitoring-beat-es-mon-user", Namespace: "aerospace"}, + Data: map[string][]byte{"aerospace-sample-observability-monitoring-beat-es-mon-user": []byte("1234567890")}, + } + fakeLogsBeatUserSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "sample-observability-logs-beat-es-mon-user", Namespace: "aerospace"}, + Data: map[string][]byte{"aerospace-sample-observability-logs-beat-es-mon-user": []byte("1234567890")}, + } + fakeEsHTTPCertSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "sample-es-http-certs-public", Namespace: "aerospace"}, + Data: map[string][]byte{ + "tls.crt": []byte("7H1515N074r341C3r71F1C473"), + "ca.crt": []byte("7H1515N074r341C3r71F1C473"), + }, + } + fakeLsHTTPCertSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "sample-ls-http-certs-public", Namespace: "aerospace"}, + Data: map[string][]byte{ + "tls.crt": []byte("7H1515N074r341C3r71F1C473"), + "ca.crt": []byte("7H1515N074r341C3r71F1C473"), + }, + } + fakeClient := k8s.NewFakeClient(&fakeMetricsBeatUserSecret, &fakeLogsBeatUserSecret, &fakeEsHTTPCertSecret, &fakeLsHTTPCertSecret) + + monitoringAssocConf := commonv1.AssociationConf{ + AuthSecretName: "sample-observability-monitoring-beat-es-mon-user", + AuthSecretKey: "aerospace-sample-observability-monitoring-beat-es-mon-user", + CACertProvided: true, + CASecretName: "sample-es-monitoring-observability-monitoring-ca", + URL: "https://monitoring-es-http.observability.svc:9200", + Version: "8.6.0", + } + logsAssocConf := commonv1.AssociationConf{ + AuthSecretName: "sample-observability-logs-beat-es-mon-user", + AuthSecretKey: "aerospace-sample-observability-logs-beat-es-mon-user", + CACertProvided: true, + CASecretName: "sample-es-logs-observability-monitoring-ca", + URL: "https://logs-es-http.observability.svc:9200", + Version: "8.6.0", + } + + tests := []struct { + name string + ls func() logstashv1alpha1.Logstash + containersLength int + esEnvVarsLength int + podVolumesLength int + metricsVolumeMountsLength int + logVolumeMountsLength int + }{ + { + name: "without monitoring", + ls: func() logstashv1alpha1.Logstash { + return sampleLs + }, + containersLength: 1, + }, + { + name: "with metrics monitoring", + ls: func() logstashv1alpha1.Logstash { + sampleLs.Spec.Monitoring.Metrics.ElasticsearchRefs = monitoringEsRef + monitoring.GetMetricsAssociation(&sampleLs)[0].SetAssociationConf(&monitoringAssocConf) + return sampleLs + }, + containersLength: 2, + esEnvVarsLength: 0, + podVolumesLength: 2, + metricsVolumeMountsLength: 2, + }, + { + name: "with logs monitoring", + ls: func() logstashv1alpha1.Logstash { + sampleLs.Spec.Monitoring.Metrics.ElasticsearchRefs = nil + sampleLs.Spec.Monitoring.Logs.ElasticsearchRefs = monitoringEsRef + monitoring.GetLogsAssociation(&sampleLs)[0].SetAssociationConf(&monitoringAssocConf) + return sampleLs + }, + containersLength: 2, + esEnvVarsLength: 1, + podVolumesLength: 3, + logVolumeMountsLength: 3, + }, + { + name: "with metrics and logs monitoring", + ls: func() logstashv1alpha1.Logstash { + sampleLs.Spec.Monitoring.Metrics.ElasticsearchRefs = monitoringEsRef + monitoring.GetMetricsAssociation(&sampleLs)[0].SetAssociationConf(&monitoringAssocConf) + sampleLs.Spec.Monitoring.Logs.ElasticsearchRefs = monitoringEsRef + monitoring.GetLogsAssociation(&sampleLs)[0].SetAssociationConf(&logsAssocConf) + return sampleLs + }, + containersLength: 3, + esEnvVarsLength: 1, + podVolumesLength: 4, + metricsVolumeMountsLength: 2, + logVolumeMountsLength: 3, + }, + { + name: "with metrics and logs monitoring with different es ref", + ls: func() logstashv1alpha1.Logstash { + sampleLs.Spec.Monitoring.Metrics.ElasticsearchRefs = monitoringEsRef + monitoring.GetMetricsAssociation(&sampleLs)[0].SetAssociationConf(&monitoringAssocConf) + sampleLs.Spec.Monitoring.Logs.ElasticsearchRefs = logsEsRef + monitoring.GetLogsAssociation(&sampleLs)[0].SetAssociationConf(&logsAssocConf) + return sampleLs + }, + containersLength: 3, + esEnvVarsLength: 1, + podVolumesLength: 5, + metricsVolumeMountsLength: 2, + logVolumeMountsLength: 3, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ls := tc.ls() + builder := defaults.NewPodTemplateBuilder(corev1.PodTemplateSpec{}, logstashv1alpha1.LogstashContainerName) + _, err := WithMonitoring(context.Background(), fakeClient, builder, ls) + assert.NoError(t, err) + + assert.Equal(t, tc.containersLength, len(builder.PodTemplate.Spec.Containers)) + for _, v := range builder.PodTemplate.Spec.Volumes { + fmt.Println(v) + } + assert.Equal(t, tc.podVolumesLength, len(builder.PodTemplate.Spec.Volumes)) + + if monitoring.IsMetricsDefined(&ls) { + for _, c := range builder.PodTemplate.Spec.Containers { + if c.Name == "metricbeat" { + assert.Equal(t, tc.metricsVolumeMountsLength, len(c.VolumeMounts)) + } + } + } + if monitoring.IsLogsDefined(&ls) { + for _, c := range builder.PodTemplate.Spec.Containers { + if c.Name == "filebeat" { + assert.Equal(t, tc.logVolumeMountsLength, len(c.VolumeMounts)) + } + } + } + }) + } +} diff --git a/test/e2e/beat/config_test.go b/test/e2e/beat/config_test.go index b0b439ff6a0..39b49a5a755 100644 --- a/test/e2e/beat/config_test.go +++ b/test/e2e/beat/config_test.go @@ -66,7 +66,7 @@ func TestMetricbeatDefaultConfig(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { // only execute this test on supported versions when stack monitoring is enabled - err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion) + err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion, validations.MinStackVersion) if tt.withStackMonitoring && err != nil { t.SkipNow() } diff --git a/test/e2e/es/stack_monitoring_test.go b/test/e2e/es/stack_monitoring_test.go index 7c83fddc1a2..0fd6e5ae747 100644 --- a/test/e2e/es/stack_monitoring_test.go +++ b/test/e2e/es/stack_monitoring_test.go @@ -37,7 +37,7 @@ const nodePort = int32(32767) // correctly delivered to the referenced monitoring Elasticsearch clusters. func TestESStackMonitoring(t *testing.T) { // only execute this test on supported version - err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion) + err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion, validations.MinStackVersion) if err != nil { t.SkipNow() } @@ -68,7 +68,7 @@ func TestExternalESStackMonitoring(t *testing.T) { t.SkipNow() } // only execute this test on supported version - err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion) + err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion, validations.MinStackVersion) if err != nil { t.SkipNow() } diff --git a/test/e2e/kb/stack_monitoring_test.go b/test/e2e/kb/stack_monitoring_test.go index 967e34b62af..bdafacc60ba 100644 --- a/test/e2e/kb/stack_monitoring_test.go +++ b/test/e2e/kb/stack_monitoring_test.go @@ -20,7 +20,7 @@ import ( // correctly delivered to the referenced monitoring Elasticsearch clusters. func TestKBStackMonitoring(t *testing.T) { // only execute this test on supported version - err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion) + err := validations.IsSupportedVersion(test.Ctx().ElasticStackVersion, validations.MinStackVersion) if err != nil { t.SkipNow() } diff --git a/test/e2e/logstash/stack_monitoring_test.go b/test/e2e/logstash/stack_monitoring_test.go new file mode 100644 index 00000000000..599b785f3b9 --- /dev/null +++ b/test/e2e/logstash/stack_monitoring_test.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build logstash || e2e + +package logstash + +import ( + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version" + "testing" + + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/checks" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/elasticsearch" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/logstash" +) + +// TestLogstashStackMonitoring tests that when Logstash is configured with monitoring, its log and metrics are +// correctly delivered to the referenced monitoring Elasticsearch clusters. +func TestLogstashStackMonitoring(t *testing.T) { + // only execute this test on supported version + if version.MustParse(test.Ctx().ElasticStackVersion).LT(logstashv1alpha1.MinStackMonVersion) { + t.SkipNow() + } + + // create 1 monitored and 2 monitoring clusters to collect separately metrics and logs + metrics := elasticsearch.NewBuilder("test-ls-mon-metrics"). + WithESMasterDataNodes(2, elasticsearch.DefaultResources) + logs := elasticsearch.NewBuilder("test-ls-mon-logs"). + WithESMasterDataNodes(2, elasticsearch.DefaultResources) + monitored := logstash.NewBuilder("test-ls-mon-a"). + WithNodeCount(1). + WithMonitoring(metrics.Ref(), logs.Ref()) + + // checks that the sidecar beats have sent data in the monitoring clusters + steps := func(k *test.K8sClient) test.StepList { + return checks.MonitoredSteps(&monitored, k) + } + + test.Sequence(nil, steps, metrics, logs, monitored).RunSequential(t) +} diff --git a/test/e2e/test/checks/monitoring.go b/test/e2e/test/checks/monitoring.go index 604e753cf6c..d2ab0f3ff4b 100644 --- a/test/e2e/test/checks/monitoring.go +++ b/test/e2e/test/checks/monitoring.go @@ -52,13 +52,13 @@ type stackMonitoringChecks struct { func (c stackMonitoringChecks) Steps() test.StepList { return test.StepList{ - c.CheckBeatSidecars(), + c.CheckBeatSidecarsInElasticsearch(), c.CheckMonitoringMetricsIndex(), c.CheckFilebeatIndex(), } } -func (c stackMonitoringChecks) CheckBeatSidecars() test.Step { +func (c stackMonitoringChecks) CheckBeatSidecarsInElasticsearch() test.Step { return test.Step{ Name: "Check that beat sidecars are running", Test: test.Eventually(func() error { diff --git a/test/e2e/test/logstash/builder.go b/test/e2e/test/logstash/builder.go index 053253a1b6c..b92fea66a45 100644 --- a/test/e2e/test/logstash/builder.go +++ b/test/e2e/test/logstash/builder.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/rand" "sigs.k8s.io/controller-runtime/pkg/client" + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" @@ -115,6 +116,40 @@ func (b Builder) WithServices(services ...logstashv1alpha1.LogstashService) Buil return b } +func (b Builder) WithMonitoring(metricsESRef commonv1.ObjectSelector, logsESRef commonv1.ObjectSelector) Builder { + b.Logstash.Spec.Monitoring.Metrics.ElasticsearchRefs = []commonv1.ObjectSelector{metricsESRef} + b.Logstash.Spec.Monitoring.Logs.ElasticsearchRefs = []commonv1.ObjectSelector{logsESRef} + return b +} + +func (b Builder) GetMetricsIndexPattern() string { + return ".monitoring-logstash-8-mb" +} + +func (b Builder) Name() string { + return b.Logstash.Name +} + +func (b Builder) Namespace() string { + return b.Logstash.Namespace +} + +func (b Builder) GetLogsCluster() *types.NamespacedName { + if len(b.Logstash.Spec.Monitoring.Logs.ElasticsearchRefs) == 0 { + return nil + } + logsCluster := b.Logstash.Spec.Monitoring.Logs.ElasticsearchRefs[0].NamespacedName() + return &logsCluster +} + +func (b Builder) GetMetricsCluster() *types.NamespacedName { + if len(b.Logstash.Spec.Monitoring.Metrics.ElasticsearchRefs) == 0 { + return nil + } + metricsCluster := b.Logstash.Spec.Monitoring.Metrics.ElasticsearchRefs[0].NamespacedName() + return &metricsCluster +} + func (b Builder) NSN() types.NamespacedName { return k8s.ExtractNamespacedName(&b.Logstash) } diff --git a/test/e2e/test/logstash/checks.go b/test/e2e/test/logstash/checks.go index 490a55394c1..9d5e900498a 100644 --- a/test/e2e/test/logstash/checks.go +++ b/test/e2e/test/logstash/checks.go @@ -11,6 +11,7 @@ import ( corev1 "k8s.io/api/core/v1" + v1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" @@ -51,14 +52,31 @@ func CheckStatus(b Builder, k *test.K8sClient) test.Step { logstash.Status.ObservedGeneration = 0 + // pod status expected := logstashv1alpha1.LogstashStatus{ ExpectedNodes: b.Logstash.Spec.Count, AvailableNodes: b.Logstash.Spec.Count, Version: b.Logstash.Spec.Version, } - if logstash.Status != expected { + + if (logstash.Status.ExpectedNodes != expected.ExpectedNodes) || + (logstash.Status.AvailableNodes != expected.AvailableNodes) || + (logstash.Status.Version != expected.Version) { return fmt.Errorf("expected status %+v but got %+v", expected, logstash.Status) } + + // monitoring status + expectedMonitoringInStatus := len(logstash.Spec.Monitoring.Metrics.ElasticsearchRefs) + len(logstash.Spec.Monitoring.Metrics.ElasticsearchRefs) + actualMonitoringInStatus := len(logstash.Status.MonitoringAssociationStatus) + if expectedMonitoringInStatus != actualMonitoringInStatus { + return fmt.Errorf("expected %d monitoring associations in status but got %d", expectedMonitoringInStatus, actualMonitoringInStatus) + } + for a, s := range logstash.Status.MonitoringAssociationStatus { + if s != v1.AssociationEstablished { + return fmt.Errorf("monitoring association %s has status %s ", a, s) + } + } + return nil }), }