Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-1851] Don't allow similar app ids by different users #628

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions pkg/appmgmt/general/podevent_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package general

import (
"fmt"
"sync"

"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/yunikorn-k8shim/pkg/log"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"

"github.com/apache/yunikorn-k8shim/pkg/appmgmt/interfaces"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

type PodEventHandler struct {
Expand Down Expand Up @@ -127,6 +131,15 @@ func (p *PodEventHandler) addPod(pod *v1.Pod, eventSource EventSource) interface
Metadata: appMeta,
})
} else {
if conf.GetSchedulerConf().GetSingleUserPerApplication() && app.GetUser() != appMeta.User && isAppIDUnique(app, pod) {
log.Log(log.ShimAppMgmtGeneral).Warn("rejecting application as it has been submitted by different user",
zap.String("app id ", appMeta.ApplicationID),
zap.String("app user", app.GetUser()),
zap.String("submitted by", appMeta.User))
events.GetRecorder().Eventf(pod.DeepCopy(), nil, v1.EventTypeWarning,
fmt.Sprintf("Rejecting pod because application ID %s belongs to a different user", appMeta.ApplicationID), "", "")
return nil
manirajv06 marked this conversation as resolved.
Show resolved Hide resolved
}
managedApp = app
appExists = true
}
Expand Down Expand Up @@ -175,6 +188,10 @@ func (p *PodEventHandler) deletePod(pod *v1.Pod) interfaces.ManagedApp {
return nil
}

// isAppIDUnique reports whether the application ID of app differs from the
// ID assembled from the auto-generated prefix, the pod's namespace and the
// auto-generated suffix. It returns false when the two IDs are equal.
func isAppIDUnique(app interfaces.ManagedApp, pod *v1.Pod) bool {
	autoGenID := constants.AutoGenAppPrefix + "-" + pod.Namespace + "-" + constants.AutoGenAppSuffix
	if app.GetApplicationID() == autoGenID {
		return false
	}
	return true
}

func NewPodEventHandler(amProtocol interfaces.ApplicationManagementProtocol, recoveryRunning bool) *PodEventHandler {
asyncEvents := make([]*podAsyncEvent, 0)
podEventHandler := &PodEventHandler{
Expand Down
124 changes: 122 additions & 2 deletions pkg/appmgmt/general/podevent_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
package general

import (
"strings"
"testing"
"time"

"gotest.tools/v3/assert"
v1 "k8s.io/api/core/v1"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8sEvents "k8s.io/client-go/tools/events"

"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
)

const appID = "app00001"
Expand Down Expand Up @@ -99,7 +106,119 @@ func TestRecoveryDone(t *testing.T) {
assert.Equal(t, false, podEventHandler.recoveryRunning)
}

func TestSingleUserPerApplication(t *testing.T) {
conf.GetSchedulerConf().SetTestMode(true)
api := client.NewMockedAPIProvider(false)
cache.NewContext(api)
recorder, ok := events.GetRecorder().(*k8sEvents.FakeRecorder)
if !ok {
t.Fatal("the EventRecorder is expected to be of type FakeRecorder")
}

amprotocol := cache.NewMockedAMProtocol()
podEvent := NewPodEventHandler(amprotocol, false)

am := NewManager(client.NewMockedAPIProvider(false), podEvent)

pod1 := newPodByUser("pod00001", "test", appID)

// submit the app
am.AddPod(pod1)

pod := newPodByUser("pod00002", "test1", appID)

// submit the same app with different user
am.AddPod(pod)

message := "Rejecting pod because application ID " + appID + " belongs to a different user"

// ensure there is no event
err := utils.WaitForCondition(func() bool {
for {
select {
case event := <-recorder.Events:
if strings.Contains(event, message) {
return true
}
default:
return false
}
}
}, 50*time.Millisecond, time.Second)
assert.Error(t, err, "timeout waiting for condition")
pbacsko marked this conversation as resolved.
Show resolved Hide resolved

// set SingleUserPerApplication to true
err = conf.UpdateConfigMaps([]*v1.ConfigMap{
{Data: map[string]string{conf.CMSvcSingleUserPerApplication: "true"}},
}, true)
assert.NilError(t, err, "UpdateConfigMap failed")

// submit the same app with different user and ensure specific rejection event has been published
am.AddPod(pod)

err = utils.WaitForCondition(func() bool {
for {
select {
case event := <-recorder.Events:
if strings.Contains(event, message) {
return true
}
default:
return false
}
}
}, 50*time.Millisecond, time.Second)
assert.NilError(t, err, "event should have been emitted")

autogenAppID := constants.AutoGenAppPrefix + "-default-" + constants.AutoGenAppSuffix
pod2 := newPodByUser("pod00003", "test", autogenAppID)

// submit the autogen app
am.AddPod(pod2)

message = "Rejecting pod because application ID " + autogenAppID + " belongs to a different user"

// ensure there is no event
err = utils.WaitForCondition(func() bool {
for {
select {
case event := <-recorder.Events:
if strings.Contains(event, message) {
return true
}
default:
return false
}
}
}, 50*time.Millisecond, time.Second)
assert.Error(t, err, "timeout waiting for condition")

pod3 := newPodByUser("pod00004", "test", autogenAppID)

// submit the same autogen app with different user and ensure no event has been published because same autogen app submission by different users are allowed
am.AddPod(pod3)

// ensure there is no event even though auto gen app already exists
err = utils.WaitForCondition(func() bool {
for {
select {
case event := <-recorder.Events:
if strings.Contains(event, message) {
return true
}
default:
return false
}
}
}, 50*time.Millisecond, time.Second)
assert.Error(t, err, "timeout waiting for condition")
}

// newPod builds a test pod with the given name, submitted by the user
// "nobody" for the package-level appID.
func newPod(name string) *v1.Pod {
	const defaultUser = "nobody"
	pod := newPodByUser(name, defaultUser, appID)
	return pod
}

func newPodByUser(name string, user string, appID string) *v1.Pod {
return &v1.Pod{
TypeMeta: apis.TypeMeta{
Kind: "Pod",
Expand All @@ -110,8 +229,9 @@ func newPod(name string) *v1.Pod {
Namespace: "default",
UID: types.UID(name),
Labels: map[string]string{
"queue": "root.a",
"applicationId": appID,
"queue": "root.a",
"applicationId": appID,
constants.DefaultUserLabel: user,
},
},
Spec: v1.PodSpec{
Expand Down
37 changes: 25 additions & 12 deletions pkg/conf/schedulerconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,26 @@ const (
CMSvcEnableConfigHotRefresh = PrefixService + "enableConfigHotRefresh"
CMSvcPlaceholderImage = PrefixService + "placeholderImage"
CMSvcNodeInstanceTypeNodeLabelKey = PrefixService + "nodeInstanceTypeNodeLabelKey"
CMSvcSingleUserPerApplication = PrefixService + "singleUserPerApplication"

// kubernetes
CMKubeQPS = PrefixKubernetes + "qps"
CMKubeBurst = PrefixKubernetes + "burst"

// defaults
DefaultNamespace = "default"
DefaultClusterID = "mycluster"
DefaultPolicyGroup = "queues"
DefaultSchedulingInterval = time.Second
DefaultVolumeBindTimeout = 10 * time.Second
DefaultEventChannelCapacity = 1024 * 1024
DefaultDispatchTimeout = 300 * time.Second
DefaultOperatorPlugins = "general"
DefaultDisableGangScheduling = false
DefaultEnableConfigHotRefresh = true
DefaultKubeQPS = 1000
DefaultKubeBurst = 1000
DefaultNamespace = "default"
DefaultClusterID = "mycluster"
DefaultPolicyGroup = "queues"
DefaultSchedulingInterval = time.Second
DefaultVolumeBindTimeout = 10 * time.Second
DefaultEventChannelCapacity = 1024 * 1024
DefaultDispatchTimeout = 300 * time.Second
DefaultOperatorPlugins = "general"
DefaultDisableGangScheduling = false
DefaultEnableConfigHotRefresh = true
DefaultKubeQPS = 1000
DefaultKubeBurst = 1000
DefaultSingleUserPerApplication = false
)

var (
Expand Down Expand Up @@ -120,6 +122,7 @@ type SchedulerConf struct {
PlaceHolderImage string `json:"placeHolderImage"`
InstanceTypeNodeLabelKey string `json:"instanceTypeNodeLabelKey"`
Namespace string `json:"namespace"`
SingleUserPerApplication bool `json:"singleUserPerApplication"`
sync.RWMutex
}

Expand Down Expand Up @@ -147,6 +150,7 @@ func (conf *SchedulerConf) Clone() *SchedulerConf {
PlaceHolderImage: conf.PlaceHolderImage,
InstanceTypeNodeLabelKey: conf.InstanceTypeNodeLabelKey,
Namespace: conf.Namespace,
SingleUserPerApplication: conf.SingleUserPerApplication,
}
}

Expand Down Expand Up @@ -204,6 +208,7 @@ func handleNonReloadableConfig(old *SchedulerConf, new *SchedulerConf) {
checkNonReloadableBool(CMSvcDisableGangScheduling, &old.DisableGangScheduling, &new.DisableGangScheduling)
checkNonReloadableString(CMSvcPlaceholderImage, &old.PlaceHolderImage, &new.PlaceHolderImage)
checkNonReloadableString(CMSvcNodeInstanceTypeNodeLabelKey, &old.InstanceTypeNodeLabelKey, &new.InstanceTypeNodeLabelKey)
checkNonReloadableBool(CMSvcSingleUserPerApplication, &old.SingleUserPerApplication, &new.SingleUserPerApplication)
}

const warningNonReloadable = "ignoring non-reloadable configuration change (restart required to update)"
Expand Down Expand Up @@ -295,6 +300,12 @@ func GetSchedulerNamespace() string {
return DefaultNamespace
}

// GetSingleUserPerApplication returns the SingleUserPerApplication setting.
// The field is read while holding the configuration's read lock.
func (conf *SchedulerConf) GetSingleUserPerApplication() bool {
	conf.RLock()
	enabled := conf.SingleUserPerApplication
	conf.RUnlock()
	return enabled
}

func createConfigs() {
confHolder.Store(CreateDefaultConfig())
}
Expand Down Expand Up @@ -333,6 +344,7 @@ func CreateDefaultConfig() *SchedulerConf {
UserLabelKey: constants.DefaultUserLabel,
PlaceHolderImage: constants.PlaceholderContainerImage,
InstanceTypeNodeLabelKey: constants.DefaultNodeInstanceTypeNodeLabelKey,
SingleUserPerApplication: DefaultSingleUserPerApplication,
}
}

Expand All @@ -358,6 +370,7 @@ func parseConfig(config map[string]string, prev *SchedulerConf) (*SchedulerConf,
parser.boolVar(&conf.EnableConfigHotRefresh, CMSvcEnableConfigHotRefresh)
parser.stringVar(&conf.PlaceHolderImage, CMSvcPlaceholderImage)
parser.stringVar(&conf.InstanceTypeNodeLabelKey, CMSvcNodeInstanceTypeNodeLabelKey)
parser.boolVar(&conf.SingleUserPerApplication, CMSvcSingleUserPerApplication)

// kubernetes
parser.intVar(&conf.KubeQPS, CMKubeQPS)
Expand Down
3 changes: 3 additions & 0 deletions pkg/conf/schedulerconf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func assertDefaults(t *testing.T, conf *SchedulerConf) {
assert.Equal(t, conf.KubeQPS, DefaultKubeQPS)
assert.Equal(t, conf.KubeBurst, DefaultKubeBurst)
assert.Equal(t, conf.UserLabelKey, constants.DefaultUserLabel)
assert.Equal(t, conf.SingleUserPerApplication, DefaultSingleUserPerApplication)
}

func TestParseConfigMap(t *testing.T) {
Expand All @@ -91,6 +92,7 @@ func TestParseConfigMap(t *testing.T) {
{CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type"},
{CMKubeQPS, "KubeQPS", 2345},
{CMKubeBurst, "KubeBurst", 3456},
{CMSvcSingleUserPerApplication, "SingleUserPerApplication", true},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -123,6 +125,7 @@ func TestUpdateConfigMapNonReloadable(t *testing.T) {
{CMSvcNodeInstanceTypeNodeLabelKey, "InstanceTypeNodeLabelKey", "node.kubernetes.io/instance-type", false},
{CMKubeQPS, "KubeQPS", 2345, false},
{CMKubeBurst, "KubeBurst", 3456, false},
{CMSvcSingleUserPerApplication, "SingleUserPerApplication", true, false},
}

for _, tc := range testCases {
Expand Down