From 477b13ddfed098f576c199ce861de8a073d9efd1 Mon Sep 17 00:00:00 2001 From: GraysonWu Date: Mon, 19 Dec 2022 08:13:36 -0800 Subject: [PATCH] StretchedNetworkPolicy Antrea agent support (#3914) 1. Add implementation of Stretched NetworkPolicy in Antrea agent. 1.1 Add OVS flows with tun_id matching. 1.2 Also realize a security rule using unknown LabelIdentity for each Stretched NetworkPolicy rule. 2. Add implementation of LabelIdentity on datapath. 2.1 Pod will load its LabelIdentity in tun_id in ClassifierFlow. 2.2 Pod Classifier flow will have different cacheKey for easy update. 3. Add UT and E2E tests. Signed-off-by: graysonwu --- build/charts/antrea/README.md | 2 +- build/charts/antrea/conf/antrea-agent.conf | 3 + .../charts/antrea/conf/antrea-controller.conf | 2 +- .../antrea/templates/agent/clusterrole.yaml | 1 + build/charts/antrea/values.yaml | 2 +- build/yamls/antrea-aks.yml | 10 +- build/yamls/antrea-eks.yml | 10 +- build/yamls/antrea-gke.yml | 10 +- build/yamls/antrea-ipsec.yml | 10 +- build/yamls/antrea.yml | 10 +- ci/jenkins/test-mc.sh | 29 +- cmd/antrea-agent/agent.go | 85 ++- cmd/antrea-agent/options.go | 24 +- .../multicluster/label_identity_controller.go | 4 +- .../label_identity_controller_test.go | 2 +- multicluster/test/e2e/framework.go | 37 ++ multicluster/test/e2e/main_test.go | 6 +- multicluster/test/e2e/service_test.go | 207 +++++- pkg/agent/cniserver/pod_configuration.go | 3 +- .../cniserver/pod_configuration_windows.go | 2 +- pkg/agent/controller/networkpolicy/cache.go | 70 ++- .../controller/networkpolicy/cache_test.go | 26 + .../controller/networkpolicy/reconciler.go | 13 +- .../networkpolicy/reconciler_test.go | 24 + pkg/agent/controller/networkpolicy/reject.go | 11 +- pkg/agent/multicluster/mc_route_controller.go | 41 +- .../multicluster/mc_route_controller_test.go | 27 +- .../stretched_networkpolicy_controller.go | 345 ++++++++++ ...stretched_networkpolicy_controller_test.go | 587 ++++++++++++++++++ pkg/agent/openflow/client.go | 22 +- pkg/agent/openflow/client_test.go | 50 +- pkg/agent/openflow/fields.go | 2 +- pkg/agent/openflow/multicluster.go | 25 +- pkg/agent/openflow/network_policy.go | 20 + pkg/agent/openflow/network_policy_test.go | 42 ++ pkg/agent/openflow/pipeline.go | 14 +- pkg/agent/openflow/testing/mock_openflow.go | 24 +- pkg/agent/types/networkpolicy.go | 1 + pkg/config/agent/config.go | 2 + pkg/ovs/openflow/interfaces.go | 2 + pkg/ovs/openflow/ofctrl_action.go | 7 + pkg/ovs/openflow/ofctrl_builder.go | 7 + pkg/ovs/openflow/testing/mock_openflow.go | 28 + test/e2e/framework.go | 14 + test/e2e/utils/cnp_spec_builder.go | 23 + test/integration/agent/cniserver_test.go | 4 +- test/integration/agent/openflow_test.go | 82 ++- 47 files changed, 1763 insertions(+), 209 deletions(-) create mode 100644 pkg/agent/multicluster/stretched_networkpolicy_controller.go create mode 100644 pkg/agent/multicluster/stretched_networkpolicy_controller_test.go diff --git a/build/charts/antrea/README.md b/build/charts/antrea/README.md index 8c73368a031..d8c5bdcbd93 100644 --- a/build/charts/antrea/README.md +++ b/build/charts/antrea/README.md @@ -85,7 +85,7 @@ Kubernetes: `>= 1.16.0-0` | multicast.igmpQueryInterval | string | `"125s"` | The interval at which the antrea-agent sends IGMP queries to Pods. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | | multicast.multicastInterfaces | list | `[]` | Names of the interfaces on Nodes that are used to forward multicast traffic. 
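The tun_id design in the commit message above works because Geneve carries a 24-bit VNI: a Pod's LabelIdentity ID (or the reserved openflow.UnknownLabelIdentity value used before the real ID is learned) rides in the tunnel header of every cross-cluster packet, so the destination cluster can match policy rules on it. A minimal sketch of that invariant, assuming only the 24-bit VNI width from RFC 8926; vniForLabelIdentity is a hypothetical helper, and the concrete UnknownLabelIdentity value lives in pkg/agent/openflow and is not shown in this patch:

    package main

    import "fmt"

    // maxVNI is the largest value a 24-bit Geneve VNI can carry (RFC 8926).
    const maxVNI = 1<<24 - 1

    // vniForLabelIdentity (hypothetical) checks that a LabelIdentity ID can be
    // loaded into tun_id without truncation.
    func vniForLabelIdentity(id uint32) (uint32, error) {
        if id > maxVNI {
            return 0, fmt.Errorf("LabelIdentity ID %d exceeds 24-bit VNI range", id)
        }
        return id, nil
    }

    func main() {
        vni, _ := vniForLabelIdentity(1)
        fmt.Println(vni) // 1
    }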
| | multicluster.enable | bool | `false` | Enable Antrea Multi-cluster Gateway to support cross-cluster traffic. This feature is supported only with encap mode. | -| multicluster.enableStretchedNetworkPolicy | bool | `false` | Enable Multicluster which allow Antrea-native policies to select peers from other clusters in a ClusterSet. This feature is supported only with encap mode when the tunnel type is Geneve. | +| multicluster.enableStretchedNetworkPolicy | bool | `false` | Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers from other clusters in a ClusterSet. This feature is supported only with encap mode when the tunnel type is Geneve. | | multicluster.namespace | string | `""` | The Namespace where Antrea Multi-cluster Controller is running. The default is antrea-agent's Namespace. | | noSNAT | bool | `false` | Whether or not to SNAT (using the Node IP) the egress traffic from a Pod to the external network. | | nodeIPAM.clusterCIDRs | list | `[]` | CIDR ranges to use when allocating Pod IP addresses. | diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index a9044639b84..0a4abd40bf5 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -333,4 +333,7 @@ multicluster: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: {{ .namespace | quote }} +# Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. +# This feature is supported only with encap mode when the tunnel type is Geneve. + enableStretchedNetworkPolicy: {{ .enableStretchedNetworkPolicy }} {{- end }} diff --git a/build/charts/antrea/conf/antrea-controller.conf b/build/charts/antrea/conf/antrea-controller.conf index aaf6095dd60..520b0a0c340 100644 --- a/build/charts/antrea/conf/antrea-controller.conf +++ b/build/charts/antrea/conf/antrea-controller.conf @@ -111,7 +111,7 @@ ipsecCSRSigner: multicluster: {{- with .Values.multicluster }} - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. enableStretchedNetworkPolicy: {{ .enableStretchedNetworkPolicy }} {{- end }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index ab81673b45b..384d3d4e8ac 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -213,6 +213,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list diff --git a/build/charts/antrea/values.yaml b/build/charts/antrea/values.yaml index 572ba01caa3..215dc8904ec 100644 --- a/build/charts/antrea/values.yaml +++ b/build/charts/antrea/values.yaml @@ -307,7 +307,7 @@ multicluster: # -- The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" - # -- Enable Multicluster which allow Antrea-native policies to select peers + # -- Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. # This feature is supported only with encap mode when the tunnel type is Geneve. 
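With the chart values above, StretchedNetworkPolicy support is opt-in. For example, a plausible enablement (illustrative, not part of this patch) is `helm install antrea antrea/antrea -n kube-system --set multicluster.enable=true --set multicluster.enableStretchedNetworkPolicy=true`, which renders the agent and controller ConfigMap entries shown in the generated manifests below. As the comments state, the feature requires encap mode with the Geneve tunnel type.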
enableStretchedNetworkPolicy: false diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 42d776d98ed..7f4665a275f 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3224,6 +3224,9 @@ data: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" + # Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + # This feature is supported only with encap mode when the tunnel type is Geneve. + enableStretchedNetworkPolicy: false antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3353,7 +3356,7 @@ data: selfSignedCA: true multicluster: - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. enableStretchedNetworkPolicy: false --- @@ -3703,6 +3706,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list @@ -4277,7 +4281,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0bfe61fa131f03545550f3a41480c66a3122c1a87390077d700ca01df6371f9a + checksum/config: f06bc0519d38f9edaaa4516e816c564429500bf878c777abc41033b346e8edff labels: app: antrea component: antrea-agent @@ -4518,7 +4522,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0bfe61fa131f03545550f3a41480c66a3122c1a87390077d700ca01df6371f9a + checksum/config: f06bc0519d38f9edaaa4516e816c564429500bf878c777abc41033b346e8edff labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 8f107f2a7d3..82a84eeca4d 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3224,6 +3224,9 @@ data: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" + # Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + # This feature is supported only with encap mode when the tunnel type is Geneve. + enableStretchedNetworkPolicy: false antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3353,7 +3356,7 @@ data: selfSignedCA: true multicluster: - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. 
enableStretchedNetworkPolicy: false --- @@ -3703,6 +3706,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list @@ -4277,7 +4281,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0bfe61fa131f03545550f3a41480c66a3122c1a87390077d700ca01df6371f9a + checksum/config: f06bc0519d38f9edaaa4516e816c564429500bf878c777abc41033b346e8edff labels: app: antrea component: antrea-agent @@ -4520,7 +4524,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 0bfe61fa131f03545550f3a41480c66a3122c1a87390077d700ca01df6371f9a + checksum/config: f06bc0519d38f9edaaa4516e816c564429500bf878c777abc41033b346e8edff labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 4c1577d5607..2055bf2d0c5 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3224,6 +3224,9 @@ data: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" + # Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + # This feature is supported only with encap mode when the tunnel type is Geneve. + enableStretchedNetworkPolicy: false antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3353,7 +3356,7 @@ data: selfSignedCA: true multicluster: - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. enableStretchedNetworkPolicy: false --- @@ -3703,6 +3706,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list @@ -4277,7 +4281,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: db1a9feabdabaa45a5a006e8d89bd1b3b4a4e3c67573cb98d5f3630e15d4d757 + checksum/config: 4c94da583c601f67c4369808e59e884c0ea07d35b3f295e7a9fe3e14b80221a5 labels: app: antrea component: antrea-agent @@ -4517,7 +4521,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: db1a9feabdabaa45a5a006e8d89bd1b3b4a4e3c67573cb98d5f3630e15d4d757 + checksum/config: 4c94da583c601f67c4369808e59e884c0ea07d35b3f295e7a9fe3e14b80221a5 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index bb152f2806f..e1ef225c7d6 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3237,6 +3237,9 @@ data: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" + # Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + # This feature is supported only with encap mode when the tunnel type is Geneve. 
+ enableStretchedNetworkPolicy: false antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3366,7 +3369,7 @@ data: selfSignedCA: true multicluster: - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. enableStretchedNetworkPolicy: false --- @@ -3716,6 +3719,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list @@ -4290,7 +4294,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1cc89e2ac8e3f6c3c1297fb1d3d8ba1f8eb1f69a7ff915fc23322d9e45237d3f + checksum/config: 8a0ea8f57943caf7de83015414c2bb78c4283a7280118ce3a2160e121e8829b9 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -4576,7 +4580,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 1cc89e2ac8e3f6c3c1297fb1d3d8ba1f8eb1f69a7ff915fc23322d9e45237d3f + checksum/config: 8a0ea8f57943caf7de83015414c2bb78c4283a7280118ce3a2160e121e8829b9 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 645e7defdab..98c0784b91c 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3224,6 +3224,9 @@ data: # The Namespace where Antrea Multi-cluster Controller is running. # The default is antrea-agent's Namespace. namespace: "" + # Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + # This feature is supported only with encap mode when the tunnel type is Geneve. + enableStretchedNetworkPolicy: false antrea-cni.conflist: | { "cniVersion":"0.3.0", @@ -3353,7 +3356,7 @@ data: selfSignedCA: true multicluster: - # Enable Multicluster which allow Antrea-native policies to select peers + # Enable StretchedNetworkPolicy which allow Antrea-native policies to select peers # from other clusters in a ClusterSet. 
enableStretchedNetworkPolicy: false --- @@ -3703,6 +3706,7 @@ rules: - multicluster.crd.antrea.io resources: - clusterinfoimports + - labelidentities verbs: - get - list @@ -4277,7 +4281,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: bb8e267e96249bf4d28379cb852eaada9d0e8d20467d58c8e8ab54e33a29fd93 + checksum/config: 7ef1495672b7b4b2ed3d7729f27af861e3f619bd3343488d45b9c8c3195106cd labels: app: antrea component: antrea-agent @@ -4517,7 +4521,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: bb8e267e96249bf4d28379cb852eaada9d0e8d20467d58c8e8ab54e33a29fd93 + checksum/config: 7ef1495672b7b4b2ed3d7729f27af861e3f619bd3343488d45b9c8c3195106cd labels: app: antrea component: antrea-controller diff --git a/ci/jenkins/test-mc.sh b/ci/jenkins/test-mc.sh index a8bd6ffd810..ae1c6d7d44a 100755 --- a/ci/jenkins/test-mc.sh +++ b/ci/jenkins/test-mc.sh @@ -248,6 +248,24 @@ function run_codecov { (set -e rm -f trustedkeys.gpg codecov )} +function modify_config { + if [[ ${ENABLE_MC_GATEWAY} == "true" ]]; then + cat > build/yamls/chart-values/antrea.yml << EOF +multicluster: + enable: true + enableStretchedNetworkPolicy: true +featureGates: { + Multicluster: true +} +EOF + make manifest + cd multicluster + sed -i 's/enableStretchedNetworkPolicy: false/enableStretchedNetworkPolicy: true/g' config/default/configmap/controller_manager_config.yaml + make manifests + cd .. + fi +} + function deliver_antrea_multicluster { echo "====== Building Antrea for the Following Commit ======" export GO111MODULE=on @@ -357,16 +375,6 @@ function run_multicluster_e2e { export GOCACHE=${WORKDIR}/.cache/go-build export PATH=$GOROOT/bin:$PATH - if [[ ${ENABLE_MC_GATEWAY} == "true" ]]; then - cat > build/yamls/chart-values/antrea.yml < 0 { + // If the rule is a StretchedNetworkPolicy rule, we also need to add a security rule. + // The security rule is used to drop all traffic initiated from Pods with + // UnknownLabelIdentity to make sure those traffic won't sneak around the Policy. + rules = append(rules, toStretchedNetworkPolicySecurityRule(&policy.Rules[i], policy, maxPriority)) + } + for _, r := range rules { + if _, exists := ruleByID[r.ID]; exists { + // If rule already exists, remove it from the map so the ones left are orphaned, + // which means those rules need to be handled by dirtyRuleHandler. + klog.V(2).InfoS("Rule was not changed", "id", r.ID) + delete(ruleByID, r.ID) } else { - metrics.EgressNetworkPolicyRuleCount.Inc() + // If rule doesn't exist, add it to cache and mark it as dirty. 
+ c.rules.Add(r) + // Count up antrea_agent_ingress_networkpolicy_rule_count or antrea_agent_egress_networkpolicy_rule_count + if r.Direction == v1beta.DirectionIn { + metrics.IngressNetworkPolicyRuleCount.Inc() + } else { + metrics.EgressNetworkPolicyRuleCount.Inc() + } + c.dirtyRuleHandler(r.ID) + anyRuleUpdate = true } - c.dirtyRuleHandler(r.ID) - anyRuleUpdate = true } } diff --git a/pkg/agent/controller/networkpolicy/cache_test.go b/pkg/agent/controller/networkpolicy/cache_test.go index f0d31e8988a..5ffa0070e7d 100644 --- a/pkg/agent/controller/networkpolicy/cache_test.go +++ b/pkg/agent/controller/networkpolicy/cache_test.go @@ -627,6 +627,14 @@ func TestRuleCacheAddNetworkPolicy(t *testing.T) { Services: nil, Priority: 0, } + networkPolicyRule4 := &v1beta2.NetworkPolicyRule{ + Direction: v1beta2.DirectionIn, + AppliedToGroups: []string{"appliedToGroup1"}, + From: v1beta2.NetworkPolicyPeer{LabelIdentities: []uint32{1}}, + To: v1beta2.NetworkPolicyPeer{}, + Services: nil, + Priority: 0, + } networkPolicy1 := &v1beta2.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{UID: "policy1", Namespace: "ns1", Name: "name1"}, Rules: nil, @@ -659,9 +667,21 @@ func TestRuleCacheAddNetworkPolicy(t *testing.T) { UID: "policy3", }, } + networkPolicy4 := &v1beta2.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{UID: "policy4", Namespace: "ns4", Name: "name4"}, + Rules: []v1beta2.NetworkPolicyRule{*networkPolicyRule4}, + SourceRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.AntreaNetworkPolicy, + Namespace: "ns4", + Name: "name4", + UID: "policy4", + }, + } rule1 := toRule(networkPolicyRule1, networkPolicy2, k8sNPMaxPriority) rule2 := toRule(networkPolicyRule2, networkPolicy2, k8sNPMaxPriority) rule3 := toRule(networkPolicyRule3, networkPolicy3, 0) + rule4 := toRule(networkPolicyRule4, networkPolicy4, 0) + rule4Sec := toStretchedNetworkPolicySecurityRule(networkPolicyRule4, networkPolicy4, 0) tests := []struct { name string args *v1beta2.NetworkPolicy @@ -686,6 +706,12 @@ func TestRuleCacheAddNetworkPolicy(t *testing.T) { []*rule{rule3}, sets.NewString(rule3.ID), }, + { + "rule-with-label-identity", + networkPolicy4, + []*rule{rule4, rule4Sec}, + sets.NewString(rule4.ID, rule4Sec.ID), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/agent/controller/networkpolicy/reconciler.go b/pkg/agent/controller/networkpolicy/reconciler.go index 7dc160f32ce..87151f7c633 100644 --- a/pkg/agent/controller/networkpolicy/reconciler.go +++ b/pkg/agent/controller/networkpolicy/reconciler.go @@ -550,6 +550,8 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1 from1 := groupMembersToOFAddresses(rule.FromAddresses) // Get addresses that in From IPBlock but not in Except IPBlocks. from2 := ipBlocksToOFAddresses(rule.From.IPBlocks, r.ipv4Enabled, r.ipv6Enabled, isRuleAppliedToService) + from3 := labelIDToOFAddresses(rule.From.LabelIdentities) + from := append(from1, append(from2, from3...)...) 
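The nested append above unions the three From sources into a single address list. labelIDToOFAddresses (added further down in this hunk) deliberately returns an empty but non-nil slice when a rule carries no LabelIdentities, because a nil From list means "not restricted by source address" in the OpenFlow implementation. A toy illustration of the idiom, with ints standing in for types.Address:

    package main

    import "fmt"

    func main() {
        from1 := []int{1} // resolved group members
        from2 := []int{2} // IPBlocks minus their Excepts
        from3 := []int{}  // LabelIdentities (empty, never nil)
        from := append(from1, append(from2, from3...)...)
        fmt.Println(from) // [1 2]
    }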
membersByServicesMap, servicesMap := groupMembersByServices(rule.Services, rule.TargetMembers) for svcKey, members := range membersByServicesMap { var toAddresses []types.Address @@ -567,7 +569,7 @@ func (r *reconciler) computeOFRulesForAdd(rule *CompletedRule, ofPriority *uint1 } ofRuleByServicesMap[svcKey] = &types.PolicyRule{ Direction: v1beta2.DirectionIn, - From: append(from1, from2...), + From: from, To: toAddresses, Service: filterUnresolvablePort(servicesMap[svcKey]), Action: rule.Action, @@ -1208,6 +1210,15 @@ func ipBlocksToOFAddresses(ipBlocks []v1beta2.IPBlock, ipv4Enabled, ipv6Enabled, return addresses } +func labelIDToOFAddresses(labelIDs []uint32) []types.Address { + // Must not return nil as it means not restricted by addresses in Openflow implementation. + addresses := make([]types.Address, 0, len(labelIDs)) + for _, labelID := range labelIDs { + addresses = append(addresses, openflow.NewLabelIDAddress(labelID)) + } + return addresses +} + func isIPNetSupportedByAF(ipnet *net.IPNet, ipv4Enabled, ipv6Enabled bool) bool { if (ipnet.IP.To4() != nil && ipv4Enabled) || (ipnet.IP.To4() == nil && ipv6Enabled) { return true diff --git a/pkg/agent/controller/networkpolicy/reconciler_test.go b/pkg/agent/controller/networkpolicy/reconciler_test.go index 9cfb3226623..d5fc492f2ba 100644 --- a/pkg/agent/controller/networkpolicy/reconciler_test.go +++ b/pkg/agent/controller/networkpolicy/reconciler_test.go @@ -444,6 +444,30 @@ func TestReconcilerReconcile(t *testing.T) { }, false, }, + { + "ingress-rule-with-label-identity", + &CompletedRule{ + rule: &rule{ + ID: "ingress-rule", + Direction: v1beta2.DirectionIn, + From: v1beta2.NetworkPolicyPeer{LabelIdentities: []uint32{1}}, + SourceRef: &np1, + }, + FromAddresses: nil, + ToAddresses: nil, + TargetMembers: appliedToGroup1, + }, + []*types.PolicyRule{ + { + Direction: v1beta2.DirectionIn, + From: labelIDToOFAddresses([]uint32{1}), + To: ofPortsToOFAddresses(sets.NewInt32(1)), + Service: nil, + PolicyRef: &np1, + }, + }, + false, + }, { "egress-rule", &CompletedRule{ diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 0544c9ec056..c468f11a6d3 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -299,14 +299,18 @@ func getRejectOFPorts(rejectType RejectType, sIface, dIface *interfacestore.Inte return inPort, outPort } -// getRejectPacketOutMutateFunc returns the mutate func of a packetOut based on the RejectType. +// getRejectPacketOutMutateFunc returns the mutate-func of a packetOut based on the RejectType. func getRejectPacketOutMutateFunc(rejectType RejectType, nodeType config.NodeType) func(binding.PacketOutBuilder) binding.PacketOutBuilder { var mutatePacketOut func(binding.PacketOutBuilder) binding.PacketOutBuilder + mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder { + return packetOutBuilder.AddLoadRegMark(openflow.CustomReasonRejectRegMark) + } switch rejectType { case RejectServiceLocal: tableID := openflow.ConntrackTable.GetID() mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder { - return packetOutBuilder.AddResubmitAction(nil, &tableID) + return packetOutBuilder.AddLoadRegMark(openflow.CustomReasonRejectRegMark). 
+ AddResubmitAction(nil, &tableID) } case RejectLocalToRemote: tableID := openflow.L3ForwardingTable.GetID() @@ -315,7 +319,8 @@ func getRejectPacketOutMutateFunc(rejectType RejectType, nodeType config.NodeTyp tableID = openflow.L2ForwardingCalcTable.GetID() } mutatePacketOut = func(packetOutBuilder binding.PacketOutBuilder) binding.PacketOutBuilder { - return packetOutBuilder.AddResubmitAction(nil, &tableID) + return packetOutBuilder.AddLoadRegMark(openflow.CustomReasonRejectRegMark). + AddResubmitAction(nil, &tableID) } } return mutatePacketOut diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 6c5966e9bc4..852f3d4a2a5 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package noderoute +package multicluster import ( "fmt" @@ -75,7 +75,8 @@ type MCRouteController struct { // events. installedActiveGW *mcv1alpha1.Gateway // The Namespace where Antrea Multi-cluster Controller is running. - namespace string + namespace string + enableStretchedNetworkPolicy bool } func NewMCRouteController( @@ -87,22 +88,24 @@ func NewMCRouteController( interfaceStore interfacestore.InterfaceStore, nodeConfig *config.NodeConfig, namespace string, + enableStretchedNetworkPolicy bool, ) *MCRouteController { controller := &MCRouteController{ - mcClient: mcClient, - ovsBridgeClient: ovsBridgeClient, - ofClient: client, - interfaceStore: interfaceStore, - nodeConfig: nodeConfig, - gwInformer: gwInformer, - gwLister: gwInformer.Lister(), - gwListerSynced: gwInformer.Informer().HasSynced, - ciImportInformer: ciImportInformer, - ciImportLister: ciImportInformer.Lister(), - ciImportListerSynced: ciImportInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute"), - installedCIImports: make(map[string]*mcv1alpha1.ClusterInfoImport), - namespace: namespace, + mcClient: mcClient, + ovsBridgeClient: ovsBridgeClient, + ofClient: client, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + gwInformer: gwInformer, + gwLister: gwInformer.Lister(), + gwListerSynced: gwInformer.Informer().HasSynced, + ciImportInformer: ciImportInformer, + ciImportLister: ciImportInformer.Lister(), + ciImportListerSynced: ciImportInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "gatewayroute"), + installedCIImports: make(map[string]*mcv1alpha1.ClusterInfoImport), + namespace: namespace, + enableStretchedNetworkPolicy: enableStretchedNetworkPolicy, } controller.gwInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -372,7 +375,8 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa ciImport.Name, peerConfigs, tunnelPeerIPToRemoteGW, - localGatewayIP); err != nil { + localGatewayIP, + c.enableStretchedNetworkPolicy); err != nil { return fmt.Errorf("failed to install flows to remote Gateway in ClusterInfoImport %s: %v", ciImport.Name, err) } } else { @@ -381,7 +385,8 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa if err := c.ofClient.InstallMulticlusterNodeFlows( ciImport.Name, peerConfigs, - tunnelPeerIPToLocalGW); err != nil { + tunnelPeerIPToLocalGW, + 
c.enableStretchedNetworkPolicy); err != nil { return fmt.Errorf("failed to install flows to Gateway %s: %v", activeGW.Name, err) } } diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index 0f582fbf6a6..c745e5c35f5 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package noderoute +package multicluster import ( "context" @@ -60,6 +60,7 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou interfaceStore, nodeConfig, "default", + true, ) return &fakeRouteController{ MCRouteController: c, @@ -153,14 +154,14 @@ func TestMCRouteControllerAsGateway(t *testing.T) { Create(context.TODO(), &clusterInfoImport1, metav1.CreateOptions{}) peerNodeIP1 := getPeerGatewayIP(clusterInfoImport1.Spec) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, gw1GatewayIP).Times(1) + gomock.Any(), peerNodeIP1, gw1GatewayIP, true).Times(1) c.processNextWorkItem() c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()). Create(context.TODO(), &clusterInfoImport2, metav1.CreateOptions{}) peerNodeIP2 := getPeerGatewayIP(clusterInfoImport2.Spec) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport2.Name, - gomock.Any(), peerNodeIP2, gw1GatewayIP).Times(1) + gomock.Any(), peerNodeIP2, gw1GatewayIP, true).Times(1) c.processNextWorkItem() // Update a ClusterInfoImport @@ -168,7 +169,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). 
Update(context.TODO(), &clusterInfoImport1, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, gw1GatewayIP).Times(1) + gomock.Any(), peerNodeIP1, gw1GatewayIP, true).Times(1) c.processNextWorkItem() // Delete a ClusterInfoImport @@ -184,7 +185,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1a.GetNamespace()).Update(context.TODO(), updatedGateway1a, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) + gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) c.processNextWorkItem() // Update Gateway1's InternalIP @@ -199,7 +200,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { &gateway2, metav1.CreateOptions{}) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP).Times(1) + c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP, true).Times(1) c.processNextWorkItem() // Delete Gateway2, then Gateway1 become active Gateway @@ -208,7 +209,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) + gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) c.processNextWorkItem() // Delete last Gateway @@ -250,13 +251,13 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). Create(context.TODO(), &clusterInfoImport1, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1).Times(1) + gomock.Any(), peerNodeIP1, true).Times(1) c.processNextWorkItem() c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()). Create(context.TODO(), &clusterInfoImport2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport2.Name, - gomock.Any(), peerNodeIP1).Times(1) + gomock.Any(), peerNodeIP1, true).Times(1) c.processNextWorkItem() // Update a ClusterInfoImport @@ -264,7 +265,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport1.GetNamespace()). 
Update(context.TODO(), &clusterInfoImport1, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1).Times(1) + gomock.Any(), peerNodeIP1, true).Times(1) c.processNextWorkItem() // Delete a ClusterInfoImport @@ -287,14 +288,14 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.mcClient.MulticlusterV1alpha1().Gateways(updatedGateway1b.GetNamespace()).Update(context.TODO(), updatedGateway1b, metav1.UpdateOptions{}) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), updatedGateway1bIP).Times(1) + gomock.Any(), updatedGateway1bIP, true).Times(1) c.processNextWorkItem() // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2).Times(1) + c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2, true).Times(1) c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.processNextWorkItem() @@ -304,7 +305,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), updatedGateway1bIP).Times(1) + gomock.Any(), updatedGateway1bIP, true).Times(1) c.processNextWorkItem() // Delete last Gateway diff --git a/pkg/agent/multicluster/stretched_networkpolicy_controller.go b/pkg/agent/multicluster/stretched_networkpolicy_controller.go new file mode 100644 index 00000000000..e9a67f1d551 --- /dev/null +++ b/pkg/agent/multicluster/stretched_networkpolicy_controller.go @@ -0,0 +1,345 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
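The new agent controller added below follows the standard client-go informer/workqueue pattern: every event source (Pod CNI events, Pod and Namespace updates, LabelIdentity changes) enqueues a types.NamespacedName key, and duplicate keys coalesce while queued, so a burst of events triggers at most one resync per Pod. A minimal, self-contained sketch of that pattern; the names here are illustrative, not from the patch:

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), "sketch")
        key := types.NamespacedName{Namespace: "ns", Name: "pod"}
        q.Add(key)
        q.Add(key) // coalesced: still a single queued item
        item, _ := q.Get()
        fmt.Println(item, q.Len()) // ns/pod 0
        q.Done(item)
    }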
+ +package multicluster + +import ( + "fmt" + "reflect" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + "antrea.io/antrea/multicluster/controllers/multicluster" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions/multicluster/v1alpha1" + mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + antreatypes "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/util/channel" +) + +const ( + stretchedNetworkPolicyWorker = 4 + stretchedNetworkPolicyControllerName = "AntreaAgentStretchedNetworkPolicyController" + + labelIndex = "Label" +) + +type podSet map[types.NamespacedName]struct{} + +// StretchedNetworkPolicyController is used to update classifier flows of Pods. +// It will make sure the latest LabelIdentity of the Pod, if available, will be +// loaded into tun_id in the classifier flow of the Pod. +// If the LabelIdentity of the Pod is not available when updating, the +// UnknownLabelIdentity will be loaded. When the actual LabelIdentity is created, +// the classifier flow will be updated accordingly. +type StretchedNetworkPolicyController struct { + ofClient openflow.Client + interfaceStore interfacestore.InterfaceStore + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + namespaceInformer coreinformers.NamespaceInformer + namespaceLister corelisters.NamespaceLister + namespaceListerSynced cache.InformerSynced + labelIdentityInformer mcinformers.LabelIdentityInformer + labelIdentityLister mclisters.LabelIdentityLister + LabelIdentityListerSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + lock sync.RWMutex + + labelToPods map[string]podSet + podToLabel map[types.NamespacedName]string +} + +func NewMCAgentStretchedNetworkPolicyController( + client openflow.Client, + interfaceStore interfacestore.InterfaceStore, + podInformer cache.SharedIndexInformer, + namespaceInformer coreinformers.NamespaceInformer, + labelIdentityInformer mcinformers.LabelIdentityInformer, + podUpdateSubscriber channel.Subscriber, +) *StretchedNetworkPolicyController { + controller := &StretchedNetworkPolicyController{ + ofClient: client, + interfaceStore: interfaceStore, + podInformer: podInformer, + podLister: corelisters.NewPodLister(podInformer.GetIndexer()), + podListerSynced: podInformer.HasSynced, + namespaceInformer: namespaceInformer, + namespaceLister: namespaceInformer.Lister(), + namespaceListerSynced: namespaceInformer.Informer().HasSynced, + labelIdentityInformer: labelIdentityInformer, + labelIdentityLister: labelIdentityInformer.Lister(), + LabelIdentityListerSynced: labelIdentityInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), "stretchedNetworkPolicy"), + labelToPods: map[string]podSet{}, + podToLabel: map[types.NamespacedName]string{}, + } + + controller.podInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + // Pod add event will be handled by processPodCNIAddEvent. 
+			// We choose to use events from podUpdateSubscriber instead of the informer because
+			// the controller can only update the Pod classifier flow when the Pod container
+			// config is available. Events from the informer may be received well before the Pod
+			// container config is available, which would cause the work item to be continually
+			// re-queued with an exponentially increasing delay. When the Pod container
+			// config is ready, the work item could still wait a long time before being processed.
+			UpdateFunc: controller.processPodUpdate,
+			DeleteFunc: controller.processPodDelete,
+		},
+		resyncPeriod,
+	)
+	controller.namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
+		cache.ResourceEventHandlerFuncs{
+			UpdateFunc: controller.processNamespaceUpdate,
+		},
+		resyncPeriod,
+	)
+	controller.labelIdentityInformer.Informer().AddEventHandlerWithResyncPeriod(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: controller.processLabelIdentityEvent,
+			UpdateFunc: func(old, cur interface{}) {
+				controller.processLabelIdentityEvent(cur)
+			},
+			DeleteFunc: controller.processLabelIdentityEvent,
+		},
+		resyncPeriod,
+	)
+	podUpdateSubscriber.Subscribe(controller.processPodCNIAddEvent)
+
+	controller.labelIdentityInformer.Informer().AddIndexers(cache.Indexers{
+		labelIndex: func(obj interface{}) ([]string, error) {
+			labelID, ok := obj.(*v1alpha1.LabelIdentity)
+			if !ok {
+				return []string{}, nil
+			}
+			return []string{labelID.Spec.Label}, nil
+		}})
+	return controller
+}
+
+func (s *StretchedNetworkPolicyController) Run(stopCh <-chan struct{}) {
+	defer s.queue.ShutDown()
+
+	klog.InfoS("Starting controller", "controller", stretchedNetworkPolicyControllerName)
+	defer klog.InfoS("Shutting down controller", "controller", stretchedNetworkPolicyControllerName)
+	cacheSyncs := []cache.InformerSynced{s.podListerSynced, s.namespaceListerSynced, s.LabelIdentityListerSynced}
+	if !cache.WaitForNamedCacheSync(stretchedNetworkPolicyControllerName, stopCh, cacheSyncs...) {
+		return
+	}
+	s.enqueueAllPods()
+	for i := 0; i < stretchedNetworkPolicyWorker; i++ {
+		go wait.Until(s.worker, time.Second, stopCh)
+	}
+	<-stopCh
+}
+
+func (s *StretchedNetworkPolicyController) enqueueAllPods() {
+	pods, _ := s.podLister.List(labels.Everything())
+	for _, pod := range pods {
+		if pod.Spec.HostNetwork {
+			continue
+		}
+		s.queue.Add(getPodReference(pod))
+	}
+}
+
+// worker is a long-running function that will continually call the processNextWorkItem
+// function in order to read and process a message on the workqueue.
+func (s *StretchedNetworkPolicyController) worker() {
+	for s.processNextWorkItem() {
+	}
+}
+
+func (s *StretchedNetworkPolicyController) processNextWorkItem() bool {
+	obj, quit := s.queue.Get()
+	if quit {
+		return false
+	}
+	defer s.queue.Done(obj)
+
+	if podRef, ok := obj.(types.NamespacedName); !ok {
+		s.queue.Forget(obj)
+		klog.ErrorS(nil, "Expected type 'NamespacedName' in work queue", "object", obj)
+	} else if err := s.syncPodClassifierFlow(podRef); err == nil {
+		s.queue.Forget(podRef)
+	} else {
+		// Put the item back on the workqueue to handle any transient errors.
+		s.queue.AddRateLimited(podRef)
+		klog.ErrorS(err, "Error syncing Pod classifier flow, requeuing", "name", podRef.Name, "namespace", podRef.Namespace)
+	}
+	return true
+}
+
+// syncPodClassifierFlow gets containerConfigs and labelIdentity according to a
+// Pod reference and updates this Pod's classifierFlow.
+func (s *StretchedNetworkPolicyController) syncPodClassifierFlow(podRef types.NamespacedName) error { + pod, err := s.podLister.Pods(podRef.Namespace).Get(podRef.Name) + if err != nil || pod.Spec.HostNetwork { + return nil + } + containerConfigs := s.interfaceStore.GetContainerInterfacesByPod(podRef.Name, podRef.Namespace) + if len(containerConfigs) == 0 { + klog.InfoS("Pod container config not found, will retry after it's ready", "name", podRef.Name, "namespace", podRef.Namespace) + return nil + } + podNS, err := s.namespaceLister.Get(podRef.Namespace) + if err != nil { + return fmt.Errorf("can't get Namespace %s: %v", podRef.Namespace, err) + } + normalizedLabel := multicluster.GetNormalizedLabel(podNS.Labels, pod.Labels, podNS.Name) + labelID := s.getLabelIdentity(podRef, normalizedLabel) + return s.ofClient.InstallPodFlows( + containerConfigs[0].InterfaceName, + containerConfigs[0].IPs, + containerConfigs[0].MAC, + uint32(containerConfigs[0].OFPort), + containerConfigs[0].VLANID, + &labelID, + ) +} + +// getLabelIdentity updates labelToPods and podToLabel and returns the +// LabelIdentity based on the normalizedLabel. +func (s *StretchedNetworkPolicyController) getLabelIdentity(podRef types.NamespacedName, normalizedLabel string) uint32 { + s.lock.Lock() + oldNormalizedLabel, ok := s.podToLabel[podRef] + if ok && oldNormalizedLabel != normalizedLabel { + s.deleteLabelToPod(oldNormalizedLabel, podRef) + } + if !ok || oldNormalizedLabel != normalizedLabel { + s.addLabelToPod(normalizedLabel, podRef) + s.podToLabel[podRef] = normalizedLabel + } + s.lock.Unlock() + + labelID := openflow.UnknownLabelIdentity + if objs, err := s.labelIdentityInformer.Informer().GetIndexer().ByIndex(labelIndex, normalizedLabel); err == nil && len(objs) == 1 { + labelIdentity := objs[0].(*v1alpha1.LabelIdentity) + labelID = labelIdentity.Spec.ID + } + return labelID +} + +func (s *StretchedNetworkPolicyController) processPodCNIAddEvent(e interface{}) { + podEvent := e.(antreatypes.PodUpdate) + if !podEvent.IsAdd { + return + } + podRef := types.NamespacedName{ + Namespace: podEvent.PodNamespace, + Name: podEvent.PodName, + } + s.queue.Add(podRef) +} + +// processPodUpdate handles Pod update events. It only enqueues the Pod if the +// Labels of this Pod have been updated. +func (s *StretchedNetworkPolicyController) processPodUpdate(old, cur interface{}) { + oldPod, _ := old.(*v1.Pod) + curPod, _ := cur.(*v1.Pod) + if curPod.Spec.HostNetwork { + klog.V(5).InfoS("Skipped processing hostNetwork Pod update event", "name", curPod.Name, "namespace", curPod.Namespace) + return + } + if reflect.DeepEqual(oldPod.Labels, curPod.Labels) { + klog.V(5).InfoS("Pod UpdateFunc received a Pod update event, "+ + "but labels are the same. Skip it", "name", curPod.Name, "namespace", curPod.Namespace) + return + } + s.queue.Add(getPodReference(curPod)) +} + +// processPodDelete handles Pod delete events. It deletes the Pod from the +// labelToPods and podToLabel. After Pod is deleted, its classifier flow will also +// be deleted by podConfigurator. So no need to enqueue this Pod to update its +// classifier flow. +func (s *StretchedNetworkPolicyController) processPodDelete(old interface{}) { + oldPod, _ := old.(*v1.Pod) + oldPodRef := getPodReference(oldPod) + s.lock.Lock() + defer s.lock.Unlock() + if podLabel, ok := s.podToLabel[oldPodRef]; ok { + s.deleteLabelToPod(podLabel, oldPodRef) + delete(s.podToLabel, oldPodRef) + } +} + +// processNamespaceUpdate handles Namespace update events. 
It enqueues all
+// Pods in this Namespace only when the Namespace's Labels have been updated.
+func (s *StretchedNetworkPolicyController) processNamespaceUpdate(old, cur interface{}) {
+	oldNS, _ := old.(*v1.Namespace)
+	curNS, _ := cur.(*v1.Namespace)
+	if reflect.DeepEqual(oldNS.Labels, curNS.Labels) {
+		klog.V(5).InfoS("Namespace UpdateFunc received a Namespace update event, but labels are the same. Skip it", "namespace", curNS.Name)
+		return
+	}
+	allPodsInNS, _ := s.podLister.Pods(curNS.Name).List(labels.Everything())
+	for _, pod := range allPodsInNS {
+		if pod.Spec.HostNetwork {
+			continue
+		}
+		s.queue.Add(getPodReference(pod))
+	}
+}
+
+// processLabelIdentityEvent handles LabelIdentity add/update/delete events.
+// It enqueues all Pods affected by the LabelIdentity.
+func (s *StretchedNetworkPolicyController) processLabelIdentityEvent(cur interface{}) {
+	labelIdentity, _ := cur.(*v1alpha1.LabelIdentity)
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+	if podSet, ok := s.labelToPods[labelIdentity.Spec.Label]; ok {
+		for podRef := range podSet {
+			s.queue.Add(podRef)
+		}
+	}
+}
+
+func (s *StretchedNetworkPolicyController) addLabelToPod(normalizedLabel string, podRef types.NamespacedName) {
+	if _, ok := s.labelToPods[normalizedLabel]; ok {
+		s.labelToPods[normalizedLabel][podRef] = struct{}{}
+	} else {
+		s.labelToPods[normalizedLabel] = podSet{podRef: struct{}{}}
+	}
+}
+
+func (s *StretchedNetworkPolicyController) deleteLabelToPod(normalizedLabel string, podRef types.NamespacedName) {
+	if _, ok := s.labelToPods[normalizedLabel]; ok {
+		delete(s.labelToPods[normalizedLabel], podRef)
+		if len(s.labelToPods[normalizedLabel]) == 0 {
+			delete(s.labelToPods, normalizedLabel)
+		}
+	}
+}
+
+func getPodReference(pod *v1.Pod) types.NamespacedName {
+	return types.NamespacedName{
+		Name:      pod.Name,
+		Namespace: pod.Namespace,
+	}
+}
diff --git a/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go b/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go
new file mode 100644
index 00000000000..8a259ae59a1
--- /dev/null
+++ b/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go
@@ -0,0 +1,587 @@
+// Copyright 2022 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
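The tests below use fixtures in the normalized label format produced by multicluster.GetNormalizedLabel (called in syncPodClassifierFlow above): the Namespace's labels plus the implicit kubernetes.io/metadata.name label, followed by the Pod's own labels. A hedged reconstruction of that serialization, mirroring only the shape seen in the test fixtures; the real helper lives in multicluster/controllers/multicluster and may differ in details:

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/labels"
    )

    // normalizedLabelSketch mimics strings like
    // "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1".
    // labels.Set.String() sorts keys, matching the fixtures' ordering.
    func normalizedLabelSketch(nsName string, nsLabels, podLabels map[string]string) string {
        ns := labels.Merge(labels.Set(nsLabels), labels.Set{"kubernetes.io/metadata.name": nsName})
        return "ns:" + ns.String() + "&pod:" + labels.Set(podLabels).String()
    }

    func main() {
        fmt.Println(normalizedLabelSketch("ns",
            map[string]string{"env": "test"},
            map[string]string{"foo": "bar1"}))
        // Output: ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1
    }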
+ +package multicluster + +import ( + "context" + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + + "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/agent/interfacestore" + interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing" + "antrea.io/antrea/pkg/agent/openflow" + oftest "antrea.io/antrea/pkg/agent/openflow/testing" + antreatypes "antrea.io/antrea/pkg/agent/types" + ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" + "antrea.io/antrea/pkg/util/channel" +) + +const ( + interval = time.Millisecond + timeout = time.Second +) + +type fakeStretchedNetworkPolicyController struct { + *StretchedNetworkPolicyController + clientset *fake.Clientset + mcClient *mcfake.Clientset + informerFactory informers.SharedInformerFactory + mcInformerFactory mcinformers.SharedInformerFactory + ofClient *oftest.MockClient + ovsClient *ovsconfigtest.MockOVSBridgeClient + interfaceStore *interfacestoretest.MockInterfaceStore + podUpdateChannel *channel.SubscribableChannel +} + +func newStretchedNetworkPolicyController(t *testing.T, clientset *fake.Clientset, mcClient *mcfake.Clientset) (*fakeStretchedNetworkPolicyController, func()) { + informerFactory := informers.NewSharedInformerFactory(clientset, 12*time.Hour) + listOptions := func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "test-node").String() + } + podInformer := coreinformers.NewFilteredPodInformer( + clientset, + metav1.NamespaceAll, + 0, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController. 
+ listOptions, + ) + nsInformer := informerFactory.Core().V1().Namespaces() + mcInformerFactory := mcinformers.NewSharedInformerFactory(mcClient, 60*time.Second) + labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities() + + podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) + ctrl := gomock.NewController(t) + ofClient := oftest.NewMockClient(ctrl) + ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) + interfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl) + c := NewMCAgentStretchedNetworkPolicyController( + ofClient, + interfaceStore, + podInformer, + nsInformer, + labelIDInformer, + podUpdateChannel, + ) + return &fakeStretchedNetworkPolicyController{ + StretchedNetworkPolicyController: c, + clientset: clientset, + mcClient: mcClient, + informerFactory: informerFactory, + mcInformerFactory: mcInformerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, + podUpdateChannel: podUpdateChannel, + }, ctrl.Finish +} + +var ( + interfaceConfig = interfacestore.InterfaceConfig{ + InterfaceName: "foo", + OVSPortConfig: &interfacestore.OVSPortConfig{ + OFPort: 2, + }, + IPs: []net.IP{[]byte("1.1.1.1")}, + } + unknownLabelIdentity = openflow.UnknownLabelIdentity +) + +func TestEnqueueAllPods(t *testing.T) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns", + Labels: map[string]string{ + "env": "test", + }, + }, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{ + "foo": "bar1", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + labelIdentity := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity1", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1", + ID: 1, + }, + } + + clientset := fake.NewSimpleClientset(ns, pod) + mcClient := mcfake.NewSimpleClientset(labelIdentity) + c, closeFn := newStretchedNetworkPolicyController(t, clientset, mcClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + go c.podInformer.Run(stopCh) + if err := waitForPodRealized(c, pod); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod.Namespace, pod.Name, err) + } + if err := waitForLabelIdentityRealized(c, labelIdentity); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity.Name, err) + } + c.enqueueAllPods() + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity.Spec.ID)).Times(1) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func 
TestStretchedNetworkPolicyControllerPodEvent(t *testing.T) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns", + Labels: map[string]string{ + "env": "test", + }, + }, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{ + "foo": "bar1", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + labelIdentity1 := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity1", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1", + ID: 1, + }, + } + labelIdentity2 := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity2", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar2", + ID: 2, + }, + } + + clientset := fake.NewSimpleClientset() + mcClient := mcfake.NewSimpleClientset() + c, closeFn := newStretchedNetworkPolicyController(t, clientset, mcClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + go c.podInformer.Run(stopCh) + go c.podUpdateChannel.Run(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + c.clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + if err := waitForNSRealized(c, ns); err != nil { + t.Errorf("error when waiting for Namespace '%s' to be realized: %v", ns.Name, err) + } + + // Create a Pod whose LabelIdentity doesn't exist. + c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err := waitForPodRealized(c, pod); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod.Namespace, pod.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&unknownLabelIdentity)).Times(1) + c.podUpdateChannel.Notify(toPodAddEvent(pod)) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity1.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity1.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + + // Delete a Pod. + c.clientset.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + + // Create a Pod whose LabelIdentity already exist. 
+ c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity1, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity1); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity1.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err := waitForPodRealized(c, pod); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod.Namespace, pod.Name, err) + } + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity1.Spec.ID)).Times(1) + c.podUpdateChannel.Notify(toPodAddEvent(pod)) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity1.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity1.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + + // Update Pod label. + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity2, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity2); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity2.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + pod.Labels["foo"] = "bar2" + c.clientset.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity2.Spec.ID)).Times(1) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity2.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity2.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func TestStretchedNetworkPolicyControllerNSEvent(t *testing.T) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns", + Labels: map[string]string{ + "env": "test", + }, + }, + } + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "ns", + Labels: map[string]string{ + "foo": "bar1", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "ns", + Labels: map[string]string{ + "foo": "bar2", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + labelIdentity1 := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity1", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1", + ID: 1, + }, + } + labelIdentity2 := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity2", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar2", + ID: 2, + }, + } + labelIdentity3 := &v1alpha1.LabelIdentity{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "labelIdentity3", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=prod,kubernetes.io/metadata.name=ns&pod:foo=bar1", + ID: 3, + }, + } + labelIdentity4 := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity4", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=prod,kubernetes.io/metadata.name=ns&pod:foo=bar2", + ID: 4, + }, + } + + clientset := fake.NewSimpleClientset() + mcClient := mcfake.NewSimpleClientset() + c, closeFn := newStretchedNetworkPolicyController(t, clientset, mcClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + go c.podInformer.Run(stopCh) + go c.podUpdateChannel.Run(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity1, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity1); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity1.Name, err) + } + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity2, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity2); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity2.Name, err) + } + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity3, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity3); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity3.Name, err) + } + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity4, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity4); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity4.Name, err) + } + + c.clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + if err := waitForNSRealized(c, ns); err != nil { + t.Errorf("error when waiting for Namespace '%s' to be realized: %v", ns.Name, err) + } + + c.clientset.CoreV1().Pods(pod1.Namespace).Create(context.TODO(), pod1, metav1.CreateOptions{}) + if err := waitForPodRealized(c, pod1); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod1.Namespace, pod1.Name, err) + } + c.clientset.CoreV1().Pods(pod2.Namespace).Create(context.TODO(), pod2, metav1.CreateOptions{}) + if err := waitForPodRealized(c, pod2); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod2.Namespace, pod2.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod1.Name, pod1.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod2.Name, pod2.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity1.Spec.ID)).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity2.Spec.ID)).Times(1) + c.podUpdateChannel.Notify(toPodAddEvent(pod1)) 
+ c.processNextWorkItem() + c.podUpdateChannel.Notify(toPodAddEvent(pod2)) + c.processNextWorkItem() + + // Update Namespace label. + ns.Labels["env"] = "prod" + c.clientset.CoreV1().Namespaces().Update(context.TODO(), ns, metav1.UpdateOptions{}) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod1.Name, pod1.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod2.Name, pod2.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity3.Spec.ID)).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity4.Spec.ID)).Times(1) + c.processNextWorkItem() + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{ + {Name: pod1.Name, Namespace: pod1.Namespace}: labelIdentity3.Spec.Label, + {Name: pod2.Name, Namespace: pod2.Namespace}: labelIdentity4.Spec.Label, + }, c.podToLabel) + assert.Equal(t, map[string]podSet{ + labelIdentity3.Spec.Label: {types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}: struct{}{}}, + labelIdentity4.Spec.Label: {types.NamespacedName{Name: pod2.Name, Namespace: pod2.Namespace}: struct{}{}}, + }, c.labelToPods) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func TestStretchedNetworkPolicyControllerLabelIdentityEvent(t *testing.T) { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ns", + Labels: map[string]string{ + "env": "test", + }, + }, + } + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "ns", + Labels: map[string]string{ + "foo": "bar1", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + } + labelIdentity := &v1alpha1.LabelIdentity{ + ObjectMeta: metav1.ObjectMeta{ + Name: "labelIdentity", + }, + Spec: v1alpha1.LabelIdentitySpec{ + Label: "ns:env=test,kubernetes.io/metadata.name=ns&pod:foo=bar1", + ID: 1, + }, + } + clientset := fake.NewSimpleClientset() + mcClient := mcfake.NewSimpleClientset() + c, closeFn := newStretchedNetworkPolicyController(t, clientset, mcClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + go c.podInformer.Run(stopCh) + go c.podUpdateChannel.Run(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + c.clientset.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + if err := waitForNSRealized(c, ns); err != nil { + t.Errorf("error when waiting for Namespace '%s' to be realized: %v", ns.Name, err) + } + + c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + if err := waitForPodRealized(c, pod); err != nil { + t.Errorf("error when waiting for Pod '%s/%s' to be realized: %v", pod.Namespace, pod.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&unknownLabelIdentity)).Times(1) + 
c.podUpdateChannel.Notify(toPodAddEvent(pod)) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + + // Create LabelIdentity + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Create(context.TODO(), labelIdentity, metav1.CreateOptions{}) + if err := waitForLabelIdentityRealized(c, labelIdentity); err != nil { + t.Errorf("error when waiting for LabelIdentity '%s' to be realized: %v", labelIdentity.Name, err) + } + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity.Spec.ID)).Times(1) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + + // Update LabelIdentity + labelIdentity.Spec.ID = 2 + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Update(context.TODO(), labelIdentity, metav1.UpdateOptions{}) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity.Spec.ID)).Times(1) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + + // Delete LabelIdentity + c.mcClient.MulticlusterV1alpha1().LabelIdentities().Delete(context.TODO(), labelIdentity.Name, metav1.DeleteOptions{}) + c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) + c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&unknownLabelIdentity)).Times(1) + c.processNextWorkItem() + assert.Equal(t, map[types.NamespacedName]string{{Name: pod.Name, Namespace: pod.Namespace}: labelIdentity.Spec.Label}, c.podToLabel) + assert.Equal(t, map[string]podSet{labelIdentity.Spec.Label: {types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}: struct{}{}}}, c.labelToPods) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func toPodAddEvent(pod *corev1.Pod) antreatypes.PodUpdate { + return antreatypes.PodUpdate{ + PodNamespace: pod.Namespace, + PodName: pod.Name, + IsAdd: true, + } +} + +func waitForPodRealized(c *fakeStretchedNetworkPolicyController, pod *corev1.Pod) error { + return wait.Poll(interval, timeout, func() (bool, error) { + _, err := c.podLister.Pods(pod.Namespace).Get(pod.Name) + if err != nil { + return false, nil + } + return true, err + }) +} + +func waitForNSRealized(c *fakeStretchedNetworkPolicyController, ns *corev1.Namespace) error { + return 
wait.Poll(interval, timeout, func() (bool, error) { + _, err := c.namespaceLister.Get(ns.Name) + if err != nil { + return false, nil + } + return true, err + }) +} + +func waitForLabelIdentityRealized(c *fakeStretchedNetworkPolicyController, labelIdentity *v1alpha1.LabelIdentity) error { + return wait.Poll(interval, timeout, func() (bool, error) { + _, err := c.labelIdentityLister.Get(labelIdentity.Name) + if err != nil { + return false, nil + } + return true, err + }) +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 381e84760bc..36c57e467a6 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -75,7 +75,7 @@ type Client interface { // flows will be installed). Calls to InstallPodFlows are idempotent. Concurrent calls // to InstallPodFlows and / or UninstallPodFlows are supported as long as they are all // for different interfaceNames. - InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, ofPort uint32, vlanID uint16) error + InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, ofPort uint32, vlanID uint16, labelID *uint32) error // UninstallPodFlows removes the connection to the local Pod specified with the // interfaceName. UninstallPodFlows will do nothing if no connection to the Pod was established. @@ -326,14 +326,16 @@ type Client interface { InstallMulticlusterNodeFlows( clusterID string, peerConfigs map[*net.IPNet]net.IP, - tunnelPeerIP net.IP) error + tunnelPeerIP net.IP, + enableStretchedNetworkPolicy bool) error // InstallMulticlusterGatewayFlows installs flows to handle cross-cluster packets between Gateways. InstallMulticlusterGatewayFlows( clusterID string, peerConfigs map[*net.IPNet]net.IP, tunnelPeerIP net.IP, - localGatewayIP net.IP) error + localGatewayIP net.IP, + enableStretchedNetworkPolicy bool) error // InstallMulticlusterClassifierFlows installs flows to classify cross-cluster packets. InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error @@ -524,7 +526,7 @@ func (c *client) UninstallNodeFlows(hostname string) error { return c.deleteFlows(c.featurePodConnectivity.nodeCachedFlows, hostname) } -func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, ofPort uint32, vlanID uint16) error { +func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, podInterfaceMAC net.HardwareAddr, ofPort uint32, vlanID uint16, labelID *uint32) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() @@ -534,7 +536,7 @@ func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, localGatewayMAC := c.nodeConfig.GatewayConfig.MAC flows := []binding.Flow{ - c.featurePodConnectivity.podClassifierFlow(ofPort, isAntreaFlexibleIPAM), + c.featurePodConnectivity.podClassifierFlow(ofPort, isAntreaFlexibleIPAM, labelID), c.featurePodConnectivity.l2ForwardCalcFlow(podInterfaceMAC, ofPort), } @@ -561,7 +563,7 @@ func (c *client) InstallPodFlows(interfaceName string, podInterfaceIPs []net.IP, flows = append(flows, c.featurePodConnectivity.podVLANFlow(ofPort, vlanID)) } } - err := c.addFlows(c.featurePodConnectivity.podCachedFlows, interfaceName, flows) + err := c.modifyFlows(c.featurePodConnectivity.podCachedFlows, interfaceName, flows) if err != nil { return err } @@ -1349,14 +1351,15 @@ func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error { // Node and a local Gateway. 
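With the extra labelID parameter above, callers opt in per Pod: nil preserves the pre-existing classifier flow, while a non-nil value is loaded into tun_id. A hedged sketch of a caller (the wrapper and its resolution logic are illustrative; only openflow.Client and openflow.UnknownLabelIdentity come from this patch):

```go
package sketch

import (
	"net"

	"antrea.io/antrea/pkg/agent/openflow"
)

// installPodFlows wraps the extended InstallPodFlows signature. When
// StretchedNetworkPolicy is off it passes nil; when on but the Pod's
// LabelIdentity has not been resolved yet, it falls back to the reserved
// UnknownLabelIdentity value, mirroring the controller tests above.
func installPodFlows(c openflow.Client, iface string, ips []net.IP, mac net.HardwareAddr,
	ofPort uint32, vlanID uint16, stretched bool, resolvedID *uint32) error {
	var labelID *uint32
	if stretched {
		labelID = resolvedID
		if labelID == nil {
			unknown := openflow.UnknownLabelIdentity
			labelID = &unknown
		}
	}
	return c.InstallPodFlows(iface, ips, mac, ofPort, vlanID, labelID)
}
```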
func (c *client) InstallMulticlusterNodeFlows(clusterID string, peerConfigs map[*net.IPNet]net.IP, - tunnelPeerIP net.IP) error { + tunnelPeerIP net.IP, + enableStretchedNetworkPolicy bool) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() cacheKey := fmt.Sprintf("cluster_%s", clusterID) var flows []binding.Flow localGatewayMAC := c.nodeConfig.GatewayConfig.MAC for peerCIDR, remoteGatewayIP := range peerConfigs { - flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP)...) + flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) } return c.modifyFlows(c.featureMulticluster.cachedFlows, cacheKey, flows) } @@ -1366,6 +1369,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, peerConfigs map[*net.IPNet]net.IP, tunnelPeerIP net.IP, localGatewayIP net.IP, + enableStretchedNetworkPolicy bool, ) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() @@ -1373,7 +1377,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, var flows []binding.Flow localGatewayMAC := c.nodeConfig.GatewayConfig.MAC for peerCIDR, remoteGatewayIP := range peerConfigs { - flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP)...) + flows = append(flows, c.featureMulticluster.l3FwdFlowToRemoteViaTun(localGatewayMAC, *peerCIDR, tunnelPeerIP, remoteGatewayIP, enableStretchedNetworkPolicy)...) // Add SNAT flows to change cross-cluster packets' source IP to local Gateway IP. flows = append(flows, c.featureMulticluster.snatConntrackFlows(*peerCIDR, localGatewayIP)...) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 0093292b69e..fc75cdf8b2d 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -142,7 +142,21 @@ func installPodFlows(ofClient Client, cacheKey string) (int, error) { podMAC, _ := net.ParseMAC("AA:BB:CC:DD:EE:EE") podIP := net.ParseIP("10.0.0.2") ofPort := uint32(10) - err := ofClient.InstallPodFlows(containerID, []net.IP{podIP}, podMAC, ofPort, 0) + err := ofClient.InstallPodFlows(containerID, []net.IP{podIP}, podMAC, ofPort, 0, nil) + client := ofClient.(*client) + fCacheI, ok := client.featurePodConnectivity.podCachedFlows.Load(containerID) + if ok { + return len(fCacheI.(flowCache)), err + } + return 0, err +} + +func installPodFlowsWithLabelID(ofClient Client, cacheKey string, labelID *uint32) (int, error) { + containerID := cacheKey + podMAC, _ := net.ParseMAC("AA:BB:CC:DD:EE:EE") + podIP := net.ParseIP("10.0.0.2") + ofPort := uint32(10) + err := ofClient.InstallPodFlows(containerID, []net.IP{podIP}, podMAC, ofPort, 0, labelID) client := ofClient.(*client) fCacheI, ok := client.featurePodConnectivity.podCachedFlows.Load(containerID) if ok { @@ -153,13 +167,16 @@ func installPodFlows(ofClient Client, cacheKey string) (int, error) { // TestIdempotentFlowInstallation checks that InstallNodeFlows and InstallPodFlows are idempotent. 
func TestIdempotentFlowInstallation(t *testing.T) { + labelID := uint32(1) testCases := []struct { name string cacheKey string + labelID *uint32 numFlows int - installFn func(ofClient Client, cacheKey string) (int, error) + installFn func(ofClient Client, cacheKey string, labelID *uint32) (int, error) }{ - {"PodFlows", "aaaa-bbbb-cccc-dddd", 5, installPodFlows}, + {"PodFlows", "aaaa-bbbb-cccc-dddd", nil, 5, installPodFlowsWithLabelID}, + {"SNPPodFlows", "eeee-ffff-gggg-hhhh", &labelID, 5, installPodFlowsWithLabelID}, } // Check the flows are installed only once even though InstallNodeFlows/InstallPodFlows is called multiple times. @@ -173,13 +190,14 @@ func TestIdempotentFlowInstallation(t *testing.T) { m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) // Installing the flows should succeed, and all the flows should be added into the cache. - numCached1, err := tc.installFn(fc, tc.cacheKey) + numCached1, err := tc.installFn(fc, tc.cacheKey, tc.labelID) require.Nil(t, err, "Error when installing Node flows") assert.Equal(t, tc.numFlows, numCached1) // Installing the same flows again must not return an error and should not // add additional flows to the cache. - numCached2, err := tc.installFn(fc, tc.cacheKey) + m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + numCached2, err := tc.installFn(fc, tc.cacheKey, tc.labelID) require.Nil(t, err, "Error when installing Node flows again") assert.Equal(t, numCached1, numCached2) @@ -195,16 +213,16 @@ func TestIdempotentFlowInstallation(t *testing.T) { fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap) defer resetPipelines() - errorCall := m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error")).Times(1) + errorCall := m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error")) m.EXPECT().AddAll(gomock.Any()).Return(nil).After(errorCall) // Installing the flows failed at the first time, and no flow cache is created. - numCached1, err := tc.installFn(fc, tc.cacheKey) + numCached1, err := tc.installFn(fc, tc.cacheKey, tc.labelID) require.NotNil(t, err, "Installing flows in bundle is expected to fail") assert.Equal(t, 0, numCached1) // Installing the same flows successfully at the second time, and add flows to the cache. - numCached2, err := tc.installFn(fc, tc.cacheKey) + numCached2, err := tc.installFn(fc, tc.cacheKey, tc.labelID) require.Nil(t, err, "Error when installing Node flows again") assert.Equal(t, tc.numFlows, numCached2) @@ -220,8 +238,8 @@ func TestFlowInstallationFailed(t *testing.T) { numAddCalls int installFn func(ofClient Client, cacheKey string) (int, error) }{ - {"NodeFlows", "host", 2, installNodeFlows}, - {"PodFlows", "aaaa-bbbb-cccc-dddd", 5, installPodFlows}, + {"NodeFlows", "host", 1, installNodeFlows}, + {"PodFlows", "aaaa-bbbb-cccc-dddd", 1, installPodFlows}, } for _, tc := range testCases { @@ -233,7 +251,7 @@ func TestFlowInstallationFailed(t *testing.T) { defer resetPipelines() // We generate an error for AddAll call. 
- m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error")) + m.EXPECT().AddAll(gomock.Any()).Return(errors.New("Bundle error")).Times(tc.numAddCalls) var err error var numCached int @@ -753,7 +771,7 @@ func Test_client_InstallPodFlows(t *testing.T) { interfaceName := "pod1" cacheKey := fmt.Sprintf("multicast_pod_metric_%s", interfaceName) - assert.NoError(t, fc.InstallPodFlows(interfaceName, tc.podInterfaceIPs, podMAC, podOfPort, tc.vlanID)) + assert.NoError(t, fc.InstallPodFlows(interfaceName, tc.podInterfaceIPs, podMAC, podOfPort, tc.vlanID, nil)) fCacheI, ok := fc.featurePodConnectivity.podCachedFlows.Load(interfaceName) require.True(t, ok) flows := getFlowStrings(fCacheI) @@ -789,7 +807,7 @@ func Test_client_GetPodFlowKeys(t *testing.T) { podInterfaceIPs := []net.IP{net.ParseIP("10.10.0.11")} podMAC, _ := net.ParseMAC("00:00:10:10:00:11") - assert.NoError(t, fc.InstallPodFlows(interfaceName, podInterfaceIPs, podMAC, uint32(11), 0)) + assert.NoError(t, fc.InstallPodFlows(interfaceName, podInterfaceIPs, podMAC, uint32(11), 0, nil)) flowKeys := fc.GetPodFlowKeys(interfaceName) expectedFlowKeys := []string{ "table=1,arp,in_port=11,arp_sha=00:00:10:10:00:11,arp_spa=10.10.0.11", @@ -2009,6 +2027,7 @@ func Test_client_InstallMulticlusterNodeFlows(t *testing.T) { expectedFlows: []string{ "cookie=0x1060000000000, table=L3Forwarding, priority=200,ip,nw_dst=10.97.0.0/16 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ct_state=+rpl+trk,ip,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", + "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x3e000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", }, }, //TODO: IPv6 @@ -2025,7 +2044,7 @@ func Test_client_InstallMulticlusterNodeFlows(t *testing.T) { m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1) - assert.NoError(t, fc.InstallMulticlusterNodeFlows(clusterID, tc.peerConfigs, tc.tunnelPeerIP)) + assert.NoError(t, fc.InstallMulticlusterNodeFlows(clusterID, tc.peerConfigs, tc.tunnelPeerIP, true)) cacheKey := fmt.Sprintf("cluster_%s", clusterID) fCacheI, ok := fc.featureMulticluster.cachedFlows.Load(cacheKey) require.True(t, ok) @@ -2060,6 +2079,7 @@ func Test_client_InstallMulticlusterGatewayFlows(t *testing.T) { "cookie=0x1060000000000, table=UnSNAT, priority=200,ip,nw_dst=192.168.77.100 actions=ct(table=ConntrackZone,zone=65521,exec(nat))", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ip,nw_dst=10.97.0.0/16 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ct_state=+rpl+trk,ip,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", + "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x3e000,nw_dst=192.168.78.101 
actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=SNATMark, priority=210,ct_state=+new+trk,ip,nw_dst=10.97.0.0/16 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", "cookie=0x1060000000000, table=SNAT, priority=200,ct_state=+new+trk,ip,nw_dst=10.97.0.0/16 actions=ct(commit,table=L2ForwardingCalc,zone=65521,exec(nat(src=192.168.77.100)))", }, @@ -2080,7 +2100,7 @@ func Test_client_InstallMulticlusterGatewayFlows(t *testing.T) { cacheKey := fmt.Sprintf("cluster_%s", clusterID) - assert.NoError(t, fc.InstallMulticlusterGatewayFlows(clusterID, tc.peerConfigs, tc.tunnelPeerIP, tc.localGatewayIP)) + assert.NoError(t, fc.InstallMulticlusterGatewayFlows(clusterID, tc.peerConfigs, tc.tunnelPeerIP, tc.localGatewayIP, true)) fCacheI, ok := fc.featureMulticluster.cachedFlows.Load(cacheKey) require.True(t, ok) assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index ad25e93a043..2f6a1e5177d 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -167,7 +167,7 @@ var ( // Marks using CT. var ( - //TODO: There is a bug in libOpenflow when CT_MARK range is from 0 to 0, and a wrong mask will be got. As a result, + // TODO: There is a bug in libOpenflow when CT_MARK range is from 0 to 0, and an incorrect mask will be generated. As a result, // don't just use bit 0 of CT_MARK. // CTMark (NXM_NX_CT_MARK) diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 71a50ee56c8..7761836ef43 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -25,6 +25,11 @@ import ( // for cross-cluster traffic to distinguish from in-cluster traffic. var GlobalVirtualMACForMulticluster, _ = net.ParseMAC("aa:bb:cc:dd:ee:f0") +// UnknownLabelIdentity represents an unknown label identity. +// 24 bits in VNI are used for label identity. The max value is reserved for +// UnknownLabelIdentity. +const UnknownLabelIdentity = uint32(0xffffff) + type featureMulticluster struct { cookieAllocator cookie.Allocator cachedFlows *flowCategoryCache @@ -65,7 +70,8 @@ func (f *featureMulticluster) l3FwdFlowToRemoteViaTun( localGatewayMAC net.HardwareAddr, peerServiceCIDR net.IPNet, tunnelPeer net.IP, - remoteGatewayIP net.IP) []binding.Flow { + remoteGatewayIP net.IP, + enableStretchedNetworkPolicy bool) []binding.Flow { ipProtocol := getIPProtocol(peerServiceCIDR.IP) cookieID := f.cookieAllocator.Request(f.category).Raw() var flows []binding.Flow @@ -97,6 +103,23 @@ func (f *featureMulticluster) l3FwdFlowToRemoteViaTun( Action().GotoTable(L3DecTTLTable.GetID()). Done(), ) + if enableStretchedNetworkPolicy { + flows = append(flows, + // This generates the flow to forward cross-cluster reject traffic, matched + // on the remote Gateway IP and the reject reg mark. + L3ForwardingTable.ofTable.BuildFlow(priorityNormal-1). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchRegMark(CustomReasonRejectRegMark). + MatchDstIP(remoteGatewayIP). + Action().SetSrcMAC(localGatewayMAC). + Action().SetDstMAC(GlobalVirtualMACForMulticluster). + Action().SetTunnelDst(tunnelPeer). // Flow based tunnel. Set tunnel destination. + Action().LoadRegMark(ToTunnelRegMark). + Action().GotoTable(L3DecTTLTable.GetID()).
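Because only the 24 bits of the Geneve VNI carry the label identity, valid IDs must stay strictly below the reserved UnknownLabelIdentity value. A small illustrative guard (hypothetical helper, not part of the patch):

```go
package sketch

import (
	"fmt"

	"antrea.io/antrea/pkg/agent/openflow"
)

// labelIDToTunnelID rejects IDs that do not fit in the 24-bit VNI budget;
// the maximum value 0xffffff is reserved for UnknownLabelIdentity.
func labelIDToTunnelID(labelID uint32) (uint64, error) {
	if labelID >= openflow.UnknownLabelIdentity {
		return 0, fmt.Errorf("label identity %#x does not fit in 24 bits", labelID)
	}
	return uint64(labelID), nil
}
```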
+ Done(), + ) + } return flows } diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index 784bddbcd76..c6fcc582bc4 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -69,6 +69,7 @@ var ( MatchICMPv6Code = types.NewMatchKey(binding.ProtocolICMPv6, types.ICMPAddr, "icmpv6_code") MatchServiceGroupID = types.NewMatchKey(binding.ProtocolIP, types.ServiceGroupIDAddr, "reg7[0..31]") MatchIGMPProtocol = types.NewMatchKey(binding.ProtocolIGMP, types.IGMPAddr, "igmp") + MatchLabelID = types.NewMatchKey(binding.ProtocolIP, types.LabelIDAddr, "tun_id") Unsupported = types.NewMatchKey(binding.ProtocolIP, types.UnSupported, "unknown") // metricFlowIdentifier is used to identify metric flows in metric table. @@ -273,6 +274,25 @@ func NewCTIPNetAddress(addr net.IPNet) *CTIPNetAddress { return &ia } +type LabelIDAddress uint32 + +func (a *LabelIDAddress) GetMatchKey(addrType types.AddressType) *types.MatchKey { + return MatchLabelID +} + +func (a *LabelIDAddress) GetMatchValue() string { + return fmt.Sprintf("%d", uint32(*a)) +} + +func (a *LabelIDAddress) GetValue() interface{} { + return uint32(*a) +} + +func NewLabelIDAddress(labelID uint32) *LabelIDAddress { + a := LabelIDAddress(labelID) + return &a +} + // ConjunctionNotFound is an error response when the specified policyRuleConjunction is not found from the local cache. type ConjunctionNotFound uint32 diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index d222e45227d..fbd1f66c204 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -69,6 +69,7 @@ var ( protocolICMP = v1beta2.ProtocolICMP priority100 = uint16(100) priority200 = uint16(200) + priority201 = uint16(201) icmpType8 = int32(8) icmpCode0 = int32(0) @@ -453,6 +454,22 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { UID: "id3", }, }, + { + Direction: v1beta2.DirectionIn, + From: parseLabelIdentityAddresses([]uint32{1, 2}), + Action: &actionDrop, + Priority: &priority201, + To: []types.Address{NewOFPortAddress(1)}, + Service: []v1beta2.Service{}, + FlowID: uint32(13), + TableID: AntreaPolicyIngressRuleTable.GetID(), + PolicyRef: &v1beta2.NetworkPolicyReference{ + Type: v1beta2.AntreaNetworkPolicy, + Namespace: "ns1", + Name: "np4", + UID: "id4", + }, + }, }, expectedFlowsFn: func(c *client) []binding.Flow { cookiePolicy := c.cookieAllocator.Request(cookie.NetworkPolicy).Raw() @@ -471,6 +488,11 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { Action().LoadToRegField(CNPConjIDField, 12). Action().LoadRegMark(CnpDenyRegMark). Action().GotoTable(IngressMetricTable.GetID()).Done(), + AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). + MatchConjID(13). + Action().LoadToRegField(CNPConjIDField, 13). + Action().LoadRegMark(CnpDenyRegMark). + Action().GotoTable(IngressMetricTable.GetID()).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). Action().Conjunction(10, 1, 2). @@ -484,6 +506,12 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.51")). Action().Conjunction(11, 1, 3).Done(), + AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). + MatchTunnelID(1). 
+ Action().Conjunction(13, 1, 3).Done(), + AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). + MatchTunnelID(2). + Action().Conjunction(13, 1, 3).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchRegFieldWithValue(TargetOFPortField, uint32(1)). Action().Conjunction(10, 2, 2). @@ -497,6 +525,9 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchRegFieldWithValue(TargetOFPortField, uint32(3)). Action().Conjunction(11, 2, 3).Done(), + AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). + MatchRegFieldWithValue(TargetOFPortField, uint32(1)). + Action().Conjunction(13, 2, 3).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchProtocol(binding.ProtocolTCP).MatchDstPort(8080, nil). Action().Conjunction(11, 3, 3).Done(), @@ -518,6 +549,9 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 12). Action().Drop().Done(), + IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). + MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 13). + Action().Drop().Done(), } }, }, @@ -977,6 +1011,14 @@ func parseAddresses(addrs []string) []types.Address { return addresses } +func parseLabelIdentityAddresses(labelIdentities []uint32) []types.Address { + var addresses = make([]types.Address, 0) + for _, labelIdentity := range labelIdentities { + addresses = append(addresses, NewLabelIDAddress(labelIdentity)) + } + return addresses +} + func preparePipelines() { pipelineID := pipelineIP requiredTablesMap := make(map[*Table]struct{}) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 0131fa9456f..1715fbdc458 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -619,11 +619,21 @@ func (f *featurePodConnectivity) gatewayClassifierFlow() binding.Flow { } // podClassifierFlow generates the flow to mark the packets from a local Pod port. -func (f *featurePodConnectivity) podClassifierFlow(podOFPort uint32, isAntreaFlexibleIPAM bool) binding.Flow { +// If multi-cluster is enabled, also load podLabelID into the tunnel ID (tun_id). +func (f *featurePodConnectivity) podClassifierFlow(podOFPort uint32, isAntreaFlexibleIPAM bool, podLabelID *uint32) binding.Flow { regMarksToLoad := []*binding.RegMark{FromLocalRegMark} if isAntreaFlexibleIPAM { regMarksToLoad = append(regMarksToLoad, AntreaFlexibleIPAMRegMark, RewriteMACRegMark) } + if podLabelID != nil { + return ClassifierTable.ofTable.BuildFlow(priorityLow). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchInPort(podOFPort). + Action().LoadRegMark(regMarksToLoad...). + Action().SetTunnelID(uint64(*podLabelID)). + Action().GotoStage(stageValidation). + Done() + } return ClassifierTable.ofTable.BuildFlow(priorityLow). Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchInPort(podOFPort).
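For a Pod on OF port 10 with label identity 5 (values chosen arbitrarily), the labelID branch above should yield a Classifier flow roughly like the one sketched below, consistent with the integration-test expectation later in this patch (set_field:0xffffff->tun_id for the unknown ID); exact reg marks depend on datapath configuration:

```go
package sketch

// expectedClassifierFlow approximates the flow podClassifierFlow emits when a
// label identity is supplied: match on the Pod's in_port, load the usual reg
// marks, stamp the 24-bit ID into tun_id, then continue to SpoofGuard.
const expectedClassifierFlow = "priority=190,in_port=10 " +
	"actions=set_field:0x3/0xf->reg0,set_field:0x5->tun_id,goto_table:SpoofGuard"
```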
@@ -1971,6 +1981,8 @@ func (f *featureNetworkPolicy) addFlowMatch(fb binding.FlowBuilder, matchKey *ty fb = fb.MatchRegFieldWithValue(ServiceGroupIDField, matchValue.(uint32)) case MatchIGMPProtocol: fb = fb.MatchProtocol(matchKey.GetOFProtocol()) + case MatchLabelID: + fb = fb.MatchTunnelID(uint64(matchValue.(uint32))) } return fb } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 5ea6d08d8d7..596c9c6daeb 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -340,31 +340,31 @@ func (mr *MockClientMockRecorder) InstallMulticlusterClassifierFlows(arg0, arg1 } // InstallMulticlusterGatewayFlows mocks base method -func (m *MockClient) InstallMulticlusterGatewayFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2, arg3 net.IP) error { +func (m *MockClient) InstallMulticlusterGatewayFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2, arg3 net.IP, arg4 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticlusterGatewayFlows", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "InstallMulticlusterGatewayFlows", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // InstallMulticlusterGatewayFlows indicates an expected call of InstallMulticlusterGatewayFlows -func (mr *MockClientMockRecorder) InstallMulticlusterGatewayFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticlusterGatewayFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterGatewayFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterGatewayFlows), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterGatewayFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterGatewayFlows), arg0, arg1, arg2, arg3, arg4) } // InstallMulticlusterNodeFlows mocks base method -func (m *MockClient) InstallMulticlusterNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 net.IP) error { +func (m *MockClient) InstallMulticlusterNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 net.IP, arg3 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallMulticlusterNodeFlows", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "InstallMulticlusterNodeFlows", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // InstallMulticlusterNodeFlows indicates an expected call of InstallMulticlusterNodeFlows -func (mr *MockClientMockRecorder) InstallMulticlusterNodeFlows(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallMulticlusterNodeFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterNodeFlows), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterNodeFlows), arg0, arg1, arg2, arg3) } // InstallNodeFlows mocks base method @@ -382,17 +382,17 @@ func (mr *MockClientMockRecorder) InstallNodeFlows(arg0, arg1, arg2, arg3, arg4 } // InstallPodFlows mocks base method -func (m *MockClient) InstallPodFlows(arg0 string, arg1 []net.IP, arg2 net.HardwareAddr, arg3 uint32, arg4 uint16) error { +func (m *MockClient) InstallPodFlows(arg0 string, arg1 []net.IP, arg2 net.HardwareAddr, arg3 uint32, arg4 
uint16, arg5 *uint32) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallPodFlows", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "InstallPodFlows", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].(error) return ret0 } // InstallPodFlows indicates an expected call of InstallPodFlows -func (mr *MockClientMockRecorder) InstallPodFlows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallPodFlows(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPodFlows", reflect.TypeOf((*MockClient)(nil).InstallPodFlows), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPodFlows", reflect.TypeOf((*MockClient)(nil).InstallPodFlows), arg0, arg1, arg2, arg3, arg4, arg5) } // InstallPodSNATFlows mocks base method diff --git a/pkg/agent/types/networkpolicy.go b/pkg/agent/types/networkpolicy.go index c020a73b0d1..da5a5bbd0f7 100644 --- a/pkg/agent/types/networkpolicy.go +++ b/pkg/agent/types/networkpolicy.go @@ -56,6 +56,7 @@ const ( ICMPAddr ServiceGroupIDAddr IGMPAddr + LabelIDAddr UnSupported ) diff --git a/pkg/config/agent/config.go b/pkg/config/agent/config.go index 5a8e47418c6..06c5726243b 100644 --- a/pkg/config/agent/config.go +++ b/pkg/config/agent/config.go @@ -283,6 +283,8 @@ type MulticlusterConfig struct { // The Namespace where the Antrea Multi-cluster controller is running. // The default is antrea-agent's Namespace. Namespace string `yaml:"namespace,omitempty"` + // Enable StretchedNetworkPolicy which could be enforced on cross-cluster traffic. + EnableStretchedNetworkPolicy bool `yaml:"enableStretchedNetworkPolicy,omitempty"` } type ExternalNodeConfig struct { diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 30e64c3ecd8..9e7db1349eb 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -226,6 +226,7 @@ type Action interface { SetSrcIP(addr net.IP) FlowBuilder SetDstIP(addr net.IP) FlowBuilder SetTunnelDst(addr net.IP) FlowBuilder + SetTunnelID(tunnelID uint64) FlowBuilder PopVLAN() FlowBuilder PushVLAN(etherType uint16) FlowBuilder SetVLAN(vlanID uint16) FlowBuilder @@ -281,6 +282,7 @@ type FlowBuilder interface { MatchICMPv6Type(icmp6Type byte) FlowBuilder MatchICMPv6Code(icmp6Code byte) FlowBuilder MatchTunnelDst(dstIP net.IP) FlowBuilder + MatchTunnelID(tunnelID uint64) FlowBuilder MatchTunMetadata(index int, data uint32) FlowBuilder MatchVLAN(nonVLAN bool, vlanId uint16, vlanMask *uint16) FlowBuilder // MatchCTSrcIP matches the source IPv4 address of the connection tracker original direction tuple. diff --git a/pkg/ovs/openflow/ofctrl_action.go b/pkg/ovs/openflow/ofctrl_action.go index 84311dbb442..7250e26876f 100644 --- a/pkg/ovs/openflow/ofctrl_action.go +++ b/pkg/ovs/openflow/ofctrl_action.go @@ -252,6 +252,13 @@ func (a *ofFlowAction) SetTunnelDst(addr net.IP) FlowBuilder { return a.builder } +// SetTunnelID is an action to modify packet tunnel ID to the specified ID. +func (a *ofFlowAction) SetTunnelID(tunnelID uint64) FlowBuilder { + setTunIDAct := &ofctrl.SetTunnelIDAction{TunnelID: tunnelID} + a.builder.ApplyAction(setTunIDAct) + return a.builder +} + // PopVLAN is an action to pop VLAN ID. 
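The two new FlowBuilder hooks are symmetric: SetTunnelID stamps tun_id on the producer side (the Pod classifier flow) and MatchTunnelID consumes it in policy rules. A hedged sketch of the pair in use (priorities, ports, and the ID are arbitrary):

```go
package sketch

import binding "antrea.io/antrea/pkg/ovs/openflow"

// tunnelIDFlows builds one flow that writes a label identity into tun_id for
// packets from a given port, and one that matches and drops packets carrying
// that identity. Illustrative only; real flows also set cookies and next tables.
func tunnelIDFlows(table binding.Table) []binding.Flow {
	stamp := table.BuildFlow(200).
		MatchInPort(10).
		Action().SetTunnelID(5). // write the 24-bit label identity
		Done()
	match := table.BuildFlow(200).
		MatchTunnelID(5). // match packets carrying that identity
		Action().Drop().
		Done()
	return []binding.Flow{stamp, match}
}
```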
func (a *ofFlowAction) PopVLAN() FlowBuilder { popVLANAct := &ofctrl.PopVLANAction{} diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index 8d90f6476fa..88179adb3fb 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -296,6 +296,13 @@ func (b *ofFlowBuilder) MatchTunnelDst(dstIP net.IP) FlowBuilder { return b } +// MatchTunnelID adds match condition for matching tun_id. +func (b *ofFlowBuilder) MatchTunnelID(tunnelID uint64) FlowBuilder { + b.matchers = append(b.matchers, fmt.Sprintf("tun_id=%d", tunnelID)) + b.ofFlow.Match.TunnelId = tunnelID + return b +} + func ctLabelRange(high, low uint64, rng *Range, match *ofctrl.FlowMatch) { // [127..64] [63..0] // high low diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index 4062f5977b2..ce28d5953de 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1216,6 +1216,20 @@ func (mr *MockActionMockRecorder) SetTunnelDst(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTunnelDst", reflect.TypeOf((*MockAction)(nil).SetTunnelDst), arg0) } +// SetTunnelID mocks base method +func (m *MockAction) SetTunnelID(arg0 uint64) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetTunnelID", arg0) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// SetTunnelID indicates an expected call of SetTunnelID +func (mr *MockActionMockRecorder) SetTunnelID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTunnelID", reflect.TypeOf((*MockAction)(nil).SetTunnelID), arg0) +} + // SetVLAN mocks base method func (m *MockAction) SetVLAN(arg0 uint16) openflow.FlowBuilder { m.ctrl.T.Helper() @@ -2072,6 +2086,20 @@ func (mr *MockFlowBuilderMockRecorder) MatchTunnelDst(arg0 interface{}) *gomock. 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchTunnelDst", reflect.TypeOf((*MockFlowBuilder)(nil).MatchTunnelDst), arg0) } +// MatchTunnelID mocks base method +func (m *MockFlowBuilder) MatchTunnelID(arg0 uint64) openflow.FlowBuilder { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MatchTunnelID", arg0) + ret0, _ := ret[0].(openflow.FlowBuilder) + return ret0 +} + +// MatchTunnelID indicates an expected call of MatchTunnelID +func (mr *MockFlowBuilderMockRecorder) MatchTunnelID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MatchTunnelID", reflect.TypeOf((*MockFlowBuilder)(nil).MatchTunnelID), arg0) +} + // MatchVLAN mocks base method func (m *MockFlowBuilder) MatchVLAN(arg0 bool, arg1 uint16, arg2 *uint16) openflow.FlowBuilder { m.ctrl.T.Helper() diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 5a83088799d..0c9c9010f64 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1275,6 +1275,20 @@ func (b *PodBuilder) Create(data *TestData) error { return nil } +func (data *TestData) UpdatePod(namespace, name string, mutateFunc func(*corev1.Pod)) error { + pod, err := data.clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error when getting '%s/%s' Pod: %v", namespace, name, err) + } + if mutateFunc != nil { + mutateFunc(pod) + } + if _, err := data.clientset.CoreV1().Pods(namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("error when updating '%s/%s' Pod: %v", namespace, name, err) + } + return nil +} + // createBusyboxPodOnNode creates a Pod in the test namespace with a single busybox container. The // Pod will be scheduled on the specified Node (if nodeName is not empty). 
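UpdatePod takes a mutation callback, so e2e tests can relabel a Pod in place and then observe the datapath react (for StretchedNetworkPolicy, a changed LabelIdentity). A hypothetical usage snippet, assumed to live alongside the framework helpers:

```go
package e2e

import corev1 "k8s.io/api/core/v1"

// relabelPod flips a Pod label via the UpdatePod helper above; the label key
// and value are placeholders for whatever the test asserts on.
func relabelPod(data *TestData, namespace, name string) error {
	return data.UpdatePod(namespace, name, func(pod *corev1.Pod) {
		if pod.Labels == nil {
			pod.Labels = map[string]string{}
		}
		pod.Labels["foo"] = "bar2"
	})
}
```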
func (data *TestData) createBusyboxPodOnNode(name string, ns string, nodeName string, hostNetwork bool) error { diff --git a/test/e2e/utils/cnp_spec_builder.go b/test/e2e/utils/cnp_spec_builder.go index ea73c34b290..9907e56d96c 100644 --- a/test/e2e/utils/cnp_spec_builder.go +++ b/test/e2e/utils/cnp_spec_builder.go @@ -261,6 +261,29 @@ func (b *ClusterNetworkPolicySpecBuilder) AddToServicesRule(svcRefs []crdv1alpha return b } +func (b *ClusterNetworkPolicySpecBuilder) AddStretchedIngressRule(pSel, nsSel map[string]string, + name string, ruleAppliedToSpecs []ACNPAppliedToSpec, action crdv1alpha1.RuleAction) *ClusterNetworkPolicySpecBuilder { + + var appliedTos []crdv1alpha1.AppliedTo + for _, at := range ruleAppliedToSpecs { + appliedTos = append(appliedTos, b.GetAppliedToPeer(at.PodSelector, at.NSSelector, at.PodSelectorMatchExp, at.NSSelectorMatchExp, at.Group, at.Service)) + } + newRule := crdv1alpha1.Rule{ + From: []crdv1alpha1.NetworkPolicyPeer{{Scope: "ClusterSet"}}, + Action: &action, + Name: name, + AppliedTo: appliedTos, + } + if len(pSel) > 0 { + newRule.From[0].PodSelector = &metav1.LabelSelector{MatchLabels: pSel} + } + if len(nsSel) > 0 { + newRule.From[0].NamespaceSelector = &metav1.LabelSelector{MatchLabels: nsSel} + } + b.Spec.Ingress = append(b.Spec.Ingress, newRule) + return b +} + // AddEgressDNS mutates the nth policy rule to allow DNS, convenience method func (b *ClusterNetworkPolicySpecBuilder) WithEgressDNS() *ClusterNetworkPolicySpecBuilder { protocolUDP, _ := AntreaPolicyProtocolToK8sProtocol(ProtocolUDP) diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 8761b4830a1..88d1c6340b6 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -608,7 +608,7 @@ func cmdAddDelCheckTest(testNS ns.NetNS, tc testCase, dataDir string) { ovsPortUUID := uuid.New().String() ovsServiceMock.EXPECT().CreatePort(ovsPortname, ovsPortname, mock.Any()).Return(ovsPortUUID, nil).AnyTimes() ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(int32(10), nil).AnyTimes() - ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0)).Return(nil) + ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, mock.Any(), mock.Any(), mock.Any(), uint16(0), nil).Return(nil) close(tester.networkReadyCh) // Test ips allocation @@ -827,7 +827,7 @@ func TestCNIServerChaining(t *testing.T) { routeMock.EXPECT().MigrateRoutesToGw(hostVeth.Name), ovsServiceMock.EXPECT().CreatePort(ovsPortname, ovsPortname, mock.Any()).Return(ovsPortUUID, nil), ovsServiceMock.EXPECT().GetOFPort(ovsPortname, false).Return(testContainerOFPort, nil), - ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, []net.IP{podIP}, containerIntf.HardwareAddr, mock.Any(), uint16(0)), + ofServiceMock.EXPECT().InstallPodFlows(ovsPortname, []net.IP{podIP}, containerIntf.HardwareAddr, mock.Any(), uint16(0), nil), ) mock.InOrder(orderedCalls...) 
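A hedged sketch of the new builder method in use: an ACNP whose ingress rule selects ClusterSet-scoped peers. The surrounding builder calls (SetName, SetPriority, SetAppliedToGroup, Get) are assumed to follow the existing ClusterNetworkPolicySpecBuilder API; names and selectors are illustrative:

```go
package e2e

import (
	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
	"antrea.io/antrea/test/e2e/utils"
)

// buildStretchedDropPolicy denies ingress to app=server Pods from app=client
// Pods in any member cluster of the ClusterSet.
func buildStretchedDropPolicy() *crdv1alpha1.ClusterNetworkPolicy {
	builder := &utils.ClusterNetworkPolicySpecBuilder{}
	builder = builder.SetName("acnp-drop-stretched").
		SetPriority(1.0).
		SetAppliedToGroup([]utils.ACNPAppliedToSpec{{PodSelector: map[string]string{"app": "server"}}})
	builder.AddStretchedIngressRule(map[string]string{"app": "client"}, nil,
		"drop-clusterset-clients", nil, crdv1alpha1.RuleActionDrop)
	return builder.Get()
}
```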
cniResp, err := server.CmdAdd(ctx, cniReq) diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index ae5fd859e2d..52b828ee34c 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -91,14 +91,15 @@ type testPeerConfig struct { } type testConfig struct { - bridge string - nodeConfig *agentconfig.NodeConfig - localPods []*testLocalPodConfig - peers []*testPeerConfig - globalMAC net.HardwareAddr - enableIPv6 bool - enableIPv4 bool - connectUplinkToBridge bool + bridge string + nodeConfig *agentconfig.NodeConfig + localPods []*testLocalPodConfig + peers []*testPeerConfig + globalMAC net.HardwareAddr + enableIPv6 bool + enableIPv4 bool + connectUplinkToBridge bool + enableStretchedNetworkPolicy bool } var ( @@ -129,7 +130,7 @@ func TestConnectivityFlows(t *testing.T) { ofClient.ResetOFTable() }() - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) t.Run("testInitialize", func(t *testing.T) { testInitialize(t, config) @@ -158,6 +159,14 @@ func TestConnectivityFlows(t *testing.T) { t.Run("testExternalFlows", func(t *testing.T) { testExternalFlows(t, config) }) + + stretchedNetworkPolicyConfig := prepareConfiguration(true, false, true) + t.Run("testInstallPodFlows", func(t *testing.T) { + testInstallPodFlows(t, stretchedNetworkPolicyConfig) + }) + t.Run("testUninstallPodFlows", func(t *testing.T) { + testUninstallPodFlows(t, stretchedNetworkPolicyConfig) + }) } func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { @@ -177,7 +186,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { ofClient.ResetOFTable() }() - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) config.connectUplinkToBridge = true config.localPods[0].ips = []net.IP{net.ParseIP("192.168.255.3")} vlanID := uint16(100) @@ -241,7 +250,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { ofClient.ResetOFTable() }() - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) t.Run("testInitialize", func(t *testing.T) { testInitialize(t, config) }) @@ -274,7 +283,7 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient") @@ -419,11 +428,17 @@ func testUninstallNodeFlows(t *testing.T, config *testConfig) { func testInstallPodFlows(t *testing.T, config *testConfig) { gatewayConfig := config.nodeConfig.GatewayConfig for _, pod := range config.localPods { - err := c.InstallPodFlows(pod.name, pod.ips, pod.mac, pod.ofPort, pod.vlanID) + var err error + if config.enableStretchedNetworkPolicy { + labelIdentity := ofClient.UnknownLabelIdentity + err = c.InstallPodFlows(pod.name, pod.ips, pod.mac, pod.ofPort, pod.vlanID, &labelIdentity) + } else { + err = c.InstallPodFlows(pod.name, pod.ips, pod.mac, pod.ofPort, pod.vlanID, nil) + } if err != nil { t.Fatalf("Failed to install Openflow entries for pod: %v", err) } - for _, tableFlow := range preparePodFlows(pod.ips, pod.mac, pod.ofPort, gatewayConfig.MAC, config.globalMAC, 
config.nodeConfig, config.connectUplinkToBridge, pod.vlanID) { + for _, tableFlow := range preparePodFlows(pod.ips, pod.mac, pod.ofPort, gatewayConfig.MAC, config.globalMAC, config.nodeConfig, config.connectUplinkToBridge, pod.vlanID, config.enableStretchedNetworkPolicy) { ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, true, tableFlow.flows) } } @@ -436,7 +451,7 @@ func testUninstallPodFlows(t *testing.T, config *testConfig) { if err != nil { t.Fatalf("Failed to uninstall Openflow entries for pod: %v", err) } - for _, tableFlow := range preparePodFlows(pod.ips, pod.mac, pod.ofPort, gatewayConfig.MAC, config.globalMAC, config.nodeConfig, config.connectUplinkToBridge, pod.vlanID) { + for _, tableFlow := range preparePodFlows(pod.ips, pod.mac, pod.ofPort, gatewayConfig.MAC, config.globalMAC, config.nodeConfig, config.connectUplinkToBridge, pod.vlanID, config.enableStretchedNetworkPolicy) { ofTestUtils.CheckFlowExists(t, ovsCtlClient, tableFlow.tableName, 0, false, tableFlow.flows) } } @@ -451,7 +466,7 @@ func TestNetworkPolicyFlows(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) - config := prepareConfiguration(true, true) + config := prepareConfiguration(true, true, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true, IPv6Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient") @@ -573,7 +588,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { ofClient.CleanOFTableCache() ofClient.ResetOFTable() }() - config := prepareConfiguration(false, true) + config := prepareConfiguration(false, true, false) t.Run("testInitialize", func(t *testing.T) { testInitialize(t, config) }) @@ -613,7 +628,7 @@ func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient") @@ -703,7 +718,7 @@ func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: true}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient") @@ -1041,7 +1056,7 @@ func testInstallGatewayFlows(t *testing.T, config *testConfig) { } } -func prepareConfiguration(enableIPv4, enableIPv6 bool) *testConfig { +func prepareConfiguration(enableIPv4, enableIPv6, enableStretchedNetworkPolicy bool) *testConfig { podMAC, _ := net.ParseMAC("aa:aa:aa:aa:aa:13") gwMAC, _ := net.ParseMAC("aa:aa:aa:aa:aa:11") uplinkMAC, _ := net.ParseMAC("aa:aa:aa:aa:aa:12") @@ -1089,32 +1104,37 @@ func prepareConfiguration(enableIPv4, enableIPv6 bool) *testConfig { vMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff") return &testConfig{ - 
bridge: br, - nodeConfig: nodeConfig, - localPods: []*testLocalPodConfig{podCfg}, - peers: []*testPeerConfig{peerNode}, - globalMAC: vMAC, - enableIPv4: enableIPv4, - enableIPv6: enableIPv6, + bridge: br, + nodeConfig: nodeConfig, + localPods: []*testLocalPodConfig{podCfg}, + peers: []*testPeerConfig{peerNode}, + globalMAC: vMAC, + enableIPv4: enableIPv4, + enableIPv6: enableIPv6, + enableStretchedNetworkPolicy: enableStretchedNetworkPolicy, } } -func preparePodFlows(podIPs []net.IP, podMAC net.HardwareAddr, podOFPort uint32, gwMAC, vMAC net.HardwareAddr, nodeConfig *agentconfig.NodeConfig, connectUplinkToBridge bool, vlanID uint16) []expectTableFlows { +func preparePodFlows(podIPs []net.IP, podMAC net.HardwareAddr, podOFPort uint32, gwMAC, vMAC net.HardwareAddr, nodeConfig *agentconfig.NodeConfig, connectUplinkToBridge bool, vlanID uint16, enableStretchedNetworkPolicy bool) []expectTableFlows { podIPv4 := util.GetIPv4Addr(podIPs) isAntreaFlexibleIPAM := connectUplinkToBridge && podIPv4 != nil && !nodeConfig.PodIPv4CIDR.Contains(podIPv4) actionNotAntreaFlexibleIPAMString := "" + actionNotMulticlusterString := "" matchRewriteMACMarkString := ",reg0=0x200/0x200" if isAntreaFlexibleIPAM { actionNotAntreaFlexibleIPAMString = ",set_field:0x100000/0x100000->reg4,set_field:0x200/0x200->reg0" matchRewriteMACMarkString = "" } + if enableStretchedNetworkPolicy { + actionNotMulticlusterString = ",set_field:0xffffff->tun_id" + } flows := []expectTableFlows{ { "Classifier", []*ofTestUtils.ExpectFlow{ { MatchStr: fmt.Sprintf("priority=190,in_port=%d", podOFPort), - ActStr: fmt.Sprintf("set_field:0x3/0xf->reg0%s,goto_table:SpoofGuard", actionNotAntreaFlexibleIPAMString), + ActStr: fmt.Sprintf("set_field:0x3/0xf->reg0%s%s,goto_table:SpoofGuard", actionNotAntreaFlexibleIPAMString, actionNotMulticlusterString), }, }, }, @@ -1767,7 +1787,7 @@ func TestEgressMarkFlows(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) - config := prepareConfiguration(true, true) + config := prepareConfiguration(true, true, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient") @@ -1824,7 +1844,7 @@ func TestTrafficControlFlows(t *testing.T) { err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) - config := prepareConfiguration(true, false) + config := prepareConfiguration(true, false, false) _, err = c.Initialize(roundInfo, config.nodeConfig, &agentconfig.NetworkConfig{TrafficEncapMode: agentconfig.TrafficEncapModeEncap, IPv4Enabled: config.enableIPv4}, &agentconfig.EgressConfig{}, &agentconfig.ServiceConfig{}) require.Nil(t, err, "Failed to initialize OFClient")