Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 68 additions & 84 deletions source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ Common::Redis::RespValuePtr Utility::makeError(const std::string& error) {

namespace {

// null_pool_callbacks is used for requests that must be filtered and not redirected such as
// "asking".
DoNothingPoolCallbacks null_pool_callbacks;

// Create an asking command request.
const Common::Redis::RespValue& askingRequest() {
static Common::Redis::RespValue request;
static bool initialized = false;

if (!initialized) {
Common::Redis::RespValue asking_cmd;
asking_cmd.type(Common::Redis::RespType::BulkString);
asking_cmd.asString() = "asking";
request.type(Common::Redis::RespType::Array);
request.asArray().push_back(asking_cmd);
initialized = true;
}
return request;
}

/**
* Validate the received moved/ask redirection error and the original redis request.
* @param[in] original_request supplies the incoming request associated with the command splitter
Expand Down Expand Up @@ -91,34 +111,27 @@ void SingleServerRequest::onFailure() {
callbacks_.onResponse(Utility::makeError(Response::get().UpstreamFailure));
}

void SingleServerRequest::recreate(Common::Redis::RespValue& request, bool prepend_asking) {
if (!prepend_asking) {
request = *incoming_request_;
return;
}

Common::Redis::RespValue asking_cmd;
asking_cmd.type(Common::Redis::RespType::BulkString);
asking_cmd.asString() = "asking";

request.type(Common::Redis::RespType::Array);
request.asArray().push_back(asking_cmd);
request.asArray().insert(request.asArray().end(), incoming_request_->asArray().begin(),
incoming_request_->asArray().end());
}

bool SingleServerRequest::onRedirection(const Common::Redis::RespValue& value) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool_) {
return false;
}

Common::Redis::RespValue request;
recreate(request, ask_redirection);
// MOVED and ASK redirection errors have the following substrings: MOVED or ASK (err[0]), hash key
// slot (err[1]), and IP address and TCP port separated by a colon (err[2]).
const std::string host_address = std::string(err[2]);

const std::string host_address = std::string(err[2]); // ip:port
handle_ = conn_pool_->makeRequestToHost(host_address, request, *this);
// Prepend request with an asking command if redirected via an ASK error. The returned handle is
// not important since there is no point in being able to cancel the request. The use of
// null_pool_callbacks ensures the transparent filtering of the Redis server's response to the
// "asking" command; this is fine since the server either responds with an OK or an error message
// if cluster support is not enabled (in which case we should not get an ASK redirection error).
if (ask_redirection &&
!conn_pool_->makeRequestToHost(host_address, askingRequest(), null_pool_callbacks)) {
return false;
}
handle_ = conn_pool_->makeRequestToHost(host_address, *incoming_request_, *this);
return (handle_ != nullptr);
}

Expand Down Expand Up @@ -240,6 +253,35 @@ SplitRequestPtr MGETRequest::create(ConnPool::Instance& conn_pool,
return nullptr;
}

bool FragmentedRequest::onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) {
return false;
}

// MOVED and ASK redirection errors have the following substrings: MOVED or ASK (err[0]), hash key
// slot (err[1]), and IP address and TCP port separated by a colon (err[2]).
std::string host_address = std::string(err[2]);
Common::Redis::RespValue request;
recreate(request, index);

// Prepend request with an asking command if redirected via an ASK error. The returned handle is
// not important since there is no point in being able to cancel the request. The use of
// null_pool_callbacks ensures the transparent filtering of the Redis server's response to the
// "asking" command; this is fine since the server either responds with an OK or an error message
// if cluster support is not enabled (in which case we should not get an ASK redirection error).
if (ask_redirection &&
!conn_pool->makeRequestToHost(host_address, askingRequest(), null_pool_callbacks)) {
return false;
}

this->pending_requests_[index].handle_ =
conn_pool->makeRequestToHost(host_address, request, this->pending_requests_[index]);
return (this->pending_requests_[index].handle_ != nullptr);
}

void MGETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) {
pending_requests_[index].handle_ = nullptr;

Expand Down Expand Up @@ -273,40 +315,21 @@ void MGETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
}
}

void MGETRequest::recreate(Common::Redis::RespValue& request, uint32_t index, bool prepend_asking) {
void MGETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) {
static const uint32_t GET_COMMAND_SUBSTRINGS = 2;
uint32_t num_values = prepend_asking ? (GET_COMMAND_SUBSTRINGS + 1) : GET_COMMAND_SUBSTRINGS;
uint32_t num_values = GET_COMMAND_SUBSTRINGS;
std::vector<Common::Redis::RespValue> values(num_values);

for (uint32_t i = 0; i < num_values; i++) {
values[i].type(Common::Redis::RespType::BulkString);
}
values[--num_values].asString() = incoming_request_->asArray()[index + 1].asString();
values[--num_values].asString() = "get";
if (prepend_asking) {
values[--num_values].asString() = "asking";
}

request.type(Common::Redis::RespType::Array);
request.asArray().swap(values);
}

bool MGETRequest::onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) {
return false;
}

Common::Redis::RespValue request;
recreate(request, index, ask_redirection);

this->pending_requests_[index].handle_ =
conn_pool->makeRequestToHost(std::string(err[2]), request, this->pending_requests_[index]);
return (this->pending_requests_[index].handle_ != nullptr);
}

SplitRequestPtr MSETRequest::create(ConnPool::Instance& conn_pool,
Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
Expand Down Expand Up @@ -388,9 +411,9 @@ void MSETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
}
}

void MSETRequest::recreate(Common::Redis::RespValue& request, uint32_t index, bool prepend_asking) {
void MSETRequest::recreate(Common::Redis::RespValue& request, uint32_t index) {
static const uint32_t SET_COMMAND_SUBSTRINGS = 3;
uint32_t num_values = prepend_asking ? (SET_COMMAND_SUBSTRINGS + 1) : SET_COMMAND_SUBSTRINGS;
uint32_t num_values = SET_COMMAND_SUBSTRINGS;
std::vector<Common::Redis::RespValue> values(num_values);

for (uint32_t i = 0; i < num_values; i++) {
Expand All @@ -399,30 +422,11 @@ void MSETRequest::recreate(Common::Redis::RespValue& request, uint32_t index, bo
values[--num_values].asString() = incoming_request_->asArray()[(index * 2) + 2].asString();
values[--num_values].asString() = incoming_request_->asArray()[(index * 2) + 1].asString();
values[--num_values].asString() = "set";
if (prepend_asking) {
values[--num_values].asString() = "asking";
}

request.type(Common::Redis::RespType::Array);
request.asArray().swap(values);
}

bool MSETRequest::onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) {
return false;
}

Common::Redis::RespValue request;
recreate(request, index, ask_redirection);

this->pending_requests_[index].handle_ =
conn_pool->makeRequestToHost(std::string(err[2]), request, this->pending_requests_[index]);
return (this->pending_requests_[index].handle_ != nullptr);
}

SplitRequestPtr SplitKeysSumResultRequest::create(ConnPool::Instance& conn_pool,
Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks,
Expand Down Expand Up @@ -496,41 +500,21 @@ void SplitKeysSumResultRequest::onChildResponse(Common::Redis::RespValuePtr&& va
}
}

void SplitKeysSumResultRequest::recreate(Common::Redis::RespValue& request, uint32_t index,
bool prepend_asking) {
void SplitKeysSumResultRequest::recreate(Common::Redis::RespValue& request, uint32_t index) {
static const uint32_t BASE_COMMAND_SUBSTRINGS = 2;
uint32_t num_values = prepend_asking ? (BASE_COMMAND_SUBSTRINGS + 1) : BASE_COMMAND_SUBSTRINGS;
uint32_t num_values = BASE_COMMAND_SUBSTRINGS;
std::vector<Common::Redis::RespValue> values(num_values);

for (uint32_t i = 0; i < num_values; i++) {
values[i].type(Common::Redis::RespType::BulkString);
}
values[--num_values].asString() = incoming_request_->asArray()[index + 1].asString();
values[--num_values].asString() = incoming_request_->asArray()[0].asString();
if (prepend_asking) {
values[--num_values].asString() = "asking";
}

request.type(Common::Redis::RespType::Array);
request.asArray().swap(values);
}

bool SplitKeysSumResultRequest::onChildRedirection(const Common::Redis::RespValue& value,
uint32_t index, ConnPool::Instance* conn_pool) {
std::vector<absl::string_view> err;
bool ask_redirection = false;
if (redirectionArgsInvalid(incoming_request_.get(), value, err, ask_redirection) || !conn_pool) {
return false;
}

Common::Redis::RespValue request;
recreate(request, index, ask_redirection);

this->pending_requests_[index].handle_ =
conn_pool->makeRequestToHost(std::string(err[2]), request, this->pending_requests_[index]);
return (this->pending_requests_[index].handle_ != nullptr);
}

InstanceImpl::InstanceImpl(ConnPool::InstancePtr&& conn_pool, Stats::Scope& scope,
const std::string& stat_prefix, TimeSource& time_source,
bool latency_in_micros)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ class SingleServerRequest : public SplitRequestBase, public Common::Redis::Clien
TimeSource& time_source, bool latency_in_micros)
: SplitRequestBase(command_stats, time_source, latency_in_micros), callbacks_(callbacks) {}

void recreate(Common::Redis::RespValue& request, bool prepend_asking);

SplitCallbacks& callbacks_;
ConnPool::Instance* conn_pool_{};
Common::Redis::Client::PoolRequest* handle_{};
Expand Down Expand Up @@ -191,8 +189,9 @@ class FragmentedRequest : public SplitRequestBase {

virtual void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) PURE;
void onChildFailure(uint32_t index);
virtual bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) PURE;
bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool);
virtual void recreate(Common::Redis::RespValue& request, uint32_t index) PURE;

SplitCallbacks& callbacks_;

Expand Down Expand Up @@ -221,9 +220,7 @@ class MGETRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
virtual bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) override;
void recreate(Common::Redis::RespValue& request, uint32_t index, bool prepend_asking);
void recreate(Common::Redis::RespValue& request, uint32_t index) override;
};

/**
Expand All @@ -246,9 +243,7 @@ class SplitKeysSumResultRequest : public FragmentedRequest, Logger::Loggable<Log

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
virtual bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) override;
void recreate(Common::Redis::RespValue& request, uint32_t index, bool prepend_asking);
void recreate(Common::Redis::RespValue& request, uint32_t index) override;

int64_t total_{0};
};
Expand All @@ -272,9 +267,7 @@ class MSETRequest : public FragmentedRequest, Logger::Loggable<Logger::Id::redis

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
virtual bool onChildRedirection(const Common::Redis::RespValue& value, uint32_t index,
ConnPool::Instance* conn_pool) override;
void recreate(Common::Redis::RespValue& request, uint32_t index, bool prepend_asking);
void recreate(Common::Redis::RespValue& request, uint32_t index) override;
};

/**
Expand Down Expand Up @@ -343,6 +336,18 @@ class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
TimeSource& time_source_;
};

/**
* DoNothingPoolCallbacks is used for internally generated commands whose response is
* transparently filtered, and redirection never occurs (e.g., "asking", etc.).
*/
class DoNothingPoolCallbacks : public Common::Redis::Client::PoolCallbacks {
public:
// Common::Redis::Client::PoolCallbacks
void onResponse(Common::Redis::RespValuePtr&&) override {}
void onFailure() override {}
bool onRedirection(const Common::Redis::RespValue&) override { return false; }
};

} // namespace CommandSplitter
} // namespace RedisProxy
} // namespace NetworkFilters
Expand Down
Loading