diff --git a/Makefile b/Makefile
index 87412c735..f53b2179a 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 TAG ?= $(shell odigos version --cluster)
 ODIGOS_CLI_VERSION ?= $(shell odigos version --cli)
-ORG := keyval
+ORG ?= keyval
 
 .PHONY: build-odiglet
 build-odiglet:
@@ -62,6 +62,10 @@ push-scheduler:
 push-collector:
 	docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-collector:$(TAG) collector -f collector/Dockerfile
 
+.PHONY: push-ui
+push-ui:
+	docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-ui:$(TAG) . -f frontend/Dockerfile
+
 .PHONY: push-images
 push-images:
 	make push-autoscaler TAG=$(TAG)
@@ -69,6 +73,7 @@ push-images:
 	make push-odiglet TAG=$(TAG)
 	make push-instrumentor TAG=$(TAG)
 	make push-collector TAG=$(TAG)
+	make push-ui TAG=$(TAG)
 
 .PHONY: load-to-kind-odiglet
 load-to-kind-odiglet:
diff --git a/autoscaler/controllers/common/memorylimiter.go b/autoscaler/controllers/common/memorylimiter.go
new file mode 100644
index 000000000..034c91da3
--- /dev/null
+++ b/autoscaler/controllers/common/memorylimiter.go
@@ -0,0 +1,17 @@
+package common
+
+import (
+	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
+	"github.com/odigos-io/odigos/common/config"
+)
+
+func GetMemoryLimiterConfig(memorySettings odigosv1.CollectorsGroupResourcesSettings) config.GenericMap {
+	// check_interval is currently hardcoded to 1s.
+	// this is a reasonable value for the memory limiter and matches what the processor docs use.
+	// performing memory checks is expensive, so we trade off performance against reaction time to memory pressure.
+	return config.GenericMap{
+		"check_interval":  "1s",
+		"limit_mib":       memorySettings.MemoryLimiterLimitMiB,
+		"spike_limit_mib": memorySettings.MemoryLimiterSpikeLimitMiB,
+	}
+}
diff --git a/autoscaler/controllers/datacollection/configmap.go b/autoscaler/controllers/datacollection/configmap.go
index 8b7885dcd..aab745125 100644
--- a/autoscaler/controllers/datacollection/configmap.go
+++ b/autoscaler/controllers/datacollection/configmap.go
@@ -9,9 +9,10 @@ import (
 
 	"github.com/ghodss/yaml"
 	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
+	"github.com/odigos-io/odigos/autoscaler/controllers/common"
 	commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
 	"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
-	"github.com/odigos-io/odigos/common"
+	odigoscommon "github.com/odigos-io/odigos/common"
 	"github.com/odigos-io/odigos/common/config"
 	"github.com/odigos-io/odigos/common/consts"
 	constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
@@ -124,23 +125,26 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
 	return &desired, nil
 }
 
-func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
+func calculateConfigMapData(nodeCG *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
 	setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) {
 
-	ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort
+	ownMetricsPort := nodeCG.Spec.CollectorOwnMetricsPort
 
 	empty := struct{}{}
 
 	processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors))
 	for name, err := range errs {
-		log.Log.V(0).Info(err.Error(), "processor", name)
+		log.Log.V(0).Error(err, "processor", name)
 	}
 
 	if !disableNameProcessor {
 		processorsCfg["odigosresourcename"] = empty
 	}
 
+	memoryLimiterConfiguration := common.GetMemoryLimiterConfig(nodeCG.Spec.ResourcesSettings)
+
 	processorsCfg["batch"] = empty
+	processorsCfg["memory_limiter"] = memoryLimiterConfiguration
 	processorsCfg["resource"] = config.GenericMap{
 		"attributes": []config.GenericMap{{
 			"key": "k8s.node.name",
@@ -266,13 +270,13 @@ func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odi
 	collectLogs := false
 	for _, dst := range dests.Items {
 		for _, s := range dst.Spec.Signals {
-			if s == common.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
+			if s == odigoscommon.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
 				collectLogs = true
 			}
-			if s == common.TracesObservabilitySignal || dst.Spec.Type == common.PrometheusDestinationType {
+			if s == odigoscommon.TracesObservabilitySignal || dst.Spec.Type == odigoscommon.PrometheusDestinationType {
 				collectTraces = true
 			}
-			if s == common.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
+			if s == odigoscommon.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
 				collectMetrics = true
 			}
 		}
@@ -364,7 +368,7 @@ func getConfigMap(ctx context.Context, c client.Client, namespace string) (*v1.C
 	return configMap, nil
 }
 
-func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.ObservabilitySignal, error) {
+func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]odigoscommon.ObservabilitySignal, error) {
 	config := config.Config{}
 	err := yaml.Unmarshal([]byte(otelcolConfigContent), &config)
 	if err != nil {
@@ -389,22 +393,28 @@ func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.Observab
 		}
 	}
 
-	signals := []common.ObservabilitySignal{}
+	signals := []odigoscommon.ObservabilitySignal{}
 	if tracesEnabled {
-		signals = append(signals, common.TracesObservabilitySignal)
+		signals = append(signals, odigoscommon.TracesObservabilitySignal)
 	}
 	if metricsEnabled {
-		signals = append(signals, common.MetricsObservabilitySignal)
+		signals = append(signals, odigoscommon.MetricsObservabilitySignal)
 	}
 	if logsEnabled {
-		signals = append(signals, common.LogsObservabilitySignal)
+		signals = append(signals, odigoscommon.LogsObservabilitySignal)
 	}
 
 	return signals, nil
 }
 
 func getCommonProcessors(disableNameProcessor bool) []string {
-	processors := []string{"batch"}
+	// the memory limiter is placed right after the batch processor, and not first in the pipeline.
+	// this is so that the instrumented application always succeeds in sending its data to the collector
+	// (the data is accepted into a batch), and the memory limit is checked only after batching,
+	// where a memory-based rejection drops data instead of backpressuring the application.
+	// Read more about it here: https://github.com/open-telemetry/opentelemetry-collector/issues/11726
+	// Also related: https://github.com/open-telemetry/opentelemetry-collector/issues/9591
+	processors := []string{"batch", "memory_limiter"}
 	if !disableNameProcessor {
 		processors = append(processors, "odigosresourcename")
 	}
diff --git a/autoscaler/controllers/datacollection/daemonset.go b/autoscaler/controllers/datacollection/daemonset.go
index 1d93b5d9a..dbfc70571 100644
--- a/autoscaler/controllers/datacollection/daemonset.go
+++ b/autoscaler/controllers/datacollection/daemonset.go
@@ -16,6 +16,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -191,6 +192,9 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
 		rollingUpdate.MaxSurge = &maxSurge
 	}
 
+	requestMemoryRequestQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryRequestMiB))
+	requestMemoryLimitQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryLimitMiB))
+
 	desiredDs := &appsv1.DaemonSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      consts.OdigosNodeCollectorDaemonSetName,
@@ -302,6 +306,10 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
 									},
 								},
 							},
+							{
+								Name:  "GOMEMLIMIT",
+								Value: fmt.Sprintf("%dMiB", datacollection.Spec.ResourcesSettings.GomemlimitMiB),
+							},
 						},
 						LivenessProbe: &corev1.Probe{
 							ProbeHandler: corev1.ProbeHandler{
@@ -319,6 +327,14 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
 							},
 						},
 					},
+					Resources: corev1.ResourceRequirements{
+						Requests: corev1.ResourceList{
+							corev1.ResourceMemory: requestMemoryRequestQuantity,
+						},
+						Limits: corev1.ResourceList{
+							corev1.ResourceMemory: requestMemoryLimitQuantity,
+						},
+					},
 					SecurityContext: &corev1.SecurityContext{
 						Privileged: boolPtr(true),
 					},
diff --git a/autoscaler/controllers/gateway/configmap.go b/autoscaler/controllers/gateway/configmap.go
index 1c4f0d7ff..1a5ca18d5 100644
--- a/autoscaler/controllers/gateway/configmap.go
+++ b/autoscaler/controllers/gateway/configmap.go
@@ -113,12 +113,7 @@ func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error {
 
 func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme) (string, []odigoscommon.ObservabilitySignal, error) {
 	logger := log.FromContext(ctx)
-
-	memoryLimiterConfiguration := config.GenericMap{
-		"check_interval":  "1s",
-		"limit_mib":       gateway.Spec.ResourcesSettings.MemoryLimiterLimitMiB,
-		"spike_limit_mib": gateway.Spec.ResourcesSettings.MemoryLimiterSpikeLimitMiB,
-	}
+	memoryLimiterConfiguration := common.GetMemoryLimiterConfig(gateway.Spec.ResourcesSettings)
 
 	processors := common.FilterAndSortProcessorsByOrderHint(allProcessors, odigosv1.CollectorsGroupRoleClusterGateway)
 
diff --git a/scheduler/controllers/nodecollectorsgroup/common.go b/scheduler/controllers/nodecollectorsgroup/common.go
index 2e739a5e7..1ee516e71 100644
--- a/scheduler/controllers/nodecollectorsgroup/common.go
+++ b/scheduler/controllers/nodecollectorsgroup/common.go
@@ -6,7 +6,6 @@ import (
 
 	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
 	"github.com/odigos-io/odigos/common"
-	"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/consts" k8sutilsconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" "github.com/odigos-io/odigos/k8sutils/pkg/utils" @@ -14,9 +13,37 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +func getMemorySettings(odigosConfig common.OdigosConfiguration) odigosv1.CollectorsGroupResourcesSettings { + // TODO: currently using hardcoded values, should be configurable. + // + // memory request is expensive on daemonsets since it will consume this memory + // on each node in the cluster. setting to 256, but allowing memory to spike higher + // to consume more available memory on the node. + // if the node has memory to spare, we can use it to buffer more data before dropping, + // but it also means that if no memory is available, collector might get killed by OOM killer. + // + // we can trade-off the memory request: + // - more memory request: more memory allocated per collector on each node, but more buffer for bursts and transient failures. + // - less memory request: efficient use of cluster resources, but data might be dropped earlier on spikes. + // currently choosing 256MiB as a balance (~200MiB left for heap to handle batches and export queues). + // + // we can trade-off how high the memory limit is set above the request: + // - limit is set to request: collector most stable (no OOM) but smaller buffer for bursts and early data drop. + // - limit is set way above request: in case of memory spike, collector will use extra memory available on the node to buffer data, but might get killed by OOM killer if this memory is not available. + // currently choosing 512MiB as a balance (200MiB guaranteed for heap, and the rest ~300MiB of buffer from node before start dropping). 
+ // + return odigosv1.CollectorsGroupResourcesSettings{ + MemoryRequestMiB: 256, + MemoryLimitMiB: 512 + 64, + MemoryLimiterLimitMiB: 512, + MemoryLimiterSpikeLimitMiB: 128, // meaning that collector will start dropping data at 512-128=384MiB + GomemlimitMiB: 512 - 128 - 32, // start aggressive GC 32 MiB before soft limit and dropping data + } +} + func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.CollectorsGroup { - ownMetricsPort := consts.OdigosNodeCollectorOwnTelemetryPortDefault + ownMetricsPort := k8sutilsconsts.OdigosNodeCollectorOwnTelemetryPortDefault if odigosConfig.CollectorNode != nil && odigosConfig.CollectorNode.CollectorOwnMetricsPort != 0 { ownMetricsPort = odigosConfig.CollectorNode.CollectorOwnMetricsPort } @@ -27,12 +54,13 @@ func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.Co APIVersion: "odigos.io/v1alpha1", }, ObjectMeta: metav1.ObjectMeta{ - Name: consts.OdigosNodeCollectorDaemonSetName, + Name: k8sutilsconsts.OdigosNodeCollectorDaemonSetName, Namespace: env.GetCurrentNamespace(), }, Spec: odigosv1.CollectorsGroupSpec{ Role: odigosv1.CollectorsGroupRoleNodeCollector, CollectorOwnMetricsPort: ownMetricsPort, + ResourcesSettings: getMemorySettings(odigosConfig), }, } } diff --git a/tests/common/assert/pipeline-ready.yaml b/tests/common/assert/pipeline-ready.yaml index b44e37162..2c52b1386 100644 --- a/tests/common/assert/pipeline-ready.yaml +++ b/tests/common/assert/pipeline-ready.yaml @@ -59,6 +59,8 @@ spec: resources: requests: (memory != null): true + limits: + (memory != null): true volumeMounts: - mountPath: /conf name: collector-conf @@ -150,6 +152,13 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.name + - name: GOMEMLIMIT + (value != null): true + resources: + requests: + (memory != null): true + limits: + (memory != null): true hostNetwork: true nodeSelector: kubernetes.io/os: linux
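Note, not part of the patch: the hardcoded values in getMemorySettings are only meaningful together, because each threshold is derived from the others: GOMEMLIMIT (512-128-32 = 352MiB) sits below the memory_limiter soft limit (limit_mib - spike_limit_mib = 384MiB), which sits below the memory_limiter hard limit (512MiB), which in turn sits below the container memory limit (512+64 = 576Mi). The standalone Go sketch below is illustrative only (the file, variable names, and check are not from the repository); it recomputes these thresholds from the same constants and verifies the ordering:

// memorysettings_check.go - illustrative sketch, not part of this change.
// Recomputes the thresholds hardcoded in getMemorySettings and asserts the
// ordering the comments above rely on:
// request <= GOMEMLIMIT <= soft limit <= memory_limiter limit <= container limit.
package main

import "fmt"

func main() {
	// values copied from getMemorySettings in scheduler/controllers/nodecollectorsgroup/common.go
	memoryRequestMiB := 256
	memoryLimitMiB := 512 + 64        // container memory limit (576MiB)
	memoryLimiterLimitMiB := 512      // memory_limiter hard limit
	memoryLimiterSpikeLimitMiB := 128 // memory_limiter spike headroom
	gomemlimitMiB := 512 - 128 - 32   // GOMEMLIMIT (352MiB)

	// the memory_limiter starts refusing/dropping data above limit_mib - spike_limit_mib
	softLimitMiB := memoryLimiterLimitMiB - memoryLimiterSpikeLimitMiB // 384MiB

	fmt.Printf("request=%dMi gomemlimit=%dMiB soft=%dMiB hard=%dMiB container=%dMi\n",
		memoryRequestMiB, gomemlimitMiB, softLimitMiB, memoryLimiterLimitMiB, memoryLimitMiB)

	if !(memoryRequestMiB <= gomemlimitMiB &&
		gomemlimitMiB <= softLimitMiB &&
		softLimitMiB <= memoryLimiterLimitMiB &&
		memoryLimiterLimitMiB <= memoryLimitMiB) {
		panic("memory settings are not ordered as the comments describe")
	}
}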