Commit 741677d

Update
Signed-off-by: Wenqi Qiu <[email protected]>
wenqiq committed Jan 31, 2024
1 parent ec65080 commit 741677d
Showing 7 changed files with 106 additions and 63 deletions.
23 changes: 13 additions & 10 deletions test/performance/framework/case.go
@@ -25,7 +25,7 @@ import (
"antrea.io/antrea/test/performance/framework/table"
)

type RunFunc func(ctx context.Context, ch chan time.Duration, data *ScaleData) error
type RunFunc func(ctx context.Context, ch chan time.Duration, data *ScaleData) ScaleResult

var cases = make(map[string]RunFunc, 128)

@@ -56,27 +56,26 @@ func (c *ScaleTestCase) Name() string {
return c.name
}

type ResponseTime struct {
max time.Duration
min time.Duration
avg time.Duration
type ScaleResult struct {
err error
actualCheckNum int
}

func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
ctx = wrapScaleTestName(ctx, c.name)
done := make(chan interface{}, 1)
done := make(chan ScaleResult, 1)

startTime := time.Now()
caseName := ctx.Value(CtxScaleCaseName).(string)
testData.maxCheckNum = testData.nodesNum * 2
ress := make(chan time.Duration, testData.maxCheckNum)
res := "failed"
actualCheckNum := 0
defer func() {
var rows [][]string

var total, minRes, maxRes, avg time.Duration
count := 0
for i := 0; i < testData.maxCheckNum; i++ {
for i := 0; i < actualCheckNum; i++ {
res := <-ress
total += res
count++
@@ -90,7 +89,9 @@ func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
}
}

avg = total / time.Duration(count)
if count != 0 {
avg = total / time.Duration(count)
}

rows = append(rows, table.GenerateRow(caseName, res, time.Since(startTime).String(),
avg.String(), maxRes.String(), minRes.String()))
@@ -107,10 +108,12 @@ func (c *ScaleTestCase) Run(ctx context.Context, testData *ScaleData) error {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
case scaleRes := <-done:
err := scaleRes.err
if err != nil {
return err.(error)
}
actualCheckNum = scaleRes.actualCheckNum
res = "success"
return nil
}
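With this change, Run no longer drains testData.maxCheckNum durations from the result channel, which could block forever when a case performed fewer checks than the theoretical maximum. The case now reports, via ScaleResult, how many samples it actually pushed; the deferred aggregation reads exactly that many, and the average is only computed when at least one sample arrived. Below is a minimal, self-contained sketch of the consuming side, using simplified stand-in types rather than the framework's real API:

package main

import (
	"fmt"
	"time"
)

// ScaleResult mirrors the shape introduced in the diff: an error plus the
// number of duration samples the case actually pushed onto the channel.
type ScaleResult struct {
	err            error
	actualCheckNum int
}

// fakeCase stands in for a RunFunc: it reports two samples, fewer than the
// theoretical maximum, so the consumer must not assume maxCheckNum reads.
func fakeCase(ch chan time.Duration) ScaleResult {
	ch <- 120 * time.Millisecond
	ch <- 80 * time.Millisecond
	return ScaleResult{actualCheckNum: 2}
}

func main() {
	maxCheckNum := 10
	ress := make(chan time.Duration, maxCheckNum)
	scaleRes := fakeCase(ress)
	if scaleRes.err != nil {
		fmt.Println("case failed:", scaleRes.err)
		return
	}

	// Drain only the samples that were actually produced.
	var total, minRes, maxRes, avg time.Duration
	for i := 0; i < scaleRes.actualCheckNum; i++ {
		d := <-ress
		total += d
		if i == 0 || d < minRes {
			minRes = d
		}
		if d > maxRes {
			maxRes = d
		}
	}
	if scaleRes.actualCheckNum != 0 { // guard against division by zero, as in the diff
		avg = total / time.Duration(scaleRes.actualCheckNum)
	}
	fmt.Println("avg:", avg, "min:", minRes, "max:", maxRes)
}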
13 changes: 8 additions & 5 deletions test/performance/framework/networkpolicy.go
@@ -26,11 +26,12 @@ func init() {
RegisterFunc("ScaleNetworkPolicy", ScaleNetworkPolicy)
}

func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
_, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
checkCount, err := networkpolicy.ScaleUp(ctx, data.kubeconfig, data.kubernetesClientSet, data.namespaces,
data.Specification.NpNumPerNs, data.clientPods, data.Specification.IPv6, data.maxCheckNum, ch)
if err != nil {
return fmt.Errorf("scale up NetworkPolicies error: %v", err)
res.err = fmt.Errorf("scale up NetworkPolicies error: %v", err)
return
}

// maxNPCheckedCount := data.nodesNum
@@ -63,7 +64,9 @@ func ScaleNetworkPolicy(ctx context.Context, ch chan time.Duration, data *ScaleD
// klog.InfoS("Checked networkPolicy", "Name", np.Name, "Namespace", np.Namespace, "count", i, "maxNum", maxNPCheckedCount)
// }
if err := networkpolicy.ScaleDown(ctx, data.namespaces, data.kubernetesClientSet); err != nil {
return err
res.err = err
return
}
return nil
res.actualCheckNum = checkCount
return
}
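ScaleNetworkPolicy now uses a named ScaleResult return: failures populate res.err (wrapping the underlying error with context) and return early, while success records the number of connectivity checks ScaleUp actually performed. A small sketch of that idiom, with hypothetical scaleUp/scaleDown stand-ins in place of the networkpolicy package:

package main

import (
	"errors"
	"fmt"
)

// ScaleResult as in the diff: an error plus the actual number of checks run.
type ScaleResult struct {
	err            error
	actualCheckNum int
}

// scaleNetworkPolicyLike shows the named-return style: set res fields and
// use a bare return instead of returning an error directly.
func scaleNetworkPolicyLike(scaleUp func() (int, error), scaleDown func() error) (res ScaleResult) {
	checkCount, err := scaleUp()
	if err != nil {
		res.err = fmt.Errorf("scale up NetworkPolicies error: %v", err)
		return
	}
	if err := scaleDown(); err != nil {
		res.err = err
		return
	}
	res.actualCheckNum = checkCount
	return
}

func main() {
	ok := scaleNetworkPolicyLike(func() (int, error) { return 5, nil }, func() error { return nil })
	fmt.Println(ok.actualCheckNum, ok.err) // 5 <nil>

	bad := scaleNetworkPolicyLike(func() (int, error) { return 0, errors.New("boom") }, func() error { return nil })
	fmt.Println(bad.err) // scale up NetworkPolicies error: boom
}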
13 changes: 7 additions & 6 deletions test/performance/framework/networkpolicy/scale_up.go
@@ -17,7 +17,6 @@ package networkpolicy
import (
"context"
"fmt"
"k8s.io/client-go/rest"
"time"

"github.com/google/uuid"
@@ -27,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"antrea.io/antrea/test/performance/utils"
@@ -94,14 +94,14 @@ type NetworkPolicyInfo struct {
toIP string
}

func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (nps []NetworkPolicyInfo, err error) {
func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interface, nss []string, numPerNs int, clientPods []corev1.Pod, ipv6 bool, maxCheckNum int, ch chan time.Duration) (actualCheckNum int, err error) {
// ScaleUp networkPolicies
start := time.Now()
checkCount := 0
for _, ns := range nss {
npsData, err := generateNetworkPolicies(ns, numPerNs)
if err != nil {
return nil, fmt.Errorf("error when generating network policies: %w", err)
return 0, fmt.Errorf("error when generating network policies: %w", err)
}
klog.InfoS("Scale up NetworkPolicies", "Num", len(npsData), "Namespace", ns)
for _, np := range npsData {
@@ -122,8 +122,10 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
npInfo = NetworkPolicyInfo{Name: newNP.Name, Namespace: newNP.Namespace, Spec: newNP.Spec}
return nil
}); err != nil {
return nil, err
return 0, err
}

// sample num
if shouldCheck {
// Check connection of Pods in NetworkPolicies, workload Pods
fromPod, ip, err := SelectConnectPod(ctx, cs, npInfo.Namespace, &npInfo)
@@ -133,7 +135,6 @@ func ScaleUp(ctx context.Context, kubeConfig *rest.Config, cs kubernetes.Interfa
npInfo.fromPodName = fromPod.Name
npInfo.fromPodNamespace = fromPod.Namespace
npInfo.toIP = ip
nps = append(nps, npInfo)
checkCount++
go func() {
if err := utils.WaitUntil(ctx, ch, kubeConfig, cs, fromPod.Namespace, fromPod.Name, ip, false); err != nil {
@@ -155,7 +156,7 @@ }
}
}
}
klog.InfoS("Scale up NetworkPolicies", "Duration", time.Since(start), "count", len(nps))
klog.InfoS("Scale up NetworkPolicies", "Duration", time.Since(start), "actualCheckNum", checkCount)
return
}

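In ScaleUp, only a sampled subset of the created NetworkPolicies gets a connectivity check; each check runs in its own goroutine that eventually reports its duration on ch, and the function now returns the number of samples (checkCount) rather than the full policy list, so the caller knows how many durations to expect. A simplified sketch of that sampling pattern, with a hypothetical checkConnectivity helper in place of utils.WaitUntil:

package main

import (
	"fmt"
	"time"
)

// checkConnectivity is a hypothetical stand-in for the real connectivity
// probe (utils.WaitUntil in the framework); it only simulates some latency.
func checkConnectivity(item int) time.Duration {
	start := time.Now()
	time.Sleep(10 * time.Millisecond) // pretend to wait for the policy to take effect
	_ = item
	return time.Since(start)
}

// scaleUpLike creates total items but probes only up to maxCheckNum of them,
// asynchronously, and returns how many probes were started.
func scaleUpLike(total, maxCheckNum int, ch chan time.Duration) int {
	checkCount := 0
	for i := 0; i < total; i++ {
		// "create" item i here; only a bounded number of items get checked.
		if checkCount >= maxCheckNum {
			continue
		}
		checkCount++
		go func(item int) {
			ch <- checkConnectivity(item) // report the duration to the aggregator
		}(i)
	}
	return checkCount
}

func main() {
	maxCheckNum := 3
	ch := make(chan time.Duration, maxCheckNum)
	n := scaleUpLike(10, maxCheckNum, ch)
	for i := 0; i < n; i++ {
		fmt.Println("check", i, "took", <-ch)
	}
}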
48 changes: 29 additions & 19 deletions test/performance/framework/pods.go
@@ -17,18 +17,17 @@ package framework
import (
"context"
"fmt"
"k8s.io/klog/v2"
"time"

"antrea.io/antrea/test/performance/config"
"antrea.io/antrea/test/performance/utils"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/test/performance/config"
"antrea.io/antrea/test/performance/utils"
)

func init() {
@@ -98,14 +97,19 @@ func newWorkloadPod(podName, ns string, onRealNode bool, labelNum int) *corev1.P
return workloadPodTemplate(podName, ns, labels, onRealNode)
}

func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
if data.Specification.SkipDeployWorkload {
klog.V(2).InfoS("Skip creating workload Pods", "SkipDeployWorkload", data.Specification.SkipDeployWorkload)
return nil
return
}
// Creating workload Pods
start := time.Now()
podNum := data.Specification.PodsNumPerNs
count := 0
for _, ns := range data.namespaces {
gErr, _ := errgroup.WithContext(context.Background())
for i := 0; i < podNum; i++ {
@@ -126,12 +130,12 @@ func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *Scale
})
}
klog.V(2).InfoS("Create workload Pods", "PodNum", podNum, "Namespace", ns)
if err := gErr.Wait(); err != nil {
return err
if err = gErr.Wait(); err != nil {
return
}

// Waiting scale workload Pods to be ready
err := wait.PollUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollUntil(config.WaitInterval, func() (bool, error) {
podsResult, err := data.kubernetesClientSet.
CoreV1().Pods(ns).
List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", AppLabelKey, AppLabelValue)})
@@ -150,17 +154,23 @@ func ScaleUpWorkloadPods(ctx context.Context, ch chan time.Duration, data *Scale
return false, nil
}, ctx.Done())
if err != nil {
return err
return
}
go func() {
select {
case ch <- time.Since(start):
klog.InfoS("Successfully write in channel")
default:
klog.InfoS("Skipped writing to the channel. No receiver.")
}
}()

if count < data.maxCheckNum {
ch <- time.Since(start)
count++
}
// go func() {
// select {
// case ch <- time.Since(start):
// klog.InfoS("Successfully write in channel")
// default:
// klog.InfoS("Skipped writing to the channel. No receiver.")
// }
// }()
}
res.actualCheckNum = count
klog.InfoS("Scaled up Pods", "Duration", time.Since(start), "count", podNum*len(data.namespaces))
return nil
return res
}
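The pods.go rewrite relies on a single err variable captured by a deferred closure, so every early return still propagates the failure through res.err; that is why the inner assignments switch from := to = (a fresh := would shadow the outer err and the defer would miss it). It also replaces the non-blocking select with a bounded write, pushing at most maxCheckNum durations onto the channel. A compact sketch of the deferred-error idiom under those assumptions, with hypothetical step functions standing in for Pod creation and readiness polling:

package main

import (
	"fmt"
	"time"
)

type ScaleResult struct {
	err            error
	actualCheckNum int
}

func scaleUpPodsLike(maxCheckNum int, ch chan time.Duration, steps []func() error) (res ScaleResult) {
	var err error
	defer func() {
		// Runs on every return path, so res.err always reflects the last failure.
		res.err = err
	}()

	start := time.Now()
	count := 0
	for _, step := range steps {
		if err = step(); err != nil { // '=' not ':=': keep writing the outer err
			return
		}
		if count < maxCheckNum { // bounded write: never exceed the channel capacity
			ch <- time.Since(start)
			count++
		}
	}
	res.actualCheckNum = count
	return
}

func main() {
	ch := make(chan time.Duration, 2)
	res := scaleUpPodsLike(2, ch, []func() error{
		func() error { return nil },
		func() error { return nil },
		func() error { return nil },
	})
	fmt.Println("samples:", res.actualCheckNum, "err:", res.err) // samples: 2 err: <nil>
}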
30 changes: 20 additions & 10 deletions test/performance/framework/restart.go
@@ -34,13 +34,17 @@ func init() {
RegisterFunc("RestartOVSContainer", RestartOVSContainer)
}

func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
err = data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-agent"})
if err != nil {
return err
return
}
return wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
var ds *appv1.DaemonSet
if err := utils.DefaultRetry(func() error {
var err error
@@ -55,15 +59,20 @@ func ScaleRestartAgent(ctx context.Context, ch chan time.Duration, data *ScaleDa
"NumberAvailable", ds.Status.NumberAvailable)
return ds.Status.DesiredNumberScheduled == ds.Status.NumberAvailable, nil
}, ctx.Done())
return
}

func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
err := data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
err = data.kubernetesClientSet.CoreV1().Pods(metav1.NamespaceSystem).
DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "app=antrea,component=antrea-controller"})
if err != nil {
return err
return
}
return wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
err = wait.PollImmediateUntil(config.WaitInterval, func() (bool, error) {
var dp *appv1.Deployment
if err := utils.DefaultRetry(func() error {
var err error
@@ -72,10 +81,11 @@ func RestartController(ctx context.Context, ch chan time.Duration, data *ScaleDa
}); err != nil {
return false, err
}
return dp.Status.UnavailableReplicas == 0, nil
return dp.Status.ObservedGeneration == dp.Generation && dp.Status.ReadyReplicas == *dp.Spec.Replicas, nil
}, ctx.Done())
return
}

func RestartOVSContainer(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func RestartOVSContainer(ctx context.Context, ch chan time.Duration, data *ScaleData) ScaleResult {
return ScaleRestartAgent(ctx, ch, data)
}
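RestartController's readiness condition changes from UnavailableReplicas == 0 to requiring that the controller has observed the Deployment's current generation and that ReadyReplicas equals the desired replica count, a stricter signal that the antrea-controller is actually back up after its Pods were deleted. A sketch of that check as a standalone helper against client-go (assumed names; error handling simplified to keep polling on transient Get failures):

package example

import (
	"context"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForDeploymentRollout polls until the controller has observed the latest
// generation of the Deployment and every desired replica reports Ready, the
// same condition the diff adopts instead of UnavailableReplicas == 0.
func waitForDeploymentRollout(ctx context.Context, cs kubernetes.Interface, ns, name string, interval time.Duration) error {
	return wait.PollImmediateUntil(interval, func() (bool, error) {
		dp, err := cs.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, nil // hypothetical choice: keep polling on transient errors
		}
		return deploymentReady(dp), nil
	}, ctx.Done())
}

func deploymentReady(dp *appsv1.Deployment) bool {
	if dp.Spec.Replicas == nil {
		return false
	}
	return dp.Status.ObservedGeneration == dp.Generation &&
		dp.Status.ReadyReplicas == *dp.Spec.Replicas
}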
41 changes: 29 additions & 12 deletions test/performance/framework/service.go
@@ -20,6 +20,7 @@ import (
"fmt"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

@@ -32,47 +33,63 @@ func init() {
RegisterFunc("ScaleServiceDemo", ScaleServiceDemo)
}

func ScaleService(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func ScaleService(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
svcs, err := service.ScaleUp(ctx, data.kubernetesClientSet, data.namespaces, data.Specification.SvcNumPerNs, data.Specification.IPv6)
if err != nil {
return fmt.Errorf("scale up services error: %v", err)
err = fmt.Errorf("scale up services error: %v", err)
return
}

maxSvcCheckedCount := data.nodesNum

start := time.Now()
actualCheckNum := 0
for i := range svcs {
svcCheckStart := time.Now()
if utils.CheckTimeout(start, data.checkTimeout) || i > maxSvcCheckedCount {
klog.InfoS("Services check deadline exceeded", "count", i)
break
}
k := int(utils.GenRandInt()) % len(data.clientPods)
clientPod := data.clientPods[k]
svc := svcs[i]
if err := utils.PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, clientPod.Namespace, clientPod.Name, svc.IP); err != nil {
if err = utils.PingIP(ctx, data.kubeconfig, data.kubernetesClientSet, clientPod.Namespace, clientPod.Name, svc.IP); err != nil {
klog.ErrorS(err, "Check readiness of service error", "ClientPodName", clientPod.Name, "svc", svc)
return err
return
}
ch <- time.Since(svcCheckStart)
actualCheckNum++
klog.V(2).InfoS("Check service", "svc", svc, "Pod", clientPod.Name)
}

if err := service.ScaleDown(ctx, svcs, data.kubernetesClientSet); err != nil {
return fmt.Errorf("scale down svcs error %v", err)
if err = service.ScaleDown(ctx, svcs, data.kubernetesClientSet); err != nil {
return
}
return nil
res.actualCheckNum = actualCheckNum
return
}

func ScaleServiceDemo(ctx context.Context, ch chan time.Duration, data *ScaleData) error {
func ScaleServiceDemo(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error
defer func() {
res.err = err
}()
start := time.Now()
list, err := data.kubernetesClientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
var nss *v1.NamespaceList
nss, err = data.kubernetesClientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return err
return
}
klog.InfoS("List all test namespace", "namespacesNum", len(list.Items))
klog.InfoS("List all test namespace", "namespacesNum", len(nss.Items))
klog.V(2).InfoS("level 2 log")
klog.V(1).InfoS("level 1 log")
for i := 0; i < data.maxCheckNum; i++ {
ch <- time.Since(start)
}
return nil
res.actualCheckNum = data.maxCheckNum
return
}
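ScaleService now bounds its connectivity checks by both an overall deadline and a per-run cap, measures each check individually from svcCheckStart, and reports the number of checks actually performed so the aggregator drains the right number of samples. A standalone sketch of that loop, with a hypothetical probe function in place of utils.PingIP:

package example

import (
	"time"
)

// checkServices illustrates the bounded checking loop used in ScaleService:
// stop when either the overall deadline is exceeded or the per-run cap is
// reached, report each check's own duration, and return how many checks ran.
// probe is a hypothetical stand-in for utils.PingIP.
func checkServices(svcIPs []string, maxChecked int, timeout time.Duration,
	probe func(ip string) error, ch chan time.Duration) (int, error) {
	start := time.Now()
	actualCheckNum := 0
	for i, ip := range svcIPs {
		if time.Since(start) > timeout || i > maxChecked {
			break // deadline or cap reached; remaining Services are skipped
		}
		checkStart := time.Now()
		if err := probe(ip); err != nil {
			return actualCheckNum, err
		}
		ch <- time.Since(checkStart)
		actualCheckNum++
	}
	return actualCheckNum, nil
}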