diff --git a/test/e2e/framework/helpers/k8s/pod_annotation.go b/test/e2e/framework/helpers/k8s/pod_annotation.go index a27f86c2f..d2a3967af 100644 --- a/test/e2e/framework/helpers/k8s/pod_annotation.go +++ b/test/e2e/framework/helpers/k8s/pod_annotation.go @@ -29,8 +29,8 @@ type PodAnnotation struct { } type UserInfo struct { - User string `"json:user,omitempty"` - Groups []string `"json:groups,omitempty"` + User string `json:"username,omitempty"` + Groups []string `json:"groups,omitempty"` } const ( diff --git a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go index cd29c3b00..4f9854c7d 100644 --- a/test/e2e/framework/helpers/yunikorn/rest_api_utils.go +++ b/test/e2e/framework/helpers/yunikorn/rest_api_utils.go @@ -401,7 +401,7 @@ func (c *RClient) GetPartitions(partition string) (*dao.PartitionQueueDAOInfo, e func (c *RClient) GetUsersResourceUsage(partition string) ([]*dao.UserResourceUsageDAOInfo, error) { req, err := c.newRequest("GET", fmt.Sprintf(configmanager.UsersTrackerPath, partition), nil) if err != nil { - return nil, err + return nil, err } var usersUsage []*dao.UserResourceUsageDAOInfo _, err = c.do(req, usersUsage) @@ -411,7 +411,7 @@ func (c *RClient) GetUsersResourceUsage(partition string) ([]*dao.UserResourceUs func (c *RClient) GetUserResourceUsage(partition string, user string) (*dao.UserResourceUsageDAOInfo, error) { req, err := c.newRequest("GET", fmt.Sprintf(configmanager.UserTrackerPath, partition, user), nil) if err != nil { - return nil, err + return nil, err } var userUsage *dao.UserResourceUsageDAOInfo _, err = c.do(req, userUsage) @@ -421,7 +421,7 @@ func (c *RClient) GetUserResourceUsage(partition string, user string) (*dao.User func (c *RClient) GetGroupsResourceUsage(partition string) ([]*dao.GroupResourceUsageDAOInfo, error) { req, err := c.newRequest("GET", fmt.Sprintf(configmanager.GroupsTrackerPath, partition), nil) if err != nil { - return nil, err + return nil, err } var gourpsUsage []*dao.GroupResourceUsageDAOInfo _, err = c.do(req, gourpsUsage) @@ -431,9 +431,47 @@ func (c *RClient) GetGroupsResourceUsage(partition string) ([]*dao.GroupResource func (c *RClient) GetGroupResourceUsage(partition string, user string) (*dao.GroupResourceUsageDAOInfo, error) { req, err := c.newRequest("GET", fmt.Sprintf(configmanager.GroupTrackerPath, partition, user), nil) if err != nil { - return nil, err + return nil, err } var groupUsage *dao.GroupResourceUsageDAOInfo _, err = c.do(req, groupUsage) return groupUsage, err } + +func (c *RClient) GetQueueFromUserResourceUsage(usages []*dao.UserResourceUsageDAOInfo, queueName string, user string) (*dao.ResourceUsageDAOInfo, error) { + var result *dao.ResourceUsageDAOInfo + for _, usage := range usages { + if usage.UserName == user { + result = usage.Queues + } + } + return QueueFromResourceUsage(result, queueName) +} + +func (c *RClient) GetQueueFromGroupResourceUsage(usages []*dao.GroupResourceUsageDAOInfo, queueName string, group string) (*dao.ResourceUsageDAOInfo, error) { + var result *dao.ResourceUsageDAOInfo + for _, usage := range usages { + if usage.GroupName == group { + result = usage.Queues + } + } + return QueueFromResourceUsage(result, queueName) +} + +func QueueFromResourceUsage(root *dao.ResourceUsageDAOInfo, queueName string) (*dao.ResourceUsageDAOInfo, error) { + if root == nil { + return nil, fmt.Errorf("ResourceUsage not found: %s", queueName) + } + + if queueName == "root" { + return root, nil + } + + var allSubQueues = root.Children + for _, subQ := range allSubQueues { + if subQ.QueuePath == queueName { + return subQ, nil + } + } + return nil, fmt.Errorf("ResourceUsage not found: %s", queueName) +} diff --git a/test/e2e/user_qauta_tracing/user_qauta_tracing_test.go b/test/e2e/user_qauta_tracing/user_qauta_tracing_test.go index e4ff905ba..11f43df5d 100644 --- a/test/e2e/user_qauta_tracing/user_qauta_tracing_test.go +++ b/test/e2e/user_qauta_tracing/user_qauta_tracing_test.go @@ -2,7 +2,6 @@ package user_qauta_tracing_test import ( "fmt" - "strings" v1 "k8s.io/api/core/v1" @@ -10,18 +9,18 @@ import ( . "github.com/onsi/gomega" "github.com/apache/yunikorn-core/pkg/common/configs" + "github.com/apache/yunikorn-core/pkg/common/resources" tests "github.com/apache/yunikorn-k8shim/test/e2e" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s" "github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" ) const ( NANESPACE_LENGTH = 10 WAIT_INTERVAL = 60 DEFAULT_PARTITION = "default" - ROOT_QUEUE = "root" - DOT = "." ) var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() { @@ -35,14 +34,17 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() { }) It("User qauta trace with 3 users and 2 groups", func() { + groups := []string{"staff", "stduent"} + users := []string{"teacher", "student", "assistant"} + queuePath := []string{"root", "root.staff_resources", "root.students_resources"} yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error { // remove placement rules so we can control queue sc.Partitions[0].PlacementRules = nil queuesConfigs := []struct { partition, parentQueue, QueueName string }{ - {DEFAULT_PARTITION, ROOT_QUEUE, "students_resources"}, - {DEFAULT_PARTITION, ROOT_QUEUE, "staff_resources"}, + {DEFAULT_PARTITION, queuePath[0], "students_resources"}, + {DEFAULT_PARTITION, queuePath[0], "staff_resources"}, } for _, queueConfig := range queuesConfigs { By(fmt.Sprintf("Add child queue %s to the parent queue %s", queueConfig.QueueName, queueConfig.parentQueue)) @@ -60,10 +62,10 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() { AppID, Queue, UserName string GroupsNames []string }{ - {"teacher-app-01", strings.Join([]string{ROOT_QUEUE, "staff_resources"}, DOT), "teacher", []string{"staff"}}, - {"students-app-01", strings.Join([]string{ROOT_QUEUE, "students_resources"}, DOT), "student", []string{"students"}}, - {"assistant-app-01", strings.Join([]string{ROOT_QUEUE, "students_resources"}, DOT), "assistant", []string{"students"}}, - {"assistant-app-02", strings.Join([]string{ROOT_QUEUE, "staff_resources"}, DOT), "assistant", []string{"staff"}}, + {"teacher-app-01", queuePath[1], users[0], []string{groups[0]}}, + {"students-app-01", queuePath[2], users[1], []string{groups[1]}}, + {"assistant-app-01", queuePath[2], users[2], []string{groups[1]}}, + {"assistant-app-02", queuePath[1], users[2], []string{groups[0]}}, } for _, config := range configs { By(fmt.Sprintf("Deploy the sleep app %s to the %s namespace", config.AppID, ns)) @@ -84,12 +86,54 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() { Ω(podErr).NotTo(HaveOccurred()) } restClient := yunikorn.RClient{} - users, err := restClient.GetUsersResourceUsage(DEFAULT_PARTITION) + var usedResource yunikorn.ResourceUsage + By("Check total number of users and groups") + usersUsage, err := restClient.GetUsersResourceUsage(DEFAULT_PARTITION) Ω(err).NotTo(HaveOccurred()) - Ω(len(users)).To(Equal(3), "Total number of users is not correct") - groups, err := restClient.GetGroupsResourceUsage(DEFAULT_PARTITION) + Ω(len(usersUsage)).To(Equal(3), "Total number of users is not correct") + groupsUsage, err := restClient.GetGroupsResourceUsage(DEFAULT_PARTITION) Ω(err).NotTo(HaveOccurred()) - Ω(len(groups)).To(Equal(2), "Total number of groups is not correct") + Ω(len(groupsUsage)).To(Equal(2), "Total number of groups is not correct") + + By("Check user resource usage of assistant in each queue") + userUsage, err := restClient.GetUserResourceUsage(DEFAULT_PARTITION, users[2]) + Ω(err).NotTo(HaveOccurred()) + Ω(userUsage).To(Equal(map[string]string{configs[2].AppID: groups[1], configs[3].AppID: groups[0]})) + queue, err := restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[0], users[2]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{configs[2].AppID, configs[3].AppID})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100})) + queue, err = restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[1], users[2]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{configs[3].AppID})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 100, siCommon.Memory: 50})) + queue, err = restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[2], users[2]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{configs[2].AppID})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 100, siCommon.Memory: 50})) + + By("Check group resource usage of staff in each queue") + groupUsage, err := restClient.GetGroupResourceUsage(DEFAULT_PARTITION, groups[0]) + Ω(err).NotTo(HaveOccurred()) + Ω(groupUsage.Applications).To(Equal([]string{configs[0].AppID, configs[3].AppID}), "running application IDs are not expected") + queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[0], groups[0]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{configs[0].AppID, configs[3].AppID})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100})) + queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[1], groups[0]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{configs[0].AppID, configs[3].AppID})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100})) + queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[2], groups[0]) + Ω(err).NotTo(HaveOccurred()) + Ω(queue.RunningApplications).To(Equal([]string{})) + usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage)) + Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 0, siCommon.Memory: 0})) }) AfterEach(func() { @@ -103,3 +147,11 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() { Ω(err).NotTo(HaveOccurred()) }) }) + +func parseResource(res *resources.Resource) map[string]int64 { + result := make(map[string]int64) + for key, value := range res.Resources { + result[key] = int64(value) + } + return result +}