Skip to content

Commit

Permalink
update to use Request message type
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm committed Aug 2, 2023
1 parent a18303b commit d6c75c9
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions internal/pubsub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
// Subscriber is the subscriber client
type Subscriber struct {
ctx context.Context
changeChannels []<-chan events.Message[events.AuthRelationshipRequest]
changeChannels []<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]
logger *zap.SugaredLogger
subscriber events.AuthRelationshipSubscriber
qe query.Engine
Expand Down Expand Up @@ -90,7 +90,7 @@ func (s Subscriber) Listen() error {
}

// listen listens for messages on a channel and calls the registered message handler
func (s Subscriber) listen(messages <-chan events.Message[events.AuthRelationshipRequest], wg *sync.WaitGroup) {
func (s Subscriber) listen(messages <-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], wg *sync.WaitGroup) {
defer wg.Done()

for msg := range messages {
Expand All @@ -114,7 +114,7 @@ func (s Subscriber) listen(messages <-chan events.Message[events.AuthRelationshi
}

// processEvent event message handler
func (s *Subscriber) processEvent(msg events.Message[events.AuthRelationshipRequest]) error {
func (s *Subscriber) processEvent(msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *Subscriber) deleteRelationships(ctx context.Context, relationships []ty
return nil
}

func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Message[events.AuthRelationshipRequest]) error {
func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Message[e
return nil
}

func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Message[events.AuthRelationshipRequest]) error {
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Message[e
return nil
}

func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Message[events.AuthRelationshipRequest], errors ...error) {
func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], errors ...error) {
var filteredErrors []error

for _, err := range errors {
Expand All @@ -332,7 +332,7 @@ func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.M
logger.Debug("relationship successfully processed, sending response")
}

_, err := msg.ReplyAuthRelationshipRequest(ctx, response)
_, err := msg.Reply(ctx, response)
if err != nil {
logger.Errorw("error sending response", "error", err)
}
Expand Down

0 comments on commit d6c75c9

Please sign in to comment.