Skip to content

Commit

Permalink
add process monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
nplanel committed Dec 6, 2022
1 parent 448fcdf commit 09465b2
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 70 deletions.
92 changes: 22 additions & 70 deletions pkg/network/protocols/http/ebpf_gotls.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/cilium/ebpf"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"

"github.com/DataDog/datadog-agent/pkg/network/config"
Expand All @@ -30,7 +29,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/protocols/http/gotls"
"github.com/DataDog/datadog-agent/pkg/network/protocols/http/gotls/lookup"
errtelemetry "github.com/DataDog/datadog-agent/pkg/network/telemetry"
"github.com/DataDog/datadog-agent/pkg/process/util"

"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/log"
manager "github.com/DataDog/ebpf-manager"
)
Expand Down Expand Up @@ -131,9 +131,10 @@ type GoTLSProgram struct {

// Process monitor channels
procMonitor struct {
done chan struct{}
events chan netlink.ProcEvent
errors chan error
mon *monitor.ProcessMonitor

cleanupExec func()
cleanupExit func()
}

lock sync.RWMutex
Expand Down Expand Up @@ -182,9 +183,10 @@ func newGoTLSProgram(c *config.Config) *GoTLSProgram {
processes: make(map[pid]binaryID),
}

p.procMonitor.done = make(chan struct{})
p.procMonitor.events = make(chan netlink.ProcEvent, 1000)
p.procMonitor.errors = make(chan error, 1)
p.procMonitor.mon = monitor.GetProcessMonitor(c)
if p.procMonitor.mon == nil {
return nil
}

p.binAnalysisMetric = errtelemetry.NewMetric("gotls.analysis_time", errtelemetry.OptStatsd)

Expand Down Expand Up @@ -233,74 +235,24 @@ func (p *GoTLSProgram) Start() {
return
}

if err := netlink.ProcEventMonitor(p.procMonitor.events, p.procMonitor.done, p.procMonitor.errors); err != nil {
log.Errorf("could not create process monitor: %s", err)
return
}

// This channel is used by the process watcher goroutine (just below) to
// wait until we finished scanning for already running Go processes.
// This is needed to avoid a race condition where an exit event is
// processed during the registration of an already running process,
// which would make the process impossible to unregister afterwards,
// causing a memory leak.
startDone := make(chan interface{})

// Process watcher events handling goroutine
go func() {
// Wait for the scanning of already running processes to complete
<-startDone

for {
select {
case <-p.procMonitor.done:
return

case event, ok := <-p.procMonitor.events:
if !ok {
return
}

switch ev := event.Msg.(type) {
case *netlink.ExecProcEvent:
p.handleProcessStart(ev.ProcessPid)
case *netlink.ExitProcEvent:
p.handleProcessStop(ev.ProcessPid)

// No default case; the watcher has a
// lot of event types, some of which
// (e.g fork) happen all the time even
// on an idle machine. Logging those
// would flood our logs.
}

case err, ok := <-p.procMonitor.errors:
if !ok {
return
}

log.Errorf("process watcher error: %s", err)
}
}
}()

// Scan already running processes. We allow the process watcher to
// process events afterwards.
go func() {
_ = util.WithAllProcs(p.procRoot, func(pid int) error {
p.handleProcessStart(uint32(pid))
return nil
})
close(startDone)
}()
p.procMonitor.cleanupExec = p.procMonitor.mon.Subscribe(&monitor.ProcessCallback{
Event: monitor.EXEC,
Metadata: monitor.ANY,
Callback: p.handleProcessStart,
})
p.procMonitor.cleanupExit = p.procMonitor.mon.Subscribe(&monitor.ProcessCallback{
Event: monitor.EXIT,
Metadata: monitor.ANY,
Callback: p.handleProcessStop,
})
}

// Stop shuts down process monitoring for this program. It is a no-op when
// called on a nil receiver.
//
// NOTE(review): this listing appears to interleave removed and added diff
// lines; confirm whether closing procMonitor.done is still intended alongside
// the two cleanup calls below.
func (p *GoTLSProgram) Stop() {
if p == nil {
return
}

// Closing done is what makes the event-handling goroutine started in Start
// return.
close(p.procMonitor.done)
// Invoke the functions returned by mon.Subscribe in Start for the EXEC and
// EXIT callbacks.
// NOTE(review): both fields are only assigned in Start, which can return
// early; presumably Stop is never reached in that case — verify, since
// calling a nil func panics.
p.procMonitor.cleanupExec()
p.procMonitor.cleanupExit()
}

func (p *GoTLSProgram) handleProcessStart(pid pid) {
Expand Down
81 changes: 81 additions & 0 deletions pkg/network/protocols/http/ebpf_javatls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//go:build linux_bpf
// +build linux_bpf

package http

import (
"regexp"

"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/java"
nettelemetry "github.com/DataDog/datadog-agent/pkg/network/telemetry"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/log"
manager "github.com/DataDog/ebpf-manager"
)

// JavaTLSProgram is the subprogram that subscribes to process exec events in
// order to inject the java agent into newly executed java processes.
type JavaTLSProgram struct {
	// processMonitor is the monitor on which Start registers its callback.
	processMonitor *monitor.ProcessMonitor

	// cleanupExec holds the function returned by Subscribe in Start; Stop
	// invokes it.
	cleanupExec func()
}

// Compile-time check that *JavaTLSProgram satisfies the subprogram interface.
var _ subprogram = (*JavaTLSProgram)(nil)

// newJavaTLSProgram builds the Java TLS subprogram, or returns nil when it
// cannot be enabled.
//
// nil is returned when HTTPS monitoring or Java TLS support is disabled in c,
// when the runtime compiler is disabled (an error is logged in that case), or
// when monitor.GetProcessMonitor returns nil.
func newJavaTLSProgram(c *config.Config) *JavaTLSProgram {
	if !c.EnableHTTPSMonitoring || !c.EnableJavaTLSSupport {
		return nil
	}

	if !c.EnableRuntimeCompiler {
		// This check guards Java TLS, so the message must name it rather than
		// goTLS; otherwise the log points operators at the wrong feature.
		log.Errorf("java TLS support requires runtime-compilation to be enabled")
		return nil
	}

	pm := monitor.GetProcessMonitor(c)
	if pm == nil {
		return nil
	}
	return &JavaTLSProgram{
		processMonitor: pm,
	}
}

// ConfigureManager currently performs no configuration on m; setting up the
// random id is still to be done. It is safe to call on a nil receiver.
func (p *JavaTLSProgram) ConfigureManager(m *nettelemetry.Manager) {
	if p == nil {
		return
	}

	// TODO: set up the random id here.
}

// ConfigureOptions leaves options untouched.
func (p *JavaTLSProgram) ConfigureOptions(options *manager.Options) {
	// Intentionally empty: nothing is changed in options.
}

// GetAllUndefinedProbes always returns a nil probe list.
func (p *JavaTLSProgram) GetAllUndefinedProbes() (probeList []manager.ProbeIdentificationPair) {
	return nil
}

// newJavaProcess injects the java agent jar into the process identified by
// pid. Injection is best-effort: a failure is logged and not propagated.
func newJavaProcess(pid uint32) {
	const agentJarPath = "/opt/datadog-agent/embedded/share/system-probe/ebpf/java.agent.jar"

	if err := java.InjectAgent(int(pid), agentJarPath, ""); err != nil {
		// Name the operation and the target pid so the log line can be tied
		// to a specific process without extra context.
		log.Errorf("injecting java agent %s into pid %d: %v", agentJarPath, pid, err)
	}
}

// Start registers newJavaProcess with the process monitor for EXEC events
// filtered by the ^java$ pattern (presumably matched against the process
// name, given Metadata is monitor.NAME — confirm against the monitor package).
// The function returned by Subscribe is kept in cleanupExec. Calling Start on
// a nil receiver does nothing.
func (p *JavaTLSProgram) Start() {
	if p == nil {
		return
	}

	javaExec := &monitor.ProcessCallback{
		Event:    monitor.EXEC,
		Metadata: monitor.NAME,
		Regex:    regexp.MustCompile("^java$"),
		Callback: newJavaProcess,
	}
	p.cleanupExec = p.processMonitor.Subscribe(javaExec)
}

// Stop removes the subscription registered by Start by invoking the stored
// cleanup function. It is a no-op on a nil receiver and when there is no
// stored cleanup function (Start never ran, or Stop already ran).
func (p *JavaTLSProgram) Stop() {
	if p == nil || p.cleanupExec == nil {
		// cleanupExec is only assigned in Start; calling a nil func would panic.
		return
	}

	p.cleanupExec()
	// Drop the reference so a repeated Stop does not run the cleanup twice.
	p.cleanupExec = nil
}
1 change: 1 addition & 0 deletions pkg/network/protocols/http/ebpf_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func newEBPFProgram(c *config.Config, offsets []manager.ConstantEditor, sockFD *
ebpfSubprograms := []subprogram{
newGoTLSProgram(c),
newSSLProgram(c, sockFD),
newJavaTLSProgram(c),
}

program := &ebpfProgram{
Expand Down
9 changes: 9 additions & 0 deletions pkg/network/protocols/http/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/config"
filterpkg "github.com/DataDog/datadog-agent/pkg/network/filter"
errtelemetry "github.com/DataDog/datadog-agent/pkg/network/telemetry"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
)

// Monitor is responsible for:
Expand All @@ -34,6 +35,7 @@ type Monitor struct {
batchManager *batchManager
batchCompletionHandler *ddebpf.PerfHandler
telemetry *telemetry
processMonitor *monitor.ProcessMonitor

pollRequests chan chan map[Key]*RequestStats
statkeeper *httpStatKeeper
Expand Down Expand Up @@ -94,6 +96,8 @@ func NewMonitor(c *config.Config, offsets []manager.ConstantEditor, sockFD *ebpf
return nil, fmt.Errorf("couldn't instantiate batch manager: %w", err)
}

processMonitor := monitor.GetProcessMonitor(c)

return &Monitor{
handler: handler,
ebpfProgram: mgr,
Expand All @@ -103,6 +107,7 @@ func NewMonitor(c *config.Config, offsets []manager.ConstantEditor, sockFD *ebpf
pollRequests: make(chan chan map[Key]*RequestStats),
closeFilterFn: closeFilterFn,
statkeeper: statkeeper,
processMonitor: processMonitor,
}, nil
}

Expand Down Expand Up @@ -149,6 +154,10 @@ func (m *Monitor) Start() error {
}
}()

if m.processMonitor != nil {
m.processMonitor.InitWithAllCurrentProcess()
}

return nil
}

Expand Down
Loading

0 comments on commit 09465b2

Please sign in to comment.