From b8794666f4542db0c3ccf2af179c398f115585d3 Mon Sep 17 00:00:00 2001
From: Wenqi Qiu
Date: Fri, 26 Jan 2024 10:38:32 +0800
Subject: [PATCH] record response time

Signed-off-by: Wenqi Qiu
---
 test/performance/framework/case.go            | 40 +++++++---
 test/performance/framework/networkpolicy.go   | 73 ++++++++-----------
 .../framework/networkpolicy/scale_up.go       | 59 ++++++++-------
 test/performance/framework/pods.go            |  2 +-
 test/performance/framework/restart.go         |  7 +-
 test/performance/framework/scale_up.go        |  1 +
 test/performance/framework/service.go         |  6 +-
 test/performance/framework/table/table.go     |  8 +-
 .../performance/framework/table/table_test.go |  4 +-
 test/performance/{framework => utils}/exec.go | 30 ++++++--
 10 files changed, 130 insertions(+), 100 deletions(-)
 rename test/performance/{framework => utils}/exec.go (75%)

diff --git a/test/performance/framework/case.go b/test/performance/framework/case.go
index 892a52a7bf0..de2ed38caba 100644
--- a/test/performance/framework/case.go
+++ b/test/performance/framework/case.go
@@ -25,7 +25,7 @@ import (
 	"antrea.io/antrea/test/performance/framework/table"
 )
 
-type RunFunc func(ctx context.Context, ch chan ResponseTime, data *ScaleData) error
+type RunFunc func(ctx context.Context, ch chan time.Duration, data *ScaleData) error
 
 var cases = make(map[string]RunFunc, 128)
 
@@ -64,26 +64,45 @@ type ResponseTime struct {
 func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
 	ctx = wrapScaleTestName(ctx, c.name)
+	done := make(chan interface{}, 1)
 	startTime := time.Now()
 	caseName := ctx.Value(CtxScaleCaseName).(string)
-	ch := make(chan ResponseTime, 1)
+	testData.maxCheckNum = testData.nodesNum * 2
+	ress := make(chan time.Duration, testData.maxCheckNum)
 	res := "failed"
 	defer func() {
 		var rows [][]string
-		select {
-		case respTime := <-ch:
-			rows = append(rows, table.GenerateRow(caseName, res, time.Since(startTime),
-				respTime.avg.String(), respTime.max.String(), respTime.min.String()))
-			table.ShowResult(os.Stdout, rows)
-		case <-time.After(testData.checkTimeout):
-			klog.InfoS("wait timeout", "check time duration", testData.checkTimeout)
+		// Aggregate the response times reported by the checkers. Stop waiting
+		// once checkTimeout elapses, instead of blocking forever when fewer
+		// than maxCheckNum results are reported.
+		var total, minRes, maxRes, avg time.Duration
+		count := 0
+		deadline := time.After(testData.checkTimeout)
+	loop:
+		for i := 0; i < testData.maxCheckNum; i++ {
+			select {
+			case respTime := <-ress:
+				total += respTime
+				count++
+				if count == 1 || respTime < minRes {
+					minRes = respTime
+				}
+				if respTime > maxRes {
+					maxRes = respTime
+				}
+			case <-deadline:
+				klog.InfoS("Waiting for response times timed out", "checkTimeout", testData.checkTimeout)
+				break loop
+			}
 		}
+		if count > 0 {
+			avg = total / time.Duration(count)
+		}
+		rows = append(rows, table.GenerateRow(caseName, res, time.Since(startTime).String(),
+			avg.String(), maxRes.String(), minRes.String()))
+		table.ShowResult(os.Stdout, rows)
 	}()
 
-	done := make(chan interface{}, 1)
 	go func() {
-		done <- c.run(ctx, ch, testData)
+		done <- c.run(ctx, ress, testData)
 	}()
 
 	select {
diff --git a/test/performance/framework/networkpolicy.go b/test/performance/framework/networkpolicy.go
index 94e8f954352..56aa0bf45c3 100644
--- a/test/performance/framework/networkpolicy.go
+++ b/test/performance/framework/networkpolicy.go
@@ -19,58 +19,26 @@ import (
 	"fmt"
 	"time"
 
-	"k8s.io/klog/v2"
-
 	"antrea.io/antrea/test/performance/framework/networkpolicy"
-	"antrea.io/antrea/test/performance/utils"
 )
 
 func init() {
 	RegisterFunc("ScaleNetworkPolicy", ScaleNetworkPolicy)
 }
 
-func ScaleNetworkPolicy(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
-	nps, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
-		data.Specification.NpNumPerNs, data.clientPods, data.Specification.IPv6)
+func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
+	// The connectivity and isolation checks now run inside networkpolicy.ScaleUp,
+	// which reports each check's response time on ch.
+	_, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
+		data.Specification.NpNumPerNs, data.clientPods, data.Specification.IPv6, data.maxCheckNum, ch)
 	if err != nil {
 		return fmt.Errorf("scale up NetworkPolicies error: %v", err)
 	}
-	maxNPCheckedCount := data.nodesNum
-
-	start := time.Now()
-	for i, np := range nps {
-		if utils.CheckTimeout(start, data.checkTimeout) || i > maxNPCheckedCount {
-			klog.InfoS("NetworkPolicies check deadline exceeded", "count", i)
-			break
-		}
-
-		// Check connection of Pods in NetworkPolicies, workload Pods
-		fromPod, ip, err := networkpolicy.SelectConnectPod(ctx, data.kubernetesClientSet, np.Namespace, &nps[i])
-		if err != nil || fromPod == nil || ip == "" {
-			continue
-		}
-		if err := PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, fromPod.Namespace, fromPod.Name, ip); err != nil {
-			return fmt.Errorf("the connection should be success, NetworkPolicyName: %s, FromPod: %s, ToPod: %s",
-				np.Name, fromPod.Name, ip)
-		}
-
-		// Check isolation of Pods in NetworkPolicies, client Pods to workload Pods
-		fromPod, ip, err = networkpolicy.SelectIsoPod(ctx, data.kubernetesClientSet, np.Namespace, np, data.clientPods)
-		if err != nil || fromPod == nil || ip == "" {
-			continue
-		}
-		if err := PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, fromPod.Namespace, fromPod.Name, ip); err == nil {
-			return fmt.Errorf("the connection should not be success, NetworkPolicyName: %s, FromPod: %s, ToPodIP: %s", np.Name, fromPod.Name, ip)
-		}
-		klog.InfoS("Checked networkPolicy", "Name", np.Name, "Namespace", np.Namespace, "count", i, "maxNum", maxNPCheckedCount)
-	}
-	respTime := ResponseTime{
-		max: 1,
-		min: 5,
-		avg: 3,
-	}
-	ch <- respTime
 	if err := networkpolicy.ScaleDown(ctx, data.namespaces, data.kubernetesClientSet); err != nil {
 		return err
 	}
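
For reference, the min/max/avg aggregation performed by the deferred function in case.go above reduces to the following self-contained sketch. The names (summarize, reported) are illustrative only and not part of the patch; the pattern is a bounded receive loop with a timeout fallback:

package main

import (
	"fmt"
	"time"
)

// summarize receives up to n durations from ch, giving up once timeout
// elapses, and returns the min, max and average of what it received.
func summarize(ch <-chan time.Duration, n int, timeout time.Duration) (minD, maxD, avgD time.Duration) {
	var total time.Duration
	count := 0
	deadline := time.After(timeout)
loop:
	for i := 0; i < n; i++ {
		select {
		case d := <-ch:
			total += d
			count++
			if count == 1 || d < minD {
				minD = d
			}
			if d > maxD {
				maxD = d
			}
		case <-deadline:
			break loop // fewer results than expected; summarize what we have
		}
	}
	if count > 0 {
		avgD = total / time.Duration(count)
	}
	return
}

func main() {
	reported := make(chan time.Duration, 3)
	for _, d := range []time.Duration{120 * time.Millisecond, 80 * time.Millisecond, 200 * time.Millisecond} {
		reported <- d
	}
	minD, maxD, avgD := summarize(reported, 3, time.Second)
	fmt.Println(minD, maxD, avgD) // 80ms 200ms 133.333333ms
}
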
diff --git a/test/performance/framework/networkpolicy/scale_up.go b/test/performance/framework/networkpolicy/scale_up.go
index 52a5d63b6df..9a802396ec7 100644
--- a/test/performance/framework/networkpolicy/scale_up.go
+++ b/test/performance/framework/networkpolicy/scale_up.go
@@ -94,9 +94,10 @@ type NetworkPolicyInfo struct {
 	toIP string
 }
 
-func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool) (nps []NetworkPolicyInfo, err error) {
+func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (nps []NetworkPolicyInfo, err error) {
 	// ScaleUp networkPolicies
 	start := time.Now()
+	checkCount := 0
 	for _, ns := range nss {
 		npsData, err := generateNetworkPolicies(ns, numPerNs)
 		if err != nil {
@@ -104,6 +105,9 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
 		}
 		klog.InfoS("Scale up NetworkPolicies", "Num", len(npsData), "Namespace", ns)
 		for _, np := range npsData {
+			// Only the first maxCheckNum checks are scheduled; each verified
+			// NetworkPolicy accounts for two checks (connectivity and isolation).
+			shouldCheck := checkCount < maxCheckNum
 			if err := utils.DefaultRetry(func() error {
 				newNP, err := cs.NetworkingV1().NetworkPolicies(ns).Create(ctx, np, metav1.CreateOptions{})
 				if err != nil {
@@ -114,31 +119,33 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
 					}
 				}
 				npInfo := NetworkPolicyInfo{Name: newNP.Name, Namespace: newNP.Namespace, Spec: newNP.Spec}
-
-				// // Check connection of Pods in NetworkPolicies, workload Pods
-				// fromPod, ip, err := SelectConnectPod(ctx, cs, newNP.Namespace, &npInfo)
-				// if err != nil {
-				//	return err
-				// }
-				// npInfo.fromPodName = fromPod.Name
-				// npInfo.fromPodNamespace = fromPod.Namespace
-				// npInfo.toIP = ip
-				// go func() {
-				//	if err := framework.PingIP(ctx, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip); err != nil {
-				//		klog.ErrorS(err, "the connection should be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
-				//	}
-				// }()
-				//
-				// // Check isolation of Pods in NetworkPolicies, client Pods to workload Pods
-				// fromPod, ip, err = SelectIsoPod(ctx, cs, np.Namespace, npInfo, clientPods)
-				// if err != nil {
-				//	return err
-				// }
-				// go func() {
-				//	if err := framework.PingIP(ctx, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip); err == nil {
-				//		klog.ErrorS(err, "the connection should not be success", "NetworkPolicyName", np.Name, "FromPodName", fromPod.Name, "ToIP", ip)
-				//	}
-				// }()
+				if shouldCheck {
+					// Check connection of Pods in NetworkPolicies, workload Pods.
+					fromPod, ip, err := SelectConnectPod(ctx, cs, newNP.Namespace, &npInfo)
+					if err != nil {
+						return err
+					}
+					npInfo.fromPodName = fromPod.Name
+					npInfo.fromPodNamespace = fromPod.Namespace
+					npInfo.toIP = ip
+					// Copy the values used by the goroutine: fromPod and ip are
+					// reassigned below before the goroutine may run.
+					connectPod, connectIP := fromPod, ip
+					go func() {
+						if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, connectPod.Namespace, connectPod.Name, connectIP, false); err != nil {
+							klog.ErrorS(err, "the connection should succeed", "NetworkPolicyName", newNP.Name, "FromPodName", connectPod.Name, "ToIP", connectIP)
+						}
+					}()
+
+					// Check isolation of Pods in NetworkPolicies, client Pods to workload Pods.
+					isoPod, isoIP, err := SelectIsoPod(ctx, cs, newNP.Namespace, npInfo, clientPods)
+					if err != nil {
+						return err
+					}
+					go func() {
+						if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, isoPod.Namespace, isoPod.Name, isoIP, true); err != nil {
+							klog.ErrorS(err, "the connection should not succeed", "NetworkPolicyName", newNP.Name, "FromPodName", isoPod.Name, "ToIP", isoIP)
+						}
+					}()
+					checkCount += 2
+				}
 				nps = append(nps, npInfo)
 				return nil
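
The checks above launch goroutines from a loop whose variables are reassigned before the goroutines run, which is why the values are copied first (connectPod, isoPod). A minimal sketch of that capture-before-go pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

func main() {
	targets := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
	var wg sync.WaitGroup
	for _, ip := range targets {
		ip := ip // per-iteration copy; required before Go 1.22, harmless after
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("checking", ip) // sees its own copy, not the last loop value
		}()
	}
	wg.Wait()
}
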
diff --git a/test/performance/framework/pods.go b/test/performance/framework/pods.go
index 600d2c11457..6fbff225e75 100644
--- a/test/performance/framework/pods.go
+++ b/test/performance/framework/pods.go
@@ -98,7 +98,7 @@ func newWorkloadPod(podName, ns string, onRealNode bool, labelNum int) *corev1.P
 	return workloadPodTemplate(podName, ns, labels, onRealNode)
 }
 
-func ScaleUpWorkloadPods(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	if data.Specification.SkipDeployWorkload {
 		klog.V(2).InfoS("Skip creating workload Pods", "SkipDeployWorkload", data.Specification.SkipDeployWorkload)
 		return nil
diff --git a/test/performance/framework/restart.go b/test/performance/framework/restart.go
index 4ee2880f9da..0f4b9a70b9c 100644
--- a/test/performance/framework/restart.go
+++ b/test/performance/framework/restart.go
@@ -17,6 +17,7 @@ package framework
 //goland:noinspection ALL
 import (
 	"context"
+	"time"
 
 	appv1 "k8s.io/api/apps/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,7 +34,7 @@ func init() {
 	RegisterFunc("RestartOVSContainer", RestartOVSContainer)
 }
 
-func ScaleRestartAgent(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
 		DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-agent"})
 	if err != nil {
@@ -56,7 +57,7 @@ func ScaleRestartAgent(ctx context.Context, ch chan ResponseTime, data *ScaleDat
 	}, ctx.Done())
 }
 
-func RestartController(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
 		DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-controller"})
 	if err != nil {
@@ -75,6 +76,6 @@ func RestartController(ctx context.Context, ch chan ResponseTime, data *ScaleDat
 	}, ctx.Done())
 }
 
-func RestartOVSContainer(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func RestartOVSContainer(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	return ScaleRestartAgent(ctx, ch, data)
 }
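
The restart cases above follow a delete-then-poll shape: remove the Pods by label selector with DeleteCollection, then poll until the workload is ready again, stopping when the context is done. A sketch of that shape under assumed inputs (dsName and selector are hypothetical; the actual readiness condition in restart.go is not shown in these hunks):

package sketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

func restartAgents(ctx context.Context, cs kubernetes.Interface, ns, dsName, selector string) error {
	// Delete all Pods matching the label selector in a single call.
	if err := cs.CoreV1().Pods(ns).DeleteCollection(ctx, metav1.DeleteOptions{},
		metav1.ListOptions{LabelSelector: selector}); err != nil {
		return fmt.Errorf("error when deleting Pods: %w", err)
	}
	// Poll every 3s until the DaemonSet reports all desired Pods ready,
	// or until ctx is cancelled.
	return wait.PollImmediateUntil(3*time.Second, func() (bool, error) {
		ds, err := cs.AppsV1().DaemonSets(ns).Get(ctx, dsName, metav1.GetOptions{})
		if err != nil {
			return false, nil // tolerate transient API errors and keep polling
		}
		return ds.Status.NumberReady == ds.Status.DesiredNumberScheduled, nil
	}, ctx.Done())
}
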
diff --git a/test/performance/framework/scale_up.go b/test/performance/framework/scale_up.go
index f40dfd9a9ea..c74479608ef 100644
--- a/test/performance/framework/scale_up.go
+++ b/test/performance/framework/scale_up.go
@@ -142,6 +142,7 @@ type ScaleData struct {
 	namespaces       []string
 	Specification    *config.ScaleList
 	nodesNum         int
+	maxCheckNum      int
 	simulateNodesNum int
 	podsNumPerNs     int
 	checkTimeout     time.Duration
diff --git a/test/performance/framework/service.go b/test/performance/framework/service.go
index 0975579e95a..615a2a25433 100644
--- a/test/performance/framework/service.go
+++ b/test/performance/framework/service.go
@@ -32,7 +32,7 @@ func init() {
 	RegisterFunc("ScaleServiceDemo", ScaleServiceDemo)
 }
 
-func ScaleService(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func ScaleService(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	svcs, err := service.ScaleUp(ctx, data.kubernetesClientSet, data.namespaces, data.Specification.SvcNumPerNs, data.Specification.IPv6)
 	if err != nil {
 		return fmt.Errorf("scale up services error: %v", err)
@@ -49,7 +49,7 @@ func ScaleService(ctx context.Context, ch chan ResponseTime, data *ScaleData) er
 		k := int(utils.GenRandInt()) % len(data.clientPods)
 		clientPod := data.clientPods[k]
 		svc := svcs[i]
-		if err := PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, clientPod.Namespace, clientPod.Name, svc.IP); err != nil {
+		if err := utils.PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, clientPod.Namespace, clientPod.Name, svc.IP); err != nil {
 			klog.ErrorS(err, "Check readiness of service error", "ClientPodName", clientPod.Name, "svc", svc)
 			return err
 		}
@@ -62,7 +62,7 @@ func ScaleService(ctx context.Context, ch chan ResponseTime, data *ScaleData) er
 	return nil
 }
 
-func ScaleServiceDemo(ctx context.Context, ch chan ResponseTime, data *ScaleData) error {
+func ScaleServiceDemo(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
 	list, err := data.kubernetesClientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return err
diff --git a/test/performance/framework/table/table.go b/test/performance/framework/table/table.go
index f5a287632c7..ea47a266408 100644
--- a/test/performance/framework/table/table.go
+++ b/test/performance/framework/table/table.go
@@ -15,11 +15,10 @@ package table
 import (
 	"io"
 	"strings"
-	"time"
 
 	"github.com/olekukonko/tablewriter"
 )
 
 const (
@@ -28,9 +27,9 @@ const (
 	CtxMinResponseTime = "MinResponseTime"
 )
 
-func GenerateRow(name, result string, duration time.Duration, avgTime, maxTime, minTime string) []string {
+func GenerateRow(name, result string, duration, avgTime, maxTime, minTime string) []string {
 	name = strings.ReplaceAll(name, " ", "-")
-	return []string{name, result, duration.String(), avgTime, maxTime, minTime}
+	return []string{name, result, duration, avgTime, maxTime, minTime}
 }
 
 func ShowResult(w io.Writer, rows [][]string) {
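
GenerateRow now takes every timing column as a preformatted string, and ShowResult hands the rows to tablewriter. A runnable sketch of how such rows render; the header names here are assumptions based on GenerateRow's column order, not taken from table.go:

package main

import (
	"os"

	"github.com/olekukonko/tablewriter"
)

func main() {
	rows := [][]string{
		{"ScaleNetworkPolicy", "success", "2m10s", "1.2s", "3.4s", "400ms"},
		{"ScaleService", "failed", "45s", "", "", ""},
	}
	table := tablewriter.NewWriter(os.Stdout)
	table.SetHeader([]string{"Case", "Result", "Duration", "Avg", "Max", "Min"})
	table.AppendBulk(rows) // add all result rows at once
	table.Render()         // write the ASCII table to stdout
}
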
diff --git a/test/performance/framework/table/table_test.go b/test/performance/framework/table/table_test.go
index e1cdc11a92a..2fbddab1f99 100644
--- a/test/performance/framework/table/table_test.go
+++ b/test/performance/framework/table/table_test.go
@@ -23,7 +23,7 @@ import (
 func TestTableName(t *testing.T) {
 	startTime := time.Now()
 	var rows [][]string
-	rows = append(rows, GenerateRow("BenchmarkInitXLargeScaleWithNetpolPerPod-2 123", "success", time.Since(startTime)))
-	rows = append(rows, GenerateRow("caseName1", "fail", time.Since(startTime)))
+	rows = append(rows, GenerateRow("BenchmarkInitXLargeScaleWithNetpolPerPod-2 123", "success", time.Since(startTime).String(), "", "", ""))
+	rows = append(rows, GenerateRow("caseName1", "fail", time.Since(startTime).String(), "", "", ""))
 	ShowResult(os.Stdout, rows)
 }
diff --git a/test/performance/framework/exec.go b/test/performance/utils/exec.go
similarity index 75%
rename from test/performance/framework/exec.go
rename to test/performance/utils/exec.go
index 3da46b779a1..2b8234f404d 100644
--- a/test/performance/framework/exec.go
+++ b/test/performance/utils/exec.go
@@ -12,21 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package framework
+package utils
 
 import (
 	"bytes"
 	"context"
 	"fmt"
 	"net/url"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/remotecommand"
+)
 
-	"antrea.io/antrea/test/performance/utils"
+const (
+	defaultInterval = 1 * time.Second
+	defaultTimeout  = 3 * time.Minute
 )
 
 func ExecURL(kClient kubernetes.Interface, clientPodNamespace, clientPodName, peerIP string) *url.URL {
@@ -44,19 +49,34 @@ func ExecURL(kClient kubernetes.Interface, clientPodNamespace, clientPodName, pe
 	}, scheme.ParameterCodec).URL()
 }
 
+// WaitUntil polls PingIP until connectivity matches the expectation
+// (expectErr == true means the ping is expected to fail, e.g. for isolation
+// checks), then reports the elapsed time on ch.
+func WaitUntil(ctx context.Context, ch chan time.Duration, kubeConfig *rest.Config, kc kubernetes.Interface, podNs, podName, ip string, expectErr bool) error {
+	startTime := time.Now()
+	defer func() {
+		ch <- time.Since(startTime)
+	}()
+	return wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
+		err := PingIP(ctx, kubeConfig, kc, podNs, podName, ip)
+		if (err != nil && !expectErr) || (err == nil && expectErr) {
+			// Not the expected condition yet; returning a nil error keeps
+			// wait.Poll retrying until the timeout.
+			return false, nil
+		}
+		return true, nil
+	})
+}
+
 func PingIP(ctx context.Context, kubeConfig *rest.Config, kc kubernetes.Interface, podNs, podName, ip string) error {
-	// startTime := time.Now()
-	// defer func() {
-	//	durationTime := time.Since(startTime)
-	//	ctx = context.WithValue(ctx, "", "")
-	// }()
 	executor, err := remotecommand.NewSPDYExecutor(kubeConfig, "POST", ExecURL(kc, podNs, podName, ip))
 	if err != nil {
 		return fmt.Errorf("error when creating SPDY executor: %w", err)
 	}
 
 	// Try to execute command with failure tolerant.
-	if err = utils.DefaultRetry(func() error {
+	if err = DefaultRetry(func() error {
 		var stdout, stderr bytes.Buffer
 		if err := executor.StreamWithContext(ctx, remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr}); err != nil {
 			err := fmt.Errorf("executing commands on service client Pod error: %v", err)
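
WaitUntil's condition returns (false, nil) rather than an error when connectivity does not yet match the expectation: with k8s.io/apimachinery's wait.Poll, a non-nil error from the condition aborts polling immediately, while (false, nil) means "not yet, try again". A minimal, runnable demonstration of that semantic:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// Poll every 10ms, for up to 1s, until the condition reports done.
	err := wait.Poll(10*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil // "not yet" twice, then succeed
	})
	fmt.Println(attempts, err) // 3 <nil>
}
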