Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Small code changes
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <[email protected]>
  • Loading branch information
slinkydeveloper committed Mar 19, 2020
1 parent 9fc5704 commit 364660f
Showing 1 changed file with 2 additions and 4 deletions.
6 changes: 2 additions & 4 deletions kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
handler: args.Handler,
topicFunc: args.TopicFunc,
}
dispatcher.setConfig(&multichannelfanout.Config{})
dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})
dispatcher.topicFunc = args.TopicFunc

receiverFunc, err := eventingchannels.NewMessageReceiver(
func(ctx context.Context, channel eventingchannels.ChannelReference, message binding.Message, transformers []binding.TransformerFactory, _ nethttp.Header) error {
kafkaProducerMessage := sarama.ProducerMessage{
Expand All @@ -117,6 +113,8 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
}

dispatcher.receiver = receiverFunc
dispatcher.setConfig(&multichannelfanout.Config{})
dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})
return dispatcher, nil
}

Expand Down

0 comments on commit 364660f

Please sign in to comment.