diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index ec164bba9c3..ab28248434a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1335,6 +1335,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index e05beae5bc9..ba8ea0b439b 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1335,6 +1335,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index a6e66f1a84a..3fcb516cc41 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1335,6 +1335,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index bcec31cb86b..ccf2de6c5f0 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1335,6 +1335,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 061fa6c5ad2..82663c0acbe 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1335,6 +1335,50 @@ spec: type: object status: properties: + capturedPacket: + properties: + dstIP: + type: string + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + length: + type: integer + srcIP: + type: string + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + flags: + type: integer + srcPort: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + type: object + type: object dataplaneTag: type: integer phase: diff --git a/build/yamls/base/crds.yml b/build/yamls/base/crds.yml index 78b2e1f277e..11ef27d1b2d 100644 --- a/build/yamls/base/crds.yml +++ b/build/yamls/base/crds.yml @@ -219,6 +219,51 @@ spec: type: string tunnelDstIP: type: string + capturedPacket: + properties: + srcIP: + type: string + dstIP: + type: string + length: + type: integer + ipHeader: + properties: + flags: + type: integer + protocol: + type: integer + ttl: + type: integer + type: object + ipv6Header: + properties: + hopLimit: + type: integer + nextHeader: + type: integer + type: object + transportHeader: + properties: + tcp: + properties: + dstPort: + type: integer + srcPort: + type: integer + flags: + type: integer + type: object + udp: + properties: + dstPort: + type: integer + srcPort: + type: integer + type: object + # ICMP echo is not supported. + type: object + type: object subresources: status: {} scope: Cluster diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index bde21dd2fa1..7332d90abbb 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -23,7 +23,6 @@ import ( "github.com/contiv/libOpenflow/openflow13" "github.com/contiv/libOpenflow/protocol" - "github.com/contiv/libOpenflow/util" "github.com/contiv/ofnet/ofctrl" "gopkg.in/natefinch/lumberjack.v2" "k8s.io/klog" @@ -282,7 +281,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { if prot == protocol.Type_TCP { // Get TCP data. - oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, err := getTCPHeaderData(pktIn.Data.Data) + oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, err := binding.GetTCPHeaderData(pktIn.Data.Data) if err != nil { return err } @@ -330,26 +329,3 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { true) } } - -// getTCPHeaderData gets TCP header data from IP packet. -func getTCPHeaderData(ipPkt util.Message) (tcpSrcPort uint16, tcpDstPort uint16, tcpSeqNum uint32, tcpAckNum uint32, err error) { - var tcpBytes []byte - - // Transfer Buffer to TCP - switch typedIPPkt := ipPkt.(type) { - case *protocol.IPv4: - tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() - case *protocol.IPv6: - tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() - } - if err != nil { - return 0, 0, 0, 0, err - } - tcpIn := new(protocol.TCP) - err = tcpIn.UnmarshalBinary(tcpBytes) - if err != nil { - return 0, 0, 0, 0, err - } - - return tcpIn.PortSrc, tcpIn.PortDst, tcpIn.SeqNum, tcpIn.AckNum, nil -} diff --git a/pkg/agent/controller/networkpolicy/packetin_test.go b/pkg/agent/controller/networkpolicy/packetin_test.go index 3ae7bf864b9..1d0c3f4394a 100644 --- a/pkg/agent/controller/networkpolicy/packetin_test.go +++ b/pkg/agent/controller/networkpolicy/packetin_test.go @@ -63,55 +63,3 @@ func TestGetPacketInfo(t *testing.T) { }) } } - -func TestGetTCPHeaderData(t *testing.T) { - type args struct { - tcp protocol.TCP - expectTCPSrcPort uint16 - expectTCPDstPort uint16 - expectTCPSeqNum uint32 - expectTCPAckNum uint32 - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "ipv4", - args: args{ - tcp: protocol.TCP{ - PortSrc: 1080, - PortDst: 80, - SeqNum: 0, - AckNum: 0, - }, - expectTCPSrcPort: 1080, - expectTCPDstPort: 80, - expectTCPSeqNum: 0, - expectTCPAckNum: 0, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tcp := tt.args.tcp - pktIn := new(protocol.IPv4) - bytes, _ := tcp.MarshalBinary() - bf := new(util.Buffer) - bf.UnmarshalBinary(bytes) - pktIn.Data = bf - - tcpSrcPort, tcpDstPort, tcpSeqNum, tcpAckNum, err := getTCPHeaderData(pktIn) - - if (err != nil) != tt.wantErr { - t.Errorf("getPacketInfo() error = %v, wantErr %v", err, tt.wantErr) - } - assert.Equal(t, tt.args.expectTCPSrcPort, tcpSrcPort, "Expect to retrieve exact TCP src port while differed") - assert.Equal(t, tt.args.expectTCPDstPort, tcpDstPort, "Expect to retrieve exact TCP dst port while differed") - assert.Equal(t, tt.args.expectTCPSeqNum, tcpSeqNum, "Expect to retrieve exact TCP seq num while differed") - assert.Equal(t, tt.args.expectTCPAckNum, tcpAckNum, "Expect to retrieve exact TCP ack num while differed") - }) - } -} diff --git a/pkg/agent/controller/traceflow/packetin.go b/pkg/agent/controller/traceflow/packetin.go index b294ba3b636..ae0cfcbbee2 100644 --- a/pkg/agent/controller/traceflow/packetin.go +++ b/pkg/agent/controller/traceflow/packetin.go @@ -38,7 +38,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { if !c.traceflowListerSynced() { return errors.New("traceflow controller is not started") } - oldTf, nodeResult, err := c.parsePacketIn(pktIn) + oldTf, nodeResult, packet, err := c.parsePacketIn(pktIn) if err != nil { klog.Errorf("parsePacketIn error: %+v", err) return err @@ -53,12 +53,13 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { } update := tf.DeepCopy() update.Status.Results = append(update.Status.Results, *nodeResult) + update.Status.CapturedPacket = packet _, err = c.traceflowClient.CrdV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, v1.UpdateOptions{}) if err != nil { klog.Warningf("Update traceflow failed: %+v", err) return err } - klog.Infof("Updated traceflow %s: %+v", tf.Name, nodeResult) + klog.Infof("Updated traceflow %s: %+v", tf.Name, update.Status) return nil }) if err != nil { @@ -67,7 +68,7 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { return err } -func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Traceflow, *crdv1alpha1.NodeResult, error) { +func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Traceflow, *crdv1alpha1.NodeResult, *crdv1alpha1.Packet, error) { matchers := pktIn.GetMatches() var match *ofctrl.MatchField @@ -77,17 +78,17 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if pktIn.Data.Ethertype == protocol.IPv4_MSG { ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4) if !ok { - return nil, nil, errors.New("invalid traceflow IPv4 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv4 packet") } tag = ipPacket.DSCP } else if pktIn.Data.Ethertype == protocol.IPv6_MSG { ipv6Packet, ok := pktIn.Data.Data.(*protocol.IPv6) if !ok { - return nil, nil, errors.New("invalid traceflow IPv6 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv6 packet") } tag = ipv6Packet.TrafficClass >> 2 } else { - return nil, nil, fmt.Errorf("unsupported traceflow packet Ethertype: %d", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("unsupported traceflow packet Ethertype: %d", pktIn.Data.Ethertype) } firstPacket := false @@ -99,18 +100,22 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl } c.runningTraceflowsMutex.RUnlock() if !exists { - return nil, nil, fmt.Errorf("Traceflow for dataplane tag %d not found in cache", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("Traceflow for dataplane tag %d not found in cache", pktIn.Data.Ethertype) } + var capturedPacket *crdv1alpha1.Packet if tfState.liveTraffic && firstPacket { // Uninstall the OVS flows after receiving the first packet, to // avoid capturing too many matched packets. c.ofClient.UninstallTraceflowFlows(tag) + if tfState.isSender { + capturedPacket = parseCapturedPacket(pktIn) + } } tf, err := c.traceflowLister.Get(tfState.name) if err != nil { - return nil, nil, fmt.Errorf("failed to get Traceflow %s CRD: %v", tfState.name, err) + return nil, nil, nil, fmt.Errorf("failed to get Traceflow %s CRD: %v", tfState.name, err) } obs := []crdv1alpha1.Observation{} @@ -135,25 +140,25 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl case protocol.IPv4_MSG: ipPacket, ok := pktIn.Data.Data.(*protocol.IPv4) if !ok { - return nil, nil, errors.New("invalid traceflow IPv4 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv4 packet") } ctNwDst, err = getCTDstValue(matchers, false) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ipDst = ipPacket.NWDst.String() case protocol.IPv6_MSG: ipPacket, ok := pktIn.Data.Data.(*protocol.IPv6) if !ok { - return nil, nil, errors.New("invalid traceflow IPv6 packet") + return nil, nil, nil, errors.New("invalid traceflow IPv6 packet") } ctNwDst, err = getCTDstValue(matchers, true) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ipDst = ipPacket.NWDst.String() default: - return nil, nil, fmt.Errorf("unsupported traceflow packet ether type %d", pktIn.Data.Ethertype) + return nil, nil, nil, fmt.Errorf("unsupported traceflow packet ether type %d", pktIn.Data.Ethertype) } if isValidCtNw(ctNwDst) && ipDst != ctNwDst { ob := &crdv1alpha1.Observation{ @@ -168,7 +173,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.EgressReg)); match != nil { egressInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ob := getNetworkPolicyObservation(tableID, false) npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(egressInfo) @@ -182,7 +187,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.IngressReg)); match != nil { ingressInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } ob := getNetworkPolicyObservation(tableID, true) npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ingressInfo) @@ -198,7 +203,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchRegField(matchers, uint32(openflow.CNPDenyConjIDReg)); match != nil { notAllowConjInfo, err := getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } npRef := c.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(notAllowConjInfo) if npRef != nil { @@ -219,14 +224,14 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl if match = getMatchTunnelDstField(matchers, isIPv6); match != nil { tunnelDstIP, err = getTunnelDstValue(match) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } var outputPort uint32 if match = getMatchRegField(matchers, uint32(openflow.PortCacheReg)); match != nil { outputPort, err = getRegValue(match, nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } gatewayIP := c.nodeConfig.GatewayConfig.IPv4 @@ -252,7 +257,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl } nodeResult := crdv1alpha1.NodeResult{Node: c.nodeConfig.Name, Timestamp: time.Now().Unix(), Observations: obs} - return tf, &nodeResult, nil + return tf, &nodeResult, capturedPacket, nil } func getMatchRegField(matchers *ofctrl.Matchers, regNum uint32) *ofctrl.MatchField { @@ -341,3 +346,22 @@ func isValidCtNw(ipStr string) bool { } return true } + +func parseCapturedPacket(pktIn *ofctrl.PacketIn) *crdv1alpha1.Packet { + pkt, _ := binding.ParsePacketIn(pktIn) + capturedPacket := crdv1alpha1.Packet{SrcIP: pkt.SourceIP.String(), DstIP: pkt.DestinationIP.String(), Length: pkt.IPLength} + if pkt.IsIPv6 { + ipProto := int32(pkt.IPProto) + capturedPacket.IPv6Header = &crdv1alpha1.IPv6Header{NextHeader: &ipProto, HopLimit: int32(pkt.TTL)} + } else { + capturedPacket.IPHeader.Protocol = int32(pkt.IPProto) + capturedPacket.IPHeader.TTL = int32(pkt.TTL) + capturedPacket.IPHeader.Flags = int32(pkt.IPFlags) + } + if pkt.IPProto == protocol.Type_TCP { + capturedPacket.TransportHeader.TCP = &crdv1alpha1.TCPHeader{SrcPort: int32(pkt.SourcePort), DstPort: int32(pkt.DestinationPort), Flags: int32(pkt.TCPFlags)} + } else if pkt.IPProto == protocol.Type_UDP { + capturedPacket.TransportHeader.UDP = &crdv1alpha1.UDPHeader{SrcPort: int32(pkt.SourcePort), DstPort: int32(pkt.DestinationPort)} + } + return &capturedPacket +} diff --git a/pkg/agent/controller/traceflow/packetin_test.go b/pkg/agent/controller/traceflow/packetin_test.go index 5a9f8779638..41fedb4c755 100644 --- a/pkg/agent/controller/traceflow/packetin_test.go +++ b/pkg/agent/controller/traceflow/packetin_test.go @@ -15,9 +15,15 @@ package traceflow import ( + "net" "reflect" "testing" + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/contiv/ofnet/ofctrl" + "github.com/stretchr/testify/assert" + "github.com/vmware-tanzu/antrea/pkg/agent/openflow" crdv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/crd/v1alpha1" ) @@ -89,3 +95,67 @@ func Test_getNetworkPolicyObservation(t *testing.T) { }) } } + +func TestParseCapturedPacket(t *testing.T) { + srcIPv4 := net.ParseIP("10.1.1.11") + dstIPv4 := net.ParseIP("10.1.1.12") + srcIPv6 := net.ParseIP("fd12:ab:34:a001::11") + dstIPv6 := net.ParseIP("fd12:ab:34:a001::12") + + tcpPktIn := protocol.IPv4{Length: 1000, Flags: 1, TTL: 64, NWSrc: srcIPv4, NWDst: dstIPv4, Protocol: protocol.Type_TCP} + tcp := protocol.TCP{PortSrc: 1080, PortDst: 80, SeqNum: 1234, Code: 2} + bytes, _ := tcp.MarshalBinary() + bf := new(util.Buffer) + bf.UnmarshalBinary(bytes) + tcpPktIn.Data = bf + tcpPktCap := crdv1alpha1.Packet{ + SrcIP: tcpPktIn.NWSrc.String(), DstIP: tcpPktIn.NWDst.String(), Length: tcpPktIn.Length, + IPHeader: crdv1alpha1.IPHeader{Protocol: int32(tcpPktIn.Protocol), TTL: int32(tcpPktIn.TTL), Flags: int32(tcpPktIn.Flags)}, + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{SrcPort: int32(tcp.PortSrc), DstPort: int32(tcp.PortDst), Flags: int32(tcp.Code)}, + }, + } + + udpPktIn := protocol.IPv4{Length: 50, Flags: 0, TTL: 128, NWSrc: srcIPv4, NWDst: dstIPv4, Protocol: protocol.Type_UDP} + udp := protocol.UDP{PortSrc: 1080, PortDst: 80} + bytes, _ = udp.MarshalBinary() + bf = new(util.Buffer) + bf.UnmarshalBinary(bytes) + udpPktIn.Data = bf + udpPktCap := crdv1alpha1.Packet{ + SrcIP: udpPktIn.NWSrc.String(), DstIP: udpPktIn.NWDst.String(), Length: udpPktIn.Length, + IPHeader: crdv1alpha1.IPHeader{Protocol: int32(udpPktIn.Protocol), TTL: int32(udpPktIn.TTL), Flags: int32(udpPktIn.Flags)}, + TransportHeader: crdv1alpha1.TransportHeader{ + UDP: &crdv1alpha1.UDPHeader{SrcPort: int32(udp.PortSrc), DstPort: int32(udp.PortDst)}, + }, + } + + icmpv6PktIn := protocol.IPv6{Length: 960, HopLimit: 8, NWSrc: srcIPv6, NWDst: dstIPv6, NextHeader: protocol.Type_IPv6ICMP} + nextHdr := int32(icmpv6PktIn.NextHeader) + icmpv6PktCap := crdv1alpha1.Packet{ + SrcIP: icmpv6PktIn.NWSrc.String(), DstIP: icmpv6PktIn.NWDst.String(), Length: icmpv6PktIn.Length + 40, + IPv6Header: &crdv1alpha1.IPv6Header{NextHeader: &nextHdr, HopLimit: int32(icmpv6PktIn.HopLimit)}, + } + + tests := []struct { + name string + pktInData util.Message + pktCap *crdv1alpha1.Packet + isIPv6 bool + }{ + {"tcp", &tcpPktIn, &tcpPktCap, false}, + {"udp", &udpPktIn, &udpPktCap, false}, + {"icmpv6", &icmpv6PktIn, &icmpv6PktCap, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ethType := uint16(protocol.IPv4_MSG) + if tt.isIPv6 { + ethType = uint16(protocol.IPv6_MSG) + } + pktIn := ofctrl.PacketIn{Data: protocol.Ethernet{Ethertype: ethType, Data: tt.pktInData}} + packet := parseCapturedPacket(&pktIn) + assert.True(t, reflect.DeepEqual(packet, tt.pktCap), "parsed packet does not match the expected") + }) + } +} diff --git a/pkg/antctl/raw/traceflow/command.go b/pkg/antctl/raw/traceflow/command.go index d00d0cbddf7..d9163009f64 100644 --- a/pkg/antctl/raw/traceflow/command.go +++ b/pkg/antctl/raw/traceflow/command.go @@ -60,13 +60,23 @@ var protocols = map[string]int32{ "udp": 17, } +type CapturedPacket struct { + SrcIP string `json:"srcIP" yaml:"srcIP"` + DstIP string `json:"dstIP" yaml:"dstIP"` + Length uint16 `json:"length" yaml:"length"` + IPHeader *v1alpha1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"` + IPv6Header *v1alpha1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"` + TransportHeader *v1alpha1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"` +} + // Response is the response of antctl Traceflow. type Response struct { - Name string `json:"name" yaml:"name"` // Traceflow name - Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase - Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0" - Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1" - NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results + Name string `json:"name" yaml:"name"` // Traceflow name + Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase + Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0" + Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1" + NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results + CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow } func init() { @@ -357,6 +367,18 @@ func output(tf *v1alpha1.Traceflow) error { } else if len(tf.Spec.Destination.Service) != 0 { r.Destination = fmt.Sprintf("%s/%s", tf.Spec.Destination.Namespace, tf.Spec.Destination.Service) } + + pkt := tf.Status.CapturedPacket + if pkt != nil { + r.CapturedPacket = &CapturedPacket{SrcIP: pkt.SrcIP, DstIP: pkt.DstIP, Length: pkt.Length, IPv6Header: pkt.IPv6Header} + if pkt.IPv6Header == nil { + r.CapturedPacket.IPHeader = &pkt.IPHeader + } + if pkt.TransportHeader.TCP != nil || pkt.TransportHeader.UDP != nil || pkt.TransportHeader.ICMP != nil { + r.CapturedPacket.TransportHeader = &pkt.TransportHeader + } + } + if option.outputType == "json" { if err := jsonOutput(&r); err != nil { return fmt.Errorf("error when converting output to json: %w", err) diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index ed1d49c1917..935a68880a5 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -143,30 +143,30 @@ type Destination struct { // IPHeader describes spec of an IPv4 header. type IPHeader struct { // SrcIP is the source IP. - SrcIP string `json:"srcIP,omitempty"` + SrcIP string `json:"srcIP,omitempty" yaml:"srcIP,omitempty"` // Protocol is the IP protocol. - Protocol int32 `json:"protocol,omitempty"` + Protocol int32 `json:"protocol,omitempty" yaml:"protocol,omitempty"` // TTL is the IP TTL. - TTL int32 `json:"ttl,omitempty"` + TTL int32 `json:"ttl,omitempty" yaml:"ttl,omitempty"` // Flags is the flags for IP. - Flags int32 `json:"flags,omitempty"` + Flags int32 `json:"flags,omitempty" yaml:"flags,omitempty"` } // IPv6Header describes spec of an IPv6 header. type IPv6Header struct { // SrcIP is the source IPv6. - SrcIP string `json:"srcIP,omitempty"` + SrcIP string `json:"srcIP,omitempty" yaml:"srcIP,omitempty"` // NextHeader is the IPv6 protocol. - NextHeader *int32 `json:"nextHeader,omitempty"` + NextHeader *int32 `json:"nextHeader,omitempty" yaml:"nextHeader,omitempty"` // HopLimit is the IPv6 Hop Limit. - HopLimit int32 `json:"hopLimit,omitempty"` + HopLimit int32 `json:"hopLimit,omitempty" yaml:"hopLimit,omitempty"` } // TransportHeader describes spec of a TransportHeader. type TransportHeader struct { - ICMP *ICMPEchoRequestHeader `json:"icmp,omitempty"` - UDP *UDPHeader `json:"udp,omitempty"` - TCP *TCPHeader `json:"tcp,omitempty"` + ICMP *ICMPEchoRequestHeader `json:"icmp,omitempty" yaml:"icmp,omitempty"` + UDP *UDPHeader `json:"udp,omitempty" yaml:"udp,omitempty"` + TCP *TCPHeader `json:"tcp,omitempty" yaml:"tcp,omitempty"` } // ICMPEchoRequestHeader describes spec of an ICMP echo request header. @@ -197,6 +197,10 @@ type TCPHeader struct { // Packet includes header info. type Packet struct { + SrcIP string `json:"srcIP,omitempty"` + DstIP string `json:"dstIP,omitempty"` + // Length is the IP packet length (includes the IPv4 or IPv6 header length). + Length uint16 `json:"length,omitempty"` // TODO: change type IPHeader to *IPHeader and correct all internal references IPHeader IPHeader `json:"ipHeader,omitempty"` IPv6Header *IPv6Header `json:"ipv6Header,omitempty"` @@ -213,6 +217,8 @@ type TraceflowStatus struct { DataplaneTag uint8 `json:"dataplaneTag,omitempty"` // Results is the collection of all observations on different nodes. Results []NodeResult `json:"results,omitempty"` + // CapturedPacket is the captured packet in live-traffic Traceflow. + CapturedPacket *Packet `json:"capturedPacket,omitempty"` } type NodeResult struct { diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index 7cc82efef09..9da75781396 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -696,6 +696,11 @@ func (in *TraceflowStatus) DeepCopyInto(out *TraceflowStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.CapturedPacket != nil { + in, out := &in.CapturedPacket, &out.CapturedPacket + *out = new(Packet) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index b4519b51da8..4a9a6e25866 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -370,6 +370,7 @@ type Packet struct { SourceMAC net.HardwareAddr DestinationIP net.IP SourceIP net.IP + IPLength uint16 IPProto uint8 IPFlags uint16 TTL uint8 diff --git a/pkg/ovs/openflow/ofctrl_packetin.go b/pkg/ovs/openflow/ofctrl_packetin.go new file mode 100644 index 00000000000..21a6963b08c --- /dev/null +++ b/pkg/ovs/openflow/ofctrl_packetin.go @@ -0,0 +1,108 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/contiv/ofnet/ofctrl" +) + +// GetTCPHeaderData gets TCP header data from IP packet. +func GetTCPHeaderData(ipPkt util.Message) (tcpSrcPort uint16, tcpDstPort uint16, tcpSeqNum uint32, tcpAckNum uint32, tcpFlags uint8, err error) { + var tcpBytes []byte + + // Transfer Buffer to TCP + switch typedIPPkt := ipPkt.(type) { + case *protocol.IPv4: + tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + case *protocol.IPv6: + tcpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + } + if err != nil { + return 0, 0, 0, 0, 0, err + } + tcpIn := new(protocol.TCP) + err = tcpIn.UnmarshalBinary(tcpBytes) + if err != nil { + return 0, 0, 0, 0, 0, err + } + + return tcpIn.PortSrc, tcpIn.PortDst, tcpIn.SeqNum, tcpIn.AckNum, tcpIn.Code, nil +} + +func getUDPHeaderData(ipPkt util.Message) (udpSrcPort uint16, udpDstPort uint16, err error) { + var udpBytes []byte + + switch typedIPPkt := ipPkt.(type) { + case *protocol.IPv4: + udpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + case *protocol.IPv6: + udpBytes, err = typedIPPkt.Data.(*util.Buffer).MarshalBinary() + } + if err != nil { + return 0, 0, err + } + udpIn := new(protocol.UDP) + err = udpIn.UnmarshalBinary(udpBytes) + if err != nil { + return 0, 0, err + } + + return udpIn.PortSrc, udpIn.PortDst, nil +} + +func ParsePacketIn(pktIn *ofctrl.PacketIn) (*Packet, error) { + packet := Packet{} + packet.DestinationMAC = pktIn.Data.HWDst + packet.SourceMAC = pktIn.Data.HWSrc + + if pktIn.Data.Ethertype == protocol.IPv4_MSG { + ipPkt := pktIn.Data.Data.(*protocol.IPv4) + packet.DestinationIP = ipPkt.NWDst + packet.SourceIP = ipPkt.NWSrc + packet.TTL = ipPkt.TTL + packet.IPProto = ipPkt.Protocol + packet.IPFlags = ipPkt.Flags + packet.IPLength = ipPkt.Length + } else if pktIn.Data.Ethertype == protocol.IPv6_MSG { + ipPkt := pktIn.Data.Data.(*protocol.IPv6) + packet.DestinationIP = ipPkt.NWDst + packet.SourceIP = ipPkt.NWSrc + packet.TTL = ipPkt.HopLimit + packet.IPProto = ipPkt.NextHeader + // IPv6 header includes only playload length. Add 40 to count in + // the IPv6 header length. + packet.IPLength = ipPkt.Length + 40 + packet.IsIPv6 = true + } else { + // Not an IP packet. + return &packet, nil + } + + var err error + if packet.IPProto == protocol.Type_TCP { + packet.SourcePort, packet.DestinationPort, _, _, packet.TCPFlags, err = GetTCPHeaderData(pktIn.Data.Data) + if err != nil { + return nil, err + } + } else if packet.IPProto == protocol.Type_UDP { + packet.SourcePort, packet.DestinationPort, err = getUDPHeaderData(pktIn.Data.Data) + if err != nil { + return nil, err + } + } + return &packet, nil +} diff --git a/pkg/ovs/openflow/ofctrl_packetin_test.go b/pkg/ovs/openflow/ofctrl_packetin_test.go new file mode 100644 index 00000000000..9303a87c006 --- /dev/null +++ b/pkg/ovs/openflow/ofctrl_packetin_test.go @@ -0,0 +1,78 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "testing" + + "github.com/contiv/libOpenflow/protocol" + "github.com/contiv/libOpenflow/util" + "github.com/stretchr/testify/assert" +) + +func TestGetTCPHeaderData(t *testing.T) { + type args struct { + tcp protocol.TCP + expectTCPSrcPort uint16 + expectTCPDstPort uint16 + expectTCPSeqNum uint32 + expectTCPAckNum uint32 + expectTCPCode uint8 + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "ipv4", + args: args{ + tcp: protocol.TCP{ + PortSrc: 1080, + PortDst: 80, + SeqNum: 0, + AckNum: 0, + Code: 2, + }, + expectTCPSrcPort: 1080, + expectTCPDstPort: 80, + expectTCPSeqNum: 0, + expectTCPAckNum: 0, + expectTCPCode: 2, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tcp := tt.args.tcp + pktIn := new(protocol.IPv4) + bytes, _ := tcp.MarshalBinary() + bf := new(util.Buffer) + bf.UnmarshalBinary(bytes) + pktIn.Data = bf + + tcpSrcPort, tcpDstPort, tcpSeqNum, tcpAckNum, tcpCode, err := GetTCPHeaderData(pktIn) + if (err != nil) != tt.wantErr { + t.Errorf("getPacketInfo() error = %v, wantErr %v", err, tt.wantErr) + } + assert.Equal(t, tt.args.expectTCPSrcPort, tcpSrcPort, "Expect to retrieve exact TCP src port while differed") + assert.Equal(t, tt.args.expectTCPDstPort, tcpDstPort, "Expect to retrieve exact TCP dst port while differed") + assert.Equal(t, tt.args.expectTCPSeqNum, tcpSeqNum, "Expect to retrieve exact TCP seq num while differed") + assert.Equal(t, tt.args.expectTCPAckNum, tcpAckNum, "Expect to retrieve exact TCP ack num while differed") + assert.Equal(t, tt.args.expectTCPCode, tcpCode, "Expect to retrieve exact TCP code while differed") + }) + } +} diff --git a/test/e2e/traceflow_test.go b/test/e2e/traceflow_test.go index 7f9ba452bd7..7201029425d 100644 --- a/test/e2e/traceflow_test.go +++ b/test/e2e/traceflow_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "net" + "reflect" "strings" "testing" "time" @@ -40,6 +41,7 @@ type testcase struct { tf *v1alpha1.Traceflow expectedPhase v1alpha1.TraceflowPhase expectedResults []v1alpha1.NodeResult + expectedPktCap *v1alpha1.Packet // required IP version, skip if not match, default is 0 (no restrict) ipVersion int } @@ -516,6 +518,12 @@ func TestTraceflowIntraNode(t *testing.T) { }, }, }, + expectedPktCap: &v1alpha1.Packet{ + SrcIP: node1IPs[0].ipv4.String(), + DstIP: dstPodIPv4Str, + Length: 98, // default ping packet length. + IPHeader: v1alpha1.IPHeader{Protocol: 1, TTL: 64}, + }, }, { name: "intraNodeTraceflowIPv6", @@ -1906,4 +1914,7 @@ func runTestTraceflow(t *testing.T, data *TestData, tc testcase) { } } } + if tc.expectedPktCap != nil && !reflect.DeepEqual(tc.expectedPktCap, tf.Status.CapturedPacket) { + t.Fatal(fmt.Errorf("captured packet should be: %+v, but got: %+v", tc.expectedPktCap, tf.Status.CapturedPacket)) + } }