Skip to content

Commit

Permalink
[YUNIKORN-1901] A basic example for the user tracing and the group tr…
Browse files Browse the repository at this point in the history
…acing
  • Loading branch information
0yukali0 committed Aug 15, 2023
1 parent 9784378 commit 97e86d3
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 19 deletions.
4 changes: 2 additions & 2 deletions test/e2e/framework/helpers/k8s/pod_annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type PodAnnotation struct {
}

type UserInfo struct {
User string `"json:user,omitempty"`
Groups []string `"json:groups,omitempty"`
User string `json:"username,omitempty"`
Groups []string `json:"groups,omitempty"`
}

const (
Expand Down
46 changes: 42 additions & 4 deletions test/e2e/framework/helpers/yunikorn/rest_api_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (c *RClient) GetPartitions(partition string) (*dao.PartitionQueueDAOInfo, e
func (c *RClient) GetUsersResourceUsage(partition string) ([]*dao.UserResourceUsageDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.UsersTrackerPath, partition), nil)
if err != nil {
return nil, err
return nil, err
}
var usersUsage []*dao.UserResourceUsageDAOInfo
_, err = c.do(req, usersUsage)
Expand All @@ -411,7 +411,7 @@ func (c *RClient) GetUsersResourceUsage(partition string) ([]*dao.UserResourceUs
func (c *RClient) GetUserResourceUsage(partition string, user string) (*dao.UserResourceUsageDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.UserTrackerPath, partition, user), nil)
if err != nil {
return nil, err
return nil, err
}
var userUsage *dao.UserResourceUsageDAOInfo
_, err = c.do(req, userUsage)
Expand All @@ -421,7 +421,7 @@ func (c *RClient) GetUserResourceUsage(partition string, user string) (*dao.User
func (c *RClient) GetGroupsResourceUsage(partition string) ([]*dao.GroupResourceUsageDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.GroupsTrackerPath, partition), nil)
if err != nil {
return nil, err
return nil, err
}
var gourpsUsage []*dao.GroupResourceUsageDAOInfo
_, err = c.do(req, gourpsUsage)
Expand All @@ -431,9 +431,47 @@ func (c *RClient) GetGroupsResourceUsage(partition string) ([]*dao.GroupResource
func (c *RClient) GetGroupResourceUsage(partition string, user string) (*dao.GroupResourceUsageDAOInfo, error) {
req, err := c.newRequest("GET", fmt.Sprintf(configmanager.GroupTrackerPath, partition, user), nil)
if err != nil {
return nil, err
return nil, err
}
var groupUsage *dao.GroupResourceUsageDAOInfo
_, err = c.do(req, groupUsage)
return groupUsage, err
}

func (c *RClient) GetQueueFromUserResourceUsage(usages []*dao.UserResourceUsageDAOInfo, queueName string, user string) (*dao.ResourceUsageDAOInfo, error) {
var result *dao.ResourceUsageDAOInfo
for _, usage := range usages {
if usage.UserName == user {
result = usage.Queues
}
}
return QueueFromResourceUsage(result, queueName)
}

func (c *RClient) GetQueueFromGroupResourceUsage(usages []*dao.GroupResourceUsageDAOInfo, queueName string, group string) (*dao.ResourceUsageDAOInfo, error) {
var result *dao.ResourceUsageDAOInfo
for _, usage := range usages {
if usage.GroupName == group {
result = usage.Queues
}
}
return QueueFromResourceUsage(result, queueName)
}

func QueueFromResourceUsage(root *dao.ResourceUsageDAOInfo, queueName string) (*dao.ResourceUsageDAOInfo, error) {
if root == nil {
return nil, fmt.Errorf("ResourceUsage not found: %s", queueName)
}

if queueName == "root" {
return root, nil
}

var allSubQueues = root.Children
for _, subQ := range allSubQueues {
if subQ.QueuePath == queueName {
return subQ, nil
}
}
return nil, fmt.Errorf("ResourceUsage not found: %s", queueName)
}
78 changes: 65 additions & 13 deletions test/e2e/user_qauta_tracing/user_qauta_tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@ package user_qauta_tracing_test

import (
"fmt"
"strings"

v1 "k8s.io/api/core/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
tests "github.com/apache/yunikorn-k8shim/test/e2e"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)

const (
NANESPACE_LENGTH = 10
WAIT_INTERVAL = 60
DEFAULT_PARTITION = "default"
ROOT_QUEUE = "root"
DOT = "."
)

var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() {
Expand All @@ -35,14 +34,17 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() {
})

It("User qauta trace with 3 users and 2 groups", func() {
groups := []string{"staff", "stduent"}
users := []string{"teacher", "student", "assistant"}
queuePath := []string{"root", "root.staff_resources", "root.students_resources"}
yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", annotation, func(sc *configs.SchedulerConfig) error {
// remove placement rules so we can control queue
sc.Partitions[0].PlacementRules = nil
queuesConfigs := []struct {
partition, parentQueue, QueueName string
}{
{DEFAULT_PARTITION, ROOT_QUEUE, "students_resources"},
{DEFAULT_PARTITION, ROOT_QUEUE, "staff_resources"},
{DEFAULT_PARTITION, queuePath[0], "students_resources"},
{DEFAULT_PARTITION, queuePath[0], "staff_resources"},
}
for _, queueConfig := range queuesConfigs {
By(fmt.Sprintf("Add child queue %s to the parent queue %s", queueConfig.QueueName, queueConfig.parentQueue))
Expand All @@ -60,10 +62,10 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() {
AppID, Queue, UserName string
GroupsNames []string
}{
{"teacher-app-01", strings.Join([]string{ROOT_QUEUE, "staff_resources"}, DOT), "teacher", []string{"staff"}},
{"students-app-01", strings.Join([]string{ROOT_QUEUE, "students_resources"}, DOT), "student", []string{"students"}},
{"assistant-app-01", strings.Join([]string{ROOT_QUEUE, "students_resources"}, DOT), "assistant", []string{"students"}},
{"assistant-app-02", strings.Join([]string{ROOT_QUEUE, "staff_resources"}, DOT), "assistant", []string{"staff"}},
{"teacher-app-01", queuePath[1], users[0], []string{groups[0]}},
{"students-app-01", queuePath[2], users[1], []string{groups[1]}},
{"assistant-app-01", queuePath[2], users[2], []string{groups[1]}},
{"assistant-app-02", queuePath[1], users[2], []string{groups[0]}},
}
for _, config := range configs {
By(fmt.Sprintf("Deploy the sleep app %s to the %s namespace", config.AppID, ns))
Expand All @@ -84,12 +86,54 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() {
Ω(podErr).NotTo(HaveOccurred())
}
restClient := yunikorn.RClient{}
users, err := restClient.GetUsersResourceUsage(DEFAULT_PARTITION)
var usedResource yunikorn.ResourceUsage
By("Check total number of users and groups")
usersUsage, err := restClient.GetUsersResourceUsage(DEFAULT_PARTITION)
Ω(err).NotTo(HaveOccurred())
Ω(len(users)).To(Equal(3), "Total number of users is not correct")
groups, err := restClient.GetGroupsResourceUsage(DEFAULT_PARTITION)
Ω(len(usersUsage)).To(Equal(3), "Total number of users is not correct")
groupsUsage, err := restClient.GetGroupsResourceUsage(DEFAULT_PARTITION)
Ω(err).NotTo(HaveOccurred())
Ω(len(groups)).To(Equal(2), "Total number of groups is not correct")
Ω(len(groupsUsage)).To(Equal(2), "Total number of groups is not correct")

By("Check user resource usage of assistant in each queue")
userUsage, err := restClient.GetUserResourceUsage(DEFAULT_PARTITION, users[2])
Ω(err).NotTo(HaveOccurred())
Ω(userUsage).To(Equal(map[string]string{configs[2].AppID: groups[1], configs[3].AppID: groups[0]}))
queue, err := restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[0], users[2])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{configs[2].AppID, configs[3].AppID}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100}))
queue, err = restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[1], users[2])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{configs[3].AppID}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 100, siCommon.Memory: 50}))
queue, err = restClient.GetQueueFromUserResourceUsage(usersUsage, queuePath[2], users[2])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{configs[2].AppID}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 100, siCommon.Memory: 50}))

By("Check group resource usage of staff in each queue")
groupUsage, err := restClient.GetGroupResourceUsage(DEFAULT_PARTITION, groups[0])
Ω(err).NotTo(HaveOccurred())
Ω(groupUsage.Applications).To(Equal([]string{configs[0].AppID, configs[3].AppID}), "running application IDs are not expected")
queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[0], groups[0])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{configs[0].AppID, configs[3].AppID}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100}))
queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[1], groups[0])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{configs[0].AppID, configs[3].AppID}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 200, siCommon.Memory: 100}))
queue, err = restClient.GetQueueFromGroupResourceUsage(groupsUsage, queuePath[2], groups[0])
Ω(err).NotTo(HaveOccurred())
Ω(queue.RunningApplications).To(Equal([]string{}))
usedResource.ParseResourceUsage(parseResource(queue.ResourceUsage))
Ω(usedResource).To(Equal(map[string]int64{siCommon.CPU: 0, siCommon.Memory: 0}))
})

AfterEach(func() {
Expand All @@ -103,3 +147,11 @@ var _ = Describe("UserQuataTracing: Two leaf queus for two groups", func() {
Ω(err).NotTo(HaveOccurred())
})
})

func parseResource(res *resources.Resource) map[string]int64 {
result := make(map[string]int64)
for key, value := range res.Resources {
result[key] = int64(value)
}
return result
}

0 comments on commit 97e86d3

Please sign in to comment.