Skip to content

Commit

Permalink
create pubsub topic on startup if it doesn't exists
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Jan 6, 2025
1 parent 4034cea commit a3a0934
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry s
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* Fleet Telemetry server will try to create topics on startup. If it topic doesn't exists or creation of topic fails, server will fail to startup and crash.
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

Expand Down
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (c *Config) prometheusEnabled() bool {

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
var pubsubTxTypes []string
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -319,6 +320,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
for _, dispatchRule := range dispatchRules {
if dispatchRule == telemetry.Pubsub {
pubsubTxTypes = append(pubsubTxTypes, recordName)
}
dispatchFuncs = append(dispatchFuncs, producers[dispatchRule])
}
dispatchProducerRules[recordName] = dispatchFuncs
Expand All @@ -328,6 +332,12 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
}
}

if len(pubsubTxTypes) > 0 {
if err := producers[telemetry.Pubsub].(*googlepubsub.Producer).ProvisionTopics(pubsubTxTypes); err != nil {
return nil, nil, err
}
}

return producers, dispatchProducerRules, nil
}

Expand Down
33 changes: 19 additions & 14 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,37 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met
return p, nil
}

func (p *Producer) ProvisionTopics(txTypes []string) error {

Check failure on line 82 in datastore/googlepubsub/publisher.go

View workflow job for this annotation

GitHub Actions / build

exported: exported method Producer.ProvisionTopics should have comment or be unexported (revive)
ctx := context.Background()
for _, txType := range txTypes {
topicName := telemetry.BuildTopicName(p.namespace, txType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txType": txType}
_, err := p.createTopicIfNotExists(ctx, topicName)
if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": txType})
return err
}
p.logger.ActivityLog("pubsub_topic_created", logInfo)
}
return nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()

topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
pubsubTopic := p.pubsubClient.Topic(topicName)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

entry.ProduceTime = time.Now()
result := pubsubTopic.Publish(ctx, &pubsub.Message{
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {

if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down

0 comments on commit a3a0934

Please sign in to comment.