Skip to content

Commit

Permalink
move reply to request message type
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm committed Aug 4, 2023
1 parent bdca5e2 commit b75f397
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 45 deletions.
2 changes: 1 addition & 1 deletion events/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Publisher interface {
// AuthRelationshipSubscriber specifies the auth relationship subscriber methods.
type AuthRelationshipSubscriber interface {
// SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message.
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error)
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
}

// AuthRelationshipPublisher specifies the auth relationship publisher methods.
Expand Down
12 changes: 8 additions & 4 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,18 @@ type Message[T any] interface {
// Error returns any error encountered while decoding the message
Error() error

// ReplyAuthRelationshipRequest publishes an AuthRelationshipResponse message.
// An error is returned if the message is not an AuthRelationshipRequest.
ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error)

// Source returns the underlying message object.
Source() any
}

// Request extends Message by allowing replies to be sent for the received message.
type Request[TRequest, TResponse any] interface {
Message[TRequest]

// Reply publishes a response to the received message.
Reply(ctx context.Context, message TResponse) (Message[TResponse], error)
}

// ChangeType represents the possible event types for a ChangeMessage
type ChangeType string

Expand Down
107 changes: 72 additions & 35 deletions events/nats_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ func natsSubscriptionMessageChan[T any](ctx context.Context, conn *NATSConnectio
return msgCh
}

func natsSubscriptionAuthRelationshipRequestChan(ctx context.Context, conn *NATSConnection, batchSize int, natsCh <-chan *nats.Msg) chan Request[AuthRelationshipRequest, AuthRelationshipResponse] {
msgCh := make(chan Request[AuthRelationshipRequest, AuthRelationshipResponse], batchSize)

go func() {
defer close(msgCh)

for nMsg := range natsCh {
msg := natsDecodeMessage[AuthRelationshipRequest](conn, nMsg)

req := &NATSAuthRelationshipRequest{
NATSMessage: msg.(*NATSMessage[AuthRelationshipRequest]),
}

select {
case msgCh <- req:
case <-ctx.Done():
return
}
}
}()

return msgCh
}

func natsDecodeMessage[T any](conn *NATSConnection, nMsg *nats.Msg) Message[T] {
msg := &NATSMessage[T]{
conn: conn,
Expand Down Expand Up @@ -129,13 +153,49 @@ func (m *NATSMessage[T]) Error() error {
return nil
}

// ReplyAuthRelationshipRequest responds to an AuthRelationshipRequest with an AuthRelationshipResponse
func (m *NATSMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) {
ctx, span := m.conn.tracer.Start(ctx, "events.ReplyAuthRelationshipRequest")
// Source returns the underlying nats message.
func (m *NATSMessage[T]) Source() any {
return m.source
}

defer span.End()
func (m *NATSMessage[T]) publish() error {
return m.conn.conn.PublishMsg(m.source)
}

func (m *NATSMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) {
if m.source.Reply == "" {
m.source.Reply = m.conn.conn.NewRespInbox()
}

nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source)
if err != nil {
// ensure we wrap no responder errors with ErrRequestNoResponders.
if errors.Is(err, nats.ErrNoResponders) {
return nil, fmt.Errorf("%w: %w", ErrRequestNoResponders, err)
}

return nil, err
}

respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg)

return respMsg, nil
}

var _ Request[AuthRelationshipRequest, AuthRelationshipResponse] = (*NATSAuthRelationshipRequest)(nil)

// NATSAuthRelationshipRequest implements Request for AuthRelationshipRequest / AuthRelationshipResponse
type NATSAuthRelationshipRequest struct {
*NATSMessage[AuthRelationshipRequest]
}

// Reply responds to an AuthRelationshipRequest with an AuthRelationshipResponse.
func (r *NATSAuthRelationshipRequest) Reply(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error) {
ctx, span := r.conn.tracer.Start(ctx, "events.Reply")

defer span.End()

if r.source.Reply == "" {
span.RecordError(ErrNATSMessageNoReplySubject)
span.SetStatus(codes.Error, ErrNATSMessageNoReplySubject.Error())

Expand All @@ -156,43 +216,20 @@ func (m *NATSMessage[T]) ReplyAuthRelationshipRequest(ctx context.Context, messa

message.TraceContext = mapCarrier

respMsg, err := newNATSMessage(m.conn, m.source.Reply, message)
respMsg, err := newNATSMessage(r.conn, r.source.Reply, message)
if err != nil {
return nil, err
}

if err := m.source.RespondMsg(respMsg.source); err != nil {
return respMsg, err
}

return respMsg, nil
}

// Source returns the underlying nats message.
func (m *NATSMessage[T]) Source() any {
return m.source
}

func (m *NATSMessage[T]) publish() error {
return m.conn.conn.PublishMsg(m.source)
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

func (m *NATSMessage[T]) request(ctx context.Context) (Message[AuthRelationshipResponse], error) {
if m.source.Reply == "" {
m.source.Reply = m.conn.conn.NewRespInbox()
return nil, err
}

nMsg, err := m.conn.conn.RequestMsgWithContext(ctx, m.source)
if err != nil {
// ensure we wrap no responder errors with ErrRequestNoResponders.
if errors.Is(err, nats.ErrNoResponders) {
return nil, fmt.Errorf("%w: %w", ErrRequestNoResponders, err)
}
if err := r.source.RespondMsg(respMsg.source); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return nil, err
return respMsg, err
}

respMsg := natsDecodeMessage[AuthRelationshipResponse](m.conn, nMsg)

return respMsg, nil
}
4 changes: 2 additions & 2 deletions events/nats_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *NATSConnection) nextMessage(ctx context.Context, sub *nats.Subscription
}

// SubscribeAuthRelationshipRequests creates a new pull subscription parsing incoming messages as AuthRelationshipRequest messages and returning a new Message channel.
func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error) {
func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error) {
topic = c.buildSubscribeSubject("auth", "relationships", topic)

natsCh, err := c.coreSubscribe(ctx, topic)
Expand All @@ -148,7 +148,7 @@ func (c *NATSConnection) SubscribeAuthRelationshipRequests(ctx context.Context,

c.logger.Debugf("subscribing to auth relation request message on topic %s", topic)

return natsSubscriptionMessageChan[AuthRelationshipRequest](ctx, c, c.cfg.SubscriberFetchBatchSize, natsCh), nil
return natsSubscriptionAuthRelationshipRequestChan(ctx, c, c.cfg.SubscriberFetchBatchSize, natsCh), nil
}

// SubscribeChanges creates a new pull subscription parsing incoming messages as ChangeMessage messages and returning a new Message channel.
Expand Down
2 changes: 1 addition & 1 deletion events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestNATSRequestReply(t *testing.T) {

reqGot <- reqMsg

respMsg, err := reqMsg.ReplyAuthRelationshipRequest(ctx, authResponse)
respMsg, err := reqMsg.Reply(ctx, authResponse)
assert.NoError(t, err)
assert.NotNil(t, respMsg)
case <-time.After(time.Second * 2):
Expand Down
4 changes: 2 additions & 2 deletions testing/eventtools/mock_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (c *MockConnection) Source() any {
}

// SubscribeAuthRelationshipRequests implements events.Connection
func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Message[events.AuthRelationshipRequest], error) {
func (c *MockConnection) SubscribeAuthRelationshipRequests(_ context.Context, topic string) (<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], error) {
args := c.Called(topic)

return args.Get(0).(<-chan events.Message[events.AuthRelationshipRequest]), args.Error(1)
return args.Get(0).(<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]), args.Error(1)
}

// SubscribeChanges implements events.Connection
Expand Down

0 comments on commit b75f397

Please sign in to comment.