Skip to content

Commit 4de7dfb

Browse files
zeeboStorj Robot
authored and
Storj Robot
committed
debug: add new trace endpoint with packet capture
Change-Id: I769fface8029336d4ffba0717a2362492502cb86
1 parent 0a3388d commit 4de7dfb

File tree

9 files changed

+368
-11
lines changed

9 files changed

+368
-11
lines changed

debug/endpoint.go

+202-1
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,17 @@ package debug
55

66
import (
77
"context"
8+
"net"
9+
"reflect"
810
"runtime"
911
"runtime/trace"
12+
"sync"
13+
"syscall"
14+
"time"
1015

16+
"github.com/google/gopacket"
17+
"github.com/google/gopacket/layers"
18+
"github.com/google/gopacket/pcapgo"
1119
"github.com/zeebo/errs"
1220

1321
"storj.io/common/pb"
@@ -18,6 +26,8 @@ import (
1826
type Endpoint struct {
1927
pb.DRPCDebugUnimplementedServer
2028

29+
mu sync.Mutex
30+
2131
Auth func(ctx context.Context) error
2232
}
2333

@@ -33,11 +43,13 @@ func (f *Endpoint) CollectRuntimeTraces(_ *pb.CollectRuntimeTracesRequest, strea
3343
if err := f.Auth(stream.Context()); err != nil {
3444
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
3545
}
36-
3746
if !traceEnabled {
3847
return rpcstatus.Wrap(rpcstatus.FailedPrecondition, errs.New("trace is not enabled: %v", runtime.Version()))
3948
}
4049

50+
f.mu.Lock()
51+
defer f.mu.Unlock()
52+
4153
if err := trace.Start(&streamWriter{stream: stream}); err != nil {
4254
return rpcstatus.Wrap(rpcstatus.FailedPrecondition, errs.New("trace failed to start: %w", err))
4355
}
@@ -48,6 +60,164 @@ func (f *Endpoint) CollectRuntimeTraces(_ *pb.CollectRuntimeTracesRequest, strea
4860
return nil
4961
}
5062

63+
// CollectRuntimeTraces2 will stream trace data to the client until the client sends a done message
64+
// some error happens, and it then flushes the trace data and captured packet data.
65+
func (f *Endpoint) CollectRuntimeTraces2(stream pb.DRPCDebug_CollectRuntimeTraces2Stream) error {
66+
if err := f.Auth(stream.Context()); err != nil {
67+
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
68+
}
69+
if !traceEnabled {
70+
return rpcstatus.Wrap(rpcstatus.FailedPrecondition, errs.New("trace is not enabled: %v", runtime.Version()))
71+
}
72+
73+
f.mu.Lock()
74+
defer f.mu.Unlock()
75+
76+
var done sync.WaitGroup
77+
defer done.Wait()
78+
79+
type Handle struct {
80+
eh *pcapgo.EthernetHandle
81+
iface net.Interface
82+
}
83+
84+
if err := trace.Start(&streamWriter{stream: stream}); err != nil {
85+
return rpcstatus.Wrap(rpcstatus.FailedPrecondition, errs.New("trace failed to start: %w", err))
86+
}
87+
stopped := false
88+
defer func() {
89+
if !stopped {
90+
trace.Stop()
91+
}
92+
}()
93+
94+
// start the packet capture
95+
var handles []Handle
96+
ifaces, err := net.Interfaces() // ignore errors because pcap is supplemental
97+
trace.Logf(stream.Context(), "trace-debug", "found %d interfaces (err:%v)", len(ifaces), err)
98+
for _, iface := range ifaces {
99+
trace.Logf(stream.Context(), "trace-debug", "checking interface %q", iface.Name)
100+
101+
if iface.Flags&net.FlagLoopback != 0 {
102+
continue
103+
}
104+
if iface.Flags&(net.FlagUp|net.FlagRunning) != net.FlagUp|net.FlagRunning {
105+
continue
106+
}
107+
if len(iface.HardwareAddr) == 0 {
108+
continue
109+
}
110+
addrs, err := iface.Addrs()
111+
if err != nil {
112+
continue
113+
}
114+
hasIPv4 := false
115+
for _, addr := range addrs {
116+
ip, _ := addr.(*net.IPNet)
117+
hasIPv4 = hasIPv4 || len(ip.IP.To4()) == net.IPv4len
118+
}
119+
if !hasIPv4 {
120+
continue
121+
}
122+
123+
eh, err := pcapgo.NewEthernetHandle(iface.Name)
124+
if err != nil {
125+
trace.Logf(stream.Context(), "trace-debug", "could not open handle for %q: %v", iface.Name, err)
126+
continue
127+
}
128+
trace.Logf(stream.Context(), "trace-debug", "opened handle for %q", iface.Name)
129+
defer eh.Close()
130+
131+
handles = append(handles, Handle{
132+
eh: eh,
133+
iface: iface,
134+
})
135+
}
136+
137+
for _, handle := range handles {
138+
handle := handle // avoid loop capture bug
139+
140+
done.Add(1)
141+
go func() {
142+
defer done.Done()
143+
144+
src := gopacket.NewPacketSource(handle.eh, layers.LinkTypeEthernet)
145+
for {
146+
packet, err := src.NextPacket()
147+
if err != nil || !trace.IsEnabled() {
148+
return
149+
}
150+
151+
tcp, _ := packet.Layer(layers.LayerTypeTCP).(*layers.TCP)
152+
ip, _ := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
153+
if tcp == nil || ip == nil {
154+
continue
155+
}
156+
157+
trace.Logf(stream.Context(), "tcp-packet",
158+
"if:%s local:%s:%d remote:%s:%d seq:%d ack:%d flags:%d window:%d payload:%d fragoff:%d ts:%d",
159+
handle.iface.Name,
160+
ip.SrcIP,
161+
tcp.SrcPort,
162+
ip.DstIP,
163+
tcp.DstPort,
164+
tcp.Seq,
165+
tcp.Ack,
166+
makeTCPFlags(tcp),
167+
tcp.Window,
168+
len(tcp.Payload),
169+
ip.FragOffset,
170+
packet.Metadata().Timestamp.UnixNano(),
171+
)
172+
}
173+
}()
174+
}
175+
176+
// wait for a done message or error from caller
177+
for {
178+
msg, err := stream.Recv()
179+
if err != nil {
180+
return err
181+
}
182+
if msg.Done {
183+
break
184+
}
185+
}
186+
187+
stopped = true
188+
trace.Stop()
189+
190+
// wait for all of the handles to be done and send a signal when they are
191+
doneWaiting := make(chan struct{})
192+
go func() {
193+
done.Wait()
194+
close(doneWaiting)
195+
}()
196+
197+
select {
198+
case <-doneWaiting:
199+
// all of the handles are not being used anymore, so we can do clean
200+
// close calls
201+
for _, handle := range handles {
202+
handle.eh.Close()
203+
}
204+
205+
case <-time.After(10 * time.Second):
206+
// at least one handle is still being used and so after 10 seconds is
207+
// almost certainly blocked in the syscall, so we can safely interrupt
208+
// it with a close call on the fd and be reasonably sure that no reads
209+
// will happen on a new socket that got the same fd.
210+
for _, handle := range handles {
211+
fd := reflect.ValueOf(handle.eh).Elem().FieldByName("fd").Int()
212+
_ = syscall.Close(int(fd))
213+
}
214+
215+
<-doneWaiting
216+
}
217+
218+
return nil
219+
}
220+
51221
type streamWriter struct {
52222
stream pb.DRPCDebug_CollectRuntimeTracesStream
53223
}
@@ -59,3 +229,34 @@ func (s *streamWriter) Write(p []byte) (int, error) {
59229
}
60230
return len(p), nil
61231
}
232+
233+
func makeTCPFlags(t *layers.TCP) (f uint32) {
234+
if t.FIN {
235+
f |= 0x0001
236+
}
237+
if t.SYN {
238+
f |= 0x0002
239+
}
240+
if t.RST {
241+
f |= 0x0004
242+
}
243+
if t.PSH {
244+
f |= 0x0008
245+
}
246+
if t.ACK {
247+
f |= 0x0010
248+
}
249+
if t.URG {
250+
f |= 0x0020
251+
}
252+
if t.ECE {
253+
f |= 0x0040
254+
}
255+
if t.CWR {
256+
f |= 0x0080
257+
}
258+
if t.NS {
259+
f |= 0x0100
260+
}
261+
return f
262+
}

debug/endpoint_test.go

+53-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package debug
66
import (
77
"context"
88
"errors"
9+
"io"
910
"runtime/trace"
1011
"testing"
1112

@@ -17,7 +18,7 @@ import (
1718
"storj.io/drpc"
1819
)
1920

20-
func TestRemoteDebugServer_Unauthenticated(t *testing.T) {
21+
func TestRemoteDebugServer_CollectRuntimeTraces_Unauthenticated(t *testing.T) {
2122
endpoint := NewEndpoint(func(ctx context.Context) error {
2223
return errors.New("unauthenticated")
2324
})
@@ -26,7 +27,7 @@ func TestRemoteDebugServer_Unauthenticated(t *testing.T) {
2627
require.Error(t, endpoint.CollectRuntimeTraces(nil, stream), "unauthenticated")
2728
}
2829

29-
func TestRemoteDebugServer_SomeData(t *testing.T) {
30+
func TestRemoteDebugServer_CollectRuntimeTraces_SomeData(t *testing.T) {
3031
if trace.IsEnabled() {
3132
t.Skip("tracing already enabled")
3233
} else if !traceEnabled {
@@ -54,15 +55,57 @@ func TestRemoteDebugServer_SomeData(t *testing.T) {
5455
require.NotZero(t, len(stream.data))
5556
}
5657

58+
func TestRemoteDebugServer_CollectRuntimeTraces2_Unauthenticated(t *testing.T) {
59+
endpoint := NewEndpoint(func(ctx context.Context) error {
60+
return errors.New("unauthenticated")
61+
})
62+
63+
stream := newFakeStream(context.Background())
64+
require.Error(t, endpoint.CollectRuntimeTraces2(stream), "unauthenticated")
65+
}
66+
67+
func TestRemoteDebugServer_CollectRuntimeTraces2_SomeData(t *testing.T) {
68+
if trace.IsEnabled() {
69+
t.Skip("tracing already enabled")
70+
} else if !traceEnabled {
71+
t.Skip("tracing not enabled")
72+
}
73+
74+
endpoint := NewEndpoint(func(ctx context.Context) error { return nil })
75+
76+
stream := newFakeStream(context.Background())
77+
78+
var group errgroup.Group
79+
group.Go(func() error {
80+
return endpoint.CollectRuntimeTraces2(stream)
81+
})
82+
group.Go(func() error {
83+
<-stream.written.Done()
84+
stream.InjectDone(true)
85+
return nil
86+
})
87+
require.NoError(t, group.Wait())
88+
89+
require.NotZero(t, len(stream.data))
90+
}
91+
5792
type fakeStream struct {
5893
drpc.Stream // expected nil but just implements interface
5994

6095
ctx context.Context
6196
written sync2.Fence
6297
data []byte
98+
reqs chan bool
99+
}
100+
101+
func newFakeStream(ctx context.Context) *fakeStream {
102+
return &fakeStream{
103+
ctx: ctx,
104+
reqs: make(chan bool),
105+
}
63106
}
64107

65-
func newFakeStream(ctx context.Context) *fakeStream { return &fakeStream{ctx: ctx} }
108+
func (f *fakeStream) InjectDone(done bool) { f.reqs <- done }
66109

67110
func (f *fakeStream) Context() context.Context { return f.ctx }
68111
func (f *fakeStream) Send(m *pb.CollectRuntimeTracesResponse) error {
@@ -72,3 +115,10 @@ func (f *fakeStream) Send(m *pb.CollectRuntimeTracesResponse) error {
72115
}
73116
return nil
74117
}
118+
func (f *fakeStream) Recv() (*pb.CollectRuntimeTracesRequest, error) {
119+
done, ok := <-f.reqs
120+
if !ok {
121+
return nil, io.EOF
122+
}
123+
return &pb.CollectRuntimeTracesRequest{Done: done}, nil
124+
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/calebcase/tmpfile v1.0.3
1010
github.com/flynn/noise v1.0.0
1111
github.com/gogo/protobuf v1.3.2
12+
github.com/google/gopacket v1.1.19
1213
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751
1314
github.com/jtolds/tracetagger/v2 v2.0.0-rc5
1415
github.com/jtolio/crawlspace v0.0.0-20231116162947-3ec5cc6b36c5

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
125125
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
126126
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
127127
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
128+
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
129+
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
128130
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
129131
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
130132
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=

pb/debug.pb.go

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pb/debug.proto

+2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package debug;
88

99
service Debug {
1010
rpc CollectRuntimeTraces(CollectRuntimeTracesRequest) returns (stream CollectRuntimeTracesResponse);
11+
rpc CollectRuntimeTraces2(stream CollectRuntimeTracesRequest) returns (stream CollectRuntimeTracesResponse);
1112
}
1213

1314
message CollectRuntimeTracesRequest {
15+
bool done = 1;
1416
}
1517

1618
message CollectRuntimeTracesResponse {

0 commit comments

Comments
 (0)