Commit

Add templateFilesPath

Signed-off-by: Wenqi Qiu <[email protected]>

wenqiq committed Mar 21, 2024
1 parent 692aee9 commit 8044258

Showing 7 changed files with 218 additions and 256 deletions.
14 changes: 8 additions & 6 deletions cmd/antrea-scale/main.go
@@ -28,10 +28,11 @@ import (
)

type options struct {
kubeConfigPath string
configPath string
timeout int
loglevel string
kubeConfigPath string
configPath string
templateFilesPath string
timeout int
loglevel string
}

var (
@@ -40,7 +41,8 @@ var (

func init() {
flag.StringVar(&option.kubeConfigPath, "kubeConfigPath", "", "Cluster config path")
flag.StringVar(&option.configPath, "config", "", "Config file of scale test cases list")
flag.StringVar(&option.configPath, "config", "test/performance/scale.yml", "Config file of scale test cases list")
flag.StringVar(&option.templateFilesPath, "templateFilesPath", "test/performance/assets", "Template YAML files path of test cases")
flag.IntVar(&option.timeout, "timeout", 10, "Timeout limit (minutes) of the whole scale test")
flag.StringVar(&option.loglevel, "v", "2", "")
flag.Parse()
@@ -66,7 +68,7 @@ func run() error {
globalCtx, globalCancelFunc := context.WithTimeout(context.Background(), time.Duration(option.timeout)*time.Minute)
defer globalCancelFunc()

testData, err := framework.ScaleUp(globalCtx, option.kubeConfigPath, option.configPath)
testData, err := framework.ScaleUp(globalCtx, option.kubeConfigPath, option.configPath, option.templateFilesPath)
if err != nil {
return fmt.Errorf("error when creating TestData: %w", err)
}
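With both flags now defaulting to the in-repo test/performance locations, explicit paths are only needed when running against custom assets. A hypothetical invocation from the repository root (the binary location follows the cmd/antrea-scale directory in this diff; adjust paths to your environment):

    go run ./cmd/antrea-scale -kubeConfigPath ~/.kube/config -config test/performance/scale.yml -templateFilesPath test/performance/assets -timeout 30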
@@ -9,4 +9,4 @@ spec:
targetPort: 80
selector:
app-1: scale-1
type: ClusterIP
type: ClusterIP
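This hunk only adds a trailing newline, and only the tail of the template is visible; judging by the path.Join call in service.go below, the file is presumably test/performance/assets/service/service.yaml. A minimal complete template consistent with the visible fields might look like the following (apiVersion, metadata, and the port value are assumptions; port 80 matches what the client probe Pod targets, and renderServices overrides the name and selector at runtime anyway):

    apiVersion: v1
    kind: Service
    metadata:
      name: antrea-scale-svc    # replaced per Service by renderServices
    spec:
      ports:
        - protocol: TCP
          port: 80
          targetPort: 80
      selector:
        app-1: scale-1          # replaced per Service by renderServices
      type: ClusterIP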
5 changes: 4 additions & 1 deletion test/performance/framework/scale_up.go
@@ -50,6 +50,7 @@ type ScaleData struct {
checkTimeout time.Duration
controlPlaneNodes []string
provider providers.ProviderInterface
templateFilesPath string
}

func createTestPodClients(ctx context.Context, kClient kubernetes.Interface, ns string) error {
@@ -120,7 +121,7 @@ func validScaleSpecification(c *config.ScaleConfiguration) error {
return nil
}

func ScaleUp(ctx context.Context, kubeConfigPath, scaleConfigPath string) (*ScaleData, error) {
func ScaleUp(ctx context.Context, kubeConfigPath, scaleConfigPath, templateFilesPath string) (*ScaleData, error) {
var td ScaleData
scaleConfig, err := config.ParseConfigs(scaleConfigPath)
if err != nil {
@@ -129,6 +130,8 @@ func ScaleUp(ctx context.Context, kubeConfigPath, scaleConfigPath string) (*ScaleData, error) {
klog.InfoS("Scale config", "scaleConfig", scaleConfig)
td.Specification = scaleConfig

td.templateFilesPath = templateFilesPath

if err := validScaleSpecification(&scaleConfig.ScaleConfiguration); err != nil {
return nil, err
}
208 changes: 204 additions & 4 deletions test/performance/framework/service.go
@@ -18,11 +18,26 @@ package framework
import (
"context"
"fmt"
"net"
"path"
"regexp"
"strings"
"time"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/test/performance/framework/service"
"antrea.io/antrea/pkg/ipam/ipallocator"
"antrea.io/antrea/test/e2e/providers"
"antrea.io/antrea/test/performance/config"
utils2 "antrea.io/antrea/test/performance/framework/utils"
"antrea.io/antrea/test/performance/framework/workload_pod"
"antrea.io/antrea/test/performance/utils"
)

func init() {
@@ -32,8 +47,8 @@ func init() {
func ScaleService(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
var err error

var svcs []service.ServiceInfo
svcs, err = service.ScaleUp(ctx, data.provider, data.controlPlaneNodes[0], data.kubernetesClientSet, data.namespaces, data.Specification.SvcNumPerNs, data.Specification.IPv6, ch)
var svcs []ServiceInfo
svcs, err = scaleUp(ctx, data, ch)
if err != nil {
res.err = fmt.Errorf("scale up services error: %v", err)
return
@@ -49,10 +64,195 @@ func ScaleService(ctx context.Context, ch chan time.Duration, data *ScaleData) (res ScaleResult) {
klog.InfoS("Waiting for the check goroutine to finish")
time.Sleep(time.Second)
}
if err = service.ScaleDown(ctx, svcs, data.kubernetesClientSet); err != nil {
if err = scaleDown(ctx, data, svcs); err != nil {
klog.ErrorS(err, "Scale down Services failed")
}
}()

return
}

func renderServices(service *corev1.Service, num int) (svcs []*corev1.Service) {
for i := 0; i < num; i++ {
labelNum := i/2 + 1
// Deep copy the template: reusing the same pointer on every iteration
// would mutate one shared Service object num times over.
svc := service.DeepCopy()
svc.Name = fmt.Sprintf("antrea-scale-svc-%d-%s", i, uuid.New().String())
svc.Spec.Selector = map[string]string{
fmt.Sprintf("%s%d", utils.SelectorLabelKeySuffix, labelNum): fmt.Sprintf("%s%d", utils.SelectorLabelValueSuffix, labelNum),
}
svcs = append(svcs, svc)
}
return
}

type ServiceInfo struct {
Name string
IP string
NameSpace string
}

func retrieveCIDRs(provider providers.ProviderInterface, controlPlaneNodeName string, cmd string, reg string) ([]string, error) {
res := make([]string, 2)
rc, stdout, _, err := provider.RunCommandOnNode(controlPlaneNodeName, cmd)
if err != nil || rc != 0 {
return res, fmt.Errorf("error when running the following command `%s` on control-plane Node: %v, %s", cmd, err, stdout)
}
re := regexp.MustCompile(reg)
matches := re.FindStringSubmatch(stdout)
if len(matches) == 0 {
return res, fmt.Errorf("cannot retrieve CIDR, unexpected kubectl output: %s", stdout)
}
cidrs := strings.Split(matches[1], ",")
if len(cidrs) == 1 {
_, cidr, err := net.ParseCIDR(cidrs[0])
if err != nil {
return res, fmt.Errorf("CIDR cannot be parsed: %s", cidrs[0])
}
if cidr.IP.To4() != nil {
res[0] = cidrs[0]
} else {
res[1] = cidrs[0]
}
} else if len(cidrs) == 2 {
_, cidr, err := net.ParseCIDR(cidrs[0])
if err != nil {
return res, fmt.Errorf("CIDR cannot be parsed: %s", cidrs[0])
}
if cidr.IP.To4() != nil {
res[0] = cidrs[0]
res[1] = cidrs[1]
} else {
res[0] = cidrs[1]
res[1] = cidrs[0]
}
} else {
return res, fmt.Errorf("unexpected cluster CIDR: %s", matches[1])
}
return res, nil
}
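To make the parsing concrete, here is a small self-contained sketch of what retrieveCIDRs does with typical kubectl output (the sample log line is an assumption modeled on a kube-apiserver manifest; real command output varies by cluster):

    package main

    import (
        "fmt"
        "regexp"
        "strings"
    )

    func main() {
        // A line as it might appear in the output of
        // `kubectl cluster-info dump | grep service-cluster-ip-range`.
        stdout := `"--service-cluster-ip-range=10.96.0.0/12,fd00:10:96::/112",`
        re := regexp.MustCompile(`service-cluster-ip-range=([^"]+)`)
        matches := re.FindStringSubmatch(stdout)
        if len(matches) == 0 {
            fmt.Println("no CIDR found")
            return
        }
        // Dual-stack clusters list both CIDRs separated by a comma;
        // retrieveCIDRs then orders them as [IPv4, IPv6].
        for _, cidr := range strings.Split(matches[1], ",") {
            fmt.Println(cidr) // 10.96.0.0/12, then fd00:10:96::/112
        }
    }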

func scaleUp(ctx context.Context, data *ScaleData, ch chan time.Duration) (svcs []ServiceInfo, err error) {
provider := data.provider
controlPlaneNodeName := data.controlPlaneNodes[0]
cs := data.kubernetesClientSet
nss := data.namespaces
numPerNs := data.Specification.SvcNumPerNs
ipv6 := data.Specification.IPv6
start := time.Now()

var svcCIDRs []string
klog.InfoS("retrieving service CIDRs", "controlPlaneNodeName", controlPlaneNodeName)
svcCIDRs, err = retrieveCIDRs(provider, controlPlaneNodeName, "kubectl cluster-info dump | grep service-cluster-ip-range", `service-cluster-ip-range=([^"]+)`)
if err != nil {
// Retrieve service CIDRs for Rancher clusters.
svcCIDRs, err = retrieveCIDRs(provider, controlPlaneNodeName, "ps aux | grep kube-controller | grep service-cluster-ip-range", `service-cluster-ip-range=([^\s]+)`)
if err != nil {
klog.ErrorS(err, "retrieveCIDRs")
return
}
}

klog.InfoS("retrieveCIDRs", "svcCIDRs", svcCIDRs)
svcCIDRIPv4 := svcCIDRs[0]
_, ipNet, _ := net.ParseCIDR(svcCIDRIPv4)
// Reserve the default kubernetes and kube-dns ClusterIPs so the allocator
// never hands them out to a test Service.
allocator, err := ipallocator.NewCIDRAllocator(ipNet, []net.IP{net.ParseIP("10.96.0.1"), net.ParseIP("10.96.0.10")})
if err != nil {
err = fmt.Errorf("error creating IP allocator for Service CIDR %s: %w", svcCIDRIPv4, err)
return
}

var obj runtime.Object
obj, err = utils2.ReadYamlFile(path.Join(data.templateFilesPath, "service/service.yaml"))
if err != nil {
err = fmt.Errorf("error reading Service template: %+v", err)
return
}

service, ok := obj.(*corev1.Service)
if !ok {
err = fmt.Errorf("error converting to Unstructured: %+v", "the template file is nil")
return
}

for _, ns := range nss {
klog.InfoS("Scale up Services", "Namespace", ns)
testPodIndex := 0
var podList *corev1.PodList
podList, err = cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
if err != nil {
return
}
for _, svc := range renderServices(service, numPerNs) {
if ipv6 {
ipFamily := corev1.IPv6Protocol
svc.Spec.IPFamilies = []corev1.IPFamily{ipFamily}
}
if err := utils.DefaultRetry(func() error {
var clusterIP net.IP
clusterIP, err = allocator.AllocateNext()
if err != nil {
return fmt.Errorf("allocate IP from ServiceCIDR error: %+v", err)
}

var newSvc *corev1.Service
var err error
var clientPod *corev1.Pod
svc.Spec.ClusterIP = clusterIP.String()
klog.InfoS("go FetchTimestampFromLog", "cap(ch)", cap(ch))
fromPod := &podList.Items[testPodIndex%len(podList.Items)]
testPodIndex++
clientPod, err = workload_pod.CreateClientPod(ctx, cs, fromPod.Namespace, fromPod.Name, []string{fmt.Sprintf("%s:%d", clusterIP, 80)}, workload_pod.ScaleTestPodProbeContainerName)
if err != nil || clientPod == nil {
klog.ErrorS(err, "Create client test Pod failed, can not verify the Service, will exist")
return err
}
startTime0 := time.Now().UnixNano()
newSvc, err = cs.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
if errors.IsAlreadyExists(err) {
newSvc, _ = cs.CoreV1().Services(ns).Get(ctx, svc.Name, metav1.GetOptions{})
} else {
return err
}
}

if newSvc.Spec.ClusterIP == "" {
return fmt.Errorf("service %s Spec.ClusterIP is empty", svc.Name)
}
klog.InfoS("Create Service", "Name", newSvc.Name, "ClusterIP", newSvc.Spec.ClusterIP, "Namespace", ns)
svcs = append(svcs, ServiceInfo{Name: newSvc.Name, IP: newSvc.Spec.ClusterIP, NameSpace: newSvc.Namespace})
go func() {
startTimeStamp := time.Now().UnixNano()
klog.InfoS("Service creating operate time", "Duration(ms)", (startTimeStamp-startTime0)/1000000)
key := "down to up"
if err := utils.FetchTimestampFromLog(ctx, cs, clientPod.Namespace, clientPod.Name, workload_pod.ScaleTestPodProbeContainerName, ch, startTimeStamp, key); err != nil {
klog.ErrorS(err, "Check readiness of service error", "ClientPodName", clientPod.Name, "svc", svc)
}
klog.InfoS("Update test Pod to check Service", "ClusterIP", clusterIP)
}()
return nil
}); err != nil {
return nil, err
}
time.Sleep(time.Duration(utils.GenRandInt()%2000) * time.Millisecond)
}
}
klog.InfoS("Scale up Services", "Duration", time.Since(start), "count", len(svcs))
return
}

func scaleDown(ctx context.Context, data *ScaleData, svcs []ServiceInfo) error {
cs := data.kubernetesClientSet
for _, svc := range svcs {
if err := cs.CoreV1().Services(svc.NameSpace).Delete(ctx, svc.Name, metav1.DeleteOptions{}); err != nil {
return err
}
klog.V(2).InfoS("Deleted Service", "serviceName", svc.Name, "namespace", svc.NameSpace)
}
// Poll until every Service is confirmed gone; check existence with Get
// rather than re-issuing Delete.
return wait.PollImmediateUntil(config.WaitInterval, func() (done bool, err error) {
count := 0
for _, svc := range svcs {
if _, err := cs.CoreV1().Services(svc.NameSpace).Get(ctx, svc.Name, metav1.GetOptions{}); errors.IsNotFound(err) {
count++
}
}
klog.InfoS("Scale down Services", "Services", len(svcs), "cleanedUpCount", count)
return count == len(svcs), nil
}, ctx.Done())
}
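A note on the measurement flow in scaleUp above: the client probe Pod is created before the Service itself, targeting the pre-allocated ClusterIP, so the probe's log records the moment connectivity transitions from down to up; the goroutine then calls utils.FetchTimestampFromLog with the "down to up" key to extract that timestamp and report the latency on ch.

utils2.ReadYamlFile is referenced above but not shown in this diff. A minimal sketch of what such a helper could look like, assuming it decodes a single-document manifest via the client-go scheme (names and behavior here are assumptions, not the actual antrea implementation):

    package utils

    import (
        "os"

        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/kubernetes/scheme"
    )

    // ReadYamlFile decodes a one-document YAML manifest into the typed object
    // registered for its GroupVersionKind (e.g. *corev1.Service).
    func ReadYamlFile(path string) (runtime.Object, error) {
        data, err := os.ReadFile(path)
        if err != nil {
            return nil, err
        }
        obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(data, nil, nil)
        return obj, err
    }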
46 changes: 0 additions & 46 deletions test/performance/framework/service/scale_down.go

This file was deleted.

