Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding scheduler metrics to expose cluster cordoned status (#261) #4019

Merged
merged 1 commit on Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc(
nil,
)

// ClusterCordonedStatusDesc describes the gauge metric reporting whether a
// cluster is cordoned. The value is 1 when cordoned and 0 otherwise; the
// "reason" label carries the cordon reason (empty when not cordoned).
var ClusterCordonedStatusDesc = prometheus.NewDesc(
	MetricPrefix+"cluster_cordoned_status",
	"Cluster cordoned status",
	[]string{"cluster", "reason"},
	nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Queue priority factor",
Expand Down Expand Up @@ -375,6 +382,10 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc
return prometheus.MustNewConstMetric(ClusterCapacityDesc, prometheus.GaugeValue, value, cluster, pool, resource, nodeType)
}

// NewClusterCordonedStatus builds a constant gauge metric exposing the
// cordoned status of the given cluster. value is expected to be 1 when the
// cluster is cordoned and 0 otherwise; reason is exported as the "reason"
// label alongside the cluster name.
func NewClusterCordonedStatus(value float64, cluster string, reason string) prometheus.Metric {
	metric := prometheus.MustNewConstMetric(
		ClusterCordonedStatusDesc,
		prometheus.GaugeValue,
		value,
		cluster,
		reason,
	)
	return metric
}

func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType)
}
Expand Down
33 changes: 33 additions & 0 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,22 @@ type clusterMetricKey struct {
nodeType string
}

// clusterCordonedStatus captures the cordon state reported for one cluster.
type clusterCordonedStatus struct {
	// status is 1.0 when the cluster is cordoned, 0.0 otherwise.
	status float64
	// reason is the operator-supplied cordon reason; empty when not cordoned.
	reason string
}

func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]prometheus.Metric, error) {
executors, err := c.executorRepository.GetExecutors(ctx)
if err != nil {
return nil, err
}
executorSettings, err := c.executorRepository.GetExecutorSettings(ctx)
if err != nil {
return nil, err
}

cordonedStatusByCluster := map[string]*clusterCordonedStatus{}
phaseCountByQueue := map[queuePhaseMetricKey]int{}
allocatedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
usedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
Expand All @@ -278,6 +288,11 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p

txn := c.jobDb.ReadTxn()
for _, executor := range executors {
// We may not have executorSettings for all known executors, but we still want a cordon status metric for them.
cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{
status: 0.0,
reason: "",
}
for _, node := range executor.Nodes {
nodePool := node.GetPool()
awayPools := poolToAwayPools[nodePool]
Expand Down Expand Up @@ -367,6 +382,21 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
}
}

for _, executorSetting := range executorSettings {
if executorSetting.Cordoned {
if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok {
cordonedValue.status = 1.0
cordonedValue.reason = executorSetting.CordonReason
} else {
// We may have settings for executors that don't exist in the repository.
cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{
status: 1.0,
reason: executorSetting.CordonReason,
}
}
}
}

for _, pool := range c.floatingResourceTypes.AllPools() {
totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool)
clusterKey := clusterMetricKey{
Expand Down Expand Up @@ -408,6 +438,9 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
for k, v := range totalNodeCountByCluster {
clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterTotalCapacity(float64(v), k.cluster, k.pool, "nodes", k.nodeType))
}
for cluster, v := range cordonedStatusByCluster {
clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterCordonedStatus(v.status, cluster, v.reason))
}
return clusterMetrics, nil
}

Expand Down
43 changes: 39 additions & 4 deletions internal/scheduler/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return([]*schedulerobjects.Executor{}, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil)

collector := NewMetricsCollector(
jobDb,
Expand Down Expand Up @@ -203,10 +204,11 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
executorWithJobs := createExecutor("cluster-1", nodeWithJobs)

tests := map[string]struct {
jobDbJobs []*jobdb.Job
floatingResourceTypes *floatingresources.FloatingResourceTypes
executors []*schedulerobjects.Executor
expected []prometheus.Metric
jobDbJobs []*jobdb.Job
floatingResourceTypes *floatingresources.FloatingResourceTypes
executors []*schedulerobjects.Executor
expected []prometheus.Metric
expectedExecutorSettings []*schedulerobjects.ExecutorSettings
}{
"empty cluster single node type": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -218,7 +220,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"empty cluster multi node type": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -236,7 +240,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-2"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-2"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-2"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"empty cluster with unschedulable node": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -248,7 +254,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"cluster with jobs": {
jobDbJobs: []*jobdb.Job{job1, job2},
Expand All @@ -266,7 +274,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"jobs missing from jobDb": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -280,7 +290,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"floating resources": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -290,6 +302,27 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterAvailableCapacity(10, "floating", "pool", "test-floating-resource", ""),
commonmetrics.NewClusterTotalCapacity(10, "floating", "pool", "test-floating-resource", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"cordoned cluster single node type": {
jobDbJobs: []*jobdb.Job{},
executors: []*schedulerobjects.Executor{executor},
expected: []prometheus.Metric{
commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor"),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{
{
ExecutorId: "cluster-1",
Cordoned: true,
CordonReason: "bad executor",
},
},
},
}
for name, tc := range tests {
Expand All @@ -311,6 +344,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return(tc.executors, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.expectedExecutorSettings, nil)

if tc.floatingResourceTypes == nil {
tc.floatingResourceTypes = testfixtures.TestEmptyFloatingResources
Expand Down Expand Up @@ -412,6 +446,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil)

collector := NewMetricsCollector(
jobDb,
Expand Down