Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create pubsub topic on startup if it doesn't exists #291

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* On startup, the server will attempt to create missing topics and panic on failure.
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg
monitoring.StartServerMetrics(config, logger, registry)
}

dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger)
dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger, false)
if err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ func (c *Config) prometheusEnabled() bool {
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger, test bool) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
var pubsubTxTypes []string
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -319,6 +320,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
for _, dispatchRule := range dispatchRules {
if dispatchRule == telemetry.Pubsub {
pubsubTxTypes = append(pubsubTxTypes, recordName)
}
dispatchFuncs = append(dispatchFuncs, producers[dispatchRule])
}
dispatchProducerRules[recordName] = dispatchFuncs
Expand All @@ -328,6 +332,12 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
}
}

if !test && len(pubsubTxTypes) > 0 {
if err := producers[telemetry.Pubsub].(*googlepubsub.Producer).ProvisionTopics(pubsubTxTypes); err != nil {
return nil, nil, err
}
}

return producers, dispatchProducerRules, nil
}

Expand Down
16 changes: 8 additions & 8 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).To(HaveLen(1))

Expand Down Expand Up @@ -174,7 +174,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(configInput)
Expect(err).NotTo(HaveOccurred())

_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError(errMessage))
Expect(producers).To(BeNil())
},
Expand All @@ -192,7 +192,7 @@ var _ = Describe("Test full application config", func() {
config.Records = map[string][]telemetry.Dispatcher{"V": {"kinesis"}}

var err error
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})
Expand Down Expand Up @@ -226,15 +226,15 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
_ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path")
_, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource"))
})

It("pubsub config works", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
var err error
_, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand All @@ -253,10 +253,10 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}}
var err error
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("expected ZMQ to be configured"))
Expect(producers).To(BeNil())
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -265,7 +265,7 @@ var _ = Describe("Test full application config", func() {
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := logrus.NoOpLogger()
var err error
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand Down
34 changes: 20 additions & 14 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,38 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met
return p, nil
}

// ProvisionTopics invoked at startup to verify all relevant topics are present
func (p *Producer) ProvisionTopics(txTypes []string) error {
ctx := context.Background()
for _, txType := range txTypes {
topicName := telemetry.BuildTopicName(p.namespace, txType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txType": txType}
_, err := p.createTopicIfNotExists(ctx, topicName)
if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": txType})
return err
}
p.logger.ActivityLog("pubsub_topic_created", logInfo)
}
return nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()

topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
pubsubTopic := p.pubsubClient.Topic(topicName)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

entry.ProduceTime = time.Now()
result := pubsubTopic.Publish(ctx, &pubsub.Message{
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {

if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down
Loading