Skip to content

Commit

Permalink
feat: support for parsing ipip packet
Browse files Browse the repository at this point in the history
  • Loading branch information
hengyoush committed Jan 5, 2025
1 parent 58ce206 commit 787e3eb
Show file tree
Hide file tree
Showing 15 changed files with 829 additions and 345 deletions.
56 changes: 51 additions & 5 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
// because we could missed some nicIngressEvents, the total duration may be negative
annotatedRecord.StartTs = math.MaxUint64
if hasNicInEvents {
annotatedRecord.StartTs = min(events.nicIngressEvents[0].GetTimestamp(), annotatedRecord.StartTs)
nicInTimestamp, _, ok := events.nicIngressEvents[0].GetMinIfItmestampAttr()
if ok {
annotatedRecord.StartTs = min(uint64(nicInTimestamp), annotatedRecord.StartTs)
}
}
if hasTcpInEvents {
annotatedRecord.StartTs = min(events.tcpInEvents[0].GetTimestamp(), annotatedRecord.StartTs)
Expand All @@ -193,7 +196,10 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.StartTs = min(events.readSyscallEvents[0].GetTimestamp(), annotatedRecord.StartTs)
}
if hasDevOutEvents {
annotatedRecord.EndTs = events.devOutEvents[len(events.devOutEvents)-1].GetTimestamp()
devOutTimestamp, _, ok := events.devOutEvents[len(events.devOutEvents)-1].GetMaxIfItmestampAttr()
if ok {
annotatedRecord.EndTs = uint64(devOutTimestamp)
}
}
if connection.IsSsl() {
annotatedRecord.ReqPlainTextSize = events.ingressMessage.ByteSize()
Expand Down Expand Up @@ -221,12 +227,12 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.RespNicEventDetails = KernEventsToNicEventDetails(events.devOutEvents)
} else {
if hasWriteSyscallEvents {
annotatedRecord.StartTs = events.writeSyscallEvents[0].GetTimestamp()
annotatedRecord.StartTs = findMinTimestamp(events.writeSyscallEvents)
} else {
annotatedRecord.StartTs = events.egressMessage.TimestampNs()
}
if hasReadSyscallEvents {
annotatedRecord.EndTs = events.readSyscallEvents[len(events.readSyscallEvents)-1].GetTimestamp()
annotatedRecord.EndTs = findMaxTimestamp(events.readSyscallEvents)
} else {
annotatedRecord.EndTs = events.ingressMessage.TimestampNs()
}
Expand All @@ -242,7 +248,31 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.TotalDuration = float64(events.ingressMessage.TimestampNs()) - float64(events.egressMessage.TimestampNs())
}
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.BlackBoxDuration = float64(events.nicIngressEvents[len(events.nicIngressEvents)-1].GetTimestamp()) - float64(events.devOutEvents[0].GetTimestamp())
nicIngressTimestamp := int64(0)
for _, nicIngressEvent := range events.nicIngressEvents {
_nicIngressTimestamp, _, ok := nicIngressEvent.GetMinIfItmestampAttr()
if ok {
nicIngressTimestamp = max(nicIngressTimestamp, _nicIngressTimestamp)
}
}

if nicIngressTimestamp != 0 {
nicEgressTimestamp := int64(math.MaxInt64)
for _, devOutEvent := range events.devOutEvents {
_nicEgressTimestamp, _, ok := devOutEvent.GetMaxIfItmestampAttr()
if ok {
nicEgressTimestamp = min(nicEgressTimestamp, _nicEgressTimestamp)
}
}
if nicEgressTimestamp != int64(math.MaxInt64) {
annotatedRecord.BlackBoxDuration = float64(nicIngressTimestamp) - float64(nicEgressTimestamp)
} else {
annotatedRecord.BlackBoxDuration = -1
}
nicEgressTimestamp++
} else {
annotatedRecord.BlackBoxDuration = -1
}
}
if (hasUserCopyEvents || hasReadSyscallEvents) && hasTcpInEvents {
var readFromEndTime float64
Expand Down Expand Up @@ -292,6 +322,22 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
return nil
}

func findMaxTimestamp(events []conn.KernEvent) uint64 {
var maxTimestamp uint64 = 0
for _, each := range events {
maxTimestamp = max(maxTimestamp, each.GetTimestamp())
}
return maxTimestamp
}

func findMinTimestamp(events []conn.KernEvent) uint64 {
var minTimestamp uint64 = math.MaxUint64
for _, each := range events {
minTimestamp = min(minTimestamp, each.GetTimestamp())
}
return minTimestamp
}

func KernEventsToEventDetails[K analysisCommon.PacketEventDetail | analysisCommon.SyscallEventDetail](kernEvents []conn.KernEvent) []K {
if len(kernEvents) == 0 {
return []K{}
Expand Down
13 changes: 9 additions & 4 deletions agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (sb *StreamBuffer) FindTimestampBySeq(targetSeq uint64) (uint64, bool) {
return value.(uint64), true
}

func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) bool {
_, found := sb.timestamps.Get(seq)
if found {
return false
}
dataLen := uint64(len(data))
newBuffer := &Buffer{
buf: data,
Expand All @@ -137,16 +141,16 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
if sb.IsEmpty() {
sb.updateTimestamp(seq, timestamp)
sb.buffers = append(sb.buffers, newBuffer)
return
return true
}
if sb.Position0()-int(seq) >= maxBytesGap {
return
return true
}
if int(seq)-sb.PositionN() >= maxBytesGap {
sb.Clear()
sb.buffers = append(sb.buffers, newBuffer)
sb.updateTimestamp(seq, timestamp)
return
return true
}

leftIndex, leftBuffer := sb.findLeftBufferBySeq(seq)
Expand Down Expand Up @@ -180,6 +184,7 @@ func (sb *StreamBuffer) Add(seq uint64, data []byte, timestamp uint64) {
}
sb.updateTimestamp(seq, timestamp)
sb.shrinkBufferUntilSizeBelowCapacity()
return true
}

func (sb *StreamBuffer) updateTimestamp(seq uint64, timestamp uint64) {
Expand Down
50 changes: 35 additions & 15 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (c *ConnManager) FindConnection4Exactly(TgidFd uint64) *Connection4 {
}
}

func (c *ConnManager) FindConnection4Or(TgidFd uint64, ts uint64) *Connection4 {
func (c *ConnManager) LookupConnection4ByTimestamp(TgidFd uint64, ts uint64) *Connection4 {
v, _ := c.connMap.Load(TgidFd)
connection, _ := v.(*Connection4)
if connection == nil {
Expand All @@ -219,14 +219,8 @@ func (c *ConnManager) FindConnection4Or(TgidFd uint64, ts uint64) *Connection4 {
return connection
} else {
curConnList := connection.prevConn
if len(curConnList) > 0 {
lastPrevConn := curConnList[len(curConnList)-1]
if lastPrevConn.CloseTs != 0 && lastPrevConn.CloseTs < ts {
return connection
}
}
for idx := len(curConnList) - 1; idx >= 0; idx-- {
if curConnList[idx].ConnectStartTs < ts {
if curConnList[idx].timeBoundCheck(ts) {
return curConnList[idx]
}
}
Expand Down Expand Up @@ -265,6 +259,19 @@ func (c *Connection4) ProtocolInferred() bool {
return (c.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown) && (c.Protocol != bpf.AgentTrafficProtocolTKProtocolUnset)
}

func (c *Connection4) timeBoundCheck(toCheck uint64) bool {
if c.ConnectStartTs == 0 {
return true
}
if toCheck < c.ConnectStartTs {
return false
}
if c.CloseTs != 0 && toCheck > c.CloseTs {
return false
}
return true
}

func (c *Connection4) extractSockKeys() (bpf.AgentSockKey, bpf.AgentSockKey) {
var key bpf.AgentSockKey
key.Dip = [2]uint64(common.BytesToSockKey(c.RemoteIp))
Expand Down Expand Up @@ -349,7 +356,10 @@ func (c *Connection4) doUpdateConnIdMapProtocolToUnknwon(key bpf.AgentSockKey, m
func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
isReq, ok := isReq(c, event)
if event.Len > 0 {
c.StreamEvents.AddKernEvent(event)
alreadyExisted := c.StreamEvents.AddKernEvent(event)
if !alreadyExisted {
return false
}
} else if ok {
if (event.Flags&uint8(common.TCP_FLAGS_SYN) != 0) && !isReq && event.Step == bpf.AgentStepTIP_IN {
// 接收到Server给的Syn包
Expand All @@ -375,12 +385,16 @@ func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
}
return true
}
func (c *Connection4) addDataToBufferAndTryParse(data []byte, ke *bpf.AgentKernEvt) {
func (c *Connection4) addDataToBufferAndTryParse(data []byte, ke *bpf.AgentKernEvt) bool {
addedToBuffer := false
isReq, _ := isReq(c, ke)
if isReq {
c.reqStreamBuffer.Add(ke.Seq, data, ke.Ts)
addedToBuffer = c.reqStreamBuffer.Add(ke.Seq, data, ke.Ts)
} else {
c.respStreamBuffer.Add(ke.Seq, data, ke.Ts)
addedToBuffer = c.respStreamBuffer.Add(ke.Seq, data, ke.Ts)
}
if !addedToBuffer {
return false
}
reqSteamMessageType := protocol.Request
if c.Role == bpf.AgentEndpointRoleTKRoleUnknown {
Expand All @@ -392,6 +406,7 @@ func (c *Connection4) addDataToBufferAndTryParse(data []byte, ke *bpf.AgentKernE
}
c.parseStreamBuffer(c.reqStreamBuffer, reqSteamMessageType, &c.ReqQueue, ke)
c.parseStreamBuffer(c.respStreamBuffer, respSteamMessageType, &c.RespQueue, ke)
return true
}
func (c *Connection4) OnSslDataEvent(data []byte, event *bpf.SslData, recordChannel chan RecordWithConn) {
if len(data) > 0 {
Expand All @@ -413,20 +428,24 @@ func (c *Connection4) OnSslDataEvent(data []byte, event *bpf.SslData, recordChan
}
}
}
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) {
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, recordChannel chan RecordWithConn) bool {
addedToBuffer := true
if len(data) > 0 {
if c.ssl {
if common.ConntrackLog.Level >= logrus.WarnLevel {
common.ConntrackLog.Warnf("%s is ssl, but receive syscall event with data!", c.ToString())
}
} else {
c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke)
addedToBuffer = c.addDataToBufferAndTryParse(data, &event.SyscallEvent.Ke)
}
} else if event.SyscallEvent.GetSourceFunction() == bpf.AgentSourceFunctionTKSyscallSendfile {
// sendfile has no data, so we need to fill a fake data
common.ConntrackLog.Errorln("sendfile has no data, so we need to fill a fake data")
fakeData := make([]byte, event.SyscallEvent.Ke.Len)
c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke)
addedToBuffer = c.addDataToBufferAndTryParse(fakeData, &event.SyscallEvent.Ke)
}
if !addedToBuffer {
return false
}
c.StreamEvents.AddSyscallEvent(event)

Expand All @@ -441,6 +460,7 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData, r
recordChannel <- RecordWithConn{record, c}
}
}
return true
}

func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, resultQueue *[]protocol.ParsedMessage, ke *bpf.AgentKernEvt) {
Expand Down
Loading

0 comments on commit 787e3eb

Please sign in to comment.