Skip to content

Commit

Permalink
Refactor Odiglet main function (#1879)
Browse files Browse the repository at this point in the history
  • Loading branch information
RonFed authored Nov 28, 2024
1 parent b75ada3 commit 32c9aed
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 108 deletions.
5 changes: 3 additions & 2 deletions autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func main() {
os.Exit(1)
}

go common.StartPprofServer(setupLog)
ctx := ctrl.SetupSignalHandler()
go common.StartPprofServer(ctx, setupLog)

setupLog.Info("Starting odigos autoscaler", "version", odigosVersion)
odigosNs := env.GetCurrentNamespace()
Expand Down Expand Up @@ -274,7 +275,7 @@ func main() {
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand Down
39 changes: 34 additions & 5 deletions common/pprof.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

Expand All @@ -9,11 +11,38 @@ import (
)

// StartPprofServer starts the pprof server. This is blocking, so it should be run in a goroutine.
// If the server is unable to start, the process will exit with a non-zero exit code.
func StartPprofServer(logger logr.Logger) {
// StartPprofServer starts the pprof HTTP server on consts.PprofOdigosPort and blocks until
// the server stops on its own or ctx is cancelled, so callers normally run it in a goroutine.
//
// It returns the ListenAndServe error when the server stops before cancellation (for example
// when the port cannot be bound), the Shutdown error when stopping after cancellation fails,
// and nil after a clean shutdown. Both failure cases are also logged through logger.
func StartPprofServer(ctx context.Context, logger logr.Logger) error {
	// ctx is already cancelled by the time Shutdown is called, and Shutdown gives up as soon
	// as its context is done, so the drain needs a deadline of its own.
	const shutdownTimeout = 5 * time.Second

	logger.Info("Starting pprof server")
	addr := fmt.Sprintf(":%d", consts.PprofOdigosPort)

	// A nil Handler makes the server dispatch through http.DefaultServeMux.
	server := &http.Server{Addr: addr, Handler: nil}

	// Buffered so the serving goroutine can always hand over its result and exit,
	// even when nobody is receiving any more.
	serveErr := make(chan error, 1)
	go func() {
		err := server.ListenAndServe()
		if errors.Is(err, http.ErrServerClosed) {
			// Expected result of Shutdown; not a failure.
			err = nil
		}
		serveErr <- err
	}()

	// Wait for the server to stop by itself or for the caller to ask for a stop.
	select {
	case err := <-serveErr:
		if err != nil {
			logger.Error(err, "unable to start pprof server")
		}
		return err
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()

	if err := server.Shutdown(shutdownCtx); err != nil {
		logger.Error(err, "error shutting down pprof server")
		return err
	}

	// Shutdown makes ListenAndServe return; wait for the serving goroutine to finish.
	<-serveErr
	return nil
}
18 changes: 9 additions & 9 deletions frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,15 @@ func main() {
return
}

go common.StartPprofServer(logr.FromSlogHandler(slog.Default().Handler()))
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(ch)
cancel()
}()

go common.StartPprofServer(ctx, logr.FromSlogHandler(slog.Default().Handler()))

// Load destinations data
err := destinations.Load()
Expand All @@ -288,14 +296,6 @@ func main() {
log.Fatalf("Error creating Kubernetes client: %s", err)
}

ctx, cancel := context.WithCancel(context.Background())
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(ch)
cancel()
}()

odigosMetrics := collectormetrics.NewOdigosMetrics()
var wg sync.WaitGroup
wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion instrumentor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func main() {
os.Exit(1)
}

go common.StartPprofServer(setupLog)
go common.StartPprofServer(ctx, setupLog)

if !telemetryDisabled {
go report.Start(mgr.GetClient())
Expand Down
167 changes: 112 additions & 55 deletions odiglet/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"fmt"
"os"
"sync"

detector "github.com/odigos-io/odigos/odiglet/pkg/detector"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf/sdks"
Expand All @@ -21,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

Expand All @@ -39,92 +42,146 @@ func odigletInitPhase() {
os.Exit(0)
}

func main() {
// If started in init mode
if len(os.Args) == 2 && os.Args[1] == "init" {
odigletInitPhase()
}

if err := log.Init(); err != nil {
panic(err)
}

log.Logger.V(0).Info("Starting odiglet")

// Load env
if err := env.Load(); err != nil {
log.Logger.Error(err, "Failed to load env")
os.Exit(1)
}
// odiglet bundles the long-lived dependencies that newOdiglet builds and run consumes.
type odiglet struct {
// clientset is the Kubernetes client created from the in-cluster config.
clientset *kubernetes.Clientset
// mgr is the controller-runtime manager returned by kube.CreateManager.
mgr ctrl.Manager
// ctx is the context returned by signals.SetupSignalHandler; run blocks until it is done.
ctx context.Context
// ebpfDirectors holds the directors returned by initEbpf; run calls Shutdown on each of them once ctx is done.
ebpfDirectors ebpf.DirectorsMap
}

// newOdiglet builds everything odiglet needs before it can run: the Kubernetes client,
// the controller-runtime manager, the signal-handling context and the eBPF directors,
// and registers the controllers on the manager.
//
// Nothing is started here. On any failure it returns a nil *odiglet and an error that
// wraps the underlying cause; deciding whether to exit is left to the caller.
func newOdiglet() (*odiglet, error) {
	// Init Kubernetes API client
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create in-cluster config for Kubernetes client: %w", err)
	}

	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kubernetes client: %w", err)
	}

	mgr, err := kube.CreateManager()
	if err != nil {
		return nil, fmt.Errorf("failed to create controller-runtime manager: %w", err)
	}

	// The same context is handed to initEbpf below and later to every component started by run.
	ctx := signals.SetupSignalHandler()

	ebpfDirectors, err := initEbpf(ctx, mgr.GetClient(), mgr.GetScheme())
	if err != nil {
		return nil, fmt.Errorf("failed to init eBPF director: %w", err)
	}

	if err := kube.SetupWithManager(mgr, ebpfDirectors, clientset); err != nil {
		return nil, fmt.Errorf("failed to setup controller-runtime manager: %w", err)
	}

	return &odiglet{
		clientset:     clientset,
		mgr:           mgr,
		ctx:           ctx,
		ebpfDirectors: ebpfDirectors,
	}, nil
}

func (o *odiglet) run() {
var wg sync.WaitGroup

// Start pprof server
wg.Add(1)
go func() {
defer wg.Done()
err := common.StartPprofServer(o.ctx, log.Logger)
if err != nil {
log.Logger.Error(err, "Failed to start pprof server")
} else {
log.Logger.V(0).Info("Pprof server exited")
}
}()

// Start device manager
// the device manager library doesn't support passing a context,
// however, internally it uses a context to cancel the device manager once SIGTERM or SIGINT is received.
wg.Add(1)
go func() {
defer wg.Done()
runDeviceManager(o.clientset)
log.Logger.V(0).Info("Device manager exited")
}()

procEvents := make(chan detector.ProcessEvent)
wg.Add(1)
go func() {
defer wg.Done()
err := detector.StartRuntimeDetector(o.ctx, log.Logger, procEvents)
if err != nil {
log.Logger.Error(err, "Failed to start runtime detector")
os.Exit(-1)
}
log.Logger.V(0).Info("Runtime detector exited")
}()

// start OpAmp server
odigosNs := k8senv.GetCurrentNamespace()
err = server.StartOpAmpServer(ctx, log.Logger, mgr, clientset, env.Current.NodeName, odigosNs)
if err != nil {
log.Logger.Error(err, "Failed to start opamp server")
wg.Add(1)
go func() {
defer wg.Done()
err := server.StartOpAmpServer(o.ctx, log.Logger, o.mgr, o.clientset, env.Current.NodeName, odigosNs)
if err != nil {
log.Logger.Error(err, "Failed to start opamp server")
}
log.Logger.V(0).Info("OpAmp server exited")
}()

// start kube manager
wg.Add(1)
go func() {
defer wg.Done()
err := o.mgr.Start(o.ctx)
if err != nil {
log.Logger.Error(err, "error starting kube manager")
}
log.Logger.V(0).Info("Kube manager exited")
}()

<-o.ctx.Done()
for _, director := range o.ebpfDirectors {
director.Shutdown()
}
wg.Wait()
}

ebpfDirectors, err := initEbpf(ctx, mgr.GetClient(), mgr.GetScheme())
if err != nil {
log.Logger.Error(err, "Failed to init eBPF director")
os.Exit(-1)
func main() {
// If started in init mode
if len(os.Args) == 2 && os.Args[1] == "init" {
odigletInitPhase()
}

err = kube.SetupWithManager(mgr, ebpfDirectors, clientset)
if err != nil {
log.Logger.Error(err, "Failed to setup controller-runtime manager")
os.Exit(-1)
if err := log.Init(); err != nil {
panic(err)
}

err = kube.StartManager(ctx, mgr)
if err != nil {
log.Logger.Error(err, "Failed to start controller-runtime manager")
os.Exit(-1)
}
log.Logger.V(0).Info("Starting odiglet")

<-ctx.Done()
for _, director := range ebpfDirectors {
director.Shutdown()
// Load env
if err := env.Load(); err != nil {
log.Logger.Error(err, "Failed to load env")
os.Exit(1)
}
err = runtimeDetector.Stop()

o, err := newOdiglet()
if err != nil {
log.Logger.Error(err, "Failed to stop runtime detector")
os.Exit(-1)
log.Logger.Error(err, "Failed to initialize odiglet")
os.Exit(1)
}
o.run()

log.Logger.V(0).Info("odiglet exiting")
}

func startDeviceManager(clientset *kubernetes.Clientset) {
func runDeviceManager(clientset *kubernetes.Clientset) {
log.Logger.V(0).Info("Starting device manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
33 changes: 11 additions & 22 deletions odiglet/pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package detector

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
Expand All @@ -16,39 +15,29 @@ import (

type ProcessEvent = detector.ProcessEvent

type Detector struct {
detector *detector.Detector
wg sync.WaitGroup
runError error
}

func StartRuntimeDetector(ctx context.Context, logger logr.Logger, events chan ProcessEvent) (*Detector, error) {
// StartRuntimeDetector creates a runtime detector that reports on events and runs it
// together with the loop that consumes those events. It blocks until both the detector's
// Run call and readProcEventsLoop have returned.
//
// It returns a wrapped error when the detector cannot be created; otherwise it returns
// whatever Run returned.
//
// NOTE(review): readProcEventsLoop is presumably expected to return once events is closed;
// if nothing closes events after Run returns, this function will not return — confirm.
func StartRuntimeDetector(ctx context.Context, logger logr.Logger, events chan ProcessEvent) error {
	// Named rd rather than detector so the imported detector package is not shadowed.
	rd, err := newDetector(ctx, logger, events)
	if err != nil {
		return fmt.Errorf("failed to create runtime detector: %w", err)
	}

	var wg sync.WaitGroup
	// Written by the Run goroutine and read only after wg.Wait, so no extra locking is needed.
	var runError error

	wg.Add(1)
	go func() {
		defer wg.Done()
		readProcEventsLoop(logger, events)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		runError = rd.Run(ctx)
	}()

	wg.Wait()
	return runError
}

func newDetector(ctx context.Context, logger logr.Logger, events chan ProcessEvent) (*detector.Detector, error) {
Expand Down
Loading

0 comments on commit 32c9aed

Please sign in to comment.