Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions applicationset/controllers/applicationset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package controllers
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/v2/applicationset/controllers/sharding"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -58,13 +59,14 @@ const (
// ApplicationSetReconciler reconciles a ApplicationSet object
type ApplicationSetReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Generators map[string]generators.Generator
ArgoDB db.ArgoDB
ArgoAppClientset appclientset.Interface
KubeClientset kubernetes.Interface
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Generators map[string]generators.Generator
ArgoDB db.ArgoDB
ArgoAppClientset appclientset.Interface
KubeClientset kubernetes.Interface
ApplicationSetFilter sharding.ApplicationSetFilter
utils.Policy
utils.Renderer
}
Expand All @@ -91,6 +93,13 @@ func (r *ApplicationSetReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, nil
}

if r.ApplicationSetFilter != nil && !r.ApplicationSetFilter(&applicationSetInfo) {
return ctrl.Result{}, nil
}

log.Debugf("Starting to process applicationset: %s", req.String())
defer log.Debugf("Completing to process applicationset: %s", req.String())

// Log a warning if there are unrecognized generators
utils.CheckInvalidGenerators(&applicationSetInfo)
// desiredApplications is the main list of all expected Applications from all generators in this appset.
Expand Down
84 changes: 84 additions & 0 deletions applicationset/controllers/sharding/sharding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package sharding

import (
"fmt"
"github.com/argoproj/argo-cd/v2/common"
argoprojiov1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/applicationset/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/env"
log "github.com/sirupsen/logrus"
"hash/fnv"
"math"
"strconv"
"strings"
)

// ApplicationSetFilter the function used by the controller to filter ApplicationSets that belongs to its shard
type ApplicationSetFilter func(appset *argoprojiov1alpha1.ApplicationSet) bool

var noFilter ApplicationSetFilter = func(appset *argoprojiov1alpha1.ApplicationSet) bool {
return true
}

// InferShardFromHostname tries to detect the shard which controller instance manages by its hostname
// For instance, applicationset-controller-0 manages the shard 0
// For instance, applicationset-controller-1 manages the shard 1
func InferShardFromHostname(hostnameDetector func() (string, error)) (int, error) {
hostname, err := hostnameDetector()
if err != nil {
return 0, err
}
parts := strings.Split(hostname, "-")
if len(parts) == 1 {
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
}
shard, err := strconv.Atoi(parts[len(parts)-1])
if err != nil {
return 0, fmt.Errorf("hostname should ends with shard number separated by '-' but got: %s", hostname)
}
return shard, nil
}

// InferShard initially tries to detect the shard which controller instance manages by environment variable
// If not specified, it fallbacks to InferShardFromHostname
func InferShard(hostnameDetector func() (string, error)) (int, error) {
shard := env.ParseNumFromEnv(common.EnvApplicationSetControllerShard, -1, -math.MaxInt32, math.MaxInt32)
if shard < 0 {
return InferShardFromHostname(hostnameDetector)
}
return shard, nil
}

func GenerateApplicationSetFilterForStatefulSet(hostnameDetector func() (string, error)) (ApplicationSetFilter, error) {
replicas := env.ParseNumFromEnv(common.EnvApplicationSetControllerReplicas, 0, 0, math.MaxInt32)
if replicas <= 1 {
return noFilter, nil
}

shard, err := InferShard(hostnameDetector)
if err != nil {
return nil, err
}
if shard >= replicas {
return nil, fmt.Errorf("illegal status detected while generating applicastionset filter we have %d replicas but controller assigned to %d shard", replicas, shard)
}
log.Debugf("Generating applicationset filter with replicas: %d, shard:%d", replicas, shard)

return func(appset *argoprojiov1alpha1.ApplicationSet) bool {
shardOfAppset := 0
if appset != nil {
shardOfAppset = getShardByID(string(appset.UID), replicas)
}
return shardOfAppset == shard
}, nil
}

// getShardByID calculates the shard as `id % replicas count`
func getShardByID(id string, replicas int) int {
if id == "" {
return 0
} else {
h := fnv.New32a()
_, _ = h.Write([]byte(id))
return int(h.Sum32() % uint32(replicas))

Check failure

Code scanning / CodeQL

Incorrect conversion between integer types

Incorrect conversion of an integer with architecture-dependent bit size from [strconv.Atoi](1) to a lower bit size type uint32 without an upper bound check.
}
}
188 changes: 188 additions & 0 deletions applicationset/controllers/sharding/sharding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package sharding

import (
"errors"
"github.com/argoproj/argo-cd/v2/common"
argoprojiov1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/applicationset/v1alpha1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"reflect"
"testing"
)

func TestInferShardFromHostname(t *testing.T) {
type args struct {
hostnameDetector func() (string, error)
}
tests := []struct {
name string
args args
expectedShard int
expectingErr bool
}{
{
name: "Should return error when detector returns an error",
args: args{hostnameDetector: func() (string, error) {
return "", errors.New("fake-error")
}},
expectedShard: 0,
expectingErr: true,
},
{
name: "should return err when hostname does contain -",
args: args{hostnameDetector: func() (string, error) {
return "fakehostname", nil
}},
expectedShard: 0,
expectingErr: true,
},
{
name: "Should return error when hostname does not end with -<number>",
args: args{hostnameDetector: func() (string, error) {
return "fake-hostname", nil
}},
expectedShard: 0,
expectingErr: true,
},
{
name: "Should return shard number successfully",
args: args{hostnameDetector: func() (string, error) {
return "fake-hostname-12", nil
}},
expectedShard: 12,
expectingErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := InferShardFromHostname(tt.args.hostnameDetector)
assert.Equal(t, tt.expectingErr, err != nil)
assert.Equalf(t, tt.expectedShard, got, "InferShardFromHostname(%v)", tt.args.hostnameDetector)
})
}
}

func TestInferShard(t *testing.T) {
type args struct {
hostnameDetector func() (string, error)
}
tests := []struct {
name string
envVars map[string]string
args args
expectedShard int
expectingErr bool
}{
{
name: "should detect shard number from env successfully",
envVars: map[string]string{
common.EnvApplicationSetControllerShard: "6",
},
args: args{hostnameDetector: func() (string, error) {
return "fake-hostname-12", nil
}},
expectedShard: 6,
expectingErr: false,
},
{
name: "should fallback to hostname based detection when the given shard number is less than zero",
envVars: map[string]string{
common.EnvApplicationSetControllerShard: "-6",
},
args: args{hostnameDetector: func() (string, error) {
return "fake-hostname-12", nil
}},
expectedShard: 12,
expectingErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for k, v := range tt.envVars {
t.Setenv(k, v)
}
got, err := InferShard(tt.args.hostnameDetector)
assert.Equal(t, tt.expectingErr, err != nil)
assert.Equalf(t, tt.expectedShard, got, "InferShard(%v)", tt.args.hostnameDetector)
})
}
}

func TestGenerateApplicationSetFilterForStatefulSetShouldReturnNoFilterIfNoReplicaSpecified(t *testing.T) {
// Given
mockHostnameDetector := func() (string, error) {
return "fake-hostname-12", nil
}

// When
filter, err := GenerateApplicationSetFilterForStatefulSet(mockHostnameDetector)

// Then
assert.NoError(t, err)
assert.True(t, reflect.ValueOf(noFilter).Pointer() == reflect.ValueOf(filter).Pointer())
}

func TestGenerateApplicationSetFilterForStatefulSetShouldReturnErrorWhenCouldNotInferShard(t *testing.T) {
// Given
t.Setenv(common.EnvApplicationSetControllerReplicas, "10")
mockHostnameDetector := func() (string, error) {
return "invalidhostname", nil
}

// When
filter, err := GenerateApplicationSetFilterForStatefulSet(mockHostnameDetector)

// Then
assert.Error(t, err)
assert.Nil(t, filter)
}

func TestGenerateApplicationSetFilterForStatefulSetShouldReturnErrorWhenInferredShardGreaterThanReplica(t *testing.T) {
// Given
t.Setenv(common.EnvApplicationSetControllerReplicas, "10")
t.Setenv(common.EnvApplicationSetControllerShard, "11")
mockHostnameDetector := func() (string, error) {
return "invalidhostname", nil
}

// When
filter, err := GenerateApplicationSetFilterForStatefulSet(mockHostnameDetector)

// Then
assert.Error(t, err)
assert.Nil(t, filter)
}

func TestGenerateApplicationSetFilterForStatefulSetShouldReturnFilterSuccessfully(t *testing.T) {
// Given
t.Setenv(common.EnvApplicationSetControllerReplicas, "10")
mockHostnameDetector := func() (string, error) {
return "hostname-8", nil
}

// When
filter, err := GenerateApplicationSetFilterForStatefulSet(mockHostnameDetector)

// Then
assert.NoError(t, err)
assert.NotNil(t, filter)

firstAppset := &argoprojiov1alpha1.ApplicationSet{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("5"),
},
}
assert.True(t, filter(firstAppset))

secondAppset := &argoprojiov1alpha1.ApplicationSet{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("8"),
},
}
assert.False(t, filter(secondAppset))
}
func TestGetShardByID(t *testing.T) {
assert.Equal(t, 0, getShardByID("", 10))
assert.Equal(t, 8, getShardByID("5", 10))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"context"
"fmt"
"github.com/argoproj/argo-cd/v2/applicationset/controllers/sharding"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -164,17 +165,24 @@ func NewCommand() *cobra.Command {
"Merge": generators.NewMergeGenerator(nestedGenerators),
}

applicationSetFilter, err := sharding.GenerateApplicationSetFilterForStatefulSet(os.Hostname)
if err != nil {
log.Error(err, "could not generate applicationset filter for sharding")
os.Exit(1)
}

if err = (&controllers.ApplicationSetReconciler{
Generators: topLevelGenerators,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ApplicationSet"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("applicationset-controller"),
Renderer: &utils.Render{},
Policy: policyObj,
ArgoAppClientset: appSetConfig,
KubeClientset: k8sClient,
ArgoDB: argoCDDB,
Generators: topLevelGenerators,
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ApplicationSet"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("applicationset-controller"),
Renderer: &utils.Render{},
Policy: policyObj,
ArgoAppClientset: appSetConfig,
KubeClientset: k8sClient,
ArgoDB: argoCDDB,
ApplicationSetFilter: applicationSetFilter,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller", "controller", "ApplicationSet")
os.Exit(1)
Expand Down
4 changes: 4 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,8 @@ func GetCMPWorkDir() string {
const (
// AnnotationApplicationRefresh is an annotation that is added when an ApplicationSet is requested to be refreshed by a webhook. The ApplicationSet controller will remove this annotation at the end of reconcilation.
AnnotationApplicationSetRefresh = "argocd.argoproj.io/application-set-refresh"
// EnvApplicationSetControllerReplicas is the number of controller replicas
EnvApplicationSetControllerReplicas = "APPLICATIONSET_CONTROLLER_REPLICAS"
// EnvApplicationSetControllerShard is the shard number that should be handled by controller
EnvApplicationSetControllerShard = "APPLICATIONSET_CONTROLLER_SHARD"
)