diff --git a/internal/common/config/pulsar.go b/internal/common/config/pulsar.go new file mode 100644 index 00000000000..b2ec8eef984 --- /dev/null +++ b/internal/common/config/pulsar.go @@ -0,0 +1,42 @@ +package config + +import ( + "time" + + "github.com/apache/pulsar-client-go/pulsar" +) + +type PulsarConfig struct { + // Pulsar URL + URL string `validate:"required"` + // Path to the trusted TLS certificate file (must exist) + TLSTrustCertsFilePath string + // Whether Pulsar client accept untrusted TLS certificate from broker + TLSAllowInsecureConnection bool + // Whether the Pulsar client will validate the hostname in the broker's TLS Cert matches the actual hostname. + TLSValidateHostname bool + // Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection) + MaxConnectionsPerBroker int + // Whether Pulsar authentication is enabled + AuthenticationEnabled bool + // Authentication type. For now only "JWT" auth is valid + AuthenticationType string + // Path to the JWT token (must exist). This must be set if AuthenticationType is "JWT" + JwtTokenPath string + // The pulsar topic that Jobset Events will be published to + JobsetEventsTopic string + // Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None" + CompressionType pulsar.CompressionType + // Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default" + CompressionLevel pulsar.CompressionLevel + // Maximum allowed Events per message + MaxAllowedEventsPerMessage int `validate:"gte=0"` + // Maximum allowed message size in bytes + MaxAllowedMessageSize uint + // Timeout when sending messages asynchronously + SendTimeout time.Duration `validate:"required"` + // Backoff from polling when Pulsar returns an error + BackoffTime time.Duration + // Number of pulsar messages that will be queued by the pulsar consumer. + ReceiverQueueSize int +} diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 110db617882..733088d0f34 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -11,12 +11,12 @@ import ( "github.com/armadaproject/armada/internal/common" "github.com/armadaproject/armada/internal/common/armadacontext" + commonconfig "github.com/armadaproject/armada/internal/common/config" "github.com/armadaproject/armada/internal/common/eventutil" commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics" protoutil "github.com/armadaproject/armada/internal/common/proto" "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/server/configuration" "github.com/armadaproject/armada/pkg/armadaevents" ) @@ -56,7 +56,7 @@ type EventSequencesWithIds struct { // Callers must supply two structs, an InstructionConverter for converting event sequences into something that can be // exhausted and a Sink capable of exhausting these objects type IngestionPipeline[T HasPulsarMessageIds] struct { - pulsarConfig configuration.PulsarConfig + pulsarConfig commonconfig.PulsarConfig metricsPort uint16 metrics *commonmetrics.Metrics pulsarSubscriptionName string @@ -70,7 +70,7 @@ type IngestionPipeline[T HasPulsarMessageIds] struct { // NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages func NewIngestionPipeline[T HasPulsarMessageIds]( - pulsarConfig configuration.PulsarConfig, + pulsarConfig commonconfig.PulsarConfig, pulsarSubscriptionName string, pulsarBatchSize int, pulsarBatchDuration time.Duration, diff --git a/internal/common/ingest/ingestion_pipeline_test.go b/internal/common/ingest/ingestion_pipeline_test.go index 70952dec47e..9082f40ac16 100644 --- a/internal/common/ingest/ingestion_pipeline_test.go +++ b/internal/common/ingest/ingestion_pipeline_test.go @@ -10,10 +10,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/armadaproject/armada/internal/common/armadacontext" + commonconfig "github.com/armadaproject/armada/internal/common/config" "github.com/armadaproject/armada/internal/common/ingest/metrics" protoutil "github.com/armadaproject/armada/internal/common/proto" "github.com/armadaproject/armada/internal/common/pulsarutils" - "github.com/armadaproject/armada/internal/server/configuration" "github.com/armadaproject/armada/pkg/armadaevents" ) @@ -346,7 +346,7 @@ func TestRun_LimitsProcessingBatchSize(t *testing.T) { func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] { return &IngestionPipeline[*simpleMessages]{ - pulsarConfig: configuration.PulsarConfig{ + pulsarConfig: commonconfig.PulsarConfig{ BackoffTime: time.Second, }, pulsarSubscriptionName: "subscription", diff --git a/internal/common/pulsarutils/pulsarclient.go b/internal/common/pulsarutils/pulsarclient.go index dba9a0b066f..22d53c852f5 100644 --- a/internal/common/pulsarutils/pulsarclient.go +++ b/internal/common/pulsarutils/pulsarclient.go @@ -7,10 +7,10 @@ import ( "github.com/pkg/errors" "github.com/armadaproject/armada/internal/common/armadaerrors" - "github.com/armadaproject/armada/internal/server/configuration" + commonconfig "github.com/armadaproject/armada/internal/common/config" ) -func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error) { +func NewPulsarClient(config *commonconfig.PulsarConfig) (pulsar.Client, error) { var authentication pulsar.Authentication // Sanity check that supplied Pulsar authentication parameters make sense diff --git a/internal/common/pulsarutils/pulsarclient_test.go b/internal/common/pulsarutils/pulsarclient_test.go index 445c5eb2861..5e94a9ead1e 100644 --- a/internal/common/pulsarutils/pulsarclient_test.go +++ b/internal/common/pulsarutils/pulsarclient_test.go @@ -6,14 +6,14 @@ import ( "github.com/stretchr/testify/assert" - "github.com/armadaproject/armada/internal/server/configuration" + commonconfig "github.com/armadaproject/armada/internal/common/config" ) func TestCreatePulsarClientHappyPath(t *testing.T) { cwd, _ := os.Executable() // Need a valid directory for tokens and certs // test with auth and tls configured - config := &configuration.PulsarConfig{ + config := &commonconfig.PulsarConfig{ URL: "pulsar://pulsarhost:50000", TLSTrustCertsFilePath: cwd, @@ -28,7 +28,7 @@ func TestCreatePulsarClientHappyPath(t *testing.T) { assert.NoError(t, err) // Test without auth or TLS - config = &configuration.PulsarConfig{ + config = &commonconfig.PulsarConfig{ URL: "pulsar://pulsarhost:50000", MaxConnectionsPerBroker: 100, } @@ -38,20 +38,20 @@ func TestCreatePulsarClientHappyPath(t *testing.T) { func TestCreatePulsarClientInvalidAuth(t *testing.T) { // No Auth type - _, err := NewPulsarClient(&configuration.PulsarConfig{ + _, err := NewPulsarClient(&commonconfig.PulsarConfig{ AuthenticationEnabled: true, }) assert.Error(t, err) // Invalid Auth type - _, err = NewPulsarClient(&configuration.PulsarConfig{ + _, err = NewPulsarClient(&commonconfig.PulsarConfig{ AuthenticationEnabled: true, AuthenticationType: "INVALID", }) assert.Error(t, err) // No Token - _, err = NewPulsarClient(&configuration.PulsarConfig{ + _, err = NewPulsarClient(&commonconfig.PulsarConfig{ AuthenticationEnabled: true, AuthenticationType: "JWT", }) diff --git a/internal/eventingester/configuration/types.go b/internal/eventingester/configuration/types.go index 65dc1390d8b..2e9db14c68b 100644 --- a/internal/eventingester/configuration/types.go +++ b/internal/eventingester/configuration/types.go @@ -5,8 +5,8 @@ import ( "github.com/redis/go-redis/v9" + commonconfig "github.com/armadaproject/armada/internal/common/config" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" - "github.com/armadaproject/armada/internal/server/configuration" ) type EventIngesterConfiguration struct { @@ -15,7 +15,7 @@ type EventIngesterConfiguration struct { // Metrics configuration MetricsPort uint16 // General Pulsar configuration - Pulsar configuration.PulsarConfig + Pulsar commonconfig.PulsarConfig // Pulsar subscription name SubscriptionName string // Size in bytes above which event message will be compressed when inserting in the database diff --git a/internal/lookoutingesterv2/configuration/types.go b/internal/lookoutingesterv2/configuration/types.go index 6857f81734c..e2998c9e5be 100644 --- a/internal/lookoutingesterv2/configuration/types.go +++ b/internal/lookoutingesterv2/configuration/types.go @@ -3,6 +3,7 @@ package configuration import ( "time" + commonconfig "github.com/armadaproject/armada/internal/common/config" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" "github.com/armadaproject/armada/internal/server/configuration" ) @@ -13,7 +14,7 @@ type LookoutIngesterV2Configuration struct { // Metrics configuration MetricsPort uint16 // General Pulsar configuration - Pulsar configuration.PulsarConfig + Pulsar commonconfig.PulsarConfig // Pulsar subscription name SubscriptionName string // Size in bytes above which job specs will be compressed when inserting in the database diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index c6ae533b53a..b5563e865d0 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" authconfig "github.com/armadaproject/armada/internal/common/auth/configuration" + commonconfig "github.com/armadaproject/armada/internal/common/config" grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" "github.com/armadaproject/armada/internal/common/types" @@ -28,7 +29,7 @@ type Configuration struct { // Armada Api Connection. Used to fetch queues. ArmadaApi client.ApiConnectionDetails // General Pulsar configuration - Pulsar configuration.PulsarConfig + Pulsar commonconfig.PulsarConfig // Configuration controlling leader election Leader LeaderConfig // Configuration controlling metrics diff --git a/internal/scheduleringester/config.go b/internal/scheduleringester/config.go index b6b45270ae7..fa38326e701 100644 --- a/internal/scheduleringester/config.go +++ b/internal/scheduleringester/config.go @@ -3,6 +3,7 @@ package scheduleringester import ( "time" + commonconfig "github.com/armadaproject/armada/internal/common/config" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" "github.com/armadaproject/armada/internal/server/configuration" ) @@ -13,7 +14,7 @@ type Configuration struct { // Metrics Port MetricsPort uint16 // General Pulsar configuration - Pulsar configuration.PulsarConfig + Pulsar commonconfig.PulsarConfig // Pulsar subscription name SubscriptionName string // Number of event messages that will be batched together before being inserted into the database diff --git a/internal/server/configuration/types.go b/internal/server/configuration/types.go index 3351c27ad2d..59e0d74a868 100644 --- a/internal/server/configuration/types.go +++ b/internal/server/configuration/types.go @@ -3,11 +3,11 @@ package configuration import ( "time" - "github.com/apache/pulsar-client-go/pulsar" "github.com/redis/go-redis/v9" v1 "k8s.io/api/core/v1" authconfig "github.com/armadaproject/armada/internal/common/auth/configuration" + commonconfig "github.com/armadaproject/armada/internal/common/config" grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration" profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration" armadaresource "github.com/armadaproject/armada/internal/common/resource" @@ -30,7 +30,7 @@ type ArmadaConfig struct { SchedulerApiConnection client.ApiConnectionDetails EventsApiRedis redis.UniversalOptions - Pulsar PulsarConfig + Pulsar commonconfig.PulsarConfig Postgres PostgresConfig // Needs to point to the lookout db QueryApi QueryApiConfig @@ -41,41 +41,6 @@ type ArmadaConfig struct { Submission SubmissionConfig } -type PulsarConfig struct { - // Pulsar URL - URL string `validate:"required"` - // Path to the trusted TLS certificate file (must exist) - TLSTrustCertsFilePath string - // Whether Pulsar client accept untrusted TLS certificate from broker - TLSAllowInsecureConnection bool - // Whether the Pulsar client will validate the hostname in the broker's TLS Cert matches the actual hostname. - TLSValidateHostname bool - // Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection) - MaxConnectionsPerBroker int - // Whether Pulsar authentication is enabled - AuthenticationEnabled bool - // Authentication type. For now only "JWT" auth is valid - AuthenticationType string - // Path to the JWT token (must exist). This must be set if AuthenticationType is "JWT" - JwtTokenPath string - // The pulsar topic that Jobset Events will be published to - JobsetEventsTopic string - // Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None" - CompressionType pulsar.CompressionType - // Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default" - CompressionLevel pulsar.CompressionLevel - // Maximum allowed Events per message - MaxAllowedEventsPerMessage int `validate:"gte=0"` - // Maximum allowed message size in bytes - MaxAllowedMessageSize uint - // Timeout when sending messages asynchronously - SendTimeout time.Duration `validate:"required"` - // Backoff from polling when Pulsar returns an error - BackoffTime time.Duration - // Number of pulsar messages that will be queued by the pulsar consumer. - ReceiverQueueSize int -} - // SubmissionConfig contains config relating to job submission. type SubmissionConfig struct { // The priorityClassName field on submitted pod must be either empty or in this list. diff --git a/internal/server/submit/validation/submit_request_test.go b/internal/server/submit/validation/submit_request_test.go index 09bcbd93a99..07abe94ddad 100644 --- a/internal/server/submit/validation/submit_request_test.go +++ b/internal/server/submit/validation/submit_request_test.go @@ -525,7 +525,7 @@ func TestValidateNamespace(t *testing.T) { } } -func TestValidateHasJobSetID(t *testing.T) { +func TestValidateHasJobSetId(t *testing.T) { tests := map[string]struct { req *api.JobSubmitRequest expectSuccess bool