Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/envoy/extensions/http/cache/redis_http_cache/v3/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";

package envoy.extensions.http.cache.redis_http_cache.v3;

import "xds/annotations/v3/status.proto";

import "udpa/annotations/status.proto";
import "validate/validate.proto";

option java_package = "io.envoyproxy.envoy.extensions.http.cache.redis_http_cache.v3";
option java_outer_classname = "RedisHttpCacheProto";
option java_multiple_files = true;
option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/http/cache/redis_http_cache/v3;redis_http_cachev3";
option (udpa.annotations.file_status).package_version_status = ACTIVE;
option (xds.annotations.v3.file_status).work_in_progress = true;

// [#protodoc-title: RedisHttpCacheConfig]
// [#extension: envoy.extensions.http.cache.redis_http_cache]

// Configuration for a cache implementation that caches in a Redis database.
//
// For implementation details, see `DESIGN.md <https://github.com/envoyproxy/envoy/blob/main/source/extensions/http/cache/redis_http_cache/DESIGN.md>`_.
// [#next-free-field: 2]
message RedisHttpCacheConfig {
  // Name of the cluster containing the Redis server. Must be non-empty.
  string cluster = 1 [(validate.rules).string = {min_len: 1}];
}
19 changes: 19 additions & 0 deletions source/extensions/common/redis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,22 @@ envoy_cc_library(
"//source/common/common:thread_lib",
],
)

envoy_cc_library(
name = "async_redis_client_lib",
srcs = ["async_redis_client_impl.cc"],
hdrs = ["async_redis_client_impl.h"],
deps = [
"//source/common/tcp:async_tcp_client_lib",
"//source/common/buffer:buffer_lib",
"//source/extensions/filters/network/common/redis:codec_lib",
#":cluster_refresh_manager_interface",
#"//envoy/event:dispatcher_interface",
#"//envoy/singleton:manager_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/upstream:cluster_manager_lib",
#"//source/common/common:lock_guard_lib",
#"//source/common/common:thread_annotations",
#"//source/common/common:thread_lib",
],
)
83 changes: 83 additions & 0 deletions source/extensions/common/redis/async_redis_client_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include "source/extensions/common/redis/async_redis_client_impl.h"

#include "source/common/upstream/cluster_manager_impl.h"
#include "source/common/buffer/buffer_impl.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Redis {

// Takes ownership of the TCP client used to talk to the Redis server and
// registers this object as the receiver of its connection and data events.
RedisAsyncClient::RedisAsyncClient(Tcp::AsyncTcpClientPtr&& tcp_client,
                                   Upstream::ClusterManager& cluster_manager)
    : tcp_client_(std::move(tcp_client)), decoder_(*this), cluster_manager_(cluster_manager) {
  tcp_client_->setAsyncTcpClientCallbacks(*this);
}

// Tcp::AsyncTcpClientCallbacks: notified of connection lifecycle events.
// On any close (remote or local), the in-flight request callback and every
// queued request callback are invoked with the first argument `false` so
// callers learn the connection failed; a later write() will reconnect.
void RedisAsyncClient::onEvent(Network::ConnectionEvent event) {
  if (event == Network::ConnectionEvent::RemoteClose ||
      event == Network::ConnectionEvent::LocalClose) {
    if (callback_) {
      // Fail the request currently awaiting a reply. The remaining arguments
      // are ignored by the callback when the first argument is false.
      callback_(false, false /*ignored*/, absl::nullopt/*ignored*/);
      callback_ = nullptr;
    }

    // Iterate over all queued requests and call each callback,
    // indicating that the connection failed. They would most likely fail as
    // well. A subsequent request will trigger the connection process again.
    for (; !queue_.empty(); queue_.pop()){
      std::get<2>(queue_.front())(false, false, absl::nullopt);
    }
    waiting_for_response_ = false;
  }
}

// Tcp::AsyncTcpClientCallbacks: called when data arrives from the Redis
// server. Feeds the bytes to the Redis protocol decoder (which synchronously
// invokes onRespValue for each complete reply) and then, if any request was
// queued while a response was outstanding, sends the next one.
// NOTE(review): assumes each onData call delivers a complete reply; a partial
// TCP read would clear waiting_for_response_ prematurely — TODO confirm.
void RedisAsyncClient::onData(Buffer::Instance& buf, bool) {
  // Decoding runs onRespValue, which invokes and clears callback_.
  // (Removed an unused local RespValue that was never read or written.)
  decoder_.decode(buf);
  waiting_for_response_ = false;

  if (!queue_.empty()) {
    // Send the oldest queued request; write() sets waiting_for_response_
    // again and installs the queued result callback.
    auto& element = queue_.front();
    write(*(std::get<0>(element)), std::get<1>(element), std::move(std::get<2>(element)));
    queue_.pop();
  }
}

// DecoderCallbacks: invoked by the decoder for each complete Redis reply.
// A Null reply means the requested key does not exist; any other reply is
// surfaced to the caller as a string with the surrounding quotes stripped.
void RedisAsyncClient::onRespValue(NetworkFilters::Common::Redis::RespValuePtr&& value) {
  // The connection may already have been closed, in which case onEvent failed
  // the request and cleared callback_ (same guard as onEvent). Calling an
  // empty std::function would throw std::bad_function_call.
  if (!callback_) {
    value.reset();
    return;
  }
  if (value->type() == NetworkFilters::Common::Redis::RespType::Null) {
    callback_(true, false, absl::nullopt);
  } else {
    std::string response = value->toString();
    // toString() wraps the payload in quotes; drop one character on each side.
    // NOTE(review): assumes the printed form is always at least two characters
    // long — TODO confirm for non-bulk-string reply types.
    response = response.substr(1, response.length() - 2);
    callback_(true, true, std::move(response));
  }
  callback_ = nullptr;

  value.reset();
}

// Sends a Redis request to the server, or queues it when a response to a
// previous request is still outstanding. `result_callback` is invoked with
// (connection_ok, found, payload) once the reply — or a connection failure —
// arrives.
void RedisAsyncClient::write(Buffer::Instance& data, bool end_stream, ResultCallback&& result_callback) {
  if (!tcp_client_->connected()) {
    // NOTE(review): connect() is asynchronous; the write below happens before
    // the connection is established. Presumably the async TCP client buffers
    // the data until connected — TODO confirm.
    tcp_client_->connect();
  }

  // No synchronization required, because this is never executed on different threads.
  if (waiting_for_response_) {
    // Queue the request; it is replayed from onData after the current
    // response has been decoded.
    std::unique_ptr<Buffer::OwnedImpl> queued_data = std::make_unique<Buffer::OwnedImpl>();
    queued_data->add(data);
    // No sync is required to insert and remove objects into the queue, as
    // RedisAsyncClient is a thread-local object and these operations are
    // executed on the same thread.
    queue_.emplace(std::move(queued_data), end_stream, std::move(result_callback));
    return;
  }

  waiting_for_response_ = true;
  callback_ = std::move(result_callback);
  tcp_client_->write(data, end_stream);
}
} // namespace Redis
} // namespace Common
} // namespace Extensions
} // namespace Envoy
47 changes: 47 additions & 0 deletions source/extensions/common/redis/async_redis_client_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <queue>
#include "source/common/buffer/buffer_impl.h"
#include "source/common/tcp/async_tcp_client_impl.h"
#include "source/extensions/filters/network/common/redis/codec_impl.h"

namespace Envoy {
namespace Extensions {
namespace Common {
namespace Redis {

// Thread-local asynchronous Redis client built on top of Tcp::AsyncTcpClient.
// Requests issued while a response is outstanding are queued and sent after
// the previous reply is received (see the implementation file). Not
// thread-safe: all methods must run on the owning worker thread.
class RedisAsyncClient : public Tcp::AsyncTcpClientCallbacks,
                         public NetworkFilters::Common::Redis::DecoderCallbacks {
public:
  // Result callback: (connection_ok, found, payload).
  // - first bool: false when the connection to Redis failed,
  // - second bool: whether a non-Null value was returned,
  // - optional string: the returned payload, when present.
  using ResultCallback = std::function<void(bool, bool, absl::optional<std::string>)>;
  RedisAsyncClient(Tcp::AsyncTcpClientPtr&& tcp_client, Upstream::ClusterManager&);
  // Tcp::AsyncTcpClientCallbacks
  void onEvent(Network::ConnectionEvent event) override;
  void onAboveWriteBufferHighWatermark() override {}
  void onBelowWriteBufferLowWatermark() override {}
  void onData(Buffer::Instance&, bool) override;

  // Sends (or queues, when a response is still outstanding) a request.
  void write(Buffer::Instance& data, bool end_stream, ResultCallback&&);

  // NetworkFilters::Common::Redis::DecoderCallbacks
  void onRespValue(NetworkFilters::Common::Redis::RespValuePtr&& value) override;

  Tcp::AsyncTcpClientPtr tcp_client_{nullptr};
  // Callback to be called when the response from Redis is received.
  ResultCallback callback_;

  NetworkFilters::Common::Redis::EncoderImpl encoder_;
  NetworkFilters::Common::Redis::DecoderImpl decoder_;

  // True while a request is in flight and its reply has not been decoded yet.
  bool waiting_for_response_{false};
  Upstream::ClusterManager& cluster_manager_;
  // Initialized to nullptr; previously left uninitialized, so reading it
  // before assignment was undefined behavior.
  Upstream::ThreadLocalCluster* cluster_{nullptr};

  // Queue where requests are held while the async client is waiting for a
  // response from the Redis server. Tuple: (request bytes, end_stream, callback).
  std::queue<std::tuple<std::unique_ptr<Buffer::OwnedImpl>, bool, ResultCallback>> queue_;
};

} // namespace Redis
} // namespace Common
} // namespace Extensions
} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ EXTENSIONS = {
#
"envoy.extensions.http.cache.file_system_http_cache": "//source/extensions/http/cache/file_system_http_cache:config",
"envoy.extensions.http.cache.simple": "//source/extensions/http/cache/simple_http_cache:config",
"envoy.extensions.http.cache.redis_http_cache": "//source/extensions/http/cache/redis_http_cache:config",

#
# Internal redirect predicates
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/extensions_metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ envoy.extensions.http.cache.simple:
status: wip
type_urls:
- envoy.extensions.http.cache.simple_http_cache.v3.SimpleHttpCacheConfig
envoy.extensions.http.cache.redis_http_cache:
categories:
- envoy.http.cache
security_posture: unknown
status: wip
type_urls:
- envoy.extensions.http.cache.redis_http_cache.v3.RedisHttpCacheConfig
envoy.clusters.aggregate:
categories:
- envoy.clusters
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/cache/cache_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& reque
case CacheEntryStatus::LookupError:
filter_state_ = FilterState::NotServingFromCache;
insert_status_ = InsertStatus::NoInsertLookupError;
lookup_->onDestroy();
lookup_ = nullptr;
decoder_callbacks_->continueDecoding();
return;
}
Expand Down
44 changes: 44 additions & 0 deletions source/extensions/http/cache/redis_http_cache/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
"envoy_proto_library",
)

licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_proto_library(
name = "cache_header_proto",
srcs = ["cache_header.proto"],
deps = ["//source/extensions/filters/http/cache:key"],
)

envoy_cc_extension(
name = "config",
srcs = [
"config.cc",
"redis_http_cache.cc",
"cache_header_proto_util.cc",
"redis_http_cache_lookup.cc",
"redis_http_cache_insert.cc",
"redis_http_cache_client.cc",
],
hdrs = [
"redis_http_cache.h",
"redis_http_cache_lookup.h",
"redis_http_cache_insert.h",
"redis_http_cache_client.h",
"cache_header_proto_util.h",
],
deps = [
":cache_header_proto_cc_proto",
"//source/extensions/filters/http/cache:http_cache_lib",
"//source/extensions/common/redis:async_redis_client_lib",
"@envoy_api//envoy/extensions/http/cache/redis_http_cache/v3:pkg_cc_proto",
],
)


18 changes: 18 additions & 0 deletions source/extensions/http/cache/redis_http_cache/DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## Architecture
Redis cache backend provides additional storage backend in addition to already existing memory and local file system backends.

From the API point of view, the cache filter points to a cluster. This is the same approach as in the case of xDS configuration. The cluster config contains a list of endpoints, methods to find them (static or DNS), connection parameters (like connection timeout), etc.

To communicate with a Redis server, a new async client has been added: RedisAsyncClient. It is built on top of TcpAsyncClient. Encoding data is done using already existing Redis encoder. The received data is decoded using already existing Redis decoder.

To avoid constantly opening and closing TCP connections to the Redis server, the connection stays open after it is initially created. Each worker thread has a separate connection to the Redis server, so there will be at most as many TCP sessions to a particular Redis server as there are worker threads.

Thread Local Storage structure allocated for RedisAsyncClients uses hash map to map cluster_name to RedisAsyncClient.

As the name says, the RedisAsyncClient has an asynchronous architecture. This means that after sending a request to the Redis server, the thread picks up the next job, which may also result in sending another request to the same Redis server. In that situation the new request is queued and sent to the Redis server after the reply to the previous request is received.

Storage design:

- Entries are stored under 3 keys: `cache-<hash>-headers`, `cache-<hash>-body` and `cache-<hash>-trailers`. The header's entry, in addition to actual headers, also stores the size of the body and info whether trailers are present.
- Headers' and trailers' entries are stored using protobufs. The format and utilities to encode/decode are reused from the local file backend. In the future, that code may be moved to a common directory and reused by the Redis and local file backends. For now, a few files have been copied from the local file backend directory.
- The Redis backend has been designed to be used by multiple Envoys. This means that 2 or more Envoys may try to fill the cache at the same time. The coordination is pushed to the Redis server. The first Envoy which manages to successfully issue the command `set cache-<hash>-headers "" NX EX 30` is the one which will fill the cache. Note that the command does not write any meaningful content to Redis yet; it writes an empty string, just to reserve the right to fill the cache. The `NX` parameter instructs the Redis server to write the content only when `cache-<hash>-headers` does not exist. The `EX 30` parameter instructs Redis to delete the entry after 30 seconds. This is done to recover from errors when an Envoy cannot complete writing to the cache (crash, disconnect, etc.). In such a situation that particular key will be unblocked after 30 seconds. When filling the cache with headers, body and trailers is successful, the 30-second expiration limit is removed.
55 changes: 55 additions & 0 deletions source/extensions/http/cache/redis_http_cache/cache_header.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
syntax = "proto3";

package Envoy.Extensions.HttpFilters.Cache.RedisHttpCache;

import "google/protobuf/timestamp.proto";
import "source/extensions/filters/http/cache/key.proto";

// NOTE(review): copied from the file system cache backend — the "cache file"
// layout described below is that backend's on-disk format; confirm how much
// of it applies to the Redis entries.
// The full structure of a cache file is:
// 4 byte cache file identifier (used to ignore files that don't belong to the cache)
// 4 byte cache version identifier (if mismatched, the cache file is invalid and is deleted)
// 4 byte header size
// 4 byte trailer size
// 8 byte body size
// serialized CacheFileHeader
// body
// serialized CacheFileTrailer
//
// The opening block is necessary to allow the sizes to be at the front of the file, but
// (necessarily) written last - you can't easily insert things into a serialized proto, so
// a flat layout for this block is necessary.
//
// One slightly special case is the cache file for an entry with 'vary' headers involved
// - for this case at the 'hub' entry there is no trailer or body, and the only header
// is a 'vary' header, which indicates that the actual cache key will include some headers
// from the request.

// For serializing to cache entries only, the CacheFileHeader message contains the cache
// entry key, the cache metadata, and the http response headers.
// NOTE(review): this message was copied from the file system cache backend —
// confirm the field semantics still match the Redis storage design.
message CacheFileHeader {
  Key key = 1;
  google.protobuf.Timestamp metadata_response_time = 2;
  // Repeated Header messages are used, rather than a proto map, because there may be
  // repeated keys, and ordering may be important.
  message Header {
    string key = 1;
    string value = 2;
  }
  repeated Header headers = 3;

  // Size of the cached response body in bytes (stored here so the body entry
  // can be validated/read without a separate length lookup).
  uint64 body_size = 5;

  // True when the cached response also has a trailers entry.
  bool trailers = 6;
};

// For serializing to cache entries only, the CacheFileTrailer message contains the http
// response trailers.
message CacheFileTrailer {
  // Repeated Trailer messages are used, rather than a proto map, because there may be
  // repeated keys, and ordering may be important.
  message Trailer {
    string key = 1;
    string value = 2;
  }
  repeated Trailer trailers = 3;
};
Loading
Loading