diff --git a/internal/pubsub/subscriber.go b/internal/pubsub/subscriber.go index bc9afa7ae..68779fd42 100644 --- a/internal/pubsub/subscriber.go +++ b/internal/pubsub/subscriber.go @@ -29,7 +29,7 @@ var ( // Subscriber is the subscriber client type Subscriber struct { ctx context.Context - changeChannels []<-chan events.Message[events.AuthRelationshipRequest] + changeChannels []<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse] logger *zap.SugaredLogger subscriber events.AuthRelationshipSubscriber qe query.Engine @@ -90,7 +90,7 @@ func (s Subscriber) Listen() error { } // listen listens for messages on a channel and calls the registered message handler -func (s Subscriber) listen(messages <-chan events.Message[events.AuthRelationshipRequest], wg *sync.WaitGroup) { +func (s Subscriber) listen(messages <-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], wg *sync.WaitGroup) { defer wg.Done() for msg := range messages { @@ -114,7 +114,7 @@ func (s Subscriber) listen(messages <-chan events.Message[events.AuthRelationshi } // processEvent event message handler -func (s *Subscriber) processEvent(msg events.Message[events.AuthRelationshipRequest]) error { +func (s *Subscriber) processEvent(msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { elogger := s.logger.With( "event.message.topic", msg.Topic(), "event.message.action", msg.Message().Action, @@ -175,7 +175,7 @@ func (s *Subscriber) deleteRelationships(ctx context.Context, relationships []ty return nil } -func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Message[events.AuthRelationshipRequest]) error { +func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { elogger := s.logger.With( "event.message.topic", msg.Topic(), "event.message.action", msg.Message().Action, @@ -244,7 +244,7 @@ func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Message[e return nil } -func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Message[events.AuthRelationshipRequest]) error { +func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { elogger := s.logger.With( "event.message.topic", msg.Topic(), "event.message.action", msg.Message().Action, @@ -311,7 +311,7 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Message[e return nil } -func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Message[events.AuthRelationshipRequest], errors ...error) { +func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], errors ...error) { var filteredErrors []error for _, err := range errors { @@ -332,7 +332,7 @@ func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.M logger.Debug("relationship successfully processed, sending response") } - _, err := msg.ReplyAuthRelationshipRequest(ctx, response) + _, err := msg.Reply(ctx, response) if err != nil { logger.Errorw("error sending response", "error", err) }