diff --git a/hack/gen-resources/generators/cluster_generator.go b/hack/gen-resources/generators/cluster_generator.go index 520e25c1e7656..eec1cdfe3cc2e 100644 --- a/hack/gen-resources/generators/cluster_generator.go +++ b/hack/gen-resources/generators/cluster_generator.go @@ -137,7 +137,7 @@ func (cg *ClusterGenerator) getClusterCredentials(namespace string, releaseSuffi return caData, cert, key, nil } -//TODO: also should provision service for vcluster pod +// TODO: also should provision service for vcluster pod func (cg *ClusterGenerator) installVCluster(opts *util.GenerateOpts, namespace string, releaseName string) error { cmd, err := helm.NewCmd("/tmp", "v3", "") if err != nil { @@ -175,71 +175,85 @@ func (cg *ClusterGenerator) retrieveClusterUri(namespace, releaseSuffix string) return "", nil } -func (cg *ClusterGenerator) Generate(opts *util.GenerateOpts) error { - for i := 1; i <= opts.ClusterOpts.Samples; i++ { - log.Printf("Generate cluster #%v of #%v", i, opts.ClusterOpts.Samples) +func (cg *ClusterGenerator) generate(i int, opts *util.GenerateOpts) error { + log.Printf("Generate cluster #%v of #%v", i, opts.ClusterOpts.Samples) - namespace := opts.ClusterOpts.NamespacePrefix + "-" + util.GetRandomString() + namespace := opts.ClusterOpts.NamespacePrefix + "-" + util.GetRandomString() - log.Printf("Namespace is %s", namespace) + log.Printf("Namespace is %s", namespace) - releaseSuffix := util.GetRandomString() + releaseSuffix := util.GetRandomString() - log.Printf("Release suffix is %s", namespace) + log.Printf("Release suffix is %s", releaseSuffix) - err := cg.installVCluster(opts, namespace, POD_PREFIX+"-"+releaseSuffix) - if err != nil { - log.Printf("Skip cluster installation due error %v", err.Error()) - continue - } + err := cg.installVCluster(opts, namespace, POD_PREFIX+"-"+releaseSuffix) + if err != nil { + log.Printf("Skip cluster installation due to error %v", err.Error()) + } - log.Print("Get cluster credentials") - caData, cert, key, err := 
cg.getClusterCredentials(namespace, releaseSuffix) + log.Print("Get cluster credentials") + caData, cert, key, err := cg.getClusterCredentials(namespace, releaseSuffix) - for o := 0; o < 5; o++ { - if err == nil { - break - } - log.Printf("Failed to get cluster credentials %s, retrying...", releaseSuffix) - time.Sleep(10 * time.Second) - caData, cert, key, err = cg.getClusterCredentials(namespace, releaseSuffix) + for o := 0; o < 5; o++ { + if err == nil { + break } - if err != nil { - return err - } - + log.Printf("Failed to get cluster credentials %s, retrying...", releaseSuffix) + time.Sleep(10 * time.Second) + caData, cert, key, err = cg.getClusterCredentials(namespace, releaseSuffix) + } + if err != nil { + return err + } - log.Print("Get cluster server uri") + log.Print("Get cluster server uri") - uri, err := cg.retrieveClusterUri(namespace, releaseSuffix) - if err != nil { - return err - } + uri, err := cg.retrieveClusterUri(namespace, releaseSuffix) + if err != nil { + return err + } - log.Printf("Cluster server uri is %s", uri) - - log.Print("Create cluster") - _, err = cg.db.CreateCluster(context.TODO(), &argoappv1.Cluster{ - Server: uri, - Name: opts.ClusterOpts.ClusterNamePrefix + "-" + util.GetRandomString(), - Config: argoappv1.ClusterConfig{ - TLSClientConfig: argoappv1.TLSClientConfig{ - Insecure: false, - ServerName: "kubernetes.default.svc", - CAData: caData, - CertData: cert, - KeyData: key, - }, + log.Printf("Cluster server uri is %s", uri) + + log.Print("Create cluster") + _, err = cg.db.CreateCluster(context.TODO(), &argoappv1.Cluster{ + Server: uri, + Name: opts.ClusterOpts.ClusterNamePrefix + "-" + util.GetRandomString(), + Config: argoappv1.ClusterConfig{ + TLSClientConfig: argoappv1.TLSClientConfig{ + Insecure: false, + ServerName: "kubernetes.default.svc", + CAData: caData, + CertData: cert, + KeyData: key, }, - ConnectionState: argoappv1.ConnectionState{}, - ServerVersion: "1.18", - Namespaces: 
[]string{opts.ClusterOpts.DestinationNamespace}, - Labels: labels, - }) - if err != nil { - return err - } + }, + ConnectionState: argoappv1.ConnectionState{}, + ServerVersion: "1.18", + Namespaces: []string{opts.ClusterOpts.DestinationNamespace}, + Labels: labels, + }) + if err != nil { + return err + } + return nil +} + +func (cg *ClusterGenerator) Generate(opts *util.GenerateOpts) error { + log.Printf("Execute in parallel with %v", opts.ClusterOpts.Concurrency) + + wg := util.New(opts.ClusterOpts.Concurrency) + for l := 1; l <= opts.ClusterOpts.Samples; l++ { + wg.Add() + go func(i int) { + defer wg.Done() + err := cg.generate(i, opts) + if err != nil { + log.Printf("Failed to generate cluster #%v due to: %s", i, err.Error()) + } + }(l) } + wg.Wait() return nil } diff --git a/hack/gen-resources/util/gen_options_parser.go b/hack/gen-resources/util/gen_options_parser.go index 4a39703c3d03d..b352305f55437 100644 --- a/hack/gen-resources/util/gen_options_parser.go +++ b/hack/gen-resources/util/gen_options_parser.go @@ -2,7 +2,6 @@ package util import ( "os" - "gopkg.in/yaml.v2" ) @@ -34,6 +33,7 @@ type ClusterOpts struct { ValuesFilePath string `yaml:"valuesFilePath"` DestinationNamespace string `yaml:"destinationNamespace"` ClusterNamePrefix string `yaml:"clusterNamePrefix"` + Concurrency int `yaml:"parallel"` } type GenerateOpts struct { @@ -45,6 +45,12 @@ type GenerateOpts struct { Namespace string `yaml:"namespace"` } +func setDefaults(opts *GenerateOpts) { + if opts.ClusterOpts.Concurrency == 0 { + opts.ClusterOpts.Concurrency = 2 + } +} + func Parse(opts *GenerateOpts, file string) error { fp, err := os.ReadFile(file) if err != nil { @@ -55,5 +61,7 @@ func Parse(opts *GenerateOpts, file string) error { return e } + setDefaults(opts) + return nil } diff --git a/hack/gen-resources/util/sizedwaitgroup.go b/hack/gen-resources/util/sizedwaitgroup.go new file mode 100644 index 0000000000000..b43011ff802c9 --- /dev/null +++ b/hack/gen-resources/util/sizedwaitgroup.go 
@@ -0,0 +1,107 @@ +// The MIT License (MIT) + +// Copyright (c) 2018 Rémy Mathieu + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// https://github.com/remeh/sizedwaitgroup + +// Based upon sync.WaitGroup, SizedWaitGroup allows to start multiple +// routines and to wait for their end using the simple API. + +// SizedWaitGroup adds the feature of limiting the maximum number of +// concurrently started routines. It could for example be used to start +// multiples routines querying a database but without sending too much +// queries in order to not overload the given database. +// +// Rémy Mathieu © 2016 +package util + +import ( + "context" + "math" + "sync" +) + +// SizedWaitGroup has the same role and close to the +// same API as the Golang sync.WaitGroup but adds a limit of +// the amount of goroutines started concurrently. 
+type SizedWaitGroup struct { + Size int + + current chan struct{} + wg sync.WaitGroup +} + +// New creates a SizedWaitGroup. +// The limit parameter is the maximum amount of +// goroutines which can be started concurrently. +func New(limit int) SizedWaitGroup { + size := math.MaxInt32 // 2^31 - 1 + if limit > 0 { + size = limit + } + return SizedWaitGroup{ + Size: size, + + current: make(chan struct{}, size), + wg: sync.WaitGroup{}, + } +} + +// Add increments the internal WaitGroup counter. +// It can be blocking if the limit of spawned goroutines +// has been reached. It will stop blocking when Done is +// been called. +// +// See sync.WaitGroup documentation for more information. +func (s *SizedWaitGroup) Add() { + _ = s.AddWithContext(context.Background()) +} + +// AddWithContext increments the internal WaitGroup counter. +// It can be blocking if the limit of spawned goroutines +// has been reached. It will stop blocking when Done is +// been called, or when the context is canceled. Returns nil on +// success or an error if the context is canceled before the lock +// is acquired. +// +// See sync.WaitGroup documentation for more information. +func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s.current <- struct{}{}: + break + } + s.wg.Add(1) + return nil +} + +// Done decrements the SizedWaitGroup counter. +// See sync.WaitGroup documentation for more information. +func (s *SizedWaitGroup) Done() { + <-s.current + s.wg.Done() +} + +// Wait blocks until the SizedWaitGroup counter is zero. +// See sync.WaitGroup documentation for more information. +func (s *SizedWaitGroup) Wait() { + s.wg.Wait() +}