From 0111ad977dfde98e9fafe9df5c36d0e559c10138 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 7 Apr 2023 16:54:01 -0400 Subject: [PATCH 01/12] Copy pipeline and logstash.yml to initContainer --- .../logstash.k8s.elastic.co_logstashes.yaml | 12 +- config/samples/logstash/logstash_svc.yaml | 5 +- pkg/controller/logstash/config.go | 1 + pkg/controller/logstash/init_configuration.go | 86 +++++++++++++ pkg/controller/logstash/pipeline.go | 2 +- pkg/controller/logstash/pod.go | 114 +++++++++++++++--- 6 files changed, 196 insertions(+), 24 deletions(-) create mode 100644 pkg/controller/logstash/init_configuration.go diff --git a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml index f92a4a038ce..c693c339a01 100644 --- a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml +++ b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml @@ -1944,7 +1944,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -3314,7 +3315,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -4717,7 +4719,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -6523,7 +6526,8 @@ spec: that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. + It can only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. diff --git a/config/samples/logstash/logstash_svc.yaml b/config/samples/logstash/logstash_svc.yaml index 410caae8d37..f0c6809292e 100644 --- a/config/samples/logstash/logstash_svc.yaml +++ b/config/samples/logstash/logstash_svc.yaml @@ -18,9 +18,12 @@ spec: count: 2 version: 8.6.1 config: - log.level: info + log.level: debug api.http.host: "0.0.0.0" queue.type: memory + pipelines: + - pipeline.id: one + config.string: "input { beats { port => 5045}} output { stdout {}}" services: - name: api service: diff --git a/pkg/controller/logstash/config.go b/pkg/controller/logstash/config.go index 5a0409dbb60..2aed9bf63cd 100644 --- a/pkg/controller/logstash/config.go +++ b/pkg/controller/logstash/config.go @@ -75,6 +75,7 @@ func defaultConfig() *settings.CanonicalConfig { settingsMap := map[string]interface{}{ // Set 'api.http.host' by default to `0.0.0.0` for readiness probe to work. "api.http.host": "0.0.0.0", + "config.reload.automatic": true, } return settings.MustCanonicalConfig(settingsMap) diff --git a/pkg/controller/logstash/init_configuration.go b/pkg/controller/logstash/init_configuration.go new file mode 100644 index 00000000000..1106938b0dc --- /dev/null +++ b/pkg/controller/logstash/init_configuration.go @@ -0,0 +1,86 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" +) + +const ( + //InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" + InitConfigContainerName = "logstash-internal-init-config" + + // InitConfigScript is a small bash script to prepare the logstash configuration directory + InitConfigScript = `#!/usr/bin/env bash +set -eux + +init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok + +if [[ -f "${init_config_initialized_flag}" ]]; then + echo "Logstash configuration already initialized." + exit 0 +fi + +echo "Setup Logstash configuration" + +mount_path=` + InitContainerConfigVolumeMountPath + ` + +for f in /usr/share/logstash/config/*.*; do + filename=$(basename $f) + if [[ ! -f "$mount_path/$filename" ]]; then + cp $f $mount_path + fi +done + +cp ` + InternalConfigVolumeMountPath + `/* ` + InitContainerConfigVolumeMountPath + ` +cp ` + InternalPipelineVolumeMountPath + `/* ` + InitContainerConfigVolumeMountPath + ` + +touch "${init_config_initialized_flag}" +echo "Logstash configuration successfully prepared." +` +) + +// initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. +// The script copy config files from /use/share/logstash/config to /mnt/elastic-internal/logstash-config/ +// TODO may be able to solve env2yaml permission issue with initContainer +func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { + privileged := false + + return corev1.Container{ + // Image will be inherited from pod template defaults + Image: "docker.elastic.co/logstash/logstash:8.6.1", + ImagePullPolicy: corev1.PullIfNotPresent, + Name: InitConfigContainerName, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript}, + //VolumeMounts: []corev1.VolumeMount{ + // { + // MountPath: InitContainerConfigVolumeMountPath, + // Name: ConfigVolumeName, + // }, + //}, + VolumeMounts: []corev1.VolumeMount{ + ConfigSharedVolume.InitContainerVolumeMount(), + ConfigVolume(ls).VolumeMount(), + PipelineVolume(ls).VolumeMount(), + }, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("50Mi"), + corev1.ResourceCPU: resource.MustParse("0.1"), + }, + Limits: map[corev1.ResourceName]resource.Quantity{ + // Memory limit should be at least 12582912 when running with CRI-O + corev1.ResourceMemory: resource.MustParse("50Mi"), + corev1.ResourceCPU: resource.MustParse("0.1"), + }, + }, + } +} \ No newline at end of file diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index ddc696b29df..ca787612ea8 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -40,7 +40,7 @@ func reconcilePipeline(params Params, configHash hash.Hash) error { return err } - _, _ = configHash.Write(cfgBytes) + //_, _ = configHash.Write(cfgBytes) return nil } diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index af9c6d53f26..8d998c6c631 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -7,7 +7,7 @@ package logstash import ( "fmt" "hash" - "path" + //"path" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -39,6 +39,16 @@ const ( // VersionLabelName is a label used to track the version of a Logstash Pod. VersionLabelName = "logstash.k8s.elastic.co/version" + + InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" + + + // InternalConfigVolumeName is a volume which contains the generated configuration. + InternalConfigVolumeName = "elastic-internal-logstash-config" + InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" + InternalPipelineVolumeName = "elastic-internal-logstash-pipeline" + InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline" + ) var ( @@ -52,28 +62,95 @@ var ( corev1.ResourceCPU: resource.MustParse("1000m"), }, } + // ConfigVolume is used to propagate the keystore file from the init container to + // Logstash main container. + //ConfigVolume = volume.NewEmptyDirVolume(ConfigVolumeName, ConfigMountPath) + ) +// SecretName is the name of the secret that holds the Logstash config for the given Logstash resource. +func SecretName(ls logstashv1alpha1.Logstash) string { + return ls.Name + "-ls-config" +} + +// SecretName is the name of the secret that holds the Logstash pipeline for the given Logstash resource. +func PipelineSecretName(ls logstashv1alpha1.Logstash) string { + return ls.Name + "-ls-pipeline" +} + +var ( + // ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container + ConfigSharedVolume = volume.SharedVolume{ + VolumeName: ConfigVolumeName, + InitContainerMountPath: InitContainerConfigVolumeMountPath, + ContainerMountPath: ConfigMountPath, + } +) + +// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + SecretName(ls), + InternalConfigVolumeName, + InternalConfigVolumeMountPath, + ) +} + +// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + PipelineSecretName(ls), + InternalPipelineVolumeName, + InternalPipelineVolumeMountPath, + ) +} + + func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec { defer tracing.Span(¶ms.Context)() spec := ¶ms.Logstash.Spec builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName) - vols := []volume.VolumeLike{ - // volume with logstash configuration file - volume.NewSecretVolume( - logstashv1alpha1.ConfigSecretName(params.Logstash.Name), - LogstashConfigVolumeName, - path.Join(ConfigMountPath, LogstashConfigFileName), - LogstashConfigFileName, - 0644), - // volume with logstash pipeline file - volume.NewSecretVolume( - logstashv1alpha1.PipelineSecretName(params.Logstash.Name), - PipelineVolumeName, - path.Join(ConfigMountPath, PipelineFileName), - PipelineFileName, - 0644), - } + vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} + + + //vols := []volume.VolumeLike{ + // ConfigVolume, + // // volume with logstash configuration file + // volume.NewSecretVolume( + // logstashv1alpha1.ConfigSecretName(params.Logstash.Name), + // LogstashConfigVolumeName, + // path.Join(ConfigMountPath, LogstashConfigFileName), + // LogstashConfigFileName, + // 0644), + // // volume with logstash pipeline file + // volume.NewSecretVolume( + // logstashv1alpha1.PipelineSecretName(params.Logstash.Name), + // PipelineVolumeName, + // path.Join(ConfigMountPath, PipelineFileName), + // PipelineFileName, + // 0644), + //} + + // secretName, name, mountPath string, projectedSecrets []string + + //projectedSecrets := + + //vols := []volume.VolumeLike{ + // ConfigVolume, + // // volume with logstash configuration file + // volume.NewSecretVolumeWithMountPath( + // logstashv1alpha1.ConfigSecretName(params.Logstash.Name), + // LogstashConfigVolumeName, + // ConfigMountPath), + // //// volume with logstash pipeline file + // volume.NewSecretVolumeWithMountPath( + // logstashv1alpha1.PipelineSecretName(params.Logstash.Name), + // PipelineVolumeName, + // ConfigMountPath), + // //path.Join(ConfigMountPath, PipelineFileName), + // ////PipelineFileName, + // //0644), + //} labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{ VersionLabelName: spec.Version}) @@ -92,7 +169,8 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithAutomountServiceAccountToken(). WithPorts(ports). WithReadinessProbe(readinessProbe(false)). - WithVolumeLikes(vols...) + WithVolumeLikes(vols...). + WithInitContainers(initConfigContainer(params.Logstash)) builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) if err != nil { From b5f0ed892da443ddc2dc87709ce26545da9a38ba Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 10 Apr 2023 12:10:57 -0400 Subject: [PATCH 02/12] Working --- config/samples/logstash/logstash_svc.yaml | 5 +- .../elasticsearch/initcontainer/prepare_fs.go | 4 + .../elasticsearch/settings/config_volume.go | 46 ++++----- pkg/controller/logstash/init_configuration.go | 99 ++++++++++++++++--- pkg/controller/logstash/pod.go | 5 +- 5 files changed, 117 insertions(+), 42 deletions(-) diff --git a/config/samples/logstash/logstash_svc.yaml b/config/samples/logstash/logstash_svc.yaml index f0c6809292e..63d214c85d2 100644 --- a/config/samples/logstash/logstash_svc.yaml +++ b/config/samples/logstash/logstash_svc.yaml @@ -18,12 +18,13 @@ spec: count: 2 version: 8.6.1 config: - log.level: debug + log.level: info api.http.host: "0.0.0.0" queue.type: memory pipelines: - pipeline.id: one - config.string: "input { beats { port => 5045}} output { stdout {}}" + pipeline.workers: 2 + config.string: "input { beats { port => 5044 }} output { stdout {}}" services: - name: api service: diff --git a/pkg/controller/elasticsearch/initcontainer/prepare_fs.go b/pkg/controller/elasticsearch/initcontainer/prepare_fs.go index c17235028ad..95427d39bf4 100644 --- a/pkg/controller/elasticsearch/initcontainer/prepare_fs.go +++ b/pkg/controller/elasticsearch/initcontainer/prepare_fs.go @@ -40,6 +40,7 @@ var ( ContainerMountPath: esvolume.ConfigVolumeMountPath, } + // EsPluginsSharedVolume contains the ES plugins/ directory EsPluginsSharedVolume = volume.SharedVolume{ VolumeName: "elastic-internal-elasticsearch-plugins-local", @@ -74,6 +75,9 @@ var ( Source: stringsutil.Concat(settings.ConfigVolumeMountPath, "/", settings.ConfigFileName), Target: stringsutil.Concat(EsConfigSharedVolume.ContainerMountPath, "/", settings.ConfigFileName), }, + + + { Source: stringsutil.Concat(esvolume.UnicastHostsVolumeMountPath, "/", esvolume.UnicastHostsFile), Target: stringsutil.Concat(EsConfigSharedVolume.ContainerMountPath, "/", esvolume.UnicastHostsFile), diff --git a/pkg/controller/elasticsearch/settings/config_volume.go b/pkg/controller/elasticsearch/settings/config_volume.go index d42160f774e..365fe28feee 100644 --- a/pkg/controller/elasticsearch/settings/config_volume.go +++ b/pkg/controller/elasticsearch/settings/config_volume.go @@ -7,14 +7,14 @@ package settings import ( "context" - pkgerrors "github.com/pkg/errors" + //pkgerrors "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" - common "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/settings" + //common "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/settings" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" @@ -41,27 +41,27 @@ func ConfigSecretVolume(ssetName string) volume.SecretVolume { ) } -// GetESConfigContent retrieves the configuration secret of the given stateful set, -// and returns the corresponding CanonicalConfig. -func GetESConfigContent(client k8s.Client, namespace string, ssetName string) (CanonicalConfig, error) { - secret, err := GetESConfigSecret(client, namespace, ssetName) - if err != nil { - return CanonicalConfig{}, err - } - if len(secret.Data) == 0 { - return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) - } - content := secret.Data[ConfigFileName] - if len(content) == 0 { - return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) - } - - cfg, err := common.ParseConfig(content) - if err != nil { - return CanonicalConfig{}, err - } - return CanonicalConfig{cfg}, nil -} +//// GetESConfigContent retrieves the configuration secret of the given stateful set, +//// and returns the corresponding CanonicalConfig. +//func GetESConfigContent(client k8s.Client, namespace string, ssetName string) (CanonicalConfig, error) { +// secret, err := GetESConfigSecret(client, namespace, ssetName) +// if err != nil { +// return CanonicalConfig{}, err +// } +// if len(secret.Data) == 0 { +// return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) +// } +// content := secret.Data[ConfigFileName] +// if len(content) == 0 { +// return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) +// } +// +// cfg, err := common.ParseConfig(content) +// if err != nil { +// return CanonicalConfig{}, err +// } +// return CanonicalConfig{cfg}, nil +//} // GetESConfigSecret returns the secret holding the ES configuration for the given pod func GetESConfigSecret(client k8s.Client, namespace string, ssetName string) (corev1.Secret, error) { diff --git a/pkg/controller/logstash/init_configuration.go b/pkg/controller/logstash/init_configuration.go index 1106938b0dc..1f84c965ffc 100644 --- a/pkg/controller/logstash/init_configuration.go +++ b/pkg/controller/logstash/init_configuration.go @@ -8,9 +8,40 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + //"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" ) +//const ( +// //InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" +// InitConfigContainerName = "logstash-internal-init-config" +// +// // InitConfigScript is a small bash script to prepare the logstash configuration directory +// InitConfigScript = `#!/usr/bin/env bash +//set -eux +// +//init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok +// +//if [[ -f "${init_config_initialized_flag}" ]]; then +// echo "Logstash configuration already initialized." +// exit 0 +//fi +// +//echo "Setup Logstash configuration" +// +//ln -sf /mnt/elastic-internal/logstash-pipeline/pipelines.yml /usr/share/logstash/config/ +//ln -sf /mnt/elastic-internal/logstash-config/logstash.yml /usr/share/logstash/config/ +// +//cp /usr/share/logstash/config/jvm.options /mnt/elastic-internal/logstash-config-local/jvm.options +//cp /usr/share/logstash/config/log4j2.properties /mnt/elastic-internal/logstash-config-local/log4j2.properties +// +// +// +//touch "${init_config_initialized_flag}" +//echo "Logstash configuration successfully prepared." +//` +//) + const ( //InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" InitConfigContainerName = "logstash-internal-init-config" @@ -28,28 +59,69 @@ fi echo "Setup Logstash configuration" -mount_path=` + InitContainerConfigVolumeMountPath + ` +ls /mnt/elastic-internal/logstash-config/* + +ln -sf /mnt/elastic-internal/logstash-config/* /usr/share/logstash/config/ +ln -sf /mnt/elastic-internal/logstash-pipeline/* /usr/share/logstash/config/ +ln -sf /mnt/elastic-internal/logstash-config/* /mnt/elastic-internal/logstash-config-local/. +ln -sf /mnt/elastic-internal/logstash-pipeline/* /mnt/elastic-internal/logstash-config-local/. + -for f in /usr/share/logstash/config/*.*; do - filename=$(basename $f) - if [[ ! -f "$mount_path/$filename" ]]; then - cp $f $mount_path - fi -done +cp /usr/share/logstash/config/jvm.options /mnt/elastic-internal/logstash-config-local/. +cp /usr/share/logstash/config/log4j2.properties /mnt/elastic-internal/logstash-config-local/. -cp ` + InternalConfigVolumeMountPath + `/* ` + InitContainerConfigVolumeMountPath + ` -cp ` + InternalPipelineVolumeMountPath + `/* ` + InitContainerConfigVolumeMountPath + ` touch "${init_config_initialized_flag}" echo "Logstash configuration successfully prepared." ` ) + +//var ( +// LsConfigSharedVolume := volume.SharedVolume{ +// VolumeName: ConfigVolumeName, +// InitContainerMountPath: InitContainerConfigVolumeMountPath, +// ContainerMountPath: ConfigMountPath, +// } +// +// PluginVolumes = volume.SharedVolumeArray{ +// Array: []volume.SharedVolume{ +// LsConfigSharedVolume, +// LsPipelineShareVolume, +// EsBinSharedVolume, +// }, +// } +// +// ConfigVolumes := volume.SharedVolumeArray{ +// Array: []volume.SharedVolume{ +// VolumeMounts: []corev1.VolumeMount{ +// //ConfigSharedVolume.InitContainerVolumeMount(), +// ConfigVolume(ls).VolumeMount(), +// PipelineVolume(ls).VolumeMount(), +// }, +// +// EsConfigSharedVolume, +// EsPluginsSharedVolume, +// EsBinSharedVolume, +// }, +//} +//) + // initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. // The script copy config files from /use/share/logstash/config to /mnt/elastic-internal/logstash-config/ // TODO may be able to solve env2yaml permission issue with initContainer func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { privileged := false + //configVolumes := volume.SharedVolumeArray{ + // Array: []volume.SharedVolume{ + // EsConfigSharedVolume, + // EsPluginsSharedVolume, + // EsBinSharedVolume, + // }, + //} + //volumeMounts := + // // we will also inherit all volume mounts from the main container later on in the pod template builder + // configVolumes.InitContainerVolumeMounts() return corev1.Container{ // Image will be inherited from pod template defaults @@ -60,17 +132,14 @@ func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { Privileged: &privileged, }, Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript}, - //VolumeMounts: []corev1.VolumeMount{ - // { - // MountPath: InitContainerConfigVolumeMountPath, - // Name: ConfigVolumeName, - // }, - //}, + //VolumeMounts: volumeMounts, VolumeMounts: []corev1.VolumeMount{ ConfigSharedVolume.InitContainerVolumeMount(), ConfigVolume(ls).VolumeMount(), PipelineVolume(ls).VolumeMount(), }, + //PluginVolumes.InitContainerVolumeMounts(), + Resources: corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ corev1.ResourceMemory: resource.MustParse("50Mi"), diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 8d998c6c631..670d6d5e667 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -90,7 +90,7 @@ var ( // ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { return volume.NewSecretVolumeWithMountPath( - SecretName(ls), + logstashv1alpha1.ConfigSecretName(ls.Name), InternalConfigVolumeName, InternalConfigVolumeMountPath, ) @@ -99,7 +99,7 @@ func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { // PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { return volume.NewSecretVolumeWithMountPath( - PipelineSecretName(ls), + logstashv1alpha1.PipelineSecretName(ls.Name), InternalPipelineVolumeName, InternalPipelineVolumeMountPath, ) @@ -113,6 +113,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} + //vols := []volume.VolumeLike{ // ConfigVolume, // // volume with logstash configuration file From 8500980493e4b1810f8cef8439a99f19035d495e Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 10 Apr 2023 13:22:26 -0400 Subject: [PATCH 03/12] Tidy up --- .../elasticsearch/initcontainer/prepare_fs.go | 4 - .../elasticsearch/settings/config_volume.go | 46 ++++----- pkg/controller/logstash/config.go | 1 + pkg/controller/logstash/init_configuration.go | 97 +++---------------- pkg/controller/logstash/pipeline.go | 3 +- pkg/controller/logstash/pod.go | 56 ----------- 6 files changed, 38 insertions(+), 169 deletions(-) diff --git a/pkg/controller/elasticsearch/initcontainer/prepare_fs.go b/pkg/controller/elasticsearch/initcontainer/prepare_fs.go index 95427d39bf4..c17235028ad 100644 --- a/pkg/controller/elasticsearch/initcontainer/prepare_fs.go +++ b/pkg/controller/elasticsearch/initcontainer/prepare_fs.go @@ -40,7 +40,6 @@ var ( ContainerMountPath: esvolume.ConfigVolumeMountPath, } - // EsPluginsSharedVolume contains the ES plugins/ directory EsPluginsSharedVolume = volume.SharedVolume{ VolumeName: "elastic-internal-elasticsearch-plugins-local", @@ -75,9 +74,6 @@ var ( Source: stringsutil.Concat(settings.ConfigVolumeMountPath, "/", settings.ConfigFileName), Target: stringsutil.Concat(EsConfigSharedVolume.ContainerMountPath, "/", settings.ConfigFileName), }, - - - { Source: stringsutil.Concat(esvolume.UnicastHostsVolumeMountPath, "/", esvolume.UnicastHostsFile), Target: stringsutil.Concat(EsConfigSharedVolume.ContainerMountPath, "/", esvolume.UnicastHostsFile), diff --git a/pkg/controller/elasticsearch/settings/config_volume.go b/pkg/controller/elasticsearch/settings/config_volume.go index 365fe28feee..d42160f774e 100644 --- a/pkg/controller/elasticsearch/settings/config_volume.go +++ b/pkg/controller/elasticsearch/settings/config_volume.go @@ -7,14 +7,14 @@ package settings import ( "context" - //pkgerrors "github.com/pkg/errors" + pkgerrors "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" - //common "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/settings" + common "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/settings" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" @@ -41,27 +41,27 @@ func ConfigSecretVolume(ssetName string) volume.SecretVolume { ) } -//// GetESConfigContent retrieves the configuration secret of the given stateful set, -//// and returns the corresponding CanonicalConfig. -//func GetESConfigContent(client k8s.Client, namespace string, ssetName string) (CanonicalConfig, error) { -// secret, err := GetESConfigSecret(client, namespace, ssetName) -// if err != nil { -// return CanonicalConfig{}, err -// } -// if len(secret.Data) == 0 { -// return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) -// } -// content := secret.Data[ConfigFileName] -// if len(content) == 0 { -// return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) -// } -// -// cfg, err := common.ParseConfig(content) -// if err != nil { -// return CanonicalConfig{}, err -// } -// return CanonicalConfig{cfg}, nil -//} +// GetESConfigContent retrieves the configuration secret of the given stateful set, +// and returns the corresponding CanonicalConfig. +func GetESConfigContent(client k8s.Client, namespace string, ssetName string) (CanonicalConfig, error) { + secret, err := GetESConfigSecret(client, namespace, ssetName) + if err != nil { + return CanonicalConfig{}, err + } + if len(secret.Data) == 0 { + return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) + } + content := secret.Data[ConfigFileName] + if len(content) == 0 { + return CanonicalConfig{}, pkgerrors.Errorf("no configuration found in secret %s", ConfigSecretName(ssetName)) + } + + cfg, err := common.ParseConfig(content) + if err != nil { + return CanonicalConfig{}, err + } + return CanonicalConfig{cfg}, nil +} // GetESConfigSecret returns the secret holding the ES configuration for the given pod func GetESConfigSecret(client k8s.Client, namespace string, ssetName string) (corev1.Secret, error) { diff --git a/pkg/controller/logstash/config.go b/pkg/controller/logstash/config.go index 2aed9bf63cd..e29d213ff01 100644 --- a/pkg/controller/logstash/config.go +++ b/pkg/controller/logstash/config.go @@ -75,6 +75,7 @@ func defaultConfig() *settings.CanonicalConfig { settingsMap := map[string]interface{}{ // Set 'api.http.host' by default to `0.0.0.0` for readiness probe to work. "api.http.host": "0.0.0.0", + // Set `config.reload.automatic` to `true` to enable pipeline reloads by default "config.reload.automatic": true, } diff --git a/pkg/controller/logstash/init_configuration.go b/pkg/controller/logstash/init_configuration.go index 1f84c965ffc..6567c3b534f 100644 --- a/pkg/controller/logstash/init_configuration.go +++ b/pkg/controller/logstash/init_configuration.go @@ -8,42 +8,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - //"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/volume" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" ) -//const ( -// //InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" -// InitConfigContainerName = "logstash-internal-init-config" -// -// // InitConfigScript is a small bash script to prepare the logstash configuration directory -// InitConfigScript = `#!/usr/bin/env bash -//set -eux -// -//init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok -// -//if [[ -f "${init_config_initialized_flag}" ]]; then -// echo "Logstash configuration already initialized." -// exit 0 -//fi -// -//echo "Setup Logstash configuration" -// -//ln -sf /mnt/elastic-internal/logstash-pipeline/pipelines.yml /usr/share/logstash/config/ -//ln -sf /mnt/elastic-internal/logstash-config/logstash.yml /usr/share/logstash/config/ -// -//cp /usr/share/logstash/config/jvm.options /mnt/elastic-internal/logstash-config-local/jvm.options -//cp /usr/share/logstash/config/log4j2.properties /mnt/elastic-internal/logstash-config-local/log4j2.properties -// -// -// -//touch "${init_config_initialized_flag}" -//echo "Logstash configuration successfully prepared." -//` -//) - const ( - //InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" InitConfigContainerName = "logstash-internal-init-config" // InitConfigScript is a small bash script to prepare the logstash configuration directory @@ -59,17 +27,17 @@ fi echo "Setup Logstash configuration" -ls /mnt/elastic-internal/logstash-config/* - -ln -sf /mnt/elastic-internal/logstash-config/* /usr/share/logstash/config/ -ln -sf /mnt/elastic-internal/logstash-pipeline/* /usr/share/logstash/config/ -ln -sf /mnt/elastic-internal/logstash-config/* /mnt/elastic-internal/logstash-config-local/. -ln -sf /mnt/elastic-internal/logstash-pipeline/* /mnt/elastic-internal/logstash-config-local/. +mount_path=` + InitContainerConfigVolumeMountPath + ` +for f in /usr/share/logstash/config/*.*; do + filename=$(basename $f) + if [[ ! -f "$mount_path/$filename" ]]; then + cp $f $mount_path + fi +done -cp /usr/share/logstash/config/jvm.options /mnt/elastic-internal/logstash-config-local/. -cp /usr/share/logstash/config/log4j2.properties /mnt/elastic-internal/logstash-config-local/. - +ln -sf `+ InternalConfigVolumeMountPath + `/logstash.yml $mount_path +ln -sf `+ InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path touch "${init_config_initialized_flag}" echo "Logstash configuration successfully prepared." @@ -77,51 +45,12 @@ echo "Logstash configuration successfully prepared." ) -//var ( -// LsConfigSharedVolume := volume.SharedVolume{ -// VolumeName: ConfigVolumeName, -// InitContainerMountPath: InitContainerConfigVolumeMountPath, -// ContainerMountPath: ConfigMountPath, -// } -// -// PluginVolumes = volume.SharedVolumeArray{ -// Array: []volume.SharedVolume{ -// LsConfigSharedVolume, -// LsPipelineShareVolume, -// EsBinSharedVolume, -// }, -// } -// -// ConfigVolumes := volume.SharedVolumeArray{ -// Array: []volume.SharedVolume{ -// VolumeMounts: []corev1.VolumeMount{ -// //ConfigSharedVolume.InitContainerVolumeMount(), -// ConfigVolume(ls).VolumeMount(), -// PipelineVolume(ls).VolumeMount(), -// }, -// -// EsConfigSharedVolume, -// EsPluginsSharedVolume, -// EsBinSharedVolume, -// }, -//} -//) - // initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. -// The script copy config files from /use/share/logstash/config to /mnt/elastic-internal/logstash-config/ -// TODO may be able to solve env2yaml permission issue with initContainer +// This copies files from the `config` folder of the docker image, and creates symlinks for the operator created +// `logstash.yml` and `pipelines.yml` file into a shared config folder to use by the main logstash container. This +// enables dynamic reloads for `pipelines.yml` func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { privileged := false - //configVolumes := volume.SharedVolumeArray{ - // Array: []volume.SharedVolume{ - // EsConfigSharedVolume, - // EsPluginsSharedVolume, - // EsBinSharedVolume, - // }, - //} - //volumeMounts := - // // we will also inherit all volume mounts from the main container later on in the pod template builder - // configVolumes.InitContainerVolumeMounts() return corev1.Container{ // Image will be inherited from pod template defaults @@ -132,13 +61,11 @@ func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { Privileged: &privileged, }, Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript}, - //VolumeMounts: volumeMounts, VolumeMounts: []corev1.VolumeMount{ ConfigSharedVolume.InitContainerVolumeMount(), ConfigVolume(ls).VolumeMount(), PipelineVolume(ls).VolumeMount(), }, - //PluginVolumes.InitContainerVolumeMounts(), Resources: corev1.ResourceRequirements{ Requests: map[corev1.ResourceName]resource.Quantity{ diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index ca787612ea8..1c00ab18475 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -40,7 +40,8 @@ func reconcilePipeline(params Params, configHash hash.Hash) error { return err } - //_, _ = configHash.Write(cfgBytes) + // We DO NOT write changes to configHash here - this is to ensure that a pipeline change does not trigger a restart + // of the pod, but allows the automatic reload of pipelines to take place return nil } diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 670d6d5e667..368ec560640 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -7,7 +7,6 @@ package logstash import ( "fmt" "hash" - //"path" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -62,22 +61,8 @@ var ( corev1.ResourceCPU: resource.MustParse("1000m"), }, } - // ConfigVolume is used to propagate the keystore file from the init container to - // Logstash main container. - //ConfigVolume = volume.NewEmptyDirVolume(ConfigVolumeName, ConfigMountPath) - ) -// SecretName is the name of the secret that holds the Logstash config for the given Logstash resource. -func SecretName(ls logstashv1alpha1.Logstash) string { - return ls.Name + "-ls-config" -} - -// SecretName is the name of the secret that holds the Logstash pipeline for the given Logstash resource. -func PipelineSecretName(ls logstashv1alpha1.Logstash) string { - return ls.Name + "-ls-pipeline" -} - var ( // ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container ConfigSharedVolume = volume.SharedVolume{ @@ -112,47 +97,6 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName) vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} - - - //vols := []volume.VolumeLike{ - // ConfigVolume, - // // volume with logstash configuration file - // volume.NewSecretVolume( - // logstashv1alpha1.ConfigSecretName(params.Logstash.Name), - // LogstashConfigVolumeName, - // path.Join(ConfigMountPath, LogstashConfigFileName), - // LogstashConfigFileName, - // 0644), - // // volume with logstash pipeline file - // volume.NewSecretVolume( - // logstashv1alpha1.PipelineSecretName(params.Logstash.Name), - // PipelineVolumeName, - // path.Join(ConfigMountPath, PipelineFileName), - // PipelineFileName, - // 0644), - //} - - // secretName, name, mountPath string, projectedSecrets []string - - //projectedSecrets := - - //vols := []volume.VolumeLike{ - // ConfigVolume, - // // volume with logstash configuration file - // volume.NewSecretVolumeWithMountPath( - // logstashv1alpha1.ConfigSecretName(params.Logstash.Name), - // LogstashConfigVolumeName, - // ConfigMountPath), - // //// volume with logstash pipeline file - // volume.NewSecretVolumeWithMountPath( - // logstashv1alpha1.PipelineSecretName(params.Logstash.Name), - // PipelineVolumeName, - // ConfigMountPath), - // //path.Join(ConfigMountPath, PipelineFileName), - // ////PipelineFileName, - // //0644), - //} - labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{ VersionLabelName: spec.Version}) From 603a83ec182b6a2ac02eb6251db7b8aaa7f7c369 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 10 Apr 2023 13:57:25 -0400 Subject: [PATCH 04/12] Lint fixes --- .../{init_configuration.go => initcontainer.go} | 8 +++----- pkg/controller/logstash/pod.go | 10 ++++------ 2 files changed, 7 insertions(+), 11 deletions(-) rename pkg/controller/logstash/{init_configuration.go => initcontainer.go} (90%) diff --git a/pkg/controller/logstash/init_configuration.go b/pkg/controller/logstash/initcontainer.go similarity index 90% rename from pkg/controller/logstash/init_configuration.go rename to pkg/controller/logstash/initcontainer.go index 6567c3b534f..336262c626c 100644 --- a/pkg/controller/logstash/init_configuration.go +++ b/pkg/controller/logstash/initcontainer.go @@ -12,7 +12,7 @@ import ( ) const ( - InitConfigContainerName = "logstash-internal-init-config" + InitConfigContainerName = "logstash-internal-init-config" // InitConfigScript is a small bash script to prepare the logstash configuration directory InitConfigScript = `#!/usr/bin/env bash @@ -36,15 +36,14 @@ for f in /usr/share/logstash/config/*.*; do fi done -ln -sf `+ InternalConfigVolumeMountPath + `/logstash.yml $mount_path -ln -sf `+ InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path +ln -sf ` + InternalConfigVolumeMountPath + `/logstash.yml $mount_path +ln -sf ` + InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path touch "${init_config_initialized_flag}" echo "Logstash configuration successfully prepared." ` ) - // initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. // This copies files from the `config` folder of the docker image, and creates symlinks for the operator created // `logstash.yml` and `pipelines.yml` file into a shared config folder to use by the main logstash container. This @@ -54,7 +53,6 @@ func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { return corev1.Container{ // Image will be inherited from pod template defaults - Image: "docker.elastic.co/logstash/logstash:8.6.1", ImagePullPolicy: corev1.PullIfNotPresent, Name: InitConfigContainerName, SecurityContext: &corev1.SecurityContext{ diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 368ec560640..396a2f7111e 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -41,13 +41,11 @@ const ( InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" - // InternalConfigVolumeName is a volume which contains the generated configuration. - InternalConfigVolumeName = "elastic-internal-logstash-config" - InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" + InternalConfigVolumeName = "elastic-internal-logstash-config" + InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" InternalPipelineVolumeName = "elastic-internal-logstash-pipeline" InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline" - ) var ( @@ -90,7 +88,6 @@ func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { ) } - func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec { defer tracing.Span(¶ms.Context)() spec := ¶ms.Logstash.Spec @@ -115,7 +112,8 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithPorts(ports). WithReadinessProbe(readinessProbe(false)). WithVolumeLikes(vols...). - WithInitContainers(initConfigContainer(params.Logstash)) + WithInitContainers(initConfigContainer(params.Logstash)). + WithInitContainerDefaults() builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) if err != nil { From 0c3aeb4ed53b5ddbf66e4e5fa76cbec0a6ca4a0f Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 10 Apr 2023 17:03:10 -0400 Subject: [PATCH 05/12] Add e2e tests for pipeline changes --- test/e2e/logstash/pipeline_test.go | 58 ++++++++++++++++++++++++++++++ test/e2e/test/logstash/steps.go | 11 +++--- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/test/e2e/logstash/pipeline_test.go b/test/e2e/logstash/pipeline_test.go index 576dd11a778..594108c2871 100644 --- a/test/e2e/logstash/pipeline_test.go +++ b/test/e2e/logstash/pipeline_test.go @@ -155,3 +155,61 @@ func TestPipelineConfigLogstash(t *testing.T) { test.Sequence(before, steps, b).RunSequential(t) } + +// Verify that pipelines will reload when the Pipeline definition changes. +func TestLogstashPipelineReload(t *testing.T) { + name := "test-ls-reload" + + logstashFirstPipeline := logstash.NewBuilder(name).WithNodeCount(1). + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "pipeline.workers": 1, + "config.string": "input { beats{ port => 5044}} output { stdout{} }", + }, + }, + }) + + logstashSecondPipeline := logstash.Builder{Logstash: *logstashFirstPipeline.Logstash.DeepCopy()}. + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "pipeline.workers": 2, + "config.string": "input { beats{ port => 5044} } output { stdout{} }", + }, + }, + }). + WithMutatedFrom(&logstashFirstPipeline) + + stepsFn := func(k *test.K8sClient) test.StepList { + return test.StepList{}. + WithSteps(logstashFirstPipeline.CheckK8sTestSteps(k)). + WithStep( + logstashFirstPipeline.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.main.workers": "1"}, + }), + ). + WithSteps(logstashSecondPipeline.MutationTestSteps(k)). + WithStep( + logstashSecondPipeline.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.main.workers": "2"}, + }), + ) + } + + test.Sequence(nil, stepsFn, logstashFirstPipeline).RunSequential(t) +} \ No newline at end of file diff --git a/test/e2e/test/logstash/steps.go b/test/e2e/test/logstash/steps.go index e1fc34976ad..82f3bfc7c45 100644 --- a/test/e2e/test/logstash/steps.go +++ b/test/e2e/test/logstash/steps.go @@ -105,17 +105,16 @@ func (b Builder) UpgradeTestSteps(k *test.K8sClient) test.StepList { }} } + func (b Builder) MutationTestSteps(k *test.K8sClient) test.StepList { - var entSearchGenerationBeforeMutation, entSearchObservedGenerationBeforeMutation int64 + var logstashGenerationBeforeMutation, logstashObservedGenerationBeforeMutation int64 isMutated := b.MutatedFrom != nil - return test.StepList{ - generation.RetrieveGenerationsStep(&b.Logstash, k, &entSearchGenerationBeforeMutation, &entSearchObservedGenerationBeforeMutation), - }.WithSteps(test.AnnotatePodsWithBuilderHash(b, b.MutatedFrom, k)). - WithSteps(b.UpgradeTestSteps(k)). + generation.RetrieveGenerationsStep(&b.Logstash, k, &logstashGenerationBeforeMutation, &logstashObservedGenerationBeforeMutation), + }.WithSteps(b.UpgradeTestSteps(k)). WithSteps(b.CheckK8sTestSteps(k)). WithSteps(b.CheckStackTestSteps(k)). - WithStep(generation.CompareObjectGenerationsStep(&b.Logstash, k, isMutated, entSearchGenerationBeforeMutation, entSearchObservedGenerationBeforeMutation)) + WithStep(generation.CompareObjectGenerationsStep(&b.Logstash, k, isMutated, logstashGenerationBeforeMutation, logstashObservedGenerationBeforeMutation)) } func (b Builder) DeletionTestSteps(k *test.K8sClient) test.StepList { From 505f1fcd7d3e753a88803b29fd872e76e5ba1277 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 10 Apr 2023 17:16:16 -0400 Subject: [PATCH 06/12] Remove unnecessary configHash from pipeline reconciliation --- pkg/controller/logstash/driver.go | 5 ++++- pkg/controller/logstash/pipeline.go | 5 +---- test/e2e/test/logstash/steps.go | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index afc12e7d679..5744b350434 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -88,7 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log return results.WithError(err), params.Status } - if err := reconcilePipeline(params, configHash); err != nil { + // We intenetionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the + // hash of the config to ensure that a pipeline change does not automatically trigger a restart + // of the pod, but allows Logstash's automatic reload of pipelines to take place + if err := reconcilePipeline(params); err != nil { return results.WithError(err), params.Status } diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index 1c00ab18475..1481e350472 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" ) -func reconcilePipeline(params Params, configHash hash.Hash) error { +func reconcilePipeline(params Params) error { defer tracing.Span(¶ms.Context)() cfgBytes, err := buildPipeline(params) @@ -40,9 +40,6 @@ func reconcilePipeline(params Params, configHash hash.Hash) error { return err } - // We DO NOT write changes to configHash here - this is to ensure that a pipeline change does not trigger a restart - // of the pod, but allows the automatic reload of pipelines to take place - return nil } diff --git a/test/e2e/test/logstash/steps.go b/test/e2e/test/logstash/steps.go index 82f3bfc7c45..6a0b3a16349 100644 --- a/test/e2e/test/logstash/steps.go +++ b/test/e2e/test/logstash/steps.go @@ -105,7 +105,6 @@ func (b Builder) UpgradeTestSteps(k *test.K8sClient) test.StepList { }} } - func (b Builder) MutationTestSteps(k *test.K8sClient) test.StepList { var logstashGenerationBeforeMutation, logstashObservedGenerationBeforeMutation int64 isMutated := b.MutatedFrom != nil From a1aadaecb2712518f449817ab3d9766db319022f Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 11 Apr 2023 08:45:26 -0400 Subject: [PATCH 07/12] Remove unused import --- pkg/controller/logstash/pipeline.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index 1481e350472..6a00c7e0746 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -5,8 +5,6 @@ package logstash import ( - "hash" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" From 5843ff7a9a884005d6aa94dde9437dbdb4c81e34 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 11 Apr 2023 12:42:35 -0400 Subject: [PATCH 08/12] Update tests with pipeline reload functionality --- pkg/controller/logstash/config_test.go | 12 ++++++++++++ pkg/controller/logstash/pod.go | 7 +------ pkg/controller/logstash/pod_test.go | 12 ++++++------ 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/controller/logstash/config_test.go b/pkg/controller/logstash/config_test.go index ff7a92beb43..3183b9227fe 100644 --- a/pkg/controller/logstash/config_test.go +++ b/pkg/controller/logstash/config_test.go @@ -40,6 +40,9 @@ func Test_newConfig(t *testing.T) { want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true `, wantErr: false, }, @@ -56,6 +59,9 @@ func Test_newConfig(t *testing.T) { want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: debug `, @@ -70,6 +76,9 @@ log: want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: debug `, @@ -86,6 +95,9 @@ log: want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: warn `, diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 8724d72e6eb..e923cd10901 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -110,14 +110,9 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithDockerImage(spec.Image, container.ImageRepository(container.LogstashImage, spec.Version)). WithAutomountServiceAccountToken(). WithPorts(ports). -<<<<<<< HEAD - WithReadinessProbe(readinessProbe(false)). - WithVolumeLikes(vols...). - WithInitContainers(initConfigContainer(params.Logstash)). -======= WithReadinessProbe(readinessProbe(params.Logstash)). WithVolumeLikes(vols...). ->>>>>>> upstream/feature/logstash + WithInitContainers(initConfigContainer(params.Logstash)). WithInitContainerDefaults() builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) diff --git a/pkg/controller/logstash/pod_test.go b/pkg/controller/logstash/pod_test.go index 60fc57cb7fb..cbf9604f6cf 100644 --- a/pkg/controller/logstash/pod_test.go +++ b/pkg/controller/logstash/pod_test.go @@ -40,12 +40,12 @@ func TestNewPodTemplateSpec(t *testing.T) { assertions: func(pod corev1.PodTemplateSpec) { assert.Equal(t, false, *pod.Spec.AutomountServiceAccountToken) assert.Len(t, pod.Spec.Containers, 1) - assert.Len(t, pod.Spec.InitContainers, 0) - assert.Len(t, pod.Spec.Volumes, 2) + assert.Len(t, pod.Spec.InitContainers, 1) + assert.Len(t, pod.Spec.Volumes, 3) assert.NotEmpty(t, pod.Annotations[ConfigHashAnnotationName]) logstashContainer := GetLogstashContainer(pod.Spec) require.NotNil(t, logstashContainer) - assert.Equal(t, 2, len(logstashContainer.VolumeMounts)) + assert.Equal(t, 3, len(logstashContainer.VolumeMounts)) assert.Equal(t, container.ImageRepository(container.LogstashImage, "8.6.1"), logstashContainer.Image) assert.NotNil(t, logstashContainer.ReadinessProbe) assert.NotEmpty(t, logstashContainer.Ports) @@ -111,7 +111,7 @@ func TestNewPodTemplateSpec(t *testing.T) { }, }}, assertions: func(pod corev1.PodTemplateSpec) { - assert.Len(t, pod.Spec.InitContainers, 1) + assert.Len(t, pod.Spec.InitContainers, 2) assert.Equal(t, pod.Spec.Containers[0].Image, pod.Spec.InitContainers[0].Image) }, }, @@ -260,8 +260,8 @@ func TestNewPodTemplateSpec(t *testing.T) { }, }}, assertions: func(pod corev1.PodTemplateSpec) { - assert.Len(t, pod.Spec.Volumes, 3) - assert.Len(t, GetLogstashContainer(pod.Spec).VolumeMounts, 3) + assert.Len(t, pod.Spec.Volumes, 4) + assert.Len(t, GetLogstashContainer(pod.Spec).VolumeMounts, 4) }, }, } From d09f7febaf40e9141f5590a1eefb0ffe9a0d34e3 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 13 Apr 2023 09:02:15 -0400 Subject: [PATCH 09/12] Mark pods as updated to speed up discovery and application of pipeline changes --- pkg/controller/logstash/labels.go | 7 +++++ pkg/controller/logstash/pipeline.go | 44 +++++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pkg/controller/logstash/labels.go b/pkg/controller/logstash/labels.go index e277e77e19b..d1d9d580495 100644 --- a/pkg/controller/logstash/labels.go +++ b/pkg/controller/logstash/labels.go @@ -5,6 +5,8 @@ package logstash import ( + "sigs.k8s.io/controller-runtime/pkg/client" + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" ) @@ -27,3 +29,8 @@ func NewLabels(logstash logstashv1alpha1.Logstash) map[string]string { NameLabelName: logstash.Name, } } + +// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels +func NewLabelSelectorForLogstash(ls logstashv1alpha1.Logstash) client.MatchingLabels { + return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: ls.Name}) +} diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index 6a00c7e0746..6cbfee3883d 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -5,14 +5,21 @@ package logstash import ( + "reflect" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" + + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps" ) func reconcilePipeline(params Params) error { @@ -34,13 +41,46 @@ func reconcilePipeline(params Params) error { }, } - if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, ¶ms.Logstash); err != nil { + if err := reconcileSecretWithFastUpdate(params, expected); err != nil { return err } - return nil } +// This function reconciles the secret, but then adds a postUpdate step to mark the pods as updated +// to trigger a quicker reload of the updated secret than waiting for the kubelet sync interval to kick in +func reconcileSecretWithFastUpdate(params Params, expected corev1.Secret) error { + var reconciled corev1.Secret + + return reconciler.ReconcileResource(reconciler.Params{ + Context: params.Context, + Client: params.Client, + Owner: ¶ms.Logstash, + Expected: &expected, + Reconciled: &reconciled, + NeedsUpdate: func() bool { + // update if expected labels and annotations are not there + return !maps.IsSubset(expected.Labels, reconciled.Labels) || + !maps.IsSubset(expected.Annotations, reconciled.Annotations) || + // or if secret data is not strictly equal + !reflect.DeepEqual(expected.Data, reconciled.Data) + }, + UpdateReconciled: func() { + // set expected annotations and labels, but don't remove existing ones + // that may have been defaulted or set by the user on the existing resource + reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels) + reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations) + reconciled.Data = expected.Data + }, + PostUpdate: func() { + annotation.MarkPodsAsUpdated(params.Context, params.Client, + client.InNamespace(params.Logstash.Namespace), + NewLabelSelectorForLogstash(params.Logstash), + ) + }, + }) +} + func buildPipeline(params Params) ([]byte, error) { userProvidedCfg, err := getUserPipeline(params) if err != nil { From fb157add9ec6661a2db9800570f2def40d24e537 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 21 Apr 2023 09:01:01 -0400 Subject: [PATCH 10/12] Apply suggestions from code review Co-authored-by: Peter Brachwitz --- pkg/controller/logstash/driver.go | 2 +- pkg/controller/logstash/initcontainer.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index 5744b350434..79308f5f1f1 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -88,7 +88,7 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log return results.WithError(err), params.Status } - // We intenetionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the + // We intentionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the // hash of the config to ensure that a pipeline change does not automatically trigger a restart // of the pod, but allows Logstash's automatic reload of pipelines to take place if err := reconcilePipeline(params); err != nil { diff --git a/pkg/controller/logstash/initcontainer.go b/pkg/controller/logstash/initcontainer.go index 336262c626c..91f9041adcb 100644 --- a/pkg/controller/logstash/initcontainer.go +++ b/pkg/controller/logstash/initcontainer.go @@ -16,7 +16,7 @@ const ( // InitConfigScript is a small bash script to prepare the logstash configuration directory InitConfigScript = `#!/usr/bin/env bash -set -eux +set -eu init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok From 598fe50bde2683f54a8fc60527e31b0ecd45a07a Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 21 Apr 2023 10:03:14 -0400 Subject: [PATCH 11/12] Code review suggestions --- pkg/controller/common/reconciler/secret.go | 17 ++++++-- pkg/controller/logstash/pipeline.go | 45 ++++------------------ 2 files changed, 21 insertions(+), 41 deletions(-) diff --git a/pkg/controller/common/reconciler/secret.go b/pkg/controller/common/reconciler/secret.go index 0b6026f87ad..d141d86a836 100644 --- a/pkg/controller/common/reconciler/secret.go +++ b/pkg/controller/common/reconciler/secret.go @@ -30,11 +30,18 @@ const ( SoftOwnerKindLabel = "eck.k8s.elastic.co/owner-kind" ) +func WithPostUpdate(f func()) func(p *Params) { + return func(p *Params) { + p.PostUpdate = f + } +} + // ReconcileSecret creates or updates the actual secret to match the expected one. // Existing annotations or labels that are not expected are preserved. -func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object) (corev1.Secret, error) { +func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object, opts ...func(*Params)) (corev1.Secret, error) { var reconciled corev1.Secret - if err := ReconcileResource(Params{ + + params := Params{ Context: ctx, Client: c, Owner: owner, @@ -54,7 +61,11 @@ func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations) reconciled.Data = expected.Data }, - }); err != nil { + } + for _, opt := range opts { + opt(¶ms) + } + if err := ReconcileResource(params); err != nil { return corev1.Secret{}, err } return reconciled, nil diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index 6cbfee3883d..8aee3add3d7 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -5,8 +5,6 @@ package logstash import ( - "reflect" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,8 +16,6 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" - - "github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps" ) func reconcilePipeline(params Params) error { @@ -41,44 +37,17 @@ func reconcilePipeline(params Params) error { }, } - if err := reconcileSecretWithFastUpdate(params, expected); err != nil { - return err - } - return nil -} - -// This function reconciles the secret, but then adds a postUpdate step to mark the pods as updated -// to trigger a quicker reload of the updated secret than waiting for the kubelet sync interval to kick in -func reconcileSecretWithFastUpdate(params Params, expected corev1.Secret) error { - var reconciled corev1.Secret - - return reconciler.ReconcileResource(reconciler.Params{ - Context: params.Context, - Client: params.Client, - Owner: ¶ms.Logstash, - Expected: &expected, - Reconciled: &reconciled, - NeedsUpdate: func() bool { - // update if expected labels and annotations are not there - return !maps.IsSubset(expected.Labels, reconciled.Labels) || - !maps.IsSubset(expected.Annotations, reconciled.Annotations) || - // or if secret data is not strictly equal - !reflect.DeepEqual(expected.Data, reconciled.Data) - }, - UpdateReconciled: func() { - // set expected annotations and labels, but don't remove existing ones - // that may have been defaulted or set by the user on the existing resource - reconciled.Labels = maps.Merge(reconciled.Labels, expected.Labels) - reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations) - reconciled.Data = expected.Data - }, - PostUpdate: func() { + if _, err := reconciler.ReconcileSecret(params.Context, params.Client, expected, ¶ms.Logstash, + reconciler.WithPostUpdate(func() { annotation.MarkPodsAsUpdated(params.Context, params.Client, client.InNamespace(params.Logstash.Namespace), NewLabelSelectorForLogstash(params.Logstash), ) - }, - }) + }), + ); err != nil { + return err + } + return nil } func buildPipeline(params Params) ([]byte, error) { From 1884ff9a758a577316712c3661d5fa301bc8278d Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 21 Apr 2023 10:06:49 -0400 Subject: [PATCH 12/12] Apply suggestions from code review Co-authored-by: Thibault Richard --- pkg/controller/logstash/initcontainer.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/controller/logstash/initcontainer.go b/pkg/controller/logstash/initcontainer.go index 91f9041adcb..8e5b6b09331 100644 --- a/pkg/controller/logstash/initcontainer.go +++ b/pkg/controller/logstash/initcontainer.go @@ -22,19 +22,14 @@ init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic- if [[ -f "${init_config_initialized_flag}" ]]; then echo "Logstash configuration already initialized." - exit 0 + exit 0 fi echo "Setup Logstash configuration" mount_path=` + InitContainerConfigVolumeMountPath + ` -for f in /usr/share/logstash/config/*.*; do - filename=$(basename $f) - if [[ ! -f "$mount_path/$filename" ]]; then - cp $f $mount_path - fi -done +cp -f /usr/share/logstash/config/*.* "$mount_path" ln -sf ` + InternalConfigVolumeMountPath + `/logstash.yml $mount_path ln -sf ` + InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path @@ -45,9 +40,9 @@ echo "Logstash configuration successfully prepared." ) // initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. -// This copies files from the `config` folder of the docker image, and creates symlinks for the operator created -// `logstash.yml` and `pipelines.yml` file into a shared config folder to use by the main logstash container. This -// enables dynamic reloads for `pipelines.yml` +// This copies files from the `config` folder of the docker image, and creates symlinks for the `logstash.yml` and +// `pipelines.yml` files created by the operator into a shared config folder to be used by the main logstash container. +// This enables dynamic reloads for `pipelines.yml`. func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { privileged := false