diff --git a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml index f92a4a038ce..c693c339a01 100644 --- a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml +++ b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml @@ -1944,7 +1944,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -3314,7 +3315,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -4717,7 +4719,8 @@ spec: defined in spec.resourceClaims, that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. It can + only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. @@ -6523,7 +6526,8 @@ spec: that are used by this container. \n This is an alpha field and requires enabling the DynamicResourceAllocation - feature gate. \n This field is immutable." + feature gate. \n This field is immutable. + It can only be set for containers." items: description: ResourceClaim references one entry in PodSpec.ResourceClaims. diff --git a/config/samples/logstash/logstash_svc.yaml b/config/samples/logstash/logstash_svc.yaml index cdbae09dd3f..e2c95c82e9d 100644 --- a/config/samples/logstash/logstash_svc.yaml +++ b/config/samples/logstash/logstash_svc.yaml @@ -22,6 +22,10 @@ spec: api.http.host: "0.0.0.0" api.http.port: 9601 queue.type: memory + pipelines: + - pipeline.id: one + pipeline.workers: 2 + config.string: "input { beats { port => 5044 }} output { stdout {}}" services: - name: api service: diff --git a/pkg/controller/common/reconciler/secret.go b/pkg/controller/common/reconciler/secret.go index 0b6026f87ad..d141d86a836 100644 --- a/pkg/controller/common/reconciler/secret.go +++ b/pkg/controller/common/reconciler/secret.go @@ -30,11 +30,18 @@ const ( SoftOwnerKindLabel = "eck.k8s.elastic.co/owner-kind" ) +func WithPostUpdate(f func()) func(p *Params) { + return func(p *Params) { + p.PostUpdate = f + } +} + // ReconcileSecret creates or updates the actual secret to match the expected one. // Existing annotations or labels that are not expected are preserved. -func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object) (corev1.Secret, error) { +func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object, opts ...func(*Params)) (corev1.Secret, error) { var reconciled corev1.Secret - if err := ReconcileResource(Params{ + + params := Params{ Context: ctx, Client: c, Owner: owner, @@ -54,7 +61,11 @@ func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations) reconciled.Data = expected.Data }, - }); err != nil { + } + for _, opt := range opts { + opt(¶ms) + } + if err := ReconcileResource(params); err != nil { return corev1.Secret{}, err } return reconciled, nil diff --git a/pkg/controller/logstash/config.go b/pkg/controller/logstash/config.go index 5a0409dbb60..e29d213ff01 100644 --- a/pkg/controller/logstash/config.go +++ b/pkg/controller/logstash/config.go @@ -75,6 +75,8 @@ func defaultConfig() *settings.CanonicalConfig { settingsMap := map[string]interface{}{ // Set 'api.http.host' by default to `0.0.0.0` for readiness probe to work. "api.http.host": "0.0.0.0", + // Set `config.reload.automatic` to `true` to enable pipeline reloads by default + "config.reload.automatic": true, } return settings.MustCanonicalConfig(settingsMap) diff --git a/pkg/controller/logstash/config_test.go b/pkg/controller/logstash/config_test.go index ff7a92beb43..3183b9227fe 100644 --- a/pkg/controller/logstash/config_test.go +++ b/pkg/controller/logstash/config_test.go @@ -40,6 +40,9 @@ func Test_newConfig(t *testing.T) { want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true `, wantErr: false, }, @@ -56,6 +59,9 @@ func Test_newConfig(t *testing.T) { want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: debug `, @@ -70,6 +76,9 @@ log: want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: debug `, @@ -86,6 +95,9 @@ log: want: `api: http: host: 0.0.0.0 +config: + reload: + automatic: true log: level: warn `, diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index afc12e7d679..79308f5f1f1 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -88,7 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log return results.WithError(err), params.Status } - if err := reconcilePipeline(params, configHash); err != nil { + // We intentionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the + // hash of the config to ensure that a pipeline change does not automatically trigger a restart + // of the pod, but allows Logstash's automatic reload of pipelines to take place + if err := reconcilePipeline(params); err != nil { return results.WithError(err), params.Status } diff --git a/pkg/controller/logstash/initcontainer.go b/pkg/controller/logstash/initcontainer.go new file mode 100644 index 00000000000..8e5b6b09331 --- /dev/null +++ b/pkg/controller/logstash/initcontainer.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" +) + +const ( + InitConfigContainerName = "logstash-internal-init-config" + + // InitConfigScript is a small bash script to prepare the logstash configuration directory + InitConfigScript = `#!/usr/bin/env bash +set -eu + +init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok + +if [[ -f "${init_config_initialized_flag}" ]]; then + echo "Logstash configuration already initialized." + exit 0 +fi + +echo "Setup Logstash configuration" + +mount_path=` + InitContainerConfigVolumeMountPath + ` + +cp -f /usr/share/logstash/config/*.* "$mount_path" + +ln -sf ` + InternalConfigVolumeMountPath + `/logstash.yml $mount_path +ln -sf ` + InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path + +touch "${init_config_initialized_flag}" +echo "Logstash configuration successfully prepared." +` +) + +// initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory. +// This copies files from the `config` folder of the docker image, and creates symlinks for the `logstash.yml` and +// `pipelines.yml` files created by the operator into a shared config folder to be used by the main logstash container. +// This enables dynamic reloads for `pipelines.yml`. +func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container { + privileged := false + + return corev1.Container{ + // Image will be inherited from pod template defaults + ImagePullPolicy: corev1.PullIfNotPresent, + Name: InitConfigContainerName, + SecurityContext: &corev1.SecurityContext{ + Privileged: &privileged, + }, + Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript}, + VolumeMounts: []corev1.VolumeMount{ + ConfigSharedVolume.InitContainerVolumeMount(), + ConfigVolume(ls).VolumeMount(), + PipelineVolume(ls).VolumeMount(), + }, + + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("50Mi"), + corev1.ResourceCPU: resource.MustParse("0.1"), + }, + Limits: map[corev1.ResourceName]resource.Quantity{ + // Memory limit should be at least 12582912 when running with CRI-O + corev1.ResourceMemory: resource.MustParse("50Mi"), + corev1.ResourceCPU: resource.MustParse("0.1"), + }, + }, + } +} \ No newline at end of file diff --git a/pkg/controller/logstash/labels.go b/pkg/controller/logstash/labels.go index e277e77e19b..d1d9d580495 100644 --- a/pkg/controller/logstash/labels.go +++ b/pkg/controller/logstash/labels.go @@ -5,6 +5,8 @@ package logstash import ( + "sigs.k8s.io/controller-runtime/pkg/client" + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" ) @@ -27,3 +29,8 @@ func NewLabels(logstash logstashv1alpha1.Logstash) map[string]string { NameLabelName: logstash.Name, } } + +// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels +func NewLabelSelectorForLogstash(ls logstashv1alpha1.Logstash) client.MatchingLabels { + return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: ls.Name}) +} diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go index ddc696b29df..8aee3add3d7 100644 --- a/pkg/controller/logstash/pipeline.go +++ b/pkg/controller/logstash/pipeline.go @@ -5,19 +5,20 @@ package logstash import ( - "hash" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" ) -func reconcilePipeline(params Params, configHash hash.Hash) error { +func reconcilePipeline(params Params) error { defer tracing.Span(¶ms.Context)() cfgBytes, err := buildPipeline(params) @@ -36,12 +37,16 @@ func reconcilePipeline(params Params, configHash hash.Hash) error { }, } - if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, ¶ms.Logstash); err != nil { + if _, err := reconciler.ReconcileSecret(params.Context, params.Client, expected, ¶ms.Logstash, + reconciler.WithPostUpdate(func() { + annotation.MarkPodsAsUpdated(params.Context, params.Client, + client.InNamespace(params.Logstash.Namespace), + NewLabelSelectorForLogstash(params.Logstash), + ) + }), + ); err != nil { return err } - - _, _ = configHash.Write(cfgBytes) - return nil } diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 2bd3a6c8b25..e923cd10901 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -7,7 +7,6 @@ package logstash import ( "fmt" "hash" - "path" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -39,6 +38,14 @@ const ( // VersionLabelName is a label used to track the version of a Logstash Pod. VersionLabelName = "logstash.k8s.elastic.co/version" + + InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local" + + // InternalConfigVolumeName is a volume which contains the generated configuration. + InternalConfigVolumeName = "elastic-internal-logstash-config" + InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config" + InternalPipelineVolumeName = "elastic-internal-logstash-pipeline" + InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline" ) var ( @@ -54,26 +61,38 @@ var ( } ) +var ( + // ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container + ConfigSharedVolume = volume.SharedVolume{ + VolumeName: ConfigVolumeName, + InitContainerMountPath: InitContainerConfigVolumeMountPath, + ContainerMountPath: ConfigMountPath, + } +) + +// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + logstashv1alpha1.ConfigSecretName(ls.Name), + InternalConfigVolumeName, + InternalConfigVolumeMountPath, + ) +} + +// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource. +func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume { + return volume.NewSecretVolumeWithMountPath( + logstashv1alpha1.PipelineSecretName(ls.Name), + InternalPipelineVolumeName, + InternalPipelineVolumeMountPath, + ) +} + func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec { defer tracing.Span(¶ms.Context)() spec := ¶ms.Logstash.Spec builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName) - vols := []volume.VolumeLike{ - // volume with logstash configuration file - volume.NewSecretVolume( - logstashv1alpha1.ConfigSecretName(params.Logstash.Name), - LogstashConfigVolumeName, - path.Join(ConfigMountPath, LogstashConfigFileName), - LogstashConfigFileName, - 0644), - // volume with logstash pipeline file - volume.NewSecretVolume( - logstashv1alpha1.PipelineSecretName(params.Logstash.Name), - PipelineVolumeName, - path.Join(ConfigMountPath, PipelineFileName), - PipelineFileName, - 0644), - } + vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)} labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{ VersionLabelName: spec.Version}) @@ -93,6 +112,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithPorts(ports). WithReadinessProbe(readinessProbe(params.Logstash)). WithVolumeLikes(vols...). + WithInitContainers(initConfigContainer(params.Logstash)). WithInitContainerDefaults() builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) diff --git a/pkg/controller/logstash/pod_test.go b/pkg/controller/logstash/pod_test.go index 60fc57cb7fb..cbf9604f6cf 100644 --- a/pkg/controller/logstash/pod_test.go +++ b/pkg/controller/logstash/pod_test.go @@ -40,12 +40,12 @@ func TestNewPodTemplateSpec(t *testing.T) { assertions: func(pod corev1.PodTemplateSpec) { assert.Equal(t, false, *pod.Spec.AutomountServiceAccountToken) assert.Len(t, pod.Spec.Containers, 1) - assert.Len(t, pod.Spec.InitContainers, 0) - assert.Len(t, pod.Spec.Volumes, 2) + assert.Len(t, pod.Spec.InitContainers, 1) + assert.Len(t, pod.Spec.Volumes, 3) assert.NotEmpty(t, pod.Annotations[ConfigHashAnnotationName]) logstashContainer := GetLogstashContainer(pod.Spec) require.NotNil(t, logstashContainer) - assert.Equal(t, 2, len(logstashContainer.VolumeMounts)) + assert.Equal(t, 3, len(logstashContainer.VolumeMounts)) assert.Equal(t, container.ImageRepository(container.LogstashImage, "8.6.1"), logstashContainer.Image) assert.NotNil(t, logstashContainer.ReadinessProbe) assert.NotEmpty(t, logstashContainer.Ports) @@ -111,7 +111,7 @@ func TestNewPodTemplateSpec(t *testing.T) { }, }}, assertions: func(pod corev1.PodTemplateSpec) { - assert.Len(t, pod.Spec.InitContainers, 1) + assert.Len(t, pod.Spec.InitContainers, 2) assert.Equal(t, pod.Spec.Containers[0].Image, pod.Spec.InitContainers[0].Image) }, }, @@ -260,8 +260,8 @@ func TestNewPodTemplateSpec(t *testing.T) { }, }}, assertions: func(pod corev1.PodTemplateSpec) { - assert.Len(t, pod.Spec.Volumes, 3) - assert.Len(t, GetLogstashContainer(pod.Spec).VolumeMounts, 3) + assert.Len(t, pod.Spec.Volumes, 4) + assert.Len(t, GetLogstashContainer(pod.Spec).VolumeMounts, 4) }, }, } diff --git a/test/e2e/logstash/pipeline_test.go b/test/e2e/logstash/pipeline_test.go index 576dd11a778..594108c2871 100644 --- a/test/e2e/logstash/pipeline_test.go +++ b/test/e2e/logstash/pipeline_test.go @@ -155,3 +155,61 @@ func TestPipelineConfigLogstash(t *testing.T) { test.Sequence(before, steps, b).RunSequential(t) } + +// Verify that pipelines will reload when the Pipeline definition changes. +func TestLogstashPipelineReload(t *testing.T) { + name := "test-ls-reload" + + logstashFirstPipeline := logstash.NewBuilder(name).WithNodeCount(1). + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "pipeline.workers": 1, + "config.string": "input { beats{ port => 5044}} output { stdout{} }", + }, + }, + }) + + logstashSecondPipeline := logstash.Builder{Logstash: *logstashFirstPipeline.Logstash.DeepCopy()}. + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "pipeline.workers": 2, + "config.string": "input { beats{ port => 5044} } output { stdout{} }", + }, + }, + }). + WithMutatedFrom(&logstashFirstPipeline) + + stepsFn := func(k *test.K8sClient) test.StepList { + return test.StepList{}. + WithSteps(logstashFirstPipeline.CheckK8sTestSteps(k)). + WithStep( + logstashFirstPipeline.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.main.workers": "1"}, + }), + ). + WithSteps(logstashSecondPipeline.MutationTestSteps(k)). + WithStep( + logstashSecondPipeline.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [main]", + Path: "/_node/pipelines/main", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.main.workers": "2"}, + }), + ) + } + + test.Sequence(nil, stepsFn, logstashFirstPipeline).RunSequential(t) +} \ No newline at end of file diff --git a/test/e2e/test/logstash/steps.go b/test/e2e/test/logstash/steps.go index e1fc34976ad..6a0b3a16349 100644 --- a/test/e2e/test/logstash/steps.go +++ b/test/e2e/test/logstash/steps.go @@ -106,16 +106,14 @@ func (b Builder) UpgradeTestSteps(k *test.K8sClient) test.StepList { } func (b Builder) MutationTestSteps(k *test.K8sClient) test.StepList { - var entSearchGenerationBeforeMutation, entSearchObservedGenerationBeforeMutation int64 + var logstashGenerationBeforeMutation, logstashObservedGenerationBeforeMutation int64 isMutated := b.MutatedFrom != nil - return test.StepList{ - generation.RetrieveGenerationsStep(&b.Logstash, k, &entSearchGenerationBeforeMutation, &entSearchObservedGenerationBeforeMutation), - }.WithSteps(test.AnnotatePodsWithBuilderHash(b, b.MutatedFrom, k)). - WithSteps(b.UpgradeTestSteps(k)). + generation.RetrieveGenerationsStep(&b.Logstash, k, &logstashGenerationBeforeMutation, &logstashObservedGenerationBeforeMutation), + }.WithSteps(b.UpgradeTestSteps(k)). WithSteps(b.CheckK8sTestSteps(k)). WithSteps(b.CheckStackTestSteps(k)). - WithStep(generation.CompareObjectGenerationsStep(&b.Logstash, k, isMutated, entSearchGenerationBeforeMutation, entSearchObservedGenerationBeforeMutation)) + WithStep(generation.CompareObjectGenerationsStep(&b.Logstash, k, isMutated, logstashGenerationBeforeMutation, logstashObservedGenerationBeforeMutation)) } func (b Builder) DeletionTestSteps(k *test.K8sClient) test.StepList {