Skip to content

Commit

Permalink
Move PulsarConfig into common/config (#217) (#3907)
Browse files Browse the repository at this point in the history
* ARMADA-2848 Move PulsarConfig into commonconfig

* Update test name TestValidateHasJobSetID->Id

* Revert unintended changes to yarn.lock file

* fix import order

Co-authored-by: Eleanor Pratt <[email protected]>
  • Loading branch information
eleanorpratt and Eleanor Pratt authored Sep 5, 2024
1 parent 286cfd5 commit 35cb59f
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 56 deletions.
42 changes: 42 additions & 0 deletions internal/common/config/pulsar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package config

import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

type PulsarConfig struct {
// Pulsar URL
URL string `validate:"required"`
// Path to the trusted TLS certificate file (must exist)
TLSTrustCertsFilePath string
// Whether Pulsar client accept untrusted TLS certificate from broker
TLSAllowInsecureConnection bool
// Whether the Pulsar client will validate the hostname in the broker's TLS Cert matches the actual hostname.
TLSValidateHostname bool
// Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection)
MaxConnectionsPerBroker int
// Whether Pulsar authentication is enabled
AuthenticationEnabled bool
// Authentication type. For now only "JWT" auth is valid
AuthenticationType string
// Path to the JWT token (must exist). This must be set if AuthenticationType is "JWT"
JwtTokenPath string
// The pulsar topic that Jobset Events will be published to
JobsetEventsTopic string
// Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None"
CompressionType pulsar.CompressionType
// Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default"
CompressionLevel pulsar.CompressionLevel
// Maximum allowed Events per message
MaxAllowedEventsPerMessage int `validate:"gte=0"`
// Maximum allowed message size in bytes
MaxAllowedMessageSize uint
// Timeout when sending messages asynchronously
SendTimeout time.Duration `validate:"required"`
// Backoff from polling when Pulsar returns an error
BackoffTime time.Duration
// Number of pulsar messages that will be queued by the pulsar consumer.
ReceiverQueueSize int
}
6 changes: 3 additions & 3 deletions internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/armadacontext"
commonconfig "github.com/armadaproject/armada/internal/common/config"
"github.com/armadaproject/armada/internal/common/eventutil"
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/util"
"github.com/armadaproject/armada/internal/server/configuration"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ type EventSequencesWithIds struct {
// Callers must supply two structs, an InstructionConverter for converting event sequences into something that can be
// exhausted and a Sink capable of exhausting these objects
type IngestionPipeline[T HasPulsarMessageIds] struct {
pulsarConfig configuration.PulsarConfig
pulsarConfig commonconfig.PulsarConfig
metricsPort uint16
metrics *commonmetrics.Metrics
pulsarSubscriptionName string
Expand All @@ -70,7 +70,7 @@ type IngestionPipeline[T HasPulsarMessageIds] struct {

// NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages
func NewIngestionPipeline[T HasPulsarMessageIds](
pulsarConfig configuration.PulsarConfig,
pulsarConfig commonconfig.PulsarConfig,
pulsarSubscriptionName string,
pulsarBatchSize int,
pulsarBatchDuration time.Duration,
Expand Down
4 changes: 2 additions & 2 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/common/armadacontext"
commonconfig "github.com/armadaproject/armada/internal/common/config"
"github.com/armadaproject/armada/internal/common/ingest/metrics"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/server/configuration"
"github.com/armadaproject/armada/pkg/armadaevents"
)

Expand Down Expand Up @@ -346,7 +346,7 @@ func TestRun_LimitsProcessingBatchSize(t *testing.T) {

func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] {
return &IngestionPipeline[*simpleMessages]{
pulsarConfig: configuration.PulsarConfig{
pulsarConfig: commonconfig.PulsarConfig{
BackoffTime: time.Second,
},
pulsarSubscriptionName: "subscription",
Expand Down
4 changes: 2 additions & 2 deletions internal/common/pulsarutils/pulsarclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/pkg/errors"

"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/server/configuration"
commonconfig "github.com/armadaproject/armada/internal/common/config"
)

func NewPulsarClient(config *configuration.PulsarConfig) (pulsar.Client, error) {
func NewPulsarClient(config *commonconfig.PulsarConfig) (pulsar.Client, error) {
var authentication pulsar.Authentication

// Sanity check that supplied Pulsar authentication parameters make sense
Expand Down
12 changes: 6 additions & 6 deletions internal/common/pulsarutils/pulsarclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (

"github.com/stretchr/testify/assert"

"github.com/armadaproject/armada/internal/server/configuration"
commonconfig "github.com/armadaproject/armada/internal/common/config"
)

func TestCreatePulsarClientHappyPath(t *testing.T) {
cwd, _ := os.Executable() // Need a valid directory for tokens and certs

// test with auth and tls configured
config := &configuration.PulsarConfig{
config := &commonconfig.PulsarConfig{
URL: "pulsar://pulsarhost:50000",

TLSTrustCertsFilePath: cwd,
Expand All @@ -28,7 +28,7 @@ func TestCreatePulsarClientHappyPath(t *testing.T) {
assert.NoError(t, err)

// Test without auth or TLS
config = &configuration.PulsarConfig{
config = &commonconfig.PulsarConfig{
URL: "pulsar://pulsarhost:50000",
MaxConnectionsPerBroker: 100,
}
Expand All @@ -38,20 +38,20 @@ func TestCreatePulsarClientHappyPath(t *testing.T) {

func TestCreatePulsarClientInvalidAuth(t *testing.T) {
// No Auth type
_, err := NewPulsarClient(&configuration.PulsarConfig{
_, err := NewPulsarClient(&commonconfig.PulsarConfig{
AuthenticationEnabled: true,
})
assert.Error(t, err)

// Invalid Auth type
_, err = NewPulsarClient(&configuration.PulsarConfig{
_, err = NewPulsarClient(&commonconfig.PulsarConfig{
AuthenticationEnabled: true,
AuthenticationType: "INVALID",
})
assert.Error(t, err)

// No Token
_, err = NewPulsarClient(&configuration.PulsarConfig{
_, err = NewPulsarClient(&commonconfig.PulsarConfig{
AuthenticationEnabled: true,
AuthenticationType: "JWT",
})
Expand Down
4 changes: 2 additions & 2 deletions internal/eventingester/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

"github.com/redis/go-redis/v9"

commonconfig "github.com/armadaproject/armada/internal/common/config"
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
"github.com/armadaproject/armada/internal/server/configuration"
)

type EventIngesterConfiguration struct {
Expand All @@ -15,7 +15,7 @@ type EventIngesterConfiguration struct {
// Metrics configuration
MetricsPort uint16
// General Pulsar configuration
Pulsar configuration.PulsarConfig
Pulsar commonconfig.PulsarConfig
// Pulsar subscription name
SubscriptionName string
// Size in bytes above which event message will be compressed when inserting in the database
Expand Down
3 changes: 2 additions & 1 deletion internal/lookoutingesterv2/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package configuration
import (
"time"

commonconfig "github.com/armadaproject/armada/internal/common/config"
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
"github.com/armadaproject/armada/internal/server/configuration"
)
Expand All @@ -13,7 +14,7 @@ type LookoutIngesterV2Configuration struct {
// Metrics configuration
MetricsPort uint16
// General Pulsar configuration
Pulsar configuration.PulsarConfig
Pulsar commonconfig.PulsarConfig
// Pulsar subscription name
SubscriptionName string
// Size in bytes above which job specs will be compressed when inserting in the database
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

authconfig "github.com/armadaproject/armada/internal/common/auth/configuration"
commonconfig "github.com/armadaproject/armada/internal/common/config"
grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration"
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
"github.com/armadaproject/armada/internal/common/types"
Expand All @@ -28,7 +29,7 @@ type Configuration struct {
// Armada Api Connection. Used to fetch queues.
ArmadaApi client.ApiConnectionDetails
// General Pulsar configuration
Pulsar configuration.PulsarConfig
Pulsar commonconfig.PulsarConfig
// Configuration controlling leader election
Leader LeaderConfig
// Configuration controlling metrics
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduleringester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduleringester
import (
"time"

commonconfig "github.com/armadaproject/armada/internal/common/config"
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
"github.com/armadaproject/armada/internal/server/configuration"
)
Expand All @@ -13,7 +14,7 @@ type Configuration struct {
// Metrics Port
MetricsPort uint16
// General Pulsar configuration
Pulsar configuration.PulsarConfig
Pulsar commonconfig.PulsarConfig
// Pulsar subscription name
SubscriptionName string
// Number of event messages that will be batched together before being inserted into the database
Expand Down
39 changes: 2 additions & 37 deletions internal/server/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package configuration
import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/redis/go-redis/v9"
v1 "k8s.io/api/core/v1"

authconfig "github.com/armadaproject/armada/internal/common/auth/configuration"
commonconfig "github.com/armadaproject/armada/internal/common/config"
grpcconfig "github.com/armadaproject/armada/internal/common/grpc/configuration"
profilingconfig "github.com/armadaproject/armada/internal/common/profiling/configuration"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
Expand All @@ -30,7 +30,7 @@ type ArmadaConfig struct {
SchedulerApiConnection client.ApiConnectionDetails

EventsApiRedis redis.UniversalOptions
Pulsar PulsarConfig
Pulsar commonconfig.PulsarConfig
Postgres PostgresConfig // Needs to point to the lookout db
QueryApi QueryApiConfig

Expand All @@ -41,41 +41,6 @@ type ArmadaConfig struct {
Submission SubmissionConfig
}

type PulsarConfig struct {
// Pulsar URL
URL string `validate:"required"`
// Path to the trusted TLS certificate file (must exist)
TLSTrustCertsFilePath string
// Whether Pulsar client accept untrusted TLS certificate from broker
TLSAllowInsecureConnection bool
// Whether the Pulsar client will validate the hostname in the broker's TLS Cert matches the actual hostname.
TLSValidateHostname bool
// Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection)
MaxConnectionsPerBroker int
// Whether Pulsar authentication is enabled
AuthenticationEnabled bool
// Authentication type. For now only "JWT" auth is valid
AuthenticationType string
// Path to the JWT token (must exist). This must be set if AuthenticationType is "JWT"
JwtTokenPath string
// The pulsar topic that Jobset Events will be published to
JobsetEventsTopic string
// Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None"
CompressionType pulsar.CompressionType
// Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default"
CompressionLevel pulsar.CompressionLevel
// Maximum allowed Events per message
MaxAllowedEventsPerMessage int `validate:"gte=0"`
// Maximum allowed message size in bytes
MaxAllowedMessageSize uint
// Timeout when sending messages asynchronously
SendTimeout time.Duration `validate:"required"`
// Backoff from polling when Pulsar returns an error
BackoffTime time.Duration
// Number of pulsar messages that will be queued by the pulsar consumer.
ReceiverQueueSize int
}

// SubmissionConfig contains config relating to job submission.
type SubmissionConfig struct {
// The priorityClassName field on submitted pod must be either empty or in this list.
Expand Down
2 changes: 1 addition & 1 deletion internal/server/submit/validation/submit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestValidateNamespace(t *testing.T) {
}
}

func TestValidateHasJobSetID(t *testing.T) {
func TestValidateHasJobSetId(t *testing.T) {
tests := map[string]struct {
req *api.JobSubmitRequest
expectSuccess bool
Expand Down

0 comments on commit 35cb59f

Please sign in to comment.