Skip to content

Commit

Permalink
create pubsub topic on startup if it doesn't exists
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Jan 6, 2025
1 parent 4034cea commit 69e5e04
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (c *Config) prometheusEnabled() bool {

// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *logrus.Logger) (map[telemetry.Dispatcher]telemetry.Producer, map[string][]telemetry.Producer, error) {
var pubsubTxTypes []string
reliableAckSources, err := c.configureReliableAckSources()
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -319,6 +320,9 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
for _, dispatchRule := range dispatchRules {
if dispatchRule == telemetry.Pubsub {
pubsubTxTypes = append(pubsubTxTypes, recordName)
}
dispatchFuncs = append(dispatchFuncs, producers[dispatchRule])
}
dispatchProducerRules[recordName] = dispatchFuncs
Expand All @@ -328,6 +332,12 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.Handler, logger *l
}
}

if len(pubsubTxTypes) > 0 {
if err := producers[telemetry.Pubsub].(*googlepubsub.Producer).ProvisionTopics(pubsubTxTypes); err != nil {
return nil, nil, err
}
}

return producers, dispatchProducerRules, nil
}

Expand Down
33 changes: 19 additions & 14 deletions datastore/googlepubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,32 +79,37 @@ func NewProducer(prometheusEnabled bool, projectID string, namespace string, met
return p, nil
}

func (p *Producer) ProvisionTopics(txTypes []string) error {

Check failure on line 82 in datastore/googlepubsub/publisher.go

View workflow job for this annotation

GitHub Actions / build

exported: exported method Producer.ProvisionTopics should have comment or be unexported (revive)
ctx := context.Background()
for _, txType := range txTypes {
topicName := telemetry.BuildTopicName(p.namespace, txType)
logInfo := logrus.LogInfo{"topic_name": topicName, "txType": txType}
_, err := p.createTopicIfNotExists(ctx, topicName)
if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": txType})
return err
}
p.logger.ActivityLog("pubsub_topic_created", logInfo)
}
return nil
}

// Produce sends the record payload to pubsub
func (p *Producer) Produce(entry *telemetry.Record) {
ctx := context.Background()

topicName := telemetry.BuildTopicName(p.namespace, entry.TxType)
pubsubTopic := p.pubsubClient.Topic(topicName)
logInfo := logrus.LogInfo{"topic_name": topicName, "txid": entry.Txid}
pubsubTopic, err := p.createTopicIfNotExists(ctx, topicName)

if err != nil {
p.ReportError("pubsub_topic_creation_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

if exists, err := pubsubTopic.Exists(ctx); !exists || err != nil {
p.ReportError("pubsub_topic_check_error", err, logInfo)
metricsRegistry.notConnectedTotal.Inc(map[string]string{"record_type": entry.TxType})
return
}

entry.ProduceTime = time.Now()
result := pubsubTopic.Publish(ctx, &pubsub.Message{
Data: entry.Payload(),
Attributes: entry.Metadata(),
})
if _, err = result.Get(ctx); err != nil {

if _, err := result.Get(ctx); err != nil {
p.ReportError("pubsub_err", err, logInfo)
metricsRegistry.errorCount.Inc(map[string]string{"record_type": entry.TxType})
return
Expand Down

0 comments on commit 69e5e04

Please sign in to comment.