Skip to content

Commit

Permalink
Make replication factor for topics configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Dec 8, 2020
1 parent 2b3d74e commit 2d22b69
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 6 deletions.
6 changes: 4 additions & 2 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ type Config struct {
Brokers []string `yaml:"brokers"`
ClientID string `yaml:"clientId"`

TLS TLSConfig `yaml:"tls"`
SASL SASLConfig `yaml:"sasl"`
TLS TLSConfig `yaml:"tls"`
SASL SASLConfig `yaml:"sasl"`
TopicReplicationFactor int16 `yaml:"topicReplicationFactor"`
}

// RegisterFlags for all sensitive Kafka SASL configs.
Expand All @@ -36,6 +37,7 @@ func (c *Config) Validate() error {
// SetDefaults for Kafka config
func (c *Config) SetDefaults() {
c.ClientID = "owl-shop"
c.TopicReplicationFactor = 3

c.SASL.SetDefaults()
}
2 changes: 1 addition & 1 deletion pkg/shop/address_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (svc *AddressService) createKafkaTopic(ctx context.Context) error {
{
Topic: svc.topicName,
NumPartitions: 6,
ReplicationFactor: 3,
ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor,
Configs: []kmsg.CreateTopicsRequestTopicConfig{
{"cleanup.policy", &cleanupPolicy},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/shop/customer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (svc *CustomerService) createKafkaTopic(ctx context.Context) error {
{
Topic: svc.topicName,
NumPartitions: 6,
ReplicationFactor: 3,
ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor,
Configs: []kmsg.CreateTopicsRequestTopicConfig{
{"cleanup.policy", &cleanupPolicy},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/shop/frontend_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (svc *FrontendService) createKafkaTopic(ctx context.Context) error {
{
Topic: svc.topicName,
NumPartitions: 6,
ReplicationFactor: 3,
ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor,
Configs: []kmsg.CreateTopicsRequestTopicConfig{
{"cleanup.policy", &cleanupPolicy},
{"retention.bytes", &retentionBytes},
Expand Down
2 changes: 1 addition & 1 deletion pkg/shop/order_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (svc *OrderService) createKafkaTopic(ctx context.Context) error {
{
Topic: svc.topicName,
NumPartitions: 6,
ReplicationFactor: 3,
ReplicationFactor: svc.cfg.Kafka.TopicReplicationFactor,
Configs: []kmsg.CreateTopicsRequestTopicConfig{
{"cleanup.policy", &cleanupPolicy},
},
Expand Down

0 comments on commit 2d22b69

Please sign in to comment.