diff --git a/api/envoy/extensions/http/cache/redis_http_cache/v3/BUILD b/api/envoy/extensions/http/cache/redis_http_cache/v3/BUILD new file mode 100644 index 0000000000000..d49202b74ab44 --- /dev/null +++ b/api/envoy/extensions/http/cache/redis_http_cache/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "@com_github_cncf_xds//udpa/annotations:pkg", + "@com_github_cncf_xds//xds/annotations/v3:pkg", + ], +) diff --git a/api/envoy/extensions/http/cache/redis_http_cache/v3/redis_http_cache.proto b/api/envoy/extensions/http/cache/redis_http_cache/v3/redis_http_cache.proto new file mode 100644 index 0000000000000..5d1519fe6829c --- /dev/null +++ b/api/envoy/extensions/http/cache/redis_http_cache/v3/redis_http_cache.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package envoy.extensions.http.cache.redis_http_cache.v3; + +import "xds/annotations/v3/status.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.http.cache.redis_http_cache.v3"; +option java_outer_classname = "RedisHttpCacheProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/http/cache/redis_http_cache/v3;redis_http_cachev3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; +option (xds.annotations.v3.file_status).work_in_progress = true; + +// [#protodoc-title: RedisHttpCacheConfig] +// [#extension: envoy.extensions.http.cache.redis_http_cache] + +// Configuration for a cache implementation that caches in Redis database. +// +// For implementation details, see `DESIGN.md `_. +// [#next-free-field: 2] +message RedisHttpCacheConfig { + // Name of the cluster containing redis server. + string cluster = 1 [(validate.rules).string = {min_len: 1}]; +} diff --git a/source/extensions/common/redis/BUILD b/source/extensions/common/redis/BUILD index 69afd09d23eb3..dcdf85366105c 100644 --- a/source/extensions/common/redis/BUILD +++ b/source/extensions/common/redis/BUILD @@ -34,3 +34,22 @@ envoy_cc_library( "//source/common/common:thread_lib", ], ) + +envoy_cc_library( + name = "async_redis_client_lib", + srcs = ["async_redis_client_impl.cc"], + hdrs = ["async_redis_client_impl.h"], + deps = [ + "//source/common/tcp:async_tcp_client_lib", + "//source/common/buffer:buffer_lib", + "//source/extensions/filters/network/common/redis:codec_lib", + #":cluster_refresh_manager_interface", + #"//envoy/event:dispatcher_interface", + #"//envoy/singleton:manager_interface", + "//envoy/upstream:cluster_manager_interface", + "//source/common/upstream:cluster_manager_lib", + #"//source/common/common:lock_guard_lib", + #"//source/common/common:thread_annotations", + #"//source/common/common:thread_lib", + ], +) diff --git a/source/extensions/common/redis/async_redis_client_impl.cc b/source/extensions/common/redis/async_redis_client_impl.cc new file mode 100644 index 0000000000000..1108756036414 --- /dev/null +++ b/source/extensions/common/redis/async_redis_client_impl.cc @@ -0,0 +1,83 @@ +#include "source/extensions/common/redis/async_redis_client_impl.h" + +#include "source/common/upstream/cluster_manager_impl.h" +#include "source/common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Extensions { +namespace Common { +namespace Redis { + +RedisAsyncClient::RedisAsyncClient(Tcp::AsyncTcpClientPtr&& tcp_client, Upstream::ClusterManager& cluster_manager) : tcp_client_(std::move(tcp_client)), decoder_(*this), cluster_manager_(cluster_manager) { + tcp_client_->setAsyncTcpClientCallbacks(*this); +} + +void RedisAsyncClient::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::RemoteClose || + event == Network::ConnectionEvent::LocalClose) { + if (callback_) { + callback_(false, false /*ignored*/, absl::nullopt/*ignored*/); + callback_ = nullptr; + } + + // Iterate over all queued requests and call a callback + // indicating that connection failed. They would ost likely fail as well. + // A subsequent request + // will trigger the connection process again. + for (; !queue_.empty(); queue_.pop()){ + std::get<2>(queue_.front())(false, false, absl::nullopt); + } + waiting_for_response_ = false; + } +} + +void RedisAsyncClient::onData(Buffer::Instance& buf, bool) { + NetworkFilters::Common::Redis::RespValue response; + decoder_.decode(buf); + waiting_for_response_ = false; + + if (!queue_.empty()) { + auto& element = queue_.front(); + write(*(std::get<0>(element)), std::get<1>(element), std::move(std::get<2>(element))); + queue_.pop(); + } +} + +void RedisAsyncClient::onRespValue(NetworkFilters::Common::Redis::RespValuePtr&& value) { + if (value->type() == NetworkFilters::Common::Redis::RespType::Null) { + callback_(true, false, absl::nullopt); + } else { + std::string response = value->toString(); + // Result is a string containing quotes. Drop the quotes on both sides of the string. + response = response.substr(1, response.length() - 2); + callback_(true, true, std::move(response)); + } + callback_ = nullptr; + + value.reset(); +} + + void RedisAsyncClient::write(Buffer::Instance& data, bool end_stream, ResultCallback&& result_callback) { + if (!tcp_client_->connected()) { + tcp_client_->connect(); + } + + // No synch requires, because this is never executed on different threads. + if (waiting_for_response_) { + // Queue the request. + std::unique_ptr queued_data = std::make_unique(); + queued_data->add(data); + // No sync is required to insert and remove objects into the queue, as + // RedisAsyncClient is thread local object and those operations are executed on the same thread. + queue_.emplace(std::move(queued_data), end_stream, std::move(result_callback)); + return; + } + + waiting_for_response_ = true; + callback_ = std::move(result_callback); + tcp_client_->write(data, end_stream); + } +} // namespace Redis +} // namespace Common +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/common/redis/async_redis_client_impl.h b/source/extensions/common/redis/async_redis_client_impl.h new file mode 100644 index 0000000000000..bf75f55af5aa6 --- /dev/null +++ b/source/extensions/common/redis/async_redis_client_impl.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include "source/common/buffer/buffer_impl.h" +#include "source/common/tcp/async_tcp_client_impl.h" +#include "source/extensions/filters/network/common/redis/codec_impl.h" + +namespace Envoy { +namespace Extensions { +namespace Common { +namespace Redis { + +class RedisAsyncClient : public Tcp::AsyncTcpClientCallbacks, + public NetworkFilters::Common::Redis::DecoderCallbacks { +public: + using ResultCallback = std::function)>; + RedisAsyncClient(Tcp::AsyncTcpClientPtr&& tcp_client, Upstream::ClusterManager&); + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + void onData(Buffer::Instance&, bool) override; + + void write(Buffer::Instance& data, bool end_stream, ResultCallback&&); + + // class DecoderCallbacks + void onRespValue(NetworkFilters::Common::Redis::RespValuePtr&& value) override; + + Tcp::AsyncTcpClientPtr tcp_client_{nullptr}; + // Callback to be called when response from Redis is received. + ResultCallback callback_; + + NetworkFilters::Common::Redis::EncoderImpl encoder_; + NetworkFilters::Common::Redis::DecoderImpl decoder_; + + bool waiting_for_response_{false}; + Upstream::ClusterManager& cluster_manager_; + Upstream::ThreadLocalCluster* cluster_; + + // queue where requests are queued when the async client is currently + // waiting for a response from the redis server. + std::queue, bool, ResultCallback>> queue_; +}; + +} // namespace Redis +} // namespace Common +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 8a00f28108759..e2b5436fccf09 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -330,6 +330,7 @@ EXTENSIONS = { # "envoy.extensions.http.cache.file_system_http_cache": "//source/extensions/http/cache/file_system_http_cache:config", "envoy.extensions.http.cache.simple": "//source/extensions/http/cache/simple_http_cache:config", + "envoy.extensions.http.cache.redis_http_cache": "//source/extensions/http/cache/redis_http_cache:config", # # Internal redirect predicates diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index af8263b0c62d4..ef51fae19a3ea 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -89,6 +89,13 @@ envoy.extensions.http.cache.simple: status: wip type_urls: - envoy.extensions.http.cache.simple_http_cache.v3.SimpleHttpCacheConfig +envoy.extensions.http.cache.redis_http_cache: + categories: + - envoy.http.cache + security_posture: unknown + status: wip + type_urls: + - envoy.extensions.http.cache.redis_http_cache.v3.RedisHttpCacheConfig envoy.clusters.aggregate: categories: - envoy.clusters diff --git a/source/extensions/filters/http/cache/cache_filter.cc b/source/extensions/filters/http/cache/cache_filter.cc index 0ca04ed2131d0..f6de607c9a44f 100644 --- a/source/extensions/filters/http/cache/cache_filter.cc +++ b/source/extensions/filters/http/cache/cache_filter.cc @@ -332,6 +332,8 @@ void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& reque case CacheEntryStatus::LookupError: filter_state_ = FilterState::NotServingFromCache; insert_status_ = InsertStatus::NoInsertLookupError; + lookup_->onDestroy(); + lookup_ = nullptr; decoder_callbacks_->continueDecoding(); return; } diff --git a/source/extensions/http/cache/redis_http_cache/BUILD b/source/extensions/http/cache/redis_http_cache/BUILD new file mode 100644 index 0000000000000..932e7c841d527 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/BUILD @@ -0,0 +1,44 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", + "envoy_proto_library", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_proto_library( + name = "cache_header_proto", + srcs = ["cache_header.proto"], + deps = ["//source/extensions/filters/http/cache:key"], +) + +envoy_cc_extension( + name = "config", + srcs = [ + "config.cc", + "redis_http_cache.cc", + "cache_header_proto_util.cc", + "redis_http_cache_lookup.cc", + "redis_http_cache_insert.cc", + "redis_http_cache_client.cc", + ], + hdrs = [ + "redis_http_cache.h", + "redis_http_cache_lookup.h", + "redis_http_cache_insert.h", + "redis_http_cache_client.h", + "cache_header_proto_util.h", + ], + deps = [ + ":cache_header_proto_cc_proto", + "//source/extensions/filters/http/cache:http_cache_lib", + "//source/extensions/common/redis:async_redis_client_lib", + "@envoy_api//envoy/extensions/http/cache/redis_http_cache/v3:pkg_cc_proto", + ], +) + + diff --git a/source/extensions/http/cache/redis_http_cache/DESIGN.md b/source/extensions/http/cache/redis_http_cache/DESIGN.md new file mode 100644 index 0000000000000..5e3284ba2f477 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/DESIGN.md @@ -0,0 +1,18 @@ +## Architecture +Redis cache backend provides additional storage backend in addition to already existing memory and local file system backends. + +From API point of view, the cache filter points to a cluster. This is the same approach as in the case of xds configuration. The cluster config contains list of endpoints, methods to find them (static or dns), connection parameters (like connection timeout), etc. + +To communicate with a Redis server, a new async client has been added: RedisAsyncClient. It is built on top of TcpAsyncClient. Encoding data is done using already existing Redis encoder. The received data is decoded using already existing Redis decoder. + +To avoid constant opening and closing TCP connections to the Redis server, the connection stays open after it is initially created. Each worker threads has a separate connection to the Redis server, so there will be maximum as many tcp sessions to a particular Redis server as are worker threads. + +Thread Local Storage structure allocated for RedisAsyncClients uses hash map to map cluster_name to RedisAsyncClient. + +As the name says, the RedisAsyncClient is of asynchronous architecture. It means that after sending a request to the Redis server, the thread picks up a next job, which may also result in sending another request to the same Redis server. In this situation the new request is queued and sent to the Redis server after the reply to the previous request is received. + +Storage design: + + - Entries are stored under 3 keys: `cache--headers`, `cache--body` and `cache--trailers`. The header's entry, in addition to actual headers, also stores the size of the body and info whether trailers are present. + - Headers' and trailers' entries are stored using protobufs. The format and utilities to encode/decode are reused from local file backend. In the future, that code may be moved to a common directory and reused by redis and local files backends. For now few files have been copied from local files backend directory. + - The Redis backend has been design to be used by multiple Envoys. This means that 2 or more Envoys may try to fill the cache at the same time. The coordination is pushed to Redis server. The first Envoy which manages to successfully issue the command `set cache--headers "" NX EX 30` is the one which will fill the cache. Note that the command does not write any meaningful content to Redis yet, it writes an empty string, just to reserve the right to fill the cache. The `NX` parameter instructs the Redis server to write the content only when `cache--headers` does not exist. `EX 30` parameter instructs the Redis to delete the entry after 30 seconds, This is done to recover from errors when an Envoy cannot complete writing to cache (crash, disconnect, etc). In such situation that particular key will be unblocked after 30 seconds. When filling the cache with headers, body and trailer is successful, the 30 seconds expiration limit is removed. diff --git a/source/extensions/http/cache/redis_http_cache/cache_header.proto b/source/extensions/http/cache/redis_http_cache/cache_header.proto new file mode 100644 index 0000000000000..190650a56d03b --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/cache_header.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +package Envoy.Extensions.HttpFilters.Cache.RedisHttpCache; + +import "google/protobuf/timestamp.proto"; +import "source/extensions/filters/http/cache/key.proto"; + +// The full structure of a cache file is: +// 4 byte cache file identifier (used to ignore files that don't belong to the cache) +// 4 byte cache version identifier (if mismatched, the cache file is invalid and is deleted) +// 4 byte header size +// 4 byte trailer size +// 8 byte body size +// serialized CacheFileHeader +// body +// serialized CacheFileTrailer +// +// The opening block is necessary to allow the sizes to be at the front of the file, but +// (necessarily) written last - you can't easily insert things into a serialized proto, so +// a flat layout for this block is necessary. +// +// One slightly special case is the cache file for an entry with 'vary' headers involved +// - for this case at the 'hub' entry there is no trailer or body, and the only header +// is a 'vary' header, which indicates that the actual cache key will include some headers +// from the request. + +// For serializing to cache files only, the CacheFileHeader message contains the cache +// entry key, the cache metadata, and the http response headers. +message CacheFileHeader { + Key key = 1; + google.protobuf.Timestamp metadata_response_time = 2; + // Repeated Header messages are used, rather than a proto map, because there may be + // repeated keys, and ordering may be important. + message Header { + string key = 1; + string value = 2; + } + repeated Header headers = 3; + + uint64 body_size = 5; + + bool trailers = 6; +}; + +// For serializing to cache files only, the CacheFileTrailer message contains the http +// response trailers. +message CacheFileTrailer { + // Repeated Trailer messages are used, rather than a proto map, because there may be + // repeated keys, and ordering may be important. + message Trailer { + string key = 1; + string value = 2; + } + repeated Trailer trailers = 3; +}; diff --git a/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.cc b/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.cc new file mode 100644 index 0000000000000..4d44c45f18405 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.cc @@ -0,0 +1,98 @@ +#include "source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h" + +#include "absl/container/flat_hash_set.h" +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { +namespace { +template +Http::HeaderMap::Iterate copyToKeyValue(const Http::HeaderEntry& header, KeyValue* kv) { + kv->set_key(std::string{header.key().getStringView()}); + kv->set_value(std::string{header.value().getStringView()}); + return Http::HeaderMap::Iterate::Continue; +} +} // namespace + +CacheFileHeader mergeProtoWithHeadersAndMetadata(const CacheFileHeader& entry_headers, + const Http::ResponseHeaderMap& response_headers, + const ResponseMetadata& response_metadata) { + Http::ResponseHeaderMapPtr merge_headers = headersFromHeaderProto(entry_headers); + applyHeaderUpdate(response_headers, *merge_headers); + return makeCacheFileHeaderProto(entry_headers.key(), *merge_headers, response_metadata); +} + +CacheFileHeader makeCacheFileHeaderProto(const Key& key, + const Http::ResponseHeaderMap& response_headers, + const ResponseMetadata& metadata) { + CacheFileHeader file_header; + *file_header.mutable_key() = key; + TimestampUtil::systemClockToTimestamp(metadata.response_time_, + *file_header.mutable_metadata_response_time()); + response_headers.iterate([&file_header](const Http::HeaderEntry& header) { + return copyToKeyValue(header, file_header.add_headers()); + }); + return file_header; +} + +CacheFileTrailer makeCacheFileTrailerProto(const Http::ResponseTrailerMap& response_trailers) { + CacheFileTrailer file_trailer; + response_trailers.iterate([&file_trailer](const Http::HeaderEntry& trailer) { + return copyToKeyValue(trailer, file_trailer.add_trailers()); + }); + return file_trailer; +} + +size_t headerProtoSize(const CacheFileHeader& proto) { return proto.ByteSizeLong(); } + +Buffer::OwnedImpl bufferFromProto(const CacheFileHeader& proto) { + // TODO(ravenblack): consider proto.SerializeToZeroCopyStream with an impl to Buffer. + return Buffer::OwnedImpl{proto.SerializeAsString()}; +} + +Buffer::OwnedImpl bufferFromProto(const CacheFileTrailer& proto) { + // TODO(ravenblack): consider proto.SerializeToZeroCopyStream with an impl to Buffer. + return Buffer::OwnedImpl{proto.SerializeAsString()}; +} + +std::string serializedStringFromProto(const CacheFileHeader& proto) { + return proto.SerializeAsString(); +} + +Http::ResponseHeaderMapPtr headersFromHeaderProto(const CacheFileHeader& header) { + Http::ResponseHeaderMapPtr headers = Http::ResponseHeaderMapImpl::create(); + for (const CacheFileHeader::Header& h : header.headers()) { + headers->addCopy(Http::LowerCaseString(h.key()), h.value()); + } + return headers; +} + +Http::ResponseTrailerMapPtr trailersFromTrailerProto(const CacheFileTrailer& trailer) { + Http::ResponseTrailerMapPtr trailers = Http::ResponseTrailerMapImpl::create(); + for (const CacheFileTrailer::Trailer& t : trailer.trailers()) { + trailers->addCopy(Http::LowerCaseString(t.key()), t.value()); + } + return trailers; +} + +ResponseMetadata metadataFromHeaderProto(const CacheFileHeader& header) { + ResponseMetadata metadata; + metadata.response_time_ = SystemTime{std::chrono::milliseconds( + Protobuf::util::TimeUtil::TimestampToMilliseconds(header.metadata_response_time()))}; + return metadata; +} + +CacheFileHeader makeCacheFileHeaderProto(Buffer::Instance& buffer) { + CacheFileHeader ret; + ret.ParseFromString(buffer.toString()); + return ret; +} + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h b/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h new file mode 100644 index 0000000000000..f67e68a540f32 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h @@ -0,0 +1,108 @@ +#pragma once + +#include "envoy/http/header_map.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/extensions/filters/http/cache/cache_entry_utils.h" +#include "source/extensions/http/cache/redis_http_cache/cache_header.pb.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +// TODO: This file is a copy from files based cache. +// Consider moving it to common place where it can be used +// by redis and file backend. + +/** + * Update an existing CacheFileHeader with new values from an updateHeaders operation. + * See applyHeaderUpdate in cache_entry_utils.h for details of merge behavior. + * @param entry_header the CacheFileHeader from the entry to be updated. + * @param response_headers the http headers from the updateHeaders call. + * @param response_metadata the metadata from the updateHeaders call. + * @return the merged CacheFileHeader. + */ +CacheFileHeader mergeProtoWithHeadersAndMetadata(const CacheFileHeader& entry_headers, + const Http::ResponseHeaderMap& response_headers, + const ResponseMetadata& response_metadata); + +/** + * Create a CacheFileHeader message from response headers, metadata and key. + * @param key the cache entry key. + * @param response_headers the response_headers from updateHeaders or insertHeaders. + * @param metadata the metadata from updateHeaders or insertHeaders. + * @return a CacheFileHeader proto containing the key, response headers and metadata. + */ +CacheFileHeader makeCacheFileHeaderProto(const Key& key, + const Http::ResponseHeaderMap& response_headers, + const ResponseMetadata& metadata); + +/** + * Create a CacheFilterTrailer message from response trailers. + * @param response_trailers the response_trailers from insertTrailers. + * @return a CacheFileTrailer message containing the http trailers. + */ +CacheFileTrailer makeCacheFileTrailerProto(const Http::ResponseTrailerMap& response_trailers); + +/** + * Serializes the CacheFileHeader proto and returns its size in bytes. + * @param proto the CacheFileHeader proto to have its serialized size measured. + */ +size_t headerProtoSize(const CacheFileHeader& proto); + +/** + * Serializes the CacheFileHeader proto into a Buffer object. + * @param proto the CacheFileHeader proto to be serialized. + * @return a Buffer::OwnedImpl containing the serialized CacheFileHeader. + */ +Buffer::OwnedImpl bufferFromProto(const CacheFileHeader& proto); + +/** + * Serializes the CacheFileTrailer proto into a Buffer object. + * @param proto the CacheFileTrailer proto to be serialized. + * @return a Buffer::OwnedImpl containing the serialized CacheFileTrailer. + */ +Buffer::OwnedImpl bufferFromProto(const CacheFileTrailer& proto); + +/** + * Serializes the CacheFileHeader proto into a std::string. + * @param proto the CacheFileHeader proto to be serialized. + * @return a std::string containing the serialized CacheFileHeader. + */ +std::string serializedStringFromProto(const CacheFileHeader& proto); + +/** + * Gets the headers from a CacheFileHeader message as an Envoy::Http::ResponseHeaderMapPtr. + * @param header the CacheFileHeader message from which to extract the headers. + * @return an Http::ResponseHeaderMapPtr containing the cached response headers. + */ +Http::ResponseHeaderMapPtr headersFromHeaderProto(const CacheFileHeader& header); + +/** + * Gets the trailers from a CacheFileTrailer message as an Envoy::Http::ResponseTrailerMapPtr. + * @param trailer the CacheFileTrailer message from which to extract the trailers. + * @return an Http::ResponseTrailerMapPtr containing the cached response trailers. + */ +Http::ResponseTrailerMapPtr trailersFromTrailerProto(const CacheFileTrailer& trailer); + +/** + * Gets the cache metadata from a CacheFileHeader message. + * @param header the CacheFileHeader message from which to extract the metadata. + * @return a ResponseMetadata object containing the cached metadata. + */ +ResponseMetadata metadataFromHeaderProto(const CacheFileHeader& header); + +/** + * Deserializes a CacheFileHeader message from a Buffer. + * @param buffer the buffer containing a serialized CacheFileHeader message. + * @return the deserialized CacheFileHeader message. + */ +CacheFileHeader makeCacheFileHeaderProto(Buffer::Instance& buffer); + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/config.cc b/source/extensions/http/cache/redis_http_cache/config.cc new file mode 100644 index 0000000000000..55250703a9156 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/config.cc @@ -0,0 +1,88 @@ +#include +#include + +//#include "envoy/registry/registry.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache.h" + +#include "source/extensions/filters/http/cache/http_cache.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { +namespace { + +/** + * A singleton that acts as a factory for generating and looking up FileSystemHttpCaches. + * When given equivalent configs, the singleton returns pointers to the same cache. + * When given different configs, the singleton returns different cache instances. + * If given configs with the same cache_path but different configuration, + * an exception is thrown, as it doesn't make sense two operate two caches in the + * same path with different configurations. + */ +class CacheSingleton : public Envoy::Singleton::Instance { +public: + CacheSingleton(Upstream::ClusterManager& cluster_manager, + ThreadLocal::SlotAllocator& slot_allocator) + : cluster_manager_(cluster_manager), slot_allocator_(slot_allocator) {} + + std::shared_ptr getCache(std::shared_ptr /*singleton*/, + const ConfigProto& config, + Stats::Scope& /*stats_scope*/) { + absl::MutexLock lock(&mu_); + + auto cache = caches_.find(config.cluster()); + if (cache != caches_.end()) { + return cache->second.lock(); + } + + // TLS internally maps into 1+ clusters. + std::shared_ptr new_cache = + std::make_shared(config.cluster(), cluster_manager_, slot_allocator_); + + caches_.emplace(config.cluster(), new_cache); + return new_cache; + } + +private: + // Each cache is identified by a cluster name. + absl::flat_hash_map> caches_ ABSL_GUARDED_BY(mu_); + absl::Mutex mu_; + Upstream::ClusterManager& cluster_manager_; + ThreadLocal::SlotAllocator& slot_allocator_; +}; + +SINGLETON_MANAGER_REGISTRATION(redis_http_cache_singleton); + +class RedisHttpCacheFactory : public HttpCacheFactory { +public: + std::string name() const override { return std::string{RedisHttpCache::name()}; } + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } + // From HttpCacheFactory + std::shared_ptr + getCache(const envoy::extensions::filters::http::cache::v3::CacheConfig& filter_config, + Server::Configuration::FactoryContext& context) override { + ConfigProto config; + THROW_IF_NOT_OK(MessageUtil::unpackTo(filter_config.typed_config(), config)); + std::shared_ptr caches = + context.serverFactoryContext().singletonManager().getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(redis_http_cache_singleton), [&context] { + return std::make_shared( + context.serverFactoryContext().clusterManager(), + context.serverFactoryContext().threadLocal()); + }); + return caches->getCache(caches, config, context.scope()); + } +}; + +static Registry::RegisterFactory register_; + +} // namespace +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache.cc b/source/extensions/http/cache/redis_http_cache/redis_http_cache.cc new file mode 100644 index 0000000000000..7132dbcb31982 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache.cc @@ -0,0 +1,29 @@ +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache.h" + +#include "source/extensions/filters/http/cache/cache_custom_headers.h" +#include "source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +LookupContextPtr RedisHttpCache::makeLookupContext(LookupRequest&& lookup, + Http::StreamFilterCallbacks& callbacks) { + return std::make_unique(cluster_name_, callbacks.dispatcher(), + tls_slot_, std::move(lookup)); +} + +InsertContextPtr RedisHttpCache::makeInsertContext(LookupContextPtr&& lookup, + Http::StreamFilterCallbacks& /* callbacks*/) { + auto redis_lookup_context = std::unique_ptr( + dynamic_cast(lookup.release())); + return std::make_unique(std::move(redis_lookup_context), tls_slot_); +} + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache.h b/source/extensions/http/cache/redis_http_cache/redis_http_cache.h new file mode 100644 index 0000000000000..72acba807714f --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache.h @@ -0,0 +1,60 @@ +#pragma once + +#include "envoy/extensions/http/cache/redis_http_cache/v3/redis_http_cache.pb.h" +#include "envoy/extensions/http/cache/redis_http_cache/v3/redis_http_cache.pb.validate.h" +#include "envoy/thread_local/thread_local.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/extensions/common/redis/async_redis_client_impl.h" +#include "source/extensions/filters/http/cache/http_cache.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +using ConfigProto = envoy::extensions::http::cache::redis_http_cache::v3::RedisHttpCacheConfig; + +class RedisHttpCache : public HttpCache { +public: + RedisHttpCache(absl::string_view cluster_name, Upstream::ClusterManager& cluster_manager, + ThreadLocal::SlotAllocator& tls) + : cluster_name_(cluster_name), cluster_manager_(cluster_manager), tls_slot_(tls) { + + tls_slot_.set([&](Event::Dispatcher&) { + return std::make_shared(cluster_manager_); + }); + } + LookupContextPtr makeLookupContext(LookupRequest&& /*lookup*/, + Http::StreamFilterCallbacks& /* callbacks*/) override; + InsertContextPtr + makeInsertContext(LookupContextPtr&& /*lookup_context*/, + Http::StreamFilterCallbacks& /*callbacks*/) override; // {return nullptr;} + CacheInfo cacheInfo() const override { + CacheInfo info; + + return info; + } + + void updateHeaders(const LookupContext& /*lookup_context*/, + const Http::ResponseHeaderMap& /*response_headers*/, + const ResponseMetadata& /*metadata*/, + UpdateHeadersCallback /*on_complete*/) override{}; + + static absl::string_view name() { return "redis cache"; } + +private: + std::string cluster_name_; + Upstream::ClusterManager& cluster_manager_; + + ThreadLocal::TypedSlot tls_slot_; +}; + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.cc b/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.cc new file mode 100644 index 0000000000000..dd7f56c49e566 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.cc @@ -0,0 +1,54 @@ +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_client.h" + +#include "source/common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +bool ThreadLocalRedisClient::send( + absl::string_view cluster_name, std::vector command, + Extensions::Common::Redis::RedisAsyncClient::ResultCallback&& callback) { + + // locate the cluster + auto redis_client = redis_clients_.find(cluster_name); + + if (redis_client == redis_clients_.end()) { + auto cluster = cluster_manager_.getThreadLocalCluster(cluster_name); + if (!cluster) { + ENVOY_LOG_MISC(trace, "Cannot find cluster: {}", cluster_name); + return false; + } + auto tcp_client = + cluster->tcpAsyncClient(nullptr, std::make_shared(false)); + redis_clients_.emplace(cluster_name, + std::make_unique( + std::move(tcp_client), cluster_manager_)); + redis_client = redis_clients_.find(cluster_name); + } + + Buffer::OwnedImpl buf; + NetworkFilters::Common::Redis::RespValue request; + std::vector values(command.size()); + + for (uint32_t i = 0; i < command.size(); i++) { + values[i].type(NetworkFilters::Common::Redis::RespType::BulkString); + values[i].asString() = command[i]; + } + + request.type(NetworkFilters::Common::Redis::RespType::Array); + request.asArray().swap(values); + redis_client->second->encoder_.encode(request, buf); + + redis_client->second->write(buf, false, std::move(callback)); + + return true; +} + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.h b/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.h new file mode 100644 index 0000000000000..a0568d885e6d9 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_client.h @@ -0,0 +1,38 @@ +#pragma once + +#include "envoy/thread_local/thread_local.h" + +#include "source/common/upstream/cluster_manager_impl.h" +#include "source/extensions/common/redis/async_redis_client_impl.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +struct ThreadLocalRedisClient : public ThreadLocal::ThreadLocalObject { + ThreadLocalRedisClient(Upstream::ClusterManager& cluster_manager) + : cluster_manager_(cluster_manager) {} + ~ThreadLocalRedisClient() override {} + + // Each worker thread has single RedisAsyncClient associated with a particular cluster. + // The clients are found by cluster name. + absl::flat_hash_map> + redis_clients_; + + bool send(absl::string_view cluster, std::vector command, + Extensions::Common::Redis::RedisAsyncClient::ResultCallback&& callback); + + Upstream::ClusterManager& cluster_manager_; +}; + +constexpr std::string_view RedisCacheHeadersEntry = "cache-{}-headers"; +constexpr std::string_view RedisCacheBodyEntry = "cache-{}-body"; +constexpr std::string_view RedisCacheTrailersEntry = "cache-{}-trailers"; + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.cc b/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.cc new file mode 100644 index 0000000000000..1abd71698e80c --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.cc @@ -0,0 +1,186 @@ +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.h" + +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +void RedisHttpCacheInsertContext::insertHeaders(const Http::ResponseHeaderMap& response_headers, + const ResponseMetadata& metadata, InsertCallback cb, + bool end_stream) { + insert_callback_ = std::move(cb); + + // Make proto with received headers. It will be sent to Redis as the last item (after body and + // trailers). + header_proto_ = makeCacheFileHeaderProto(lookup_->key(), response_headers, metadata); + + std::weak_ptr weak = alive_; + if (!tls_slot_->send(lookup_->clusterName(), + {"set", fmt::format(RedisCacheHeadersEntry, stableHashKey(lookup_->key())), + "", "NX", "EX", "30"}, + [this, weak, end_stream](bool connected, bool success, + absl::optional /*redis_value*/) { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + + if (!connected) { + // The client could not successfully connect to the database. + // Return false to indicate not to continue with caching. + (insert_callback_)(false); + } + + if (!success) { + // Error writing to Redis (connection was OK). + // Entry containing headers exists. The entry was added most likely by + // other Envoy or by other thread. + (insert_callback_)(false); + } + + if (end_stream) { + onStreamEnd(); + } + (insert_callback_)(true); + })) + + { + // Callback must be executed on filter's thread. + lookup_->dispatcher()->post([this]() { (insert_callback_)(false); }); + } +} + +void RedisHttpCacheInsertContext::insertBody(const Buffer::Instance& chunk, + InsertCallback ready_for_next_chunk, bool end_stream) { + + insert_callback_ = std::move(ready_for_next_chunk); + + std::weak_ptr weak = alive_; + Extensions::Common::Redis::RedisAsyncClient::ResultCallback result_callback = + [this, weak, end_stream](bool connected, bool success, + absl::optional /*redis_value*/) { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + + if (!connected) { + (insert_callback_)(false); + } + + if (!success) { + (insert_callback_)(false); + } + + if (end_stream) { + onStreamEnd(); + } + (insert_callback_)(true); + }; + + bool success = false; + if (first_body_chunk_) { + success = + tls_slot_->send(lookup_->clusterName(), + {"set", fmt::format(RedisCacheBodyEntry, stableHashKey(lookup_->key())), + chunk.toString(), "EX", "30"}, + std::move(result_callback)); + } else { + success = + tls_slot_->send(lookup_->clusterName(), + {"append", fmt::format(RedisCacheBodyEntry, stableHashKey(lookup_->key())), + chunk.toString()}, + std::move(result_callback)); + } + + if (!success) { + // Callback must be executed on filter's thread. + lookup_->dispatcher()->post([this]() { (insert_callback_)(false); }); + return; + } + + body_length_ += chunk.length(); + first_body_chunk_ = false; +} + +void RedisHttpCacheInsertContext::insertTrailers(const Http::ResponseTrailerMap& trailers, + InsertCallback insert_complete) { + insert_callback_ = std::move(insert_complete); + + CacheFileTrailer trailers_proto = makeCacheFileTrailerProto(trailers); + + std::weak_ptr weak = alive_; + if (!tls_slot_->send(lookup_->clusterName(), + {"set", fmt::format(RedisCacheTrailersEntry, stableHashKey(lookup_->key())), + trailers_proto.SerializeAsString(), "EX", "30"}, + [this, weak](bool connected, bool success, + absl::optional /*redis_value*/) mutable { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + + if (!connected) { + (insert_callback_)(false); + return; + } + + // This is the end of the stream. + if (success) { + header_proto_.set_trailers(true); + onStreamEnd(); + } + (insert_callback_)(success); + })) { + // Callback must be executed on filter's thread. + lookup_->dispatcher()->post([this]() { (insert_callback_)(false); }); + } +} + +// This is called when the last byte of data which needs to be cached +// has been received. At that moment the size of the body is known +// and whether trailers were present. That info is used to update the +// main block in redis. +void RedisHttpCacheInsertContext::onStreamEnd() { + // TODO: for now cache for 1h. This can be changed based on values in headers. + // When the cache expires, the new request will cache it again for 1h. + std::string cache_for = "3000000"; + + if (!tls_slot_->send(lookup_->clusterName(), + {"expire", fmt::format(RedisCacheBodyEntry, stableHashKey(lookup_->key())), + fmt::format("{}", cache_for)}, + [](bool /* connected */, bool /*success*/, + absl::optional /*redis_value*/) {})) { + return; + } + + if (!tls_slot_->send(lookup_->clusterName(), + {"expire", + fmt::format(RedisCacheTrailersEntry, stableHashKey(lookup_->key())), + fmt::format("{}", cache_for)}, + [](bool /* connected */, bool /*success*/, + absl::optional /*redis_value*/) {})) { + return; + } + + header_proto_.set_body_size(body_length_); + std::string serialized_proto = header_proto_.SerializeAsString(); + + tls_slot_->send( + lookup_->clusterName(), + {"set", fmt::format(RedisCacheHeadersEntry, stableHashKey(lookup_->key())), serialized_proto, + "XX", "EX", fmt::format("{}", cache_for)}, + [](bool /* connected */, bool /*success*/, absl::optional /*redis_value*/) {}); +} + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.h b/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.h new file mode 100644 index 0000000000000..d388c52b9ac14 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_insert.h @@ -0,0 +1,51 @@ +#pragma once + +#include "source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +class RedisHttpCacheInsertContext : public InsertContext, + public Logger::Loggable { +public: + RedisHttpCacheInsertContext(std::unique_ptr lookup_context, + ThreadLocal::TypedSlot& tls) + : lookup_(std::move(lookup_context)), tls_slot_(tls) { + alive_ = std::make_shared(true); + } + void insertHeaders(const Http::ResponseHeaderMap& /*response_headers*/, + const ResponseMetadata& /*metadata*/, InsertCallback /*insert_complete*/, + bool /*end_stream*/) override; + void insertBody(const Buffer::Instance& /*chunk*/, InsertCallback ready_for_next_chunk, + bool /*end_stream*/) override; + void insertTrailers(const Http::ResponseTrailerMap& /*trailers*/, + InsertCallback /* insert_complete*/) override; + void onDestroy() override {} + void onStreamEnd(); + +private: + std::unique_ptr lookup_; + InsertCallback insert_callback_; + ThreadLocal::TypedSlot& tls_slot_; + bool first_body_chunk_{true}; + uint64_t body_length_{0}; + CacheFileHeader header_proto_; + + // This is used to derive weak pointer given to callbacks. + // Callbacks check if the weak pointer is still valid. + // If the weak pointer is expired, it means that the session which issued the + // call to redis has been closed and the associated cache filter has been + // destroyed. + std::shared_ptr alive_; +}; + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.cc b/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.cc new file mode 100644 index 0000000000000..5173a0bdf44c7 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.cc @@ -0,0 +1,161 @@ +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/extensions/http/cache/redis_http_cache/cache_header_proto_util.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +void RedisHttpCacheLookupContext::getHeaders(LookupHeadersCallback&& cb) { + lookup_headers_callback_ = std::move(cb); + + // Try to get headers from Redis. Passed callback is called when response is received + // or error happens. + std::weak_ptr weak = alive_; + if (!tls_slot_->send( + cluster_name_, {"get", fmt::format(RedisCacheHeadersEntry, stableHashKey(lookup_.key()))}, + [this, weak](bool connected, bool success, + absl::optional redis_value) mutable { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + + if (!connected) { + // Failure to connect to Redis. Proceed without additional attempts + // to connect. + LookupResult lookup_result; + lookup_result.cache_entry_status_ = CacheEntryStatus::LookupError; + lookup_headers_callback_(std::move(lookup_result), + /* end_stream (ignored) = */ false); + return; + } + if (!success) { + // Entry was not found. + lookup_headers_callback_(LookupResult{}, /* end_stream (ignored) = */ false); + return; + } + + // Entry is in redis, but is empty (2 quotes only). It means that some other entity + // is filling the cache. Return the same value as when not found. + if (redis_value.value().length() == 2) { + LookupResult lookup_result; + lookup_result.cache_entry_status_ = CacheEntryStatus::LookupError; + lookup_headers_callback_(std::move(lookup_result), + /* end_stream (ignored) = */ false); + return; + } + + CacheFileHeader header; + header.ParseFromString(redis_value.value()); + + // Entry found, but its content is not as expected. + if (header.headers().size() == 0) { + lookup_headers_callback_(LookupResult{}, /* end_stream (ignored) = */ false); + return; + } + + // get headers from proto. + Http::ResponseHeaderMapPtr headers = headersFromHeaderProto(header); + + auto body_size = header.body_size(); + has_trailers_ = header.trailers(); + // This is stream end when there is no body and there are no trailers in the cache. + bool stream_end = (body_size == 0) && (!has_trailers_); + lookup_headers_callback_(lookup_.makeLookupResult(std::move(headers), + metadataFromHeaderProto(header), + body_size), + stream_end); + })) { + // Callback must be executed on filter's thread. + dispatcher_.post([this]() { + LookupResult lookup_result; + lookup_result.cache_entry_status_ = CacheEntryStatus::LookupError; + lookup_headers_callback_(std::move(lookup_result), /* end_stream (ignored) = */ false); + }); + } +} + +void RedisHttpCacheLookupContext::getBody(const AdjustedByteRange& range, LookupBodyCallback&& cb) { + lookup_body_callback_ = std::move(cb); + + std::weak_ptr weak = alive_; + if (!tls_slot_->send(cluster_name_, + {"getrange", fmt::format(RedisCacheBodyEntry, stableHashKey(lookup_.key())), + fmt::format("{}", range.begin()), + fmt::format("{}", range.begin() + range.length() - 1)}, + [this, weak](bool connected, bool success, + absl::optional redis_value) mutable { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + + if (!connected) { + // Connection to the redis server failed. + lookup_body_callback_(nullptr, true); + return; + } + + if (!success) { + // Entry was not found in Redis. + lookup_body_callback_(nullptr, true); + return; + } + + // TODO: this is not very efficient. + std::unique_ptr buf; + buf = std::make_unique(); + buf->add(redis_value.value()); + lookup_body_callback_(std::move(buf), !has_trailers_); + })) { + // Callback must be executed on filter's thread. + dispatcher_.post([this]() { lookup_body_callback_(nullptr, true); }); + } +} + +void RedisHttpCacheLookupContext::getTrailers(LookupTrailersCallback&& cb) { + lookup_trailers_callback_ = std::move(cb); + + std::weak_ptr weak = alive_; + if (!tls_slot_->send(cluster_name_, + {"get", fmt::format(RedisCacheTrailersEntry, stableHashKey(lookup_.key()))}, + [this, weak](bool connected, bool success, + absl::optional redis_value) mutable { + // Session was destructed during the call to Redis. + // Do nothing. Do not call callback because its context is gone. + if (weak.expired()) { + return; + } + if (!connected) { + LookupResult lookup_result; + lookup_result.cache_entry_status_ = CacheEntryStatus::LookupError; + lookup_trailers_callback_(nullptr); + return; + } + + if (!success) { + lookup_trailers_callback_(nullptr); + return; + } + + CacheFileTrailer trailers; + trailers.ParseFromString(redis_value.value()); + lookup_trailers_callback_(trailersFromTrailerProto(trailers)); + })) { + // Callback must be executed on filter's thread. + dispatcher_.post([this]() { lookup_trailers_callback_(nullptr); }); + } +} + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h b/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h new file mode 100644 index 0000000000000..a4adfe0048765 --- /dev/null +++ b/source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h @@ -0,0 +1,58 @@ +#pragma once +#include "source/extensions/filters/http/cache/http_cache.h" +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_client.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +class RedisHttpCacheLookupContext : public LookupContext { +public: + RedisHttpCacheLookupContext(absl::string_view cluster_name, Event::Dispatcher& dispatcher, + ThreadLocal::TypedSlot& tls_slot, + LookupRequest&& lookup) + : dispatcher_(dispatcher), key_(lookup.key()), tls_slot_(tls_slot), + lookup_(std::move(lookup)), cluster_name_(cluster_name) { + alive_ = std::make_shared(true); + } + + // From LookupContext + void getHeaders(LookupHeadersCallback&&) final; + void getBody(const AdjustedByteRange&, LookupBodyCallback&&) final; + void getTrailers(LookupTrailersCallback&&) final; + void onDestroy() final {} + ~RedisHttpCacheLookupContext() override {} + + const LookupRequest& lookup() const { return lookup_; } + const Key& key() const { return key_; } + Event::Dispatcher* dispatcher() const { return &dispatcher_; } + absl::string_view clusterName() const { return cluster_name_; } + +private: + Event::Dispatcher& dispatcher_; + Key key_; + + // This is used to derive weak pointer given to callbacks. + // Callbacks check if the weak pointer is still valid. + // If the weak pointer is expired, it means that the session which issued the + // call to redis has been closed and the associated cache filter has been + // destroyed. + std::shared_ptr alive_; + + LookupHeadersCallback lookup_headers_callback_; + LookupBodyCallback lookup_body_callback_; + LookupTrailersCallback lookup_trailers_callback_; + + ThreadLocal::TypedSlot& tls_slot_; + const LookupRequest lookup_; + bool has_trailers_; + absl::string_view cluster_name_; +}; + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/http/cache/redis_http_cache/BUILD b/test/extensions/http/cache/redis_http_cache/BUILD new file mode 100644 index 0000000000000..f1360c1b5ec35 --- /dev/null +++ b/test/extensions/http/cache/redis_http_cache/BUILD @@ -0,0 +1,31 @@ +load("//bazel:envoy_build_system.bzl", "envoy_cc_test", "envoy_package") +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "redis_http_cache_lookup_test", + srcs = ["redis_http_cache_lookup_test.cc"], + extension_names = ["envoy.extensions.http.cache.redis_http_cache"], + deps = [ + "//source/extensions/http/cache/redis_http_cache:config", + "//test/mocks/event:event_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "//test/mocks/server:factory_context_mocks", + ], +) + +envoy_extension_cc_test( + name = "redis_http_cache_insert_test", + srcs = ["redis_http_cache_insert_test.cc"], + extension_names = ["envoy.extensions.http.cache.redis_http_cache"], + deps = [ + ], +) + diff --git a/test/extensions/http/cache/redis_http_cache/redis_http_cache_lookup_test.cc b/test/extensions/http/cache/redis_http_cache/redis_http_cache_lookup_test.cc new file mode 100644 index 0000000000000..1ddfd6a366f73 --- /dev/null +++ b/test/extensions/http/cache/redis_http_cache/redis_http_cache_lookup_test.cc @@ -0,0 +1,125 @@ +#include "source/extensions/http/cache/redis_http_cache/redis_http_cache_lookup.h" + +#include "test/mocks/event/mocks.h" +#include "test/mocks/server/factory_context.h" +#include "test/mocks/tcp/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/mocks/upstream/thread_local_cluster.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace Cache { +namespace RedisHttpCache { + +std::string convertToFlatString(std::string array) { + array = array.substr(2, array.length() - 4); + + auto parts = absl::StrSplit(array, "\", \""); + + return absl::StrJoin(parts, " "); +} + +class RedisHttpCacheLookupTest + : public ::testing::TestWithParam> { +public: + RedisHttpCacheLookupTest() : tls_slot_(tls_allocator_) { + EXPECT_CALL(cluster_manager_, getThreadLocalCluster("redis_cluster")) + .WillOnce(testing::Return(&thread_local_cluster_)); + thread_local_redis_client_ = std::make_shared(cluster_manager_); + ; + tls_slot_.set([&](Event::Dispatcher&) { return thread_local_redis_client_; }); + + async_client_ = new Tcp::AsyncClient::MockAsyncTcpClient(); + + EXPECT_CALL(thread_local_cluster_, tcpAsyncClient(_, _)).WillOnce(Invoke([&] { + return Tcp::AsyncTcpClientPtr{async_client_}; + })); + EXPECT_CALL(*async_client_, connected()).WillOnce(testing::Return(false)); + EXPECT_CALL(*async_client_, connect()).WillOnce(testing::Return(true)); + EXPECT_CALL(*async_client_, setAsyncTcpClientCallbacks(_)); + + request_headers_.setMethod("GET"); + request_headers_.setHost("example.com"); + request_headers_.setScheme("https"); + request_headers_.setCopy(Http::CustomHeaders::get().CacheControl, "max-age=3600"); + request_headers_.setPath("/"); + } + Event::MockDispatcher dispatcher_; + Upstream::MockClusterManager cluster_manager_; + Upstream::MockThreadLocalCluster thread_local_cluster_; + NiceMock tls_allocator_; + ThreadLocal::TypedSlot tls_slot_; + std::shared_ptr thread_local_redis_client_; + Event::SimulatedTimeSystem time_system_; + NiceMock factory_context_; + Http::TestRequestHeaderMapImpl request_headers_; + ::envoy::extensions::filters::http::cache::v3::CacheConfig config_; + VaryAllowList vary_allow_list_{config_.allowed_vary_headers(), factory_context_}; + Tcp::AsyncClient::MockAsyncTcpClient* async_client_; +}; + +TEST_P(RedisHttpCacheLookupTest, SendRequestAndReceiveReply) { + LookupRequest lookup(request_headers_, time_system_.systemTime(), vary_allow_list_); + + RedisHttpCacheLookupContext lookup_context("redis_cluster", dispatcher_, tls_slot_, + std::move(lookup)); + + EXPECT_CALL(*async_client_, write(_, _)).WillOnce(Invoke([&](Buffer::Instance& buf, bool) { + std::string query = buf.toString(); + ASSERT_FALSE(query.empty()); + + class TestDecoderCallbacks : public NetworkFilters::Common::Redis::DecoderCallbacks { + public: + void onRespValue(NetworkFilters::Common::Redis::RespValuePtr&& value) override { + content_ += value->toString(); + } + std::string getContent() const { return content_; } + + private: + std::string content_; + }; + + // Use redis decoder to inspect values encoded in Redis RESP format. + TestDecoderCallbacks callbacks; + NetworkFilters::Common::Redis::DecoderImpl decoder(callbacks); + decoder.decode(buf); + + // Verify that proper command is sent to the server. + ASSERT_EQ(fmt::format("get cache-{}-headers", stableHashKey(lookup_context.lookup().key())), + convertToFlatString(callbacks.getContent())); + })); + + lookup_context.getHeaders( + [expected_status = std::get<1>(GetParam())](LookupResult&& result, bool /*end_stream*/) { + // This callback is called when reply from the Redis server is received. + // Feed different results and check how lookup_context reacts to those inputs. + ASSERT_EQ(result.cache_entry_status_, expected_status); + }); + + // Now call callback which is invoked when Redis responds. + Buffer::OwnedImpl buf; + NetworkFilters::Common::Redis::EncoderImpl encoder; + NetworkFilters::Common::Redis::RespValue request; + std::vector values(1); + values[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + values[0].asString() = std::get<0>(GetParam()); //"";//test";//"get"; + request.type(NetworkFilters::Common::Redis::RespType::Array); + request.asArray().swap(values); + encoder.encode(request, buf); + + thread_local_redis_client_->redis_clients_["redis_cluster"]->onData(buf, true); +} + +INSTANTIATE_TEST_SUITE_P(RedisHttpCacheLookupTestSuite, RedisHttpCacheLookupTest, + ::testing::Values(std::make_tuple("", CacheEntryStatus::LookupError), + std::make_tuple("test", CacheEntryStatus::Unusable))); + +} // namespace RedisHttpCache +} // namespace Cache +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy