Skip to content

Commit

Permalink
Pass telemetry directly to the Controller
Browse files Browse the repository at this point in the history
Do not enqueue telemetry to a channel just to dequeue it and send it to
the Controller.
  • Loading branch information
MrAlias committed Nov 6, 2024
1 parent 454a9ca commit 8a456a8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 21 deletions.
11 changes: 2 additions & 9 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/rlimit"

"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/trace"

dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql"
Expand Down Expand Up @@ -58,7 +57,6 @@ type Manager struct {
exe *link.Executable
td *process.TargetDetails
runningProbesWG sync.WaitGroup
telemetryCh chan ptrace.ScopeSpans
currentConfig Config
probeMu sync.Mutex
state managerState
Expand All @@ -74,7 +72,6 @@ func NewManager(logger *slog.Logger, otelController *opentelemetry.Controller, g
globalImpl: globalImpl,
loadedIndicator: loadIndicator,
cp: cp,
telemetryCh: make(chan ptrace.ScopeSpans),
}

err := m.registerProbes()
Expand Down Expand Up @@ -227,7 +224,7 @@ func (m *Manager) runProbe(p probe.Probe) {
m.runningProbesWG.Add(1)
go func(ap probe.Probe) {
defer m.runningProbesWG.Done()
ap.Run(m.telemetryCh)
ap.Run(m.otelController.Trace)
}(p)
}

Expand Down Expand Up @@ -290,19 +287,15 @@ func (m *Manager) Run(ctx context.Context, target *process.TargetDetails) error
m.logger.Debug("Shutting down all probes")
err := m.cleanup(target)

// Wait for all probes to stop before closing the chan they send on.
// Wait for all probes to stop.
m.runningProbesWG.Wait()
close(m.telemetryCh)

m.state = managerStateStopped
m.probeMu.Unlock()

done <- errors.Join(err, ctx.Err())
}()

for e := range m.telemetryCh {
m.otelController.Trace(e)
}
return <-done
}

Expand Down
11 changes: 4 additions & 7 deletions internal/pkg/instrumentation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ func TestRunStopping(t *testing.T) {
otelController: ctrl,
logger: slog.Default(),
probes: map[probe.ID]probe.Probe{{}: p},
telemetryCh: make(chan ptrace.ScopeSpans),
cp: NewNoopConfigProvider(nil),
}

Expand Down Expand Up @@ -288,7 +287,7 @@ func (p slowProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Conf
return nil
}

func (p slowProbe) Run(c chan<- ptrace.ScopeSpans) {
func (p slowProbe) Run(func(ptrace.ScopeSpans)) {
}

func (p slowProbe) Close() error {
Expand All @@ -308,7 +307,7 @@ func (p *noopProbe) Load(*link.Executable, *process.TargetDetails, *sampling.Con
return nil
}

func (p *noopProbe) Run(c chan<- ptrace.ScopeSpans) {
func (p *noopProbe) Run(func(ptrace.ScopeSpans)) {
p.running = true
}

Expand Down Expand Up @@ -370,7 +369,6 @@ func TestConfigProvider(t *testing.T) {
netHTTPServerProbeID: &noopProbe{},
somePackageProducerProbeID: &noopProbe{},
},
telemetryCh: make(chan ptrace.ScopeSpans),
cp: newDummyProvider(Config{
InstrumentationLibraryConfigs: map[LibraryID]Library{
netHTTPClientLibID: {TracesEnabled: &falseVal},
Expand Down Expand Up @@ -475,10 +473,10 @@ func (p *hangingProbe) Load(*link.Executable, *process.TargetDetails, *sampling.
return nil
}

func (p *hangingProbe) Run(c chan<- ptrace.ScopeSpans) {
func (p *hangingProbe) Run(handle func(ptrace.ScopeSpans)) {
<-p.closeReturned
// Write after Close has returned.
c <- ptrace.NewScopeSpans()
handle(ptrace.NewScopeSpans())
}

func (p *hangingProbe) Close() error {
Expand All @@ -498,7 +496,6 @@ func TestRunStopDeadlock(t *testing.T) {
otelController: ctrl,
logger: slog.Default(),
probes: map[probe.ID]probe.Probe{{}: p},
telemetryCh: make(chan ptrace.ScopeSpans),
cp: NewNoopConfigProvider(nil),
}

Expand Down
10 changes: 5 additions & 5 deletions internal/pkg/instrumentation/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Probe interface {
Load(*link.Executable, *process.TargetDetails, *sampling.Config) error

// Run runs the events processing loop.
Run(tracesChan chan<- ptrace.ScopeSpans)
Run(func(ptrace.ScopeSpans))

// Close stops the Probe.
Close() error
Expand Down Expand Up @@ -267,7 +267,7 @@ type SpanProducer[BPFObj any, BPFEvent any] struct {
}

// Run runs the events processing loop.
func (i *SpanProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) {
func (i *SpanProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) {
for {
event, err := i.read()
if err != nil {
Expand All @@ -285,7 +285,7 @@ func (i *SpanProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) {

i.ProcessFn(event).CopyTo(ss.Spans())

dest <- ss
handle(ss)
}
}

Expand All @@ -296,7 +296,7 @@ type TraceProducer[BPFObj any, BPFEvent any] struct {
}

// Run runs the events processing loop.
func (i *TraceProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) {
func (i *TraceProducer[BPFObj, BPFEvent]) Run(handle func(ptrace.ScopeSpans)) {
for {
event, err := i.read()
if err != nil {
Expand All @@ -306,7 +306,7 @@ func (i *TraceProducer[BPFObj, BPFEvent]) Run(dest chan<- ptrace.ScopeSpans) {
continue
}

dest <- i.ProcessFn(event)
handle(i.ProcessFn(event))
}
}

Expand Down

0 comments on commit 8a456a8

Please sign in to comment.