Update NetworkPolicy validation check
Signed-off-by: Wenqi Qiu <[email protected]>
wenqiq committed Mar 7, 2024
1 parent 3c85dbd commit ec533f0
Showing 8 changed files with 285 additions and 118 deletions.
75 changes: 75 additions & 0 deletions test/performance/framework/client_pod/consts.go
@@ -16,6 +16,8 @@ package client_pod

import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/google/uuid"
"k8s.io/klog/v2"
@@ -48,7 +50,80 @@ const (
ScaleAgentProbeContainerName = "antrea-scale-test-agent-probe"
ScaleControllerProbeContainerName = "antrea-scale-test-controller-probe"
ScaleClientPodTemplateName = "antrea-scale-test-client"
ScaleTestClientPodNamePrefix = "antrea-scale-test-client-pod"

ScaleTestControllerProbeDaemonSet = "antrea-scale-test-controller-probe-daemonset"
ScaleTestAgentProbeDaemonSet = "antrea-scale-test-agent-probe-daemonset"
)

var (
// RealNodeAffinity is used to prevent a Pod from being scheduled to a simulated node.
RealNodeAffinity = corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: SimulatorNodeLabelKey,
Operator: corev1.NodeSelectorOpNotIn,
Values: []string{SimulatorNodeLabelValue},
},
},
},
},
},
},
}

// SimulateAffinity is used to make a Pod be scheduled to a simulated node.
SimulateAffinity = corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: SimulatorNodeLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{SimulatorNodeLabelValue},
},
},
},
},
},
},
}

// SimulateToleration marks a Pod able to run on a simulated node.
SimulateToleration = corev1.Toleration{
Key: SimulatorTaintKey,
Operator: corev1.TolerationOpEqual,
Value: SimulatorTaintValue,
Effect: corev1.TaintEffectNoExecute,
}

// MasterToleration marks a Pod able to run on the master node.
MasterToleration = corev1.Toleration{
Key: "node-role.kubernetes.io/master",
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoSchedule,
}

// ClientPodTemplate is the PodTemplateSpec of a scale test client Pod.
ClientPodTemplate = corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ScaleClientPodTemplateName: ""}},
Spec: corev1.PodSpec{
Affinity: &RealNodeAffinity,
Tolerations: []corev1.Toleration{MasterToleration},
Containers: []corev1.Container{
{
Name: ScaleClientContainerName,
Image: "busybox",
Command: []string{"nc", "-lk", "-p", "80"},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
},
}
)
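
Taken together, the values above form two scheduling pairs: RealNodeAffinity with MasterToleration pins a Pod onto real Nodes (this is what ClientPodTemplate uses), while SimulateAffinity with SimulateToleration steers a Pod onto simulated Nodes. A minimal sketch of the simulated-node pair in use follows; the helper function, Pod name, and image are illustrative, not part of this commit.

// Sketch only: combine SimulateAffinity and SimulateToleration so a Pod is
// scheduled exclusively onto simulated nodes and tolerates their NoExecute taint.
func simulatedNodePod(name, namespace string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: corev1.PodSpec{
			Affinity:    &SimulateAffinity,
			Tolerations: []corev1.Toleration{SimulateToleration},
			Containers: []corev1.Container{{
				Name:  "probe",
				Image: "busybox",
			}},
		},
	}
}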
101 changes: 101 additions & 0 deletions test/performance/framework/client_pod/create.go
@@ -0,0 +1,101 @@
// Copyright 2024 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client_pod

import (
"context"
"fmt"
"strings"
"time"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)

func CreateClientPod(ctx context.Context, kClient kubernetes.Interface, probes []string, containerName string) (*corev1.Pod, error) {
var err error
// Initialize the Pod up front; the retry closure below mutates its fields.
newPod := &corev1.Pod{}
namespace := ClientPodsNamespace
podName := ScaleTestClientPodNamePrefix + uuid.New().String()[:6]
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
newPod.Namespace = namespace
newPod.Name = podName
newPod.Spec = corev1.PodSpec{
Affinity: &RealNodeAffinity,
Tolerations: []corev1.Toleration{MasterToleration},
Containers: []corev1.Container{
{
Name: ScaleClientContainerName,
Image: "busybox",
Command: []string{"nc", "-lk", "-p", "80"},
ImagePullPolicy: corev1.PullIfNotPresent,
},
},
}
var containers []corev1.Container
for _, probe := range probes {
l := strings.Split(probe, ":")
server, port := l[0], l[1]
if server == "" {
server = "$NODE_IP"
}

containers = append(containers, corev1.Container{
Name: containerName,
Image: "busybox",
// The probe script loops forever: every 100ms it checks TCP reachability of
// server:port with nc and appends a timestamped line to ping_log.txt whenever
// the up/down status changes.
Command: []string{"/bin/sh", "-c", fmt.Sprintf("server=%s; output_file=\"ping_log.txt\"; if [ ! -e \"$output_file\" ]; then touch \"$output_file\"; fi; last_status=\"unknown\"; last_change_time=$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf(\"%%06d\", $2) }' && printf \"\\n\"); while true; do status=$(nc -vz -w 1 \"$server\" %s > /dev/null && echo \"up\" || echo \"down\"); current_time=$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf(\"%%06d\", $2) }' && printf \"\\n\"); time_diff=$((current_time - last_change_time)); if [ \"$status\" != \"$last_status\" ]; then echo \"$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf(\"%%06d\", $2) }' && printf \"\\n\") Status changed from $last_status to $status after ${time_diff} nanoseconds\"; echo \"$(adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf(\"%%06d\", $2) }' && printf \"\\n\") Status changed from $last_status to $status after ${time_diff} nanoseconds\" >> \"$output_file\"; last_change_time=$current_time; last_status=$status; fi; sleep 0.1; done\n", server, port)},
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "NODE_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.hostIP",
},
},
},
},
})
}

newPod.Spec.Containers = append(newPod.Spec.Containers, containers...)

_, err = kClient.CoreV1().Pods(namespace).Create(ctx, newPod, metav1.CreateOptions{})
return err
})
if err != nil {
return nil, err
}

err = wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) {
pod, err := kClient.CoreV1().Pods(namespace).Get(ctx, newPod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
return pod.Status.Phase == corev1.PodRunning, nil
})
if err != nil {
return nil, err
}

klog.InfoS("Create Client Pod successfully!")
return newPod, nil
}
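
The fmt.Sprintf one-liner above is hard to read, so here is the same probe logic restructured as a Go raw-string template. This is a readability sketch, not the literal string the commit ships: it factors the repeated adjtimex/awk timestamp pipeline into a now() helper, but the behavior is otherwise the same. The first %s placeholder is the server, the second is the port, and %%06d survives Sprintf as a literal %06d for awk.

// probeScriptTemplate is a readability-oriented restructuring of the probe
// command embedded in CreateClientPod above (a sketch, not the commit's string).
const probeScriptTemplate = `
server=%s
output_file="ping_log.txt"
[ -e "$output_file" ] || touch "$output_file"
now() { adjtimex | awk '/(time.tv_sec|time.tv_usec):/ { printf("%%06d", $2) }'; }
last_status="unknown"
last_change_time=$(now)
while true; do
  # Probe TCP connectivity to the server port with a 1-second timeout.
  status=$(nc -vz -w 1 "$server" %s > /dev/null && echo "up" || echo "down")
  current_time=$(now)
  time_diff=$((current_time - last_change_time))
  if [ "$status" != "$last_status" ]; then
    msg="$(now) Status changed from $last_status to $status after ${time_diff} nanoseconds"
    echo "$msg"
    echo "$msg" >> "$output_file"
    last_change_time=$current_time
    last_status=$status
  fi
  sleep 0.1
done
`

The container command would then be built as []string{"/bin/sh", "-c", fmt.Sprintf(probeScriptTemplate, server, port)}.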
5 changes: 3 additions & 2 deletions test/performance/framework/networkpolicy.go
@@ -36,13 +36,14 @@ func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleD
res.err = fmt.Errorf("list client Pod error: %+v", err)
return
}
checkCount, scaleNum, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
checkCount, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
data.Specification.NpNumPerNs, clientPods.Items, data.Specification.IPv6, data.maxCheckNum, ch)
if err != nil {
res.err = fmt.Errorf("scale up NetworkPolicies error: %v", err)
return
}
res.scaleNum = scaleNum
res.scaleNum = len(data.namespaces) * data.Specification.NpNumPerNs
res.actualCheckNum = checkCount

defer func() {
for {
117 changes: 78 additions & 39 deletions test/performance/framework/networkpolicy/scale_up.go
@@ -94,14 +94,13 @@ type NetworkPolicyInfo struct {
Spec netv1.NetworkPolicySpec
}

func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (actualCheckNum, scale int, err error) {
func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (actualCheckNum int, err error) {
// ScaleUp networkPolicies
start := time.Now()
scale = len(nss) * numPerNs
for _, ns := range nss {
npsData, err := generateNetworkPolicies(ns, numPerNs)
if err != nil {
return 0, 0, fmt.Errorf("error when generating network policies: %w", err)
return 0, fmt.Errorf("error when generating network policies: %w", err)
}
klog.InfoS("Scale up NetworkPolicies", "Num", len(npsData), "Namespace", ns)
for _, np := range npsData {
@@ -116,45 +115,62 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
}
}
npInfo = NetworkPolicyInfo{Name: newNP.Name, Namespace: newNP.Namespace, Spec: newNP.Spec}
return nil
}); err != nil {
return 0, 0, err
}

if actualCheckNum < maxCheckNum && actualCheckNum < cap(ch) {
// Check connection of Pods in NetworkPolicies, workload Pods
fromPod, ip, err := SelectConnectPod(ctx, cs, npInfo.Namespace, &npInfo)
if err != nil || fromPod == nil || ip == "" {
continue
}
actualCheckNum++
go func() {
if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, false); err != nil {
klog.ErrorS(err, "the connection should be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
if actualCheckNum < cap(ch) {
startTimeStamp := time.Now().UnixNano()
serverIP, err := selectServerPod(ctx, cs, newNP.Namespace, npInfo)
if serverIP != "" && err == nil {
actualCheckNum++
go func() {
clientPod, err := client_pod.CreateClientPod(ctx, cs, []string{fmt.Sprintf("%s:%d", serverIP, 80)}, "check"+np.Name)
if err != nil {
klog.ErrorS(err, "Create client test Pod failed")
return
}
klog.InfoS("Created test Pod to check NetworkPolicy", "ClientPodName", clientPod.Name, "serverIP", serverIP)
key := "to down"
if err := utils.FetchTimestampFromLog(ctx, cs, clientPod.Namespace, clientPod.Name, workload_pod.ScaleTestPodProbeContainerName, ch, startTimeStamp, key); err != nil {
klog.ErrorS(err, "Checking the validity the NetworkPolicy error", "ClientPodName", clientPod.Name, "NetworkPolicy", npInfo)
}
}()
}
}()
}

if actualCheckNum < maxCheckNum && actualCheckNum < cap(ch) {
// Check isolation of Pods in NetworkPolicies, client Pods to workload Pods
fromPod, ip, err := SelectIsoPod(ctx, cs, np.Namespace, npInfo, clientPods)
if err != nil || fromPod == nil || ip == "" {
continue
}
actualCheckNum++
if fromPod.Namespace != client_pod.ClientPodsNamespace {
clientPod, err := workload_pod.CreateClientPod(ctx, cs, fromPod.Namespace, fromPod.Name, []string{fmt.Sprintf("%s:%d", ip, 80)}, workload_pod.ScaleClientPodProbeContainerName)
if err != nil {
klog.ErrorS(err, "Update test Pod failed")
}
klog.InfoS("Create test Pod to check NetworkPolicy", "Name", clientPod.Name, "Namespace", clientPod.Namespace)
}
go func() {
if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, true); err != nil {
klog.ErrorS(err, "the connection should not be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
}
}()
return nil
}); err != nil {
return 0, err
}
klog.InfoS("Create new NetworkPolicy", "npInfo", npInfo)
// if actualCheckNum < maxCheckNum && actualCheckNum < cap(ch) {
// // Check connection of Pods in NetworkPolicies, workload Pods
// fromPod, ip, err := SelectConnectPod(ctx, cs, npInfo.Namespace, &npInfo)
// if err != nil || fromPod == nil || ip == "" {
// continue
// }
// actualCheckNum++
// go func() {
// if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, false); err != nil {
// klog.ErrorS(err, "the connection should be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
// }
// }()
// }
// if actualCheckNum < maxCheckNum && actualCheckNum < cap(ch) {
// // Check isolation of Pods in NetworkPolicies, client Pods to workload Pods
// fromPod, ip, err := SelectIsoPod(ctx, cs, np.Namespace, npInfo, clientPods)
// if err != nil || fromPod == nil || ip == "" {
// continue
// }
// actualCheckNum++
// if fromPod.Namespace != client_pod.ClientPodsNamespace {
// clientPod, err := workload_pod.CreateClientPod(ctx, cs, fromPod.Namespace, fromPod.Name, []string{fmt.Sprintf("%s:%d", ip, 80)}, workload_pod.ScaleClientPodProbeContainerName)
// if err != nil {
// klog.ErrorS(err, "Update test Pod failed")
// }
// klog.InfoS("Create test Pod to check NetworkPolicy", "Name", clientPod.Name, "Namespace", clientPod.Namespace)
// }
// go func() {
// if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, true); err != nil {
// klog.ErrorS(err, "the connection should not be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
// }
// }()
// }
}
}
klog.InfoS("Scale up NetworkPolicies", "Duration", time.Since(start), "actualCheckNum", actualCheckNum)
@@ -244,3 +260,26 @@ func SelectIsoPod(ctx context.Context, cs kubernetes.Interface, ns string, np Ne
}
return
}

func selectServerPod(ctx context.Context, cs kubernetes.Interface, ns string, np NetworkPolicyInfo) (toPodIP string, err error) {
klog.V(2).InfoS("Checking isolation of the NetworkPolicy", "NetworkPolicyName", np.Name, "Namespace", np.Namespace)
podList, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(&np.Spec.PodSelector)})
if err != nil {
return "", fmt.Errorf("error when selecting networkpolicy applied to pods: %w", err)
}
if len(podList.Items) == 0 {
klog.V(2).InfoS("No Pod is selected by the NetworkPolicy, skip", "NetworkPolicyName", np.Name)
return "", nil
}
var toPod corev1.Pod
if len(np.Spec.Ingress) > 0 {
toPod = podList.Items[int(utils.GenRandInt())%len(podList.Items)]
} else {
klog.V(2).InfoS("Not Ingress NetworkPolicy, skip check")
return "", nil
}
if toPod.Status.PodIP == "" {
return "", fmt.Errorf("podIP is empty, Namespace: %s, Name: %s", toPod.Namespace, toPod.Name)
}
return toPod.Status.PodIP, nil
}
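
The new validation path leans on utils.FetchTimestampFromLog, which this diff does not include. Based only on the call site in ScaleUp above, here is a minimal sketch of what such a helper could look like. The signature mirrors the call; the body is an assumption, not the commit's implementation (for simplicity it reports wall-clock time elapsed since startTimeStamp instead of parsing the timestamp logged by the probe script).

package utils

import (
	"bufio"
	"context"
	"fmt"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// FetchTimestampFromLog (sketch, assumed implementation) follows the probe
// container's log until a line containing key (e.g. "to down") appears, then
// sends the elapsed time since startTimeStamp on ch.
func FetchTimestampFromLog(ctx context.Context, cs kubernetes.Interface, namespace, podName, containerName string,
	ch chan time.Duration, startTimeStamp int64, key string) error {
	req := cs.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{Container: containerName, Follow: true})
	stream, err := req.Stream(ctx)
	if err != nil {
		return err
	}
	defer stream.Close()
	scanner := bufio.NewScanner(stream)
	for scanner.Scan() {
		if strings.Contains(scanner.Text(), key) {
			ch <- time.Duration(time.Now().UnixNano() - startTimeStamp)
			return nil
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}
	return fmt.Errorf("key %q not found in logs of Pod %s/%s", key, namespace, podName)
}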
8 changes: 4 additions & 4 deletions test/performance/framework/pods.go
@@ -64,11 +64,11 @@ func workloadPodTemplate(podName, ns string, labels map[string]string, onRealNode
var affinity *corev1.Affinity
var tolerations []corev1.Toleration
if onRealNode {
affinity = &RealNodeAffinity
tolerations = append(tolerations, MasterToleration)
affinity = &client_pod.RealNodeAffinity
tolerations = append(tolerations, client_pod.MasterToleration)
} else {
affinity = &SimulateAffinity
tolerations = append(tolerations, SimulateToleration)
affinity = &client_pod.SimulateAffinity
tolerations = append(tolerations, client_pod.SimulateToleration)
}
labels[workloadPodLabelKey] = workloadPodLabelValue
labels["name"] = podName