diff --git a/config/armada/config.yaml b/config/armada/config.yaml index a18bddffbc8..ba6337a19c4 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -78,6 +78,7 @@ pulsar: eventsPrinterSubscription: "EventsPrinter" maxAllowedEventsPerMessage: 1000 maxAllowedMessageSize: 4194304 # 4MB + sendTimeout: 5s receiverQueueSize: 100 postgres: connection: diff --git a/config/eventingester/config.yaml b/config/eventingester/config.yaml index b75ce43c98b..7175c5cf85b 100644 --- a/config/eventingester/config.yaml +++ b/config/eventingester/config.yaml @@ -7,7 +7,6 @@ redis: pulsar: URL: pulsar://pulsar:6650 jobsetEventsTopic: events - receiveTimeout: 5s backoffTime: 1s receiverQueueSize: 100 subscriptionName: "events-ingester" diff --git a/config/lookoutingesterv2/config.yaml b/config/lookoutingesterv2/config.yaml index a1b54ba35ad..4345fd9cdaa 100644 --- a/config/lookoutingesterv2/config.yaml +++ b/config/lookoutingesterv2/config.yaml @@ -10,7 +10,6 @@ metricsPort: 9002 pulsar: URL: "pulsar://pulsar:6650" jobsetEventsTopic: "events" - receiveTimeout: 5s backoffTime: 1s receiverQueueSize: 100 subscriptionName: "lookout-ingester-v2" diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 4b281652a3c..9d5b0690d4c 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -39,6 +39,7 @@ pulsar: compressionLevel: faster maxAllowedEventsPerMessage: 1000 maxAllowedMessageSize: 4194304 #4Mi + sendTimeout: 5s armadaApi: armadaUrl: "server:50051" forceNoTls: true diff --git a/config/scheduleringester/config.yaml b/config/scheduleringester/config.yaml index 717bdbbf4c9..9e4ea45f13f 100644 --- a/config/scheduleringester/config.yaml +++ b/config/scheduleringester/config.yaml @@ -11,7 +11,6 @@ metrics: pulsar: URL: "pulsar://localhost:6650" jobsetEventsTopic: "events" - receiveTimeout: 5s backoffTime: 1s receiverQueueSize: 100 subscriptionName: "scheduler-ingester" diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index dd3264c4a48..4baef849d4f 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -70,8 +70,8 @@ type PulsarConfig struct { MaxAllowedEventsPerMessage int `validate:"gte=0"` // Maximum allowed message size in bytes MaxAllowedMessageSize uint - // Timeout when polling pulsar for messages - ReceiveTimeout time.Duration + // Timeout when sending messages asynchronously + SendTimeout time.Duration `validate:"required"` // Backoff from polling when Pulsar returns an error BackoffTime time.Duration // Number of pulsar messages that will be queued by the pulsar consumer. diff --git a/internal/armada/mocks/generate.go b/internal/armada/mocks/generate.go index cf4cfba22d3..0fc5f59e161 100644 --- a/internal/armada/mocks/generate.go +++ b/internal/armada/mocks/generate.go @@ -1,6 +1,5 @@ package mocks // Mock implementations used by tests -//go:generate mockgen -destination=./mock_repository.go -package=mocks "github.com/armadaproject/armada/internal/armada/queue" QueueRepository -//go:generate mockgen -destination=./mock_deduplicator.go -package=mocks "github.com/armadaproject/armada/internal/armada/submit" Deduplicator,Publisher +//go:generate mockgen -destination=./mock_deduplicator.go -package=mocks "github.com/armadaproject/armada/internal/armada/submit" Deduplicator //go:generate mockgen -destination=./mock_authorizer.go -package=mocks "github.com/armadaproject/armada/internal/armada/server" ActionAuthorizer diff --git a/internal/armada/mocks/mock_deduplicator.go b/internal/armada/mocks/mock_deduplicator.go index 30f29b3ed0b..edb4ef413c2 100644 --- a/internal/armada/mocks/mock_deduplicator.go +++ b/internal/armada/mocks/mock_deduplicator.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/armadaproject/armada/internal/armada/submit (interfaces: Deduplicator,Publisher) +// Source: github.com/armadaproject/armada/internal/armada/submit (interfaces: Deduplicator) // Package mocks is a generated GoMock package. package mocks @@ -9,7 +9,6 @@ import ( armadacontext "github.com/armadaproject/armada/internal/common/armadacontext" api "github.com/armadaproject/armada/pkg/api" - armadaevents "github.com/armadaproject/armada/pkg/armadaevents" gomock "github.com/golang/mock/gomock" ) @@ -64,52 +63,3 @@ func (mr *MockDeduplicatorMockRecorder) StoreOriginalJobIds(arg0, arg1, arg2 int mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreOriginalJobIds", reflect.TypeOf((*MockDeduplicator)(nil).StoreOriginalJobIds), arg0, arg1, arg2) } - -// MockPublisher is a mock of Publisher interface. -type MockPublisher struct { - ctrl *gomock.Controller - recorder *MockPublisherMockRecorder -} - -// MockPublisherMockRecorder is the mock recorder for MockPublisher. -type MockPublisherMockRecorder struct { - mock *MockPublisher -} - -// NewMockPublisher creates a new mock instance. -func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher { - mock := &MockPublisher{ctrl: ctrl} - mock.recorder = &MockPublisherMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockPublisher) EXPECT() *MockPublisherMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockPublisher) Close() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Close") -} - -// Close indicates an expected call of Close. -func (mr *MockPublisherMockRecorder) Close() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPublisher)(nil).Close)) -} - -// PublishMessages mocks base method. -func (m *MockPublisher) PublishMessages(arg0 *armadacontext.Context, arg1 *armadaevents.EventSequence) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PublishMessages", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// PublishMessages indicates an expected call of PublishMessages. -func (mr *MockPublisherMockRecorder) PublishMessages(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishMessages", reflect.TypeOf((*MockPublisher)(nil).PublishMessages), arg0, arg1) -} diff --git a/internal/armada/server.go b/internal/armada/server.go index f24d1be5750..d2e0337e5e3 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -128,7 +128,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt CompressionLevel: config.Pulsar.CompressionLevel, BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize, Topic: config.Pulsar.JobsetEventsTopic, - }, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize) + }, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize, config.Pulsar.SendTimeout) if err != nil { return errors.Wrapf(err, "error creating pulsar producer") } diff --git a/internal/armada/submit/submit_test.go b/internal/armada/submit/submit_test.go index 5572bc67a1d..524b1d9338e 100644 --- a/internal/armada/submit/submit_test.go +++ b/internal/armada/submit/submit_test.go @@ -16,6 +16,7 @@ import ( "github.com/armadaproject/armada/internal/armada/submit/testfixtures" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/auth/permission" + commonMocks "github.com/armadaproject/armada/internal/common/mocks" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/armadaevents" @@ -23,7 +24,7 @@ import ( ) type mockObjects struct { - publisher *mocks.MockPublisher + publisher *commonMocks.MockPublisher queueRepo *mocks.MockQueueRepository deduplicator *mocks.MockDeduplicator authorizer *mocks.MockActionAuthorizer @@ -32,7 +33,7 @@ type mockObjects struct { func createMocks(t *testing.T) *mockObjects { ctrl := gomock.NewController(t) return &mockObjects{ - publisher: mocks.NewMockPublisher(ctrl), + publisher: commonMocks.NewMockPublisher(ctrl), queueRepo: mocks.NewMockQueueRepository(ctrl), deduplicator: mocks.NewMockDeduplicator(ctrl), authorizer: mocks.NewMockActionAuthorizer(ctrl), diff --git a/internal/common/ingest/ingestion_pipeline_test.go b/internal/common/ingest/ingestion_pipeline_test.go index 32f2644fc6e..322ad440baa 100644 --- a/internal/common/ingest/ingestion_pipeline_test.go +++ b/internal/common/ingest/ingestion_pipeline_test.go @@ -347,8 +347,7 @@ func TestRun_LimitsProcessingBatchSize(t *testing.T) { func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] { return &IngestionPipeline[*simpleMessages]{ pulsarConfig: configuration.PulsarConfig{ - ReceiveTimeout: 10 * time.Second, - BackoffTime: time.Second, + BackoffTime: time.Second, }, pulsarSubscriptionName: "subscription", pulsarBatchDuration: batchDuration, diff --git a/internal/common/mocks/generate.go b/internal/common/mocks/generate.go index 44c1c4391a5..3b190430ba4 100644 --- a/internal/common/mocks/generate.go +++ b/internal/common/mocks/generate.go @@ -2,4 +2,5 @@ package mocks // Mock implementations used by tests //go:generate mockgen -destination=./mock_pulsar.go -package=mocks "github.com/apache/pulsar-client-go/pulsar" Client,Producer,Message +//go:generate mockgen -destination=./mock_publisher.go -package=mocks "github.com/armadaproject/armada/internal/common/pulsarutils" Publisher //go:generate mockgen -destination=./mock_executorapi.go -package=mocks "github.com/armadaproject/armada/pkg/executorapi" ExecutorApiClient,ExecutorApi_LeaseJobRunsClient diff --git a/internal/common/mocks/mock_publisher.go b/internal/common/mocks/mock_publisher.go new file mode 100644 index 00000000000..9d6f060b745 --- /dev/null +++ b/internal/common/mocks/mock_publisher.go @@ -0,0 +1,67 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/armadaproject/armada/internal/common/pulsarutils (interfaces: Publisher) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + armadacontext "github.com/armadaproject/armada/internal/common/armadacontext" + armadaevents "github.com/armadaproject/armada/pkg/armadaevents" + gomock "github.com/golang/mock/gomock" +) + +// MockPublisher is a mock of Publisher interface. +type MockPublisher struct { + ctrl *gomock.Controller + recorder *MockPublisherMockRecorder +} + +// MockPublisherMockRecorder is the mock recorder for MockPublisher. +type MockPublisherMockRecorder struct { + mock *MockPublisher +} + +// NewMockPublisher creates a new mock instance. +func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher { + mock := &MockPublisher{ctrl: ctrl} + mock.recorder = &MockPublisherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPublisher) EXPECT() *MockPublisherMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPublisher) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockPublisherMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPublisher)(nil).Close)) +} + +// PublishMessages mocks base method. +func (m *MockPublisher) PublishMessages(arg0 *armadacontext.Context, arg1 ...*armadaevents.EventSequence) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PublishMessages", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// PublishMessages indicates an expected call of PublishMessages. +func (mr *MockPublisherMockRecorder) PublishMessages(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishMessages", reflect.TypeOf((*MockPublisher)(nil).PublishMessages), varargs...) +} diff --git a/internal/common/pulsarutils/eventsequence.go b/internal/common/pulsarutils/eventsequence.go deleted file mode 100644 index 270f6779837..00000000000 --- a/internal/common/pulsarutils/eventsequence.go +++ /dev/null @@ -1,102 +0,0 @@ -package pulsarutils - -import ( - "sync/atomic" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/gogo/protobuf/proto" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/eventutil" - "github.com/armadaproject/armada/internal/common/requestid" - "github.com/armadaproject/armada/pkg/armadaevents" -) - -// CompactAndPublishSequences reduces the number of sequences to the smallest possible, -// while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar. -func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxEventsPerMessage int, maxMessageSizeInBytes uint) error { - // Reduce the number of sequences to send to the minimum possible, - sequences = eventutil.CompactEventSequences(sequences) - // Limit each sequence to have no more than maxEventsPerSequence events per sequence - sequences = eventutil.LimitSequencesEventMessageCount(sequences, maxEventsPerMessage) - // Limit each sequence to be no larger than maxMessageSizeInBytes bytes - sequences, err := eventutil.LimitSequencesByteSize(sequences, maxMessageSizeInBytes, true) - if err != nil { - return err - } - return PublishSequences(ctx, producer, sequences) -} - -// PublishSequences publishes several event sequences to Pulsar. -// For efficiency, all sequences are queued for publishing and then flushed. -// Returns once all sequences have been received by Pulsar. -// -// To reduce the number of separate sequences sent and ensure limit message size, call -// eventutil.CompactEventSequences(sequences) -// and -// eventutil.LimitSequencesByteSize(sequences, int(srv.MaxAllowedMessageSize)) -// before passing to this function. -func PublishSequences(ctx *armadacontext.Context, producer pulsar.Producer, sequences []*armadaevents.EventSequence) error { - // Incoming gRPC requests are annotated with a unique id. - // Pass this id through the log by adding it to the Pulsar message properties. - requestId := requestid.FromContextOrMissing(ctx) - - // First, serialise all payloads, - // to avoid a partial failure where some sequence fails to serialise - // after other sequences have already been sent. - payloads := make([][]byte, len(sequences)) - for i, sequence := range sequences { - if sequence == nil { - return errors.Errorf("failed to send sequence %v", sequence) - } - payload, err := proto.Marshal(sequence) - if err != nil { - return errors.WithStack(err) - } - payloads[i] = payload - } - - // Then, send all sequences concurrently (while respecting order), - // using Pulsar async send. Collect any errors via ch. - // ch must be buffered to avoid sending on ch blocking, - // which is not allowed in the callback. - ch := make(chan error, len(sequences)) - var numSendCompleted uint32 - for i := range sequences { - producer.SendAsync( - ctx, - &pulsar.ProducerMessage{ - Payload: payloads[i], - Properties: map[string]string{ - requestid.MetadataKey: requestId, - }, - Key: sequences[i].JobSetName, - }, - // Callback on send. - func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { - ch <- err - - // The final send to complete is responsible for closing the channel. - isFinalCallback := atomic.AddUint32(&numSendCompleted, 1) == uint32(len(sequences)) - if isFinalCallback { - close(ch) - } - }, - ) - } - - // Wait for all async send calls to complete, collect any errors, and return. - var result *multierror.Error - for range sequences { - select { - case <-ctx.Done(): - result = multierror.Append(result, ctx.Err()) - return result.ErrorOrNil() - case err := <-ch: - result = multierror.Append(result, err) - } - } - return result.ErrorOrNil() -} diff --git a/internal/common/pulsarutils/eventsequence_test.go b/internal/common/pulsarutils/eventsequence_test.go deleted file mode 100644 index 4801aa60534..00000000000 --- a/internal/common/pulsarutils/eventsequence_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package pulsarutils - -import ( - "context" - "testing" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/pkg/armadaevents" -) - -func TestPublishSequences_SendAsyncErr(t *testing.T) { - producer := &mockProducer{} - err := PublishSequences(armadacontext.Background(), producer, []*armadaevents.EventSequence{{}}) - assert.NoError(t, err) - - producer = &mockProducer{ - sendAsyncErr: errors.New("sendAsyncErr"), - } - err = PublishSequences(armadacontext.Background(), producer, []*armadaevents.EventSequence{{}}) - assert.ErrorIs(t, err, producer.sendAsyncErr) -} - -func TestPublishSequences_RespectTimeout(t *testing.T) { - producer := &mockProducer{ - sendAsyncDuration: 1 * time.Second, - } - ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) - defer cancel() - err := PublishSequences(ctx, producer, []*armadaevents.EventSequence{{}}) - assert.ErrorIs(t, err, context.DeadlineExceeded) -} - -type mockProducer struct { - sendDuration time.Duration - sendAsyncDuration time.Duration - sendErr error - sendAsyncErr error - flushErr error -} - -func (producer *mockProducer) Topic() string { - return "topic" -} - -func (producer *mockProducer) Name() string { - return "name" -} - -func (producer *mockProducer) Send(context.Context, *pulsar.ProducerMessage) (pulsar.MessageID, error) { - time.Sleep(producer.sendDuration) - return nil, producer.sendErr -} - -func (producer *mockProducer) SendAsync(_ context.Context, _ *pulsar.ProducerMessage, f func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { - time.Sleep(producer.sendAsyncDuration) - go f(nil, nil, producer.sendAsyncErr) -} - -func (producer *mockProducer) LastSequenceID() int64 { - return 0 -} - -func (producer *mockProducer) Flush() error { - return producer.flushErr -} - -func (producer *mockProducer) Close() {} diff --git a/internal/common/pulsarutils/publisher.go b/internal/common/pulsarutils/publisher.go index f4d32e84005..051ac41e34f 100644 --- a/internal/common/pulsarutils/publisher.go +++ b/internal/common/pulsarutils/publisher.go @@ -1,16 +1,22 @@ package pulsarutils import ( + "sync" + "time" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/eventutil" + "github.com/armadaproject/armada/internal/common/logging" "github.com/armadaproject/armada/pkg/armadaevents" ) // Publisher is an interface to be implemented by structs that handle publishing messages to pulsar type Publisher interface { - PublishMessages(ctx *armadacontext.Context, es *armadaevents.EventSequence) error + PublishMessages(ctx *armadacontext.Context, events ...*armadaevents.EventSequence) error Close() } @@ -23,6 +29,8 @@ type PulsarPublisher struct { // Maximum size (in bytes) of produced pulsar messages. // This must be below 4MB which is the pulsar message size limit maxAllowedMessageSize uint + // Timeout after which async messages sends will be considered failed + sendTimeout time.Duration } func NewPulsarPublisher( @@ -30,6 +38,7 @@ func NewPulsarPublisher( producerOptions pulsar.ProducerOptions, maxEventsPerMessage int, maxAllowedMessageSize uint, + sendTimeout time.Duration, ) (*PulsarPublisher, error) { producer, err := pulsarClient.CreateProducer(producerOptions) if err != nil { @@ -39,18 +48,55 @@ func NewPulsarPublisher( producer: producer, maxEventsPerMessage: maxEventsPerMessage, maxAllowedMessageSize: maxAllowedMessageSize, + sendTimeout: sendTimeout, }, nil } // PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into // single event sequences up to maxMessageBatchSize. -func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, es *armadaevents.EventSequence) error { - return CompactAndPublishSequences( - ctx, - []*armadaevents.EventSequence{es}, - p.producer, - p.maxEventsPerMessage, - p.maxAllowedMessageSize) +func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events ...*armadaevents.EventSequence) error { + sequences := eventutil.CompactEventSequences(events) + sequences = eventutil.LimitSequencesEventMessageCount(sequences, p.maxEventsPerMessage) + sequences, err := eventutil.LimitSequencesByteSize(sequences, p.maxAllowedMessageSize, true) + if err != nil { + return err + } + msgs := make([]*pulsar.ProducerMessage, len(sequences)) + for i, sequence := range sequences { + bytes, err := proto.Marshal(sequence) + if err != nil { + return err + } + msgs[i] = &pulsar.ProducerMessage{ + Payload: bytes, + Key: sequences[i].JobSetName, + } + } + + wg := sync.WaitGroup{} + wg.Add(len(msgs)) + + // Send messages + sendCtx, cancel := armadacontext.WithTimeout(ctx, p.sendTimeout) + errored := false + for _, msg := range msgs { + p.producer.SendAsync(sendCtx, msg, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { + if err != nil { + logging. + WithStacktrace(ctx, err). + Error("error sending message to Pulsar") + errored = true + } + wg.Done() + }) + } + wg.Wait() + cancel() + if errored { + return errors.New("One or more messages failed to send to Pulsar") + } + + return nil } func (p *PulsarPublisher) Close() { diff --git a/internal/common/pulsarutils/publisher_test.go b/internal/common/pulsarutils/publisher_test.go new file mode 100644 index 00000000000..404537dd840 --- /dev/null +++ b/internal/common/pulsarutils/publisher_test.go @@ -0,0 +1,250 @@ +package pulsarutils + +import ( + "math" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/gogo/protobuf/proto" + "github.com/golang/mock/gomock" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/common/mocks" + "github.com/armadaproject/armada/pkg/armadaevents" +) + +const ( + topic = "testTopic" + sendTimeout = time.Millisecond * 100 + defaultMaxEventsPerMessage = 1000 + defaultMaxAllowedMessageSize = 1024 * 1024 +) + +func TestPublishMessages(t *testing.T) { + tests := map[string]struct { + eventSequences []*armadaevents.EventSequence + expectedNumberOfMessages int + expectedNumberOfEvents int + maxEventsPerMessage int + maxAllowedMessageSize uint + }{ + "compacts events to reduce message count": { + // same jobset, so should get compacted into a single message + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}, {}}, + }, + { + JobSetName: "jobset2", + Events: []*armadaevents.EventSequence_Event{{}}, + }, + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}}, + }, + }, + expectedNumberOfMessages: 2, + expectedNumberOfEvents: 4, + }, + "maxEventsPerMessage": { + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}, {}, {}, {}}, + }, + { + JobSetName: "jobset2", + Events: []*armadaevents.EventSequence_Event{{}}, + }, + }, + maxEventsPerMessage: 2, + expectedNumberOfMessages: 3, + expectedNumberOfEvents: 5, + }, + "maxAllowedMessageSize": { + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{ + { + Event: &armadaevents.EventSequence_Event_SubmitJob{ + SubmitJob: &armadaevents.SubmitJob{ + DeduplicationId: "1000000000000000000000000000000000000", + }, + }, + }, + { + Event: &armadaevents.EventSequence_Event_SubmitJob{ + SubmitJob: &armadaevents.SubmitJob{ + DeduplicationId: "2000000000000000000000000000000000000", + }, + }, + }, + }, + }, + }, + // Set this very low, expect it to spit every event into its own message + maxAllowedMessageSize: 160, + expectedNumberOfMessages: 2, + expectedNumberOfEvents: 2, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) + defer cancel() + + publisher, mockPulsarProducer := setUpPublisherTest(t, tc.maxEventsPerMessage, tc.maxAllowedMessageSize) + numberOfMessagesPublished := 0 + numberOfEventsPublished := 0 + var capturedEvents []*armadaevents.EventSequence + expectedCounts := countEvents(tc.eventSequences) + + mockPulsarProducer. + EXPECT(). + SendAsync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx *armadacontext.Context, msg *pulsar.ProducerMessage, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { + numberOfMessagesPublished++ + es := &armadaevents.EventSequence{} + err := proto.Unmarshal(msg.Payload, es) + require.NoError(t, err) + capturedEvents = append(capturedEvents, es) + numberOfEventsPublished += len(es.Events) + callback(NewMessageId(numberOfMessagesPublished), msg, nil) + }).AnyTimes() + + err := publisher.PublishMessages(ctx, tc.eventSequences...) + + assert.NoError(t, err) + capturedCounts := countEvents(capturedEvents) + assert.Equal(t, expectedCounts, capturedCounts) + assert.Equal(t, tc.expectedNumberOfMessages, numberOfMessagesPublished) + assert.Equal(t, tc.expectedNumberOfEvents, numberOfEventsPublished) + }) + } +} + +func TestPublishMessages_HandlesFailureModes(t *testing.T) { + tests := map[string]struct { + publishDuration time.Duration + numSuccessfulPublishes int + expectedNumberOfMessages int + expectedNumberOfEvents int + eventSequences []*armadaevents.EventSequence + }{ + "Returns error if all eventSequences fail to publish": { + numSuccessfulPublishes: 0, + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}, {}}, + }, + }, + expectedNumberOfMessages: 0, + expectedNumberOfEvents: 0, + }, + "Returns error if some eventSequences fail to publish": { + numSuccessfulPublishes: 1, + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}, {}}, + }, + { + JobSetName: "jobset2", + Events: []*armadaevents.EventSequence_Event{{}, {}}, + }, + }, + expectedNumberOfMessages: 1, + expectedNumberOfEvents: 2, + }, + "Returns error if publishing exceeds send timeout": { + numSuccessfulPublishes: math.MaxInt, + publishDuration: sendTimeout * 2, + eventSequences: []*armadaevents.EventSequence{ + { + JobSetName: "jobset1", + Events: []*armadaevents.EventSequence_Event{{}, {}}, + }, + }, + expectedNumberOfMessages: 0, + expectedNumberOfEvents: 0, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) + defer cancel() + + publisher, mockPulsarProducer := setUpPublisherTest(t, defaultMaxEventsPerMessage, defaultMaxAllowedMessageSize) + numberOfMessagesPublished := 0 + numberOfEventsPublished := 0 + + mockPulsarProducer. + EXPECT(). + SendAsync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx *armadacontext.Context, msg *pulsar.ProducerMessage, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { + // A little bit shonky here, we're assuming the pulsar SendAsync actually returns an error on context expiry + time.Sleep(tc.publishDuration) + if ctx.Err() != nil { + callback(nil, msg, ctx.Err()) + return + } + if numberOfMessagesPublished >= tc.numSuccessfulPublishes { + callback(NewMessageId(numberOfMessagesPublished), msg, errors.New("error from mock pulsar producer")) + } else { + es := &armadaevents.EventSequence{} + err := proto.Unmarshal(msg.Payload, es) + require.NoError(t, err) + + numberOfMessagesPublished++ + numberOfEventsPublished += len(es.Events) + callback(NewMessageId(numberOfMessagesPublished), msg, nil) + } + }).AnyTimes() + + err := publisher.PublishMessages(ctx, tc.eventSequences...) + + assert.Error(t, err) + assert.Equal(t, tc.expectedNumberOfMessages, numberOfMessagesPublished) + assert.Equal(t, tc.expectedNumberOfEvents, numberOfEventsPublished) + }) + } +} + +func setUpPublisherTest(t *testing.T, maxEventsPerMessage int, maxAllowedMessageSize uint) (*PulsarPublisher, *mocks.MockProducer) { + ctrl := gomock.NewController(t) + mockPulsarClient := mocks.NewMockClient(ctrl) + mockPulsarProducer := mocks.NewMockProducer(ctrl) + mockPulsarClient.EXPECT().CreateProducer(gomock.Any()).Return(mockPulsarProducer, nil).Times(1) + + if maxAllowedMessageSize == 0 { + maxAllowedMessageSize = defaultMaxAllowedMessageSize + } + + if maxEventsPerMessage == 0 { + maxEventsPerMessage = defaultMaxEventsPerMessage + } + + options := pulsar.ProducerOptions{Topic: topic} + publisher, err := NewPulsarPublisher(mockPulsarClient, options, maxEventsPerMessage, maxAllowedMessageSize, sendTimeout) + require.NoError(t, err) + + return publisher, mockPulsarProducer +} + +func countEvents(es []*armadaevents.EventSequence) map[string]int { + countsById := make(map[string]int) + for _, sequence := range es { + jobset := sequence.JobSetName + count := countsById[jobset] + count += len(sequence.Events) + countsById[jobset] = count + } + return countsById +} diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index 5dc9cb8a2ce..c1f8985c040 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -5,7 +5,6 @@ import ( "strconv" "strings" - "github.com/apache/pulsar-client-go/pulsar" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" "github.com/google/uuid" @@ -31,8 +30,8 @@ const armadaJobPreemptibleLabel = "armada_preemptible" // ExecutorApi is the gRPC service executors use to synchronise their state with that of the scheduler. type ExecutorApi struct { - // Used to send Pulsar messages when, e.g., executors report a job has finished. - producer pulsar.Producer + // Used to send event sequences received from the executors about job state change to Pulsar + publisher pulsarutils.Publisher // Interface to the component storing job information, such as which jobs are leased to a particular executor. jobRepository database.JobRepository // Interface to the component storing executor information, such as which when we last heard from an executor. @@ -44,18 +43,13 @@ type ExecutorApi struct { // Allowed resource names - resource requests/limits not on this list are dropped. // This is needed to ensure floating resources are not passed to k8s. allowedResources map[string]bool - // Max number of events in published Pulsar messages - maxEventsPerPulsarMessage int - // Max size of Pulsar messages produced. - maxPulsarMessageSizeBytes uint - // See scheduling schedulingConfig. - nodeIdLabel string + nodeIdLabel string // See scheduling schedulingConfig. priorityClassNameOverride *string clock clock.Clock } -func NewExecutorApi(producer pulsar.Producer, +func NewExecutorApi(publisher pulsarutils.Publisher, jobRepository database.JobRepository, executorRepository database.ExecutorRepository, allowedPriorities []int32, @@ -63,20 +57,16 @@ func NewExecutorApi(producer pulsar.Producer, nodeIdLabel string, priorityClassNameOverride *string, priorityClasses map[string]priorityTypes.PriorityClass, - maxEventsPerPulsarMessage int, - maxPulsarMessageSizeBytes uint, ) (*ExecutorApi, error) { if len(allowedPriorities) == 0 { return nil, errors.New("allowedPriorities cannot be empty") } return &ExecutorApi{ - producer: producer, + publisher: publisher, jobRepository: jobRepository, executorRepository: executorRepository, allowedPriorities: allowedPriorities, allowedResources: maps.FromSlice(allowedResources, func(name string) string { return name }, func(name string) bool { return true }), - maxEventsPerPulsarMessage: maxEventsPerPulsarMessage, - maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes, nodeIdLabel: nodeIdLabel, priorityClassNameOverride: priorityClassNameOverride, priorityClasses: priorityClasses, @@ -345,7 +335,7 @@ func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string) // ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing. func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error) { ctx := armadacontext.FromGrpcCtx(grpcCtx) - err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxEventsPerPulsarMessage, srv.maxPulsarMessageSizeBytes) + err := srv.publisher.PublishMessages(ctx, list.GetEvents()...) return &types.Empty{}, err } diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 18649f31458..fee87828929 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -1,11 +1,9 @@ package scheduler import ( - "context" "testing" "time" - "github.com/apache/pulsar-client-go/pulsar" "github.com/gogo/protobuf/proto" "github.com/golang/mock/gomock" "github.com/google/uuid" @@ -18,7 +16,6 @@ import ( "github.com/armadaproject/armada/internal/common/compress" "github.com/armadaproject/armada/internal/common/mocks" protoutil "github.com/armadaproject/armada/internal/common/proto" - "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/types" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" @@ -293,7 +290,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { t.Run(name, func(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) ctrl := gomock.NewController(t) - mockPulsarProducer := mocks.NewMockProducer(ctrl) + mockPulsarPublisher := mocks.NewMockPublisher(ctrl) mockJobRepository := schedulermocks.NewMockJobRepository(ctrl) mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl) mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl) @@ -319,7 +316,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { }).AnyTimes() server, err := NewExecutorApi( - mockPulsarProducer, + mockPulsarPublisher, mockJobRepository, mockExecutorRepository, []int32{1000, 2000}, @@ -327,8 +324,6 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { "kubernetes.io/hostname", nil, priorityClasses, - 1000, - 4*1024*1024, ) require.NoError(t, err) server.clock = testClock @@ -429,25 +424,24 @@ func TestExecutorApi_Publish(t *testing.T) { t.Run(name, func(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second) ctrl := gomock.NewController(t) - mockPulsarProducer := mocks.NewMockProducer(ctrl) + mockPulsarPublisher := mocks.NewMockPublisher(ctrl) mockJobRepository := schedulermocks.NewMockJobRepository(ctrl) mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl) // capture all sent messages var capturedEvents []*armadaevents.EventSequence - mockPulsarProducer. + mockPulsarPublisher. EXPECT(). - SendAsync(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn(func(_ context.Context, msg *pulsar.ProducerMessage, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { - es := &armadaevents.EventSequence{} - err := proto.Unmarshal(msg.Payload, es) - require.NoError(t, err) - capturedEvents = append(capturedEvents, es) - callback(pulsarutils.NewMessageId(1), msg, nil) + PublishMessages(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ *armadacontext.Context, sequences ...*armadaevents.EventSequence) error { + for _, es := range sequences { + capturedEvents = append(capturedEvents, es) + } + return nil }).AnyTimes() server, err := NewExecutorApi( - mockPulsarProducer, + mockPulsarPublisher, mockJobRepository, mockExecutorRepository, []int32{1000, 2000}, @@ -455,8 +449,6 @@ func TestExecutorApi_Publish(t *testing.T) { "kubernetes.io/hostname", nil, priorityClasses, - 1000, - 4*1024*1024, ) require.NoError(t, err) diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index b2b9fb1c629..0b1589a54da 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -57,8 +57,6 @@ type Configuration struct { ExecutorTimeout time.Duration `validate:"required"` // Maximum number of rows to fetch in a given query DatabaseFetchSize int `validate:"required"` - // Timeout to use when sending messages to pulsar - PulsarSendTimeout time.Duration `validate:"required"` // Frequency at which queues will be fetched from the API QueueRefreshPeriod time.Duration `validate:"required"` // If true then submit checks will be skipped diff --git a/internal/scheduler/publisher.go b/internal/scheduler/publisher.go index 2478983f6cc..c0729463158 100644 --- a/internal/scheduler/publisher.go +++ b/internal/scheduler/publisher.go @@ -3,7 +3,6 @@ package scheduler import ( "fmt" "strconv" - "sync" "time" "github.com/apache/pulsar-client-go/pulsar" @@ -13,15 +12,12 @@ import ( "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/common/eventutil" - "github.com/armadaproject/armada/internal/common/logging" + "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/pkg/armadaevents" ) const ( - // This is half the default pulsar BatchingMaxSize - defaultMaxMessageBatchSize = 64 * 1024 - explicitPartitionKey = "armada_pulsar_partition" + explicitPartitionKey = "armada_pulsar_partition" ) // Publisher is an interface to be implemented by structs that handle publishing messages to pulsar @@ -38,96 +34,53 @@ type Publisher interface { // PulsarPublisher is the default implementation of Publisher type PulsarPublisher struct { - // Used to send messages to pulsar + // Used to send events sequences to pulsar + publisher pulsarutils.Publisher + // Used to send position markers to pulsar producer pulsar.Producer // Number of partitions on the pulsar topic numPartitions int - // Timeout after which async messages sends will be considered failed - pulsarSendTimeout time.Duration - // Maximum number of Events in each EventSequence - maxEventsPerMessage int - // Maximum size (in bytes) of produced pulsar messages. - // This must be below 4MB which is the pulsar message size limit - maxMessageBatchSize uint } func NewPulsarPublisher( pulsarClient pulsar.Client, producerOptions pulsar.ProducerOptions, maxEventsPerMessage int, - pulsarSendTimeout time.Duration, + maxAllowedMessageSize uint, + sendTimeout time.Duration, ) (*PulsarPublisher, error) { + id := uuid.NewString() + producerOptions.Name = fmt.Sprintf("armada-scheduler-events-%s", id) + publisher, err := pulsarutils.NewPulsarPublisher(pulsarClient, producerOptions, maxEventsPerMessage, maxAllowedMessageSize, sendTimeout) + if err != nil { + return nil, errors.WithStack(err) + } + partitions, err := pulsarClient.TopicPartitions(producerOptions.Topic) if err != nil { return nil, errors.WithStack(err) } + producerOptions.Name = fmt.Sprintf("armada-scheduler-partitions-%s", id) producerOptions.MessageRouter = createMessageRouter(producerOptions) producer, err := pulsarClient.CreateProducer(producerOptions) if err != nil { return nil, errors.WithStack(err) } - maxMessageBatchSize := producerOptions.BatchingMaxSize / 2 - if maxMessageBatchSize <= 0 { - maxMessageBatchSize = defaultMaxMessageBatchSize - } + return &PulsarPublisher{ - producer: producer, - pulsarSendTimeout: pulsarSendTimeout, - maxEventsPerMessage: maxEventsPerMessage, - maxMessageBatchSize: maxMessageBatchSize, - numPartitions: len(partitions), + publisher: publisher, + producer: producer, + numPartitions: len(partitions), }, nil } -// PublishMessages publishes all event sequences to pulsar. Event sequences for a given jobset will be combined into -// single event sequences up to maxMessageBatchSize. +// PublishMessages publishes all event sequences to pulsar if shouldPublish() returns true func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error { - sequences := eventutil.CompactEventSequences(events) - sequences = eventutil.LimitSequencesEventMessageCount(sequences, p.maxEventsPerMessage) - sequences, err := eventutil.LimitSequencesByteSize(sequences, p.maxMessageBatchSize, true) - if err != nil { - return err - } - msgs := make([]*pulsar.ProducerMessage, len(sequences)) - for i, sequence := range sequences { - bytes, err := proto.Marshal(sequence) - if err != nil { - return err - } - msgs[i] = &pulsar.ProducerMessage{ - Payload: bytes, - Key: sequences[i].JobSetName, - } - } - - wg := sync.WaitGroup{} - wg.Add(len(msgs)) - - // Send messages if shouldPublish() { - ctx.Debugf("Am leader so will publish") - sendCtx, cancel := armadacontext.WithTimeout(ctx, p.pulsarSendTimeout) - errored := false - for _, msg := range msgs { - p.producer.SendAsync(sendCtx, msg, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { - if err != nil { - logging. - WithStacktrace(ctx, err). - Error("error sending message to Pulsar") - errored = true - } - wg.Done() - }) - } - wg.Wait() - cancel() - if errored { - return errors.New("One or more messages failed to send to Pulsar") - } + return p.publisher.PublishMessages(ctx, events...) } else { return errors.New("Failed to publish as no longer leader") } - return nil } // PublishMarkers sends one pulsar message (containing an armadaevents.PartitionMarker) to each partition diff --git a/internal/scheduler/publisher_test.go b/internal/scheduler/publisher_test.go index c1135603cf8..66a42a20453 100644 --- a/internal/scheduler/publisher_test.go +++ b/internal/scheduler/publisher_test.go @@ -95,7 +95,7 @@ func TestPulsarPublisher_TestPublish(t *testing.T) { ctrl := gomock.NewController(t) mockPulsarClient := mocks.NewMockClient(ctrl) mockPulsarProducer := mocks.NewMockProducer(ctrl) - mockPulsarClient.EXPECT().CreateProducer(gomock.Any()).Return(mockPulsarProducer, nil).Times(1) + mockPulsarClient.EXPECT().CreateProducer(gomock.Any()).Return(mockPulsarProducer, nil).Times(2) mockPulsarClient.EXPECT().TopicPartitions(topic).Return(make([]string, numPartitions), nil) numPublished := 0 var capturedEvents []*armadaevents.EventSequence @@ -121,7 +121,7 @@ func TestPulsarPublisher_TestPublish(t *testing.T) { }).AnyTimes() options := pulsar.ProducerOptions{Topic: topic} - publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second) + publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 4*1024*1024, 5*time.Second) require.NoError(t, err) err = publisher.PublishMessages(ctx, tc.eventSequences, func() bool { return tc.amLeader }) @@ -170,7 +170,7 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) { ctrl := gomock.NewController(t) mockPulsarClient := mocks.NewMockClient(ctrl) mockPulsarProducer := mocks.NewMockProducer(ctrl) - mockPulsarClient.EXPECT().CreateProducer(gomock.Any()).Return(mockPulsarProducer, nil).Times(1) + mockPulsarClient.EXPECT().CreateProducer(gomock.Any()).Return(mockPulsarProducer, nil).Times(2) mockPulsarClient.EXPECT().TopicPartitions(topic).Return(make([]string, numPartitions), nil) numPublished := 0 capturedPartitions := make(map[string]bool) @@ -192,7 +192,7 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) { options := pulsar.ProducerOptions{Topic: topic} ctx := armadacontext.TODO() - publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second) + publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 4*1024*1024, 5*time.Second) require.NoError(t, err) published, err := publisher.PublishMarkers(ctx, uuid.New()) diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 6d6cfcc11bd..d7cdb548b04 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -136,7 +136,7 @@ func Run(config schedulerconfig.Configuration) error { CompressionLevel: config.Pulsar.CompressionLevel, BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize, Topic: config.Pulsar.JobsetEventsTopic, - }, config.Pulsar.MaxAllowedEventsPerMessage, config.PulsarSendTimeout) + }, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize, config.Pulsar.SendTimeout) if err != nil { return errors.WithMessage(err, "error creating pulsar publisher") } @@ -154,17 +154,18 @@ func Run(config schedulerconfig.Configuration) error { // Executor Api // //////////////////////////////////////////////////////////////////////// ctx.Infof("Setting up executor api") - apiProducer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{ + apiPublisher, err := pulsarutils.NewPulsarPublisher(pulsarClient, pulsar.ProducerOptions{ Name: fmt.Sprintf("armada-executor-api-%s", uuid.NewString()), CompressionType: config.Pulsar.CompressionType, CompressionLevel: config.Pulsar.CompressionLevel, BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize, Topic: config.Pulsar.JobsetEventsTopic, - }) + }, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize, config.Pulsar.SendTimeout) if err != nil { - return errors.Wrapf(err, "error creating pulsar producer for executor api") + return errors.Wrapf(err, "error creating pulsar publisher for executor api") } - defer apiProducer.Close() + defer apiPublisher.Close() + authServices, err := auth.ConfigureAuth(config.Auth) if err != nil { return errors.WithMessage(err, "error creating auth services") @@ -176,7 +177,7 @@ func Run(config schedulerconfig.Configuration) error { return errors.WithMessage(err, "error setting up gRPC server") } executorServer, err := NewExecutorApi( - apiProducer, + apiPublisher, jobRepository, executorRepository, types.AllowedPriorities(config.Scheduling.PriorityClasses), @@ -184,8 +185,6 @@ func Run(config schedulerconfig.Configuration) error { config.Scheduling.NodeIdLabel, config.Scheduling.PriorityClassNameOverride, config.Scheduling.PriorityClasses, - config.Pulsar.MaxAllowedEventsPerMessage, - config.Pulsar.MaxAllowedMessageSize, ) if err != nil { return errors.WithMessage(err, "error creating executorApi")