Skip to content

Commit 3f6a44c

Browse files
authored
feat: support for parsing ipip packet (#257)
* feat: support for parsing ipip packet This PR introduces a new feature for parsing IPIP packets and correctly associating them. Additionally, this PR improves the current logic in processor.go to prevent the incorrect association of syscall and kernel events. When new events arrive, they are first enqueued and then processed only if they have been in the queue longer than a specified time limit. This is necessary because when many short connections use the same tgid-fd, syscall and kernel events may arrive asynchronously in user space. As a result, events from a new connection might reach user space before the connection event itself, causing the new connection's events to be incorrectly associated with the old connection and leading to erroneous time calculations. And to ensure that the total time calculation is not negative, the syscall event will report the syscall start time and the syscall duration. By adding the start time and the duration, we can determine the end time. This way, when calculating the client's elapsed time, we can subtract the start time of the write syscall from the end time of the read syscall. Additionally, to ensure that DEV_IN and TCP_IN events are present when the server receives the first request, the concept of a first packet event is introduced. Even if the kernel does not find conn_info or other information when reporting the event, as long as its seq=1, it will be considered a first packet. This allows it to be directly reported to user space. In user space, the connection is found based on its sock key, and then it is converted into a kernevent for processing. This way, even for the server's first request, we can see the total time and read from socket time. * fix: remove bpf_printk statements * feat: add first-packet-event-map-page-num option * refactor: translate comments to english
1 parent 8ff2696 commit 3f6a44c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1557
-534
lines changed

agent/agent.go

+5
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ func SetupAgent(options ac.AgentOptions) {
128128
if err != nil {
129129
return
130130
}
131+
firstPacketChannel := make(chan *bpf.AgentFirstPacketEvt, 10)
132+
firstPacketProcessor := conn.NewFirstPacketProcessor(firstPacketChannel, pm.GetFirstPacketEventsChannels())
133+
go firstPacketProcessor.Start()
134+
err = bpf.PullFirstPacketEvents(ctx, firstPacketChannel, options.FirstPacketEventMapPageNum)
135+
131136
err = _bf.AttachProgs(&options)
132137
if err != nil {
133138
return

agent/analysis/stat.go

+74-23
Original file line numberDiff line numberDiff line change
@@ -181,19 +181,22 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
181181
// because we could missed some nicIngressEvents, the total duration may be negative
182182
annotatedRecord.StartTs = math.MaxUint64
183183
if hasNicInEvents {
184-
annotatedRecord.StartTs = min(events.nicIngressEvents[0].GetTimestamp(), annotatedRecord.StartTs)
185-
}
186-
if hasTcpInEvents {
187-
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetTimestamp(), annotatedRecord.StartTs)
188-
}
189-
if hasUserCopyEvents {
190-
annotatedRecord.StartTs = min(events.userCopyEvents[0].GetTimestamp(), annotatedRecord.StartTs)
191-
}
192-
if hasReadSyscallEvents {
193-
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetTimestamp(), annotatedRecord.StartTs)
184+
nicInTimestamp, _, ok := events.nicIngressEvents[0].GetMinIfItmestampAttr()
185+
if ok {
186+
annotatedRecord.StartTs = min(uint64(nicInTimestamp), annotatedRecord.StartTs)
187+
}
188+
} else if hasTcpInEvents {
189+
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetStartTs(), annotatedRecord.StartTs)
190+
} else if hasUserCopyEvents {
191+
annotatedRecord.StartTs = min(events.userCopyEvents[0].GetStartTs(), annotatedRecord.StartTs)
192+
} else if hasReadSyscallEvents {
193+
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetStartTs(), annotatedRecord.StartTs)
194194
}
195195
if hasDevOutEvents {
196-
annotatedRecord.EndTs = events.devOutEvents[len(events.devOutEvents)-1].GetTimestamp()
196+
devOutTimestamp, _, ok := events.devOutEvents[len(events.devOutEvents)-1].GetMaxIfItmestampAttr()
197+
if ok {
198+
annotatedRecord.EndTs = uint64(devOutTimestamp)
199+
}
197200
}
198201
if connection.IsSsl() {
199202
annotatedRecord.ReqPlainTextSize = events.ingressMessage.ByteSize()
@@ -205,28 +208,28 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
205208
annotatedRecord.TotalDuration = float64(annotatedRecord.EndTs) - float64(annotatedRecord.StartTs)
206209
}
207210
if hasReadSyscallEvents && hasWriteSyscallEvents {
208-
annotatedRecord.BlackBoxDuration = float64(events.writeSyscallEvents[len(events.writeSyscallEvents)-1].GetTimestamp()) - float64(events.readSyscallEvents[0].GetTimestamp())
211+
annotatedRecord.BlackBoxDuration = float64(events.writeSyscallEvents[len(events.writeSyscallEvents)-1].GetEndTs()) - float64(events.readSyscallEvents[0].GetStartTs())
209212
} else {
210213
annotatedRecord.BlackBoxDuration = float64(events.egressMessage.TimestampNs()) - float64(events.ingressMessage.TimestampNs())
211214
}
212215
if hasUserCopyEvents && hasTcpInEvents {
213-
annotatedRecord.ReadFromSocketBufferDuration = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetTimestamp()) - float64(events.tcpInEvents[0].GetTimestamp())
216+
annotatedRecord.ReadFromSocketBufferDuration = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetStartTs()) - float64(events.tcpInEvents[0].GetStartTs())
214217
}
215218
if hasTcpInEvents && hasNicInEvents {
216-
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetTimestamp() - events.nicIngressEvents[0].GetTimestamp())
219+
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetStartTs() - events.nicIngressEvents[0].GetStartTs())
217220
}
218221
annotatedRecord.ReqSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.readSyscallEvents)
219222
annotatedRecord.RespSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.writeSyscallEvents)
220223
annotatedRecord.ReqNicEventDetails = KernEventsToNicEventDetails(events.nicIngressEvents)
221224
annotatedRecord.RespNicEventDetails = KernEventsToNicEventDetails(events.devOutEvents)
222225
} else {
223226
if hasWriteSyscallEvents {
224-
annotatedRecord.StartTs = events.writeSyscallEvents[0].GetTimestamp()
227+
annotatedRecord.StartTs = findMinTimestamp(events.writeSyscallEvents, true)
225228
} else {
226229
annotatedRecord.StartTs = events.egressMessage.TimestampNs()
227230
}
228231
if hasReadSyscallEvents {
229-
annotatedRecord.EndTs = events.readSyscallEvents[len(events.readSyscallEvents)-1].GetTimestamp()
232+
annotatedRecord.EndTs = findMaxTimestamp(events.readSyscallEvents, false)
230233
} else {
231234
annotatedRecord.EndTs = events.ingressMessage.TimestampNs()
232235
}
@@ -242,19 +245,43 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
242245
annotatedRecord.TotalDuration = float64(events.ingressMessage.TimestampNs()) - float64(events.egressMessage.TimestampNs())
243246
}
244247
if hasNicInEvents && hasDevOutEvents {
245-
annotatedRecord.BlackBoxDuration = float64(events.nicIngressEvents[len(events.nicIngressEvents)-1].GetTimestamp()) - float64(events.devOutEvents[0].GetTimestamp())
248+
nicIngressTimestamp := int64(0)
249+
for _, nicIngressEvent := range events.nicIngressEvents {
250+
_nicIngressTimestamp, _, ok := nicIngressEvent.GetMinIfItmestampAttr()
251+
if ok {
252+
nicIngressTimestamp = max(nicIngressTimestamp, _nicIngressTimestamp)
253+
}
254+
}
255+
256+
if nicIngressTimestamp != 0 {
257+
nicEgressTimestamp := int64(math.MaxInt64)
258+
for _, devOutEvent := range events.devOutEvents {
259+
_nicEgressTimestamp, _, ok := devOutEvent.GetMaxIfItmestampAttr()
260+
if ok {
261+
nicEgressTimestamp = min(nicEgressTimestamp, _nicEgressTimestamp)
262+
}
263+
}
264+
if nicEgressTimestamp != int64(math.MaxInt64) {
265+
annotatedRecord.BlackBoxDuration = float64(nicIngressTimestamp) - float64(nicEgressTimestamp)
266+
} else {
267+
annotatedRecord.BlackBoxDuration = -1
268+
}
269+
nicEgressTimestamp++
270+
} else {
271+
annotatedRecord.BlackBoxDuration = -1
272+
}
246273
}
247274
if (hasUserCopyEvents || hasReadSyscallEvents) && hasTcpInEvents {
248275
var readFromEndTime float64
249276
if hasUserCopyEvents {
250-
readFromEndTime = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetTimestamp())
277+
readFromEndTime = float64(events.userCopyEvents[len(events.userCopyEvents)-1].GetStartTs())
251278
} else {
252-
readFromEndTime = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetTimestamp())
279+
readFromEndTime = float64(events.readSyscallEvents[len(events.readSyscallEvents)-1].GetEndTs())
253280
}
254-
annotatedRecord.ReadFromSocketBufferDuration = readFromEndTime - float64(events.tcpInEvents[0].GetTimestamp())
281+
annotatedRecord.ReadFromSocketBufferDuration = readFromEndTime - float64(events.tcpInEvents[0].GetStartTs())
255282
}
256283
if hasTcpInEvents && hasNicInEvents {
257-
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetTimestamp() - events.nicIngressEvents[0].GetTimestamp())
284+
annotatedRecord.CopyToSocketBufferDuration = float64(events.tcpInEvents[len(events.tcpInEvents)-1].GetStartTs() - events.nicIngressEvents[0].GetStartTs())
258285
}
259286
annotatedRecord.ReqSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.writeSyscallEvents)
260287
annotatedRecord.RespSyscallEventDetails = KernEventsToEventDetails[analysisCommon.SyscallEventDetail](events.readSyscallEvents)
@@ -292,6 +319,30 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
292319
return nil
293320
}
294321

322+
func findMaxTimestamp(events []conn.KernEvent, useStartTs bool) uint64 {
323+
var maxTimestamp uint64 = 0
324+
for _, each := range events {
325+
if useStartTs {
326+
maxTimestamp = max(maxTimestamp, each.GetStartTs())
327+
} else {
328+
maxTimestamp = max(maxTimestamp, each.GetEndTs())
329+
}
330+
}
331+
return maxTimestamp
332+
}
333+
334+
func findMinTimestamp(events []conn.KernEvent, useStartTs bool) uint64 {
335+
var minTimestamp uint64 = math.MaxUint64
336+
for _, each := range events {
337+
if useStartTs {
338+
minTimestamp = min(minTimestamp, each.GetStartTs())
339+
} else {
340+
minTimestamp = min(minTimestamp, each.GetEndTs())
341+
}
342+
}
343+
return minTimestamp
344+
}
345+
295346
func KernEventsToEventDetails[K analysisCommon.PacketEventDetail | analysisCommon.SyscallEventDetail](kernEvents []conn.KernEvent) []K {
296347
if len(kernEvents) == 0 {
297348
return []K{}
@@ -300,7 +351,7 @@ func KernEventsToEventDetails[K analysisCommon.PacketEventDetail | analysisCommo
300351
for _, each := range kernEvents {
301352
result = append(result, K{
302353
ByteSize: each.GetLen(),
303-
Timestamp: each.GetTimestamp(),
354+
Timestamp: each.GetStartTs(),
304355
})
305356
}
306357
return result
@@ -315,7 +366,7 @@ func KernEventsToNicEventDetails(kernEvents []conn.KernEvent) []analysisCommon.N
315366
PacketEventDetail: analysisCommon.PacketEventDetail{
316367

317368
ByteSize: each.GetLen(),
318-
Timestamp: each.GetTimestamp(),
369+
Timestamp: each.GetStartTs(),
319370
},
320371
Attributes: each.GetAttributes(),
321372
})

agent/buffer/stream_buffer.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ func (sb *StreamBuffer) FindTimestampBySeq(targetSeq uint64) (uint64, bool) {
128128
return value.(uint64), true
129129
}
130130

131-
func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
131+
func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) bool {
132+
_, found := sb.timestamps.Get(seq)
133+
if found {
134+
return false
135+
}
132136
dataLen := uint64(len(data))
133137
newBuffer := &Buffer{
134138
buf: data,
@@ -137,16 +141,16 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
137141
if sb.IsEmpty() {
138142
sb.updateTimestamp(seq, timestamp)
139143
sb.buffers = append(sb.buffers, newBuffer)
140-
return
144+
return true
141145
}
142146
if sb.Position0()-int(seq) >= maxBytesGap {
143-
return
147+
return true
144148
}
145149
if int(seq)-sb.PositionN() >= maxBytesGap {
146150
sb.Clear()
147151
sb.buffers = append(sb.buffers, newBuffer)
148152
sb.updateTimestamp(seq, timestamp)
149-
return
153+
return true
150154
}
151155

152156
leftIndex, leftBuffer := sb.findLeftBufferBySeq(seq)
@@ -180,6 +184,7 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
180184
}
181185
sb.updateTimestamp(seq, timestamp)
182186
sb.shrinkBufferUntilSizeBelowCapacity()
187+
return true
183188
}
184189

185190
func (sb *StreamBuffer) updateTimestamp(seq uint64, timestamp uint64) {

agent/common/options.go

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type AgentOptions struct {
6969
SslPerfEventMapPageNum int
7070
ConnPerfEventMapPageNum int
7171
KernPerfEventMapPageNum int
72+
FirstPacketEventMapPageNum int
7273
}
7374

7475
func (o AgentOptions) FilterByContainer() bool {

0 commit comments

Comments
 (0)