Skip to content

Commit

Permalink
feat: odigos to run in parallel to otheragents using config (#1957)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirdavid1 authored Dec 9, 2024
1 parent 4755276 commit 43a7bce
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 166 deletions.
3 changes: 2 additions & 1 deletion cli/cmd/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
cmdcontext "github.com/odigos-io/odigos/cli/pkg/cmd_context"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/k8sutils/pkg/getters"
k8sprofiles "github.com/odigos-io/odigos/k8sutils/pkg/profiles"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -102,7 +103,7 @@ var addProfileCmd = &cobra.Command{

// Fetch the available profiles for the current tier
profiles := resources.GetAvailableProfilesForTier(currentTier)
var selectedProfile *resources.Profile
var selectedProfile *k8sprofiles.Profile

// Search for the specified profile in the available profiles
for _, profile := range profiles {
Expand Down
11 changes: 6 additions & 5 deletions cli/cmd/resources/odigosconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/odigos-io/odigos/cli/pkg/kube"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/consts"
k8sprofiles "github.com/odigos-io/odigos/k8sutils/pkg/profiles"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (a *odigosConfigResourceManager) Name() string { return "OdigosConfig" }

func (a *odigosConfigResourceManager) InstallFromScratch(ctx context.Context) error {

sizingProfile := FilterSizeProfiles(a.config.Profiles)
sizingProfile := k8sprofiles.FilterSizeProfiles(a.config.Profiles)
collectorGatewayConfig := GetGatewayConfigBasedOnSize(sizingProfile)
a.config.CollectorGateway = collectorGatewayConfig

Expand All @@ -63,11 +64,11 @@ func (a *odigosConfigResourceManager) InstallFromScratch(ctx context.Context) er
}

func GetGatewayConfigBasedOnSize(profile common.ProfileName) *common.CollectorGatewayConfiguration {
aggregateProfiles := append([]common.ProfileName{profile}, profilesMap[profile].Dependencies...)
aggregateProfiles := append([]common.ProfileName{profile}, k8sprofiles.ProfilesMap[profile].Dependencies...)

for _, profile := range aggregateProfiles {
switch profile {
case sizeSProfile.ProfileName:
case k8sprofiles.SizeSProfile.ProfileName:
return &common.CollectorGatewayConfiguration{
MinReplicas: 1,
MaxReplicas: 5,
Expand All @@ -76,7 +77,7 @@ func GetGatewayConfigBasedOnSize(profile common.ProfileName) *common.CollectorGa
RequestMemoryMiB: 300,
LimitMemoryMiB: 300,
}
case sizeMProfile.ProfileName:
case k8sprofiles.SizeMProfile.ProfileName:
return &common.CollectorGatewayConfiguration{
MinReplicas: 2,
MaxReplicas: 8,
Expand All @@ -85,7 +86,7 @@ func GetGatewayConfigBasedOnSize(profile common.ProfileName) *common.CollectorGa
RequestMemoryMiB: 500,
LimitMemoryMiB: 600,
}
case sizeLProfile.ProfileName:
case k8sprofiles.SizeLProfile.ProfileName:
return &common.CollectorGatewayConfiguration{
MinReplicas: 3,
MaxReplicas: 12,
Expand Down
141 changes: 11 additions & 130 deletions cli/cmd/resources/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,120 +4,23 @@ import (
"context"
"fmt"

odigosv1alpha1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/cli/cmd/resources/profiles"
"github.com/odigos-io/odigos/cli/cmd/resources/resourcemanager"
"github.com/odigos-io/odigos/cli/pkg/kube"
"github.com/odigos-io/odigos/common"
k8sprofiles "github.com/odigos-io/odigos/k8sutils/pkg/profiles"
)

type Profile struct {
ProfileName common.ProfileName
ShortDescription string
KubeObject kube.Object // used to read it from the embedded YAML file
Dependencies []common.ProfileName // other profiles that are applied by the current profile
func GetAvailableCommunityProfiles() []k8sprofiles.Profile {
return []k8sprofiles.Profile{k8sprofiles.SemconvUpgraderProfile, k8sprofiles.CopyScopeProfile, k8sprofiles.DisableNameProcessorProfile,
k8sprofiles.SizeSProfile, k8sprofiles.SizeMProfile,
k8sprofiles.SizeLProfile, k8sprofiles.AllowConcurrentAgents}
}

var (
// sizing profiles for the collector gateway
sizeSProfile = Profile{
ProfileName: common.ProfileName("size_s"),
ShortDescription: "Small size deployment profile",
}
sizeMProfile = Profile{
ProfileName: common.ProfileName("size_m"),
ShortDescription: "Medium size deployment profile",
}
sizeLProfile = Profile{
ProfileName: common.ProfileName("size_l"),
ShortDescription: "Large size deployment profile",
}
fullPayloadCollectionProfile = Profile{
ProfileName: common.ProfileName("full-payload-collection"),
ShortDescription: "Collect any payload from the cluster where supported with default settings",
KubeObject: &odigosv1alpha1.InstrumentationRule{},
}
dbPayloadCollectionProfile = Profile{
ProfileName: common.ProfileName("db-payload-collection"),
ShortDescription: "Collect db payload from the cluster where supported with default settings",
KubeObject: &odigosv1alpha1.InstrumentationRule{},
}
queryOperationDetector = Profile{
ProfileName: common.ProfileName("query-operation-detector"),
ShortDescription: "Detect the SQL operation name from the query text",
KubeObject: &odigosv1alpha1.Processor{},
}
semconvUpgraderProfile = Profile{
ProfileName: common.ProfileName("semconv"),
ShortDescription: "Upgrade and align some attribute names to a newer version of the OpenTelemetry semantic conventions",
KubeObject: &odigosv1alpha1.Processor{},
}
categoryAttributesProfile = Profile{
ProfileName: common.ProfileName("category-attributes"),
ShortDescription: "Add category attributes to the spans",
KubeObject: &odigosv1alpha1.Processor{},
}
copyScopeProfile = Profile{
ProfileName: common.ProfileName("copy-scope"),
ShortDescription: "Copy the scope name into a separate attribute for backends that do not support scopes",
KubeObject: &odigosv1alpha1.Processor{},
}
hostnameAsPodNameProfile = Profile{
ProfileName: common.ProfileName("hostname-as-podname"),
ShortDescription: "Populate the spans resource `host.name` attribute with value of `k8s.pod.name`",
KubeObject: &odigosv1alpha1.Processor{},
}
javaNativeInstrumentationsProfile = Profile{
ProfileName: common.ProfileName("java-native-instrumentations"),
ShortDescription: "Instrument Java applications using native instrumentation and eBPF enterprise processing",
KubeObject: &odigosv1alpha1.InstrumentationRule{},
}
codeAttributesProfile = Profile{
ProfileName: common.ProfileName("code-attributes"),
ShortDescription: "Record span attributes in 'code' namespace where supported",
}
disableNameProcessorProfile = Profile{
ProfileName: common.ProfileName("disable-name-processor"),
ShortDescription: "If not using dotnet or java native instrumentations, disable the name processor which is not needed",
}
smallBatchesProfile = Profile{
ProfileName: common.ProfileName("small-batches"),
ShortDescription: "Reduce the batch size for exports",
KubeObject: &odigosv1alpha1.Processor{},
}
kratosProfile = Profile{
ProfileName: common.ProfileName("kratos"),
ShortDescription: "Bundle profile that includes db-payload-collection, semconv, category-attributes, copy-scope, hostname-as-podname, java-native-instrumentations, code-attributes, query-operation-detector, disableNameProcessorProfile, small-batches, size_m",
Dependencies: []common.ProfileName{"db-payload-collection", "semconv", "category-attributes", "copy-scope", "hostname-as-podname", "java-native-instrumentations", "code-attributes", "query-operation-detector", "disableNameProcessorProfile", "small-batches", "size_m"},
}
profilesMap = map[common.ProfileName]Profile{
sizeSProfile.ProfileName: sizeSProfile,
sizeMProfile.ProfileName: sizeMProfile,
sizeLProfile.ProfileName: sizeLProfile,
fullPayloadCollectionProfile.ProfileName: fullPayloadCollectionProfile,
dbPayloadCollectionProfile.ProfileName: dbPayloadCollectionProfile,
queryOperationDetector.ProfileName: queryOperationDetector,
semconvUpgraderProfile.ProfileName: semconvUpgraderProfile,
categoryAttributesProfile.ProfileName: categoryAttributesProfile,
copyScopeProfile.ProfileName: copyScopeProfile,
hostnameAsPodNameProfile.ProfileName: hostnameAsPodNameProfile,
javaNativeInstrumentationsProfile.ProfileName: javaNativeInstrumentationsProfile,
codeAttributesProfile.ProfileName: codeAttributesProfile,
disableNameProcessorProfile.ProfileName: disableNameProcessorProfile,
smallBatchesProfile.ProfileName: smallBatchesProfile,
kratosProfile.ProfileName: kratosProfile,
}
)

func GetAvailableCommunityProfiles() []Profile {
return []Profile{semconvUpgraderProfile, copyScopeProfile, disableNameProcessorProfile, sizeSProfile, sizeMProfile,
sizeLProfile}
}

func GetAvailableOnPremProfiles() []Profile {
return append([]Profile{fullPayloadCollectionProfile, dbPayloadCollectionProfile, categoryAttributesProfile,
hostnameAsPodNameProfile, javaNativeInstrumentationsProfile, kratosProfile, queryOperationDetector,
smallBatchesProfile},
func GetAvailableOnPremProfiles() []k8sprofiles.Profile {
return append([]k8sprofiles.Profile{k8sprofiles.FullPayloadCollectionProfile, k8sprofiles.DbPayloadCollectionProfile, k8sprofiles.CategoryAttributesProfile,
k8sprofiles.HostnameAsPodNameProfile, k8sprofiles.JavaNativeInstrumentationsProfile, k8sprofiles.KratosProfile, k8sprofiles.QueryOperationDetector,
k8sprofiles.SmallBatchesProfile},
GetAvailableCommunityProfiles()...)
}

Expand Down Expand Up @@ -147,14 +50,14 @@ func GetResourcesForProfileName(profileName common.ProfileName, tier common.Odig
return nil, nil
}

func GetAvailableProfilesForTier(odigosTier common.OdigosTier) []Profile {
func GetAvailableProfilesForTier(odigosTier common.OdigosTier) []k8sprofiles.Profile {
switch odigosTier {
case common.CommunityOdigosTier:
return GetAvailableCommunityProfiles()
case common.OnPremOdigosTier:
return GetAvailableOnPremProfiles()
default:
return []Profile{}
return []k8sprofiles.Profile{}
}
}

Expand Down Expand Up @@ -185,25 +88,3 @@ func (a *profilesResourceManager) InstallFromScratch(ctx context.Context) error
}
return a.client.ApplyResources(ctx, a.config.ConfigVersion, allResources)
}

func FilterSizeProfiles(profiles []common.ProfileName) common.ProfileName {
// In case multiple size profiles are provided, the first one will be used.

for _, profile := range profiles {
// Check if the profile is a size profile.
switch profile {
case sizeSProfile.ProfileName, sizeMProfile.ProfileName, sizeLProfile.ProfileName:
return profile
}

// Check if the profile has a dependency which is a size profile.
profileDependencies := profilesMap[profile].Dependencies
for _, dependencyProfile := range profileDependencies {
switch dependencyProfile {
case sizeSProfile.ProfileName, sizeMProfile.ProfileName, sizeLProfile.ProfileName:
return dependencyProfile
}
}
}
return ""
}
28 changes: 14 additions & 14 deletions common/odigos_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ type CollectorGatewayConfiguration struct {

// OdigosConfiguration defines the desired state of OdigosConfiguration
type OdigosConfiguration struct {
ConfigVersion int `json:"configVersion"`
TelemetryEnabled bool `json:"telemetryEnabled,omitempty"`
OpenshiftEnabled bool `json:"openshiftEnabled,omitempty"`
IgnoredNamespaces []string `json:"ignoredNamespaces,omitempty"`
IgnoredContainers []string `json:"ignoredContainers,omitempty"`
Psp bool `json:"psp,omitempty"`
ImagePrefix string `json:"imagePrefix,omitempty"`
OdigletImage string `json:"odigletImage,omitempty"`
InstrumentorImage string `json:"instrumentorImage,omitempty"`
AutoscalerImage string `json:"autoscalerImage,omitempty"`
CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"`
CollectorNode *CollectorNodeConfiguration `json:"collectorNode,omitempty"`
Profiles []ProfileName `json:"profiles,omitempty"`

ConfigVersion int `json:"configVersion"`
TelemetryEnabled bool `json:"telemetryEnabled,omitempty"`
OpenshiftEnabled bool `json:"openshiftEnabled,omitempty"`
IgnoredNamespaces []string `json:"ignoredNamespaces,omitempty"`
IgnoredContainers []string `json:"ignoredContainers,omitempty"`
Psp bool `json:"psp,omitempty"`
ImagePrefix string `json:"imagePrefix,omitempty"`
OdigletImage string `json:"odigletImage,omitempty"`
InstrumentorImage string `json:"instrumentorImage,omitempty"`
AutoscalerImage string `json:"autoscalerImage,omitempty"`
CollectorGateway *CollectorGatewayConfiguration `json:"collectorGateway,omitempty"`
CollectorNode *CollectorNodeConfiguration `json:"collectorNode,omitempty"`
Profiles []ProfileName `json:"profiles,omitempty"`
AllowConcurrentAgents *bool `json:"allowConcurrentAgents,omitempty"`
// this is internal currently, and is not exposed on the CLI / helm
// used for odigos enterprise
GoAutoIncludeCodeAttributes bool `json:"goAutoIncludeCodeAttributes,omitempty"`
Expand Down
31 changes: 28 additions & 3 deletions instrumentor/controllers/instrumentationdevice/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package instrumentationdevice
import (
"context"
"errors"
"fmt"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/instrumentor/controllers/utils"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/odigos-io/odigos/k8sutils/pkg/conditions"
odigosk8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
k8sprofiles "github.com/odigos-io/odigos/k8sutils/pkg/profiles"
k8sutils "github.com/odigos-io/odigos/k8sutils/pkg/utils"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -64,9 +67,9 @@ func isDataCollectionReady(ctx context.Context, c client.Client) bool {
}

func addInstrumentationDeviceToWorkload(ctx context.Context, kubeClient client.Client, runtimeDetails *odigosv1.InstrumentedApplication) (error, bool) {

// devicePartiallyApplied is used to indicate that the instrumentation device was partially applied for some of the containers.
devicePartiallyApplied := false
deviceNotAppliedDueToPresenceOfAnotherAgent := false

logger := log.FromContext(ctx)
obj, err := getWorkloadObject(ctx, kubeClient, runtimeDetails)
Expand Down Expand Up @@ -117,12 +120,29 @@ func addInstrumentationDeviceToWorkload(ctx context.Context, kubeClient client.C
return err
}

err, deviceApplied, tempDevicePartiallyApplied := instrumentation.ApplyInstrumentationDevicesToPodTemplate(podSpec, runtimeDetails, otelSdkToUse, obj, logger)
// get the odigos configuration to check if agents can run concurrently
// if the configuration is not found, we assume that agents can't run concurrently [default behavior]
odigosConfiguration, err := k8sutils.GetCurrentOdigosConfig(ctx, kubeClient)
if err != nil {
return err
}

// User input <odigosConfiguration.AllowConcurrentAgents> prefered over the profile configuration
agentsCanRunConcurrently := k8sprofiles.AgentsCanRunConcurrently(odigosConfiguration.Profiles)
if odigosConfiguration.AllowConcurrentAgents != nil {
agentsCanRunConcurrently = *odigosConfiguration.AllowConcurrentAgents
}

err, deviceApplied, deviceSkippedDueToOtherAgent := instrumentation.ApplyInstrumentationDevicesToPodTemplate(podSpec, runtimeDetails, otelSdkToUse, obj, logger, agentsCanRunConcurrently)
if err != nil {
return err
}
// if non of the devices were applied due to the presence of another agent, return an error.
if !deviceApplied && deviceSkippedDueToOtherAgent {
deviceNotAppliedDueToPresenceOfAnotherAgent = true
}

devicePartiallyApplied = tempDevicePartiallyApplied
devicePartiallyApplied = deviceSkippedDueToOtherAgent && deviceApplied
// If instrumentation device is applied successfully, add odigos.io/inject-instrumentation label to enable the webhook
if deviceApplied {
instrumentation.SetInjectInstrumentationLabel(podSpec)
Expand All @@ -131,6 +151,11 @@ func addInstrumentationDeviceToWorkload(ctx context.Context, kubeClient client.C
return nil
})

// if non of the devices were applied due to the presence of another agent, return an error.
if deviceNotAppliedDueToPresenceOfAnotherAgent {
return fmt.Errorf("device not added to any container due to the presence of another agent"), false
}

if err != nil {
return err, false
}
Expand Down
3 changes: 3 additions & 0 deletions instrumentor/controllers/instrumentationdevice/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ var _ = BeforeSuite(func() {
odigosSystemNamespace := testutil.NewOdigosSystemNamespace()
Expect(k8sClient.Create(testCtx, odigosSystemNamespace)).Should(Succeed())

configmap := testutil.NewMockOdigosConfig()
Expect(k8sClient.Create(testCtx, configmap)).Should(Succeed())

// report the node collector is ready
datacollection := testutil.NewMockDataCollection()
Expect(k8sClient.Create(testCtx, datacollection)).Should(Succeed())
Expand Down
18 changes: 5 additions & 13 deletions instrumentor/instrumentation/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
)

func ApplyInstrumentationDevicesToPodTemplate(original *corev1.PodTemplateSpec, runtimeDetails *odigosv1.InstrumentedApplication, defaultSdks map[common.ProgrammingLanguage]common.OtelSdk, targetObj client.Object,
logger logr.Logger) (error, bool, bool) {
logger logr.Logger, agentsCanRunConcurrently bool) (error, bool, bool) {
// delete any existing instrumentation devices.
// this is necessary for example when migrating from community to enterprise,
// and we need to cleanup the community device before adding the enterprise one.
Expand All @@ -42,9 +42,9 @@ func ApplyInstrumentationDevicesToPodTemplate(original *corev1.PodTemplateSpec,
containerLanguage := getLanguageOfContainer(runtimeDetails, container.Name)
containerHaveOtherAgent := getContainerOtherAgents(runtimeDetails, container.Name)

// In case there is another agent in the container, we should not apply the instrumentation device '*'.
// '*' - In Python, we can run it with New Relic (the only one we detect), but not in other languages.
if containerHaveOtherAgent != nil && containerLanguage != common.PythonProgrammingLanguage {
// By default, Odigos does not run alongside other agents.
// However, if configured in the odigos-config, it can be allowed to run in parallel.
if containerHaveOtherAgent != nil && !agentsCanRunConcurrently {
logger.Info("Container is running other agent, skip applying instrumentation device", "agent", containerHaveOtherAgent.Name, "container", container.Name)

// Not actually modifying the container, but we need to append it to the list.
Expand Down Expand Up @@ -92,15 +92,7 @@ func ApplyInstrumentationDevicesToPodTemplate(original *corev1.PodTemplateSpec,
// persist the original values if changed
manifestEnvOriginal.SerializeToAnnotation(targetObj)

// if non of the devices were applied due to the presence of another agent, return an error.
if !deviceApplied && deviceSkippedDueToOtherAgent {
return fmt.Errorf("device not added to any container due to the presence of another agent"), false, deviceSkippedDueToOtherAgent
}

// devicePartiallyApplied is used to indicate that the instrumentation device was partially applied for some of the containers.
devicePartiallyApplied := deviceSkippedDueToOtherAgent && deviceApplied

return nil, deviceApplied, devicePartiallyApplied
return nil, deviceApplied, deviceSkippedDueToOtherAgent
}

// this function restores a workload manifest env vars to their original values.
Expand Down
Loading

0 comments on commit 43a7bce

Please sign in to comment.