Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes (#259) #4013

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/armadacontext"
commonconfig "github.com/armadaproject/armada/internal/common/config"
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
Expand Down Expand Up @@ -63,7 +62,6 @@ type Sink[T HasPulsarMessageIds] interface {
// exhausted and a Sink capable of exhausting these objects
type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
pulsarConfig commonconfig.PulsarConfig
metricsPort uint16
metrics *commonmetrics.Metrics
pulsarTopic string
pulsarSubscriptionName string
Expand Down Expand Up @@ -93,13 +91,11 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
metricPublisher BatchMetricPublisher[U],
converter InstructionConverter[T, U],
sink Sink[T],
metricsPort uint16,
metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U] {
return &IngestionPipeline[T, U]{
pulsarConfig: pulsarConfig,
pulsarTopic: pulsarTopic,
metricsPort: metricsPort,
metrics: metrics,
pulsarSubscriptionName: pulsarSubscriptionName,
pulsarBatchSize: pulsarBatchSize,
Expand All @@ -116,9 +112,6 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](

// Run will run the ingestion pipeline until the supplied context is shut down
func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error {
shutdownMetricServer := common.ServeMetrics(i.metricsPort)
defer shutdownMetricServer()

// Waitgroup that will fire when the pipeline has been torn down
wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down
79 changes: 75 additions & 4 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingest

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -263,6 +264,7 @@ func TestRun_ControlPlaneEvents_HappyPath_SingleMessage(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) {
Expand All @@ -286,6 +288,7 @@ func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) {
Expand Down Expand Up @@ -337,6 +340,7 @@ func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) {
assert.True(t, eventCount < tc.batchSize+1)
}
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
})
}
}
Expand All @@ -356,7 +360,6 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct
metricPublisher: controlplaneevents_ingest_utils.BatchMetricPublisher,
converter: converter,
sink: sink,
metricsPort: 8080,
metrics: testMetrics,
consumer: consumer,
}
Expand All @@ -365,32 +368,42 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct
// simpleSink is an in-memory sink used by the pipeline tests. It remembers the
// messages handed to Store so that tests can assert on them afterwards.
type simpleSink struct {
// simpleMessages holds the stored messages, keyed by their Pulsar message ID.
simpleMessages map[pulsar.MessageID]*simpleMessage
// t is the test that the assertion helpers report failures against.
t *testing.T
// mutex is locked by Store while it writes to simpleMessages.
mutex sync.Mutex
}

// newSimpleSink returns an empty simpleSink whose assertion helpers report
// against t. The mutex is left at its zero value, which is an unlocked mutex.
func newSimpleSink(t *testing.T) *simpleSink {
	sink := &simpleSink{t: t}
	sink.simpleMessages = make(map[pulsar.MessageID]*simpleMessage)
	return sink
}

// Store records every non-nil message in msg under its Pulsar message ID.
//
// The map write happens only while s.mutex is held and only after the nil
// check, so several pipelines may call Store on the same sink concurrently and
// a nil entry in msg.msgs is skipped rather than dereferenced. Store always
// returns nil.
func (s *simpleSink) Store(_ *armadacontext.Context, msg *simpleMessages) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, m := range msg.msgs {
		if m == nil {
			continue
		}
		s.simpleMessages[m.id] = m
	}
	return nil
}

// assertDidProcess checks that each of the given messages was stored in the
// sink and that its stored representation has a positive size.
//
// It deliberately does not assert on the total number of stored messages: the
// sink may be shared by several pipelines, in which case messages is only a
// subset of what was stored. Use assertProcessedMessageCount for the total.
func (s *simpleSink) assertDidProcess(messages []pulsar.Message) {
	s.t.Helper()
	for _, msg := range messages {
		stored, ok := s.simpleMessages[msg.ID()]
		// A missing entry yields a nil pointer; report the failure and move
		// on instead of panicking on stored.size.
		if !assert.True(s.t, ok) {
			continue
		}
		assert.Greater(s.t, stored.size, 0)
	}
}

// assertProcessedMessageCount asserts that the sink currently holds exactly
// count stored messages.
func (s *simpleSink) assertProcessedMessageCount(count int) {
s.t.Helper()
assert.Len(s.t, s.simpleMessages, count)
}

func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) {
ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
messages := []pulsar.Message{
Expand All @@ -410,6 +423,7 @@ func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) {
Expand All @@ -433,6 +447,7 @@ func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) {
Expand Down Expand Up @@ -490,10 +505,67 @@ func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) {
assert.True(t, eventCount < tc.batchSize+tc.numberOfEventsPerMessage)
}
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
})
}
}

// TestRun_MultipleSimultaneousIngesters runs a job-set-events pipeline and a
// control-plane-events pipeline concurrently against one shared sink.
// This will become a more common use case - multiple ingesters ingesting into the same sink
func TestRun_MultipleSimultaneousIngesters(t *testing.T) {
// Each pipeline gets its own context with a 10 second deadline.
jsCtx, jsCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
cpCtx, cpCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
// Message IDs 1-3 go to the job set pipeline, 4-6 to the control plane pipeline,
// so the two sets do not share an ID.
jobSetMessages := []pulsar.Message{
pulsarutils.NewPulsarMessage(1, baseTime, marshal(t, succeeded)),
pulsarutils.NewPulsarMessage(2, baseTime.Add(1*time.Second), marshal(t, pendingAndRunning)),
pulsarutils.NewPulsarMessage(3, baseTime.Add(2*time.Second), marshal(t, failed)),
}
controlPlaneMessages := []pulsar.Message{
pulsarutils.NewPulsarMessage(4, baseTime, marshal(t, f.UpsertExecutorSettingsCordon)),
pulsarutils.NewPulsarMessage(5, baseTime.Add(1*time.Second), marshal(t, f.UpsertExecutorSettingsUncordon)),
pulsarutils.NewPulsarMessage(6, baseTime.Add(2*time.Second), marshal(t, f.DeleteExecutorSettings)),
}
// Each mock consumer receives the cancel function of its own pipeline's context
// (presumably to stop the pipeline once its messages are consumed - confirm in newMockPulsarConsumer).
mockJobSetEventsConsumer := newMockPulsarConsumer(t, jobSetMessages, jsCancel)
mockControlPlaneEventsConsumer := newMockPulsarConsumer(t, controlPlaneMessages, cpCancel)

jobSetEventsConverter := newSimpleEventSequenceConverter(t)
controlPlaneEventsConverter := newSimpleControlPlaneEventConverter(t)

// A single sink instance is shared by both pipelines.
sink := newSimpleSink(t)

jobSetEventsPipeline := testJobSetEventsPipeline(mockJobSetEventsConsumer, jobSetEventsConverter, sink)
controlPlaneEventsPipeline := testControlPlaneEventsPipeline(mockControlPlaneEventsConsumer, controlPlaneEventsConverter, sink)

// Each goroutine writes only its own error variable; both are read after wg.Wait().
var jsErr error
var cpErr error
wg := sync.WaitGroup{}
start := time.Now()

wg.Add(1)
go func() {
defer wg.Done()
jsErr = jobSetEventsPipeline.Run(jsCtx)
}()

wg.Add(1)
go func() {
defer wg.Done()
cpErr = controlPlaneEventsPipeline.Run(cpCtx)
}()

// Block until both pipelines have returned, then measure the total wall-clock time.
wg.Wait()
elapsed := time.Since(start)

assert.NoError(t, jsErr)
assert.NoError(t, cpErr)
// Both pipelines together must finish within twice the batch duration.
assert.LessOrEqual(t, elapsed, batchDuration*2)

mockJobSetEventsConsumer.assertDidAck(jobSetMessages)
mockControlPlaneEventsConsumer.assertDidAck(controlPlaneMessages)
sink.assertDidProcess(jobSetMessages)
sink.assertDidProcess(controlPlaneMessages)
// The shared sink must hold the messages of both pipelines.
sink.assertProcessedMessageCount(len(controlPlaneMessages) + len(jobSetMessages))
}

func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages, *armadaevents.EventSequence], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages, *armadaevents.EventSequence] {
return &IngestionPipeline[*simpleMessages, *armadaevents.EventSequence]{
pulsarConfig: commonconfig.PulsarConfig{
Expand All @@ -509,7 +581,6 @@ func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionCon
metricPublisher: jobsetevents.BatchMetricPublisher,
converter: converter,
sink: sink,
metricsPort: 8080,
metrics: testMetrics,
consumer: consumer,
}
Expand Down
15 changes: 13 additions & 2 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"regexp"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -394,8 +396,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met
values = append(values, queue)

for key, value := range labels {
metricLabels = append(metricLabels, key)
values = append(values, value)
if isValidMetricLabelName(key) {
metricLabels = append(metricLabels, key)
values = append(values, value)
}
}

queueLabelsDesc := prometheus.NewDesc(
Expand All @@ -407,3 +411,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met

return prometheus.MustNewConstMetric(queueLabelsDesc, prometheus.GaugeValue, 1, values...)
}

// validMetricLabelNameRegexp matches names accepted as Prometheus metric label
// names: [a-zA-Z_][a-zA-Z0-9_]*
// See: https://prometheus.io/docs/concepts/data_model/
//
// It is compiled once at package initialisation so that validating a label
// does not recompile the pattern on every call.
var validMetricLabelNameRegexp = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")

// isValidMetricLabelName reports whether labelName matches the Prometheus
// label-name pattern. The empty string is not a valid label name.
func isValidMetricLabelName(labelName string) bool {
	return validMetricLabelNameRegexp.MatchString(labelName)
}
54 changes: 54 additions & 0 deletions internal/common/metrics/scheduler_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package metrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestQueueLabelValidation runs isValidMetricLabelName over a table of label
// names and checks each verdict against the expected one.
func TestQueueLabelValidation(t *testing.T) {
	type labelCase struct {
		input string
		want  bool
	}

	cases := map[string]labelCase{
		"Empty label name":                      {input: "", want: false},
		"Priority label":                        {input: "priority", want: true},
		"Label name with underscores":           {input: "priority__cpu_pool", want: true},
		"Label name with spaces":                {input: "priority cpu pool", want: false},
		"Alphanumeric label name":               {input: "cluster_12_user", want: true},
		"Invalid Kubernetes-style label name 1": {input: "armadaproject.io/category", want: false},
		"Invalid Kubernetes-style label name 2": {input: "armadaproject.io/ttl", want: false},
		"Invalid Kubernetes-style label name 3": {input: "kubernetes.io/metadata.name", want: false},
	}

	for desc, c := range cases {
		t.Run(desc, func(t *testing.T) {
			got := isValidMetricLabelName(c.input)
			assert.Equal(t, c.want, got)
		})
	}
}
6 changes: 5 additions & 1 deletion internal/eventingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
Expand Down Expand Up @@ -62,6 +63,10 @@ func Run(config *configuration.EventIngesterConfiguration) {
}
converter := convert.NewEventConverter(compressor, uint(config.MaxOutputMessageSizeBytes), metrics)

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

ingester := ingest.NewIngestionPipeline[*model.BatchUpdate, *armadaevents.EventSequence](
config.Pulsar,
config.Pulsar.JobsetEventsTopic,
Expand All @@ -75,7 +80,6 @@ func Run(config *configuration.EventIngesterConfiguration) {
jobsetevents.BatchMetricPublisher,
converter,
eventDb,
config.MetricsPort,
metrics,
)
if err := ingester.Run(app.CreateContextWithShutdown()); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/lookoutingesterv2/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
Expand Down Expand Up @@ -55,6 +56,10 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
log.Fatalf("Pprof setup failed, exiting, %v", err)
}

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

converter := instructions.NewInstructionConverter(m.Metrics, config.UserAnnotationPrefix, compressor)

ingester := ingest.NewIngestionPipeline[*model.InstructionSet, *armadaevents.EventSequence](
Expand All @@ -70,7 +75,6 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
jobsetevents.BatchMetricPublisher,
converter,
lookoutDb,
config.MetricsPort,
m.Metrics,
)

Expand Down
7 changes: 5 additions & 2 deletions internal/scheduleringester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
Expand Down Expand Up @@ -43,6 +44,10 @@ func Run(config Configuration) error {
log.Fatalf("Pprof setup failed, exiting, %v", err)
}

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

jobSetEventsIngester := ingest.NewIngestionPipeline[*DbOperationsWithMessageIds, *armadaevents.EventSequence](
config.Pulsar,
config.Pulsar.JobsetEventsTopic,
Expand All @@ -56,7 +61,6 @@ func Run(config Configuration) error {
jobsetevents.BatchMetricPublisher,
jobSetEventsConverter,
schedulerDb,
config.MetricsPort,
svcMetrics,
)

Expand All @@ -78,7 +82,6 @@ func Run(config Configuration) error {
controlplaneevents_ingest_utils.BatchMetricPublisher,
controlPlaneEventsConverter,
schedulerDb,
config.MetricsPort,
svcMetrics,
)

Expand Down