5 changes: 5 additions & 0 deletions .github/workflows/ci-pr-validation.yaml
@@ -142,6 +142,11 @@ jobs:
- name: Run unit tests
run: RETRY_FAILED=3 CMAKE_BUILD_DIRECTORY=./build ./run-unit-tests.sh

- name: Build with Boost.Asio
run: |
cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DUSE_ASIO=OFF
cmake --build build-boost-asio -j8

- name: Build perf tools
run: |
cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON
10 changes: 7 additions & 3 deletions CMakeLists.txt
@@ -19,15 +19,19 @@

cmake_minimum_required(VERSION 3.13)

option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)

option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
if (INTEGRATE_VCPKG)
set(USE_ASIO ON)
option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
if (NOT CMAKE_TOOLCHAIN_FILE)
set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
endif ()
if (NOT USE_ASIO)
list(APPEND VCPKG_MANIFEST_FEATURES "boost-asio")
endif ()
else ()
option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
endif ()
message(STATUS "USE_ASIO: ${USE_ASIO}")

option(BUILD_TESTS "Build tests" ON)
message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
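Under vcpkg integration, USE_ASIO now defaults to ON, and the `boost-asio` manifest feature is only pulled in when a user opts back out with `-DUSE_ASIO=OFF`. The rest of the diff never names a concrete Asio namespace; it goes through the `ASIO`, `ASIO_ERROR`, `ASIO_SUCCESS`, and `ASIO_SYSTEM_ERROR` macros. The actual `lib/AsioDefines.h` is not part of this diff, so the following is only a plausible sketch of the switch those macros imply:

```cpp
// Hypothetical sketch only: the real lib/AsioDefines.h is not shown in this
// diff. It illustrates how USE_ASIO plausibly maps the ASIO* macros onto
// either standalone Asio or Boost.Asio.
#ifdef USE_ASIO
#include <asio/error_code.hpp>
#include <asio/system_error.hpp>
#define ASIO asio
#define ASIO_ERROR asio::error_code
#define ASIO_SUCCESS asio::error_code()
#define ASIO_SYSTEM_ERROR asio::system_error
#else
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#define ASIO boost::asio
#define ASIO_ERROR boost::system::error_code
#define ASIO_SUCCESS boost::system::error_code()
#define ASIO_SYSTEM_ERROR boost::system::system_error
#endif
```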
5 changes: 2 additions & 3 deletions lib/AckGroupingTrackerEnabled.cc
@@ -121,8 +121,7 @@ AckGroupingTrackerEnabled::~AckGroupingTrackerEnabled() {
this->flush();
std::lock_guard<std::mutex> lock(this->mutexTimer_);
if (this->timer_) {
ASIO_ERROR ec;
this->timer_->cancel(ec);
cancelTimer(*this->timer_);
}
}

@@ -172,7 +171,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {

std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
std::weak_ptr<AckGroupingTracker> weakSelf = shared_from_this();
this->timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
auto self = weakSelf.lock();
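`expires_from_now()` is the deprecated Boost.Asio spelling and is unavailable in current standalone Asio; `expires_after()` is the portable, duration-based replacement (`expires_at()` remains for absolute deadlines). A minimal sketch of the scheduling idiom above, with a plain callback standing in for the tracker's weak-pointer guard:

```cpp
#ifdef USE_ASIO
#include <asio/error.hpp>
#include <asio/steady_timer.hpp>
#else
#include <boost/asio/error.hpp>
#include <boost/asio/steady_timer.hpp>
#endif

#include <algorithm>
#include <chrono>

#include "AsioDefines.h"  // ASIO / ASIO_ERROR macros used throughout this PR

void scheduleFlush(ASIO::steady_timer& timer, long ackGroupingTimeMs) {
    // Duration-based deadline; clamp to at least 1 ms, as the code above does.
    timer.expires_after(std::chrono::milliseconds(std::max(1L, ackGroupingTimeMs)));
    timer.async_wait([](const ASIO_ERROR& ec) {
        if (ec == ASIO::error::operation_aborted) {
            return;  // cancelled, e.g. by the destructor above
        }
        // ... flush the grouped ACKs ...
    });
}
```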
9 changes: 9 additions & 0 deletions lib/AsioTimer.h
@@ -29,3 +29,12 @@
#include "AsioDefines.h"

using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>;

inline void cancelTimer(ASIO::steady_timer& timer) {
try {
timer.cancel();
} catch (const ASIO_SYSTEM_ERROR& ignored) {
// Most of the time the exception can be ignored unless the following logic depends on the fact that
// the timer is cancelled.
}
}
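Background for this helper: the `cancel(error_code&)` overload of Asio timers is deprecated in Boost and gone from standalone Asio, and the surviving `cancel()` throws (`ASIO_SYSTEM_ERROR`) on failure, which is why the call sites below are migrated to `cancelTimer`. A usage sketch, assuming a caller-supplied `io_context`:

```cpp
#ifdef USE_ASIO
#include <asio/error.hpp>
#include <asio/io_context.hpp>
#else
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#endif

#include <chrono>
#include <memory>

#include "AsioTimer.h"  // DeadlineTimerPtr and cancelTimer from this file

void armAndCancel(ASIO::io_context& io) {
    DeadlineTimerPtr timer = std::make_shared<ASIO::steady_timer>(io);
    timer->expires_after(std::chrono::seconds(30));
    timer->async_wait([](const ASIO_ERROR& ec) {
        if (ec == ASIO::error::operation_aborted) {
            return;  // cancelled below before the deadline elapsed
        }
        // ... deadline actually elapsed ...
    });
    cancelTimer(*timer);  // never throws, unlike a bare timer->cancel()
}
```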
118 changes: 43 additions & 75 deletions lib/ClientConnection.cc
@@ -41,6 +41,14 @@
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"

#ifdef USE_ASIO
#include <asio/connect.hpp>
#include <asio/ssl/host_name_verification.hpp>
#else
#include <boost/asio/connect.hpp>
#include <boost/asio/ssl/host_name_verification.hpp>
#endif

DECLARE_LOG_OBJECT()

using namespace ASIO::ip;
@@ -170,13 +178,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
executor_(executor),
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
#if defined(USE_ASIO) || BOOST_VERSION >= 107000
strand_(ASIO::make_strand(executor_->getIOService().get_executor())),
#elif BOOST_VERSION >= 106600
strand_(executor_->getIOService().get_executor()),
#else
strand_(executor_->getIOService()),
#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
@@ -266,7 +268,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
}

LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -309,7 +311,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
// Only send keep-alive probes if the broker supports it
keepAliveTimer_ = executor_->createDeadlineTimer();
if (keepAliveTimer_) {
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -354,7 +356,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
// If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
// Check if we have a timer still before we set the request timer to pop again.
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
consumerStatsRequestTimer_->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
@@ -394,7 +396,7 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep
* if async_connect without any error, connected_ would be set to true
* at this point the connection is deemed valid to be used by clients of this class
*/
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
if (!err) {
std::stringstream cnxStringStream;
try {
@@ -479,38 +481,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
} else {
handleHandshake(ASIO_SUCCESS);
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
// The connection failed. Try the next endpoint in the list.
ASIO_ERROR closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
++endpointIterator;
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
auto weakSelf = weak_from_this();
socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpointIterator);
}
});
} else {
if (err == ASIO::error::operation_aborted) {
// TCP connect timeout, which is not retryable
close();
} else {
close(ResultRetryable);
}
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close(ResultRetryable);
if (err == ASIO::error::operation_aborted) {
close();
} else {
close(ResultRetryable);
}
}
}
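Since `ASIO::async_connect` now iterates the endpoint list internally (see `handleResolve` below), this handler only classifies the final error: `operation_aborted` means the attempt was cancelled locally, which in this code path indicates the connect-timeout task closed the socket, so the close is terminal rather than retryable. A sketch of that timeout interplay, assuming the deadline handler closes the socket (the actual `connectTimeoutTask_` is not shown in this diff):

```cpp
#ifdef USE_ASIO
#include <asio/ip/tcp.hpp>
#include <asio/steady_timer.hpp>
#else
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#endif

#include <chrono>

#include "AsioDefines.h"

// Hypothetical sketch of a connect deadline: closing the socket makes the
// in-flight async_connect complete with ASIO::error::operation_aborted.
void armConnectTimeout(ASIO::steady_timer& deadline, ASIO::ip::tcp::socket& socket) {
    deadline.expires_after(std::chrono::seconds(10));
    deadline.async_wait([&socket](const ASIO_ERROR& ec) {
        if (!ec) {  // deadline fired (it was not cancelled first)
            ASIO_ERROR ignored;
            socket.close(ignored);  // aborts the pending connect attempt
        }
    });
}
```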

@@ -603,18 +580,18 @@ void ClientConnection::tcpConnectAsync() {
}

LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));

auto weakSelf = weak_from_this();
resolver_->async_resolve(query,
[weakSelf](const ASIO_ERROR& err, const tcp::resolver::iterator& iterator) {
resolver_->async_resolve(service_url.host(), std::to_string(service_url.port()),
[weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
auto self = weakSelf.lock();
if (self) {
self->handleResolve(err, iterator);
self->handleResolve(err, results);
}
});
}
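`tcp::resolver::query` is likewise deprecated in Boost and removed from standalone Asio; `async_resolve` takes the host and service strings directly, and the handler receives a `results_type` range rather than an iterator compared against a default-constructed sentinel. The synchronous counterpart, with a hypothetical host and port, for illustration:

```cpp
#ifdef USE_ASIO
#include <asio/ip/tcp.hpp>
#else
#include <boost/asio/ip/tcp.hpp>
#endif

#include <iostream>

#include "AsioDefines.h"

void printResolved(ASIO::ip::tcp::resolver& resolver) {
    // resolve() throws ASIO_SYSTEM_ERROR on failure instead of
    // returning an end iterator.
    auto results = resolver.resolve("broker.example.com", "6650");
    for (const auto& entry : results) {
        std::cout << entry.host_name() << " -> " << entry.endpoint() << '\n';
    }
}
```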

void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver::iterator& endpointIterator) {
void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::results_type& results) {
if (err) {
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
@@ -641,23 +618,13 @@ void ClientConnection::handleResolve(const ASIO_ERROR& err, const tcp::resolver:
}
ptr->connectTimeoutTask_->stop();
});

LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
<< " to " << endpointIterator->endpoint());
socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpointIterator);
}
});
} else {
LOG_WARN(cnxString_ << "No IP address found");
close();
return;
}
ASIO::async_connect(*socket_, results, [weakSelf](const ASIO_ERROR& err, const tcp::endpoint& endpoint) {
auto self = weakSelf.lock();
if (self) {
self->handleTcpConnected(err, endpoint);
}
});
}
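The deleted block hand-rolled what the range overload of `ASIO::async_connect` already provides: it tries each resolved endpoint in order and completes once, with either the connected endpoint or the last error. It also subsumes the deleted "No IP address found" branch, since an empty results set completes with `ASIO::error::not_found`. A sketch of the completion contract:

```cpp
#ifdef USE_ASIO
#include <asio/connect.hpp>
#include <asio/error.hpp>
#include <asio/ip/tcp.hpp>
#else
#include <boost/asio/connect.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/ip/tcp.hpp>
#endif

#include "AsioDefines.h"

void connectToAny(ASIO::ip::tcp::socket& socket,
                  const ASIO::ip::tcp::resolver::results_type& results) {
    ASIO::async_connect(
        socket, results,
        [](const ASIO_ERROR& err, const ASIO::ip::tcp::endpoint& endpoint) {
            if (err == ASIO::error::not_found) {
                // the resolver produced no endpoints at all
            } else if (err) {
                // every endpoint was tried; err is the last failure
            } else {
                // connected; `endpoint` is the one that succeeded
            }
        });
}
```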

void ClientConnection::readNextCommand() {
Expand Down Expand Up @@ -1061,7 +1028,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
LookupRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1201,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf

PendingRequestData requestData;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1256,7 +1223,7 @@ void ClientConnection::handleKeepAliveTimeout() {
// be zero And we do not attempt to dereference the pointer.
Lock lock(mutex_);
if (keepAliveTimer_) {
keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
auto weakSelf = weak_from_this();
keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
auto self = weakSelf.lock();
@@ -1318,12 +1285,12 @@ void ClientConnection::close(Result result, bool detach) {
numOfPendingLookupRequest_ = 0;

if (keepAliveTimer_) {
keepAliveTimer_->cancel();
cancelTimer(*keepAliveTimer_);
keepAliveTimer_.reset();
}

if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->cancel();
cancelTimer(*consumerStatsRequestTimer_);
consumerStatsRequestTimer_.reset();
}

@@ -1435,7 +1402,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
LastMessageIdRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->expires_after(operationsTimeout_);
auto weakSelf = weak_from_this();
requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
@@ -1483,7 +1450,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
lock.unlock();

auto weakSelf = weak_from_this();
timer->expires_from_now(operationsTimeout_);
timer->expires_after(operationsTimeout_);
timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
auto self = weakSelf.lock();
if (!self) {
@@ -1570,7 +1537,7 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
lock.unlock();

requestData.promise.setValue({});
requestData.timer->cancel();
cancelTimer(*requestData.timer);
}
}

Expand All @@ -1582,7 +1549,8 @@ void ClientConnection::handlePartitionedMetadataResponse(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id());
if (it != pendingLookupRequests_.end()) {
it->second.timer->cancel();
cancelTimer(*it->second.timer);

LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
@@ -1661,7 +1629,7 @@ void ClientConnection::handleLookupTopicRespose(
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
it->second.timer->cancel();
cancelTimer(*it->second.timer);
LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
@@ -1739,7 +1707,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess
data.topicEpoch = boost::none;
}
requestData.promise.setValue(data);
requestData.timer->cancel();
cancelTimer(*requestData.timer);
}
}
}
@@ -1759,7 +1727,7 @@ void ClientConnection::handleError(const proto::CommandError& error) {
lock.unlock();

requestData.promise.setFailed(result);
requestData.timer->cancel();
cancelTimer(*requestData.timer);
} else {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
@@ -2052,8 +2020,8 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
auto it = pendingRequests_.find(requestId);
if (it != pendingRequests_.end()) {
it->second.promise.setFailed(ResultDisconnected);
ASIO_ERROR ec;
it->second.timer->cancel(ec);
cancelTimer(*it->second.timer);

pendingRequests_.erase(it);
}
}
10 changes: 5 additions & 5 deletions lib/ClientConnection.h
@@ -26,13 +26,13 @@
#include <cstdint>
#ifdef USE_ASIO
#include <asio/bind_executor.hpp>
#include <asio/io_service.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ssl/stream.hpp>
#include <asio/strand.hpp>
#else
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
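`io_service` has been a deprecated alias of `io_context` since Boost 1.66 and does not exist in standalone Asio, hence the include swap here and the strand member type below. A minimal lifecycle sketch of the `io_context` spelling (a hypothetical run loop, not the client's `ExecutorService`):

```cpp
#ifdef USE_ASIO
#include <asio/executor_work_guard.hpp>
#include <asio/io_context.hpp>
#else
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#endif

#include <thread>

#include "AsioDefines.h"

void runEventLoop() {
    ASIO::io_context io;
    // Keep run() from returning while there is momentarily no pending work.
    auto guard = ASIO::make_work_guard(io);
    std::thread ioThread([&io] { io.run(); });

    // ... create sockets, timers, and strands against `io` ...

    guard.reset();  // let run() drain remaining work and return
    ioThread.join();
}
```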
@@ -238,7 +238,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* although not usable at this point, since this is just tcp connection
* Pulsar - Connect/Connected has yet to happen
*/
void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
void handleTcpConnected(const ASIO_ERROR& err, const ASIO::ip::tcp::endpoint& endpoint);

void handleHandshake(const ASIO_ERROR& err);

@@ -261,7 +261,7 @@

void handlePulsarConnected(const proto::CommandConnected& cmdConnected);

void handleResolve(const ASIO_ERROR& err, const ASIO::ip::tcp::resolver::iterator& endpointIterator);
void handleResolve(ASIO_ERROR err, const ASIO::ip::tcp::resolver::results_type& results);

void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
@@ -325,7 +325,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
ASIO::strand<ASIO::io_service::executor_type> strand_;
ASIO::strand<ASIO::io_context::executor_type> strand_;

const std::string logicalAddress_;
/*