diff --git a/include/pulsar/Producer.h b/include/pulsar/Producer.h index 955d9858..51d29808 100644 --- a/include/pulsar/Producer.h +++ b/include/pulsar/Producer.h @@ -166,6 +166,7 @@ class PULSAR_PUBLIC Producer { friend class ClientImpl; friend class PulsarFriend; friend class PulsarWrapper; + friend class ProducerImpl; ProducerImplBasePtr impl_; diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h index 67550cfa..e0c824c9 100644 --- a/include/pulsar/ProducerConfiguration.h +++ b/include/pulsar/ProducerConfiguration.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -532,6 +533,10 @@ class PULSAR_PUBLIC ProducerConfiguration { */ ProducerAccessMode getAccessMode() const; + ProducerConfiguration& intercept(const std::vector& interceptors); + + const std::vector& getInterceptors() const; + private: std::shared_ptr impl_; diff --git a/include/pulsar/ProducerInterceptor.h b/include/pulsar/ProducerInterceptor.h new file mode 100644 index 00000000..45f55b52 --- /dev/null +++ b/include/pulsar/ProducerInterceptor.h @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PULSAR_PRODUCER_INTERCEPTOR_H +#define PULSAR_PRODUCER_INTERCEPTOR_H + +#include +#include +#include + +/** + * An interface that allows you to intercept (and possibly mutate) the + * messages received by the producer before they are published to the Pulsar + * brokers. + * + *

Exceptions thrown by ProducerInterceptor methods will be caught, logged, but + * not propagated further. + * + *

ProducerInterceptor callbacks may be called from multiple threads. Interceptor + * implementation must ensure thread-safety, if needed. + */ +namespace pulsar { + +class Producer; + +class PULSAR_PUBLIC ProducerInterceptor { + public: + virtual ~ProducerInterceptor() {} + + /** + * Close the interceptor + */ + virtual void close() {} + + /** + * This is called from Producer#send and Producer#sendAsync methods, before + * send the message to the brokers. This method is allowed to modify the + * record, in which case, the new record will be returned. + * + *

Any exception thrown by this method will be caught by the caller and + * logged, but not propagated further. + * + *

Since the producer may run multiple interceptors, a particular + * interceptor's #beforeSend(Producer, Message) callback will be called in the + * order specified by ProducerConfiguration#intercept(). + * + *

The first interceptor in the list gets the message passed from the client, + * the following interceptor will be passed the message returned by the + * previous interceptor, and so on. Since interceptors are allowed to modify + * messages, interceptors may potentially get the message already modified by + * other interceptors. However, building a pipeline of mutable interceptors + * that depend on the output of the previous interceptor is discouraged, + * because of potential side-effects caused by interceptors potentially + * failing to modify the message and throwing an exception. If one of the + * interceptors in the list throws an exception from beforeSend(Message), + * the exception is caught, logged, and the next interceptor is called with + * the message returned by the last successful interceptor in the list, + * or otherwise the client. + * + * @param producer the producer which contains the interceptor. + * @param message message to send. + * @return the intercepted message. + */ + virtual Message beforeSend(const Producer& producer, const Message& message) = 0; + + /** + * This method is called when the message sent to the broker has been + * acknowledged, or when sending the message fails. + * This method is generally called just before the user callback is + * called. + * + *

Any exception thrown by this method will be ignored by the caller. + * + *

This method will generally execute in the background I/O thread, so the + * implementation should be reasonably fast. Otherwise, sending of messages + * from other threads could be delayed. + * + * @param producer the producer which contains the interceptor. + * @param result the result for sending messages, ResultOk indicates send has succeed. + * @param message the message that application sends. + * @param messageID the message id that assigned by the broker. + */ + virtual void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, + const MessageId& messageID) = 0; + + /** + * This method is called when partitions of the topic (partitioned-topic) changes. + * + * @param topicName topic name + * @param partitions new updated partitions + */ + virtual void onPartitionsChange(const std::string& topicName, int partitions) {} +}; + +typedef std::shared_ptr ProducerInterceptorPtr; +} // namespace pulsar + +#endif // PULSAR_PRODUCER_INTERCEPTOR_H diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 7575b283..e0092775 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -33,6 +33,7 @@ #include "PartitionedProducerImpl.h" #include "PatternMultiTopicsConsumerImpl.h" #include "ProducerImpl.h" +#include "ProducerInterceptors.h" #include "ReaderImpl.h" #include "RetryableLookupService.h" #include "TimeUtils.h" @@ -187,11 +188,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul CreateProducerCallback callback) { if (!result) { ProducerImplBasePtr producer; + + auto interceptors = std::make_shared(conf.getInterceptors()); + if (partitionMetadata->getPartitions() > 0) { - producer = std::make_shared(shared_from_this(), topicName, - partitionMetadata->getPartitions(), conf); + producer = std::make_shared( + shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors); } else { - producer = std::make_shared(shared_from_this(), *topicName, conf); + producer = std::make_shared(shared_from_this(), *topicName, conf, interceptors); } producer->getProducerCreatedFuture().addListener( std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1, diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index a4b53107..e442ce90 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -38,13 +38,15 @@ const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-" PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName, const unsigned int numPartitions, - const ProducerConfiguration& config) + const ProducerConfiguration& config, + const ProducerInterceptorsPtr& interceptors) : client_(client), topicName_(topicName), topic_(topicName_->toString()), conf_(config), topicMetadata_(new TopicMetadataImpl(numPartitions)), - flushedPartitions_(0) { + flushedPartitions_(0), + interceptors_(interceptors) { routerPolicy_ = getMessageRouter(); int maxPendingMessagesPerPartition = @@ -93,7 +95,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const { ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) { using namespace std::placeholders; auto client = client_.lock(); - auto producer = std::make_shared(client, *topicName_, conf_, partition); + auto producer = std::make_shared(client, *topicName_, conf_, interceptors_, partition); if (!client) { return producer; } @@ -227,6 +229,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac // override void PartitionedProducerImpl::shutdown() { cancelTimers(); + interceptors_->close(); auto client = client_.lock(); if (client) { client->cleanupProducer(this); @@ -446,7 +449,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, } producers_.push_back(producer); } + producersLock.unlock(); // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()` + interceptors_->onPartitionsChange(getTopic(), newNumPartitions); return; } } else { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index b9a4b01a..25ba9c33 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -27,6 +27,7 @@ #include "LookupDataResult.h" #include "ProducerImplBase.h" +#include "ProducerInterceptors.h" namespace pulsar { @@ -59,7 +60,7 @@ class PartitionedProducerImpl : public ProducerImplBase, typedef std::unique_lock Lock; PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions, - const ProducerConfiguration& config); + const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors); virtual ~PartitionedProducerImpl(); // overrided methods from ProducerImplBase @@ -130,6 +131,8 @@ class PartitionedProducerImpl : public ProducerImplBase, boost::posix_time::time_duration partitionsUpdateInterval_; LookupServicePtr lookupServicePtr_; + ProducerInterceptorsPtr interceptors_; + unsigned int getNumPartitions() const; unsigned int getNumPartitionsWithLock() const; ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy); diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc index 67e8b102..0e141ae1 100644 --- a/lib/ProducerConfiguration.cc +++ b/lib/ProducerConfiguration.cc @@ -16,6 +16,8 @@ * specific language governing permissions and limitations * under the License. */ +#include + #include #include "ProducerConfigurationImpl.h" @@ -265,5 +267,14 @@ ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccess ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const { return impl_->accessMode; } +ProducerConfiguration& ProducerConfiguration::intercept( + const std::vector& interceptors) { + impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end()); + return *this; +} + +const std::vector& ProducerConfiguration::getInterceptors() const { + return impl_->interceptors; +} } // namespace pulsar diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h index 4c8fcdb1..c635c48f 100644 --- a/lib/ProducerConfigurationImpl.h +++ b/lib/ProducerConfigurationImpl.h @@ -51,6 +51,7 @@ struct ProducerConfigurationImpl { bool chunkingEnabled{false}; ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared}; std::string initialSubscriptionName; + std::vector interceptors; }; } // namespace pulsar diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 6514b3b8..78a51155 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -55,7 +55,8 @@ struct ProducerImpl::PendingCallbacks { }; ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, - const ProducerConfiguration& conf, int32_t partition) + const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors, + int32_t partition) : HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition), Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()), milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()), @@ -73,7 +74,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, sendTimer_(executor_->getIOService()), dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000), memoryLimitController_(client->getMemoryLimitController()), - chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) { + chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()), + interceptors_(interceptors) { LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_); @@ -432,10 +434,17 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload, void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { producerStatsBasePtr_->messageSent(msg); + Producer producer = Producer(shared_from_this()); + auto interceptorMessage = interceptors_->beforeSend(producer, msg); + const auto now = boost::posix_time::microsec_clock::universal_time(); auto self = shared_from_this(); - sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) { + sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage]( + Result result, const MessageId& messageId) { producerStatsBasePtr_->messageReceived(result, now); + + interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId); + if (callback) { callback(result, messageId); } @@ -931,6 +940,7 @@ void ProducerImpl::start() { void ProducerImpl::shutdown() { resetCnx(); + interceptors_->close(); auto client = client_.lock(); if (client) { client->cleanupProducer(this); @@ -979,3 +989,4 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); } } // namespace pulsar +/* namespace pulsar */ diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 928fca5e..b0c9f7cc 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -64,7 +64,8 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { public: ProducerImpl(ClientImplPtr client, const TopicName& topic, - const ProducerConfiguration& producerConfiguration, int32_t partition = -1); + const ProducerConfiguration& producerConfiguration, + const ProducerInterceptorsPtr& interceptors, int32_t partition = -1); ~ProducerImpl(); // overrided methods from ProducerImplBase @@ -196,6 +197,8 @@ class ProducerImpl : public HandlerBase, MemoryLimitController& memoryLimitController_; const bool chunkingEnabled_; boost::optional topicEpoch; + + ProducerInterceptorsPtr interceptors_; }; struct ProducerImplCmp { diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h index 0b1622c4..95488d85 100644 --- a/lib/ProducerImplBase.h +++ b/lib/ProducerImplBase.h @@ -22,6 +22,7 @@ #include #include "Future.h" +#include "ProducerInterceptors.h" namespace pulsar { class ProducerImplBase; @@ -48,6 +49,9 @@ class ProducerImplBase { virtual void flushAsync(FlushCallback callback) = 0; virtual bool isConnected() const = 0; virtual uint64_t getNumberOfConnectedProducer() = 0; + + protected: + ProducerInterceptorsPtr interceptors_; }; } // namespace pulsar #endif // PULSAR_PRODUCER_IMPL_BASE_HEADER diff --git a/lib/ProducerInterceptors.cc b/lib/ProducerInterceptors.cc new file mode 100644 index 00000000..ea35f97e --- /dev/null +++ b/lib/ProducerInterceptors.cc @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "ProducerInterceptors.h" + +#include + +#include "LogUtils.h" + +DECLARE_LOG_OBJECT() + +namespace pulsar { +void ProducerInterceptors::onPartitionsChange(const std::string& topicName, int partitions) const { + for (const ProducerInterceptorPtr& interceptor : interceptors_) { + try { + interceptor->onPartitionsChange(topicName, partitions); + } catch (const std::exception& e) { + LOG_WARN("Error executing interceptor onPartitionsChange callback for topicName: " + << topicName << ", exception: " << e.what()); + } + } +} + +Message ProducerInterceptors::beforeSend(const Producer& producer, const Message& message) { + if (interceptors_.empty()) { + return message; + } + + Message interceptorMessage = message; + for (const ProducerInterceptorPtr& interceptor : interceptors_) { + try { + interceptorMessage = interceptor->beforeSend(producer, interceptorMessage); + } catch (const std::exception& e) { + LOG_WARN("Error executing interceptor beforeSend callback for topicName: " + << producer.getTopic() << ", exception: " << e.what()); + } + } + return interceptorMessage; +} + +void ProducerInterceptors::onSendAcknowledgement(const Producer& producer, Result result, + const Message& message, const MessageId& messageID) { + for (const ProducerInterceptorPtr& interceptor : interceptors_) { + try { + interceptor->onSendAcknowledgement(producer, result, message, messageID); + } catch (const std::exception& e) { + LOG_WARN("Error executing interceptor onSendAcknowledgement callback for topicName: " + << producer.getTopic() << ", exception: " << e.what()); + } + } +} + +void ProducerInterceptors::close() { + State state = Ready; + if (!state_.compare_exchange_strong(state, Closing)) { + return; + } + for (const ProducerInterceptorPtr& interceptor : interceptors_) { + try { + interceptor->close(); + } catch (const std::exception& e) { + LOG_WARN("Failed to close producer interceptor: " << e.what()); + } + } + state_ = Closed; +} + +} // namespace pulsar diff --git a/lib/ProducerInterceptors.h b/lib/ProducerInterceptors.h new file mode 100644 index 00000000..f83394f1 --- /dev/null +++ b/lib/ProducerInterceptors.h @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +namespace pulsar { + +class ProducerInterceptors { + public: + explicit ProducerInterceptors(std::vector interceptors) + : interceptors_(std::move(interceptors)) {} + + void onPartitionsChange(const std::string& topicName, int partitions) const; + + Message beforeSend(const Producer& producer, const Message& message); + + void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, + const MessageId& messageID); + + void close(); + + private: + enum State + { + Ready, + Closing, + Closed + }; + std::vector interceptors_; + std::atomic state_{Ready}; +}; + +typedef std::shared_ptr ProducerInterceptorsPtr; +} // namespace pulsar diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc new file mode 100644 index 00000000..ef645c25 --- /dev/null +++ b/tests/InterceptorsTest.cc @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include + +#include + +#include "HttpHelper.h" +#include "Latch.h" + +static const std::string serviceUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +using namespace pulsar; + +class TestInterceptor : public ProducerInterceptor { + public: + TestInterceptor(Latch& latch, Latch& closeLatch) : latch_(latch), closeLatch_(closeLatch) {} + + Message beforeSend(const Producer& producer, const Message& message) override { + return MessageBuilder().setProperty("key", "set").setContent(message.getDataAsString()).build(); + } + + void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, + const MessageId& messageID) override { + ASSERT_EQ(result, ResultOk); + auto properties = message.getProperties(); + ASSERT_TRUE(properties.find("key") != properties.end() && properties["key"] == "set"); + latch_.countdown(); + } + + void close() override { closeLatch_.countdown(); } + + private: + Latch latch_; + Latch closeLatch_; +}; + +class ExceptionInterceptor : public ProducerInterceptor { + public: + explicit ExceptionInterceptor(Latch& latch) : latch_(latch) {} + + Message beforeSend(const Producer& producer, const Message& message) override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, + const MessageId& messageID) override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + void close() override { + latch_.countdown(); + throw std::runtime_error("expected exception"); + } + + private: + Latch latch_; +}; + +class PartitionsChangeInterceptor : public ProducerInterceptor { + public: + explicit PartitionsChangeInterceptor(Latch& latch) : latch_(latch) {} + + Message beforeSend(const Producer& producer, const Message& message) override { return message; } + + void onSendAcknowledgement(const Producer& producer, Result result, const Message& message, + const MessageId& messageID) override {} + + void onPartitionsChange(const std::string& topicName, int partitions) override { + ASSERT_EQ(partitions, 3); + latch_.countdown(); + } + + private: + Latch latch_; +}; + +void createPartitionedTopic(std::string topic) { + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions"; + + int res = makePutRequest(topicOperateUrl, "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; +} + +class InterceptorsTest : public ::testing::TestWithParam {}; + +TEST_P(InterceptorsTest, testProducerInterceptor) { + const std::string topic = "InterceptorsTest-testProducerInterceptor-" + std::to_string(time(nullptr)); + + if (GetParam()) { + createPartitionedTopic(topic); + } + + Latch latch(1); + Latch closeLatch(1); + + Client client(serviceUrl); + ProducerConfiguration conf; + conf.intercept({std::make_shared(latch, closeLatch)}); + Producer producer; + client.createProducer(topic, conf, producer); + + Message msg = MessageBuilder().setContent("content").build(); + Result result = producer.send(msg); + ASSERT_EQ(result, ResultOk); + + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + + producer.close(); + ASSERT_TRUE(closeLatch.wait(std::chrono::seconds(5))); + client.close(); +} + +TEST_P(InterceptorsTest, testProducerInterceptorWithException) { + const std::string topic = + "InterceptorsTest-testProducerInterceptorWithException-" + std::to_string(time(nullptr)); + + if (GetParam()) { + createPartitionedTopic(topic); + } + + Latch latch(3); + + Client client(serviceUrl); + ProducerConfiguration conf; + conf.intercept({std::make_shared(latch)}); + Producer producer; + client.createProducer(topic, conf, producer); + + Message msg = MessageBuilder().setContent("content").build(); + Result result = producer.send(msg); + ASSERT_EQ(result, ResultOk); + + producer.close(); + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + client.close(); +} + +TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) { + const std::string topic = "public/default/InterceptorsTest-testProducerInterceptorOnPartitionsChange-" + + std::to_string(time(nullptr)); + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions"; + + int res = makePutRequest(topicOperateUrl, "2"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + Latch latch(1); + + ClientConfiguration clientConf; + clientConf.setPartititionsUpdateInterval(1); + Client client(serviceUrl, clientConf); + ProducerConfiguration conf; + conf.intercept({std::make_shared(latch)}); + Producer producer; + client.createProducer(topic, conf, producer); + + res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3 + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + + producer.close(); + client.close(); +} + +INSTANTIATE_TEST_CASE_P(Pulsar, InterceptorsTest, ::testing::Values(true, false));