Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -3314,7 +3315,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -4717,7 +4719,8 @@ spec:
defined in spec.resourceClaims, that are used
by this container. \n This is an alpha field and
requires enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable. It can
only be set for containers."
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
Expand Down Expand Up @@ -6523,7 +6526,8 @@ spec:
that are used by this container. \n
This is an alpha field and requires
enabling the DynamicResourceAllocation
feature gate. \n This field is immutable."
feature gate. \n This field is immutable.
It can only be set for containers."
items:
description: ResourceClaim references
one entry in PodSpec.ResourceClaims.
Expand Down
4 changes: 4 additions & 0 deletions config/samples/logstash/logstash_svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ spec:
api.http.host: "0.0.0.0"
api.http.port: 9601
queue.type: memory
pipelines:
- pipeline.id: one
pipeline.workers: 2
config.string: "input { beats { port => 5044 }} output { stdout {}}"
services:
- name: api
service:
Expand Down
17 changes: 14 additions & 3 deletions pkg/controller/common/reconciler/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ const (
SoftOwnerKindLabel = "eck.k8s.elastic.co/owner-kind"
)

func WithPostUpdate(f func()) func(p *Params) {
return func(p *Params) {
p.PostUpdate = f
}
}

// ReconcileSecret creates or updates the actual secret to match the expected one.
// Existing annotations or labels that are not expected are preserved.
func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object) (corev1.Secret, error) {
func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object, opts ...func(*Params)) (corev1.Secret, error) {
var reconciled corev1.Secret
if err := ReconcileResource(Params{

params := Params{
Context: ctx,
Client: c,
Owner: owner,
Expand All @@ -54,7 +61,11 @@ func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret,
reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations)
reconciled.Data = expected.Data
},
}); err != nil {
}
for _, opt := range opts {
opt(&params)
}
if err := ReconcileResource(params); err != nil {
return corev1.Secret{}, err
}
return reconciled, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func defaultConfig() *settings.CanonicalConfig {
settingsMap := map[string]interface{}{
// Set 'api.http.host' by default to `0.0.0.0` for readiness probe to work.
"api.http.host": "0.0.0.0",
// Set `config.reload.automatic` to `true` to enable pipeline reloads by default
"config.reload.automatic": true,
}

return settings.MustCanonicalConfig(settingsMap)
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/logstash/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func Test_newConfig(t *testing.T) {
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
`,
wantErr: false,
},
Expand All @@ -56,6 +59,9 @@ func Test_newConfig(t *testing.T) {
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: debug
`,
Expand All @@ -70,6 +76,9 @@ log:
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: debug
`,
Expand All @@ -86,6 +95,9 @@ log:
want: `api:
http:
host: 0.0.0.0
config:
reload:
automatic: true
log:
level: warn
`,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/logstash/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log
return results.WithError(err), params.Status
}

if err := reconcilePipeline(params, configHash); err != nil {
// We intentionally DO NOT pass the configHash here. We don't want to consider the pipeline definitions in the
// hash of the config to ensure that a pipeline change does not automatically trigger a restart
// of the pod, but allows Logstash's automatic reload of pipelines to take place
if err := reconcilePipeline(params); err != nil {
Comment on lines +91 to +94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass the configHash when config.reload.automaticequals false?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question. I'm erring on the side of 'no' at the moment, but I think this is something that could change after the technical preview depending on feedback.

My reasoning on this is that the false (default) value of non-k8s logstash doesn't react to pipeline changes at all, and to change this semantic to restart logstash completely on pipeline changes feels like very different behaviour.

Thinking about how we could add flexibility, I wonder if we might want to introduce something for ECK here, along the lines of:

config.reload.restart_policy: detected_only|all|none, which would either set config.reload.automatic: true for detected_only, and false for all or none, passing the configHash if the value is all, and not if it is none.

cc @flexitrev, @roaksoax, @jsvd

return results.WithError(err), params.Status
}

Expand Down
75 changes: 75 additions & 0 deletions pkg/controller/logstash/initcontainer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package logstash

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
)

const (
InitConfigContainerName = "logstash-internal-init-config"

// InitConfigScript is a small bash script to prepare the logstash configuration directory
InitConfigScript = `#!/usr/bin/env bash
set -eu

init_config_initialized_flag=` + InitContainerConfigVolumeMountPath + `/elastic-internal-init-config.ok

if [[ -f "${init_config_initialized_flag}" ]]; then
echo "Logstash configuration already initialized."
exit 0
fi

echo "Setup Logstash configuration"

mount_path=` + InitContainerConfigVolumeMountPath + `

cp -f /usr/share/logstash/config/*.* "$mount_path"

ln -sf ` + InternalConfigVolumeMountPath + `/logstash.yml $mount_path
ln -sf ` + InternalPipelineVolumeMountPath + `/pipelines.yml $mount_path

touch "${init_config_initialized_flag}"
echo "Logstash configuration successfully prepared."
`
)

// initConfigContainer returns an init container that executes a bash script to prepare the logstash config directory.
// This copies files from the `config` folder of the docker image, and creates symlinks for the `logstash.yml` and
// `pipelines.yml` files created by the operator into a shared config folder to be used by the main logstash container.
// This enables dynamic reloads for `pipelines.yml`.
func initConfigContainer(ls logstashv1alpha1.Logstash) corev1.Container {
privileged := false

return corev1.Container{
// Image will be inherited from pod template defaults
ImagePullPolicy: corev1.PullIfNotPresent,
Name: InitConfigContainerName,
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
},
Command: []string{"/usr/bin/env", "bash", "-c", InitConfigScript},
VolumeMounts: []corev1.VolumeMount{
ConfigSharedVolume.InitContainerVolumeMount(),
ConfigVolume(ls).VolumeMount(),
PipelineVolume(ls).VolumeMount(),
},

Resources: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceCPU: resource.MustParse("0.1"),
},
Limits: map[corev1.ResourceName]resource.Quantity{
// Memory limit should be at least 12582912 when running with CRI-O
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourceCPU: resource.MustParse("0.1"),
},
},
}
}
7 changes: 7 additions & 0 deletions pkg/controller/logstash/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package logstash

import (
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1"
logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
)
Expand All @@ -27,3 +29,8 @@ func NewLabels(logstash logstashv1alpha1.Logstash) map[string]string {
NameLabelName: logstash.Name,
}
}

// NewLabelSelectorForLogstash returns a labels.Selector that matches the labels as constructed by NewLabels
func NewLabelSelectorForLogstash(ls logstashv1alpha1.Logstash) client.MatchingLabels {
return client.MatchingLabels(map[string]string{commonv1.TypeLabelName: TypeLabelValue, NameLabelName: ls.Name})
}
19 changes: 12 additions & 7 deletions pkg/controller/logstash/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@
package logstash

import (
"hash"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"sigs.k8s.io/controller-runtime/pkg/client"

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"
)

func reconcilePipeline(params Params, configHash hash.Hash) error {
func reconcilePipeline(params Params) error {
defer tracing.Span(&params.Context)()

cfgBytes, err := buildPipeline(params)
Expand All @@ -36,12 +37,16 @@ func reconcilePipeline(params Params, configHash hash.Hash) error {
},
}

if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash); err != nil {
if _, err := reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash,
reconciler.WithPostUpdate(func() {
annotation.MarkPodsAsUpdated(params.Context, params.Client,
client.InNamespace(params.Logstash.Namespace),
NewLabelSelectorForLogstash(params.Logstash),
)
}),
); err != nil {
return err
}

_, _ = configHash.Write(cfgBytes)

return nil
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff --git a/pkg/controller/common/reconciler/secret.go b/pkg/controller/common/reconciler/secret.go
index 0b6026f87..50004fd80 100644
--- a/pkg/controller/common/reconciler/secret.go
+++ b/pkg/controller/common/reconciler/secret.go
@@ -30,11 +30,17 @@ const (
 	SoftOwnerKindLabel      = "eck.k8s.elastic.co/owner-kind"
 )
 
+func WithPostUpdate(f func()) func(p *Params) {
+	return func(p *Params) {
+		p.PostUpdate = f
+	}
+}
+
 // ReconcileSecret creates or updates the actual secret to match the expected one.
 // Existing annotations or labels that are not expected are preserved.
-func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object) (corev1.Secret, error) {
+func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret, owner client.Object, opts ...func(*Params)) (corev1.Secret, error) {
 	var reconciled corev1.Secret
-	if err := ReconcileResource(Params{
+	params := Params{
 		Context:    ctx,
 		Client:     c,
 		Owner:      owner,
@@ -54,7 +60,11 @@ func ReconcileSecret(ctx context.Context, c k8s.Client, expected corev1.Secret,
 			reconciled.Annotations = maps.Merge(reconciled.Annotations, expected.Annotations)
 			reconciled.Data = expected.Data
 		},
-	}); err != nil {
+	}
+	for _, opt := range opts {
+		opt(&params)
+	}
+	if err := ReconcileResource(params); err != nil {
 		return corev1.Secret{}, err
 	}
 	return reconciled, nil
diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go
index 6cbfee388..447ed7b8b 100644
--- a/pkg/controller/logstash/pipeline.go
+++ b/pkg/controller/logstash/pipeline.go
@@ -41,7 +41,13 @@ func reconcilePipeline(params Params) error {
 		},
 	}
 
-	if err := reconcileSecretWithFastUpdate(params, expected); err != nil {
+	if _, err := reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash,
+		reconciler.WithPostUpdate(func() {
+			annotation.MarkPodsAsUpdated(params.Context, params.Client,
+				client.InNamespace(params.Logstash.Namespace),
+				NewLabelSelectorForLogstash(params.Logstash),
+			)
+		})); err != nil {
 		return err
 	}
 	return nil

If we want to reuse the existing secret reconciliation we could add a slice of option functions at the end


Expand Down
54 changes: 37 additions & 17 deletions pkg/controller/logstash/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package logstash
import (
"fmt"
"hash"
"path"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -39,6 +38,14 @@ const (

// VersionLabelName is a label used to track the version of a Logstash Pod.
VersionLabelName = "logstash.k8s.elastic.co/version"

InitContainerConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config-local"

// InternalConfigVolumeName is a volume which contains the generated configuration.
InternalConfigVolumeName = "elastic-internal-logstash-config"
InternalConfigVolumeMountPath = "/mnt/elastic-internal/logstash-config"
InternalPipelineVolumeName = "elastic-internal-logstash-pipeline"
InternalPipelineVolumeMountPath = "/mnt/elastic-internal/logstash-pipeline"
)

var (
Expand All @@ -54,26 +61,38 @@ var (
}
)

var (
// ConfigSharedVolume contains the Logstash config/ directory, it contains the contents of config from the docker container
ConfigSharedVolume = volume.SharedVolume{
VolumeName: ConfigVolumeName,
InitContainerMountPath: InitContainerConfigVolumeMountPath,
ContainerMountPath: ConfigMountPath,
}
)

// ConfigVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource.
func ConfigVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume {
return volume.NewSecretVolumeWithMountPath(
logstashv1alpha1.ConfigSecretName(ls.Name),
InternalConfigVolumeName,
InternalConfigVolumeMountPath,
)
}

// PipelineVolume returns a SecretVolume to hold the Logstash config of the given Logstash resource.
func PipelineVolume(ls logstashv1alpha1.Logstash) volume.SecretVolume {
return volume.NewSecretVolumeWithMountPath(
logstashv1alpha1.PipelineSecretName(ls.Name),
InternalPipelineVolumeName,
InternalPipelineVolumeMountPath,
)
}

func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateSpec {
defer tracing.Span(&params.Context)()
spec := &params.Logstash.Spec
builder := defaults.NewPodTemplateBuilder(params.GetPodTemplate(), logstashv1alpha1.LogstashContainerName)
vols := []volume.VolumeLike{
// volume with logstash configuration file
volume.NewSecretVolume(
logstashv1alpha1.ConfigSecretName(params.Logstash.Name),
LogstashConfigVolumeName,
path.Join(ConfigMountPath, LogstashConfigFileName),
LogstashConfigFileName,
0644),
// volume with logstash pipeline file
volume.NewSecretVolume(
logstashv1alpha1.PipelineSecretName(params.Logstash.Name),
PipelineVolumeName,
path.Join(ConfigMountPath, PipelineFileName),
PipelineFileName,
0644),
}
vols := []volume.VolumeLike{ConfigSharedVolume, ConfigVolume(params.Logstash), PipelineVolume(params.Logstash)}

labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{
VersionLabelName: spec.Version})
Expand All @@ -93,6 +112,7 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS
WithPorts(ports).
WithReadinessProbe(readinessProbe(params.Logstash)).
WithVolumeLikes(vols...).
WithInitContainers(initConfigContainer(params.Logstash)).
WithInitContainerDefaults()

builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash)
Expand Down
Loading