From ab1ab9eb1c566036951ae0b40831b822dfc90751 Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Wed, 5 Jul 2023 13:06:53 +0530 Subject: [PATCH 1/7] [YUNIKORN-1851] Don't allow similar app ids by different users --- pkg/appmgmt/general/podevent_handler.go | 14 +- pkg/appmgmt/general/podevent_handler_test.go | 47 ++++- pkg/conf/schedulerconf.go | 173 ++++++++++--------- pkg/conf/schedulerconf_test.go | 3 + 4 files changed, 152 insertions(+), 85 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index 87dc81f5d..3f7c02616 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -21,11 +21,12 @@ package general import ( "sync" - "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces" - "github.com/apache/yunikorn-k8shim/pkg/log" - "go.uber.org/zap" v1 "k8s.io/api/core/v1" + + "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces" + "github.com/apache/yunikorn-k8shim/pkg/conf" + "github.com/apache/yunikorn-k8shim/pkg/log" ) type PodEventHandler struct { @@ -127,6 +128,13 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface Metadata: appMeta, }) } else { + if conf.GetSchedulerConf().GetAllowSimilarAppIdsByDifferentUsers() && app.GetApplicationState() == "Running" && app.GetUser() != appMeta.User { + log.Log(log.ShimAppMgmtGeneral).Warn("application has been submitted by different user", + zap.String("app id ", appMeta.ApplicationID), + zap.String("app user", app.GetUser()), + zap.String("submitted by", appMeta.User)) + return nil + } managedApp = app appExists = true } diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index d56ff46c6..bccc3ae41 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/yunikorn-k8shim/pkg/cache" "github.com/apache/yunikorn-k8shim/pkg/common/constants" + "github.com/apache/yunikorn-k8shim/pkg/conf" ) const appID = "app00001" @@ -99,7 +100,48 @@ func TestRecoveryDone(t *testing.T) { assert.Equal(t, false, podEventHandler.recoveryRunning) } +func TestAllowSimilarAppIdsByDifferentUsers(t *testing.T) { + amProtocol := cache.NewMockedAMProtocol() + podEventHandler := NewPodEventHandler(amProtocol, false) + + // create new app appID + pod1 := newPodByUser("pod1", "test") + app1 := podEventHandler.HandleEvent(AddPod, Informers, pod1) + assert.Equal(t, len(podEventHandler.asyncEvents), 0) + assert.Assert(t, app1 != nil) + app1.SetState(cache.ApplicationStates().Running) + + // create same app appID and ensure app obj is getting created because allowSimilarAppIdsByDifferentUsers is false by default + pod2 := newPodByUser("pod1", "test") + app2 := podEventHandler.HandleEvent(AddPod, Informers, pod2) + assert.Assert(t, app2 != nil) + app2.SetState(cache.ApplicationStates().Accepted) + + // set allowSimilarAppIdsByDifferentUsers to true + err := conf.UpdateConfigMaps([]*v1.ConfigMap{ + {Data: map[string]string{conf.CMSvcAllowSimilarAppIdsByDifferentUsers: "true"}}, + }, true) + assert.NilError(t, err, "UpdateConfigMap failed") + + // create same app appID and ensure app obj is getting created because "Accepted" state + pod2 = newPodByUser("pod1", "test") + app3 := podEventHandler.HandleEvent(AddPod, Informers, pod2) + assert.Assert(t, app3 != nil) + + // set app state to "Running" + app3.SetState(cache.ApplicationStates().Running) + + // create same app appID and ensure app obj is not getting created because user is different from earlier submission + pod2 = newPodByUser("pod1", "test1") + app4 := podEventHandler.HandleEvent(AddPod, Informers, pod2) + assert.Assert(t, app4 == nil) +} + func newPod(name string) *v1.Pod { + return newPodByUser(name, "nobody") +} + +func newPodByUser(name string, user string) *v1.Pod { return &v1.Pod{ TypeMeta: apis.TypeMeta{ Kind: "Pod", @@ -110,8 +152,9 @@ func newPod(name string) *v1.Pod { Namespace: "default", UID: types.UID(name), Labels: map[string]string{ - "queue": "root.a", - "applicationId": appID, + "queue": "root.a", + "applicationId": appID, + constants.DefaultUserLabel: user, }, }, Spec: v1.PodSpec{ diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index 7b92cefdd..9550c0599 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -53,35 +53,37 @@ const ( PrefixKubernetes = "kubernetes." // service - CMSvcClusterID = PrefixService + "clusterId" - CMSvcPolicyGroup = PrefixService + "policyGroup" - CMSvcSchedulingInterval = PrefixService + "schedulingInterval" - CMSvcVolumeBindTimeout = PrefixService + "volumeBindTimeout" - CMSvcEventChannelCapacity = PrefixService + "eventChannelCapacity" - CMSvcDispatchTimeout = PrefixService + "dispatchTimeout" - CMSvcOperatorPlugins = PrefixService + "operatorPlugins" - CMSvcDisableGangScheduling = PrefixService + "disableGangScheduling" - CMSvcEnableConfigHotRefresh = PrefixService + "enableConfigHotRefresh" - CMSvcPlaceholderImage = PrefixService + "placeholderImage" - CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey" + CMSvcClusterID = PrefixService + "clusterId" + CMSvcPolicyGroup = PrefixService + "policyGroup" + CMSvcSchedulingInterval = PrefixService + "schedulingInterval" + CMSvcVolumeBindTimeout = PrefixService + "volumeBindTimeout" + CMSvcEventChannelCapacity = PrefixService + "eventChannelCapacity" + CMSvcDispatchTimeout = PrefixService + "dispatchTimeout" + CMSvcOperatorPlugins = PrefixService + "operatorPlugins" + CMSvcDisableGangScheduling = PrefixService + "disableGangScheduling" + CMSvcEnableConfigHotRefresh = PrefixService + "enableConfigHotRefresh" + CMSvcPlaceholderImage = PrefixService + "placeholderImage" + CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey" + CMSvcAllowSimilarAppIdsByDifferentUsers = PrefixService + "allowSimilarAppIdsByDifferentUsers" // kubernetes CMKubeQPS = PrefixKubernetes + "qps" CMKubeBurst = PrefixKubernetes + "burst" // defaults - DefaultNamespace = "default" - DefaultClusterID = "mycluster" - DefaultPolicyGroup = "queues" - DefaultSchedulingInterval = time.Second - DefaultVolumeBindTimeout = 10 * time.Second - DefaultEventChannelCapacity = 1024 * 1024 - DefaultDispatchTimeout = 300 * time.Second - DefaultOperatorPlugins = "general" - DefaultDisableGangScheduling = false - DefaultEnableConfigHotRefresh = true - DefaultKubeQPS = 1000 - DefaultKubeBurst = 1000 + DefaultNamespace = "default" + DefaultClusterID = "mycluster" + DefaultPolicyGroup = "queues" + DefaultSchedulingInterval = time.Second + DefaultVolumeBindTimeout = 10 * time.Second + DefaultEventChannelCapacity = 1024 * 1024 + DefaultDispatchTimeout = 300 * time.Second + DefaultOperatorPlugins = "general" + DefaultDisableGangScheduling = false + DefaultEnableConfigHotRefresh = true + DefaultKubeQPS = 1000 + DefaultKubeBurst = 1000 + DefaultAllowSimilarAppIdsByDifferentUsers = false ) var ( @@ -101,25 +103,26 @@ var confHolder atomic.Value var kubeLoggerOnce sync.Once type SchedulerConf struct { - SchedulerName string `json:"schedulerName"` - ClusterID string `json:"clusterId"` - ClusterVersion string `json:"clusterVersion"` - PolicyGroup string `json:"policyGroup"` - Interval time.Duration `json:"schedulingIntervalSecond"` - KubeConfig string `json:"absoluteKubeConfigFilePath"` - VolumeBindTimeout time.Duration `json:"volumeBindTimeout"` - TestMode bool `json:"testMode"` - EventChannelCapacity int `json:"eventChannelCapacity"` - DispatchTimeout time.Duration `json:"dispatchTimeout"` - KubeQPS int `json:"kubeQPS"` - KubeBurst int `json:"kubeBurst"` - OperatorPlugins string `json:"operatorPlugins"` - EnableConfigHotRefresh bool `json:"enableConfigHotRefresh"` - DisableGangScheduling bool `json:"disableGangScheduling"` - UserLabelKey string `json:"userLabelKey"` - PlaceHolderImage string `json:"placeHolderImage"` - InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"` - Namespace string `json:"namespace"` + SchedulerName string `json:"schedulerName"` + ClusterID string `json:"clusterId"` + ClusterVersion string `json:"clusterVersion"` + PolicyGroup string `json:"policyGroup"` + Interval time.Duration `json:"schedulingIntervalSecond"` + KubeConfig string `json:"absoluteKubeConfigFilePath"` + VolumeBindTimeout time.Duration `json:"volumeBindTimeout"` + TestMode bool `json:"testMode"` + EventChannelCapacity int `json:"eventChannelCapacity"` + DispatchTimeout time.Duration `json:"dispatchTimeout"` + KubeQPS int `json:"kubeQPS"` + KubeBurst int `json:"kubeBurst"` + OperatorPlugins string `json:"operatorPlugins"` + EnableConfigHotRefresh bool `json:"enableConfigHotRefresh"` + DisableGangScheduling bool `json:"disableGangScheduling"` + UserLabelKey string `json:"userLabelKey"` + PlaceHolderImage string `json:"placeHolderImage"` + InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"` + Namespace string `json:"namespace"` + AllowSimilarAppIdsByDifferentUsers bool `json:"allowSimilarAppIdsByDifferentUsers"` sync.RWMutex } @@ -128,25 +131,26 @@ func (conf *SchedulerConf) Clone() *SchedulerConf { defer conf.RUnlock() return &SchedulerConf{ - SchedulerName: conf.SchedulerName, - ClusterID: conf.ClusterID, - ClusterVersion: conf.ClusterVersion, - PolicyGroup: conf.PolicyGroup, - Interval: conf.Interval, - KubeConfig: conf.KubeConfig, - VolumeBindTimeout: conf.VolumeBindTimeout, - TestMode: conf.TestMode, - EventChannelCapacity: conf.EventChannelCapacity, - DispatchTimeout: conf.DispatchTimeout, - KubeQPS: conf.KubeQPS, - KubeBurst: conf.KubeBurst, - OperatorPlugins: conf.OperatorPlugins, - EnableConfigHotRefresh: conf.EnableConfigHotRefresh, - DisableGangScheduling: conf.DisableGangScheduling, - UserLabelKey: conf.UserLabelKey, - PlaceHolderImage: conf.PlaceHolderImage, - InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey, - Namespace: conf.Namespace, + SchedulerName: conf.SchedulerName, + ClusterID: conf.ClusterID, + ClusterVersion: conf.ClusterVersion, + PolicyGroup: conf.PolicyGroup, + Interval: conf.Interval, + KubeConfig: conf.KubeConfig, + VolumeBindTimeout: conf.VolumeBindTimeout, + TestMode: conf.TestMode, + EventChannelCapacity: conf.EventChannelCapacity, + DispatchTimeout: conf.DispatchTimeout, + KubeQPS: conf.KubeQPS, + KubeBurst: conf.KubeBurst, + OperatorPlugins: conf.OperatorPlugins, + EnableConfigHotRefresh: conf.EnableConfigHotRefresh, + DisableGangScheduling: conf.DisableGangScheduling, + UserLabelKey: conf.UserLabelKey, + PlaceHolderImage: conf.PlaceHolderImage, + InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey, + Namespace: conf.Namespace, + AllowSimilarAppIdsByDifferentUsers: conf.AllowSimilarAppIdsByDifferentUsers, } } @@ -204,6 +208,7 @@ func handleNonReloadableConfig(old *SchedulerConf, new *SchedulerConf) { checkNonReloadableBool(CMSvcDisableGangScheduling, &old.DisableGangScheduling, &new.DisableGangScheduling) checkNonReloadableString(CMSvcPlaceholderImage, &old.PlaceHolderImage, &new.PlaceHolderImage) checkNonReloadableString(CMSvcNodeInstanceTypeNodeLabelKey, &old.InstanceTypeNodeLabelKey, &new.InstanceTypeNodeLabelKey) + checkNonReloadableBool(CMSvcAllowSimilarAppIdsByDifferentUsers, &old.AllowSimilarAppIdsByDifferentUsers, &new.AllowSimilarAppIdsByDifferentUsers) } const warningNonReloadable = "ignoring non-reloadable configuration change (restart required to update)" @@ -295,6 +300,12 @@ func GetSchedulerNamespace() string { return DefaultNamespace } +func (conf *SchedulerConf) GetAllowSimilarAppIdsByDifferentUsers() bool { + conf.RLock() + defer conf.RUnlock() + return conf.AllowSimilarAppIdsByDifferentUsers +} + func createConfigs() { confHolder.Store(CreateDefaultConfig()) } @@ -314,25 +325,26 @@ func GetDefaultKubeConfigPath() string { // CreateDefaultConfig creates and returns a configuration representing all default values func CreateDefaultConfig() *SchedulerConf { return &SchedulerConf{ - SchedulerName: constants.SchedulerName, - Namespace: GetSchedulerNamespace(), - ClusterID: DefaultClusterID, - ClusterVersion: buildVersion, - PolicyGroup: DefaultPolicyGroup, - Interval: DefaultSchedulingInterval, - KubeConfig: GetDefaultKubeConfigPath(), - VolumeBindTimeout: DefaultVolumeBindTimeout, - TestMode: false, - EventChannelCapacity: DefaultEventChannelCapacity, - DispatchTimeout: DefaultDispatchTimeout, - KubeQPS: DefaultKubeQPS, - KubeBurst: DefaultKubeBurst, - OperatorPlugins: DefaultOperatorPlugins, - EnableConfigHotRefresh: DefaultEnableConfigHotRefresh, - DisableGangScheduling: DefaultDisableGangScheduling, - UserLabelKey: constants.DefaultUserLabel, - PlaceHolderImage: constants.PlaceholderContainerImage, - InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey, + SchedulerName: constants.SchedulerName, + Namespace: GetSchedulerNamespace(), + ClusterID: DefaultClusterID, + ClusterVersion: buildVersion, + PolicyGroup: DefaultPolicyGroup, + Interval: DefaultSchedulingInterval, + KubeConfig: GetDefaultKubeConfigPath(), + VolumeBindTimeout: DefaultVolumeBindTimeout, + TestMode: false, + EventChannelCapacity: DefaultEventChannelCapacity, + DispatchTimeout: DefaultDispatchTimeout, + KubeQPS: DefaultKubeQPS, + KubeBurst: DefaultKubeBurst, + OperatorPlugins: DefaultOperatorPlugins, + EnableConfigHotRefresh: DefaultEnableConfigHotRefresh, + DisableGangScheduling: DefaultDisableGangScheduling, + UserLabelKey: constants.DefaultUserLabel, + PlaceHolderImage: constants.PlaceholderContainerImage, + InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey, + AllowSimilarAppIdsByDifferentUsers: DefaultAllowSimilarAppIdsByDifferentUsers, } } @@ -358,6 +370,7 @@ func parseConfig(config map[string]string, prev *SchedulerConf) (*SchedulerConf, parser.boolVar(&conf.EnableConfigHotRefresh, CMSvcEnableConfigHotRefresh) parser.stringVar(&conf.PlaceHolderImage, CMSvcPlaceholderImage) parser.stringVar(&conf.InstanceTypeNodeLabelKey, CMSvcNodeInstanceTypeNodeLabelKey) + parser.boolVar(&conf.AllowSimilarAppIdsByDifferentUsers, CMSvcAllowSimilarAppIdsByDifferentUsers) // kubernetes parser.intVar(&conf.KubeQPS, CMKubeQPS) diff --git a/pkg/conf/schedulerconf_test.go b/pkg/conf/schedulerconf_test.go index 096e3796d..565924c77 100644 --- a/pkg/conf/schedulerconf_test.go +++ b/pkg/conf/schedulerconf_test.go @@ -70,6 +70,7 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) { assert.Equal(t, conf.KubeQPS, DefaultKubeQPS) assert.Equal(t, conf.KubeBurst, DefaultKubeBurst) assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel) + assert.Equal(t, conf.AllowSimilarAppIdsByDifferentUsers, DefaultAllowSimilarAppIdsByDifferentUsers) } func TestParseConfigMap(t *testing.T) { @@ -91,6 +92,7 @@ func TestParseConfigMap(t *testing.T) { {CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type"}, {CMKubeQPS, "KubeQPS", 2345}, {CMKubeBurst, "KubeBurst", 3456}, + {CMSvcAllowSimilarAppIdsByDifferentUsers, "AllowSimilarAppIdsByDifferentUsers", true}, } for _, tc := range testCases { @@ -123,6 +125,7 @@ func TestUpdateConfigMapNonReloadable(t *testing.T) { {CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type", false}, {CMKubeQPS, "KubeQPS", 2345, false}, {CMKubeBurst, "KubeBurst", 3456, false}, + {CMSvcAllowSimilarAppIdsByDifferentUsers, "AllowSimilarAppIdsByDifferentUsers", true, false}, } for _, tc := range testCases { From 32fb533a3931aa6b7b16d4188120259a2a12de81 Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Mon, 10 Jul 2023 17:18:03 +0530 Subject: [PATCH 2/7] Addressed review comments --- pkg/appmgmt/general/podevent_handler.go | 2 +- pkg/appmgmt/general/podevent_handler_test.go | 2 +- pkg/conf/schedulerconf.go | 178 +++++++++---------- pkg/conf/schedulerconf_test.go | 6 +- 4 files changed, 94 insertions(+), 94 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index 3f7c02616..52e11635d 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -128,7 +128,7 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface Metadata: appMeta, }) } else { - if conf.GetSchedulerConf().GetAllowSimilarAppIdsByDifferentUsers() && app.GetApplicationState() == "Running" && app.GetUser() != appMeta.User { + if conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User { log.Log(log.ShimAppMgmtGeneral).Warn("application has been submitted by different user", zap.String("app id ", appMeta.ApplicationID), zap.String("app user", app.GetUser()), diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index bccc3ae41..6a306f633 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -119,7 +119,7 @@ func TestAllowSimilarAppIdsByDifferentUsers(t *testing.T) { // set allowSimilarAppIdsByDifferentUsers to true err := conf.UpdateConfigMaps([]*v1.ConfigMap{ - {Data: map[string]string{conf.CMSvcAllowSimilarAppIdsByDifferentUsers: "true"}}, + {Data: map[string]string{conf.CMSvcSingleUserPerApplication: "true"}}, }, true) assert.NilError(t, err, "UpdateConfigMap failed") diff --git a/pkg/conf/schedulerconf.go b/pkg/conf/schedulerconf.go index 9550c0599..f19009f7a 100644 --- a/pkg/conf/schedulerconf.go +++ b/pkg/conf/schedulerconf.go @@ -53,37 +53,37 @@ const ( PrefixKubernetes = "kubernetes." // service - CMSvcClusterID = PrefixService + "clusterId" - CMSvcPolicyGroup = PrefixService + "policyGroup" - CMSvcSchedulingInterval = PrefixService + "schedulingInterval" - CMSvcVolumeBindTimeout = PrefixService + "volumeBindTimeout" - CMSvcEventChannelCapacity = PrefixService + "eventChannelCapacity" - CMSvcDispatchTimeout = PrefixService + "dispatchTimeout" - CMSvcOperatorPlugins = PrefixService + "operatorPlugins" - CMSvcDisableGangScheduling = PrefixService + "disableGangScheduling" - CMSvcEnableConfigHotRefresh = PrefixService + "enableConfigHotRefresh" - CMSvcPlaceholderImage = PrefixService + "placeholderImage" - CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey" - CMSvcAllowSimilarAppIdsByDifferentUsers = PrefixService + "allowSimilarAppIdsByDifferentUsers" + CMSvcClusterID = PrefixService + "clusterId" + CMSvcPolicyGroup = PrefixService + "policyGroup" + CMSvcSchedulingInterval = PrefixService + "schedulingInterval" + CMSvcVolumeBindTimeout = PrefixService + "volumeBindTimeout" + CMSvcEventChannelCapacity = PrefixService + "eventChannelCapacity" + CMSvcDispatchTimeout = PrefixService + "dispatchTimeout" + CMSvcOperatorPlugins = PrefixService + "operatorPlugins" + CMSvcDisableGangScheduling = PrefixService + "disableGangScheduling" + CMSvcEnableConfigHotRefresh = PrefixService + "enableConfigHotRefresh" + CMSvcPlaceholderImage = PrefixService + "placeholderImage" + CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey" + CMSvcSingleUserPerApplication = PrefixService + "singleUserPerApplication" // kubernetes CMKubeQPS = PrefixKubernetes + "qps" CMKubeBurst = PrefixKubernetes + "burst" // defaults - DefaultNamespace = "default" - DefaultClusterID = "mycluster" - DefaultPolicyGroup = "queues" - DefaultSchedulingInterval = time.Second - DefaultVolumeBindTimeout = 10 * time.Second - DefaultEventChannelCapacity = 1024 * 1024 - DefaultDispatchTimeout = 300 * time.Second - DefaultOperatorPlugins = "general" - DefaultDisableGangScheduling = false - DefaultEnableConfigHotRefresh = true - DefaultKubeQPS = 1000 - DefaultKubeBurst = 1000 - DefaultAllowSimilarAppIdsByDifferentUsers = false + DefaultNamespace = "default" + DefaultClusterID = "mycluster" + DefaultPolicyGroup = "queues" + DefaultSchedulingInterval = time.Second + DefaultVolumeBindTimeout = 10 * time.Second + DefaultEventChannelCapacity = 1024 * 1024 + DefaultDispatchTimeout = 300 * time.Second + DefaultOperatorPlugins = "general" + DefaultDisableGangScheduling = false + DefaultEnableConfigHotRefresh = true + DefaultKubeQPS = 1000 + DefaultKubeBurst = 1000 + DefaultSingleUserPerApplication = false ) var ( @@ -103,26 +103,26 @@ var confHolder atomic.Value var kubeLoggerOnce sync.Once type SchedulerConf struct { - SchedulerName string `json:"schedulerName"` - ClusterID string `json:"clusterId"` - ClusterVersion string `json:"clusterVersion"` - PolicyGroup string `json:"policyGroup"` - Interval time.Duration `json:"schedulingIntervalSecond"` - KubeConfig string `json:"absoluteKubeConfigFilePath"` - VolumeBindTimeout time.Duration `json:"volumeBindTimeout"` - TestMode bool `json:"testMode"` - EventChannelCapacity int `json:"eventChannelCapacity"` - DispatchTimeout time.Duration `json:"dispatchTimeout"` - KubeQPS int `json:"kubeQPS"` - KubeBurst int `json:"kubeBurst"` - OperatorPlugins string `json:"operatorPlugins"` - EnableConfigHotRefresh bool `json:"enableConfigHotRefresh"` - DisableGangScheduling bool `json:"disableGangScheduling"` - UserLabelKey string `json:"userLabelKey"` - PlaceHolderImage string `json:"placeHolderImage"` - InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"` - Namespace string `json:"namespace"` - AllowSimilarAppIdsByDifferentUsers bool `json:"allowSimilarAppIdsByDifferentUsers"` + SchedulerName string `json:"schedulerName"` + ClusterID string `json:"clusterId"` + ClusterVersion string `json:"clusterVersion"` + PolicyGroup string `json:"policyGroup"` + Interval time.Duration `json:"schedulingIntervalSecond"` + KubeConfig string `json:"absoluteKubeConfigFilePath"` + VolumeBindTimeout time.Duration `json:"volumeBindTimeout"` + TestMode bool `json:"testMode"` + EventChannelCapacity int `json:"eventChannelCapacity"` + DispatchTimeout time.Duration `json:"dispatchTimeout"` + KubeQPS int `json:"kubeQPS"` + KubeBurst int `json:"kubeBurst"` + OperatorPlugins string `json:"operatorPlugins"` + EnableConfigHotRefresh bool `json:"enableConfigHotRefresh"` + DisableGangScheduling bool `json:"disableGangScheduling"` + UserLabelKey string `json:"userLabelKey"` + PlaceHolderImage string `json:"placeHolderImage"` + InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"` + Namespace string `json:"namespace"` + SingleUserPerApplication bool `json:"singleUserPerApplication"` sync.RWMutex } @@ -131,26 +131,26 @@ func (conf *SchedulerConf) Clone() *SchedulerConf { defer conf.RUnlock() return &SchedulerConf{ - SchedulerName: conf.SchedulerName, - ClusterID: conf.ClusterID, - ClusterVersion: conf.ClusterVersion, - PolicyGroup: conf.PolicyGroup, - Interval: conf.Interval, - KubeConfig: conf.KubeConfig, - VolumeBindTimeout: conf.VolumeBindTimeout, - TestMode: conf.TestMode, - EventChannelCapacity: conf.EventChannelCapacity, - DispatchTimeout: conf.DispatchTimeout, - KubeQPS: conf.KubeQPS, - KubeBurst: conf.KubeBurst, - OperatorPlugins: conf.OperatorPlugins, - EnableConfigHotRefresh: conf.EnableConfigHotRefresh, - DisableGangScheduling: conf.DisableGangScheduling, - UserLabelKey: conf.UserLabelKey, - PlaceHolderImage: conf.PlaceHolderImage, - InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey, - Namespace: conf.Namespace, - AllowSimilarAppIdsByDifferentUsers: conf.AllowSimilarAppIdsByDifferentUsers, + SchedulerName: conf.SchedulerName, + ClusterID: conf.ClusterID, + ClusterVersion: conf.ClusterVersion, + PolicyGroup: conf.PolicyGroup, + Interval: conf.Interval, + KubeConfig: conf.KubeConfig, + VolumeBindTimeout: conf.VolumeBindTimeout, + TestMode: conf.TestMode, + EventChannelCapacity: conf.EventChannelCapacity, + DispatchTimeout: conf.DispatchTimeout, + KubeQPS: conf.KubeQPS, + KubeBurst: conf.KubeBurst, + OperatorPlugins: conf.OperatorPlugins, + EnableConfigHotRefresh: conf.EnableConfigHotRefresh, + DisableGangScheduling: conf.DisableGangScheduling, + UserLabelKey: conf.UserLabelKey, + PlaceHolderImage: conf.PlaceHolderImage, + InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey, + Namespace: conf.Namespace, + SingleUserPerApplication: conf.SingleUserPerApplication, } } @@ -208,7 +208,7 @@ func handleNonReloadableConfig(old *SchedulerConf, new *SchedulerConf) { checkNonReloadableBool(CMSvcDisableGangScheduling, &old.DisableGangScheduling, &new.DisableGangScheduling) checkNonReloadableString(CMSvcPlaceholderImage, &old.PlaceHolderImage, &new.PlaceHolderImage) checkNonReloadableString(CMSvcNodeInstanceTypeNodeLabelKey, &old.InstanceTypeNodeLabelKey, &new.InstanceTypeNodeLabelKey) - checkNonReloadableBool(CMSvcAllowSimilarAppIdsByDifferentUsers, &old.AllowSimilarAppIdsByDifferentUsers, &new.AllowSimilarAppIdsByDifferentUsers) + checkNonReloadableBool(CMSvcSingleUserPerApplication, &old.SingleUserPerApplication, &new.SingleUserPerApplication) } const warningNonReloadable = "ignoring non-reloadable configuration change (restart required to update)" @@ -300,10 +300,10 @@ func GetSchedulerNamespace() string { return DefaultNamespace } -func (conf *SchedulerConf) GetAllowSimilarAppIdsByDifferentUsers() bool { +func (conf *SchedulerConf) GetSingleUserPerApplication() bool { conf.RLock() defer conf.RUnlock() - return conf.AllowSimilarAppIdsByDifferentUsers + return conf.SingleUserPerApplication } func createConfigs() { @@ -325,26 +325,26 @@ func GetDefaultKubeConfigPath() string { // CreateDefaultConfig creates and returns a configuration representing all default values func CreateDefaultConfig() *SchedulerConf { return &SchedulerConf{ - SchedulerName: constants.SchedulerName, - Namespace: GetSchedulerNamespace(), - ClusterID: DefaultClusterID, - ClusterVersion: buildVersion, - PolicyGroup: DefaultPolicyGroup, - Interval: DefaultSchedulingInterval, - KubeConfig: GetDefaultKubeConfigPath(), - VolumeBindTimeout: DefaultVolumeBindTimeout, - TestMode: false, - EventChannelCapacity: DefaultEventChannelCapacity, - DispatchTimeout: DefaultDispatchTimeout, - KubeQPS: DefaultKubeQPS, - KubeBurst: DefaultKubeBurst, - OperatorPlugins: DefaultOperatorPlugins, - EnableConfigHotRefresh: DefaultEnableConfigHotRefresh, - DisableGangScheduling: DefaultDisableGangScheduling, - UserLabelKey: constants.DefaultUserLabel, - PlaceHolderImage: constants.PlaceholderContainerImage, - InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey, - AllowSimilarAppIdsByDifferentUsers: DefaultAllowSimilarAppIdsByDifferentUsers, + SchedulerName: constants.SchedulerName, + Namespace: GetSchedulerNamespace(), + ClusterID: DefaultClusterID, + ClusterVersion: buildVersion, + PolicyGroup: DefaultPolicyGroup, + Interval: DefaultSchedulingInterval, + KubeConfig: GetDefaultKubeConfigPath(), + VolumeBindTimeout: DefaultVolumeBindTimeout, + TestMode: false, + EventChannelCapacity: DefaultEventChannelCapacity, + DispatchTimeout: DefaultDispatchTimeout, + KubeQPS: DefaultKubeQPS, + KubeBurst: DefaultKubeBurst, + OperatorPlugins: DefaultOperatorPlugins, + EnableConfigHotRefresh: DefaultEnableConfigHotRefresh, + DisableGangScheduling: DefaultDisableGangScheduling, + UserLabelKey: constants.DefaultUserLabel, + PlaceHolderImage: constants.PlaceholderContainerImage, + InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey, + SingleUserPerApplication: DefaultSingleUserPerApplication, } } @@ -370,7 +370,7 @@ func parseConfig(config map[string]string, prev *SchedulerConf) (*SchedulerConf, parser.boolVar(&conf.EnableConfigHotRefresh, CMSvcEnableConfigHotRefresh) parser.stringVar(&conf.PlaceHolderImage, CMSvcPlaceholderImage) parser.stringVar(&conf.InstanceTypeNodeLabelKey, CMSvcNodeInstanceTypeNodeLabelKey) - parser.boolVar(&conf.AllowSimilarAppIdsByDifferentUsers, CMSvcAllowSimilarAppIdsByDifferentUsers) + parser.boolVar(&conf.SingleUserPerApplication, CMSvcSingleUserPerApplication) // kubernetes parser.intVar(&conf.KubeQPS, CMKubeQPS) diff --git a/pkg/conf/schedulerconf_test.go b/pkg/conf/schedulerconf_test.go index 565924c77..2b92bef80 100644 --- a/pkg/conf/schedulerconf_test.go +++ b/pkg/conf/schedulerconf_test.go @@ -70,7 +70,7 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) { assert.Equal(t, conf.KubeQPS, DefaultKubeQPS) assert.Equal(t, conf.KubeBurst, DefaultKubeBurst) assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel) - assert.Equal(t, conf.AllowSimilarAppIdsByDifferentUsers, DefaultAllowSimilarAppIdsByDifferentUsers) + assert.Equal(t, conf.SingleUserPerApplication, DefaultSingleUserPerApplication) } func TestParseConfigMap(t *testing.T) { @@ -92,7 +92,7 @@ func TestParseConfigMap(t *testing.T) { {CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type"}, {CMKubeQPS, "KubeQPS", 2345}, {CMKubeBurst, "KubeBurst", 3456}, - {CMSvcAllowSimilarAppIdsByDifferentUsers, "AllowSimilarAppIdsByDifferentUsers", true}, + {CMSvcSingleUserPerApplication, "SingleUserPerApplication", true}, } for _, tc := range testCases { @@ -125,7 +125,7 @@ func TestUpdateConfigMapNonReloadable(t *testing.T) { {CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type", false}, {CMKubeQPS, "KubeQPS", 2345, false}, {CMKubeBurst, "KubeBurst", 3456, false}, - {CMSvcAllowSimilarAppIdsByDifferentUsers, "AllowSimilarAppIdsByDifferentUsers", true, false}, + {CMSvcSingleUserPerApplication, "SingleUserPerApplication", true, false}, } for _, tc := range testCases { From a06f9be1b2f564b2f963b6e763bc6d9e8d4a0afc Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Mon, 17 Jul 2023 21:01:29 +0530 Subject: [PATCH 3/7] Minor comment changes --- pkg/appmgmt/general/podevent_handler_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index 6a306f633..0ecf53396 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -117,21 +117,18 @@ func TestAllowSimilarAppIdsByDifferentUsers(t *testing.T) { assert.Assert(t, app2 != nil) app2.SetState(cache.ApplicationStates().Accepted) - // set allowSimilarAppIdsByDifferentUsers to true + // set SingleUserPerApplication to true err := conf.UpdateConfigMaps([]*v1.ConfigMap{ {Data: map[string]string{conf.CMSvcSingleUserPerApplication: "true"}}, }, true) assert.NilError(t, err, "UpdateConfigMap failed") - // create same app appID and ensure app obj is getting created because "Accepted" state + // create same app appID and ensure app is accepted pod2 = newPodByUser("pod1", "test") app3 := podEventHandler.HandleEvent(AddPod, Informers, pod2) assert.Assert(t, app3 != nil) - // set app state to "Running" - app3.SetState(cache.ApplicationStates().Running) - - // create same app appID and ensure app obj is not getting created because user is different from earlier submission + // create same app appID and ensure app is rejected because user is different from earlier submission pod2 = newPodByUser("pod1", "test1") app4 := podEventHandler.HandleEvent(AddPod, Informers, pod2) assert.Assert(t, app4 == nil) From f6ad4c5558c441380d628c2b6f7d18d83727d55f Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Thu, 20 Jul 2023 19:24:25 +0530 Subject: [PATCH 4/7] Addressed review comments --- pkg/appmgmt/general/podevent_handler.go | 9 +- pkg/appmgmt/general/podevent_handler_test.go | 126 +++++++++++++++---- 2 files changed, 110 insertions(+), 25 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index 52e11635d..898859d0f 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces" + "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/log" ) @@ -129,10 +130,16 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface }) } else { if conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User { - log.Log(log.ShimAppMgmtGeneral).Warn("application has been submitted by different user", + log.Log(log.ShimAppMgmtGeneral).Warn("rejecting application as it has been submitted by different user", zap.String("app id ", appMeta.ApplicationID), zap.String("app user", app.GetUser()), zap.String("submitted by", appMeta.User)) + events.GetRecorder().Eventf(pod.DeepCopy(), nil, v1.EventTypeWarning, + "Rejecting the application because already application exists with different user", + "Please try submitting a application by same user", + "SingleUserPerApplication has been configured to true. So rejecting the application because it has been submitted by different user. "+ + "Either you can disable the check by not setting SingleUserPerApplication or try submitting a application by same use. "+ + "By default, SingleUserPerApplication is false") return nil } managedApp = app diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index 0ecf53396..4c2432af6 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -19,15 +19,21 @@ package general import ( + "strings" "testing" + "time" "gotest.tools/v3/assert" v1 "k8s.io/api/core/v1" apis "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + k8sEvents "k8s.io/client-go/tools/events" "github.com/apache/yunikorn-k8shim/pkg/cache" + "github.com/apache/yunikorn-k8shim/pkg/client" "github.com/apache/yunikorn-k8shim/pkg/common/constants" + "github.com/apache/yunikorn-k8shim/pkg/common/events" + "github.com/apache/yunikorn-k8shim/pkg/common/utils" "github.com/apache/yunikorn-k8shim/pkg/conf" ) @@ -100,38 +106,110 @@ func TestRecoveryDone(t *testing.T) { assert.Equal(t, false, podEventHandler.recoveryRunning) } -func TestAllowSimilarAppIdsByDifferentUsers(t *testing.T) { - amProtocol := cache.NewMockedAMProtocol() - podEventHandler := NewPodEventHandler(amProtocol, false) +func TestSingleUserPerApplication(t *testing.T) { + conf.GetSchedulerConf().SetTestMode(true) + api := client.NewMockedAPIProvider(false) + cache.NewContext(api) + recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder) + if !ok { + t.Fatal("the EventRecorder is expected to be of type FakeRecorder") + } - // create new app appID - pod1 := newPodByUser("pod1", "test") - app1 := podEventHandler.HandleEvent(AddPod, Informers, pod1) - assert.Equal(t, len(podEventHandler.asyncEvents), 0) - assert.Assert(t, app1 != nil) - app1.SetState(cache.ApplicationStates().Running) + amprotocol := cache.NewMockedAMProtocol() + podEvent := NewPodEventHandler(amprotocol, false) - // create same app appID and ensure app obj is getting created because allowSimilarAppIdsByDifferentUsers is false by default - pod2 := newPodByUser("pod1", "test") - app2 := podEventHandler.HandleEvent(AddPod, Informers, pod2) - assert.Assert(t, app2 != nil) - app2.SetState(cache.ApplicationStates().Accepted) + am := NewManager(client.NewMockedAPIProvider(false), podEvent) + + pod1 := v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod00001", + Namespace: "default", + UID: "UID-POD-00001", + Labels: map[string]string{ + "queue": "root.a", + "yunikorn.apache.org/username": "test", + "applicationId": appID, + }, + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + + // submit the app + am.AddPod(&pod1) + + pod := &v1.Pod{ + TypeMeta: apis.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: apis.ObjectMeta{ + Name: "pod00002", + Namespace: "default", + UID: "UID-POD-00002", + Labels: map[string]string{ + "queue": "root.a", + "yunikorn.apache.org/username": "test1", + "applicationId": appID, + }, + }, + Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + + // submit the same app with different user + am.AddPod(pod) + + message := "Rejecting the application because already application exists with different user" + reason := "SingleUserPerApplication has been configured to true. So rejecting the application because it has been submitted by different user. " + + "Either you can disable the check by not setting SingleUserPerApplication or try submitting a application by same use. " + + "By default, SingleUserPerApplication is false" + + // ensure there is no event + err := utils.WaitForCondition(func() bool { + for { + select { + case event := <-recorder.Events: + if strings.Contains(event, reason) && strings.Contains(event, message) { + return true + } + default: + return false + } + } + }, 50*time.Millisecond, 20*time.Millisecond) + assert.Error(t, err, "timeout waiting for condition") // set SingleUserPerApplication to true - err := conf.UpdateConfigMaps([]*v1.ConfigMap{ + err = conf.UpdateConfigMaps([]*v1.ConfigMap{ {Data: map[string]string{conf.CMSvcSingleUserPerApplication: "true"}}, }, true) assert.NilError(t, err, "UpdateConfigMap failed") - // create same app appID and ensure app is accepted - pod2 = newPodByUser("pod1", "test") - app3 := podEventHandler.HandleEvent(AddPod, Informers, pod2) - assert.Assert(t, app3 != nil) - - // create same app appID and ensure app is rejected because user is different from earlier submission - pod2 = newPodByUser("pod1", "test1") - app4 := podEventHandler.HandleEvent(AddPod, Informers, pod2) - assert.Assert(t, app4 == nil) + // submit the same app with different user and ensure specific rejection event has been published + am.AddPod(pod) + + err = utils.WaitForCondition(func() bool { + for { + select { + case event := <-recorder.Events: + if strings.Contains(event, reason) && strings.Contains(event, message) { + return true + } + default: + return false + } + } + }, 50*time.Millisecond, 20*time.Millisecond) + assert.NilError(t, err, "event should have been emitted") } func newPod(name string) *v1.Pod { From 384b3fef80d5495e9527f5f95c28154b388498a1 Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Thu, 20 Jul 2023 21:20:01 +0530 Subject: [PATCH 5/7] Addressed review comments --- pkg/appmgmt/general/podevent_handler.go | 7 ++----- pkg/appmgmt/general/podevent_handler_test.go | 13 +++++-------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index 898859d0f..7e7bccf9a 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -19,6 +19,7 @@ package general import ( + "fmt" "sync" "go.uber.org/zap" @@ -135,11 +136,7 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface zap.String("app user", app.GetUser()), zap.String("submitted by", appMeta.User)) events.GetRecorder().Eventf(pod.DeepCopy(), nil, v1.EventTypeWarning, - "Rejecting the application because already application exists with different user", - "Please try submitting a application by same user", - "SingleUserPerApplication has been configured to true. So rejecting the application because it has been submitted by different user. "+ - "Either you can disable the check by not setting SingleUserPerApplication or try submitting a application by same use. "+ - "By default, SingleUserPerApplication is false") + fmt.Sprintf("Rejecting pod because application ID %s belongs to a different user", appMeta.ApplicationID), "", "") return nil } managedApp = app diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index 4c2432af6..c938e601b 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -168,24 +168,21 @@ func TestSingleUserPerApplication(t *testing.T) { // submit the same app with different user am.AddPod(pod) - message := "Rejecting the application because already application exists with different user" - reason := "SingleUserPerApplication has been configured to true. So rejecting the application because it has been submitted by different user. " + - "Either you can disable the check by not setting SingleUserPerApplication or try submitting a application by same use. " + - "By default, SingleUserPerApplication is false" + message := "Rejecting pod because application ID " + appID + " belongs to a different user" // ensure there is no event err := utils.WaitForCondition(func() bool { for { select { case event := <-recorder.Events: - if strings.Contains(event, reason) && strings.Contains(event, message) { + if strings.Contains(event, message) { return true } default: return false } } - }, 50*time.Millisecond, 20*time.Millisecond) + }, 50*time.Millisecond, time.Second) assert.Error(t, err, "timeout waiting for condition") // set SingleUserPerApplication to true @@ -201,14 +198,14 @@ func TestSingleUserPerApplication(t *testing.T) { for { select { case event := <-recorder.Events: - if strings.Contains(event, reason) && strings.Contains(event, message) { + if strings.Contains(event, message) { return true } default: return false } } - }, 50*time.Millisecond, 20*time.Millisecond) + }, 50*time.Millisecond, time.Second) assert.NilError(t, err, "event should have been emitted") } From 7d020f6cd3f1af9512e677b85858752e906c1689 Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Wed, 23 Aug 2023 13:30:09 +0530 Subject: [PATCH 6/7] Addressed review comments * Added more test cases * Cleanedup test code --- pkg/appmgmt/general/podevent_handler.go | 4 +- pkg/appmgmt/general/podevent_handler_test.go | 91 +++++++++++--------- 2 files changed, 51 insertions(+), 44 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index 7e7bccf9a..b641768f1 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" "github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces" + "github.com/apache/yunikorn-k8shim/pkg/common/constants" "github.com/apache/yunikorn-k8shim/pkg/common/events" "github.com/apache/yunikorn-k8shim/pkg/conf" "github.com/apache/yunikorn-k8shim/pkg/log" @@ -130,7 +131,8 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface Metadata: appMeta, }) } else { - if conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User { + if app.GetApplicationID() != constants.AutoGenAppPrefix+"-"+pod.Namespace+"-"+constants.AutoGenAppSuffix && + conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User { log.Log(log.ShimAppMgmtGeneral).Warn("rejecting application as it has been submitted by different user", zap.String("app id ", appMeta.ApplicationID), zap.String("app user", app.GetUser()), diff --git a/pkg/appmgmt/general/podevent_handler_test.go b/pkg/appmgmt/general/podevent_handler_test.go index c938e601b..0941fd3f0 100644 --- a/pkg/appmgmt/general/podevent_handler_test.go +++ b/pkg/appmgmt/general/podevent_handler_test.go @@ -120,50 +120,12 @@ func TestSingleUserPerApplication(t *testing.T) { am := NewManager(client.NewMockedAPIProvider(false), podEvent) - pod1 := v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "pod00001", - Namespace: "default", - UID: "UID-POD-00001", - Labels: map[string]string{ - "queue": "root.a", - "yunikorn.apache.org/username": "test", - "applicationId": appID, - }, - }, - Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - Status: v1.PodStatus{ - Phase: v1.PodPending, - }, - } + pod1 := newPodByUser("pod00001", "test", appID) // submit the app - am.AddPod(&pod1) + am.AddPod(pod1) - pod := &v1.Pod{ - TypeMeta: apis.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: apis.ObjectMeta{ - Name: "pod00002", - Namespace: "default", - UID: "UID-POD-00002", - Labels: map[string]string{ - "queue": "root.a", - "yunikorn.apache.org/username": "test1", - "applicationId": appID, - }, - }, - Spec: v1.PodSpec{SchedulerName: constants.SchedulerName}, - Status: v1.PodStatus{ - Phase: v1.PodPending, - }, - } + pod := newPodByUser("pod00002", "test1", appID) // submit the same app with different user am.AddPod(pod) @@ -207,13 +169,56 @@ func TestSingleUserPerApplication(t *testing.T) { } }, 50*time.Millisecond, time.Second) assert.NilError(t, err, "event should have been emitted") + + autogenAppID := constants.AutoGenAppPrefix + "-default-" + constants.AutoGenAppSuffix + pod2 := newPodByUser("pod00003", "test", autogenAppID) + + // submit the autogen app + am.AddPod(pod2) + + message = "Rejecting pod because application ID " + autogenAppID + " belongs to a different user" + + // ensure there is no event + err = utils.WaitForCondition(func() bool { + for { + select { + case event := <-recorder.Events: + if strings.Contains(event, message) { + return true + } + default: + return false + } + } + }, 50*time.Millisecond, time.Second) + assert.Error(t, err, "timeout waiting for condition") + + pod3 := newPodByUser("pod00004", "test", autogenAppID) + + // submit the same autogen app with different user and ensure no event has been published because same autogen app submission by different users are allowed + am.AddPod(pod3) + + // ensure there is no event even though auto gen app already exists + err = utils.WaitForCondition(func() bool { + for { + select { + case event := <-recorder.Events: + if strings.Contains(event, message) { + return true + } + default: + return false + } + } + }, 50*time.Millisecond, time.Second) + assert.Error(t, err, "timeout waiting for condition") } func newPod(name string) *v1.Pod { - return newPodByUser(name, "nobody") + return newPodByUser(name, "nobody", appID) } -func newPodByUser(name string, user string) *v1.Pod { +func newPodByUser(name string, user string, appID string) *v1.Pod { return &v1.Pod{ TypeMeta: apis.TypeMeta{ Kind: "Pod", From 466d1efa5a5c41804660553849adbdc8d73c0a6d Mon Sep 17 00:00:00 2001 From: Manikandan R Date: Wed, 6 Sep 2023 16:29:36 +0530 Subject: [PATCH 7/7] Addressed review comments --- pkg/appmgmt/general/podevent_handler.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/appmgmt/general/podevent_handler.go b/pkg/appmgmt/general/podevent_handler.go index b641768f1..01c272177 100644 --- a/pkg/appmgmt/general/podevent_handler.go +++ b/pkg/appmgmt/general/podevent_handler.go @@ -131,8 +131,7 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface Metadata: appMeta, }) } else { - if app.GetApplicationID() != constants.AutoGenAppPrefix+"-"+pod.Namespace+"-"+constants.AutoGenAppSuffix && - conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User { + if conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User && isAppIDUnique(app, pod) { log.Log(log.ShimAppMgmtGeneral).Warn("rejecting application as it has been submitted by different user", zap.String("app id ", appMeta.ApplicationID), zap.String("app user", app.GetUser()), @@ -189,6 +188,10 @@ func (p *PodEventHandler) deletePod(pod *v1.Pod) interfaces.ManagedApp { return nil } +func isAppIDUnique(app interfaces.ManagedApp, pod *v1.Pod) bool { + return app.GetApplicationID() != constants.AutoGenAppPrefix+"-"+pod.Namespace+"-"+constants.AutoGenAppSuffix +} + func NewPodEventHandler(amProtocol interfaces.ApplicationManagementProtocol, recoveryRunning bool) *PodEventHandler { asyncEvents := make([]*podAsyncEvent, 0) podEventHandler := &PodEventHandler{