Skip to content

Commit

Permalink
create pubsub topic on startup if it doesn't exists
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Jan 6, 2025
1 parent 4034cea commit 39c902f
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 24 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* Fleet Telemetry server will try to create topics on startup. If it topic doesn't exists or creation of topic fails, server will fail to startup and crash.
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func startServer(config *config.Config, airbrakeNotifier *gobrake.Notifier, logg
monitoring.StartServerMetrics(config, logger, registry)
}

dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger)
dispatchers, producerRules, err := config.ConfigureProducers(airbrakeHandler, logger, false)
if err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ func (c *Config) prometheusEnabled() bool {
}

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger, test bool) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
var pubsubTxTypes []string
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -319,6 +320,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
for _, dispatchRule := range dispatchRules {
if dispatchRule == telemetry.Pubsub {
pubsubTxTypes = append(pubsubTxTypes, recordName)
}
dispatchFuncs = append(dispatchFuncs, producers[dispatchRule])
}
dispatchProducerRules[recordName] = dispatchFuncs
Expand All @@ -328,6 +332,12 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
}
}

if !test && len(pubsubTxTypes) > 0 {
if err := producers[telemetry.Pubsub].(*googlepubsub.Producer).ProvisionTopics(pubsubTxTypes); err != nil {
return nil, nil, err
}
}

return producers, dispatchProducerRules, nil
}

Expand Down
16 changes: 8 additions & 8 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).To(HaveLen(1))

Expand Down Expand Up @@ -174,7 +174,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(configInput)
Expect(err).NotTo(HaveOccurred())

_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError(errMessage))
Expect(producers).To(BeNil())
},
Expand All @@ -192,7 +192,7 @@ var _ = Describe("Test full application config", func() {
config.Records = map[string][]telemetry.Dispatcher{"V": {"kinesis"}}

var err error
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})
Expand Down Expand Up @@ -226,15 +226,15 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
_ = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", "some_service_account_path")
_, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, _, err := pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("pubsub_connect_error pubsub cannot initialize with both emulator and GCP resource"))
})

It("pubsub config works", func() {
log, _ := logrus.NoOpLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
var err error
_, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand All @@ -253,10 +253,10 @@ var _ = Describe("Test full application config", func() {
log, _ := logrus.NoOpLogger()
config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}}
var err error
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).To(MatchError("expected ZMQ to be configured"))
Expect(producers).To(BeNil())
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -265,7 +265,7 @@ var _ = Describe("Test full application config", func() {
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := logrus.NoOpLogger()
var err error
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
_, producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log, true)
Expect(err).NotTo(HaveOccurred())
Expect(producers["V"]).NotTo(BeNil())
})
Expand Down
34 changes: 20 additions & 14 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,38 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met
return p, nil
}

// ProvisionTopics invoked at startup to verify all relevant topics are present
func (p *Producer) ProvisionTopics(txTypes []string) error {
ctx := context.Background()
for _, txType := range txTypes {
topicName := telemetry.BuildTopicName(p.namespace, txType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txType": txType}
_, err := p.createTopicIfNotExists(ctx, topicName)
if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": txType})
return err
}
p.logger.ActivityLog("pubsub_topic_created", logInfo)
}
return nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()

topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
pubsubTopic := p.pubsubClient.Topic(topicName)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

entry.ProduceTime = time.Now()
result := pubsubTopic.Publish(ctx, &pubsub.Message{
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {

if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down

0 comments on commit 39c902f

Please sign in to comment.