Skip to content

Commit

Permalink
Use Publisher Everywhere (#3803)
Browse files Browse the repository at this point in the history
* Use Publisher Eveywhere

Use the pulsarutils.Publisher everywhere we publish eventsequences

Motivations for this are:
 - Same code used to publish to pulsar used in all places
 - Better testing on that piece of code
 - Less work if we need to work on publisher in future (only 1 place to update)

Signed-off-by: JamesMurkin <[email protected]>

* Fix config validation

Signed-off-by: JamesMurkin <[email protected]>

* imports

Signed-off-by: JamesMurkin <[email protected]>

* gofumpt

Signed-off-by: JamesMurkin <[email protected]>

* Fix producer names

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jul 22, 2024
1 parent 56c9d9f commit 7d4ed88
Show file tree
Hide file tree
Showing 23 changed files with 432 additions and 362 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pulsar:
eventsPrinterSubscription: "EventsPrinter"
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 # 4MB
sendTimeout: 5s
receiverQueueSize: 100
postgres:
connection:
Expand Down
1 change: 0 additions & 1 deletion config/eventingester/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ redis:
pulsar:
URL: pulsar://pulsar:6650
jobsetEventsTopic: events
receiveTimeout: 5s
backoffTime: 1s
receiverQueueSize: 100
subscriptionName: "events-ingester"
Expand Down
1 change: 0 additions & 1 deletion config/lookoutingesterv2/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ metricsPort: 9002
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
receiveTimeout: 5s
backoffTime: 1s
receiverQueueSize: 100
subscriptionName: "lookout-ingester-v2"
Expand Down
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pulsar:
compressionLevel: faster
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 #4Mi
sendTimeout: 5s
armadaApi:
armadaUrl: "server:50051"
forceNoTls: true
Expand Down
1 change: 0 additions & 1 deletion config/scheduleringester/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ metrics:
pulsar:
URL: "pulsar://localhost:6650"
jobsetEventsTopic: "events"
receiveTimeout: 5s
backoffTime: 1s
receiverQueueSize: 100
subscriptionName: "scheduler-ingester"
Expand Down
4 changes: 2 additions & 2 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type PulsarConfig struct {
MaxAllowedEventsPerMessage int `validate:"gte=0"`
// Maximum allowed message size in bytes
MaxAllowedMessageSize uint
// Timeout when polling pulsar for messages
ReceiveTimeout time.Duration
// Timeout when sending messages asynchronously
SendTimeout time.Duration `validate:"required"`
// Backoff from polling when Pulsar returns an error
BackoffTime time.Duration
// Number of pulsar messages that will be queued by the pulsar consumer.
Expand Down
3 changes: 1 addition & 2 deletions internal/armada/mocks/generate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package mocks

// Mock implementations used by tests
//go:generate mockgen -destination=./mock_repository.go -package=mocks "github.com/armadaproject/armada/internal/armada/queue" QueueRepository
//go:generate mockgen -destination=./mock_deduplicator.go -package=mocks "github.com/armadaproject/armada/internal/armada/submit" Deduplicator,Publisher
//go:generate mockgen -destination=./mock_deduplicator.go -package=mocks "github.com/armadaproject/armada/internal/armada/submit" Deduplicator
//go:generate mockgen -destination=./mock_authorizer.go -package=mocks "github.com/armadaproject/armada/internal/armada/server" ActionAuthorizer
52 changes: 1 addition & 51 deletions internal/armada/mocks/mock_deduplicator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
CompressionLevel: config.Pulsar.CompressionLevel,
BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize,
Topic: config.Pulsar.JobsetEventsTopic,
}, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize)
}, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize, config.Pulsar.SendTimeout)
if err != nil {
return errors.Wrapf(err, "error creating pulsar producer")
}
Expand Down
5 changes: 3 additions & 2 deletions internal/armada/submit/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"github.com/armadaproject/armada/internal/armada/submit/testfixtures"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/auth/permission"
commonMocks "github.com/armadaproject/armada/internal/common/mocks"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/client/queue"
)

type mockObjects struct {
publisher *mocks.MockPublisher
publisher *commonMocks.MockPublisher
queueRepo *mocks.MockQueueRepository
deduplicator *mocks.MockDeduplicator
authorizer *mocks.MockActionAuthorizer
Expand All @@ -32,7 +33,7 @@ type mockObjects struct {
func createMocks(t *testing.T) *mockObjects {
ctrl := gomock.NewController(t)
return &mockObjects{
publisher: mocks.NewMockPublisher(ctrl),
publisher: commonMocks.NewMockPublisher(ctrl),
queueRepo: mocks.NewMockQueueRepository(ctrl),
deduplicator: mocks.NewMockDeduplicator(ctrl),
authorizer: mocks.NewMockActionAuthorizer(ctrl),
Expand Down
3 changes: 1 addition & 2 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ func TestRun_LimitsProcessingBatchSize(t *testing.T) {
func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] {
return &IngestionPipeline[*simpleMessages]{
pulsarConfig: configuration.PulsarConfig{
ReceiveTimeout: 10 * time.Second,
BackoffTime: time.Second,
BackoffTime: time.Second,
},
pulsarSubscriptionName: "subscription",
pulsarBatchDuration: batchDuration,
Expand Down
1 change: 1 addition & 0 deletions internal/common/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package mocks

// Mock implementations used by tests
//go:generate mockgen -destination=./mock_pulsar.go -package=mocks "github.com/apache/pulsar-client-go/pulsar" Client,Producer,Message
//go:generate mockgen -destination=./mock_publisher.go -package=mocks "github.com/armadaproject/armada/internal/common/pulsarutils" Publisher
//go:generate mockgen -destination=./mock_executorapi.go -package=mocks "github.com/armadaproject/armada/pkg/executorapi" ExecutorApiClient,ExecutorApi_LeaseJobRunsClient
67 changes: 67 additions & 0 deletions internal/common/mocks/mock_publisher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 0 additions & 102 deletions internal/common/pulsarutils/eventsequence.go

This file was deleted.

Loading

0 comments on commit 7d4ed88

Please sign in to comment.