From 7ac7f90206b8beae6d4895604301d5cb709da0bc Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Mon, 22 Jul 2024 13:50:53 +0100 Subject: [PATCH] Add test for pulsarBatchSize on ingestion pipeline (#3755) * Add test for pulsarBatchSize on ingestion pipeline This test just confirms pulsarBatchSize is limit the number of events we process at once correctly Signed-off-by: JamesMurkin * Merge master - fix proto time Signed-off-by: JamesMurkin * Gofumpt Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- .../common/ingest/ingestion_pipeline_test.go | 91 ++++++++++++++++++- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/internal/common/ingest/ingestion_pipeline_test.go b/internal/common/ingest/ingestion_pipeline_test.go index 51a4673b696..32f2644fc6e 100644 --- a/internal/common/ingest/ingestion_pipeline_test.go +++ b/internal/common/ingest/ingestion_pipeline_test.go @@ -185,15 +185,20 @@ func (s *simpleMessages) GetMessageIDs() []pulsar.MessageID { } type simpleConverter struct { - t *testing.T + t *testing.T + calls []*EventSequencesWithIds } -func newSimpleConverter(t *testing.T) InstructionConverter[*simpleMessages] { - return &simpleConverter{t} +func newSimpleConverter(t *testing.T) *simpleConverter { + return &simpleConverter{ + t: t, + calls: []*EventSequencesWithIds{}, + } } func (s *simpleConverter) Convert(_ *armadacontext.Context, msg *EventSequencesWithIds) *simpleMessages { s.t.Helper() + s.calls = append(s.calls, msg) assert.Len(s.t, msg.EventSequences, len(msg.MessageIds)) var converted []*simpleMessage for i, sequence := range msg.EventSequences { @@ -280,6 +285,65 @@ func TestRun_HappyPath_MultipleMessages(t *testing.T) { sink.assertDidProcess(messages) } +func TestRun_LimitsProcessingBatchSize(t *testing.T) { + tests := map[string]struct { + numberOfEventsPerMessage int + numberOfMessages int + batchSize int + expectedNumberOfBatchesProcessed int + }{ + "limits number of events processed per batch": { + numberOfEventsPerMessage: 1, + numberOfMessages: 5, + batchSize: 2, + expectedNumberOfBatchesProcessed: 3, + }, + "limit can be exceeded by one message": { + numberOfEventsPerMessage: 4, + numberOfMessages: 6, + batchSize: 5, + // Batches should get limited to 2 messages, each containing 4 events + expectedNumberOfBatchesProcessed: 3, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second)) + messages := []pulsar.Message{} + for i := 0; i < tc.numberOfMessages; i++ { + messages = append(messages, pulsarutils.NewPulsarMessage(i, baseTime, marshal(t, generateEventSequence(tc.numberOfEventsPerMessage)))) + } + mockConsumer := newMockPulsarConsumer(t, messages, cancel) + converter := newSimpleConverter(t) + sink := newSimpleSink(t) + + pipeline := testPipeline(mockConsumer, converter, sink) + // Should limit batches of event sequences based on batch size + pipeline.pulsarBatchSize = tc.batchSize + + start := time.Now() + err := pipeline.Run(ctx) + assert.NoError(t, err) + elapsed := time.Since(start) + expectedMaximumDuration := time.Duration(tc.expectedNumberOfBatchesProcessed) * batchDuration + + assert.LessOrEqual(t, elapsed, expectedMaximumDuration) + mockConsumer.assertDidAck(messages) + assert.Len(t, converter.calls, tc.expectedNumberOfBatchesProcessed) + for _, call := range converter.calls { + eventCount := 0 + for _, seq := range call.EventSequences { + eventCount += len(seq.Events) + } + // BatchSize can be exceeded by one message, so at most the number of events in a single message + assert.True(t, eventCount < tc.batchSize+tc.numberOfEventsPerMessage) + } + sink.assertDidProcess(messages) + }) + } +} + func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] { return &IngestionPipeline[*simpleMessages]{ pulsarConfig: configuration.PulsarConfig{ @@ -297,6 +361,27 @@ func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simp } } +func generateEventSequence(numberOfEvents int) *armadaevents.EventSequence { + sequence := &armadaevents.EventSequence{ + Queue: "test", + JobSetName: "test", + UserId: "chrisma", + Events: []*armadaevents.EventSequence_Event{}, + } + for i := 0; i < numberOfEvents; i++ { + sequence.Events = append(sequence.Events, &armadaevents.EventSequence_Event{ + Created: baseTimeProto, + Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ + JobRunSucceeded: &armadaevents.JobRunSucceeded{ + RunId: runIdProto, + JobId: jobIdProto, + }, + }, + }) + } + return sequence +} + func marshal(t *testing.T, es *armadaevents.EventSequence) []byte { payload, err := proto.Marshal(es) assert.NoError(t, err)