Skip to content
4 changes: 4 additions & 0 deletions include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
#include <pulsar/defines.h>
#include <stdint.h>

#include <functional>
#include <memory>

namespace pulsar {
class ProducerImplBase;
class PulsarWrapper;
class PulsarFriend;

typedef std::function<void(Result, const MessageId& messageId)> SendCallback;
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
typedef std::function<void(Result)> CloseCallback;
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
typedef std::function<void(Result)> FlushCallback;
typedef std::shared_ptr<ProducerImplBase> ProducerImplBasePtr;

Expand Down Expand Up @@ -166,6 +169,7 @@ class PULSAR_PUBLIC Producer {
friend class ClientImpl;
friend class PulsarFriend;
friend class PulsarWrapper;
friend class ProducerImpl;

ProducerImplBasePtr impl_;

Expand Down
5 changes: 5 additions & 0 deletions include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <pulsar/Message.h>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/ProducerCryptoFailureAction.h>
#include <pulsar/ProducerInterceptor.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
Expand Down Expand Up @@ -532,6 +533,10 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
ProducerAccessMode getAccessMode() const;

ProducerConfiguration& intercept(const std::initializer_list<ProducerInterceptorPtr> interceptors);
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
Comment thread
RobertIndie marked this conversation as resolved.
Outdated

std::vector<ProducerInterceptorPtr> getInterceptors() const;
Comment thread
RobertIndie marked this conversation as resolved.
Outdated

friend class PulsarWrapper;

private:
Expand Down
114 changes: 114 additions & 0 deletions include/pulsar/ProducerInterceptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef PULSAR_PRODUCER_INTERCEPTOR_H
#define PULSAR_PRODUCER_INTERCEPTOR_H

#include <pulsar/Message.h>
#include <pulsar/Producer.h>
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
#include <pulsar/defines.h>

/**
* An interface that allows you to intercept (and possibly mutate) the
* messages received by the producer before they are published to the Pulsar
* brokers.
*
* <p>Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
* not propagated further.
*
* <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor
* implementation must ensure thread-safety, if needed.
*/
namespace pulsar {

class Producer;

class PULSAR_PUBLIC ProducerInterceptor {
public:
virtual ~ProducerInterceptor() {}

/**
* Close the interceptor
*/
virtual void close() {}
Comment thread
RobertIndie marked this conversation as resolved.

/**
* This is called from Producer#send and Producer#sendAsync methods, before
* send the message to the brokers. This method is allowed to modify the
* record, in which case, the new record will be returned.
*
* <p>Any exception thrown by this method will be caught by the caller and
* logged, but not propagated further.
*
* <p>Since the producer may run multiple interceptors, a particular
* interceptor's #beforeSend(Producer, Message) callback will be called in the
* order specified by ProducerConfiguration#intercept().
*
* <p>The first interceptor in the list gets the message passed from the client,
* the following interceptor will be passed the message returned by the
* previous interceptor, and so on. Since interceptors are allowed to modify
* messages, interceptors may potentially get the message already modified by
* other interceptors. However, building a pipeline of mutable interceptors
* that depend on the output of the previous interceptor is discouraged,
* because of potential side-effects caused by interceptors potentially
* failing to modify the message and throwing an exception. If one of the
* interceptors in the list throws an exception from beforeSend(Message),
* the exception is caught, logged, and the next interceptor is called with
* the message returned by the last successful interceptor in the list,
* or otherwise the client.
*
* @param producer the producer which contains the interceptor.
* @param message message to send.
* @return the intercepted message.
*/
virtual Message beforeSend(const Producer& producer, const Message& message) = 0;

/**
* This method is called when the message sent to the broker has been
* acknowledged, or when sending the message fails.
* This method is generally called just before the user callback is
* called.
*
* <p>Any exception thrown by this method will be ignored by the caller.
*
* <p>This method will generally execute in the background I/O thread, so the
* implementation should be reasonably fast. Otherwise, sending of messages
* from other threads could be delayed.
*
* @param producer the producer which contains the interceptor.
* @param result the result for sending messages, ResultOk indicates send has succeed.
* @param message the message that application sends.
* @param messageID the message id that assigned by the broker.
*/
virtual void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
const MessageId& messageID) = 0;

/**
* This method is called when partitions of the topic (partitioned-topic) changes.
*
* @param topicName topic name
* @param partitions new updated partitions
*/
virtual void onPartitionsChange(const std::string& topicName, const int partitions) {}
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
};

typedef std::shared_ptr<ProducerInterceptor> ProducerInterceptorPtr;
} // namespace pulsar

#endif // PULSAR_PRODUCER_INTERCEPTOR_H
14 changes: 14 additions & 0 deletions lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
lookupServicePtr_ = client->getLookup();
}

interceptors_ = conf_.getInterceptors();
}

MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
Expand Down Expand Up @@ -443,6 +445,7 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
onPartitionsChange(getTopic(), newNumPartitions);
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
return;
}
} else {
Expand Down Expand Up @@ -488,4 +491,15 @@ void PartitionedProducerImpl::cancelTimers() noexcept {
}
}

void PartitionedProducerImpl::onPartitionsChange(const std::string& topicName, const int partitions) const {
for (const ProducerInterceptorPtr& interceptor : interceptors_) {
try {
interceptor->onPartitionsChange(topicName, partitions);
} catch (const std::exception& e) {
LOG_WARN("Error executing interceptor onPartitionsChange callback for topicName: "
<< topicName << ", exception: " << e.what());
}
}
}

} // namespace pulsar
1 change: 1 addition & 0 deletions lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
void cancelTimers() noexcept;
void onPartitionsChange(const std::string& topicName, const int partitions) const;
};

} // namespace pulsar
13 changes: 13 additions & 0 deletions lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/ProducerConfiguration.h>

#include <stdexcept>

#include "ProducerConfigurationImpl.h"
Expand Down Expand Up @@ -265,5 +267,16 @@ ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccess
ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
return impl_->accessMode;
}
ProducerConfiguration& ProducerConfiguration::intercept(
const std::initializer_list<ProducerInterceptorPtr> interceptors) {
for (const ProducerInterceptorPtr& interceptor : interceptors) {
impl_->interceptors.push_back(interceptor);
}
return *this;
}

std::vector<ProducerInterceptorPtr> ProducerConfiguration::getInterceptors() const {
return impl_->interceptors;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct ProducerConfigurationImpl {
std::map<std::string, std::string> properties;
bool chunkingEnabled{false};
ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
std::vector<ProducerInterceptorPtr> interceptors;
};
} // namespace pulsar

Expand Down
39 changes: 38 additions & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;

interceptors_ = conf.getInterceptors();

if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}
Expand Down Expand Up @@ -429,10 +431,17 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);

Producer producer = Producer(shared_from_this());
auto interceptorMessage = beforeSend(producer, msg);

const auto now = boost::posix_time::microsec_clock::universal_time();
auto self = shared_from_this();
sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage](
Result result, const MessageId& messageId) {
producerStatsBasePtr_->messageReceived(result, now);

onSendAcknowledgement(producer, result, interceptorMessage, messageId);

if (callback) {
callback(result, messageId);
}
Expand Down Expand Up @@ -975,5 +984,33 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {

ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }

Message ProducerImpl::beforeSend(const Producer& producer, const Message& message) const {
if (interceptors_.size() == 0) {
Comment thread
RobertIndie marked this conversation as resolved.
Outdated
return message;
}

Message interceptorMessage = message;
for (const ProducerInterceptorPtr& interceptor : interceptors_) {
try {
interceptorMessage = interceptor->beforeSend(producer, interceptorMessage);
} catch (const std::exception& e) {
LOG_WARN("Error executing interceptor beforeSend callback for topicName: "
<< producer.getTopic() << ", exception: " << e.what());
}
}
return interceptorMessage;
}
void ProducerImpl::onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
const MessageId& messageID) const {
for (const ProducerInterceptorPtr& interceptor : interceptors_) {
try {
interceptor->onSendAcknowledgement(producer, result, message, messageID);
} catch (const std::exception& e) {
LOG_WARN("Error executing interceptor onSendAcknowledgement callback for topicName: "
<< producer.getTopic() << ", exception: " << e.what());
}
}
}

} // namespace pulsar
/* namespace pulsar */
5 changes: 5 additions & 0 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ class ProducerImpl : public HandlerBase,
bool isValidProducerState(const SendCallback& callback) const;
bool canAddToBatch(const Message& msg) const;

Message beforeSend(const Producer& producer, const Message& message) const;

void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
const MessageId& messageID) const;

typedef std::unique_lock<std::mutex> Lock;

ProducerConfiguration conf_;
Expand Down
3 changes: 3 additions & 0 deletions lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class ProducerImplBase {
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedProducer() = 0;

protected:
std::vector<ProducerInterceptorPtr> interceptors_;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
Loading