Skip to content

Commit

Permalink
Common Odiglet creation and Run functions (#2020)
Browse files Browse the repository at this point in the history
This PR makes the Odiglet creation and Run function shared so we can
re-use them.
In addition, the Run function is updated to use
[errgroup](https://pkg.go.dev/golang.org/x/sync/errgroup) to manage the
different components.
  • Loading branch information
RonFed authored Dec 18, 2024
1 parent 8d4e7d9 commit c6ff525
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 151 deletions.
162 changes: 12 additions & 150 deletions odiglet/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package main

import (
"context"
"fmt"
"os"
"sync"

"github.com/odigos-io/odigos/odiglet/pkg/ebpf"
"github.com/odigos-io/odigos/odiglet"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf/sdks"
"github.com/odigos-io/odigos/odiglet/pkg/instrumentation/fs"

"github.com/kubevirt/device-plugin-manager/pkg/dpm"
"github.com/odigos-io/odigos/common"
commonInstrumentation "github.com/odigos-io/odigos/instrumentation"
k8senv "github.com/odigos-io/odigos/k8sutils/pkg/env"
"github.com/odigos-io/odigos/odiglet/pkg/env"
"github.com/odigos-io/odigos/odiglet/pkg/instrumentation"
"github.com/odigos-io/odigos/odiglet/pkg/instrumentation/instrumentlang"
"github.com/odigos-io/odigos/odiglet/pkg/kube"
"github.com/odigos-io/odigos/odiglet/pkg/log"
"github.com/odigos-io/odigos/opampserver/pkg/server"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

_ "net/http/pprof"
Expand All @@ -40,130 +30,6 @@ func odigletInitPhase() {
os.Exit(0)
}

type odiglet struct {
clientset *kubernetes.Clientset
mgr ctrl.Manager
ebpfManager commonInstrumentation.Manager
configUpdates chan<- commonInstrumentation.ConfigUpdate[ebpf.K8sConfigGroup]
}

const (
configUpdatesBufferSize = 10
)

func newOdiglet() (*odiglet, error) {
// Init Kubernetes API client
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("Failed to create in-cluster config for Kubernetes client %w", err)
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("Failed to create Kubernetes client %w", err)
}

mgr, err := kube.CreateManager()
if err != nil {
return nil, fmt.Errorf("Failed to create controller-runtime manager %w", err)
}

configUpdates := make(chan commonInstrumentation.ConfigUpdate[ebpf.K8sConfigGroup], configUpdatesBufferSize)
ebpfManager, err := ebpf.NewManager(
mgr.GetClient(),
log.Logger,
map[commonInstrumentation.OtelDistribution]commonInstrumentation.Factory{
commonInstrumentation.OtelDistribution{
Language: common.GoProgrammingLanguage,
OtelSdk: common.OtelSdkEbpfCommunity,
}: sdks.NewGoInstrumentationFactory(),
},
configUpdates,
)
if err != nil {
return nil, fmt.Errorf("Failed to create ebpf manager %w", err)
}

err = kube.SetupWithManager(mgr, nil, clientset, configUpdates)
if err != nil {
return nil, fmt.Errorf("Failed to setup controller-runtime manager %w", err)
}

return &odiglet{
clientset: clientset,
mgr: mgr,
ebpfManager: ebpfManager,
configUpdates: configUpdates,
}, nil
}

func (o *odiglet) run(ctx context.Context) {
var wg sync.WaitGroup

// Start pprof server
wg.Add(1)
go func() {
defer wg.Done()
err := common.StartPprofServer(ctx, log.Logger)
if err != nil {
log.Logger.Error(err, "Failed to start pprof server")
} else {
log.Logger.V(0).Info("Pprof server exited")
}
}()

// Start device manager
// the device manager library doesn't support passing a context,
// however, internally it uses a context to cancel the device manager once SIGTERM or SIGINT is received.
wg.Add(1)
go func() {
defer wg.Done()
runDeviceManager(o.clientset)
log.Logger.V(0).Info("Device manager exited")
}()

wg.Add(1)
go func() {
defer wg.Done()
err := o.ebpfManager.Run(ctx)
if err != nil {
log.Logger.Error(err, "Failed to run ebpf manager")
}
log.Logger.V(0).Info("eBPF manager exited")
}()

// start OpAmp server
odigosNs := k8senv.GetCurrentNamespace()
wg.Add(1)
go func() {
defer wg.Done()
err := server.StartOpAmpServer(ctx, log.Logger, o.mgr, o.clientset, env.Current.NodeName, odigosNs)
if err != nil {
log.Logger.Error(err, "Failed to start opamp server")
}
log.Logger.V(0).Info("OpAmp server exited")
}()

// start kube manager
wg.Add(1)
go func() {
defer wg.Done()
err := o.mgr.Start(ctx)
if err != nil {
log.Logger.Error(err, "error starting kube manager")
} else {
log.Logger.V(0).Info("Kube manager exited")
}
// the manager is stopped, it is now safe to close the config updates channel
if o.configUpdates != nil {
close(o.configUpdates)
}
}()

<-ctx.Done()
wg.Wait()
}

func main() {
// If started in init mode
if len(os.Args) == 2 && os.Args[1] == "init" {
Expand All @@ -182,24 +48,20 @@ func main() {
os.Exit(1)
}

o, err := newOdiglet()
o, err := odiglet.New(deviceInjectionCallbacks(), ebpfInstrumentationFactories())
if err != nil {
log.Logger.Error(err, "Failed to initialize odiglet")
os.Exit(1)
}

ctx := signals.SetupSignalHandler()
o.run(ctx)
o.Run(ctx)

log.Logger.V(0).Info("odiglet exiting")
}

func runDeviceManager(clientset *kubernetes.Clientset) {
log.Logger.V(0).Info("Starting device manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

otelSdkLsf := map[common.ProgrammingLanguage]map[common.OtelSdk]instrumentation.LangSpecificFunc{
func deviceInjectionCallbacks() instrumentation.OtelSdksLsf {
return map[common.ProgrammingLanguage]map[common.OtelSdk]instrumentation.LangSpecificFunc{
common.GoProgrammingLanguage: {
common.OtelSdkEbpfCommunity: instrumentlang.Go,
},
Expand All @@ -219,13 +81,13 @@ func runDeviceManager(clientset *kubernetes.Clientset) {
common.OtelSdkNativeCommunity: instrumentlang.Nginx,
},
}
}

lister, err := instrumentation.NewLister(ctx, clientset, otelSdkLsf)
if err != nil {
log.Logger.Error(err, "Failed to create new lister")
os.Exit(-1)
func ebpfInstrumentationFactories() map[commonInstrumentation.OtelDistribution]commonInstrumentation.Factory {
return map[commonInstrumentation.OtelDistribution]commonInstrumentation.Factory{
commonInstrumentation.OtelDistribution{
Language: common.GoProgrammingLanguage,
OtelSdk: common.OtelSdkEbpfCommunity,
}: sdks.NewGoInstrumentationFactory(),
}

manager := dpm.NewManager(lister)
manager.Run()
}
2 changes: 1 addition & 1 deletion odiglet/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.69.0
k8s.io/api v0.32.0
k8s.io/apimachinery v0.32.0
Expand Down Expand Up @@ -94,7 +95,6 @@ require (
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
Expand Down
154 changes: 154 additions & 0 deletions odiglet/odiglet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package odiglet

import (
"context"
"fmt"

"github.com/kubevirt/device-plugin-manager/pkg/dpm"
"github.com/odigos-io/odigos/common"
k8senv "github.com/odigos-io/odigos/k8sutils/pkg/env"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf"
"github.com/odigos-io/odigos/odiglet/pkg/env"
"github.com/odigos-io/odigos/odiglet/pkg/instrumentation"
"github.com/odigos-io/odigos/odiglet/pkg/kube"
"github.com/odigos-io/odigos/odiglet/pkg/log"
"github.com/odigos-io/odigos/opampserver/pkg/server"
"golang.org/x/sync/errgroup"

commonInstrumentation "github.com/odigos-io/odigos/instrumentation"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime"
)

type Odiglet struct {
clientset *kubernetes.Clientset
mgr controllerruntime.Manager
ebpfManager commonInstrumentation.Manager
configUpdates chan<- commonInstrumentation.ConfigUpdate[ebpf.K8sConfigGroup]
deviceInjectionCallbacks instrumentation.OtelSdksLsf
}

const (
configUpdatesBufferSize = 10
)

// New creates a new Odiglet instance.
func New(deviceInjectionCallbacks instrumentation.OtelSdksLsf, factories map[commonInstrumentation.OtelDistribution]commonInstrumentation.Factory) (*Odiglet, error) {
// Init Kubernetes API client
cfg, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create in-cluster config for Kubernetes client %w", err)
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("failed to create Kubernetes client %w", err)
}

mgr, err := kube.CreateManager()
if err != nil {
return nil, fmt.Errorf("failed to create controller-runtime manager %w", err)
}

configUpdates := make(chan commonInstrumentation.ConfigUpdate[ebpf.K8sConfigGroup], configUpdatesBufferSize)
ebpfManager, err := ebpf.NewManager(mgr.GetClient(), log.Logger, factories, configUpdates)
if err != nil {
return nil, fmt.Errorf("failed to create ebpf manager %w", err)
}

err = kube.SetupWithManager(mgr, nil, clientset, configUpdates)
if err != nil {
return nil, fmt.Errorf("failed to setup controller-runtime manager %w", err)
}

return &Odiglet{
clientset: clientset,
mgr: mgr,
ebpfManager: ebpfManager,
configUpdates: configUpdates,
deviceInjectionCallbacks: deviceInjectionCallbacks,
}, nil
}

// Run starts the Odiglet components and blocks until the context is cancelled, or a critical error occurs.
func (o *Odiglet) Run(ctx context.Context) {
g, groupCtx := errgroup.WithContext(ctx)

// Start pprof server
g.Go(func() error {
err := common.StartPprofServer(groupCtx, log.Logger)
if err != nil {
log.Logger.Error(err, "Failed to start pprof server")
} else {
log.Logger.V(0).Info("Pprof server exited")
}
// if we fail to start the pprof server, don't return an error as it is not critical
// and we can run the rest of the components
return nil
})

// Start device manager
// the device manager library doesn't support passing a context,
// however, internally it uses a context to cancel the device manager once SIGTERM or SIGINT is received.
g.Go(func() error {
err := runDeviceManager(o.clientset, o.deviceInjectionCallbacks)
log.Logger.V(0).Info("Device manager exited")
return err
})

g.Go(func() error {
err := o.ebpfManager.Run(ctx)
if err != nil {
log.Logger.Error(err, "Failed to run ebpf manager")
}
log.Logger.V(0).Info("eBPF manager exited")
return err
})

// start OpAmp server
odigosNs := k8senv.GetCurrentNamespace()
g.Go(func() error {
err := server.StartOpAmpServer(ctx, log.Logger, o.mgr, o.clientset, env.Current.NodeName, odigosNs)
if err != nil {
log.Logger.Error(err, "Failed to start opamp server")
}
log.Logger.V(0).Info("OpAmp server exited")
return err
})

// start kube manager
g.Go(func() error {
err := o.mgr.Start(ctx)
if err != nil {
log.Logger.Error(err, "error starting kube manager")
} else {
log.Logger.V(0).Info("Kube manager exited")
}
// the manager is stopped, it is now safe to close the config updates channel
if o.configUpdates != nil {
close(o.configUpdates)
}
return err
})

err := g.Wait()
if err != nil {
log.Logger.Error(err, "Odiglet exited with error")
}
}

func runDeviceManager(clientset *kubernetes.Clientset, otelSdkLsf instrumentation.OtelSdksLsf) error {
log.Logger.V(0).Info("Starting device manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lister, err := instrumentation.NewLister(ctx, clientset, otelSdkLsf)
if err != nil {
return fmt.Errorf("failed to create device manager lister %w", err)
}

manager := dpm.NewManager(lister)
manager.Run()
return nil
}

0 comments on commit c6ff525

Please sign in to comment.