Skip to content
1 change: 1 addition & 0 deletions include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class PULSAR_PUBLIC Producer {
friend class ClientImpl;
friend class PulsarFriend;
friend class PulsarWrapper;
friend class ProducerImpl;

ProducerImplBasePtr impl_;

Expand Down
5 changes: 5 additions & 0 deletions include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <pulsar/Message.h>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/ProducerCryptoFailureAction.h>
#include <pulsar/ProducerInterceptor.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
Expand Down Expand Up @@ -532,6 +533,10 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
ProducerAccessMode getAccessMode() const;

ProducerConfiguration& intercept(const std::vector<ProducerInterceptorPtr>& interceptors);

const std::vector<ProducerInterceptorPtr>& getInterceptors() const;

private:
std::shared_ptr<ProducerConfigurationImpl> impl_;

Expand Down
114 changes: 114 additions & 0 deletions include/pulsar/ProducerInterceptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef PULSAR_PRODUCER_INTERCEPTOR_H
#define PULSAR_PRODUCER_INTERCEPTOR_H

#include <pulsar/Message.h>
#include <pulsar/Result.h>
#include <pulsar/defines.h>

/**
* An interface that allows you to intercept (and possibly mutate) the
* messages received by the producer before they are published to the Pulsar
* brokers.
*
* <p>Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
* not propagated further.
*
* <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor
* implementation must ensure thread-safety, if needed.
*/
namespace pulsar {

class Producer;

class PULSAR_PUBLIC ProducerInterceptor {
public:
virtual ~ProducerInterceptor() {}

/**
* Close the interceptor
*/
virtual void close() {}

/**
* This is called from Producer#send and Producer#sendAsync methods, before
* send the message to the brokers. This method is allowed to modify the
* record, in which case, the new record will be returned.
*
* <p>Any exception thrown by this method will be caught by the caller and
* logged, but not propagated further.
*
* <p>Since the producer may run multiple interceptors, a particular
* interceptor's #beforeSend(Producer, Message) callback will be called in the
* order specified by ProducerConfiguration#intercept().
*
* <p>The first interceptor in the list gets the message passed from the client,
* the following interceptor will be passed the message returned by the
* previous interceptor, and so on. Since interceptors are allowed to modify
* messages, interceptors may potentially get the message already modified by
* other interceptors. However, building a pipeline of mutable interceptors
* that depend on the output of the previous interceptor is discouraged,
* because of potential side-effects caused by interceptors potentially
* failing to modify the message and throwing an exception. If one of the
* interceptors in the list throws an exception from beforeSend(Message),
* the exception is caught, logged, and the next interceptor is called with
* the message returned by the last successful interceptor in the list,
* or otherwise the client.
*
* @param producer the producer which contains the interceptor.
* @param message message to send.
* @return the intercepted message.
*/
virtual Message beforeSend(const Producer& producer, const Message& message) = 0;

/**
* This method is called when the message sent to the broker has been
* acknowledged, or when sending the message fails.
* This method is generally called just before the user callback is
* called.
*
* <p>Any exception thrown by this method will be ignored by the caller.
*
* <p>This method will generally execute in the background I/O thread, so the
* implementation should be reasonably fast. Otherwise, sending of messages
* from other threads could be delayed.
*
* @param producer the producer which contains the interceptor.
* @param result the result for sending messages, ResultOk indicates send has succeed.
* @param message the message that application sends.
* @param messageID the message id that assigned by the broker.
*/
virtual void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
const MessageId& messageID) = 0;

/**
* This method is called when partitions of the topic (partitioned-topic) changes.
*
* @param topicName topic name
* @param partitions new updated partitions
*/
virtual void onPartitionsChange(const std::string& topicName, int partitions) {}
};

typedef std::shared_ptr<ProducerInterceptor> ProducerInterceptorPtr;
} // namespace pulsar

#endif // PULSAR_PRODUCER_INTERCEPTOR_H
10 changes: 7 additions & 3 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "PartitionedProducerImpl.h"
#include "PatternMultiTopicsConsumerImpl.h"
#include "ProducerImpl.h"
#include "ProducerInterceptors.h"
#include "ReaderImpl.h"
#include "RetryableLookupService.h"
#include "TimeUtils.h"
Expand Down Expand Up @@ -187,11 +188,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
CreateProducerCallback callback) {
if (!result) {
ProducerImplBasePtr producer;

auto interceptors = std::make_shared<ProducerInterceptors>(conf.getInterceptors());

if (partitionMetadata->getPartitions() > 0) {
producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
partitionMetadata->getPartitions(), conf);
producer = std::make_shared<PartitionedProducerImpl>(
shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors);
} else {
producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf, interceptors);
}
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
Expand Down
11 changes: 8 additions & 3 deletions lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-"

PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName,
const unsigned int numPartitions,
const ProducerConfiguration& config)
const ProducerConfiguration& config,
const ProducerInterceptorsPtr& interceptors)
: client_(client),
topicName_(topicName),
topic_(topicName_->toString()),
conf_(config),
topicMetadata_(new TopicMetadataImpl(numPartitions)),
flushedPartitions_(0) {
flushedPartitions_(0),
interceptors_(interceptors) {
routerPolicy_ = getMessageRouter();

int maxPendingMessagesPerPartition =
Expand Down Expand Up @@ -93,7 +95,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
auto client = client_.lock();
auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, interceptors_, partition);
if (!client) {
return producer;
}
Expand Down Expand Up @@ -227,6 +229,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
// override
void PartitionedProducerImpl::shutdown() {
cancelTimers();
interceptors_->close();
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
Expand Down Expand Up @@ -446,7 +449,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
}
producers_.push_back(producer);
}
producersLock.unlock();
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
return;
}
} else {
Expand Down
5 changes: 4 additions & 1 deletion lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "LookupDataResult.h"
#include "ProducerImplBase.h"
#include "ProducerInterceptors.h"

namespace pulsar {

Expand Down Expand Up @@ -59,7 +60,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
typedef std::unique_lock<std::mutex> Lock;

PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
const ProducerConfiguration& config);
const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors);
virtual ~PartitionedProducerImpl();

// overrided methods from ProducerImplBase
Expand Down Expand Up @@ -130,6 +131,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
boost::posix_time::time_duration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;

ProducerInterceptorsPtr interceptors_;

unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;
ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
Expand Down
11 changes: 11 additions & 0 deletions lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/ProducerConfiguration.h>

#include <stdexcept>

#include "ProducerConfigurationImpl.h"
Expand Down Expand Up @@ -265,5 +267,14 @@ ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccess
ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
return impl_->accessMode;
}
ProducerConfiguration& ProducerConfiguration::intercept(
const std::vector<ProducerInterceptorPtr>& interceptors) {
impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end());
return *this;
}

const std::vector<ProducerInterceptorPtr>& ProducerConfiguration::getInterceptors() const {
return impl_->interceptors;
}

} // namespace pulsar
1 change: 1 addition & 0 deletions lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct ProducerConfigurationImpl {
bool chunkingEnabled{false};
ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
std::string initialSubscriptionName;
std::vector<ProducerInterceptorPtr> interceptors;
};
} // namespace pulsar

Expand Down
17 changes: 14 additions & 3 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ struct ProducerImpl::PendingCallbacks {
};

ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
const ProducerConfiguration& conf, int32_t partition)
const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors,
int32_t partition)
: HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()),
Expand All @@ -73,7 +74,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
sendTimer_(executor_->getIOService()),
dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
<< " id: " << producerId_);

Expand Down Expand Up @@ -432,10 +434,17 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);

Producer producer = Producer(shared_from_this());
auto interceptorMessage = interceptors_->beforeSend(producer, msg);

const auto now = boost::posix_time::microsec_clock::universal_time();
auto self = shared_from_this();
sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage](
Result result, const MessageId& messageId) {
producerStatsBasePtr_->messageReceived(result, now);

interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId);

if (callback) {
callback(result, messageId);
}
Expand Down Expand Up @@ -931,6 +940,7 @@ void ProducerImpl::start() {

void ProducerImpl::shutdown() {
resetCnx();
interceptors_->close();
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
Expand Down Expand Up @@ -979,3 +989,4 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }

} // namespace pulsar
/* namespace pulsar */
5 changes: 4 additions & 1 deletion lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ProducerImpl : public HandlerBase,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
const ProducerConfiguration& producerConfiguration,
const ProducerInterceptorsPtr& interceptors, int32_t partition = -1);
~ProducerImpl();

// overrided methods from ProducerImplBase
Expand Down Expand Up @@ -196,6 +197,8 @@ class ProducerImpl : public HandlerBase,
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;
boost::optional<uint64_t> topicEpoch;

ProducerInterceptorsPtr interceptors_;
};

struct ProducerImplCmp {
Expand Down
4 changes: 4 additions & 0 deletions lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pulsar/Producer.h>

#include "Future.h"
#include "ProducerInterceptors.h"

namespace pulsar {
class ProducerImplBase;
Expand All @@ -48,6 +49,9 @@ class ProducerImplBase {
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedProducer() = 0;

protected:
ProducerInterceptorsPtr interceptors_;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
Loading