Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion cmd/argocd/commands/admin/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ func loadClusters(kubeClient *kubernetes.Clientset, appClient *versioned.Clients
cluster := batch[i]
clusterShard := 0
if replicas > 0 {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
if cluster.Shard == nil {
clusterShard = sharding.GetShardByID(cluster.ID, replicas)
} else {
clusterShard = int(*cluster.Shard)
}
}

if shard != -1 && clusterShard != shard {
Expand Down Expand Up @@ -194,6 +198,7 @@ func NewClusterShardsCommand() *cobra.Command {
command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
command.AddCommand(newBalanceClusterShardsCommand())
return &command
}

Expand All @@ -216,6 +221,120 @@ func printStatsSummary(clusters []ClusterWithInfo) {
_ = w.Flush()
}

// newBalanceClusterShardsCommand builds the "balance" sub-command, which
// (re)assigns clusters to application controller shards.
//
// The command loads all clusters together with their cached info, then either
// clears the shard of every cluster (--reset) or computes a new assignment with
// balanceClustersFirstFit based on each cluster's resource count. Unless
// --dry-run is disabled, nothing is written back; in both modes the resulting
// distribution is printed via printStatsSummary.
func newBalanceClusterShardsCommand() *cobra.Command {
	var (
		replicas         int
		clientConfig     clientcmd.ClientConfig
		cacheSrc         func() (*appstatecache.Cache, error)
		portForwardRedis bool
		dryRun           bool
		reset            bool
		deviationPercent int
	)
	var command = cobra.Command{
		Use:   "balance",
		Short: "Balance clusters across application controller shards based on the number of resources in each cluster.",
		Run: func(cmd *cobra.Command, args []string) {
			log.SetLevel(log.WarnLevel)

			clientCfg, err := clientConfig.ClientConfig()
			errors.CheckError(err)
			namespace, _, err := clientConfig.Namespace()
			errors.CheckError(err)
			kubeClient := kubernetes.NewForConfigOrDie(clientCfg)
			appClient := versioned.NewForConfigOrDie(clientCfg)

			// --replicas was not given: fall back to the value reported by
			// getControllerReplicas.
			if replicas == 0 {
				replicas, err = getControllerReplicas(kubeClient, namespace)
				errors.CheckError(err)
			}
			// Still no replicas: there is nothing to balance across.
			if replicas == 0 {
				return
			}

			// shard == -1 asks loadClusters not to filter by shard.
			clusters, err := loadClusters(kubeClient, appClient, replicas, namespace, portForwardRedis, cacheSrc, -1)
			errors.CheckError(err)

			if len(clusters) == 0 {
				return
			}

			if reset {
				// -1 marks "no explicit shard"; it is translated to a nil
				// Cluster.Shard when the change is applied below.
				for i := range clusters {
					clusters[i].Shard = -1
				}
			} else {
				clusters = balanceClustersFirstFit(clusters, replicas, deviationPercent)
			}

			if dryRun {
				_, _ = fmt.Fprintf(os.Stdout, "Dry run, not applying changes\n")
			} else {
				ctx := context.Background()
				argoDB := db.NewDB(namespace, settings.NewSettingsManager(ctx, kubeClient, namespace), kubeClient)

				for _, c := range clusters {
					cluster := &c.Cluster

					// Normalise the stored shard to the same "-1 means unset"
					// convention used by c.Shard so the two can be compared.
					oldShard := int64(-1)
					if cluster.Shard != nil {
						oldShard = *cluster.Shard
					}

					// Only touch clusters whose assignment actually changes.
					if int64(c.Shard) == oldShard {
						continue
					}

					if c.Shard > -1 {
						s := int64(c.Shard)
						cluster.Shard = &s
					} else {
						cluster.Shard = nil
					}
					_, err = argoDB.UpdateCluster(ctx, cluster)
					errors.CheckError(err)
					_, _ = fmt.Fprintf(os.Stdout, "Updated cluster %s with shard %d\n", cluster.Server, c.Shard)
				}
			}
			printStatsSummary(clusters)
		},
	}

	clientConfig = cli.AddKubectlFlagsToCmd(&command)
	command.Flags().BoolVar(&dryRun, "dry-run", true, "Dry run.")
	command.Flags().BoolVar(&reset, "reset", false, "Remove shard from all clusters.")
	command.Flags().IntVar(&deviationPercent, "deviation", 20, "Allowed deviation (%) from the perfect distribution during balancing.")
	command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified")
	command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?")
	cacheSrc = appstatecache.AddCacheFlagsToCmd(&command)
	return &command
}

// balanceClustersFirstFit assigns a shard number to every cluster so that the
// cached resource counts are spread across the given number of replicas.
//
// The slice is sorted in place by resource count, largest first. Clusters are
// then placed into the current shard until adding the next one would push the
// shard past its capacity, at which point the next shard is opened. Capacity is
// the even share (total resources / replicas) enlarged by deviationPercent
// percent. A shard always accepts at least one cluster, and once the last shard
// (replicas-1) is reached every remaining cluster goes into it.
//
// The same (re-ordered and updated) slice is returned.
func balanceClustersFirstFit(clusters []ClusterWithInfo, replicas int, deviationPercent int) []ClusterWithInfo {
	// Biggest clusters first.
	sort.Slice(clusters, func(a, b int) bool {
		return clusters[b].Info.CacheInfo.ResourcesCount < clusters[a].Info.CacheInfo.ResourcesCount
	})

	var total int64
	for idx := range clusters {
		total += clusters[idx].Info.CacheInfo.ResourcesCount
	}

	tolerance := 1.0 + float64(deviationPercent)/100.0
	shardCapacity := int64(float64(total) / float64(replicas) * tolerance)

	lastShard := replicas - 1
	currentShard := 0
	var load int64
	for idx := range clusters {
		size := clusters[idx].Info.CacheInfo.ResourcesCount
		combined := load + size

		// Open a new shard only when this cluster would overflow a non-empty
		// shard and there is still a shard left to move to.
		if combined > shardCapacity && load != 0 && currentShard < lastShard {
			currentShard++
			load = size
		} else {
			load = combined
		}
		clusters[idx].Shard = currentShard
	}

	return clusters
}

func runClusterNamespacesCommand(clientConfig clientcmd.ClientConfig, action func(appClient *versioned.Clientset, argoDB db.ArgoDB, clusters map[string][]string) error) error {
clientCfg, err := clientConfig.ClientConfig()
if err != nil {
Expand Down