From 613119958c252347b546b293480bf999395c9096 Mon Sep 17 00:00:00 2001 From: Amir Blum Date: Wed, 4 Dec 2024 15:09:01 +0200 Subject: [PATCH] feat: add memory limiter to drop data when a soft limit is reached (#1827) ## Problem At the moment, if there is pressure in the pipeline for any reason, and batches are failed to export, they will start building up in the queues of the collector exporter and grow memory unboundly. Since we don't set any memory request or limit on the node collectors ds, they will just go on to consume more and more of the available memory on the node: 1. Will show a pick in resource consumption on the cluster metrics. 2. Starve other pods on the same node, which now has less spare memory to grow into. 3. If the issue is not transient, the memory will just keep increasing over time 4. The amount of data in the retry buffers, will keep the CPU busy attempting to retry the rejected or unsuccessful batches. ## Levels of Protections To prevent the above issues, we imply few level of protections, listed from first line to last resort: 1. setting GOMEMLIMIT to a (now hardcoded constant) `352MiB`. At this point, go runtime GC should kick in and start reclaiming memory aggressively. 2. Setting the otel collector soft limit to (now hardcoded constant) `384MiB`. When the heap allocations reach this amount, the collector will start dropping batches of data after they are exported from the `batch` processor, instead of streaming them down the pipeline. 3. Setting the otel collector hard limit to `512MiB`. When the heap reaches this number, a forced GC is performed. 4. Setting the memory request to `256MiB`. This ensures we have at least this amount of memory to handle normal traffic and some slack for spikes without running into OOM. the rest of the memory is consumed from available memory on the node which by handy for more buffering, but may also cause OOM if the node has no resources. ## Future Work - Add configuration options to set these values, preferably as a spectrum for trace-offs: "resource-stability", "resource-spikecapacity" - drop the data as it received not after it is batched - https://github.com/open-telemetry/opentelemetry-collector/issues/11726 - drop data at receiver when it's implemented in collector - https://github.com/open-telemetry/opentelemetry-collector/issues/9591 --- Makefile | 7 +++- .../controllers/common/memorylimiter.go | 17 +++++++++ .../controllers/datacollection/configmap.go | 36 ++++++++++++------- .../controllers/datacollection/daemonset.go | 16 +++++++++ autoscaler/controllers/gateway/configmap.go | 7 +--- .../controllers/nodecollectorsgroup/common.go | 34 ++++++++++++++++-- tests/common/assert/pipeline-ready.yaml | 9 +++++ 7 files changed, 103 insertions(+), 23 deletions(-) create mode 100644 autoscaler/controllers/common/memorylimiter.go diff --git a/Makefile b/Makefile index 87412c735..f53b2179a 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ TAG ?= $(shell odigos version --cluster) ODIGOS_CLI_VERSION ?= $(shell odigos version --cli) -ORG := keyval +ORG ?= keyval .PHONY: build-odiglet build-odiglet: @@ -62,6 +62,10 @@ push-scheduler: push-collector: docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-collector:$(TAG) collector -f collector/Dockerfile +.PHONY: push-ui +push-ui: + docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-ui:$(TAG) . -f frontend/Dockerfile + .PHONY: push-images push-images: make push-autoscaler TAG=$(TAG) @@ -69,6 +73,7 @@ push-images: make push-odiglet TAG=$(TAG) make push-instrumentor TAG=$(TAG) make push-collector TAG=$(TAG) + make push-ui TAG=$(TAG) .PHONY: load-to-kind-odiglet load-to-kind-odiglet: diff --git a/autoscaler/controllers/common/memorylimiter.go b/autoscaler/controllers/common/memorylimiter.go new file mode 100644 index 000000000..034c91da3 --- /dev/null +++ b/autoscaler/controllers/common/memorylimiter.go @@ -0,0 +1,17 @@ +package common + +import ( + odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + "github.com/odigos-io/odigos/common/config" +) + +func GetMemoryLimiterConfig(memorySettings odigosv1.CollectorsGroupResourcesSettings) config.GenericMap { + // check_interval is currently hardcoded to 1s + // this seems to be a reasonable value for the memory limiter and what the processor uses in docs. + // preforming memory checks is expensive, so we trade off performance with fast reaction time to memory pressure. + return config.GenericMap{ + "check_interval": "1s", + "limit_mib": memorySettings.MemoryLimiterLimitMiB, + "spike_limit_mib": memorySettings.MemoryLimiterSpikeLimitMiB, + } +} diff --git a/autoscaler/controllers/datacollection/configmap.go b/autoscaler/controllers/datacollection/configmap.go index 8b7885dcd..aab745125 100644 --- a/autoscaler/controllers/datacollection/configmap.go +++ b/autoscaler/controllers/datacollection/configmap.go @@ -9,9 +9,10 @@ import ( "github.com/ghodss/yaml" odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" + "github.com/odigos-io/odigos/autoscaler/controllers/common" commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common" "github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom" - "github.com/odigos-io/odigos/common" + odigoscommon "github.com/odigos-io/odigos/common" "github.com/odigos-io/odigos/common/config" "github.com/odigos-io/odigos/common/consts" constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts" @@ -124,23 +125,26 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig return &desired, nil } -func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, +func calculateConfigMapData(nodeCG *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) { - ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort + ownMetricsPort := nodeCG.Spec.CollectorOwnMetricsPort empty := struct{}{} processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors)) for name, err := range errs { - log.Log.V(0).Info(err.Error(), "processor", name) + log.Log.V(0).Error(err, "processor", name) } if !disableNameProcessor { processorsCfg["odigosresourcename"] = empty } + memoryLimiterConfiguration := common.GetMemoryLimiterConfig(nodeCG.Spec.ResourcesSettings) + processorsCfg["batch"] = empty + processorsCfg["memory_limiter"] = memoryLimiterConfiguration processorsCfg["resource"] = config.GenericMap{ "attributes": []config.GenericMap{{ "key": "k8s.node.name", @@ -266,13 +270,13 @@ func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odi collectLogs := false for _, dst := range dests.Items { for _, s := range dst.Spec.Signals { - if s == common.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) { + if s == odigoscommon.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) { collectLogs = true } - if s == common.TracesObservabilitySignal || dst.Spec.Type == common.PrometheusDestinationType { + if s == odigoscommon.TracesObservabilitySignal || dst.Spec.Type == odigoscommon.PrometheusDestinationType { collectTraces = true } - if s == common.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) { + if s == odigoscommon.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) { collectMetrics = true } } @@ -364,7 +368,7 @@ func getConfigMap(ctx context.Context, c client.Client, namespace string) (*v1.C return configMap, nil } -func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.ObservabilitySignal, error) { +func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]odigoscommon.ObservabilitySignal, error) { config := config.Config{} err := yaml.Unmarshal([]byte(otelcolConfigContent), &config) if err != nil { @@ -389,22 +393,28 @@ func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.Observab } } - signals := []common.ObservabilitySignal{} + signals := []odigoscommon.ObservabilitySignal{} if tracesEnabled { - signals = append(signals, common.TracesObservabilitySignal) + signals = append(signals, odigoscommon.TracesObservabilitySignal) } if metricsEnabled { - signals = append(signals, common.MetricsObservabilitySignal) + signals = append(signals, odigoscommon.MetricsObservabilitySignal) } if logsEnabled { - signals = append(signals, common.LogsObservabilitySignal) + signals = append(signals, odigoscommon.LogsObservabilitySignal) } return signals, nil } func getCommonProcessors(disableNameProcessor bool) []string { - processors := []string{"batch"} + // memory limiter is placed right after batch processor an not the first processor in pipeline + // this is so that instrumented application always succeeds in sending data to the collector + // (on it being added to a batch) and checking the memory limit later after the batch + // where memory rejection would drop the data instead of backpressuring the application. + // Read more about it here: https://github.com/open-telemetry/opentelemetry-collector/issues/11726 + // Also related: https://github.com/open-telemetry/opentelemetry-collector/issues/9591 + processors := []string{"batch", "memory_limiter"} if !disableNameProcessor { processors = append(processors, "odigosresourcename") } diff --git a/autoscaler/controllers/datacollection/daemonset.go b/autoscaler/controllers/datacollection/daemonset.go index 1d93b5d9a..dbfc70571 100644 --- a/autoscaler/controllers/datacollection/daemonset.go +++ b/autoscaler/controllers/datacollection/daemonset.go @@ -16,6 +16,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -191,6 +192,9 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st rollingUpdate.MaxSurge = &maxSurge } + requestMemoryRequestQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryRequestMiB)) + requestMemoryLimitQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryLimitMiB)) + desiredDs := &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: consts.OdigosNodeCollectorDaemonSetName, @@ -302,6 +306,10 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st }, }, }, + { + Name: "GOMEMLIMIT", + Value: fmt.Sprintf("%dMiB", datacollection.Spec.ResourcesSettings.GomemlimitMiB), + }, }, LivenessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -319,6 +327,14 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st }, }, }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: requestMemoryRequestQuantity, + }, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: requestMemoryLimitQuantity, + }, + }, SecurityContext: &corev1.SecurityContext{ Privileged: boolPtr(true), }, diff --git a/autoscaler/controllers/gateway/configmap.go b/autoscaler/controllers/gateway/configmap.go index 1c4f0d7ff..1a5ca18d5 100644 --- a/autoscaler/controllers/gateway/configmap.go +++ b/autoscaler/controllers/gateway/configmap.go @@ -113,12 +113,7 @@ func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error { func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme) (string, []odigoscommon.ObservabilitySignal, error) { logger := log.FromContext(ctx) - - memoryLimiterConfiguration := config.GenericMap{ - "check_interval": "1s", - "limit_mib": gateway.Spec.ResourcesSettings.MemoryLimiterLimitMiB, - "spike_limit_mib": gateway.Spec.ResourcesSettings.MemoryLimiterSpikeLimitMiB, - } + memoryLimiterConfiguration := common.GetMemoryLimiterConfig(gateway.Spec.ResourcesSettings) processors := common.FilterAndSortProcessorsByOrderHint(allProcessors, odigosv1.CollectorsGroupRoleClusterGateway) diff --git a/scheduler/controllers/nodecollectorsgroup/common.go b/scheduler/controllers/nodecollectorsgroup/common.go index 2e739a5e7..1ee516e71 100644 --- a/scheduler/controllers/nodecollectorsgroup/common.go +++ b/scheduler/controllers/nodecollectorsgroup/common.go @@ -6,7 +6,6 @@ import ( odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/common" - "github.com/odigos-io/odigos/k8sutils/pkg/consts" k8sutilsconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" "github.com/odigos-io/odigos/k8sutils/pkg/utils" @@ -14,9 +13,37 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +func getMemorySettings(odigosConfig common.OdigosConfiguration) odigosv1.CollectorsGroupResourcesSettings { + // TODO: currently using hardcoded values, should be configurable. + // + // memory request is expensive on daemonsets since it will consume this memory + // on each node in the cluster. setting to 256, but allowing memory to spike higher + // to consume more available memory on the node. + // if the node has memory to spare, we can use it to buffer more data before dropping, + // but it also means that if no memory is available, collector might get killed by OOM killer. + // + // we can trade-off the memory request: + // - more memory request: more memory allocated per collector on each node, but more buffer for bursts and transient failures. + // - less memory request: efficient use of cluster resources, but data might be dropped earlier on spikes. + // currently choosing 256MiB as a balance (~200MiB left for heap to handle batches and export queues). + // + // we can trade-off how high the memory limit is set above the request: + // - limit is set to request: collector most stable (no OOM) but smaller buffer for bursts and early data drop. + // - limit is set way above request: in case of memory spike, collector will use extra memory available on the node to buffer data, but might get killed by OOM killer if this memory is not available. + // currently choosing 512MiB as a balance (200MiB guaranteed for heap, and the rest ~300MiB of buffer from node before start dropping). + // + return odigosv1.CollectorsGroupResourcesSettings{ + MemoryRequestMiB: 256, + MemoryLimitMiB: 512 + 64, + MemoryLimiterLimitMiB: 512, + MemoryLimiterSpikeLimitMiB: 128, // meaning that collector will start dropping data at 512-128=384MiB + GomemlimitMiB: 512 - 128 - 32, // start aggressive GC 32 MiB before soft limit and dropping data + } +} + func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.CollectorsGroup { - ownMetricsPort := consts.OdigosNodeCollectorOwnTelemetryPortDefault + ownMetricsPort := k8sutilsconsts.OdigosNodeCollectorOwnTelemetryPortDefault if odigosConfig.CollectorNode != nil && odigosConfig.CollectorNode.CollectorOwnMetricsPort != 0 { ownMetricsPort = odigosConfig.CollectorNode.CollectorOwnMetricsPort } @@ -27,12 +54,13 @@ func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.Co APIVersion: "odigos.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Name: consts.OdigosNodeCollectorDaemonSetName, + Name: k8sutilsconsts.OdigosNodeCollectorDaemonSetName, Namespace: env.GetCurrentNamespace(), }, Spec: odigosv1.CollectorsGroupSpec{ Role: odigosv1.CollectorsGroupRoleNodeCollector, CollectorOwnMetricsPort: ownMetricsPort, + ResourcesSettings: getMemorySettings(odigosConfig), }, } } diff --git a/tests/common/assert/pipeline-ready.yaml b/tests/common/assert/pipeline-ready.yaml index b44e37162..2c52b1386 100644 --- a/tests/common/assert/pipeline-ready.yaml +++ b/tests/common/assert/pipeline-ready.yaml @@ -59,6 +59,8 @@ spec: resources: requests: (memory != null): true + limits: + (memory != null): true volumeMounts: - mountPath: /conf name: collector-conf @@ -150,6 +152,13 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.name + - name: GOMEMLIMIT + (value != null): true + resources: + requests: + (memory != null): true + limits: + (memory != null): true hostNetwork: true nodeSelector: kubernetes.io/os: linux