Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ message RedisProxy {
// * '{user1000}.following' and '{user1000}.followers' **will** be sent to the same upstream
// * '{user1000}.following' and '{user1001}.following' **might** be sent to the same upstream
bool enable_hashtagging = 2;

// Accept `moved and ask redirection
// <https://redis.io/topics/cluster-spec#redirection-and-resharding>`_ errors from upstream
// redis servers, and retry commands to the specified target server. The target server does not
// need to be known to the cluster manager. If the command cannot be redirected, then the
// original error is passed downstream unchanged. By default, this support is not enabled.
bool enable_redirection = 3;
}

// Network settings for the connection pool to the upstream cluster.
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class PoolCallbacks {
* Called when a network/protocol error occurs and there is no response.
*/
virtual void onFailure() PURE;

/**
* Called when a MOVED or ASK redirection error is received, and the request must be retried.
* @param value supplies the MOVED error response
* @return bool true if the request is successfully redirected, false otherwise
*/
virtual bool onRedirection(const Common::Redis::RespValue& value) PURE;
};

/**
Expand Down Expand Up @@ -97,6 +104,12 @@ class Config {
* same hash tag will be forwarded to the same upstream.
*/
virtual bool enableHashtagging() const PURE;

/**
* @return when enabled, moved/ask redirection errors from upstream redis servers will be
* processed.
*/
virtual bool enableRedirection() const PURE;
};

/**
Expand Down
30 changes: 25 additions & 5 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace Client {
ConfigImpl::ConfigImpl(
const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config)
: op_timeout_(PROTOBUF_GET_MS_REQUIRED(config, op_timeout)),
enable_hashtagging_(config.enable_hashtagging()) {}
enable_hashtagging_(config.enable_hashtagging()),
enable_redirection_(config.enable_redirection()) {}

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
Expand Down Expand Up @@ -137,15 +138,34 @@ void ClientImpl::onEvent(Network::ConnectionEvent event) {
void ClientImpl::onRespValue(RespValuePtr&& value) {
ASSERT(!pending_requests_.empty());
PendingRequest& request = pending_requests_.front();
if (!request.canceled_) {
request.callbacks_.onResponse(std::move(value));
} else {

if (request.canceled_) {
host_->cluster().stats().upstream_rq_cancelled_.inc();
} else if (config_.enableRedirection() && (value->type() == Common::Redis::RespType::Error)) {
std::vector<absl::string_view> err = StringUtil::splitToken(value->asString(), " ", false);
bool redirected = false;
if (err.size() == 3) {
if (err[0] == RedirectionResponse::get().MOVED || err[0] == RedirectionResponse::get().ASK) {
redirected = request.callbacks_.onRedirection(*value);
if (redirected) {
host_->cluster().stats().upstream_internal_redirect_succeeded_total_.inc();
} else {
host_->cluster().stats().upstream_internal_redirect_failed_total_.inc();
}
}
}
if (!redirected) {
request.callbacks_.onResponse(std::move(value));
}
} else {
request.callbacks_.onResponse(std::move(value));
}

pending_requests_.pop_front();

// If there are no remaining ops in the pipeline we need to disable the timer.
// Otherwise we boost the timer since we are receiving responses and there are more to flush out.
// Otherwise we boost the timer since we are receiving responses and there are more to flush
// out.
if (pending_requests_.empty()) {
connect_or_op_timer_->disableTimer();
} else {
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/common/hash.h"
#include "common/network/filter_impl.h"
#include "common/protobuf/utility.h"
#include "common/singleton/const_singleton.h"
#include "common/upstream/load_balancer_impl.h"

#include "extensions/filters/network/common/redis/client.h"
Expand All @@ -24,6 +25,13 @@ namespace Client {
// TODO(mattklein123): Circuit breaking
// TODO(rshriram): Fault injection

struct RedirectionValues {
const std::string ASK = "ASK";
const std::string MOVED = "MOVED";
};

typedef ConstSingleton<RedirectionValues> RedirectionResponse;

class ConfigImpl : public Config {
public:
ConfigImpl(
Expand All @@ -32,10 +40,12 @@ class ConfigImpl : public Config {
bool disableOutlierEvents() const override { return false; }
std::chrono::milliseconds opTimeout() const override { return op_timeout_; }
bool enableHashtagging() const override { return enable_hashtagging_; }
bool enableRedirection() const override { return enable_redirection_; }

private:
const std::chrono::milliseconds op_timeout_;
const bool enable_hashtagging_;
const bool enable_redirection_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
Expand Down
5 changes: 5 additions & 0 deletions source/extensions/filters/network/common/redis/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ class RespValue {
RespValue() : type_(RespType::Null) {}
~RespValue() { cleanup(); }

RespValue(const RespValue& other); // copy constructor
RespValue& operator=(const RespValue& other); // copy assignment
bool operator==(const RespValue& other) const; // test for equality, unit tests
bool operator!=(const RespValue& other) const { return !(*this == other); }

/**
* Convert a RESP value to a string for debugging purposes.
*/
Expand Down
77 changes: 77 additions & 0 deletions source/extensions/filters/network/common/redis/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,83 @@ void RespValue::type(RespType type) {
}
}

RespValue::RespValue(const RespValue& other) : type_(RespType::Null) {
this->type(other.type());
switch (type_) {
case RespType::Array: {
this->asArray() = other.asArray();
break;
}
case RespType::SimpleString:
case RespType::BulkString:
case RespType::Error: {
this->asString() = other.asString();
break;
}
case RespType::Integer: {
this->asInteger() = other.asInteger();
break;
}
case RespType::Null:
break;
}
}

RespValue& RespValue::operator=(const RespValue& other) {
if (&other == this) {
return *this;
}
this->type(other.type());
switch (type_) {
case RespType::Array: {
this->asArray() = other.asArray();
break;
}
case RespType::SimpleString:
case RespType::BulkString:
case RespType::Error: {
this->asString() = other.asString();
break;
}
case RespType::Integer: {
this->asInteger() = other.asInteger();
break;
}
case RespType::Null:
break;
}
return *this;
}

bool RespValue::operator==(const RespValue& other) const {
bool result = false;
if (type_ != other.type()) {
return result;
}

switch (type_) {
case RespType::Array: {
result = (this->asArray() == other.asArray());
break;
}
case RespType::SimpleString:
case RespType::BulkString:
case RespType::Error: {
result = (this->asString() == other.asString());
break;
}
case RespType::Integer: {
result = (this->asInteger() == other.asInteger());
break;
}
case RespType::Null: {
result = true;
break;
}
}
return result;
}

void DecoderImpl::decode(Buffer::Instance& data) {
uint64_t num_slices = data.getRawSlices(nullptr, 0);
STACK_ARRAY(slices, Buffer::RawSlice, num_slices);
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/network/redis_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ envoy_cc_library(
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/network:address_lib",
"//source/common/network:filter_lib",
"//source/common/protobuf:utility_lib",
"//source/common/upstream:load_balancer_lib",
"//source/common/upstream:upstream_lib",
"//source/extensions/filters/network/common/redis:client_lib",
"@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ class Instance {
virtual ~Instance() {}

/**
* Make a split redis request.
* @param request supplies the split request to make.
* Make a split redis request capable of being retried/redirected.
* @param request supplies the split request to make (ownership transferred to call).
* @param callbacks supplies the split request completion callbacks.
* @return SplitRequestPtr a handle to the active request or nullptr if the request has already
* been satisfied (via onResponse() being called). The splitter ALWAYS calls
* onResponse() for a given request.
*/
virtual SplitRequestPtr makeRequest(const Common::Redis::RespValue& request,
virtual SplitRequestPtr makeRequest(Common::Redis::RespValuePtr&& request,
SplitCallbacks& callbacks) PURE;
};

Expand Down
Loading