diff --git a/pkg/channel/fanout/fanout_handler_binding.go b/pkg/channel/fanout/fanout_message_handler.go similarity index 94% rename from pkg/channel/fanout/fanout_handler_binding.go rename to pkg/channel/fanout/fanout_message_handler.go index 065fcbda365..744c822dca8 100644 --- a/pkg/channel/fanout/fanout_handler_binding.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -92,14 +92,15 @@ func (f *MessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *net // dispatch takes the event, fans it out to each subscription in f.config. If all the fanned out // events return successfully, then return nil. Else, return an error. -func (f *MessageHandler) dispatch(ctx context.Context, message binding.Message, transformers []binding.TransformerFactory, additionalHeaders nethttp.Header) error { +func (f *MessageHandler) dispatch(ctx context.Context, originalMessage binding.Message, transformers []binding.TransformerFactory, additionalHeaders nethttp.Header) error { // We buffer the message and bind the lifecycle with all fanout requests acks from other messages are received subs := len(f.config.Subscriptions) - var err error - message, err = buffering.BufferMessage(ctx, message, transformers) + message, err := buffering.BufferMessage(ctx, originalMessage, transformers) if err != nil { return err } + _ = originalMessage.Finish(nil) + message = buffering.WithAcksBeforeFinish(message, subs) errorCh := make(chan error, subs) diff --git a/pkg/channel/fanout/fanout_handler_binding_test.go b/pkg/channel/fanout/fanout_message_handler_test.go similarity index 100% rename from pkg/channel/fanout/fanout_handler_binding_test.go rename to pkg/channel/fanout/fanout_message_handler_test.go diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_handler_binding.go b/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go similarity index 100% rename from pkg/channel/multichannelfanout/multi_channel_fanout_handler_binding.go rename to pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_handler_binding_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go similarity index 100% rename from pkg/channel/multichannelfanout/multi_channel_fanout_handler_binding_test.go rename to pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go diff --git a/pkg/channel/swappable/swappable_binding.go b/pkg/channel/swappable/swappable_message_handler.go similarity index 100% rename from pkg/channel/swappable/swappable_binding.go rename to pkg/channel/swappable/swappable_message_handler.go diff --git a/pkg/channel/swappable/swappable_binding_test.go b/pkg/channel/swappable/swappable_message_handler_test.go similarity index 100% rename from pkg/channel/swappable/swappable_binding_test.go rename to pkg/channel/swappable/swappable_message_handler_test.go