Skip to content

Commit

Permalink
Add test for pulsarBatchSize on ingestion pipeline (#3755)
Browse files Browse the repository at this point in the history
* Add test for pulsarBatchSize on ingestion pipeline

This test just confirms that pulsarBatchSize correctly limits the number of events we process at once

Signed-off-by: JamesMurkin <[email protected]>

* Merge master - fix proto time

Signed-off-by: JamesMurkin <[email protected]>

* Gofumpt

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jul 22, 2024
1 parent bf3ba99 commit 7ac7f90
Showing 1 changed file with 88 additions and 3 deletions.
91 changes: 88 additions & 3 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,20 @@ func (s *simpleMessages) GetMessageIDs() []pulsar.MessageID {
}

// simpleConverter is the test converter used by the pipeline tests. It holds the
// *testing.T it asserts against and records every EventSequencesWithIds passed to
// Convert in calls, so tests can inspect how the pipeline batched its input.
// NOTE(review): the duplicated `t` line below comes from the diff view showing the
// removed and the added version of the same field.
type simpleConverter struct {
t *testing.T
t *testing.T
calls []*EventSequencesWithIds
}

// newSimpleConverter builds a simpleConverter bound to t with an empty, non-nil
// calls slice. The added version returns the concrete *simpleConverter rather than
// the InstructionConverter interface, which lets tests read the calls field directly.
// NOTE(review): the first signature/return pair below is the removed (pre-change)
// version as rendered by the diff view; the second pair is the replacement.
func newSimpleConverter(t *testing.T) InstructionConverter[*simpleMessages] {
return &simpleConverter{t}
func newSimpleConverter(t *testing.T) *simpleConverter {
return &simpleConverter{
t: t,
calls: []*EventSequencesWithIds{},
}
}

func (s *simpleConverter) Convert(_ *armadacontext.Context, msg *EventSequencesWithIds) *simpleMessages {
s.t.Helper()
s.calls = append(s.calls, msg)
assert.Len(s.t, msg.EventSequences, len(msg.MessageIds))
var converted []*simpleMessage
for i, sequence := range msg.EventSequences {
Expand Down Expand Up @@ -280,6 +285,65 @@ func TestRun_HappyPath_MultipleMessages(t *testing.T) {
sink.assertDidProcess(messages)
}

// TestRun_LimitsProcessingBatchSize checks that pulsarBatchSize bounds how many events
// the pipeline hands to the converter in a single batch.
//
// Each case feeds numberOfMessages messages into the pipeline, every message carrying one
// event sequence with numberOfEventsPerMessage events, and then asserts:
//   - the run finishes without error and within expectedNumberOfBatchesProcessed * batchDuration,
//   - every message was acked and reached the sink,
//   - the converter was invoked exactly expectedNumberOfBatchesProcessed times,
//   - no converter call received batchSize + numberOfEventsPerMessage events or more.
func TestRun_LimitsProcessingBatchSize(t *testing.T) {
	tests := map[string]struct {
		numberOfEventsPerMessage         int
		numberOfMessages                 int
		batchSize                        int
		expectedNumberOfBatchesProcessed int
	}{
		"limits number of events processed per batch": {
			numberOfEventsPerMessage:         1,
			numberOfMessages:                 5,
			batchSize:                        2,
			expectedNumberOfBatchesProcessed: 3,
		},
		"limit can be exceeded by one message": {
			numberOfEventsPerMessage: 4,
			numberOfMessages:         6,
			batchSize:                5,
			// Batches should get limited to 2 messages, each containing 4 events
			expectedNumberOfBatchesProcessed: 3,
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
			// cancel is also handed to the mock consumer below; deferring it here as well
			// guarantees the context is released even if the run returns before the
			// consumer gets to call it. Calling a CancelFunc more than once is a no-op.
			defer cancel()

			// The number of messages is known up front, so size the slice once.
			messages := make([]pulsar.Message, 0, tc.numberOfMessages)
			for i := 0; i < tc.numberOfMessages; i++ {
				messages = append(messages, pulsarutils.NewPulsarMessage(i, baseTime, marshal(t, generateEventSequence(tc.numberOfEventsPerMessage))))
			}
			mockConsumer := newMockPulsarConsumer(t, messages, cancel)
			converter := newSimpleConverter(t)
			sink := newSimpleSink(t)

			pipeline := testPipeline(mockConsumer, converter, sink)
			// Should limit batches of event sequences based on batch size
			pipeline.pulsarBatchSize = tc.batchSize

			start := time.Now()
			err := pipeline.Run(ctx)
			assert.NoError(t, err)
			elapsed := time.Since(start)
			expectedMaximumDuration := time.Duration(tc.expectedNumberOfBatchesProcessed) * batchDuration

			assert.LessOrEqual(t, elapsed, expectedMaximumDuration)
			mockConsumer.assertDidAck(messages)
			assert.Len(t, converter.calls, tc.expectedNumberOfBatchesProcessed)
			for _, call := range converter.calls {
				eventCount := 0
				for _, seq := range call.EventSequences {
					eventCount += len(seq.Events)
				}
				// BatchSize can be exceeded by one message, so at most the number of events in a single message.
				// assert.Less (rather than assert.True on a comparison) reports both values on failure.
				assert.Less(t, eventCount, tc.batchSize+tc.numberOfEventsPerMessage)
			}
			sink.assertDidProcess(messages)
		})
	}
}

func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] {
return &IngestionPipeline[*simpleMessages]{
pulsarConfig: configuration.PulsarConfig{
Expand All @@ -297,6 +361,27 @@ func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simp
}
}

// generateEventSequence returns an EventSequence for queue "test", job set "test" and
// user "chrisma" that holds numberOfEvents JobRunSucceeded events. Every event uses the
// same baseTimeProto creation time and the same runIdProto / jobIdProto identifiers.
// For numberOfEvents <= 0 the Events slice is empty but non-nil.
func generateEventSequence(numberOfEvents int) *armadaevents.EventSequence {
	events := []*armadaevents.EventSequence_Event{}
	for remaining := numberOfEvents; remaining > 0; remaining-- {
		succeeded := &armadaevents.JobRunSucceeded{
			RunId: runIdProto,
			JobId: jobIdProto,
		}
		event := &armadaevents.EventSequence_Event{
			Created: baseTimeProto,
			Event: &armadaevents.EventSequence_Event_JobRunSucceeded{
				JobRunSucceeded: succeeded,
			},
		}
		events = append(events, event)
	}

	return &armadaevents.EventSequence{
		Queue:      "test",
		JobSetName: "test",
		UserId:     "chrisma",
		Events:     events,
	}
}

func marshal(t *testing.T, es *armadaevents.EventSequence) []byte {
payload, err := proto.Marshal(es)
assert.NoError(t, err)
Expand Down

0 comments on commit 7ac7f90

Please sign in to comment.