Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Small code changes
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <[email protected]>
  • Loading branch information
slinkydeveloper committed Mar 19, 2020
1 parent 6880d14 commit 9fc5704
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
handler: args.Handler,
topicFunc: args.TopicFunc,
}
dispatcher.setConfig(&multichannelfanout.Config{})
dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})
dispatcher.topicFunc = args.TopicFunc

receiverFunc, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.TransformerFactory, _ nethttp.Header) error {
kafkaProducerMessage := sarama.ProducerMessage{
Topic: args.TopicFunc(utils.KafkaChannelSeparator, channel.Namespace, channel.Name),
Topic: dispatcher.topicFunc(utils.KafkaChannelSeparator, channel.Namespace, channel.Name),
}

err := protocolkafka.WriteProducerMessage(ctx, message, &kafkaProducerMessage, transformers...)
Expand All @@ -113,9 +117,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
}

dispatcher.receiver = receiverFunc
dispatcher.setConfig(&multichannelfanout.Config{})
dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})
dispatcher.topicFunc = args.TopicFunc
return dispatcher, nil
}

Expand All @@ -135,12 +136,12 @@ type consumerMessageHandler struct {
dispatcher *eventingchannels.MessageDispatcherImpl
}

func (c consumerMessageHandler) Handle(ctx context.Context, message *sarama.ConsumerMessage) (bool, error) {
m := protocolkafka.NewMessageFromConsumerMessage(message)
if m.ReadEncoding() == binding.EncodingUnknown {
func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) {
message := protocolkafka.NewMessageFromConsumerMessage(consumerMessage)
if message.ReadEncoding() == binding.EncodingUnknown {
return false, errors.New("received a message with unknown encoding")
}
err := c.dispatcher.DispatchMessageWithDelivery(ctx, m, nil, c.sub.SubscriberURI, c.sub.ReplyURI, &c.sub.Delivery)
err := c.dispatcher.DispatchMessageWithDelivery(ctx, message, nil, c.sub.SubscriberURI, c.sub.ReplyURI, &c.sub.Delivery)
// NOTE: only return `true` here if DispatchEventWithDelivery actually delivered the message.
return err == nil, err
}
Expand Down

0 comments on commit 9fc5704

Please sign in to comment.